Beam / BEAM-8029

Using BigQueryIO.read with DIRECT_READ causes Illegal Mutation

Details

    • Type: Bug
    • Status: Open
    • Priority: P3
    • Resolution: Unresolved
    • Affects Version/s: 2.14.0
    • Fix Version/s: None
    • Component/s: io-java-gcp
    • Labels: None

    Description

       

      Code to read from BigQuery that is causing the issue:

      // Read rows as Avro GenericRecords through the BigQuery Storage API (DIRECT_READ).
      pipeline
          .apply(BigQueryIO
              .read(SchemaAndRecord::getRecord)
              .from(options.getTableRef())
              .withMethod(Method.DIRECT_READ)
              .withCoder(AvroCoder.of(schema)));
      

      Removing .withMethod(Method.DIRECT_READ) avoids the error.
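The error quoted in this report shows a value that was already output later holding a different row's contents. One possible explanation, not confirmed here, is that the DIRECT_READ source writes each row into a reused record instance. The sketch below uses only the Java standard library to show how an encoding-based immutability check, of the kind named in the stack trace, would flag such reuse, and how copying before output would avoid it. `Reading`, `countMutated`, and the string encoding are hypothetical stand-ins, not Beam or Avro APIs.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class MutationCheckSketch {
    /** Hypothetical mutable record standing in for an Avro GenericRecord. */
    static final class Reading {
        long sampleTime;
        double humidity;

        Reading copy() {
            Reading r = new Reading();
            r.sampleTime = sampleTime;
            r.humidity = humidity;
            return r;
        }

        /** Toy encoding; a real coder would produce Avro binary instead. */
        byte[] encode() {
            return (sampleTime + "," + humidity).getBytes(StandardCharsets.UTF_8);
        }
    }

    /**
     * Emits one element per row from a reader that overwrites a single instance,
     * snapshots each element's encoding at output time, then re-encodes every
     * element at "commit" and counts those whose bytes changed.
     */
    static int countMutated(long[] times, double[] humidity, boolean copyBeforeOutput) {
        Reading reused = new Reading();
        List<Reading> outputs = new ArrayList<>();
        List<byte[]> snapshots = new ArrayList<>();
        for (int i = 0; i < times.length; i++) {
            reused.sampleTime = times[i];
            reused.humidity = humidity[i];
            Reading emitted = copyBeforeOutput ? reused.copy() : reused;
            outputs.add(emitted);
            snapshots.add(emitted.encode());
        }
        int mutated = 0;
        for (int i = 0; i < outputs.size(); i++) {
            if (!Arrays.equals(snapshots.get(i), outputs.get(i).encode())) {
                mutated++;
            }
        }
        return mutated;
    }

    public static void main(String[] args) {
        long[] times = {1564412307969368L, 1564412360458615L};
        double[] humidity = {74.3, 74.7};
        System.out.println("reused instance, mutated outputs: " + countMutated(times, humidity, false));
        System.out.println("copied instance, mutated outputs: " + countMutated(times, humidity, true));
    }
}
```

If the reuse hypothesis holds, the equivalent workaround in the pipeline would be to copy the record inside the parse function passed to `BigQueryIO.read` rather than returning it directly. That remains an assumption until the source is inspected.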

       

      The error is:

      org.apache.beam.sdk.util.IllegalMutationException: PTransform BigQueryIO.TypedRead/Read(BigQueryStorageTableSource) mutated value {"device_id": "rpi-rpi0-thermostat", "temperature_c": 20.0, "temperature_f": 52.0, "sample_time": 1564412307969368, "humidity": 74.3} after it was output (new value was {"device_id": "rpi-rpi0-thermostat", "temperature_c": 20.0, "temperature_f": 52.0, "sample_time": 1564412360458615, "humidity": 74.7}). Values must not be mutated in any way after being output.
          at org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.commit (ImmutabilityCheckingBundleFactory.java:134)
          at org.apache.beam.runners.direct.EvaluationContext.commitBundles (EvaluationContext.java:210)
          at org.apache.beam.runners.direct.EvaluationContext.handleResult (EvaluationContext.java:151)
          at org.apache.beam.runners.direct.QuiescenceDriver$TimerIterableCompletionCallback.handleResult (QuiescenceDriver.java:262)
          at org.apache.beam.runners.direct.DirectTransformExecutor.finishBundle (DirectTransformExecutor.java:189)
          at org.apache.beam.runners.direct.DirectTransformExecutor.run (DirectTransformExecutor.java:126)
          at java.util.concurrent.Executors$RunnableAdapter.call (Executors.java:511)
          at java.util.concurrent.FutureTask.run (FutureTask.java:266)
          at java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java:1149)
          at java.util.concurrent.ThreadPoolExecutor$Worker.run (ThreadPoolExecutor.java:624)
          at java.lang.Thread.run (Thread.java:748)
      Caused by: org.apache.beam.sdk.util.IllegalMutationException: Value {"device_id": "rpi-rpi0-thermostat", "temperature_c": 20.0, "temperature_f": 52.0, "sample_time": 1564412307969368, "humidity": 74.3} mutated illegally, new value was {"device_id": "rpi-rpi0-thermostat", "temperature_c": 20.0, "temperature_f": 52.0, "sample_time": 1564412360458615, "humidity": 74.7}. Encoding was AiZycGktcnBpMC10aGVybW9zdGF0AgAAAAAAADRAAgAAAAAAAEpAArDVsP7jtMcFAjMzMzMzk1JA, now AiZycGktcnBpMC10aGVybW9zdGF0AgAAAAAAADRAAgAAAAAAAEpAAu6FuLDktMcFAs3MzMzMrFJA.
          at org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.illegalMutation (MutationDetectors.java:153)
          at org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.verifyUnmodifiedThrowingCheckedExceptions (MutationDetectors.java:148)
          at org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.verifyUnmodified (MutationDetectors.java:123)
          at org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.commit (ImmutabilityCheckingBundleFactory.java:124)
          at org.apache.beam.runners.direct.EvaluationContext.commitBundles (EvaluationContext.java:210)
          at org.apache.beam.runners.direct.EvaluationContext.handleResult (EvaluationContext.java:151)
          at org.apache.beam.runners.direct.QuiescenceDriver$TimerIterableCompletionCallback.handleResult (QuiescenceDriver.java:262)
          at org.apache.beam.runners.direct.DirectTransformExecutor.finishBundle (DirectTransformExecutor.java:189)
          at org.apache.beam.runners.direct.DirectTransformExecutor.run (DirectTransformExecutor.java:126)
          at java.util.concurrent.Executors$RunnableAdapter.call (Executors.java:511)
          at java.util.concurrent.FutureTask.run (FutureTask.java:266)
          at java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java:1149)
          at java.util.concurrent.ThreadPoolExecutor$Worker.run (ThreadPoolExecutor.java:624)
          at java.lang.Thread.run (Thread.java:748)
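The two Base64 encodings in the "Caused by" message can be compared directly to see where the value changed. The sketch below is a standard-library-only illustration that assumes the bytes are Avro binary as written by AvroCoder, with longs stored as zigzag varints. It decodes both strings, finds the first differing byte, and reads the long stored at that offset. The class and method names are hypothetical.

```java
import java.util.Base64;

class EncodingDiffSketch {
    // Encodings copied verbatim from the IllegalMutationException message.
    static final String BEFORE =
        "AiZycGktcnBpMC10aGVybW9zdGF0AgAAAAAAADRAAgAAAAAAAEpAArDVsP7jtMcFAjMzMzMzk1JA";
    static final String AFTER =
        "AiZycGktcnBpMC10aGVybW9zdGF0AgAAAAAAADRAAgAAAAAAAEpAAu6FuLDktMcFAs3MzMzMrFJA";

    /** Returns the index of the first differing byte, or -1 if the arrays are equal. */
    static int firstDifference(byte[] a, byte[] b) {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            if (a[i] != b[i]) {
                return i;
            }
        }
        return a.length == b.length ? -1 : n;
    }

    /** Reads a zigzag-encoded variable-length long starting at the given offset. */
    static long readZigZagLong(byte[] buf, int offset) {
        long raw = 0;
        int shift = 0;
        for (int i = offset; i < buf.length; i++) {
            raw |= (long) (buf[i] & 0x7F) << shift;
            if ((buf[i] & 0x80) == 0) {
                break; // high bit clear marks the last byte of the varint
            }
            shift += 7;
        }
        return (raw >>> 1) ^ -(raw & 1);
    }

    public static void main(String[] args) {
        byte[] before = Base64.getDecoder().decode(BEFORE);
        byte[] after = Base64.getDecoder().decode(AFTER);
        int at = firstDifference(before, after);
        System.out.println("first differing byte offset: " + at);
        System.out.println("long at that offset, before: " + readZigZagLong(before, at));
        System.out.println("long at that offset, after:  " + readZigZagLong(after, at));
    }
}
```

Under those assumptions the first 40 bytes (device_id, temperature_c, temperature_f) are identical, and the bytes diverge where sample_time begins. This matches the two values printed in the exception: the output object ended up holding a later row.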

       

          People

            Assignee: Unassigned
            Reporter: Chris Larsen (clarsen)
            Votes: 0
            Watchers: 4
