BEAM-4632: KafkaIO seems to fail in streaming mode over the Spark runner

Details

    • Type: Bug
    • Status: Resolved
    • Priority: P2
    • Resolution: Cannot Reproduce
    • Affects Version/s: 2.4.0
    • Fix Version/s: 2.6.0
    • Component/s: io-java-kafka, runner-spark
    • Labels: None
    • Environment: Ubuntu 16.04.4 LTS

    Description

      Dear sir,

      My setup uses the following versions of the related tools:

      ==================================

      Beam 2.4.0 (Direct runner and Spark runner)

      Spark 2.2.1 (local mode and standalone mode)

      Kafka: 2.11-0.10.1.1

      Scala: 2.11.8

      Java: 1.8

      ==================================

      My programs (KafkaToKafka.java and StarterPipeline.java) are available on my GitHub: https://github.com/LinRick/beamkafkaIO.

      My situation is as follows:

      The Kafka broker is running, and KafkaIO.read (as a consumer) is used to capture data from the assigned broker (ubuntu7:9092).

      The KafkaIO SDK documentation (https://beam.apache.org/documentation/sdks/javadoc/2.4.0/) indicates that the following parameters need to be set for KafkaIO to work:

      .withBootstrapServers("kafka broker ip:9092")
      .withTopic("kafkasink")
      .withKeyDeserializer(IntegerDeserializer.class)
      .withValueDeserializer(StringDeserializer.class)
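
      For reference, here is a minimal sketch of how these settings fit into a complete pipeline. The class name KafkaReadSketch and the surrounding structure are illustrative only; the broker address and topic are the ones from my setup, and the APIs are those of Beam 2.4.0:

      import org.apache.beam.sdk.Pipeline;
      import org.apache.beam.sdk.io.kafka.KafkaIO;
      import org.apache.beam.sdk.options.PipelineOptions;
      import org.apache.beam.sdk.options.PipelineOptionsFactory;
      import org.apache.beam.sdk.values.KV;
      import org.apache.beam.sdk.values.PCollection;
      import org.apache.kafka.common.serialization.IntegerDeserializer;
      import org.apache.kafka.common.serialization.StringDeserializer;

      // Illustrative class name, not from the actual project.
      public class KafkaReadSketch {
        public static void main(String[] args) {
          // The runner is chosen on the command line, e.g. --runner=SparkRunner.
          PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
          Pipeline p = Pipeline.create(options);

          // Without withMaxNumRecords/withMaxReadTime this source is unbounded,
          // which is what should switch the Spark runner into streaming mode.
          PCollection<KV<Integer, String>> records =
              p.apply(KafkaIO.<Integer, String>read()
                  .withBootstrapServers("ubuntu7:9092")
                  .withTopic("kafkasink")
                  .withKeyDeserializer(IntegerDeserializer.class)
                  .withValueDeserializer(StringDeserializer.class)
                  .withoutMetadata()); // drop Kafka metadata, keep KV<key, value>

          p.run().waitUntilFinish();
        }
      }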

      When I run my program with these settings on the direct runner, it performs well and runs in streaming mode. However, when I run the same code (the same KafkaIO settings) on the Spark runner, the program does not run in streaming mode and shuts down. As mentioned on https://beam.apache.org/documentation/runners/spark/, the Spark runner should enable streaming mode automatically when the pipeline reads from an unbounded source, such as this KafkaIO read.

      Unfortunately, that does not happen for my program.

      On the other hand, if I set .withMaxNumRecords(1000) or .withMaxReadTime(a Duration) on KafkaIO.read, my program executes successfully, but as a batch job (batch processing).
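
      Continuing the sketch above (same assumed names and setup), the bounded variant only changes the read transform; capping the input makes the source bounded, so the runner executes the pipeline in batch mode:

      import org.joda.time.Duration;

      // Bounded variant of the same read: capping the input makes the source
      // bounded, so the runner translates the pipeline as a batch job.
      PCollection<KV<Integer, String>> batch =
          p.apply(KafkaIO.<Integer, String>read()
              .withBootstrapServers("ubuntu7:9092")
              .withTopic("kafkasink")
              .withKeyDeserializer(IntegerDeserializer.class)
              .withValueDeserializer(StringDeserializer.class)
              .withMaxNumRecords(1000) // or: .withMaxReadTime(Duration.standardSeconds(30))
              .withoutMetadata());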

      The steps to run StarterPipeline.java are:

      step1 mvn compile exec:java -Dexec.mainClass=com.itri.beam.StarterPipeline -Pspark2-runner -Dexec.args="--runner=SparkRunner"
      step2 mvn clean package
      step3 cp -rf target/beamkafkaIO-0.1.jar /root/
      step4 cd /spark-2.2.1-bin-hadoop2.6/bin
      step5 ./spark-submit --class com.itri.beam.StarterPipeline --master local[4] /root/beamkafkaIO-0.1.jar --runner=SparkRunner

      I am not sure whether this issue is a bug in KafkaIO or whether I have some parameter settings wrong for the Spark runner.

      I cannot resolve this on my own, so I hope to get help from you.

      If any further information is needed, I will gladly provide it as soon as possible.

      I would highly appreciate your help with this issue, and I am looking forward to hearing from you.

      Sincerely yours,

      Rick

       

      Attachments

        1. .withMaxNumRecords(500000).JPG
          18 kB
          Rick Lin
        2. DB_table_kafkabeamdata_count.JPG
          21 kB
          Rick Lin
        3. error UnboundedDataset.java 81(0) has different number of partitions.JPG
          252 kB
          Rick Lin
        4. the error GeneratedMessageV3.JPG
          157 kB
          Rick Lin
        5. the error GeneratedMessageV3.JPG
          288 kB
          Rick Lin


          People

            Assignee: Alexey Romanenko
            Reporter: Rick Lin
            Votes: 0
            Watchers: 3

            Dates

              Created:
              Updated:
              Resolved: