| watermarkToCommit = writerWatermark; | ||
| } else { | ||
| watermarkToCommit = | ||
| (watermarkToCommit.compareTo(writerWatermark) < 0) ? watermarkToCommit : writerWatermark; |
This needs to be inverted; the comparison logic is incorrect.
| * | ||
| * @return The lowest watermark out of all pending write requests | ||
| */ | ||
| Optional<ComparableWatermark> getUnacknowledgedWatermark(); |
We need to be a bit more crisp about the definition / functionality of this method, and rename it from getUnacknowledgedWatermark to getLowestUnacknowledgedWatermark.
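One way to pin down the proposed semantics, as a minimal sketch only: the method returns the smallest watermark among writes that have been issued but not yet acknowledged, and an empty Optional when nothing is pending. The class `PendingWriteTracker`, its callback names, the use of `long` in place of `ComparableWatermark`, and `java.util.Optional` are all assumptions made for illustration; none of them come from this PR.

```java
import java.util.Optional;
import java.util.TreeSet;

// Hypothetical helper (not part of the PR): tracks watermarks of in-flight writes.
class PendingWriteTracker {
  // Sorted set, so the lowest pending watermark is always the first element.
  private final TreeSet<Long> pending = new TreeSet<>();

  // Called when a write is handed to the destination.
  synchronized void onWriteIssued(long watermark) {
    pending.add(watermark);
  }

  // Called when the destination acknowledges the write.
  synchronized void onWriteAcknowledged(long watermark) {
    pending.remove(watermark);
  }

  // Lowest watermark among writes that are still unacknowledged, if any.
  synchronized Optional<Long> getLowestUnacknowledgedWatermark() {
    return pending.isEmpty() ? Optional.empty() : Optional.of(pending.first());
  }
}
```

With this definition, acknowledging writes out of order never moves the reported value past a write that is still pending.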
| */ | ||
| public interface WatermarkStorage { | ||
|
|
||
| void commitWatermark(Watermark watermark) |
| * A DataWriter that is WatermarkAware. Required for implementing writers that | ||
| * can operate in continuous ingestion mode. | ||
| */ | ||
| public interface WatermarkAwareWriter<D> extends DataWriter<D> { |
Not specific to this extractor, but it is worth documenting that, for streaming, the actual watermark management is done by the Task framework via WatermarkManager rather than by the Job (Source) running in the driver JVM.
Otherwise it will get confusing and messy for people trying to mix watermarks at the Job (Source) and Task levels.
I wonder if we should use another name for this. Checkpoints?
| } | ||
|
|
||
| @Override | ||
| public Optional<ComparableWatermark> getCommittableWatermark() { |
At the moment getCommittableWatermark only considers the least of the unacknowledged and committable watermarks from the individual writers.
However, if fault tolerance is turned on and we are OK with 10% failure (i.e. we ignore 1 record per 10 records), and your least unacknowledged write is at x whereas x+1 to x+10 have all returned successfully, shouldn't your committable watermark be x+10?
There will be more such scenarios, with failed and timed-out records combined with a failure tolerance window, which we should hash out. (One argument could be that it's up to the writer to handle such scenarios and return the right committable / unacknowledged watermark.)
Agree that it would be up to the writer to handle such scenarios, but we do need to flesh out the expected behavior. Also, I don't expect there to be many WatermarkAwareWriters. Most writers will just be AsyncDataWriters / SyncDataWriters, responsible only for writing records and flushing pending writes, so they won't care about this stuff.
There will possibly be a single WatermarkAwareWriter that wraps them. My current plan is to have AsyncWriterManager be that class.
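To make the x / x+10 scenario concrete for discussion, here is a sketch of one possible policy, not a proposal for the final behavior. It walks the writes in watermark order, stops at the first write that is still pending, and skips failed writes only while a failure budget lasts. `TolerantCommitPolicy`, the `Status` enum, the `maxToleratedFailures` parameter, and the use of `long` watermarks are all hypothetical.

```java
import java.util.Map;
import java.util.Optional;
import java.util.SortedMap;

// Hypothetical policy (not part of the PR): tolerate a bounded number of failed writes.
class TolerantCommitPolicy {
  enum Status { ACKED, FAILED, PENDING }

  // Returns the highest watermark that can be committed given the failure budget.
  static Optional<Long> committableWatermark(
      SortedMap<Long, Status> writes, int maxToleratedFailures) {
    Optional<Long> committable = Optional.empty();
    int failures = 0;
    for (Map.Entry<Long, Status> write : writes.entrySet()) {
      Status status = write.getValue();
      if (status == Status.PENDING) {
        break; // outcome unknown, so nothing at or beyond this point is committable
      }
      if (status == Status.FAILED) {
        failures++;
        if (failures > maxToleratedFailures) {
          break; // budget exhausted, do not skip past this failure
        }
      }
      committable = Optional.of(write.getKey());
    }
    return committable;
  }
}
```

Under this policy a failed write at x followed by acknowledged writes x+1 to x+10 yields x+10 with a budget of one failure, and nothing committable with a budget of zero. Note that it treats a tolerated failure as skipped, and it blocks on timed-out writes for as long as they are reported as pending; both choices are exactly the kind of behavior the thread says still needs to be agreed on.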
| public interface WatermarkStorage { | ||
|
|
||
| void commitWatermark(Watermark watermark) | ||
| throws IOException; |
Should this fail outright, or fail only beyond a certain tolerance, i.e. only if it has been unable to persist for the last minute? If someone turns checkpointing on for every millisecond and there are transient network / connection failures, a hard failure here would fail the Task, which is not desirable.
The way the current code is written (in WatermarkManager), failures in committing watermarks shouldn't fail the Task. At least that was my intention :)
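The tolerance idea raised in this thread could look roughly like the sketch below: commit failures are swallowed as long as some commit has succeeded recently, and only escalate once failures have persisted beyond a configured window. This is not how WatermarkManager in the PR works. `TolerantWatermarkCommitter`, the nested `Storage` interface (a stand-in for WatermarkStorage), the `long` watermark, and the injected clock are all assumptions for illustration.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.function.LongSupplier;

// Hypothetical wrapper (not part of the PR): tolerate transient commit failures.
class TolerantWatermarkCommitter {
  // Stand-in for WatermarkStorage, reduced to a single long watermark.
  interface Storage {
    void commit(long watermark) throws IOException;
  }

  private final Storage storage;
  private final long toleranceMillis;
  private final LongSupplier clock; // injected so the window can be tested
  private long lastSuccessMillis;

  TolerantWatermarkCommitter(Storage storage, long toleranceMillis, LongSupplier clock) {
    this.storage = storage;
    this.toleranceMillis = toleranceMillis;
    this.clock = clock;
    this.lastSuccessMillis = clock.getAsLong();
  }

  // Returns true on success and false on a tolerated failure.
  // Throws only when no commit has succeeded within the tolerance window.
  boolean tryCommit(long watermark) {
    try {
      storage.commit(watermark);
      lastSuccessMillis = clock.getAsLong();
      return true;
    } catch (IOException e) {
      if (clock.getAsLong() - lastSuccessMillis > toleranceMillis) {
        throw new UncheckedIOException("Watermark commits failing beyond tolerance", e);
      }
      return false;
    }
  }
}
```

A caller on a periodic commit thread could invoke `tryCommit` each interval and log the `false` results. Whether the escalation should ever fail the Task is the open design question here.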
| + " is not an instance of WatermarkStorage but the task is configured to run in continuous mode"); | ||
| } | ||
| long commitIntervalMillis = 1000; // TODO: Configure | ||
| this.watermarkManager = Optional.of(this.closer.register |
Commenting here because GitHub isn't letting me comment on the commit() method, since you haven't touched it :)
- We should tweak commit() to consolidate handling of the watermark update for the last of the writes
- Revisit publishTaskData() being invoked from commit()
Yeah I haven't handled the "orderly shutdown" case at all. We should discuss the design before I write the code for it.
| } | ||
|
|
||
| @Override | ||
| public void close() |
This doesn't seem to be invoked anywhere. I was expecting it to be invoked via some sort of finishJob() / finishTask(), with the last writes of the writers consolidated to update the final watermark.
Yeah I haven't handled the "orderly shutdown" case at all. We should discuss the design before I write the code for it.
| if (this.parentTaskDone) { | ||
| return; | ||
| } else { | ||
| this.logger.error("Found a {} record but parent task is not done. Aborting this fork.", record); |
From what I can see, it should never reach the else clause. Did I miss some condition?
| currentIteration++; | ||
| if (!externalWatermarkStorage.isEmpty()) { | ||
| for (CheckpointableWatermark watermark : externalWatermarkStorage.values()) { | ||
| System.out.println(watermark); |
nit: a bit of context in the System.out output would be helpful for debugging
9ea782d to 9bb0e71
9bb0e71 to 7d33334