FLINK-16552

Cannot include Option fields in any Table join


Details

    Description

      The Table API currently fails on any join where one of the tables has an Option-typed field, even when that field is not part of the join condition. A reproducible test case:

       

      import org.apache.flink.streaming.api.scala._
      import org.apache.flink.table.api.TableEnvironment
      import org.apache.flink.table.api.scala._
      import org.apache.flink.types.Row

      object TestJoinWithOption {
        // Both inputs carry an Option field that is NOT used as a join key.
        case class JoinOne(joinKeyOne: String, otherFieldOne: Option[Int])
        case class JoinTwo(joinKeyTwo: String, otherFieldTwo: Option[Int])
        def main(args: Array[String]): Unit = {
          val sEnv = StreamExecutionEnvironment.getExecutionEnvironment
          val tEnv = TableEnvironment.getTableEnvironment(sEnv)
      
          val testStream1 = sEnv.fromCollection(Seq(JoinOne("key", Some(1))))
          val testStream2 = sEnv.fromCollection(Seq(JoinTwo("key", Some(2))))
      
          val t1 = tEnv.fromDataStream(testStream1)
          val t2 = tEnv.fromDataStream(testStream2)
      
          // The join condition only references the String keys.
          val result = t1.join(t2, "joinKeyOne = joinKeyTwo")
          result.toAppendStream[Row].print()
      
          sEnv.execute()
        }
      }
      

      Result:

      Exception in thread "main" org.apache.flink.table.api.ValidationException: Type 'scala.Option' cannot be used in a join operation because it does not implement a proper hashCode() method.
        at org.apache.flink.table.typeutils.TypeCheckUtils$.validateEqualsHashCode(TypeCheckUtils.scala:174)
        at org.apache.flink.table.typeutils.TypeCheckUtils$.validateEqualsHashCode(TypeCheckUtils.scala:153)
        at org.apache.flink.table.typeutils.TypeCheckUtils$$anonfun$validateEqualsHashCode$1.apply$mcVI$sp(TypeCheckUtils.scala:149)
        at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
        at org.apache.flink.table.typeutils.TypeCheckUtils$.validateEqualsHashCode(TypeCheckUtils.scala:147)
        at org.apache.flink.table.runtime.join.NonWindowJoin.<init>(NonWindowJoin.scala:56)
        at org.apache.flink.table.runtime.join.NonWindowInnerJoin.<init>(NonWindowInnerJoin.scala:45)
        at org.apache.flink.table.plan.nodes.datastream.DataStreamJoinToCoProcessTranslator.createJoinOperator(DataStreamJoinToCoProcessTranslator.scala:112)
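      A possible reading of the error message, sketched in plain Scala without any Flink dependency: the sketch assumes the validation rejects a type when the class that declares its hashCode() is java.lang.Object. That rule is an assumption about the check, not a copy of Flink's code, and the names OptionHashCodeCheck, hashCodeDeclarer and looksHashable are hypothetical. Under that assumption, the abstract class scala.Option is rejected even though its concrete subclass Some does define hashCode().

```scala
// Standalone sketch (plain Scala, no Flink dependency).
// ASSUMPTION: a type is rejected when the class declaring its hashCode()
// is java.lang.Object. This mirrors the error message only; it is not
// Flink's actual implementation.
object OptionHashCodeCheck {

  // Returns the class that declares the public hashCode() visible on `c`.
  def hashCodeDeclarer(c: Class[_]): Class[_] =
    c.getMethod("hashCode").getDeclaringClass

  // Hypothetical stand-in for the validation rule described above.
  def looksHashable(c: Class[_]): Boolean =
    hashCodeDeclarer(c) != classOf[Object]

  def main(args: Array[String]): Unit = {
    // scala.Option is abstract and inherits hashCode() from Object.
    println(s"Option: ${looksHashable(classOf[Option[_]])}")
    // Some is a case class, so the compiler generates hashCode() for it.
    println(s"Some:   ${looksHashable(classOf[Some[_]])}")
    // Values still hash structurally at runtime.
    println(s"equal hashes: ${Some(1).hashCode == Some(1).hashCode}")
  }
}
```

      If the check does work this way, it is stricter than the join needs: the Option values hash consistently at runtime, and in the test case above they are not join keys at all.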
      

      This issue appears to have been raised before for the streams API: https://issues.apache.org/jira/browse/FLINK-2673

      Expected behaviour: Since the join condition does not reference the Option fields, the join should succeed and the resulting schema should look like this (this output was produced by result.printSchema):

      root
       |-- joinKeyOne: String
       |-- otherFieldOne: Option[Integer]
       |-- joinKeyTwo: String
       |-- otherFieldTwo: Option[Integer] 

      Actual behaviour: The runtime exception shown above is thrown.

          People

            Assignee: Unassigned
            Reporter: Jason Sinn (jasoncsinn)
            Votes: 0
            Watchers: 2