Beam / BEAM-4631

KafkaIO should run in streaming mode over the Spark runner

Details

    • Type: Bug
    • Status: Resolved
    • Priority: P2
    • Resolution: Duplicate
    • Affects Version/s: 2.4.0
    • Fix Version/s: 2.4.0
    • Component/s: io-java-kafka, runner-spark
    • Labels: None
    • Environment: Ubuntu 16.04.4 LTS

    Description

      Dear sir,

      The following versions of the related tools are used in my program:

      ==================================

      Beam 2.4.0 (Direct runner and Spark runner)

      Spark 2.2.1 (local mode and standalone mode)

      Kafka: 2.11-0.10.1.1

      Scala: 2.11.8

      Java: 1.8

      ==================================

      My programs (KafkaToKafka.java and StarterPipeline.java) are available on my GitHub: https://github.com/LinRick/beamkafkaIO.

      My situation is as follows:

      The Kafka broker is working, and KafkaIO.read (the consumer) is used to capture data from the assigned broker (http://ubuntu7:9092).

      The user manual of the KafkaIO SDK (https://beam.apache.org/documentation/sdks/javadoc/2.4.0/) indicates that the following parameters need to be set for KafkaIO to work:

      .withBootstrapServers("kafka broker ip:9092")
      .withTopic("kafkasink")
      .withKeyDeserializer(IntegerDeserializer.class)
      .withValueDeserializer(StringDeserializer.class)
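
      For context, here is a minimal sketch of how these settings fit into a complete streaming read pipeline. The broker address and topic are the ones from my setup above; the class name and the formatting step are illustrative assumptions, not the exact code in my repository:

      import org.apache.beam.sdk.Pipeline;
      import org.apache.beam.sdk.io.kafka.KafkaIO;
      import org.apache.beam.sdk.options.PipelineOptions;
      import org.apache.beam.sdk.options.PipelineOptionsFactory;
      import org.apache.beam.sdk.transforms.MapElements;
      import org.apache.beam.sdk.values.KV;
      import org.apache.beam.sdk.values.TypeDescriptors;
      import org.apache.kafka.common.serialization.IntegerDeserializer;
      import org.apache.kafka.common.serialization.StringDeserializer;

      public class KafkaReadSketch {
        public static void main(String[] args) {
          PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();
          Pipeline p = Pipeline.create(options);

          p.apply(KafkaIO.<Integer, String>read()
                  .withBootstrapServers("ubuntu7:9092")            // broker from my setup
                  .withTopic("kafkasink")
                  .withKeyDeserializer(IntegerDeserializer.class)
                  .withValueDeserializer(StringDeserializer.class)
                  .withoutMetadata())                              // drop Kafka metadata, keep KV<Integer, String>
              .apply(MapElements.into(TypeDescriptors.strings())
                  .via((KV<Integer, String> kv) -> kv.getKey() + ": " + kv.getValue()));

          // With no record/time limit, the Kafka source is unbounded,
          // so this should run as a streaming pipeline.
          p.run().waitUntilFinish();
        }
      }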

      When I run my program with these settings on the direct runner, it performs well, and it runs in streaming mode. However, when I run the same code with the same KafkaIO settings on the Spark runner, the program does not run in streaming mode and shuts down. As mentioned on the website https://beam.apache.org/documentation/runners/spark/, the runner is supposed to set streaming mode automatically for an unbounded source.

      Unfortunately, this did not happen for my program.

      On the other hand, if I set KafkaIO.read().withMaxNumRecords(1000) or KafkaIO.read().withMaxReadTime(Duration), my program executes successfully, but in batch mode (batch processing), since the source becomes bounded.
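
      As a sketch, the bounded variant that does execute for me looks roughly like this (same read as above; the record limit is the only change):

      // withMaxNumRecords() turns the unbounded Kafka source into a bounded
      // one, so the runner executes the pipeline as a batch job.
      p.apply(KafkaIO.<Integer, String>read()
              .withBootstrapServers("ubuntu7:9092")
              .withTopic("kafkasink")
              .withKeyDeserializer(IntegerDeserializer.class)
              .withValueDeserializer(StringDeserializer.class)
              .withMaxNumRecords(1000)    // or .withMaxReadTime(Duration.standardSeconds(30))
              .withoutMetadata());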

      The steps for running StarterPipeline.java in my program are:

      step1 mvn compile exec:java -Dexec.mainClass=com.itri.beam.StarterPipeline -Pspark2-runner -Dexec.args="--runner=SparkRunner"
      step2 mvn clean package
      step3 cp -rf target/beamkafkaIO-0.1.jar /root/
      step4 cd /spark-2.2.1-bin-hadoop2.6/bin
      step5 ./spark-submit --class com.itri.beam.StarterPipeline --master local[4] /root/beamkafkaIO-0.1.jar --runner=SparkRunner
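
      For reference, the --runner=SparkRunner argument passed in step5 is presumably picked up inside main() via the standard Beam options pattern (this is an assumption about StarterPipeline, not a quote of my exact code):

      // Inside main(String[] args):
      // PipelineOptionsFactory parses --runner=SparkRunner from the
      // command-line arguments and selects the Spark runner at run time.
      PipelineOptions options = PipelineOptionsFactory.fromArgs(args)
          .withValidation()
          .create();
      Pipeline p = Pipeline.create(options);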

      I am not sure whether this issue is a bug in KafkaIO or whether I got some parameter settings wrong for the Spark runner.

      I have not been able to resolve it myself, so I hope to get help from you.

      If any further information is needed, I will gladly provide it as soon as possible.

      I would highly appreciate any help with this issue.

      I am looking forward to hearing from you.

      Sincerely yours,

      Rick

       

          People

            Assignee: Raghu Angadi (rangadi)
            Reporter: Rick Lin (Ricklin)
