Beam / BEAM-12557

Beam: Kafka with Spark Runner configuration

Details

    • Type: Bug
    • Status: Resolved
    • Priority: P2
    • Resolution: Fixed
    • Affects Version/s: 2.30.0
    • Fix Version/s: Missing
    • Component/s: io-java-kafka, runner-spark
    • Labels: None

    Description

      I am new to the Beam project, and one task of my bachelor thesis is to do some benchmarking with Beam. I have created a simple *number-count* program by modifying a word-count example (https://dzone.com/articles/unbounded-stream-processing-using-apache-beam).

      I am using a simple *Kafka* topic (1 partition) and produce a number and a timestamp (event time) periodically (a rough producer sketch is included below). The problem is that the pipeline works *fine* with the *Direct runner*, but when I use *Spark* it *fails* without an error: as soon as Spark has configured the executors it shuts down, instead of waiting for Kafka. I tested it both with YARN and in standalone mode, but nothing changed.

      After spending a week on this I can't figure it out. Any help is highly appreciated.

      *I suspect that my pom.xml might be missing something.*

      ```
      Beam: 2.30.0
      Spark: 2.4.7
      Kafka: 2.4.1
      Hadoop: 2.7.7
      Java: 8 (1.8.0_291)
      OS: Ubuntu 18.04 LTS
      ```
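
      For reference, the producer side looks roughly like this. This is a simplified sketch: the topic name, the one-second interval, and the String value are placeholders; the real producer writes a custom Record object (number plus event-time timestamp) with its own serializer.
      ```
      import java.util.Properties;
      import org.apache.kafka.clients.producer.KafkaProducer;
      import org.apache.kafka.clients.producer.ProducerConfig;
      import org.apache.kafka.clients.producer.ProducerRecord;
      import org.apache.kafka.common.serialization.LongSerializer;
      import org.apache.kafka.common.serialization.StringSerializer;

      public class NumberProducer {
          public static void main(String[] args) throws InterruptedException {
              Properties props = new Properties();
              props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
              props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
              // The real code uses a custom Record serializer; a plain "number,eventTime" String stands in for it here.
              props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

              try (KafkaProducer<Long, String> producer = new KafkaProducer<>(props)) {
                  long number = 0;
                  while (true) {
                      long eventTime = System.currentTimeMillis();
                      // key = the number, value = "number,eventTime"
                      producer.send(new ProducerRecord<>("numbers", number, number + "," + eventTime));
                      number++;
                      Thread.sleep(1000); // one record per second
                  }
              }
          }
      }
      ```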

      The pipeline:
      ```
      Pipeline pipeline = Pipeline.create(options);
      Duration WINDOW_TIME = Duration.standardSeconds(5);
      Duration ALLOWED_LATENESS = Duration.standardSeconds(5);

      CoderRegistry cr = pipeline.getCoderRegistry();
      cr.registerCoderForClass(Record.class, new RecordSerializableCoder());

      pipeline
          .apply(KafkaIO.<Long, Record>read()
              .withBootstrapServers(options.getBootstrap())
              .withTopic(options.getInputTopic())
              .withKeyDeserializer(LongDeserializer.class)
              .withValueDeserializer(RecordDeserializer.class)
              .withTimestampPolicyFactory((tp, previousWaterMark) -> new CustomFieldTimePolicy(previousWaterMark))
              .withConsumerConfigUpdates(ImmutableMap.of("group.id", "test.group"))
              .withoutMetadata())
          .apply(Values.<Record>create())
          .apply("append event time for PCollection records",
              WithTimestamps.of((Record rec) -> new Instant(rec.getTimestamp())))
          .apply("extract number",
              MapElements.into(TypeDescriptors.longs()).via(Record::getNumber))
          .apply("apply window",
              Window.<Long>into(FixedWindows.of(WINDOW_TIME))
                  .withAllowedLateness(ALLOWED_LATENESS)
                  .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
                  .accumulatingFiredPanes())
          .apply("count numbers", new CountNumbers())
          .apply("format result to String",
              MapElements.into(TypeDescriptors.strings())
                  .via((KV<Long, Long> rec) -> rec.getKey() + ":" + rec.getValue()))
          .apply("Write it to a text file", new WriteOneFilePerWindow(options.getOutput()));

      pipeline.run();
      ```
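
      The CustomFieldTimePolicy referenced above is not included here; it follows the usual pattern of taking the event time from the record itself, roughly like this (a sketch, not the exact class I use):
      ```
      import java.util.Optional;
      import org.apache.beam.sdk.io.kafka.KafkaRecord;
      import org.apache.beam.sdk.io.kafka.TimestampPolicy;
      import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
      import org.joda.time.Instant;

      public class CustomFieldTimePolicy extends TimestampPolicy<Long, Record> {

          private Instant currentWatermark;

          public CustomFieldTimePolicy(Optional<Instant> previousWatermark) {
              currentWatermark = previousWatermark.orElse(BoundedWindow.TIMESTAMP_MIN_VALUE);
          }

          @Override
          public Instant getTimestampForRecord(PartitionContext ctx, KafkaRecord<Long, Record> record) {
              // Event time comes from the Record payload, not from the Kafka log-append time.
              currentWatermark = new Instant(record.getKV().getValue().getTimestamp());
              return currentWatermark;
          }

          @Override
          public Instant getWatermark(PartitionContext ctx) {
              return currentWatermark;
          }
      }
      ```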

      *pom.xml*
      ```
      <!-- Spark profile and general dependencies -->

      <profile>
      <id>spark-runner</id>
      <properties>
      <netty.version>4.1.17.Final</netty.version>
      </properties>
      <dependencies>
      <dependency>
      <groupId>org.apache.beam</groupId>
      <artifactId>beam-runners-spark</artifactId>
      <version>${beam.version}</version>
      <scope>runtime</scope>
      </dependency>
      <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.11</artifactId>
      <version>${spark.version}</version>
      <!-- <scope>runtime</scope> -->
      </dependency>
      <dependency>
      <groupId>org.apache.beam</groupId>
      <artifactId>beam-sdks-java-io-hadoop-file-system</artifactId>
      <version>${beam.version}</version>
      <!-- <scope>runtime</scope> -->
      </dependency>
      <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.11</artifactId>
      <version>${spark.version}</version>
      <!-- <scope>runtime</scope> -->
      </dependency>
      <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-catalyst_2.11</artifactId>
      <version>${spark.version}</version>
      <!-- <scope>runtime</scope> -->
      </dependency>
      <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.11</artifactId>
      <version>${spark.version}</version>
      <!-- <scope>runtime</scope> -->
      <exclusions>
      <exclusion>
      <groupId>org.slf4j</groupId>
      <artifactId>jul-to-slf4j</artifactId>
      </exclusion>
      </exclusions>
      </dependency>

      <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-core</artifactId>
      <version>${jackson.version}</version>
      <!-- <scope>runtime</scope> -->
      </dependency>
      <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-annotations</artifactId>
      <version>${jackson.version}</version>
      <!-- <scope>runtime</scope> -->
      </dependency>
      <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-databind</artifactId>
      <version>${jackson.version}</version>
      <!-- <scope>runtime</scope> -->
      </dependency>

      <dependency>
      <groupId>com.fasterxml.jackson.module</groupId>
      <artifactId>jackson-module-scala_2.11</artifactId>
      <version>${jackson.version}</version>
      <!-- <scope>runtime</scope> -->
      </dependency>
      </dependencies>
      </profile>

      <dependencies>
      <dependency>
      <groupId>com.fasterxml.jackson.module</groupId>
      <artifactId>jackson-module-scala_2.11</artifactId>
      <version>${jackson.version}</version>
      <!-- <scope>runtime</scope> -->
      </dependency>
      <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-core</artifactId>
      <version>${jackson.version}</version>
      <!-- <scope>runtime</scope> -->
      </dependency>
      <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-annotations</artifactId>
      <version>${jackson.version}</version>
      <!-- <scope>runtime</scope> -->
      </dependency>
      <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-databind</artifactId>
      <version>${jackson.version}</version>
      <!-- <scope>runtime</scope> -->
      </dependency>
      <dependency>
      <groupId>org.apache.beam</groupId>
      <artifactId>beam-runners-spark</artifactId>
      <version>${beam.version}</version>
      <!-- <scope>runtime</scope> -->
      </dependency>
      <dependency>
      <groupId>org.apache.beam</groupId>
      <artifactId>beam-sdks-java-io-hadoop-file-system</artifactId>
      <version>${beam.version}</version>
      <!-- <scope>runtime</scope> -->
      </dependency>
      <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.11</artifactId>
      <version>${spark.version}</version>
      <exclusions>
      <!-- <exclusion>
      <groupId>log4j</groupId>
      <artifactId>log4j</artifactId>
      </exclusion> -->
      <exclusion>
      <groupId>org.slf4j</groupId>
      <artifactId>jul-to-slf4j</artifactId>
      </exclusion>
      <exclusion>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
      </exclusion>
      </exclusions>
      </dependency>
      <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.11</artifactId>
      <version>${spark.version}</version>
      <!-- <scope>runtime</scope> -->
      <exclusions>
      <exclusion>
      <groupId>org.slf4j</groupId>
      <artifactId>jul-to-slf4j</artifactId>
      </exclusion>
      </exclusions>
      </dependency>
      <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.11</artifactId>
      <version>${spark.version}</version>
      <!-- <scope>runtime</scope> -->
      </dependency>
      <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-catalyst_2.11</artifactId>
      <version>${spark.version}</version>
      <!-- <scope>runtime</scope> -->
      </dependency>

      <!-- Add slf4j API frontend binding with JUL backend -->
      <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-api</artifactId>
      <version>${slf4j.version}</version>
      </dependency>

      <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-jdk14</artifactId>
      <version>${slf4j.version}</version>
      <!-- When loaded at runtime this will wire up slf4j to the JUL backend -->
      <scope>runtime</scope>
      </dependency>

      <!-- Adds a dependency on the Beam SDK. -->
      <dependency>
      <groupId>org.apache.beam</groupId>
      <artifactId>beam-sdks-java-core</artifactId>
      <version>${beam.version}</version>
      </dependency>

      <dependency>
      <groupId>joda-time</groupId>
      <artifactId>joda-time</artifactId>
      <version>${joda.version}</version>
      </dependency>

      <!-- The DirectRunner is needed for unit tests. -->
      <dependency>
      <groupId>org.apache.beam</groupId>
      <artifactId>beam-runners-direct-java</artifactId>
      <version>${beam.version}</version>
      <scope>test</scope>
      </dependency>

      <dependency>
      <groupId>org.mockito</groupId>
      <artifactId>mockito-core</artifactId>
      <version>${mockito.version}</version>
      <scope>test</scope>
      </dependency>

      <!-- https://mvnrepository.com/artifact/org.apache.beam/beam-sdks-java-io-kafka -->
      <dependency>
      <groupId>org.apache.beam</groupId>
      <artifactId>beam-sdks-java-io-kafka</artifactId>
      <version>${beam.version}</version>
      </dependency>

      <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
      <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>${kafka.version}</version>
      </dependency>

      <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>${junit.version}</version>
      </dependency>
      </dependencies>

      ```
      Submitting the pipeline to yarn:
      ```
      ./spark-submit \
        --class com.nikarav.WindowedNumberCount \
        --master yarn \
        target/count-numbers-bundled-1.0.jar \
          --runner=SparkRunner \
          --output=counts
      ```
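
      For completeness, the options the job reads (getBootstrap(), getInputTopic(), getOutput() in the pipeline code, plus --runner and --output on the command line) are declared along these lines; the interface name and the default values here are placeholders:
      ```
      import org.apache.beam.sdk.options.Default;
      import org.apache.beam.sdk.options.Description;
      import org.apache.beam.sdk.options.PipelineOptions;
      import org.apache.beam.sdk.options.StreamingOptions;

      public interface WindowedNumberCountOptions extends PipelineOptions, StreamingOptions {

          @Description("Kafka bootstrap servers")
          @Default.String("localhost:9092")
          String getBootstrap();
          void setBootstrap(String value);

          @Description("Kafka input topic")
          @Default.String("numbers")
          String getInputTopic();
          void setInputTopic(String value);

          @Description("Output path prefix for the windowed counts")
          @Default.String("counts")
          String getOutput();
          void setOutput(String value);
      }
      ```
      The options object is created in main() with PipelineOptionsFactory.fromArgs(args).withValidation().as(WindowedNumberCountOptions.class).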

      The YARN application log:
      ```
      21/06/30 01:50:59 INFO yarn.YarnAllocator: Will request 2 executor container(s), each with 1 core(s) and 1408 MB memory (including 384 MB of overhead)
      21/06/30 01:50:59 INFO yarn.YarnAllocator: Submitted 2 unlocalized container requests.
      21/06/30 01:50:59 INFO yarn.ApplicationMaster: Started progress reporter thread with (heartbeat : 3000, initial allocation : 200) intervals
      21/06/30 01:50:59 INFO impl.AMRMClientImpl: Received new token for : nikarav:42295
      21/06/30 01:50:59 INFO yarn.YarnAllocator: Launching container container_1624916683431_0015_01_000002 on host nikarav for executor with ID 1
      21/06/30 01:50:59 INFO yarn.YarnAllocator: Received 1 containers from YARN, launching executors on 1 of them.
      21/06/30 01:50:59 INFO impl.ContainerManagementProtocolProxy: yarn.client.max-cached-nodemanagers-proxies : 0
      21/06/30 01:50:59 INFO impl.ContainerManagementProtocolProxy: Opening proxy : nikarav:42295
      21/06/30 01:51:01 INFO yarn.YarnAllocator: Launching container container_1624916683431_0015_01_000003 on host nikarav for executor with ID 2
      21/06/30 01:51:01 INFO yarn.YarnAllocator: Received 1 containers from YARN, launching executors on 1 of them.
      21/06/30 01:51:01 INFO impl.ContainerManagementProtocolProxy: yarn.client.max-cached-nodemanagers-proxies : 0
      21/06/30 01:51:01 INFO impl.ContainerManagementProtocolProxy: Opening proxy : nikarav:42295
      21/06/30 01:51:06 INFO yarn.YarnAllocator: Driver requested a total number of 0 executor(s).
      21/06/30 01:51:06 INFO yarn.ApplicationMaster$AMEndpoint: Driver terminated or disconnected! Shutting down. nikarav:43449
      21/06/30 01:51:06 INFO yarn.ApplicationMaster$AMEndpoint: Driver terminated or disconnected! Shutting down. nikarav:43449
      21/06/30 01:51:06 INFO yarn.ApplicationMaster: Final app status: SUCCEEDED, exitCode: 0
      21/06/30 01:51:06 INFO yarn.ApplicationMaster: Unregistering ApplicationMaster with SUCCEEDED
      21/06/30 01:51:06 INFO impl.AMRMClientImpl: Waiting for application to be successfully unregistered.
      21/06/30 01:51:06 INFO yarn.ApplicationMaster: Deleting staging directory hdfs://localhost:9000/user/hadoop/.sparkStaging/application_1624916683431_0015
      21/06/30 01:51:06 INFO util.ShutdownHookManager: Shutdown hook called
      ```
      *log* file: https://pastebin.com/nhL5byvK
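
      One more detail that might or might not matter (just a guess on my part): the job only calls pipeline.run() and then returns, and the log above shows the driver requesting 0 executors and finishing with SUCCEEDED almost immediately. The blocking variant would look like this:
      ```
      // Keep the driver alive until the (unbounded) pipeline is cancelled or fails.
      PipelineResult result = pipeline.run();
      result.waitUntilFinish();
      ```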


          People

            Assignee: Unassigned
            Reporter: Nikos Karavasilis (karntasis)
