Uploaded image for project: 'Spark'
  1. Spark
  2. SPARK-45714

Spark UI throws 500 error when StructuredStreaming query filter is selected

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Open
    • Major
    • Resolution: Unresolved
    • 3.4.0
    • None
    • Java API
    • None

    Description

       

      When I add the code below to my listener class, then open the streaming query tab in the UI and click "Sort by Latest Batch", I get the error shown at the end.

       

      class MyListener extends StreamingQueryListener {

        import scala.util.control.NonFatal

        // Admin client, or None when creation fails (e.g. broker unreachable at
        // startup). Catch only NonFatal so OutOfMemoryError / InterruptedException
        // still propagate — the original caught Throwable.
        private val kafkaAdminClient: Option[Admin] = try {
          Some(Admin.create(getKafkaProp()))
        } catch {
          case NonFatal(_) => None
        }

        // Consumer-group ids registered on the cluster; empty when the admin
        // client is unavailable. The original called `.get` on the Option (which
        // throws exactly when creation failed, defeating the try/catch above) and
        // compared a String against a collection of ConsumerGroupListing objects,
        // which can never match — extract the group-id strings instead.
        private val registeredConsumerGroups: Set[String] = kafkaAdminClient match {
          case Some(admin) =>
            val groupIds = Set.newBuilder[String]
            admin.listConsumerGroups().all().get().forEach(g => groupIds += g.groupId())
            groupIds.result()
          case None => Set.empty
        }

        override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
          val eventName = event.progress.name
          // `sources` can be empty for some progress events — avoid `.head`.
          event.progress.sources.headOption.foreach { source =>
            commitConsumerOffset(source.endOffset, eventName)
          }
        }

        /** Builds the Kafka client properties shared by the admin client and the
          * per-event consumers; `eventName` becomes the consumer group id. */
        private def getKafkaProp(eventName: String = "dummy-admin"): Properties = {
          val props: Properties = new Properties()
          props.put("bootstrap.servers", Configuration.configurationMap("kafka.broker"))
          props.put("group.id", eventName)
          props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
          props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
          props.put("security.protocol", Configuration.configurationMap("kafka.security.protocol"))
          props.put("sasl.mechanism", Configuration.configurationMap("kafka.sasl.mechanism"))
          props.put("sasl.jaas.config", Configuration.configurationMap("kafka.sasl.jaas.config"))
          props
        }

        /** Converts the parsed endOffset JSON ({topic -> {partition -> offset}})
          * into the TopicPartition -> OffsetAndMetadata map the Kafka commit APIs
          * expect. Returns an empty map for empty input (the original `.head`
          * threw NoSuchElementException there). */
        private def getTopicPartitionMap(
            topic: String,
            jsonOffsets: Map[String, Map[String, Long]]): util.HashMap[TopicPartition, OffsetAndMetadata] = {
          val topicPartitionMap = new util.HashMap[TopicPartition, OffsetAndMetadata]()
          jsonOffsets.headOption.foreach { entry =>
            val offsets = entry._2
            offsets.keys.foreach { partition =>
              // Jackson may deserialize the offset as Integer or Long (type
              // erasure defeats the Map[String, Long] target type), so cast
              // through Number rather than unboxing directly to Long.
              val oam = new OffsetAndMetadata(offsets(partition).asInstanceOf[Number].longValue())
              topicPartitionMap.put(new TopicPartition(topic, partition.toInt), oam)
            }
          }
          topicPartitionMap
        }

        /** Commits `endOffset` for the consumer group `eventName`: via a
          * throwaway consumer when the group is not yet registered (commitSync
          * creates it as a side effect), otherwise through the admin client. */
        private def commitConsumerOffset(endOffset: String, eventName: String): Unit = {
          val jsonOffsets = mapper.readValue(endOffset, classOf[Map[String, Map[String, Long]]])
          val topicPartitionMap = getTopicPartitionMap(eventName, jsonOffsets)
          if (topicPartitionMap.isEmpty) {
            // Nothing to commit (empty/unparseable endOffset).
          } else if (!registeredConsumerGroups.contains(eventName)) {
            print("topic consumer not created! creating")
            val kafkaConsumer = new KafkaConsumer[String, String](getKafkaProp(eventName))
            // try/finally so the consumer is closed even when commitSync throws
            // (the original leaked it on failure).
            try kafkaConsumer.commitSync(topicPartitionMap)
            finally kafkaConsumer.close()
          } else {
            print("committing cg offsets using admin")
            // foreach instead of `.get`: if the admin client was never created,
            // skip the commit rather than crash the listener.
            kafkaAdminClient.foreach(_.alterConsumerGroupOffsets(eventName, topicPartitionMap).all().get())
          }
        }
      }

       

       

      HTTP ERROR 500 java.lang.NullPointerException

      URI: /StreamingQuery/active
      STATUS: 500
      MESSAGE: java.lang.NullPointerException
      SERVLET: org.apache.spark.ui.JettyUtils$$anon$1-522a82dd
      CAUSED BY: java.lang.NullPointerException

      Caused by:

       

      java.lang.NullPointerException at org.apache.spark.sql.streaming.ui.StreamingQueryDataSource.$anonfun$ordering$9(StreamingQueryPage.scala:258) at org.apache.spark.sql.streaming.ui.StreamingQueryDataSource.$anonfun$ordering$9$adapted(StreamingQueryPage.scala:258) at scala.math.Ordering$$anon$5.compare(Ordering.scala:253) at java.base/java.util.TimSort.binarySort(TimSort.java:296) at java.base/java.util.TimSort.sort(TimSort.java:239) at java.base/java.util.Arrays.sort(Arrays.java:1441) at scala.collection.SeqLike.sorted(SeqLike.scala:659) at scala.collection.SeqLike.sorted$(SeqLike.scala:647) at scala.collection.AbstractSeq.sorted(Seq.scala:45) at org.apache.spark.sql.streaming.ui.StreamingQueryDataSource.<init>(StreamingQueryPage.scala:223) at org.apache.spark.sql.streaming.ui.StreamingQueryPagedTable.dataSource(StreamingQueryPage.scala:149) at org.apache.spark.ui.PagedTable.table(PagedTable.scala:101) at org.apache.spark.ui.PagedTable.table$(PagedTable.scala:100) at org.apache.spark.sql.streaming.ui.StreamingQueryPagedTable.table(StreamingQueryPage.scala:114) at org.apache.spark.sql.streaming.ui.StreamingQueryPage.queryTable(StreamingQueryPage.scala:101) at org.apache.spark.sql.streaming.ui.StreamingQueryPage.generateStreamingQueryTable(StreamingQueryPage.scala:60) at org.apache.spark.sql.streaming.ui.StreamingQueryPage.render(StreamingQueryPage.scala:38) at org.apache.spark.ui.WebUI.$anonfun$attachPage$1(WebUI.scala:90) at org.apache.spark.ui.JettyUtils$$anon$1.doGet(JettyUtils.scala:81) at javax.servlet.http.HttpServlet.service(HttpServlet.java:503) at javax.servlet.http.HttpServlet.service(HttpServlet.java:590) at org.sparkproject.jetty.servlet.ServletHolder.handle(ServletHolder.java:799) at org.sparkproject.jetty.servlet.ServletHandler$ChainEnd.doFilter(ServletHandler.java:1626) at org.apache.spark.ui.HttpSecurityFilter.doFilter(HttpSecurityFilter.scala:95) at org.sparkproject.jetty.servlet.FilterHolder.doFilter(FilterHolder.java:193) at 
