-
Notifications
You must be signed in to change notification settings - Fork 86
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[server] Add threadsafe mode to venice-server which adjusts message processing order #910
base: main
Are you sure you want to change the base?
Conversation
…age processing order This is an initial phase PR. It is seen as the minimal set of changes needed in order to add a mode where writes on leader are committed to rocksdb prior to producing. This change in order has the following impacts: -Drainer is skipped on leaders: In a later refactor it might be prudent to remove the drainer entirely. However, in order to best accomodate that, it would likely make sense to execute a kind of batching logic when flushing to rocksdb. We do not attempt to make this change in this PR. -DCR logic must change Since writes are persisted to rocksdb prior to producing to Kafka, we now must accomodate for the possibility of left over state on a leader. To address this, we add a new mode to the merge conflict resolution logic where upon a perfect tie (on value and timestamp), we resolve to produce the repeated record. The intention here is to be able to be certain that a write which was persisted to rocksdb on leader but not produced doesn't end up getting lost due to failing DCR. -Transient Record is disabled transient record cache is disabled for those ingestion tasks which enable this mode. This is itself was one of the goals, but we should go here with some validation. Most clusters in production end up seeing pretty low cache hit rate on transient record cache in production, however, there is at least one use case that gets as high as a 20% hit rate. Theoretically, we may be able to avoid taking too much hit here as we are able to give the memory savings to rocksdb cache, but this needs vetting. If this doesn't work, then we will need to replace the transient record cache with a simple size/time based cache. There are also some cleanups here and there. Getting rid of some code paths that we no longer need and cleaning up others. NOTE: Integration tests haven't been completely added to this PR yet. Part of that is because while switching some of the existing integration tests to this mode, some tests are failing. This needs some more diagnosis. Hence the WIP tag.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the change! Overall looks great, I leave some comment, especially about TR.
clients/da-vinci-client/src/main/java/com/linkedin/davinci/config/VeniceServerConfig.java
Outdated
Show resolved
Hide resolved
...inci-client/src/main/java/com/linkedin/davinci/kafka/consumer/PartitionConsumptionState.java
Show resolved
Hide resolved
...ts/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java
Show resolved
Hide resolved
...ts/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java
Show resolved
Hide resolved
...vinci-client/src/main/java/com/linkedin/davinci/replication/merge/MergeConflictResolver.java
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the update, now I am not sure if I totally follow the logic as I post my concern in one of the reply......we probably should chat a bit offline to chew the code change.
Also, we probably need some tests coverage to prove our old mode can fail in the race condition case and new mode is correct, even if my concern is eventually proved invalid...
@@ -127,6 +127,7 @@ public void setUp() { | |||
Properties serverProperties = new Properties(); | |||
serverProperties.setProperty(ConfigKeys.SERVER_PROMOTION_TO_LEADER_REPLICA_DELAY_SECONDS, Long.toString(1)); | |||
serverProperties.put(ROCKSDB_PLAIN_TABLE_FORMAT_ENABLED, false); | |||
serverProperties.put(SERVER_INGESTION_TASK_THREAD_SAFE_MODE, false); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since this mode is default false, can we make it to run both true and false for some sophisticated tests for AAWC?
...ts/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java
Show resolved
Hide resolved
...ient/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java
Show resolved
Hide resolved
@@ -2834,6 +2885,14 @@ private int internalProcessConsumerRecord( | |||
LOGGER.error("Failed to record Record heartbeat with message: ", e); | |||
} | |||
} else { | |||
// TODO: This is a hack. Today the code kind of does a backdoor change to a key in the leaderProducer callback. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually I don't really follow this hack, can you explain a bit? I remember the issue being the leader does not chunk the record before it is produced to VT, and it seems like here it does not solve the issue? So the Leader and the Follower will still have different view?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for working on this! I wish I would have reviewed it sooner, but better late than never I guess... my comments are pretty minor, I think... but I have a few questions as well.
I don't see which integration tests are disabled. Is this a stale part of the commit message / PR body? Let's delete that part if it is stale...
Thanks again!
@@ -453,6 +454,8 @@ public class VeniceServerConfig extends VeniceClusterConfig { | |||
|
|||
private final int ingestionTaskMaxIdleCount; | |||
|
|||
private final boolean threadSafeMode; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should probably find another name for this functionality, since technically the old code is also supposed to be threadsafe... it's just that the new code is intended to make it easier to maintain thread-safety (less likely to introduce concurrency bugs)...
I guess the most significant functional change with this mode is that the leader persists changes locally prior to writing to Kafka, and as a result the TransientRecordCache
becomes unnecessary. Perhaps a name along those lines might be more clear?
How about: leaderPersistsLocallyBeforeProducingToVT
/ leader.persists.locally.before.producing.to.vt
It's a bit of a mouthful, but seems more clear... a more concise version might be "Leader Persists Before Producing", and in day-to-day operations we might end up calling it "LPBP", for short. IDK, I'm just riffing at this point 😂
Open to other suggestions too, of course.
* The below flow must be executed in a critical session for the same key: | ||
* The below flow must be executed in a critical section for the same key: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks 😄 ...
...ient/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java
Show resolved
Hide resolved
if (this.syncOffsetsOnlyAfterProducing && !chunkDeprecation) { | ||
// sync offsets | ||
ingestionTask | ||
.maybeSyncOffsets(consumedRecord, leaderProducedRecordContext, partitionConsumptionState, subPartition); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does the PCS instance passed here has a chance of having been modified by another thread (the processing thread)? I imagine we don't clone the PCS in order to make them immutable, so I wonder if what we would be checkpointing here is guaranteed to represent the correct state, up until what was just produced, rather than up until what's been consumed by another thread?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's a good catch, yeah it could happen.
...inci-client/src/main/java/com/linkedin/davinci/kafka/consumer/PartitionConsumptionState.java
Show resolved
Hide resolved
/** | ||
* The consumer record has been put into drainer queue; the following cases will result in putting to drainer directly: | ||
* The consumer record needs to be put into drainer queue; the following cases will result in putting to drainer directly: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should delete the O/B/O reference below, right?
import static com.linkedin.venice.schema.rmd.v1.CollectionRmdTimestamp.PUT_ONLY_PART_LENGTH_FIELD_POS; | ||
import static com.linkedin.venice.schema.rmd.v1.CollectionRmdTimestamp.TOP_LEVEL_COLO_ID_FIELD_POS; | ||
import static com.linkedin.venice.schema.rmd.v1.CollectionRmdTimestamp.TOP_LEVEL_TS_FIELD_POS; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nitpick: why are we removing those? It makes the code a bit more verbose, and it adds trivially modified lines into the git blame, which inflates the size of an already complex PR... is there any benefit? Perhaps there is ambiguity between the constants coming from different classes? (In which case I'd be fine with keeping the change...)
final boolean newFieldCompletelyReplaceOldField = newFieldValue != oldFieldValue; | ||
if (newFieldCompletelyReplaceOldField) { | ||
|
||
if (newFieldValue != oldFieldValue) { | ||
oldRecord.put(oldRecordField.pos(), newFieldValue); | ||
} | ||
return newFieldCompletelyReplaceOldField | ||
? UpdateResultStatus.COMPLETELY_UPDATED | ||
: UpdateResultStatus.NOT_UPDATED_AT_ALL; | ||
|
||
return UpdateResultStatus.COMPLETELY_UPDATED; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So I guess this is the change referenced in the commit message about:
DCR logic must change:
Since writes are persisted to rocksdb prior to producing to Kafka, we now must accomodate for the possibility of left over state on a leader. To address this, we add a new mode to the merge conflict resolution logic where upon a perfect tie (on value and timestamp), we resolve to produce the repeated record. The intention here is to be able to be certain that a write which was persisted to rocksdb on leader but not produced doesn't end up getting lost due to failing DCR.
I don't understand this, however... why would there be changes persisted in the leader but not in the VT, in cases of a perfect tie? If it is a perfect tie, then I assume the leader need not persist any changes either, right...?
setNewMapActiveElementAndTs( | ||
setNewMapActiveElementAndTimestamp( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nitpick... I would prefer trivial renames to be in a separate PR, to minimize the size of complex PRs... but up to you.
[server][WIP] Add threadsafe mode to venice-server which adjusts message processing order
This is an initial phase PR. It is seen as the minimal set of changes needed in order to add a mode where writes on leader are committed to rocksdb prior to producing. This change in order has the following impacts:
Drainer is skipped on leaders:
In a later refactor it might be prudent to remove the drainer entirely. However, in order to best accomodate that, it would likely make sense to execute a kind of batching logic when flushing to rocksdb. We do not attempt to make this change in this PR.
DCR logic must change:
Since writes are persisted to rocksdb prior to producing to Kafka, we now must accomodate for the possibility of left over state on a leader. To address this, we add a new mode to the merge conflict resolution logic where upon a perfect tie (on value and timestamp), we resolve to produce the repeated record. The intention here is to be able to be certain that a write which was persisted to rocksdb on leader but not produced doesn't end up getting lost due to failing DCR.
Transient Record is disabled
transient record cache is disabled for those ingestion tasks which enable this mode. This is itself was one of the goals, but we should go here with some validation. Most clusters in production end up seeing pretty low cache hit rate on transient record cache in production, however, there is at least one use case that gets as high as a 20% hit rate. Theoretically, we may be able to avoid taking too much hit here as we are able to give the memory savings to rocksdb cache, but this needs vetting. If this doesn't work, then we will need to replace the transient record cache with a simple size/time based cache.
There are also some cleanups here and there. Getting rid of some code paths that we no longer need and cleaning up others.
NOTE: Integration tests haven't been completely added to this PR yet. Part of that is because while switching some of the existing integration tests to this mode, some tests are failing. This needs some more diagnosis. Hence the WIP tag.
Resolves #XXX
How was this PR tested?
Does this PR introduce any user-facing changes?