Flink / FLINK-28148

Unable to load jar connector to a Python Table API app

Details

    Description

      Background

      Users currently cannot build and install the latest PyFlink from source and then load jars. The jar loading mechanism was introduced in FLINK-16943.
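
PyFlink reads "pipeline.jars" and "pipeline.classpaths" as semicolon-separated lists of URLs; the traceback in the reproduction steps shows table_config.py calling value.split(";") before handing the URLs to the class loader. The reproduction below passes a single URL. As a minimal sketch, assuming the jars are local files, the following builds such a value from several paths. to_pipeline_jars_value is a hypothetical helper, not part of PyFlink.

```python
import os
from pathlib import Path
from typing import Iterable


def to_pipeline_jars_value(jar_paths: Iterable[str]) -> str:
    """Build a semicolon-separated list of file:// URLs from local jar paths.

    Hypothetical helper (not part of PyFlink). The result is meant to be
    used as the value of "pipeline.jars" or "pipeline.classpaths".
    """
    urls = []
    for raw in jar_paths:
        # abspath makes the path absolute and normalizes it
        # without following symlinks.
        path = Path(os.path.abspath(os.path.expanduser(raw)))
        if path.suffix != ".jar":
            raise ValueError(f"not a .jar file: {path}")
        # as_uri() requires an absolute path, e.g. file:///opt/jars/a.jar
        urls.append(path.as_uri())
    return ";".join(urls)


if __name__ == "__main__":
    value = to_pipeline_jars_value(["/opt/jars/a.jar", "/opt/jars/b.jar"])
    print(value)  # on POSIX: file:///opt/jars/a.jar;file:///opt/jars/b.jar
```

With PyFlink installed, the result would be passed as t_env.get_config().set("pipeline.jars", value). The extension check is only a guard against obvious typos; it does not verify that the file exists.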

      Reproduction steps

      • Clone the latest Flink from the master branch.
      • Follow the recommended steps in the Flink documentation to build Flink and install PyFlink. Note: the tutorial recommends Maven 3.2.x and Python 3.6 to 3.9; the issue was reproduced with Maven 3.2.5 and Python 3.7.
      • Create a new Python Table API app that loads a jar, for example:
      from pyflink.table import EnvironmentSettings, StreamTableEnvironment

      env_settings = EnvironmentSettings.in_streaming_mode()
      t_env = StreamTableEnvironment.create(environment_settings=env_settings)
      # Replace with the URL of a real jar; any jar reproduces the error.
      t_env.get_config().set("pipeline.classpaths", "file:///path/to/your/jar.jar")

      • The following alternative way of loading jars produces a similar error:
      t_env.get_config().get_configuration().set_string("pipeline.jars", "file:///path/to/your/jar.jar")

      • Any jar triggers the problem, and the following traceback appears:
      Traceback (most recent call last):
        File "pyflink_table_api_firehose.py", line 48, in <module>
          log_processing()
        File "pyflink_table_api_firehose.py", line 14, in log_processing
          t_env.get_config().set("pipeline.classpaths", "file:///home/YOUR_USER/pyflink-table-api/flink/flink-connectors/flink-sql-connector-aws-kinesis-firehose/target/flink-sql-connector-aws-kinesis-firehose-1.16-SNAPSHOT.jar")
        File "/home/YOUR_USER/.local/lib/python3.7/site-packages/pyflink/table/table_config.py", line 109, in set
          add_jars_to_context_class_loader(value.split(";"))
        File "/home/YOUR_USER/.local/lib/python3.7/site-packages/pyflink/util/java_utils.py", line 169, in add_jars_to_context_class_loader
          addURL.invoke(loader, to_jarray(get_gateway().jvm.Object, [url]))
        File "/home/YOUR_USER/.local/lib/python3.7/site-packages/py4j/java_gateway.py", line 1322, in __call__
          answer, self.gateway_client, self.target_id, self.name)
        File "/home/YOUR_USER/.local/lib/python3.7/site-packages/pyflink/util/exceptions.py", line 146, in deco
          return f(*a, **kw)
        File "/home/YOUR_USER/.local/lib/python3.7/site-packages/py4j/protocol.py", line 328, in get_return_value
          format(target_id, ".", name), value)
      py4j.protocol.Py4JJavaError: An error occurred while calling o45.invoke.
      : java.lang.IllegalArgumentException: object is not an instance of declaring class
         at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
         at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
         at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
         at java.base/java.lang.reflect.Method.invoke(Method.java:566)
         at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
         at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
         at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
         at java.base/java.lang.reflect.Method.invoke(Method.java:566)
         at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
         at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
         at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
         at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
         at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
         at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
         at java.base/java.lang.Thread.run(Thread.java:829)

      • Next, reinstall PyFlink from PyPI:
      pip uninstall apache-flink
      pip install apache-flink

      This downgrades it to the 1.15 release.

      Loading the jar now succeeds, even when it is the same connector built from master (reproduced with the Kafka and Kinesis Firehose connectors).
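
When comparing the failing and working setups, it helps to confirm which apache-flink distribution the interpreter actually sees, since the source build and the PyPI release install under the same name. A small sketch using the standard importlib.metadata module (Python 3.8+); on Python 3.7, as in this reproduction, it assumes the importlib-metadata backport is installed. installed_version is a hypothetical helper name.

```python
try:
    from importlib import metadata  # Python 3.8+
except ImportError:  # Python 3.7: assumes `pip install importlib-metadata`
    import importlib_metadata as metadata  # type: ignore


def installed_version(dist_name="apache-flink"):
    """Return the installed version of a distribution, or None if absent."""
    try:
        return metadata.version(dist_name)
    except metadata.PackageNotFoundError:
        return None


if __name__ == "__main__":
    version = installed_version()
    if version is None:
        print("apache-flink is not installed in this environment")
    else:
        # Compare against the build you expect: the master source build
        # or the 1.15 release from PyPI.
        print(f"apache-flink {version}")
```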

      Reproduced on Mac and Amazon Linux 2.
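
The root cause line, java.lang.IllegalArgumentException: object is not an instance of declaring class, is what java.lang.reflect.Method.invoke throws when an instance method is invoked on a target object that is not an instance of the class declaring that method. In this trace the reflective call is addURL.invoke(loader, ...) in pyflink/util/java_utils.py, so the class loader passed as loader is not an instance of the class that declares the addURL method being invoked. The Python sketch below models only that precondition; DeclaringLoader, UnrelatedLoader and reflective_invoke are hypothetical names, and the sketch makes no claim about which loader class Flink master actually uses.

```python
class DeclaringLoader:
    """Hypothetical stand-in for the class that declares addURL."""

    def __init__(self):
        self.urls = []

    def add_url(self, url):
        self.urls.append(url)


class UnrelatedLoader:
    """Hypothetical stand-in for a loader of an unrelated type."""


def reflective_invoke(declaring_cls, method_name, target, *args):
    """Model of Method.invoke's receiver check for instance methods."""
    if not isinstance(target, declaring_cls):
        # Java raises IllegalArgumentException here; ValueError is the
        # closest Python analog.
        raise ValueError("object is not an instance of declaring class")
    method = getattr(declaring_cls, method_name)
    return method(target, *args)


if __name__ == "__main__":
    ok = DeclaringLoader()
    reflective_invoke(DeclaringLoader, "add_url", ok, "file:///path/to/your/jar.jar")
    print(ok.urls)  # ['file:///path/to/your/jar.jar']
    try:
        reflective_invoke(DeclaringLoader, "add_url", UnrelatedLoader(), "file:///x.jar")
    except ValueError as err:
        print(err)  # object is not an instance of declaring class
```

As in Java, an instance of a subclass of the declaring class passes the check; only an unrelated type is rejected.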

            People

              Assignee: Unassigned
              Reporter: Zichen Liu (CrynetLogistics)
              Votes: 0
              Watchers: 3
