Details
Type: Bug
Status: Resolved
Priority: Major
Resolution: Incomplete
Affects Version/s: 2.3.1
Fix Version/s: None
Description
Issue description
Summary: when a persisted DataFrame is used by two concurrent threads, the SparkListenerStageCompleted event reports a wrong value for internal.metrics.input.recordsRead.
Issue Details
The Spark code I have written reads from 2 source temp Hive tables. The DataFrame read from the first temp table is persisted; the DataFrame read from the second is not. We then run 2 pipelines asynchronously. The 1st pipeline writes the persisted DataFrame to a Hive target table. The 2nd pipeline performs a UNION of the persisted DataFrame with the non-persisted DataFrame and writes the result to a separate Hive table.
Because the first DataFrame is persisted, we expect its recordsRead metric to be computed exactly once. Instead, the reported value is higher than the number of rows actually read.
Example: if the persisted DataFrame has 2 rows, the metric is consistently reported as 3.
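To make the metric easier to spot than in the full JSON event dump, a smaller listener could print only the recordsRead value per completed stage. This is a minimal sketch, not part of the original report: the class name RecordsReadListener is hypothetical, and it assumes the Spark 2.x developer API where StageInfo exposes the aggregated TaskMetrics for the stage.

```scala
import org.apache.spark.scheduler.{SparkListener, SparkListenerStageCompleted}

// Hypothetical helper (not from the original report): prints the
// input recordsRead value that Spark reports for each completed stage.
class RecordsReadListener extends SparkListener {
  override def onStageCompleted(event: SparkListenerStageCompleted): Unit = {
    val info = event.stageInfo
    // taskMetrics holds the stage-level aggregate; guard against null
    // in case no metrics were attached to this StageInfo.
    val recordsRead = Option(info.taskMetrics).map(_.inputMetrics.recordsRead)
    println(s"stage=${info.stageId} name=${info.name} recordsRead=${recordsRead.getOrElse("n/a")}")
  }
}
```

In spark-shell it can be registered with spark.sparkContext.addSparkListener(new RecordsReadListener) before running the pipelines. With a 2-row persisted input, the stages that read that input would be expected to report 2 records in total rather than 3.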
Steps to reproduce Issue:
- Create directory /tmp/INFA_UNION1 and copy input1.txt to this directory.
- Create directory /tmp/INFA_UNION2 and copy input2.txt to this directory.
- Run the following in spark-shell:
scala> :load asyncfactory.scala
scala> :paste -raw
package org.apache.spark

import org.apache.spark.scheduler._
import org.apache.spark.util.JsonProtocol
import org.json4s.jackson.JsonMethods._

class InfaListener(mode: String = "ACCUMULATOR") extends org.apache.spark.scheduler.SparkListener {
  def onEvent(event: SparkListenerEvent): Unit = {
    val jv = JsonProtocol.sparkEventToJson(event)
    println(compact(jv))
  }
  override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = { onEvent(stageCompleted) }
  override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = { onEvent(stageSubmitted) }
}
scala> :paste
import org.apache.spark.InfaListener

implicit def df2idf(d: DataFrame): InfaDataFrame = new InfaDataFrame(d)

val sqlc = spark.sqlContext
val sc = spark.sparkContext
val lis = new InfaListener("TAG")
sc.addSparkListener(lis)

sqlc.sql("DROP TABLE IF EXISTS `default`.`read1`")
sqlc.sql("CREATE TABLE `default`.`read1` (`col0` STRING) LOCATION '/tmp/INFA_UNION1'")
sqlc.sql("DROP TABLE IF EXISTS `default`.`read2`")
sqlc.sql("CREATE TABLE `default`.`read2` (`col0` STRING) LOCATION '/tmp/INFA_UNION2'")
sqlc.sql("DROP TABLE IF EXISTS `default`.`write1`")
sqlc.sql("CREATE TABLE `default`.`write1` (`col0` STRING)")
sqlc.sql("DROP TABLE IF EXISTS `default`.`write2`")
sqlc.sql("CREATE TABLE `default`.`write2` (`col0` STRING)")

val v0 = sqlc.sql("SELECT `read1`.`col0` as a0 FROM `default`.`read1`").itoDF.persist(MEMORY_AND_DISK).where(lit(true))

async(asBlock(
  sqlc.sql("INSERT OVERWRITE TABLE `default`.`write1` SELECT tbl0.c0 as a0 FROM tbl0"),
  v0.unionAll(sqlc.sql("SELECT `read2`.`col0` as a0 FROM `default`.`read2`").itoDF).itoDF("TGT_").itoDF("c").createOrReplaceTempView("tbl0")))

async(asBlock(
  sqlc.sql("INSERT OVERWRITE TABLE `default`.`write2` SELECT tbl1.c0 as a0 FROM tbl1"),
  v0.itoDF("TGT_").itoDF("c").createOrReplaceTempView("tbl1")))

stop
NOTE - The above code reads from 2 directories, /tmp/INFA_UNION1 and /tmp/INFA_UNION2. We have attached the input files; copy them into these locations after creating the directories.
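For readers without the attached asyncfactory.scala, the same pipeline shape can be approximated with plain Scala Futures. This is only a sketch under assumptions: it replaces the async/asBlock/itoDF helpers (whose behavior is not shown in this report) with scala.concurrent.Future and the DataFrameWriter API, it expects the four tables from the steps above to exist already, and it has not been confirmed to reproduce the wrong recordsRead value.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration.Duration
import scala.concurrent.ExecutionContext.Implicits.global
import org.apache.spark.storage.StorageLevel

// Persisted source, shared by both pipelines.
val persisted = spark.table("default.read1").persist(StorageLevel.MEMORY_AND_DISK)
// Non-persisted source, used only by the UNION pipeline.
val other = spark.table("default.read2")

// Pipeline writing the persisted DataFrame as-is.
val plain = Future {
  persisted.write.mode("overwrite").insertInto("default.write2")
}
// Pipeline writing the UNION of persisted and non-persisted data.
val unioned = Future {
  persisted.union(other).write.mode("overwrite").insertInto("default.write1")
}

// Block until both concurrent writes have finished.
Await.result(Future.sequence(Seq(plain, unioned)), Duration.Inf)
```

Both Futures are created before either is awaited, so the two writes can be submitted from different threads while sharing the same persisted DataFrame, which is the condition described in this issue.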