Apache Hudi / HUDI-2641

One inflight commit rolling back other concurrent inflight commits causing them to fail


Details

    Description

      The bug is easily reproducible when running 4-5 concurrent writers at once. At least a few of the writers end up failing with the following stack trace in Hudi 0.8.0:

      py4j.protocol.Py4JJavaError: An error occurred while calling o122.save.
      : java.lang.IllegalArgumentException
      	at org.apache.hudi.common.util.ValidationUtils.checkArgument(ValidationUtils.java:31)
      	at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.transitionState(HoodieActiveTimeline.java:396)
      	at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.transitionState(HoodieActiveTimeline.java:377)
      	at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.saveAsComplete(HoodieActiveTimeline.java:154)
      	at org.apache.hudi.client.AbstractHoodieWriteClient.commit(AbstractHoodieWriteClient.java:212)
      	at org.apache.hudi.client.AbstractHoodieWriteClient.commitStats(AbstractHoodieWriteClient.java:185)
      	at org.apache.hudi.client.SparkRDDWriteClient.commit(SparkRDDWriteClient.java:121)
      	at org.apache.hudi.HoodieSparkSqlWriter$.commitAndPerformPostOperations(HoodieSparkSqlWriter.scala:479)
      	at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:223)
      	at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:145)
      	at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:46)
      	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
      	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
      	at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:90)
      	at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:194)
      	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:232)
      	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
      	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:229)
      	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:190)
      	at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:134)
      	at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:133)
      	at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:989)
      	at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
      	at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
      	at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:110)
      	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:135)
      	at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
      	at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
      	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:135)
      	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:253)
      	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:134)
      	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
      	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
      	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:989)
      	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:438)
      	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:415)
      	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:293)
      	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
      	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      	at java.lang.reflect.Method.invoke(Method.java:498)
      	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
      	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
      	at py4j.Gateway.invoke(Gateway.java:282)
      	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
      	at py4j.commands.CallCommand.execute(CallCommand.java:79)
      	at py4j.GatewayConnection.run(GatewayConnection.java:238)
      	at java.lang.Thread.run(Thread.java:748)
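
       

      For context, a multi-writer setup along these lines reproduces the failure. The option keys are Hudi 0.8.x multi-writer settings; the table name, key fields, and lock provider are placeholders for whatever a given environment uses:

      ```python
      # Hypothetical reproduction sketch: options enabling Hudi's multi-writer
      # (optimistic concurrency control) mode. Table/key/lock details below are
      # placeholders, not the reporter's actual configuration.
      hudi_options = {
          "hoodie.table.name": "test_table",                       # placeholder
          "hoodie.datasource.write.recordkey.field": "id",         # placeholder
          "hoodie.datasource.write.precombine.field": "ts",        # placeholder
          # Multi-writer settings (Hudi 0.8.x):
          "hoodie.write.concurrency.mode": "optimistic_concurrency_control",
          "hoodie.cleaner.policy.failed.writes": "LAZY",
          "hoodie.write.lock.provider":
              "org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider",
      }

      # Launch 4-5 of these concurrently (separate Spark jobs) against the same
      # base path:
      #   df.write.format("hudi").options(**hudi_options).mode("append").save(path)
      ```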

       

      Based on my debugging, this is what is going on:

      • In our start commit/clean logic, there is a step where Hoodie rolls back any failed writes here. As part of that logic, it needs to identify the instants that must be rolled back. In the multi-writer case, it relies on heartbeats (expired or not) to determine whether an inflight commit is a failed write that can be rolled back here.
      • As part of the rollback-failed-writes logic, it makes a call to delete the heartbeats of rolled back/expired commits here. This call is made regardless of whether there is any instant to roll back, and the logic in that method is glaringly wrong: it simply deletes the heartbeats of all inflight commits.
      • So one commit, while starting its commit process, goes ahead and deletes the heartbeats of all other inflight commits. Meanwhile, another commit has reached the cleaner stage, where it again checks whether there are any commits to roll back. Because the heartbeats of some of the other commits are now missing/deleted, it assumes they are expired/failed writes, starts rolling them back, and deletes their INFLIGHT commit files from the timeline.
      • Ultimately, the inflight commits that were incorrectly rolled back fail with the above exception, because the INFLIGHT state of the commit no longer exists on the timeline for it to complete the commit process successfully.
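
      The sequence in the bullets above can be modeled with a small, self-contained sketch. The names (`Timeline`, `delete_all_inflight_heartbeats`, `rollback_failed_writes`) are toy stand-ins for illustration, not Hudi's actual API:

      ```python
      # Toy model of the race: heartbeats are the only evidence that an
      # inflight commit is still alive, so deleting a live writer's heartbeat
      # makes it look like a failed write.

      HEARTBEAT_TTL = 60.0  # seconds; older (or missing) heartbeats mark a failed write

      class Timeline:
          def __init__(self):
              self.inflight = set()   # instants with an INFLIGHT file on the timeline
              self.heartbeats = {}    # instant -> timestamp of last heartbeat

          def start_commit(self, instant, now):
              self.inflight.add(instant)
              self.heartbeats[instant] = now

          def is_expired(self, instant, now):
              # A missing heartbeat is indistinguishable from an expired one,
              # which is why deleting live heartbeats is destructive.
              last = self.heartbeats.get(instant)
              return last is None or now - last > HEARTBEAT_TTL

          def delete_all_inflight_heartbeats(self):
              # The buggy step: wipes the heartbeats of ALL inflight commits,
              # whether or not they were actually rolled back.
              for instant in self.inflight:
                  self.heartbeats.pop(instant, None)

          def rollback_failed_writes(self, now):
              # Cleaner-stage check: anything without a live heartbeat is
              # treated as a failed write and its INFLIGHT file is deleted.
              rolled_back = [i for i in sorted(self.inflight) if self.is_expired(i, now)]
              for instant in rolled_back:
                  self.inflight.discard(instant)
                  self.heartbeats.pop(instant, None)
              return rolled_back

      t = Timeline()
      t.start_commit("001", now=0.0)   # writer A, still alive
      t.start_commit("002", now=0.0)   # writer B, still alive

      # Writer B starts its own commit and, as a side effect, deletes every
      # inflight heartbeat -- including writer A's live one.
      t.delete_all_inflight_heartbeats()

      # Writer B's cleaner then sees no heartbeat for commit 001 and rolls the
      # live commit back, deleting its INFLIGHT file from the timeline.
      rolled = t.rollback_failed_writes(now=1.0)
      assert "001" in rolled and "001" not in t.inflight

      # Writer A's saveAsComplete() now fails: the INFLIGHT state it expects to
      # transition from is gone, producing the IllegalArgumentException above.
      ```

      The fix implied by the description is to delete only the heartbeats of instants that were actually rolled back, rather than those of every inflight commit.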

              People

                uditme Udit Mehrotra
                uditme Udit Mehrotra
                sivabalan narayanan
                Votes: 0
                Watchers: 2