Details
-
Bug
-
Status: Resolved
-
P3
-
Resolution: Duplicate
-
None
-
None
Description
I am trying to build a streaming beam pipeline in python which should capture messages from kafka and then execute further stages of data fetching from other sources and aggregation. The step-by-step process of what I have built till now is:
- Running Kafka instance on localhost:9092
./bin/kafka-server-start.sh ./config/server.properties
- Run beam-flink job server using docker 92
docker run --net=host apache/beam_flink1.10_job_server:latest
- Run beam-kafka pipeline
import apache_beam as beam from apache_beam.io.external.kafka import ReadFromKafka, WriteToKafka from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions if __name__ == '__main__': options = PipelineOptions([ "--job_endpoint=localhost:8099", "--environment_type=LOOPBACK", "--streaming", "--environment_config={\"command\":\"/opt/apache/beam/boot\"}", ]) options = options.view_as(StandardOptions) options.streaming = True pipeline = beam.Pipeline(options=options) result = ( pipeline | "Read from kafka" >> ReadFromKafka( consumer_config={ "bootstrap.servers": 'localhost:9092', }, topics=['mytopic'], expansion_service='localhost:8097', ) | beam.Map(print) ) pipeline.run()
Publish new message using kafka-producer.sh
./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic mytopic >tryme
After publishing this trial message, the beam pipeline perceives the message but crashes giving this error:
RuntimeError: org.apache.beam.sdk.util.UserCodeException: org.apache.beam.sdk.coders.CoderException: cannot encode a null byte[] at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36) at org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1014) at org.apache.beam.fn.harness.FnApiDoFnRunner.access$1500(FnApiDoFnRunner.java:132) at org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:1483) at org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.output(FnApiDoFnRunner.java:1478) at org.apache.beam.sdk.io.kafka.KafkaIO$TypedWithoutMetadata$1.processElement(KafkaIO.java:1042) at org.apache.beam.sdk.io.kafka.KafkaIO$TypedWithoutMetadata$1$DoFnInvoker.invokeProcessElement(Unknown Source) at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:740) at org.apache.beam.fn.harness.FnApiDoFnRunner.access$700(FnApiDoFnRunner.java:132) at org.apache.beam.fn.harness.FnApiDoFnRunner$Factory.lambda$createRunnerForPTransform$1(FnApiDoFnRunner.java:203) at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:216) at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:179) at org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1011) at org.apache.beam.fn.harness.FnApiDoFnRunner.access$1500(FnApiDoFnRunner.java:132) at org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:1483) at org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.output(FnApiDoFnRunner.java:1478) at org.apache.beam.sdk.values.ValueWithRecordId$StripIdsDoFn.processElement(ValueWithRecordId.java:138) at org.apache.beam.sdk.values.ValueWithRecordId$StripIdsDoFn$DoFnInvoker.invokeProcessElement(Unknown Source) at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:740) at org.apache.beam.fn.harness.FnApiDoFnRunner.access$700(FnApiDoFnRunner.java:132) at org.apache.beam.fn.harness.FnApiDoFnRunner$Factory.lambda$createRunnerForPTransform$1(FnApiDoFnRunner.java:203) at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:216) at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:179) at org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1011) at org.apache.beam.fn.harness.FnApiDoFnRunner.access$1500(FnApiDoFnRunner.java:132) at org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:1483) at org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.outputWithTimestamp(DoFnOutputReceivers.java:84) at org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn.processElement(Read.java:516) at org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$DoFnInvoker.invokeProcessElement(Unknown Source) at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForElementAndRestriction(FnApiDoFnRunner.java:838) at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForSizedElementAndRestriction(FnApiDoFnRunner.java:808) at org.apache.beam.fn.harness.FnApiDoFnRunner.access$200(FnApiDoFnRunner.java:132) at org.apache.beam.fn.harness.FnApiDoFnRunner$Factory$2.accept(FnApiDoFnRunner.java:226) at org.apache.beam.fn.harness.FnApiDoFnRunner$Factory$2.accept(FnApiDoFnRunner.java:223) at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:216) at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:179) at org.apache.beam.fn.harness.BeamFnDataReadRunner.forwardElementToConsumer(BeamFnDataReadRunner.java:204) at org.apache.beam.fn.harness.data.QueueingBeamFnDataClient.drainAndBlock(QueueingBeamFnDataClient.java:106) at org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:295) at org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:173) at org.apache.beam.fn.harness.control.BeamFnControlClient.lambda$processInstructionRequests$0(BeamFnControlClient.java:157) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Caused by: org.apache.beam.sdk.coders.CoderException: cannot encode a null byte[] at org.apache.beam.sdk.coders.ByteArrayCoder.encode(ByteArrayCoder.java:63) at org.apache.beam.sdk.coders.ByteArrayCoder.encode(ByteArrayCoder.java:56) at org.apache.beam.sdk.coders.ByteArrayCoder.encode(ByteArrayCoder.java:41) at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:70) at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:36) at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:590) at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:581) at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:541) at org.apache.beam.sdk.fn.data.BeamFnDataSizeBasedBufferingOutboundObserver.accept(BeamFnDataSizeBasedBufferingOutboundObserver.java:109) at org.apache.beam.fn.harness.BeamFnDataWriteRunner.consume(BeamFnDataWriteRunner.java:155) at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:216) at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:179)
Attachments
Issue Links
- is duplicated by
-
BEAM-10529 Kafka XLang fails for ?empty? key/values
- Resolved