Uploaded image for project: 'Spark'
  1. Spark
  2. SPARK-30495

How to disable 'spark.security.credentials.${service}.enabled' in Structured streaming while connecting to a kafka cluster

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Resolved
    • Major
    • Resolution: Fixed
    • 3.0.0
    • 3.0.0
    • Structured Streaming
    • None

    Description

      Trying to read data from a secured Kafka cluster using spark structured
      streaming. Also, using the below library to read the data -
      "spark-sql-kafka-0-10_2.12":"3.0.0-preview" since it has the feature to
      specify our custom group id (instead of spark setting its own custom group
      id)

      Dependency used in code:

              <groupId>org.apache.spark</groupId>
              <artifactId>spark-sql-kafka-0-10_2.12</artifactId>
              <version>3.0.0-preview</version>

       

      Logs:

      Getting the below error - even after specifying the required JAAS
      configuration in spark options.

      Caused by: java.lang.IllegalArgumentException: requirement failed:
      Delegation token must exist for this connector. at
      scala.Predef$.require(Predef.scala:281) at

      org.apache.spark.kafka010.KafkaTokenUtil$.isConnectorUsingCurrentToken(KafkaTokenUtil.scala:299)
      at
      org.apache.spark.sql.kafka010.KafkaDataConsumer.getOrRetrieveConsumer(KafkaDataConsumer.scala:533)
      at
      org.apache.spark.sql.kafka010.KafkaDataConsumer.$anonfun$get$1(KafkaDataConsumer.scala:275)

       

      Spark configuration used to read from Kafka:

      val kafkaDF = sparkSession.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", bootStrapServer)
      .option("subscribe", kafkaTopic )

      //Setting JAAS Configuration
      .option("kafka.sasl.jaas.config", KAFKA_JAAS_SASL)
      .option("kafka.sasl.mechanism", "PLAIN")
      .option("kafka.security.protocol", "SASL_SSL")

      // Setting custom consumer group id
      .option("kafka.group.id", "test_cg")
      .load()

       

      Following document specifies that we can disable the feature of obtaining
      delegation token -
      https://spark.apache.org/docs/3.0.0-preview/structured-streaming-kafka-integration.html

      Tried setting this property spark.security.credentials.kafka.enabled to
      false in spark config, but it is still failing with the same error.

      Attachments

        Issue Links

          Activity

            People

              gsomogyi Gabor Somogyi
              act_coder act_coder
              Votes:
              0 Vote for this issue
              Watchers:
              5 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved: