BEAM-13202: Add Coder to CountIfFn.Accum

Details

    • Type: Improvement
    • Status: Resolved
    • Priority: P3
    • Resolution: Fixed
    • Affects Version/s: 2.33.0
    • Fix Version/s: 2.38.0
    • Component/s: dsl-sql
    • Labels: None
    • Environment: GCP Dataflow

    Description

      When using org.apache.beam.sdk.extensions.sql.impl.transform.agg.CountIf with the Dataflow runner, Pipeline.run fails during pipeline translation with the following error:

      [WARNING] 
      java.lang.RuntimeException: java.io.IOException: Could not obtain a Coder for the accumulator
          at org.apache.beam.runners.core.construction.PipelineTranslation$1.leaveCompositeTransform (PipelineTranslation.java:78)
          at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:584)
          at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:579)
          at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:579)
          at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:579)
          at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:579)
          at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:579)
          at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$500 (TransformHierarchy.java:239)
          at org.apache.beam.sdk.runners.TransformHierarchy.visit (TransformHierarchy.java:213)
          at org.apache.beam.sdk.Pipeline.traverseTopologically (Pipeline.java:468)
          at org.apache.beam.runners.core.construction.PipelineTranslation.toProto (PipelineTranslation.java:60)
          at org.apache.beam.runners.dataflow.DataflowRunner.run (DataflowRunner.java:996)
          at org.apache.beam.runners.dataflow.DataflowRunner.run (DataflowRunner.java:203)
          at org.apache.beam.sdk.Pipeline.run (Pipeline.java:322)
          at org.apache.beam.sdk.Pipeline.run (Pipeline.java:308)
          at com.hypertv.AnalyticsLoggerStore.PubsubToStorage.runPubsubToStoragePipeline (PubsubToStorage.java:115)
          at com.hypertv.AnalyticsLoggerStore.PubsubToStorage.main (PubsubToStorage.java:122)
          at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0 (Native Method)
          at jdk.internal.reflect.NativeMethodAccessorImpl.invoke (NativeMethodAccessorImpl.java:62)
          at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpl.java:43)
          at java.lang.reflect.Method.invoke (Method.java:566)
          at org.codehaus.mojo.exec.ExecJavaMojo$1.run (ExecJavaMojo.java:282)
          at java.lang.Thread.run (Thread.java:829)
      Caused by: java.io.IOException: Could not obtain a Coder for the accumulator
          at org.apache.beam.runners.core.construction.CombineTranslation$CombineGroupedValuesPayloadTranslator.extractAccumulatorCoder (CombineTranslation.java:207)
          at org.apache.beam.runners.core.construction.CombineTranslation$CombineGroupedValuesPayloadTranslator.translate (CombineTranslation.java:179)
          at org.apache.beam.runners.core.construction.PTransformTranslation$KnownTransformPayloadTranslator.translate (PTransformTranslation.java:438)
          at org.apache.beam.runners.core.construction.PTransformTranslation.toProto (PTransformTranslation.java:248)
          at org.apache.beam.runners.core.construction.SdkComponents.registerPTransform (SdkComponents.java:175)
          at org.apache.beam.runners.core.construction.PipelineTranslation$1.leaveCompositeTransform (PipelineTranslation.java:75)
          at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:584)
          at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:579)
          at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:579)
          at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:579)
          at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:579)
          at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:579)
          at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$500 (TransformHierarchy.java:239)
          at org.apache.beam.sdk.runners.TransformHierarchy.visit (TransformHierarchy.java:213)
          at org.apache.beam.sdk.Pipeline.traverseTopologically (Pipeline.java:468)
          at org.apache.beam.runners.core.construction.PipelineTranslation.toProto (PipelineTranslation.java:60)
          at org.apache.beam.runners.dataflow.DataflowRunner.run (DataflowRunner.java:996)
          at org.apache.beam.runners.dataflow.DataflowRunner.run (DataflowRunner.java:203)
          at org.apache.beam.sdk.Pipeline.run (Pipeline.java:322)
          at org.apache.beam.sdk.Pipeline.run (Pipeline.java:308)
          at com.hypertv.AnalyticsLoggerStore.PubsubToStorage.runPubsubToStoragePipeline (PubsubToStorage.java:115)
          at com.hypertv.AnalyticsLoggerStore.PubsubToStorage.main (PubsubToStorage.java:122)
          at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0 (Native Method)
          at jdk.internal.reflect.NativeMethodAccessorImpl.invoke (NativeMethodAccessorImpl.java:62)
          at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpl.java:43)
          at java.lang.reflect.Method.invoke (Method.java:566)
          at org.codehaus.mojo.exec.ExecJavaMojo$1.run (ExecJavaMojo.java:282)
          at java.lang.Thread.run (Thread.java:829)
      Caused by: org.apache.beam.sdk.coders.CannotProvideCoderException: Cannot infer coder for type parameter AccumT
          at org.apache.beam.sdk.coders.CoderRegistry.getCoder (CoderRegistry.java:328)
          at org.apache.beam.sdk.transforms.CombineFnBase$AbstractGlobalCombineFn.getAccumulatorCoder (CombineFnBase.java:119)
          at org.apache.beam.sdk.transforms.Combine$CombineFn.getAccumulatorCoder (Combine.java:391)
          at org.apache.beam.sdk.extensions.sql.impl.transform.agg.AggregationCombineFnAdapter$WrappedCombinerBase.getAccumulatorCoder (AggregationCombineFnAdapter.java:75)
          at org.apache.beam.sdk.transforms.CombineFns$ComposedCombineFn.getAccumulatorCoder (CombineFns.java:430)
          at org.apache.beam.sdk.schemas.transforms.SchemaAggregateFn$Inner.getAccumulatorCoder (SchemaAggregateFn.java:335)
          at org.apache.beam.runners.core.construction.CombineTranslation$CombineGroupedValuesPayloadTranslator.extractAccumulatorCoder (CombineTranslation.java:204)
          at org.apache.beam.runners.core.construction.CombineTranslation$CombineGroupedValuesPayloadTranslator.translate (CombineTranslation.java:179)
          at org.apache.beam.runners.core.construction.PTransformTranslation$KnownTransformPayloadTranslator.translate (PTransformTranslation.java:438)
          at org.apache.beam.runners.core.construction.PTransformTranslation.toProto (PTransformTranslation.java:248)
          at org.apache.beam.runners.core.construction.SdkComponents.registerPTransform (SdkComponents.java:175)
          at org.apache.beam.runners.core.construction.PipelineTranslation$1.leaveCompositeTransform (PipelineTranslation.java:75)
          at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:584)
          at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:579)
          at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:579)
          at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:579)
          at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:579)
          at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:579)
          at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$500 (TransformHierarchy.java:239)
          at org.apache.beam.sdk.runners.TransformHierarchy.visit (TransformHierarchy.java:213)
          at org.apache.beam.sdk.Pipeline.traverseTopologically (Pipeline.java:468)
          at org.apache.beam.runners.core.construction.PipelineTranslation.toProto (PipelineTranslation.java:60)
          at org.apache.beam.runners.dataflow.DataflowRunner.run (DataflowRunner.java:996)
          at org.apache.beam.runners.dataflow.DataflowRunner.run (DataflowRunner.java:203)
          at org.apache.beam.sdk.Pipeline.run (Pipeline.java:322)
          at org.apache.beam.sdk.Pipeline.run (Pipeline.java:308)
          at com.hypertv.AnalyticsLoggerStore.PubsubToStorage.runPubsubToStoragePipeline (PubsubToStorage.java:115)
          at com.hypertv.AnalyticsLoggerStore.PubsubToStorage.main (PubsubToStorage.java:122)
          at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0 (Native Method)
          at jdk.internal.reflect.NativeMethodAccessorImpl.invoke (NativeMethodAccessorImpl.java:62)
          at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpl.java:43)
          at java.lang.reflect.Method.invoke (Method.java:566)
          at org.codehaus.mojo.exec.ExecJavaMojo$1.run (ExecJavaMojo.java:282)
          at java.lang.Thread.run (Thread.java:829)
      [INFO] ------------------------------------------------------------------------
      [INFO] BUILD FAILURE
      [INFO] ------------------------------------------------------------------------
      [INFO] Total time:  57.006 s
      [INFO] Finished at: 2021-10-25T13:54:23Z
      [INFO] ------------------------------------------------------------------------
      [ERROR] Failed to execute goal org.codehaus.mojo:exec-maven-plugin:1.6.0:java (default-cli) on project connected-stories-analytics-logger-store: An exception occured while executing the Java class. java.io.IOException: Could not obtain a Coder for the accumulator: Cannot infer coder for type parameter AccumT -> [Help 1]
      

      It turns out that CountIfFn.Accum in Beam does not seem to have an associated Coder, so the CoderRegistry cannot infer one for the accumulator type (AccumT).
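
      For illustration, here is a minimal plain-Java sketch of what an accumulator coder has to provide: a byte-level encode/decode round trip for the accumulator, so a runner can ship it between workers. This is not Beam's Coder API and not the actual fix. The class name AccumCoderSketch and the Accum fields (a flag plus a running count) are assumptions made up for the example. In Beam itself the coder would be supplied through the CombineFn's getAccumulatorCoder method, which is the call that fails in the trace above.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

/** Plain-Java stand-in for an accumulator coder; NOT Beam's actual Coder API. */
final class AccumCoderSketch {

  /** Hypothetical accumulator: a flag plus a running count (field names are assumptions). */
  static final class Accum {
    final boolean expressionFalse;
    final long count;

    Accum(boolean expressionFalse, long count) {
      this.expressionFalse = expressionFalse;
      this.count = count;
    }
  }

  /** Serializes the accumulator as 1 flag byte followed by an 8-byte long. */
  static byte[] encode(Accum accum) {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (DataOutputStream out = new DataOutputStream(bytes)) {
      out.writeBoolean(accum.expressionFalse);
      out.writeLong(accum.count);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return bytes.toByteArray();
  }

  /** Reads the fields back in the same order they were written. */
  static Accum decode(byte[] encoded) {
    try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(encoded))) {
      boolean expressionFalse = in.readBoolean();
      long count = in.readLong();
      return new Accum(expressionFalse, count);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    Accum original = new Accum(false, 3L);
    Accum copy = decode(encode(original));
    System.out.println(copy.expressionFalse + " " + copy.count); // prints "false 3"
  }
}
```

      Without some equivalent of this round trip registered for the accumulator type, the runner has no way to serialize intermediate combine state, which matches the "Cannot infer coder for type parameter AccumT" failure.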

      FYI - iemejia

            People

              Assignee: Ismaël Mejía (iemejia)
              Reporter: Patricio Navarro (patitonav)
              Votes: 0
              Watchers: 3

              Dates

                Created:
                Updated:
                Resolved:

                Time Tracking

                  Estimated: Not Specified
                  Remaining: 0h
                  Logged: 2h 10m