Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

No manual reconnecting of Kafka in the usage service #6416

Draft
wants to merge 8 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions packages/services/usage/src/buffer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -172,11 +172,13 @@ export function createKVBuffer<T>(config: {
reports: readonly T[],
size: number,
batchId: string,
isRetry = false,
isChunkedBuffer = false,
) {
logger.info(`Flushing (reports=%s, bufferSize=%s, id=%s)`, reports.length, size, batchId);
const estimatedSizeInBytes = estimator.estimate(size);
buffer = [];
if (isChunkedBuffer) {
buffer = [];
}
await config
.sender(reports, estimatedSizeInBytes, batchId, function validateSize(bytes) {
if (!config.useEstimator) {
Expand All @@ -203,7 +205,7 @@ export function createKVBuffer<T>(config: {
}
})
.catch(error => {
if (!isRetry && isBufferTooBigError(error)) {
if (!isChunkedBuffer && isBufferTooBigError(error)) {
config.onRetry(reports);
logger.info(`Retrying (reports=%s, bufferSize=%s, id=%s)`, reports.length, size, batchId);

Expand Down
77 changes: 23 additions & 54 deletions packages/services/usage/src/usage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@ import {
} from './metrics';

enum Status {
Waiting,
Ready,
Unhealthy,
Stopped,
Waiting = 'Waiting',
Ready = 'Ready',
Unhealthy = 'Unhealthy',
Stopped = 'Stopped',
}

const levelMap = {
Expand All @@ -30,11 +30,11 @@ const levelMap = {
} as const;

const retryOptions = {
maxRetryTime: 15 * 1000,
initialRetryTime: 300,
maxRetryTime: 30_000,
initialRetryTime: 500,
factor: 0.2,
multiplier: 2,
retries: 3,
retries: 10,
} satisfies RetryOptions; // why satisfies? To be able to use `retryOptions.retries` and get `number` instead of `number | undefined`

export function splitReport(report: RawReport, numOfChunks: number) {
Expand Down Expand Up @@ -140,15 +140,19 @@ export function createUsage(config: {
},
// settings recommended by Azure EventHub https://docs.microsoft.com/en-us/azure/event-hubs/apache-kafka-configurations
requestTimeout: 60_000, //
connectionTimeout: 5000,
authenticationTimeout: 5000,
connectionTimeout: 15_000,
authenticationTimeout: 15_000,
kamilkisiela marked this conversation as resolved.
Show resolved Hide resolved
retry: retryOptions,
});

const producer = kafka.producer({
// settings recommended by Azure EventHub https://docs.microsoft.com/en-us/azure/event-hubs/apache-kafka-configurations
metadataMaxAge: 180_000,
createPartitioner: Partitioners.LegacyPartitioner,
retry: retryOptions,
// Usually, there's one flush at a time,
// the only exception is when the buffer is chunked or when a reconnect happens
maxInFlightRequests: 5,
kamilkisiela marked this conversation as resolved.
Show resolved Hide resolved
});
const buffer = createKVBuffer<RawReport>({
logger,
Expand Down Expand Up @@ -212,23 +216,21 @@ export function createUsage(config: {
logger.info(`Flushed (id=%s, operations=%s)`, batchId, numOfOperations);
}

status = Status.Ready;
changeStatus(Status.Ready);
} catch (error: any) {
rawOperationFailures.inc(numOfOperations);

if (isBufferTooBigError(error)) {
logger.debug('Buffer too big, retrying (id=%s, error=%s)', batchId, error.message);
} else {
status = Status.Unhealthy;
changeStatus(Status.Unhealthy);
logger.error(`Failed to flush (id=%s, error=%s)`, batchId, error.message);
Sentry.setTags({
batchId,
message: error.message,
numOfOperations,
});
Sentry.captureException(error);

scheduleReconnect();
}

throw error;
Expand All @@ -238,57 +240,24 @@ export function createUsage(config: {

let status: Status = Status.Waiting;

let reconnectTimeout: ReturnType<typeof setTimeout> | null = null;
let reconnectCounter = 0;
function scheduleReconnect() {
logger.info('Scheduling reconnect');
if (reconnectTimeout) {
logger.info('Reconnect was already scheduled. Waiting...');
return;
}

reconnectCounter++;

if (reconnectCounter > retryOptions.retries) {
const message = 'Failed to reconnect Kafka producer. Too many retries.';
logger.error(message);
status = Status.Unhealthy;
void config.onStop(message);
function changeStatus(newStatus: Status) {
if (status === newStatus) {
return;
}

logger.info('Reconnecting in 1 second... (attempt=%s)', reconnectCounter);
reconnectTimeout = setTimeout(() => {
logger.info('Reconnecting Kafka producer');
status = Status.Waiting;
producer
.connect()
.then(() => {
logger.info('Kafka producer reconnected');
reconnectCounter = 0;
})
.catch(error => {
logger.error('Failed to reconnect Kafka producer: %s', error.message);
logger.info('Reconnecting in 2 seconds...');
setTimeout(scheduleReconnect, 2000);
})
.finally(() => {
if (reconnectTimeout != null) {
clearTimeout(reconnectTimeout);
reconnectTimeout = null;
}
});
}, 1000);
logger.info('Changing status to %s', newStatus);
status = newStatus;
}

producer.on(producer.events.CONNECT, () => {
logger.info('Kafka producer: connected');
status = Status.Ready;
changeStatus(Status.Ready);
});

producer.on(producer.events.DISCONNECT, () => {
logger.info('Kafka producer: disconnected');
status = Status.Stopped;
changeStatus(Status.Stopped);
});

producer.on(producer.events.REQUEST_TIMEOUT, () => {
Expand All @@ -298,7 +267,7 @@ export function createUsage(config: {
async function stop() {
logger.info('Started Usage shutdown...');

status = Status.Stopped;
changeStatus(Status.Stopped);
await buffer.stop();
logger.info(`Buffering stopped`);
await producer.disconnect();
Expand Down Expand Up @@ -332,7 +301,7 @@ export function createUsage(config: {
logger.info('Starting Kafka producer');
await producer.connect();
buffer.start();
status = Status.Ready;
changeStatus(Status.Ready);
logger.info('Kafka producer is ready');
},
stop,
Expand Down
Loading