Beam / BEAM-10484

Python - Apache Beam - Flink runner setup: ReadFromKafka returns error - RuntimeError: cannot encode a null byte[]

Details

    Description

      I am trying to build a streaming Beam pipeline in Python that reads messages from Kafka and then runs further stages that fetch data from other sources and aggregate it. The steps I have set up so far are:

      1. Run a Kafka instance on localhost:9092
        ./bin/kafka-server-start.sh ./config/server.properties

      2. Run the Beam Flink job server using Docker
        docker run --net=host apache/beam_flink1.10_job_server:latest

      3. Run the Beam Kafka pipeline
      import apache_beam as beam
      from apache_beam.io.external.kafka import ReadFromKafka, WriteToKafka
      from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
      
      
      if __name__ == '__main__':
          options = PipelineOptions([
              "--job_endpoint=localhost:8099",
              "--environment_type=LOOPBACK",
              "--streaming",
              "--environment_config={\"command\":\"/opt/apache/beam/boot\"}",
          ])
      
          options = options.view_as(StandardOptions)
          options.streaming = True
      
          pipeline = beam.Pipeline(options=options)
      
          result = (
              pipeline
      
              | "Read from kafka" >> ReadFromKafka(
                  consumer_config={
                      "bootstrap.servers": 'localhost:9092',
                  }, 
                  topics=['mytopic'],
                  expansion_service='localhost:8097',
              )
      
              | beam.Map(print)
          )
      
          pipeline.run()
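
      For readability of the printed output, the `beam.Map(print)` step could be preceded by a small decoding step. The sketch below assumes that ReadFromKafka emits (key, value) pairs of raw bytes, which is consistent with the KvCoder and ByteArrayCoder frames in the stack trace in this report; `decode_kafka_record` is a hypothetical helper name, not part of Beam. It does not by itself avoid the encode error reported here, because that error is raised on the Java side before any element reaches Python.

```python
def decode_kafka_record(record, encoding="utf-8"):
    """Turn a (key_bytes, value_bytes) pair into (str or None, str or None).

    Hypothetical helper: tolerates a missing (None) key or value instead of
    failing on .decode().
    """
    key, value = record
    decoded_key = key.decode(encoding) if key is not None else None
    decoded_value = value.decode(encoding) if value is not None else None
    return decoded_key, decoded_value
```

      In the pipeline it would be used as `| beam.Map(decode_kafka_record) | beam.Map(print)`.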

       

      Then publish a new message with kafka-console-producer.sh:

      ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic mytopic
      >tryme
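
      By default, kafka-console-producer.sh sends each input line as the record value with a null key. If keyed test records are wanted instead, the console producer accepts the `parse.key` and `key.separator` properties. The sketch below only builds the command line and the input line; the helper names are hypothetical, and the script path, broker, and topic are taken from the commands in this report.

```python
import shlex


def keyed_producer_command(topic, broker="localhost:9092", separator=":"):
    """Build console-producer arguments that make it parse 'key<sep>value' lines."""
    return [
        "./bin/kafka-console-producer.sh",
        "--broker-list", broker,
        "--topic", topic,
        "--property", "parse.key=true",
        "--property", "key.separator=" + separator,
    ]


def keyed_line(key, value, separator=":"):
    """Format one input line for a producer started with parse.key=true."""
    if separator in key:
        raise ValueError("key must not contain the separator")
    return key + separator + value


def as_shell(args):
    """Render the argument list as a single shell-quoted command string."""
    return " ".join(shlex.quote(a) for a in args)
```

      For example, `as_shell(keyed_producer_command("mytopic"))` gives the command to run, and `keyed_line("k1", "tryme")` gives the line to type at the `>` prompt.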

       

      After I publish this test message, the Beam pipeline receives it but crashes with this error:

      RuntimeError: org.apache.beam.sdk.util.UserCodeException: org.apache.beam.sdk.coders.CoderException: cannot encode a null byte[]
          at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36)
          at org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1014)
          at org.apache.beam.fn.harness.FnApiDoFnRunner.access$1500(FnApiDoFnRunner.java:132)
          at org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:1483)
          at org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.output(FnApiDoFnRunner.java:1478)
          at org.apache.beam.sdk.io.kafka.KafkaIO$TypedWithoutMetadata$1.processElement(KafkaIO.java:1042)
          at org.apache.beam.sdk.io.kafka.KafkaIO$TypedWithoutMetadata$1$DoFnInvoker.invokeProcessElement(Unknown Source)
          at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:740)
          at org.apache.beam.fn.harness.FnApiDoFnRunner.access$700(FnApiDoFnRunner.java:132)
          at org.apache.beam.fn.harness.FnApiDoFnRunner$Factory.lambda$createRunnerForPTransform$1(FnApiDoFnRunner.java:203)
          at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:216)
          at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:179)
          at org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1011)
          at org.apache.beam.fn.harness.FnApiDoFnRunner.access$1500(FnApiDoFnRunner.java:132)
          at org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:1483)
          at org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.output(FnApiDoFnRunner.java:1478)
          at org.apache.beam.sdk.values.ValueWithRecordId$StripIdsDoFn.processElement(ValueWithRecordId.java:138)
          at org.apache.beam.sdk.values.ValueWithRecordId$StripIdsDoFn$DoFnInvoker.invokeProcessElement(Unknown Source)
          at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:740)
          at org.apache.beam.fn.harness.FnApiDoFnRunner.access$700(FnApiDoFnRunner.java:132)
          at org.apache.beam.fn.harness.FnApiDoFnRunner$Factory.lambda$createRunnerForPTransform$1(FnApiDoFnRunner.java:203)
          at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:216)
          at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:179)
          at org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1011)
          at org.apache.beam.fn.harness.FnApiDoFnRunner.access$1500(FnApiDoFnRunner.java:132)
          at org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:1483)
          at org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.outputWithTimestamp(DoFnOutputReceivers.java:84)
          at org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn.processElement(Read.java:516)
          at org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$DoFnInvoker.invokeProcessElement(Unknown Source)
          at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForElementAndRestriction(FnApiDoFnRunner.java:838)
          at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForSizedElementAndRestriction(FnApiDoFnRunner.java:808)
          at org.apache.beam.fn.harness.FnApiDoFnRunner.access$200(FnApiDoFnRunner.java:132)
          at org.apache.beam.fn.harness.FnApiDoFnRunner$Factory$2.accept(FnApiDoFnRunner.java:226)
          at org.apache.beam.fn.harness.FnApiDoFnRunner$Factory$2.accept(FnApiDoFnRunner.java:223)
          at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:216)
          at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:179)
          at org.apache.beam.fn.harness.BeamFnDataReadRunner.forwardElementToConsumer(BeamFnDataReadRunner.java:204)
          at org.apache.beam.fn.harness.data.QueueingBeamFnDataClient.drainAndBlock(QueueingBeamFnDataClient.java:106)
          at org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:295)
          at org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:173)
          at org.apache.beam.fn.harness.control.BeamFnControlClient.lambda$processInstructionRequests$0(BeamFnControlClient.java:157)
          at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
          at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
          at java.lang.Thread.run(Thread.java:748)
      Caused by: org.apache.beam.sdk.coders.CoderException: cannot encode a null byte[]
          at org.apache.beam.sdk.coders.ByteArrayCoder.encode(ByteArrayCoder.java:63)
          at org.apache.beam.sdk.coders.ByteArrayCoder.encode(ByteArrayCoder.java:56)
          at org.apache.beam.sdk.coders.ByteArrayCoder.encode(ByteArrayCoder.java:41)
          at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:70)
          at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:36)
          at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:590)
          at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:581)
          at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:541)
          at org.apache.beam.sdk.fn.data.BeamFnDataSizeBasedBufferingOutboundObserver.accept(BeamFnDataSizeBasedBufferingOutboundObserver.java:109)
          at org.apache.beam.fn.harness.BeamFnDataWriteRunner.consume(BeamFnDataWriteRunner.java:155)
          at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:216)
          at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:179)
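
      The "Caused by" frames show ByteArrayCoder.encode being called from KvCoder.encode, so one component of the key/value pair was null when it was encoded. Since kafka-console-producer.sh sends records without a key by default, a plausible reading (an assumption, not confirmed in this report) is that the null component is the record key. The toy model below illustrates that failure mode in plain Python. It is not Beam's implementation: the function names are made up and the 4-byte length prefix is a simplification.

```python
class CoderException(Exception):
    """Stand-in for org.apache.beam.sdk.coders.CoderException."""


def encode_bytes(value):
    """Length-prefixed byte encoding that, like the coder in the trace, rejects None."""
    if value is None:
        raise CoderException("cannot encode a null byte[]")
    return len(value).to_bytes(4, "big") + value


def encode_kv(key, value):
    """Encode the key, then the value; a None in either position fails the whole pair."""
    return encode_bytes(key) + encode_bytes(value)
```

      With this model, `encode_kv(b"k1", b"tryme")` succeeds, while `encode_kv(None, b"tryme")` raises `CoderException` with the same message as in the trace.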
      

            People

              Assignee: Unassigned
              Reporter: Ayush Sharma (ayush1705)