  Spark / SPARK-47859

Why does javaRDD().mapPartitions lead to a memory leak in this case?


Details

    • Type: IT Help
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 3.3.2, 3.5.0
    • Fix Version/s: None
    • Component/s: Spark Core
    • Labels: None

    Description

      Hello Spark community. I have a Java Spark Structured Streaming application.
      Unless I am making a silly mistake, the JedisCluster is closed in the finally block, yet there is still a memory leak.

      FlatMapFunction<Iterator<Row>, Row> myFunction = new MyFunction(jedisConfiguration);
      StructType structSchema = getSchema();
      
      VoidFunction2<Dataset<Row>, Long> forEachFunc = (batchDS, batchId) -> {
              Dataset<Row> dataset = getDataset();
              dataset.persist();
              JavaRDD<Row> processedRDD = dataset.javaRDD().mapPartitions(myFunction);
              Dataset<Row> processedDS = sparkSession().createDataFrame(processedRDD, structSchema);
              parquetWriter.write(processedDS);
              dataset.unpersist();
          };
      
      DataStreamWriter<Row> dataStream = dataset
              .writeStream()
              .foreachBatch(forEachFunc)
              .outputMode(outputMode)
              .option("checkpointLocation", checkpointLocation);
      
      ....<stream dataStream>

      And the function:

      public class MyFunction implements FlatMapFunction<Iterator<Row>, Row> {
      
         <constructor with jedisConfiguration parameter>...
      
          @Override
          public Iterator<Row> call(Iterator<Row> rowIterator) throws Exception {
      
              List<Row> output;
              JedisCluster redis = new JedisCluster(jedisConfiguration);
      
              try {
                  output = new ArrayList<>();
      
                  while (rowIterator.hasNext()) {
                      Row row = rowIterator.next();
                      Long var1 = row.getAs("var1");
                      Long var2 = row.getAs("var2");
      
                      var redisKey = "some_key";
                      var result = redis.hgetAll(redisKey);
      
                      if (!result.isEmpty()) {
                          output.add(RowFactory.create(
                                  var1,
                                  var2,
                                  result.getOrDefault("some_id", null)));
                      }
                  }
              } finally {
                  if (redis != null) {
                      try {
                          redis.close();
                      } catch (Exception e) {
                          throw new RuntimeException("Failed to close Redis connection", e);
                      }
                  }
              }
              return output.iterator();
          }
      } 
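
      For reference, the same call() can also be written with try-with-resources. This is only a sketch, assuming the Jedis version in use exposes JedisCluster as java.io.Closeable (recent releases do); it reuses the same imports and the jedisConfiguration field as MyFunction above, and should close the connection exactly like the explicit finally block:

          @Override
          public Iterator<Row> call(Iterator<Row> rowIterator) throws Exception {
              List<Row> output = new ArrayList<>();
              // try-with-resources closes the cluster connection even if hgetAll throws
              try (JedisCluster redis = new JedisCluster(jedisConfiguration)) {
                  while (rowIterator.hasNext()) {
                      Row row = rowIterator.next();
                      Long var1 = row.getAs("var1");
                      Long var2 = row.getAs("var2");
                      var result = redis.hgetAll("some_key");
                      if (!result.isEmpty()) {
                          output.add(RowFactory.create(var1, var2, result.getOrDefault("some_id", null)));
                      }
                  }
              }
              return output.iterator();
          }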

      It actually runs for a couple of days and then dies. I can't figure out what causes the memory leak in the driver.

      Tested with Spark 3.3.2 and 3.5.0

      A Grafana board of the driver's Memory Pool usage is attached.
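
      Not part of the original code, but one way to correlate that board with batch progress is to log the same JVM memory pools from the driver at the end of each foreachBatch call. The helper below is purely illustrative and uses only the standard java.lang.management API:

          import java.lang.management.ManagementFactory;
          import java.lang.management.MemoryPoolMXBean;

          // Illustrative helper, not in the original application: print the driver's
          // JVM memory pool usage (the same pools the Grafana board charts) so heap
          // growth can be matched to micro-batch ids in the driver log.
          static void logDriverMemoryPools(long batchId) {
              for (MemoryPoolMXBean pool : ManagementFactory.getMemoryPoolMXBeans()) {
                  if (pool.isValid()) {
                      System.out.printf("batch %d, %s: used=%d bytes%n",
                              batchId, pool.getName(), pool.getUsage().getUsed());
                  }
              }
          }

      It could be invoked as the last statement of forEachFunc above, passing batchId.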

            People

              Assignee: Unassigned
              Reporter: Leo Timofeyev (leotim)
