diff --git a/packages/apify/src/platform_event_manager.ts b/packages/apify/src/platform_event_manager.ts index 872237bb9d..0f9a7f3167 100644 --- a/packages/apify/src/platform_event_manager.ts +++ b/packages/apify/src/platform_event_manager.ts @@ -1,4 +1,4 @@ -import { EventManager, EventType } from '@crawlee/core'; +import { EventManager, EventType, serviceLocator } from '@crawlee/core'; import { WebSocket } from 'ws'; import { ACTOR_ENV_VARS, ACTOR_EVENT_NAMES } from '@apify/consts'; @@ -48,8 +48,23 @@ export class PlatformEventManager extends EventManager { /** Websocket connection to Actor events. */ private eventsWs?: WebSocket; - constructor(override readonly config = Configuration.getGlobalConfig()) { - super(); + constructor( + readonly config: Configuration = Configuration.getGlobalConfig() as Configuration, + ) { + super({ + persistStateIntervalMillis: config.persistStateIntervalMillis, + }); + } + + /** + * Creates a `PlatformEventManager` from a (resolved) Configuration, mirroring + * `LocalEventManager.fromConfig()` from crawlee. Falls back to the global + * configuration if none is provided. + */ + static fromConfig(config?: Configuration): PlatformEventManager { + return new PlatformEventManager( + config ?? (serviceLocator.getConfiguration() as Configuration), + ); } /** diff --git a/test/apify/actor.test.ts b/test/apify/actor.test.ts index 4138db1012..3de4ce9ba8 100644 --- a/test/apify/actor.test.ts +++ b/test/apify/actor.test.ts @@ -1,6 +1,6 @@ import { createPublicKey } from 'node:crypto'; -import { EventType, StorageManager } from '@crawlee/core'; +import { EventType, serviceLocator, StorageManager } from '@crawlee/core'; import { sleep } from '@crawlee/utils'; import type { ApifyEnv } from 'apify'; import { @@ -1095,7 +1095,9 @@ describe('Actor', () => { const migratingSpy = vitest.fn(persistResource(50)); const persistStateSpy = vitest.fn(persistResource(50)); - const events = Configuration.getEventManager(); + // crawlee v4 removed `Configuration.getEventManager()`; the + // event manager now lives on the global service locator. + const events = serviceLocator.getEventManager(); events.on(EventType.PERSIST_STATE, persistStateSpy); events.on(EventType.MIGRATING, migratingSpy); diff --git a/test/apify/events.test.ts b/test/apify/events.test.ts index d6ec40d3ec..2629da62c3 100644 --- a/test/apify/events.test.ts +++ b/test/apify/events.test.ts @@ -1,4 +1,4 @@ -import { EventType } from '@crawlee/core'; +import { EventType, serviceLocator } from '@crawlee/core'; import type { Dictionary } from '@crawlee/utils'; import { sleep } from '@crawlee/utils'; import { Actor, Configuration, PlatformEventManager } from 'apify'; @@ -6,25 +6,35 @@ import { WebSocketServer } from 'ws'; import { ACTOR_ENV_VARS, APIFY_ENV_VARS } from '@apify/consts'; +import { resetGlobalState } from '../resetGlobalState.js'; + describe('events', () => { let wss: WebSocketServer = null!; - const config = Configuration.getGlobalConfig(); let events: PlatformEventManager = null!; beforeEach(() => { + // Set env vars BEFORE creating the Configuration — crawlee v4 resolves + // env-var-backed fields eagerly at construction, so a global config + // built earlier in the run wouldn't see `actorEventsWsUrl` and + // `events.init()` would silently never open the websocket. + process.env[ACTOR_ENV_VARS.EVENTS_WEBSOCKET_URL] = + 'ws://localhost:9099/someRunId'; + process.env[APIFY_ENV_VARS.TOKEN] = 'dummy'; + wss = new WebSocketServer({ port: 9099 }); + // Drop the cached Configuration so it picks up the env vars we just set. + resetGlobalState(); + const config = Configuration.getGlobalConfig(); events = new PlatformEventManager(config); - config.useEventManager(events); + serviceLocator.setEventManager(events); vitest.useFakeTimers(); - process.env[ACTOR_ENV_VARS.EVENTS_WEBSOCKET_URL] = - 'ws://localhost:9099/someRunId'; - process.env[APIFY_ENV_VARS.TOKEN] = 'dummy'; }); afterEach(async () => { vitest.useRealTimers(); delete process.env[ACTOR_ENV_VARS.EVENTS_WEBSOCKET_URL]; delete process.env[APIFY_ENV_VARS.TOKEN]; + resetGlobalState(); await new Promise((resolve) => { wss.close(resolve); }); @@ -130,7 +140,7 @@ describe('events', () => { test('should send persist state events in regular interval', async () => { const eventsReceived = []; - const interval = config.persistStateIntervalMillis; + const interval = events.config.persistStateIntervalMillis; events.on(EventType.PERSIST_STATE, (data) => eventsReceived.push(data)); await events.init();