org.sparkproject.jetty.servlet.ServletHandler$Chain.doFilter(ServletHandler.java:1601) at org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.doFilter(AmIpFilter.java:185) at org.sparkproject.jetty.servlet.FilterHolder.doFilter(FilterHolder.java:193) at org.sparkproject.jetty.servlet.ServletHandler$Chain.doFilter(ServletHandler.java:1601) at org.sparkproject.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:548) at org.sparkproject.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:233) at org.sparkproject.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1434) at org.sparkproject.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:188) at org.sparkproject.jetty.servlet.ServletHandler.doScope(ServletHandler.java:501) at org.sparkproject.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:186) at org.sparkproject.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1349) at org.sparkproject.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141) at org.sparkproject.jetty.server.handler.gzip.GzipHandler.handle(GzipHandler.java:763) at org.sparkproject.jetty.server.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:234) at org.sparkproject.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:127) at org.sparkproject.jetty.server.Server.handle(Server.java:516) at org.sparkproject.jetty.server.HttpChannel.lambda$handle$1(HttpChannel.java:388) at org.sparkproject.jetty.server.HttpChannel.dispatch(HttpChannel.java:633) at org.sparkproject.jetty.server.HttpChannel.handle(HttpChannel.java:380) at org.sparkproject.jetty.server.HttpConnection.onFillable(HttpConnection.java:277) at org.sparkproject.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:311) at org.sparkproject.jetty.io.FillInterest.fillable(FillInterest.java:105) at org.sparkproject.jetty.io.ChannelEndPoint$1.run(ChannelEndPoint.java:104) at 
org.sparkproject.jetty.util.thread.strategy.EatWhatYouKill.runTask(EatWhatYouKill.java:338) at org.sparkproject.jetty.util.thread.strategy.EatWhatYouKill.doProduce(EatWhatYouKill.java:315) at org.sparkproject.jetty.util.thread.strategy.EatWhatYouKill.tryProduce(EatWhatYouKill.java:173) at org.sparkproject.jetty.util.thread.strategy.EatWhatYouKill.run(EatWhatYouKill.java:131) at org.sparkproject.jetty.util.thread.ReservedThreadExecutor$ReservedThread.run(ReservedThreadExecutor.java:386) at org.sparkproject.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:883) at org.sparkproject.jetty.util.thread.QueuedThreadPool$Runner.run(QueuedThreadPool.java:1034) at java.base/java.lang.Thread.run(Thread.java:829)
       
      

      Attachments

        Activity

          People

            Unassigned Unassigned
            stym06 Satyam Raj
            Votes:
            0 Vote for this issue
            Watchers:
            1 Start watching this issue

            Dates

              Created:
              Updated: