-
Notifications
You must be signed in to change notification settings - Fork 3
Prod - Kafka resiliency and better dropped connection handling #21
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
| try { | ||
| await this.prepareNextIterativeReview(challengeId); | ||
| } catch (error) { | ||
| const err = error as Error; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[💡 maintainability]
Consider adding more context to the error logging. While the error message and stack trace are logged, additional context such as the function name or parameters could aid in debugging.
| await scheduler.advancePhase(payload); | ||
|
|
||
| expect( | ||
| first2FinishService.handleIterativePhaseClosed, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[correctness]
Consider checking if first2FinishService.handleIterativePhaseClosed resolves successfully or handles potential errors. This will ensure that any issues during the handling of the iterative phase closure are caught and managed appropriately.
|
|
||
| if (operation === 'close' && phaseName === ITERATIVE_REVIEW_PHASE_NAME) { | ||
| try { | ||
| await this.first2FinishService.handleIterativePhaseClosed( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[maintainability]
Consider adding a retry mechanism for this.first2FinishService.handleIterativePhaseClosed to handle transient errors more gracefully. This would improve the robustness of the phase closure process.
| reason: 'Kafka reconnection attempts exhausted', | ||
| }); | ||
|
|
||
| await expect(indicator.isHealthy('kafka')).rejects.toBeInstanceOf( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[💡 performance]
Consider checking the state before calling isConnected to avoid unnecessary calls when the state is already failed. This can improve performance slightly by reducing unnecessary operations.
|
|
||
| async isHealthy(key: string) { | ||
| try { | ||
| const status = this.kafkaService.getKafkaStatus(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[correctness]
Consider handling the case where getKafkaStatus() might return undefined or an unexpected structure. This could prevent potential runtime errors if the method's contract changes or if there's an unexpected failure in fetching the status.
| async isHealthy(key: string) { | ||
| try { | ||
| const status = this.kafkaService.getKafkaStatus(); | ||
| const timestamp = new Date().toISOString(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[💡 maintainability]
The timestamp is being generated multiple times in the method. Consider generating it once at the start of the method and reusing it to ensure consistency across all log entries and error messages.
| ); | ||
| } | ||
|
|
||
| const isConnected = await this.kafkaService.isConnected(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[💡 design]
The isConnected check could potentially be combined with the status.state check to streamline the health check logic. Consider whether both checks are necessary or if they can be unified to simplify the control flow.
|
|
||
| try { | ||
| const producerConnected = this.producer.isConnected(); | ||
| const producerConnected = this.producer?.isConnected?.() ?? false; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[correctness]
The use of optional chaining (?.) with this.producer suggests that this.producer might be undefined. However, the constructor initializes this.producer using this.createProducer(), which should always return a valid KafkaProducer. Consider removing the optional chaining if this.producer is guaranteed to be defined, or ensure that this.producer can indeed be undefined in some scenarios.
| } | ||
|
|
||
| private scheduleReconnect(): void { | ||
| if (this.reconnectionTask || this.shuttingDown) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[maintainability]
The scheduleReconnect method sets this.reconnectionTask to the result of this.performReconnect(), but does not handle any potential errors that might occur during the execution of performReconnect. Consider adding error handling to ensure that any exceptions are logged or managed appropriately.
| } | ||
| } | ||
|
|
||
| private async restartConsumers(): Promise<void> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[❗❗ correctness]
In restartConsumers, the method first closes all consumers and then starts new sessions for them. If startConsumerSession fails for any group, it might leave the system in a partially initialized state. Consider implementing a rollback mechanism or ensuring that all consumers can be restarted successfully before proceeding.
| } | ||
| } | ||
|
|
||
| private async wait(delayMs: number): Promise<void> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[💡 performance]
The wait method uses setTimeout to delay execution. This approach is generally fine, but be aware that it can be affected by the Node.js event loop and might not be precise for very short delays. Ensure that this behavior is acceptable for the use case.
No description provided.