Beam / BEAM-13577

Beam Select's uniquifyNames function loses nullability of Complex types while inferring schema

Details

    • Type: Bug
    • Status: Resolved
    • Priority: P2
    • Resolution: Fixed
    • Affects Version/s: 2.32.0, 2.33.0, 2.34.0, 2.35.0, 2.36.0
    • Fix Version/s: 2.37.0
    • Component/s: dsl-sql, sdk-java-core
    • Labels: None

    Description

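      We use Beam SQL in our project. For any JOIN, Beam SQL generates a BeamCoGBKJoinRel plan, which uses the Select transform from the core SDK. When Select infers its output schema, it loses the nullability of complex types such as Array and Map: in the error below, the provided output schema declares f_stringArr as a nullable ARRAY<STRING NOT NULL>, but the schema inferred by Select marks the same field NOT NULL.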

      INFO: SQL:
      SELECT `o1`.`order_id`, `o1`.`site_id`, `o1`.`price`, `o1`.`f_stringArr`, `o2`.`order_id` AS `order_id0`, `o2`.`site_id` AS `site_id0`, `o2`.`price` AS `price0`
      FROM `beam`.`ORDER_DETAILS1_WITH_ARRAY` AS `o1`
      INNER JOIN `beam`.`ORDER_DETAILS2` AS `o2` ON `o1`.`order_id` = `o2`.`site_id` AND `o2`.`price` = `o1`.`site_id`
      Dec 28, 2021 1:20:14 PM org.apache.beam.sdk.extensions.sql.impl.CalciteQueryPlanner convertToBeamRel
      INFO: SQLPlan>
      LogicalProject(order_id=[$0], site_id=[$1], price=[$2], f_stringArr=[$3], order_id0=[$4], site_id0=[$5], price0=[$6])
        LogicalJoin(condition=[AND(=($0, $5), =($6, $1))], joinType=[inner])
          BeamIOSourceRel(table=[[beam, ORDER_DETAILS1_WITH_ARRAY]])
          BeamIOSourceRel(table=[[beam, ORDER_DETAILS2]])
      Dec 28, 2021 1:20:14 PM org.apache.beam.sdk.extensions.sql.impl.CalciteQueryPlanner convertToBeamRel
      INFO: BEAMPlan>
      BeamCoGBKJoinRel(condition=[AND(=($0, $5), =($6, $1))], joinType=[inner])
        BeamIOSourceRel(table=[[beam, ORDER_DETAILS1_WITH_ARRAY]])
        BeamIOSourceRel(table=[[beam, ORDER_DETAILS2]])
      Types not equal. provided output schema: Fields:
      Field{name=order_id, description=, type=INT32 NOT NULL, options={{}}}
      Field{name=site_id, description=, type=INT32 NOT NULL, options={{}}}
      Field{name=price, description=, type=INT32 NOT NULL, options={{}}}
      Field{name=f_stringArr, description=, type=ARRAY<STRING NOT NULL>, options={{}}}
      Field{name=order_id0, description=, type=INT32 NOT NULL, options={{}}}
      Field{name=site_id0, description=, type=INT32 NOT NULL, options={{}}}
      Field{name=price0, description=, type=INT32 NOT NULL, options={{}}}
      Encoding positions:
      {f_stringArr=3, price0=6, price=2, site_id=1, order_id0=4, order_id=0, site_id0=5}
      Options:{{}}UUID: null Schema inferred from select: Fields:
      Field{name=317499b1-9c8a-4bb9-8897-4ecda110d02a, description=, type=INT32 NOT NULL, options={{}}}
      Field{name=46249f84-b89e-439d-b799-a039b427a60a, description=, type=INT32 NOT NULL, options={{}}}
      Field{name=565d397c-d36c-4387-b2e4-5d6402c839bd, description=, type=INT32 NOT NULL, options={{}}}
      Field{name=bbe10404-3c44-41a4-b942-16f484211dab, description=, type=ARRAY<STRING NOT NULL> NOT NULL, options={{}}}
      Field{name=bd3e3adf-ae12-4155-9770-b0123c8bb18c, description=, type=INT32 NOT NULL, options={{}}}
      Field{name=2b2a14ed-dcd6-45d5-b49d-ae8d3037d6c6, description=, type=INT32 NOT NULL, options={{}}}
      Field{name=60a346c5-fa18-40e1-819f-c06af92ff033, description=, type=INT32 NOT NULL, options={{}}}
      Encoding positions:
      {2b2a14ed-dcd6-45d5-b49d-ae8d3037d6c6=5, 60a346c5-fa18-40e1-819f-c06af92ff033=6, 565d397c-d36c-4387-b2e4-5d6402c839bd=2, bbe10404-3c44-41a4-b942-16f484211dab=3, bd3e3adf-ae12-4155-9770-b0123c8bb18c=4, 46249f84-b89e-439d-b799-a039b427a60a=1, 317499b1-9c8a-4bb9-8897-4ecda110d02a=0}
      Options:{{}}UUID: null from input type: Fields:
      Field{name=lhs, description=, type=ROW<order_id INT32 NOT NULL, site_id INT32 NOT NULL, price INT32 NOT NULL, f_stringArr ARRAY<STRING NOT NULL>> NOT NULL, options={{}}}
      Field{name=rhs, description=, type=ROW<order_id INT32 NOT NULL, site_id INT32 NOT NULL, price INT32 NOT NULL> NOT NULL, options={{}}}
      Encoding positions:
      {lhs=0, rhs=1}
      Options:{{}}UUID: a35cf07b-2bc1-48b8-b229-3c2368993738
      java.lang.IllegalArgumentException: Types not equal. provided output schema: Fields:
      Field{name=order_id, description=, type=INT32 NOT NULL, options={{}}}
      Field{name=site_id, description=, type=INT32 NOT NULL, options={{}}}
      Field{name=price, description=, type=INT32 NOT NULL, options={{}}}
      Field{name=f_stringArr, description=, type=ARRAY<STRING NOT NULL>, options={{}}}
      Field{name=order_id0, description=, type=INT32 NOT NULL, options={{}}}
      Field{name=site_id0, description=, type=INT32 NOT NULL, options={{}}}
      Field{name=price0, description=, type=INT32 NOT NULL, options={{}}}
      Encoding positions:
      {f_stringArr=3, price0=6, price=2, site_id=1, order_id0=4, order_id=0, site_id0=5}
      Options:{{}}UUID: null Schema inferred from select: Fields:
      Field{name=317499b1-9c8a-4bb9-8897-4ecda110d02a, description=, type=INT32 NOT NULL, options={{}}}
      Field{name=46249f84-b89e-439d-b799-a039b427a60a, description=, type=INT32 NOT NULL, options={{}}}
      Field{name=565d397c-d36c-4387-b2e4-5d6402c839bd, description=, type=INT32 NOT NULL, options={{}}}
      Field{name=bbe10404-3c44-41a4-b942-16f484211dab, description=, type=ARRAY<STRING NOT NULL> NOT NULL, options={{}}}
      Field{name=bd3e3adf-ae12-4155-9770-b0123c8bb18c, description=, type=INT32 NOT NULL, options={{}}}
      Field{name=2b2a14ed-dcd6-45d5-b49d-ae8d3037d6c6, description=, type=INT32 NOT NULL, options={{}}}
      Field{name=60a346c5-fa18-40e1-819f-c06af92ff033, description=, type=INT32 NOT NULL, options={{}}}
      Encoding positions:
      {2b2a14ed-dcd6-45d5-b49d-ae8d3037d6c6=5, 60a346c5-fa18-40e1-819f-c06af92ff033=6, 565d397c-d36c-4387-b2e4-5d6402c839bd=2, bbe10404-3c44-41a4-b942-16f484211dab=3, bd3e3adf-ae12-4155-9770-b0123c8bb18c=4, 46249f84-b89e-439d-b799-a039b427a60a=1, 317499b1-9c8a-4bb9-8897-4ecda110d02a=0}
      Options:{{}}UUID: null from input type: Fields:
      Field{name=lhs, description=, type=ROW<order_id INT32 NOT NULL, site_id INT32 NOT NULL, price INT32 NOT NULL, f_stringArr ARRAY<STRING NOT NULL>> NOT NULL, options={{}}}
      Field{name=rhs, description=, type=ROW<order_id INT32 NOT NULL, site_id INT32 NOT NULL, price INT32 NOT NULL> NOT NULL, options={{}}}
      Encoding positions:
      {lhs=0, rhs=1}
      Options:{{}}UUID: a35cf07b-2bc1-48b8-b229-3c2368993738
          at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument(Preconditions.java:141)
          at org.apache.beam.sdk.schemas.transforms.Select$Fields.expand(Select.java:205)
          at org.apache.beam.sdk.schemas.transforms.Select$Fields.expand(Select.java:157)
          at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:548)
          at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:482)
          at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:363)
          at org.apache.beam.sdk.extensions.sql.impl.rel.BeamCoGBKJoinRel.standardJoin(BeamCoGBKJoinRel.java:196)
          at org.apache.beam.sdk.extensions.sql.impl.rel.BeamCoGBKJoinRel.access$400(BeamCoGBKJoinRel.java:75)
          at org.apache.beam.sdk.extensions.sql.impl.rel.BeamCoGBKJoinRel$StandardJoin.expand(BeamCoGBKJoinRel.java:135)
          at org.apache.beam.sdk.extensions.sql.impl.rel.BeamCoGBKJoinRel$StandardJoin.expand(BeamCoGBKJoinRel.java:93)
          at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:548)
          at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:499)
          at org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils.toPCollection(BeamSqlRelUtils.java:72)
          at org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils.toPCollection(BeamSqlRelUtils.java:42)
          at org.apache.beam.sdk.extensions.sql.impl.rel.BaseRelTest.compilePipeline(BaseRelTest.java:34)
          at org.apache.beam.sdk.extensions.sql.impl.rel.BeamCoGBKJoinRelBoundedVsBoundedTest.testInnerJoin(BeamCoGBKJoinRelBoundedVsBoundedTest.java:83)
          at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
          at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
          at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
          at java.base/java.lang.reflect.Method.invoke(Method.java:566)
          at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
          at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
          at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
          at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
          at org.apache.beam.sdk.testing.TestPipeline$1.evaluate(TestPipeline.java:322)
          at org.junit.rules.ExpectedException$ExpectedExceptionStatement.evaluate(ExpectedException.java:258)
          at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
          at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
          at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
          at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
          at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
          at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
          at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
          at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
          at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
          at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
          at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
          at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
          at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
          at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110)
          at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
          at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38)
          at org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62)
          at org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
          at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
          at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
          at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
          at java.base/java.lang.reflect.Method.invoke(Method.java:566)
          at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36)
          at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
          at org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:33)
          at org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:94)
          at com.sun.proxy.$Proxy2.processTestClass(Unknown Source)
          at org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:119)
          at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
          at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
          at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
          at java.base/java.lang.reflect.Method.invoke(Method.java:566)
          at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36)
          at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
          at org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConnection$DispatchWrapper.dispatch(MessageHubBackedObjectConnection.java:182)
          at org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConnection$DispatchWrapper.dispatch(MessageHubBackedObjectConnection.java:164)
          at org.gradle.internal.remote.internal.hub.MessageHub$Handler.run(MessageHub.java:414)
          at org.gradle.internal.concurrent.ExecutorPolicy$CatchAndRecordFailures.onExecute(ExecutorPolicy.java:64)
          at org.gradle.internal.concurrent.ManagedExecutorImpl$1.run(ManagedExecutorImpl.java:48)
          at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
          at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
          at org.gradle.internal.concurrent.ThreadFactoryImpl$ManagedThreadRunnable.run(ThreadFactoryImpl.java:56)
          at java.base/java.lang.Thread.run(Thread.java:829) 

       

       

            People

              Assignee: Talat Uyarer
              Reporter: Talat Uyarer
              Votes: 0
              Watchers: 2

                Time Tracking

                  Original Estimate: Not Specified
                  Remaining Estimate: 0h
                  Time Spent: 4h 20m