diff --git a/backend/e2e-test/mocks/keystore.ts b/backend/e2e-test/mocks/keystore.ts index d112b54b0bc..1e59a69a357 100644 --- a/backend/e2e-test/mocks/keystore.ts +++ b/backend/e2e-test/mocks/keystore.ts @@ -21,6 +21,11 @@ export const mockKeyStore = (): TKeyStoreFactory => { store[key] = value; return "OK"; }, + setItemWithExpiryNX: async (key, _expiryInSeconds, value) => { + if (store[key] !== undefined) return null; + store[key] = value; + return "OK"; + }, deleteItem: async (key) => { delete store[key]; return 1; diff --git a/backend/src/keystore/keystore.ts b/backend/src/keystore/keystore.ts index 73d8df9a712..a5e780d9608 100644 --- a/backend/src/keystore/keystore.ts +++ b/backend/src/keystore/keystore.ts @@ -99,7 +99,9 @@ export const KeyStorePrefixes = { ProjectSSEConnection: (projectId: string, connectionId: string) => `project-sse-conn:${projectId}:${connectionId}` as const, - ProjectDeleteLock: (projectId: string) => `project-delete-lock-${projectId}` as const + ProjectDeleteLock: (projectId: string) => `project-delete-lock-${projectId}` as const, + + TelemetryIdentifyIdentity: (dedupKey: string) => `telemetry-identify-identity:${dedupKey}` as const }; export const KeyStoreTtls = { @@ -110,7 +112,8 @@ export const KeyStoreTtls = { ProjectPermissionDalVersionTtl: "15m", // Project permission DAL version TTL MfaSessionInSeconds: 300, // 5 minutes WebAuthnChallengeInSeconds: 300, // 5 minutes - ProjectSSEConnectionTtlSeconds: 180 // Must be > heartbeat interval (60s) * 2 + ProjectSSEConnectionTtlSeconds: 180, // Must be > heartbeat interval (60s) * 2 + TelemetryIdentifyIdentityInSeconds: 600 // 10 minutes }; type TDeleteItems = { @@ -140,6 +143,12 @@ export type TKeyStoreFactory = { value: string | number | Buffer, prefix?: string ) => Promise<"OK">; + setItemWithExpiryNX: ( + key: string, + expiryInSeconds: number | string, + value: string | number | Buffer, + prefix?: string + ) => Promise<"OK" | null>; deleteItem: (key: string) => Promise; deleteItemsByKeyIn: (keys: string[]) => Promise; deleteItems: (arg: TDeleteItems) => Promise; @@ -221,6 +230,13 @@ export const keyStoreFactory = ( prefix?: string ) => primaryRedis.set(prefix ? `${prefix}:${key}` : key, value, "EX", expiryInSeconds); + const setItemWithExpiryNX = async ( + key: string, + expiryInSeconds: number | string, + value: string | number | Buffer, + prefix?: string + ) => primaryRedis.set(prefix ? `${prefix}:${key}` : key, value, "EX", expiryInSeconds, "NX"); + const deleteItem = async (key: string) => primaryRedis.del(key); const deleteItemsByKeyIn = async (keys: string[]) => { @@ -376,6 +392,7 @@ export const keyStoreFactory = ( getItem, setExpiry, setItemWithExpiry, + setItemWithExpiryNX, deleteItem, deleteItems, incrementBy, diff --git a/backend/src/keystore/memory.ts b/backend/src/keystore/memory.ts index 556da56e2ab..44e25eb5bcf 100644 --- a/backend/src/keystore/memory.ts +++ b/backend/src/keystore/memory.ts @@ -22,6 +22,11 @@ export const inMemoryKeyStore = (): TKeyStoreFactory => { store[key] = value; return "OK"; }, + setItemWithExpiryNX: async (key, _expiryInSeconds, value) => { + if (store[key] !== undefined) return null; + store[key] = value; + return "OK"; + }, deleteItem: async (key) => { delete store[key]; return 1; diff --git a/backend/src/server/plugins/auth/inject-identity.ts b/backend/src/server/plugins/auth/inject-identity.ts index 8b7e7efa89c..69f3b953d64 100644 --- a/backend/src/server/plugins/auth/inject-identity.ts +++ b/backend/src/server/plugins/auth/inject-identity.ts @@ -241,6 +241,17 @@ export const injectIdentity = fp( } requestContext.set("identityAuthInfo", identityAuthInfo); + + // Fire-and-forget: enrich PostHog person record for this machine identity + void server.services.telemetry + .identifyIdentity(identity.identityId, { + name: identity.identityName, + authMethod: identity.authMethod + }) + .catch((error) => { + req.log.error(error, `Failed to enrich PostHog identity [identityId=${identity.identityId}]`); + }); + break; } case AuthMode.SERVICE_TOKEN: { diff --git a/backend/src/services/telemetry/telemetry-service.ts b/backend/src/services/telemetry/telemetry-service.ts index 59613446185..788cb6a47a2 100644 --- a/backend/src/services/telemetry/telemetry-service.ts +++ b/backend/src/services/telemetry/telemetry-service.ts @@ -3,7 +3,7 @@ import { PostHog } from "posthog-node"; import { TLicenseServiceFactory } from "@app/ee/services/license/license-service"; import { InstanceType } from "@app/ee/services/license/license-types"; -import { TKeyStoreFactory } from "@app/keystore/keystore"; +import { KeyStorePrefixes, KeyStoreTtls, TKeyStoreFactory } from "@app/keystore/keystore"; import { getConfig } from "@app/lib/config/env"; import { request } from "@app/lib/config/request"; import { crypto } from "@app/lib/crypto/cryptography"; @@ -38,7 +38,7 @@ export type TTelemetryServiceFactory = ReturnType; licenseService: Pick; orgDAL: Pick; @@ -391,6 +391,52 @@ To opt into telemetry, you can set "TELEMETRY_ENABLED=true" within the environme } }; + // In-memory fallback dedup set to limit blast radius during Redis outages + const inMemoryIdentityDedup = new Set(); + + const identifyIdentity = async ( + identityId: string, + properties: { + name?: string; + authMethod?: string; + } + ) => { + if (postHog && identityId) { + const instanceType = licenseService.getInstanceType(); + if (instanceType === InstanceType.Cloud) { + const dedupKey = `${identityId}-${properties.authMethod ?? ""}`; + try { + const cacheKey = KeyStorePrefixes.TelemetryIdentifyIdentity(dedupKey); + // Atomic SET NX + EX: only the first caller within the TTL window proceeds + const wasSet = await keyStore.setItemWithExpiryNX( + cacheKey, + KeyStoreTtls.TelemetryIdentifyIdentityInSeconds, + "1" + ); + if (!wasSet) return; + } catch (error) { + logger.error(error, `Failed to check PostHog identity dedup cache [identityId=${identityId}]`); + // In-memory fallback to limit blast radius during Redis outage + if (inMemoryIdentityDedup.has(dedupKey)) return; + inMemoryIdentityDedup.add(dedupKey); + const timer = setTimeout( + () => inMemoryIdentityDedup.delete(dedupKey), + KeyStoreTtls.TelemetryIdentifyIdentityInSeconds * 1000 + ); + timer.unref(); + // falls through intentionally: first caller during Redis outage still identifies + } + + const distinctId = `identity-${identityId}`; + try { + postHog.identify({ distinctId, properties }); + } catch (err) { + logger.error(err, `Failed to call postHog.identify for machine identity [identityId=${identityId}]`); + } + } + } + }; + const flushAll = async () => { if (postHog) { await postHog.shutdownAsync(); @@ -401,6 +447,7 @@ To opt into telemetry, you can set "TELEMETRY_ENABLED=true" within the environme sendLoopsEvent, sendPostHogEvents, identifyUser, + identifyIdentity, processAggregatedEvents, flushAll, getBucketForDistinctId