Beam / BEAM-9046

Kafka connector for Python throws ClassCastException when reading KafkaRecord

Details

    Description

       I'm trying to read streaming data from Apache Kafka using the Apache Beam Python SDK with the Flink runner. With Kafka 2.4.0 and Flink 1.8.3 running, I followed these steps:

      • Compile and run Beam 2.16 with Flink 1.8 runner.
        git clone --single-branch --branch release-2.16.0 https://github.com/apache/beam.git beam-2.16.0
        cd beam-2.16.0
        nohup ./gradlew :runners:flink:1.8:job-server:runShadow -PflinkMasterUrl=localhost:8081 &
        
      • Run the Python pipeline.
        from apache_beam import Pipeline
        from apache_beam.io.external.kafka import ReadFromKafka
        from apache_beam.options.pipeline_options import PipelineOptions
        
        
        if __name__ == '__main__':
            # Build the pipeline for a Flink 1.8 cluster with a loopback SDK worker.
            pipeline = Pipeline(options=PipelineOptions([
                '--runner=FlinkRunner',
                '--flink_version=1.8',
                '--flink_master_url=localhost:8081',
                '--environment_type=LOOPBACK',
                '--streaming'
            ]))
            (
                pipeline
                | 'read' >> ReadFromKafka({'bootstrap.servers': 'localhost:9092'}, ['test'])  # [BEAM-3788] ???
            )
            # Run once explicitly; a `with` block would run the pipeline again on exit.
            result = pipeline.run()
            result.wait_until_finish()
        
      • Publish some data to Kafka.
        bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
        >{"hello":"world!"}
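For reference, here is a minimal sketch of how the published line could be decoded once it reaches the Python side. It assumes the read step yields (key, value) pairs of raw bytes, that the console producer sends the line as UTF-8, and that no key is set. The helper name decode_kafka_value is hypothetical and not part of Beam.

```python
import json


def decode_kafka_value(record):
    """Decode one (key, value) pair of raw bytes into (key, dict).

    Assumption: values are UTF-8 encoded JSON; keys may be None.
    """
    key, value = record
    decoded_key = key.decode('utf-8') if key is not None else None
    return decoded_key, json.loads(value.decode('utf-8'))


# The message published above, as the raw bytes a consumer would see.
sample = (None, b'{"hello":"world!"}')
print(decode_kafka_value(sample))
```

In a pipeline, a function like this could be applied to each element after the read step.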
        

        The Python script throws this error:

        [flink-runner-job-invoker] ERROR org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation - Error during job invocation BeamApp-USER-somejob. org.apache.flink.client.program.ProgramInvocationException: Job failed. (JobID: xxx)
                at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:268)
                at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:483)
                at org.apache.beam.runners.flink.FlinkExecutionEnvironments$BeamFlinkRemoteStreamEnvironment.executeRemotely(FlinkExecutionEnvironments.java:360)
                at org.apache.flink.streaming.api.environment.RemoteStreamEnvironment.execute(RemoteStreamEnvironment.java:310)
                at org.apache.beam.runners.flink.FlinkStreamingPortablePipelineTranslator$StreamingTranslationContext.execute(FlinkStreamingPortablePipelineTranslator.java:173)
                at org.apache.beam.runners.flink.FlinkPipelineRunner.runPipelineWithTranslator(FlinkPipelineRunner.java:104)
                at org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:80)
                at org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation.runPipeline(JobInvocation.java:78)
                at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:125)
                at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:57)
                at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:78)
                at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
                at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
                at java.lang.Thread.run(Thread.java:748)
        Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
                at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
                at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:265)
                ... 13 more
        Caused by: java.lang.ClassCastException: org.apache.beam.sdk.io.kafka.KafkaRecord cannot be cast to [B
                at org.apache.beam.sdk.coders.ByteArrayCoder.encode(ByteArrayCoder.java:41)
                at org.apache.beam.sdk.coders.LengthPrefixCoder.encode(LengthPrefixCoder.java:56)
                at org.apache.beam.sdk.values.ValueWithRecordId$ValueWithRecordIdCoder.encode(ValueWithRecordId.java:105)
                at org.apache.beam.sdk.values.ValueWithRecordId$ValueWithRecordIdCoder.encode(ValueWithRecordId.java:81)
                at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:578)
                at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:529)
                at org.apache.beam.sdk.util.CoderUtils.encodeToSafeStream(CoderUtils.java:82)
                at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:66)
                at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:51)
                at org.apache.beam.sdk.util.CoderUtils.clone(CoderUtils.java:141)
                at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.copy(CoderTypeSerializer.java:67)
                at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:577)
                at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
                at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
                at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
                at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
                at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:305)
                at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:394)
                at org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper.emitElement(UnboundedSourceWrapper.java:341)
                at org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper.run(UnboundedSourceWrapper.java:283)
                at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:93)
                at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:57)
                at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:97)
                at org.apache.flink.streaming.runtime.tasks.StoppableSourceStreamTask.run(StoppableSourceStreamTask.java:45)
                at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:302)
                at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
                ... 1 more
        ERROR:root:java.lang.ClassCastException: org.apache.beam.sdk.io.kafka.KafkaRecord cannot be cast to [B
        [flink-runner-job-invoker] INFO org.apache.beam.runners.fnexecution.artifact.BeamFileSystemArtifactRetrievalService - Manifest at /tmp/artifacts0k1mnin0/somejob/MANIFEST has 0 artifact locations
        [flink-runner-job-invoker] INFO org.apache.beam.runners.fnexecution.artifact.BeamFileSystemArtifactStagingService - Removed dir /tmp/artifacts0k1mnin0/job_somejob/
        Traceback (most recent call last):
          File "main.py", line 40, in <module>
            run()
          File "main.py", line 37, in run
            result.wait_until_finish()
          File "/home/USER/beam/lib/python3.5/site-packages/apache_beam/runners/portability/portable_runner.py", line 439, in wait_until_finish
            self._job_id, self._state, self._last_error_message()))
        RuntimeError: Pipeline BeamApp-USER-somejob failed in state FAILED: java.lang.ClassCastException: org.apache.beam.sdk.io.kafka.KafkaRecord cannot be cast to [B
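
The innermost frames show ByteArrayCoder.encode receiving a KafkaRecord where it expects a byte[] (printed by the JVM as [B). The Python analogy below is only a sketch of that kind of mismatch, not Beam's implementation. BytesOnlyCoder and FakeKafkaRecord are hypothetical names used purely for illustration.

```python
from collections import namedtuple

# Hypothetical stand-in for a structured Kafka record.
FakeKafkaRecord = namedtuple('FakeKafkaRecord', ['topic', 'key', 'value'])


class BytesOnlyCoder:
    """Toy coder that, like a byte-array coder, accepts only bytes."""

    def encode(self, element):
        if not isinstance(element, (bytes, bytearray)):
            raise TypeError(
                '%s cannot be cast to bytes' % type(element).__name__)
        # Length-prefix the payload with a 4-byte big-endian size.
        return len(element).to_bytes(4, 'big') + bytes(element)


coder = BytesOnlyCoder()
record = FakeKafkaRecord('test', None, b'{"hello":"world!"}')

# Encoding the payload works; encoding the whole record does not.
encoded = coder.encode(record.value)
try:
    coder.encode(record)
except TypeError as error:
    failure = str(error)
```

The point of the analogy is that the coder attached to the source output appears to disagree with the element type the source actually emits.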
        

        I also tried other deserializers that ship with Kafka, but none of them worked. For example:

        Couldn't infer Coder from class org.apache.kafka.common.serialization.StringDeserializer

        When I pass any coder from org.apache.beam.sdk.coders, I get this error:

        java.lang.RuntimeException: Failed to build transform beam:external:java:kafka:read:v1 from spec urn: "beam:external:java:kafka:read:v1" ... Caused by: java.lang.RuntimeException: Couldn't resolve coder for Deserializer ...
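
For context, here is a sketch of passing the deserializers explicitly. It assumes that ReadFromKafka in this Beam release accepts the keyword arguments consumer_config, topics, key_deserializer and value_deserializer, with the deserializers given as Java class names; check the signature in the installed version. The helpers kafka_read_kwargs and build_read are hypothetical, and the Beam import is deferred so the arguments can be inspected without Beam installed.

```python
# Kafka's built-in byte-array deserializer class.
BYTE_ARRAY_DESERIALIZER = (
    'org.apache.kafka.common.serialization.ByteArrayDeserializer')


def kafka_read_kwargs(bootstrap_servers, topics,
                      deserializer=BYTE_ARRAY_DESERIALIZER):
    """Build keyword arguments for ReadFromKafka (names are assumptions)."""
    return {
        'consumer_config': {'bootstrap.servers': bootstrap_servers},
        'topics': list(topics),
        'key_deserializer': deserializer,
        'value_deserializer': deserializer,
    }


def build_read(bootstrap_servers, topics):
    # Deferred import: requires apache_beam to be installed.
    from apache_beam.io.external.kafka import ReadFromKafka
    return ReadFromKafka(**kafka_read_kwargs(bootstrap_servers, topics))


kwargs = kafka_read_kwargs('localhost:9092', ['test'])
```

Keeping the arguments in one dictionary makes it easy to swap a single deserializer class and see which one triggers the coder error.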
        

        I also tried applying this patch by modifying the source code, but it didn't work:

      RuntimeError: Pipeline BeamApp-USER-somejob failed in state FAILED: java.lang.ClassNotFoundException: org.apache.beam.sdk.io.kafka.KafkaIO$ByteArrayDeserializer
      

      As another attempt, I cloned that repository and reset it to the given commit:

      git clone https://github.com/mxm/beam.git beam-mxm
      cd beam-mxm
      git reset --hard b31cf99c75
      

      But it also did not work:

      Project 'runners' not found in root project 'beam'
      

      If this is not a bug but a problem on my part, please answer my StackOverflow question and close this issue.

            People

              Assignee: Unassigned
              Reporter: Berkay Öztürk