Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Error running example: pipe /etc/profile to the topic #4

Open
rongpenl opened this issue Jan 28, 2021 · 0 comments
Open

Error running example: pipe /etc/profile to the topic #4

rongpenl opened this issue Jan 28, 2021 · 0 comments

Comments

@rongpenl
Copy link

Hi @knaufk , thank you for the demo.

I set up the environment without using the Dockerfile and it looks fine so far. The lambda syntax is outdated so I changed it to

| 'ExtractWords' >> beam.FlatMap(lambda k,v : re.findall(r'[A-Za-z\']+', v))

However, when I run kafka-console-producer.sh --broker-list localhost:9092 --topic beam-input, the output gives me the following error. I tried to include the earliest possible abnormal message.

message: "Client failed to dequeue and process the value"
trace: "org.apache.beam.sdk.util.UserCodeException: org.apache.beam.sdk.coders.CoderException: cannot encode a null byte[]\n\tat org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39)\n\tat org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1708)\n\tat org.apache.beam.fn.harness.FnApiDoFnRunner.access$2500(FnApiDoFnRunner.java:139)\n\tat org.apache.beam.fn.harness.FnApiDoFnRunner$NonWindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2221)\n\tat org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContextBase.output(FnApiDoFnRunner.java:2390)\n\tat org.apache.beam.sdk.io.kafka.KafkaIO$TypedWithoutMetadata$1.processElement(KafkaIO.java:1218)\n\tat org.apache.beam.sdk.io.kafka.KafkaIO$TypedWithoutMetadata$1$DoFnInvoker.invokeProcessElement(Unknown Source)\n\tat org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:752)\n\tat org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:222)\n\tat org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:185)\n\tat org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1705)\n\tat org.apache.beam.fn.harness.FnApiDoFnRunner.access$2500(FnApiDoFnRunner.java:139)\n\tat org.apache.beam.fn.harness.FnApiDoFnRunner$NonWindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2221)\n\tat org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContextBase.output(FnApiDoFnRunner.java:2390)\n\tat org.apache.beam.sdk.values.ValueWithRecordId$StripIdsDoFn.processElement(ValueWithRecordId.java:139)\n\tat org.apache.beam.sdk.values.ValueWithRecordId$StripIdsDoFn$DoFnInvoker.invokeProcessElement(Unknown Source)\n\tat org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:752)\n\tat org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:222)\n\tat org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:185)\n\tat org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1705)\n\tat org.apache.beam.fn.harness.FnApiDoFnRunner.access$2500(FnApiDoFnRunner.java:139)\n\tat org.apache.beam.fn.harness.FnApiDoFnRunner$WindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2108)\n\tat org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.outputWithTimestamp(DoFnOutputReceivers.java:87)\n\tat org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn.processElement(Read.java:538)\n\tat org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$DoFnInvoker.invokeProcessElement(Unknown Source)\n\tat org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForWindowObservingSizedElementAndRestriction(FnApiDoFnRunner.java:1066)\n\tat org.apache.beam.fn.harness.FnApiDoFnRunner.access$1000(FnApiDoFnRunner.java:139)\n\tat org.apache.beam.fn.harness.FnApiDoFnRunner$4.accept(FnApiDoFnRunner.java:638)\n\tat org.apache.beam.fn.harness.FnApiDoFnRunner$4.accept(FnApiDoFnRunner.java:633)\n\tat org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:222)\n\tat org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:185)\n\tat org.apache.beam.fn.harness.BeamFnDataReadRunner.forwardElementToConsumer(BeamFnDataReadRunner.java:220)\n\tat org.apache.beam.sdk.fn.data.DecodingFnDataReceiver.accept(DecodingFnDataReceiver.java:43)\n\tat org.apache.beam.sdk.fn.data.DecodingFnDataReceiver.accept(DecodingFnDataReceiver.java:25)\n\tat org.apache.beam.fn.harness.data.QueueingBeamFnDataClient.drainAndBlock(QueueingBeamFnDataClient.java:111)\n\tat org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:304)\n\tat org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:173)\n\tat org.apache.beam.fn.harness.control.BeamFnControlClient.lambda$processInstructionRequests$0(BeamFnControlClient.java:157)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\tat java.base/java.lang.Thread.run(Thread.java:834)\nCaused by: org.apache.beam.sdk.coders.CoderException: cannot encode a null byte[]\n\tat org.apache.beam.sdk.coders.ByteArrayCoder.encode(ByteArrayCoder.java:63)\n\tat org.apache.beam.sdk.coders.ByteArrayCoder.encode(ByteArrayCoder.java:56)\n\tat org.apache.beam.sdk.coders.ByteArrayCoder.encode(ByteArrayCoder.java:41)\n\tat org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:72)\n\tat org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:37)\n\tat org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:591)\n\tat org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:582)\n\tat org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:542)\n\tat org.apache.beam.sdk.fn.data.BeamFnDataSizeBasedBufferingOutboundObserver.accept(BeamFnDataSizeBasedBufferingOutboundObserver.java:112)\n\tat org.apache.beam.fn.harness.BeamFnDataWriteRunner.consume(BeamFnDataWriteRunner.java:180)\n\tat org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:222)\n\tat org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:185)\n"
transform_id: "ReadFromKafka/Remove Kafka Metadata/ParMultiDo(Anonymous)"
log_location: "org.apache.beam.fn.harness.data.QueueingBeamFnDataClient"
thread: "29"

