Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions backend/e2e-test/mocks/keystore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@ export const mockKeyStore = (): TKeyStoreFactory => {
store[key] = value;
return "OK";
},
setItemWithExpiryNX: async (key, _expiryInSeconds, value) => {
if (store[key] !== undefined) return null;
store[key] = value;
return "OK";
},
deleteItem: async (key) => {
delete store[key];
return 1;
Expand Down
21 changes: 19 additions & 2 deletions backend/src/keystore/keystore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,9 @@ export const KeyStorePrefixes = {
ProjectSSEConnection: (projectId: string, connectionId: string) =>
`project-sse-conn:${projectId}:${connectionId}` as const,

ProjectDeleteLock: (projectId: string) => `project-delete-lock-${projectId}` as const
ProjectDeleteLock: (projectId: string) => `project-delete-lock-${projectId}` as const,

TelemetryIdentifyIdentity: (dedupKey: string) => `telemetry-identify-identity:${dedupKey}` as const
};

export const KeyStoreTtls = {
Expand All @@ -110,7 +112,8 @@ export const KeyStoreTtls = {
ProjectPermissionDalVersionTtl: "15m", // Project permission DAL version TTL
MfaSessionInSeconds: 300, // 5 minutes
WebAuthnChallengeInSeconds: 300, // 5 minutes
ProjectSSEConnectionTtlSeconds: 180 // Must be > heartbeat interval (60s) * 2
ProjectSSEConnectionTtlSeconds: 180, // Must be > heartbeat interval (60s) * 2
TelemetryIdentifyIdentityInSeconds: 600 // 10 minutes
};

type TDeleteItems = {
Expand Down Expand Up @@ -140,6 +143,12 @@ export type TKeyStoreFactory = {
value: string | number | Buffer,
prefix?: string
) => Promise<"OK">;
setItemWithExpiryNX: (
key: string,
expiryInSeconds: number | string,
value: string | number | Buffer,
prefix?: string
) => Promise<"OK" | null>;
deleteItem: (key: string) => Promise<number>;
deleteItemsByKeyIn: (keys: string[]) => Promise<number>;
deleteItems: (arg: TDeleteItems) => Promise<number>;
Expand Down Expand Up @@ -221,6 +230,13 @@ export const keyStoreFactory = (
prefix?: string
) => primaryRedis.set(prefix ? `${prefix}:${key}` : key, value, "EX", expiryInSeconds);

const setItemWithExpiryNX = async (
key: string,
expiryInSeconds: number | string,
value: string | number | Buffer,
prefix?: string
) => primaryRedis.set(prefix ? `${prefix}:${key}` : key, value, "EX", expiryInSeconds, "NX");

const deleteItem = async (key: string) => primaryRedis.del(key);

const deleteItemsByKeyIn = async (keys: string[]) => {
Expand Down Expand Up @@ -376,6 +392,7 @@ export const keyStoreFactory = (
getItem,
setExpiry,
setItemWithExpiry,
setItemWithExpiryNX,
deleteItem,
deleteItems,
incrementBy,
Expand Down
5 changes: 5 additions & 0 deletions backend/src/keystore/memory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@ export const inMemoryKeyStore = (): TKeyStoreFactory => {
store[key] = value;
return "OK";
},
setItemWithExpiryNX: async (key, _expiryInSeconds, value) => {
if (store[key] !== undefined) return null;
store[key] = value;
return "OK";
},
deleteItem: async (key) => {
delete store[key];
return 1;
Expand Down
11 changes: 11 additions & 0 deletions backend/src/server/plugins/auth/inject-identity.ts
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,17 @@ export const injectIdentity = fp(
}

requestContext.set("identityAuthInfo", identityAuthInfo);

// Fire-and-forget: enrich PostHog person record for this machine identity
void server.services.telemetry
.identifyIdentity(identity.identityId, {
name: identity.identityName,
authMethod: identity.authMethod
})
.catch((error) => {
req.log.error(error, `Failed to enrich PostHog identity [identityId=${identity.identityId}]`);
});
Comment thread
0xArshdeep marked this conversation as resolved.

break;
}
case AuthMode.SERVICE_TOKEN: {
Expand Down
51 changes: 49 additions & 2 deletions backend/src/services/telemetry/telemetry-service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { PostHog } from "posthog-node";

import { TLicenseServiceFactory } from "@app/ee/services/license/license-service";
import { InstanceType } from "@app/ee/services/license/license-types";
import { TKeyStoreFactory } from "@app/keystore/keystore";
import { KeyStorePrefixes, KeyStoreTtls, TKeyStoreFactory } from "@app/keystore/keystore";
import { getConfig } from "@app/lib/config/env";
import { request } from "@app/lib/config/request";
import { crypto } from "@app/lib/crypto/cryptography";
Expand Down Expand Up @@ -38,7 +38,7 @@ export type TTelemetryServiceFactory = ReturnType<typeof telemetryServiceFactory
export type TTelemetryServiceFactoryDep = {
keyStore: Pick<
TKeyStoreFactory,
"incrementBy" | "deleteItemsByKeyIn" | "setItemWithExpiry" | "getKeysByPattern" | "getItems"
"incrementBy" | "deleteItemsByKeyIn" | "setItemWithExpiry" | "setItemWithExpiryNX" | "getKeysByPattern" | "getItems"
>;
licenseService: Pick<TLicenseServiceFactory, "getInstanceType" | "getPlan">;
orgDAL: Pick<TOrgDALFactory, "findOrgById">;
Expand Down Expand Up @@ -391,6 +391,52 @@ To opt into telemetry, you can set "TELEMETRY_ENABLED=true" within the environme
}
};

// In-memory fallback dedup set to limit blast radius during Redis outages
const inMemoryIdentityDedup = new Set<string>();

const identifyIdentity = async (
identityId: string,
properties: {
name?: string;
authMethod?: string;
}
) => {
if (postHog && identityId) {
const instanceType = licenseService.getInstanceType();
if (instanceType === InstanceType.Cloud) {
const dedupKey = `${identityId}-${properties.authMethod ?? ""}`;
try {
const cacheKey = KeyStorePrefixes.TelemetryIdentifyIdentity(dedupKey);
// Atomic SET NX + EX: only the first caller within the TTL window proceeds
const wasSet = await keyStore.setItemWithExpiryNX(
cacheKey,
KeyStoreTtls.TelemetryIdentifyIdentityInSeconds,
"1"
);
if (!wasSet) return;
} catch (error) {
logger.error(error, `Failed to check PostHog identity dedup cache [identityId=${identityId}]`);
// In-memory fallback to limit blast radius during Redis outage
if (inMemoryIdentityDedup.has(dedupKey)) return;
inMemoryIdentityDedup.add(dedupKey);
const timer = setTimeout(
() => inMemoryIdentityDedup.delete(dedupKey),
KeyStoreTtls.TelemetryIdentifyIdentityInSeconds * 1000
);
timer.unref();
// falls through intentionally: first caller during Redis outage still identifies
}

const distinctId = `identity-${identityId}`;
try {
postHog.identify({ distinctId, properties });
} catch (err) {
logger.error(err, `Failed to call postHog.identify for machine identity [identityId=${identityId}]`);
}
}
}
};

const flushAll = async () => {
if (postHog) {
await postHog.shutdownAsync();
Expand All @@ -401,6 +447,7 @@ To opt into telemetry, you can set "TELEMETRY_ENABLED=true" within the environme
sendLoopsEvent,
sendPostHogEvents,
identifyUser,
identifyIdentity,
processAggregatedEvents,
flushAll,
getBucketForDistinctId
Expand Down
Loading