Skip to content
21 changes: 18 additions & 3 deletions packages/apify/src/platform_event_manager.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { EventManager, EventType } from '@crawlee/core';
import { EventManager, EventType, serviceLocator } from '@crawlee/core';
import { WebSocket } from 'ws';

import { ACTOR_ENV_VARS, ACTOR_EVENT_NAMES } from '@apify/consts';
Expand Down Expand Up @@ -48,8 +48,23 @@ export class PlatformEventManager extends EventManager {
/** Websocket connection to Actor events. */
private eventsWs?: WebSocket;

constructor(override readonly config = Configuration.getGlobalConfig()) {
super();
constructor(
readonly config: Configuration = Configuration.getGlobalConfig() as Configuration,
) {
super({
persistStateIntervalMillis: config.persistStateIntervalMillis,
});
}

/**
* Creates a `PlatformEventManager` from a (resolved) Configuration, mirroring
* `LocalEventManager.fromConfig()` from crawlee. Falls back to the global
* configuration if none is provided.
*/
static fromConfig(config?: Configuration): PlatformEventManager {
return new PlatformEventManager(
config ?? (serviceLocator.getConfiguration() as Configuration),
);
}

/**
Expand Down
6 changes: 4 additions & 2 deletions test/apify/actor.test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { createPublicKey } from 'node:crypto';

import { EventType, StorageManager } from '@crawlee/core';
import { EventType, serviceLocator, StorageManager } from '@crawlee/core';
import { sleep } from '@crawlee/utils';
import type { ApifyEnv } from 'apify';
import {
Expand Down Expand Up @@ -1096,7 +1096,9 @@ describe('Actor', () => {

const migratingSpy = vitest.fn(persistResource(50));
const persistStateSpy = vitest.fn(persistResource(50));
const events = Configuration.getEventManager();
// crawlee v4 removed `Configuration.getEventManager()`; the
// event manager now lives on the global service locator.
const events = serviceLocator.getEventManager();

events.on(EventType.PERSIST_STATE, persistStateSpy);
events.on(EventType.MIGRATING, migratingSpy);
Expand Down
24 changes: 17 additions & 7 deletions test/apify/events.test.ts
Original file line number Diff line number Diff line change
@@ -1,30 +1,40 @@
import { EventType } from '@crawlee/core';
import { EventType, serviceLocator } from '@crawlee/core';
import type { Dictionary } from '@crawlee/utils';
import { sleep } from '@crawlee/utils';
import { Actor, Configuration, PlatformEventManager } from 'apify';
import { WebSocketServer } from 'ws';

import { ACTOR_ENV_VARS, APIFY_ENV_VARS } from '@apify/consts';

import { resetGlobalState } from '../resetGlobalState.js';

describe('events', () => {
let wss: WebSocketServer = null!;
const config = Configuration.getGlobalConfig();
let events: PlatformEventManager = null!;

beforeEach(() => {
// Set env vars BEFORE creating the Configuration — crawlee v4 resolves
// env-var-backed fields eagerly at construction, so a global config
// built earlier in the run wouldn't see `actorEventsWsUrl` and
// `events.init()` would silently never open the websocket.
process.env[ACTOR_ENV_VARS.EVENTS_WEBSOCKET_URL] =
'ws://localhost:9099/someRunId';
process.env[APIFY_ENV_VARS.TOKEN] = 'dummy';

wss = new WebSocketServer({ port: 9099 });
// Drop the cached Configuration so it picks up the env vars we just set.
resetGlobalState();
const config = Configuration.getGlobalConfig();
events = new PlatformEventManager(config);
config.useEventManager(events);
serviceLocator.setEventManager(events);

vitest.useFakeTimers();
process.env[ACTOR_ENV_VARS.EVENTS_WEBSOCKET_URL] =
'ws://localhost:9099/someRunId';
process.env[APIFY_ENV_VARS.TOKEN] = 'dummy';
});
afterEach(async () => {
vitest.useRealTimers();
delete process.env[ACTOR_ENV_VARS.EVENTS_WEBSOCKET_URL];
delete process.env[APIFY_ENV_VARS.TOKEN];
resetGlobalState();
await new Promise((resolve) => {
wss.close(resolve);
});
Expand Down Expand Up @@ -130,7 +140,7 @@ describe('events', () => {

test('should send persist state events in regular interval', async () => {
const eventsReceived = [];
const interval = config.persistStateIntervalMillis;
const interval = events.config.persistStateIntervalMillis;

events.on(EventType.PERSIST_STATE, (data) => eventsReceived.push(data));
await events.init();
Expand Down