ERROR:root:severity: ERROR
timestamp {
  seconds: 1611810138
  nanos: 853000000
}
message: "Exception while trying to handle InstructionRequest bundle_140"
trace: "org.apache.beam.sdk.util.UserCodeException: org.apache.beam.sdk.coders.CoderException: cannot encode a null byte[]\n\tat org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39)\n\tat org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1708)\n\tat org.apache.beam.fn.harness.FnApiDoFnRunner.access$2500(FnApiDoFnRunner.java:139)\n\tat org.apache.beam.fn.harness.FnApiDoFnRunner$NonWindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2221)\n\tat org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContextBase.output(FnApiDoFnRunner.java:2390)\n\tat org.apache.beam.sdk.io.kafka.KafkaIO$TypedWithoutMetadata$1.processElement(KafkaIO.java:1218)\n\tat org.apache.beam.sdk.io.kafka.KafkaIO$TypedWithoutMetadata$1$DoFnInvoker.invokeProcessElement(Unknown Source)\n\tat org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:752)\n\tat org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:222)\n\tat org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:185)\n\tat org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1705)\n\tat org.apache.beam.fn.harness.FnApiDoFnRunner.access$2500(FnApiDoFnRunner.java:139)\n\tat org.apache.beam.fn.harness.FnApiDoFnRunner$NonWindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2221)\n\tat org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContextBase.output(FnApiDoFnRunner.java:2390)\n\tat org.apache.beam.sdk.values.ValueWithRecordId$StripIdsDoFn.processElement(ValueWithRecordId.java:139)\n\tat org.apache.beam.sdk.values.ValueWithRecordId$StripIdsDoFn$DoFnInvoker.invokeProcessElement(Unknown Source)\n\tat org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:752)\n\tat org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:222)\n\tat org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:185)\n\tat org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1705)\n\tat org.apache.beam.fn.harness.FnApiDoFnRunner.access$2500(FnApiDoFnRunner.java:139)\n\tat org.apache.beam.fn.harness.FnApiDoFnRunner$WindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2108)\n\tat org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.outputWithTimestamp(DoFnOutputReceivers.java:87)\n\tat org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn.processElement(Read.java:538)\n\tat org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$DoFnInvoker.invokeProcessElement(Unknown Source)\n\tat org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForWindowObservingSizedElementAndRestriction(FnApiDoFnRunner.java:1066)\n\tat org.apache.beam.fn.harness.FnApiDoFnRunner.access$1000(FnApiDoFnRunner.java:139)\n\tat org.apache.beam.fn.harness.FnApiDoFnRunner$4.accept(FnApiDoFnRunner.java:638)\n\tat org.apache.beam.fn.harness.FnApiDoFnRunner$4.accept(FnApiDoFnRunner.java:633)\n\tat org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:222)\n\tat org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:185)\n\tat org.apache.beam.fn.harness.BeamFnDataReadRunner.forwardElementToConsumer(BeamFnDataReadRunner.java:220)\n\tat org.apache.beam.sdk.fn.data.DecodingFnDataReceiver.accept(DecodingFnDataReceiver.java:43)\n\tat org.apache.beam.sdk.fn.data.DecodingFnDataReceiver.accept(DecodingFnDataReceiver.java:25)\n\tat org.apache.beam.fn.harness.data.QueueingBeamFnDataClient.drainAndBlock(QueueingBeamFnDataClient.java:111)\n\tat org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:304)\n\tat org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:173)\n\tat org.apache.beam.fn.harness.control.BeamFnControlClient.lambda$processInstructionRequests$0(BeamFnControlClient.java:157)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\tat java.base/java.lang.Thread.run(Thread.java:834)\nCaused by: org.apache.beam.sdk.coders.CoderException: cannot encode a null byte[]\n\tat org.apache.beam.sdk.coders.ByteArrayCoder.encode(ByteArrayCoder.java:63)\n\tat org.apache.beam.sdk.coders.ByteArrayCoder.encode(ByteArrayCoder.java:56)\n\tat org.apache.beam.sdk.coders.ByteArrayCoder.encode(ByteArrayCoder.java:41)\n\tat org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:72)\n\tat org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:37)\n\tat org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:591)\n\tat org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:582)\n\tat org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:542)\n\tat org.apache.beam.sdk.fn.data.BeamFnDataSizeBasedBufferingOutboundObserver.accept(BeamFnDataSizeBasedBufferingOutboundObserver.java:112)\n\tat org.apache.beam.fn.harness.BeamFnDataWriteRunner.consume(BeamFnDataWriteRunner.java:180)\n\tat org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:222)\n\tat org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:185)\n"
transform_id: "ReadFromKafka/Remove Kafka Metadata/ParMultiDo(Anonymous)"
log_location: "org.apache.beam.fn.harness.control.BeamFnControlClient"
thread: "29"

