Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-15935

Unable to use watermark when depends both on flink planner and blink planner

    XMLWordPrintableJSON

Details

    Description

      Run the following code in module `flink-examples-table` (must be under this module)

      /*
       * Licensed to the Apache Software Foundation (ASF) under one
       * or more contributor license agreements.  See the NOTICE file
       * distributed with this work for additional information
       * regarding copyright ownership.  The ASF licenses this file
       * to you under the Apache License, Version 2.0 (the
       * "License"); you may not use this file except in compliance
       * with the License.  You may obtain a copy of the License at
       *
       *     http://www.apache.org/licenses/LICENSE-2.0
       *
       * Unless required by applicable law or agreed to in writing, software
       * distributed under the License is distributed on an "AS IS" BASIS,
       * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
       * See the License for the specific language governing permissions and
       * limitations under the License.
       */
      
      package org.apache.flink.table.examples.java;
      
      
      import org.apache.flink.streaming.api.TimeCharacteristic;
      import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
      import org.apache.flink.table.api.EnvironmentSettings;
      import org.apache.flink.table.api.java.StreamTableEnvironment;
      
      /**
       * javadoc.
       */
      public class TableApiExample {
      
         /**
          *
          * @param args
          * @throws Exception
          */
         public static void main(String[] args) throws Exception {
      
            StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
            bsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
            EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
            StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);
            bsTableEnv.sqlUpdate( "CREATE TABLE sink_kafka (\n" +
               "    status  STRING,\n" +
               "    direction STRING,\n" +
               "    event_ts TIMESTAMP(3),\n" +
               "    WATERMARK FOR event_ts AS event_ts - INTERVAL '5' SECOND\n" +
               ") WITH (\n" +
               "  'connector.type' = 'kafka',       \n" +
               "  'connector.version' = 'universal',    \n" +
               "  'connector.topic' = 'generated.events2',\n" +
               "  'connector.properties.zookeeper.connect' = 'localhost:2181',\n" +
               "  'connector.properties.bootstrap.servers' = 'localhost:9092',\n" +
               "  'connector.properties.group.id' = 'testGroup',\n" +
               "  'format.type'='json',\n" +
               "  'update-mode' = 'append'\n" +
               ")\n");
      
         }
      }
       

       

      And hit the following error:

      Exception in thread "main" org.apache.calcite.runtime.CalciteContextException: From line 5, column 31 to line 5, column 38: Unknown identifier 'event_ts'Exception in thread "main" org.apache.calcite.runtime.CalciteContextException: From line 5, column 31 to line 5, column 38: Unknown identifier 'event_ts' at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:423) at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463) at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:834) at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:819) at org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:4841) at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5667) at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5587) at org.apache.calcite.sql.SqlIdentifier.accept(SqlIdentifier.java:317) at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1691) at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1676) at org.apache.calcite.sql.SqlOperator.deriveType(SqlOperator.java:501) at org.apache.calcite.sql.SqlBinaryOperator.deriveType(SqlBinaryOperator.java:144) at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5600) at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5587) at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139) at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1691) at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1676) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:947) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateParameterizedExpression(SqlValidatorImpl.java:930) at org.apache.flink.table.planner.operations.SqlToOperationConverter.lambda$createTableSchema$8(SqlToOperationConverter.java:509) at java.util.Optional.ifPresent(Optional.java:159) at org.apache.flink.table.planner.operations.SqlToOperationConverter.createTableSchema(SqlToOperationConverter.java:505) at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertCreateTable(SqlToOperationConverter.java:177) at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:130) at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66) at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:484) at org.apache.flink.table.examples.java.TableApiExample.main(TableApiExample.java:43)Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Unknown identifier 'event_ts' at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:423) at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463) at org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:572) ... 25 more 

      Attachments

        Issue Links

          Activity

            People

              jark Jark Wu
              zjffdu Jeff Zhang
              Votes:
              0 Vote for this issue
              Watchers:
              4 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved:

                Time Tracking

                  Estimated:
                  Original Estimate - Not Specified
                  Not Specified
                  Remaining:
                  Remaining Estimate - 0h
                  0h
                  Logged:
                  Time Spent - 20m
                  20m