INFO:root:severity: INFO
timestamp {
  seconds: 1611810139
  nanos: 604000000
}
message: "[Consumer clientId=consumer-Reader-0_offset_consumer_1306429134_none-3, groupId=Reader-0_offset_consumer_1306429134_none] Seeking to LATEST offset of partition beam-input-0"
log_location: "org.apache.kafka.clients.consumer.internals.SubscriptionState"
thread: "35"

INFO:root:severity: INFO
timestamp {
  seconds: 1611810139
  nanos: 607000000
}
message: "[Consumer clientId=consumer-Reader-0_offset_consumer_1306429134_none-3, groupId=Reader-0_offset_consumer_1306429134_none] Resetting offset for partition beam-input-0 to offset 81."
log_location: "org.apache.kafka.clients.consumer.internals.SubscriptionState"
thread: "35"

61220705a9252942f23961858c53a3129c0ea63a042850eca7039af0d3437725
Traceback (most recent call last):
  File "wordcount.py", line 41, in <module>
    run()
  File "wordcount.py", line 36, in run
    result = p.run()
  File "/home/rongpenl/anaconda3/envs/beam/lib/python3.7/site-packages/apache_beam/pipeline.py", line 561, in run
    return self.runner.run_pipeline(self, self._options)
  File "/home/rongpenl/anaconda3/envs/beam/lib/python3.7/site-packages/apache_beam/runners/direct/direct_runner.py", line 126, in run_pipeline
    return runner.run_pipeline(pipeline, options)
  File "/home/rongpenl/anaconda3/envs/beam/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 183, in run_pipeline
    pipeline.to_runner_api(default_environment=self._default_environment))
  File "/home/rongpenl/anaconda3/envs/beam/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 193, in run_via_runner_api
    return self.run_stages(stage_context, stages)
  File "/home/rongpenl/anaconda3/envs/beam/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 360, in run_stages
    bundle_context_manager,
  File "/home/rongpenl/anaconda3/envs/beam/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 556, in _run_stage
    bundle_manager)
  File "/home/rongpenl/anaconda3/envs/beam/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 596, in _run_bundle
    data_input, data_output, input_timers, expected_timer_output)
  File "/home/rongpenl/anaconda3/envs/beam/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 937, in process_bundle
    raise RuntimeError(result.error)
RuntimeError: org.apache.beam.sdk.util.UserCodeException: org.apache.beam.sdk.coders.CoderException: cannot encode a null byte[]
	at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39)
	at org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1708)
	at org.apache.beam.fn.harness.FnApiDoFnRunner.access$2500(FnApiDoFnRunner.java:139)
	at org.apache.beam.fn.harness.FnApiDoFnRunner$NonWindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2221)
	at org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContextBase.output(FnApiDoFnRunner.java:2390)
	at org.apache.beam.sdk.io.kafka.KafkaIO$TypedWithoutMetadata$1.processElement(KafkaIO.java:1218)
	at org.apache.beam.sdk.io.kafka.KafkaIO$TypedWithoutMetadata$1$DoFnInvoker.invokeProcessElement(Unknown Source)
	at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:752)
	at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:222)
	at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:185)
	at org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1705)
	at org.apache.beam.fn.harness.FnApiDoFnRunner.access$2500(FnApiDoFnRunner.java:139)
	at org.apache.beam.fn.harness.FnApiDoFnRunner$NonWindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2221)
	at org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContextBase.output(FnApiDoFnRunner.java:2390)
	at org.apache.beam.sdk.values.ValueWithRecordId$StripIdsDoFn.processElement(ValueWithRecordId.java:139)
	at org.apache.beam.sdk.values.ValueWithRecordId$StripIdsDoFn$DoFnInvoker.invokeProcessElement(Unknown Source)
	at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:752)
	at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:222)
	at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:185)
	at org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1705)
	at org.apache.beam.fn.harness.FnApiDoFnRunner.access$2500(FnApiDoFnRunner.java:139)
	at org.apache.beam.fn.harness.FnApiDoFnRunner$WindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2108)
	at org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.outputWithTimestamp(DoFnOutputReceivers.java:87)
	at org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn.processElement(Read.java:538)
	at org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$DoFnInvoker.invokeProcessElement(Unknown Source)
	at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForWindowObservingSizedElementAndRestriction(FnApiDoFnRunner.java:1066)
	at org.apache.beam.fn.harness.FnApiDoFnRunner.access$1000(FnApiDoFnRunner.java:139)
	at org.apache.beam.fn.harness.FnApiDoFnRunner$4.accept(FnApiDoFnRunner.java:638)
	at org.apache.beam.fn.harness.FnApiDoFnRunner$4.accept(FnApiDoFnRunner.java:633)
	at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:222)
	at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:185)
	at org.apache.beam.fn.harness.BeamFnDataReadRunner.forwardElementToConsumer(BeamFnDataReadRunner.java:220)
	at org.apache.beam.sdk.fn.data.DecodingFnDataReceiver.accept(DecodingFnDataReceiver.java:43)
	at org.apache.beam.sdk.fn.data.DecodingFnDataReceiver.accept(DecodingFnDataReceiver.java:25)
	at org.apache.beam.fn.harness.data.QueueingBeamFnDataClient.drainAndBlock(QueueingBeamFnDataClient.java:111)
	at org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:304)
	at org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:173)
	at org.apache.beam.fn.harness.control.BeamFnControlClient.lambda$processInstructionRequests$0(BeamFnControlClient.java:157)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.beam.sdk.coders.CoderException: cannot encode a null byte[]
	at org.apache.beam.sdk.coders.ByteArrayCoder.encode(ByteArrayCoder.java:63)
	at org.apache.beam.sdk.coders.ByteArrayCoder.encode(ByteArrayCoder.java:56)
	at org.apache.beam.sdk.coders.ByteArrayCoder.encode(ByteArrayCoder.java:41)
	at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:72)
	at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:37)
	at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:591)
	at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:582)
	at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:542)
	at org.apache.beam.sdk.fn.data.BeamFnDataSizeBasedBufferingOutboundObserver.accept(BeamFnDataSizeBasedBufferingOutboundObserver.java:112)
	at org.apache.beam.fn.harness.BeamFnDataWriteRunner.consume(BeamFnDataWriteRunner.java:180)
	at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:222)
	at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:185)

While debugging, I also find it looks like the official doc of beam said only Flink Runner support the Kafka source. I am not sure whether this is related.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant