Details
Type: Bug
Status: Resolved
Priority: P2
Resolution: Fixed
Description
I have an existing Apache Beam project with Java 8, Apache Beam 2.27.0, Maven and Dagger 2.
I migrated this project to Kotlin (Kotlin JDK 8, version 1.5.0).
I used Kotlin 1.5.0 because version 1.4.30 had an issue with Beam and the Maven plugin ("Could not read class: VirtualFile", see https://stackoverflow.com/questions/66170900/kotlin-1-4-30-apache-beam-compilation-error).
Everything seems to work except the built-in MapElements and FlatMapElements transforms when used with a TypeDescriptor and a lambda expression.
A part of my pom.xml file
<properties>
    <beam.version>2.27.0</beam.version>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <kotlin.code.style>official</kotlin.code.style>
    <kotlin.compiler.jvmTarget>1.8</kotlin.compiler.jvmTarget>
    <kotlin.compiler.incremental>true</kotlin.compiler.incremental>
    <kotlin.version>1.5.0</kotlin.version>
    <serialization.version>1.2.0</serialization.version>
    <java.version>1.8</java.version>
    <dagger.version>2.35.1</dagger.version>
    <maven-compiler-plugin.version>3.7.0</maven-compiler-plugin.version>
    <maven-exec-plugin.version>1.6.0</maven-exec-plugin.version>
    <maven-surefire-plugin.version>3.0.0-M5</maven-surefire-plugin.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.jetbrains.kotlin</groupId>
        <artifactId>kotlin-stdlib-jdk8</artifactId>
        <version>${kotlin.version}</version>
    </dependency>
    <dependency>
        <groupId>org.jetbrains.kotlinx</groupId>
        <artifactId>kotlinx-serialization-json</artifactId>
        <version>${serialization.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.beam</groupId>
        <artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
        <version>${beam.version}</version>
        <scope>runtime</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.beam</groupId>
        <artifactId>beam-sdks-java-core</artifactId>
        <version>${beam.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.beam</groupId>
        <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
        <version>${beam.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.beam</groupId>
        <artifactId>beam-sdks-java-io-redis</artifactId>
        <version>${beam.version}</version>
    </dependency>
    <dependency>
        <groupId>org.jetbrains.kotlin</groupId>
        <artifactId>kotlin-test-junit</artifactId>
        <version>${kotlin.version}</version>
        <scope>test</scope>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <groupId>org.jetbrains.kotlin</groupId>
            <artifactId>kotlin-maven-plugin</artifactId>
            <version>${kotlin.version}</version>
            <executions>
                <execution>
                    <id>kapt</id>
                    <goals>
                        <goal>kapt</goal>
                    </goals>
                    <configuration>
                        <sourceDirs>
                            <sourceDir>src/main/kotlin</sourceDir>
                        </sourceDirs>
                        <annotationProcessorPaths>
                            <annotationProcessorPath>
                                <groupId>com.google.dagger</groupId>
                                <artifactId>dagger-compiler</artifactId>
                                <version>${dagger.version}</version>
                            </annotationProcessorPath>
                        </annotationProcessorPaths>
                    </configuration>
                </execution>
                <execution>
                    <id>compile</id>
                    <phase>process-sources</phase>
                    <goals>
                        <goal>compile</goal>
                    </goals>
                    <configuration>
                        <sourceDirs>
                            <sourceDir>src/main/kotlin</sourceDir>
                        </sourceDirs>
                    </configuration>
                </execution>
                <execution>
                    <id>test-kapt</id>
                    <goals>
                        <goal>test-kapt</goal>
                    </goals>
                    <configuration>
                        <sourceDirs>
                            <sourceDir>src/test/kotlin</sourceDir>
                        </sourceDirs>
                        <annotationProcessorPaths>
                            <annotationProcessorPath>
                                <groupId>com.google.dagger</groupId>
                                <artifactId>dagger-compiler</artifactId>
                                <version>${dagger.version}</version>
                            </annotationProcessorPath>
                        </annotationProcessorPaths>
                    </configuration>
                </execution>
                <execution>
                    <id>test-compile</id>
                    <goals>
                        <goal>test-compile</goal>
                    </goals>
                    <configuration>
                        <sourceDirs>
                            <sourceDir>src/test/kotlin</sourceDir>
                            <sourceDir>target/generated-sources/kapt/test</sourceDir>
                        </sourceDirs>
                    </configuration>
                </execution>
            </executions>
            <configuration>
                <compilerPlugins>
                    <plugin>kotlinx-serialization</plugin>
                </compilerPlugins>
            </configuration>
            <dependencies>
                <dependency>
                    <groupId>org.jetbrains.kotlin</groupId>
                    <artifactId>kotlin-maven-serialization</artifactId>
                    <version>${kotlin.version}</version>
                </dependency>
            </dependencies>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-surefire-plugin</artifactId>
            <version>${maven-surefire-plugin.version}</version>
            <dependencies>
                <dependency>
                    <groupId>org.apache.maven.surefire</groupId>
                    <artifactId>surefire-junit47</artifactId>
                    <version>${maven-surefire-plugin.version}</version>
                </dependency>
            </dependencies>
        </plugin>
        <plugin>
            <groupId>org.codehaus.mojo</groupId>
            <artifactId>exec-maven-plugin</artifactId>
            <version>${maven-exec-plugin.version}</version>
            <configuration>
                <cleanupDaemonThreads>false</cleanupDaemonThreads>
            </configuration>
        </plugin>
    </plugins>
</build>
An object that implements Serializable (java.io)
data class MyObject(val field: String = "") : Serializable
Basically, I want to run a FlatMapElements transform with a TypeDescriptor and a lambda (behind the scenes, a SerializableFunction):
class MyTransform(private val redisConnectionConf: RedisConnectionConfiguration) : PTransform<PBegin, PCollection<MyObject>>() {

    override fun expand(input: PBegin): PCollection<MyObject> {
        return input
            .apply(RedisIO.read().withConnectionConfiguration(redisConnectionConf).withKeyPattern("my-pattern*"))
            .apply(
                FlatMapElements.into(of(MyObject::class.java))
                    .via(SerializableFunction<KV<String, String>, List<MyObject>> { toMyObjects(it) })
            )
    }

    fun toMyObjects(entry: KV<String, String>): List<MyObject> {
        val key = entry.key
        val value = entry.value
        val ref = object : TypeReference<List<MyObject>>() {}
        return OBJECT_MAPPER.readValue(value, ref)
    }
}
I deliberately changed the code and moved part of it into the toMyObjects method in order to give as much detail as possible.
OBJECT_MAPPER is a Jackson ObjectMapper.
With Java 8 and Beam 2.27.0 this basic code works perfectly fine.
With Kotlin this code does not work and fails with the following error:
    at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray (SerializableUtils.java:59)
    at org.apache.beam.runners.core.construction.ParDoTranslation.translateDoFn (ParDoTranslation.java:692)
    at org.apache.beam.runners.dataflow.PrimitiveParDoSingleFactory$PayloadTranslator$1.translateDoFn (PrimitiveParDoSingleFactory.java:218)
    at org.apache.beam.runners.core.construction.ParDoTranslation.payloadForParDoLike (ParDoTranslation.java:814)
    at org.apache.beam.runners.dataflow.PrimitiveParDoSingleFactory$PayloadTranslator.payloadForParDoSingle (PrimitiveParDoSingleFactory.java:214)
    at org.apache.beam.runners.dataflow.PrimitiveParDoSingleFactory$PayloadTranslator.translate (PrimitiveParDoSingleFactory.java:163)
    at org.apache.beam.runners.core.construction.PTransformTranslation$KnownTransformPayloadTranslator.translate (PTransformTranslation.java:429)
    at org.apache.beam.runners.core.construction.PTransformTranslation.toProto (PTransformTranslation.java:239)
    at org.apache.beam.runners.core.construction.SdkComponents.registerPTransform (SdkComponents.java:175)
    at org.apache.beam.runners.core.construction.PipelineTranslation$1.visitPrimitiveTransform (PipelineTranslation.java:87)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:587)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:579)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:579)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:579)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$500 (TransformHierarchy.java:239)
    at org.apache.beam.sdk.runners.TransformHierarchy.visit (TransformHierarchy.java:213)
    at org.apache.beam.sdk.Pipeline.traverseTopologically (Pipeline.java:468)
    at org.apache.beam.runners.core.construction.PipelineTranslation.toProto (PipelineTranslation.java:59)
    at org.apache.beam.runners.dataflow.DataflowRunner.run (DataflowRunner.java:933)
    at org.apache.beam.runners.dataflow.DataflowRunner.run (DataflowRunner.java:196)
    at org.apache.beam.sdk.Pipeline.run (Pipeline.java:322)
    at org.apache.beam.sdk.Pipeline.run (Pipeline.java:308)
    at myPackage.MyApp.main (MyApp.kt:44)
    at sun.reflect.NativeMethodAccessorImpl.invoke0 (Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke (NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke (Method.java:498)
    at org.codehaus.mojo.exec.ExecJavaMojo$1.run (ExecJavaMojo.java:282)
    at java.lang.Thread.run (Thread.java:748)
Caused by: java.io.NotSerializableException: Non-serializable lambda
    at mypackage.MyTransform$$Lambda$783/1784079343.writeObject (Unknown Source)
[ERROR] Failed to execute goal org.codehaus.mojo:exec-maven-plugin:1.6.0:java (default-cli) on project my-project: An exception occured while executing the Java class. unable to serialize DoFnWithExecutionInformation{doFn=org.apache.beam.sdk.transforms.FlatMapElements$2@23402e70, mainOutputTag=Tag<org.apache.beam.sdk.values.PCollection.<init>:402#6929f09b03d242ca>, sideInputMapping={}, schemaInformation=DoFnSchemaInformation{elementConverters=[]}}: Non-serializable lambda -> [Help 1]
The SerializableUtils.serializeToByteArray method in the Beam SDK throws this error: java.io.NotSerializableException: Non-serializable lambda
MyObject is Serializable and the lambda is wrapped in a Beam SerializableFunction (a function interface that extends Serializable).
Normally in this case, Beam uses a SerializableCoder for the Serializable object.
I don't understand why Beam sees the lambda as non-serializable.
I don't see this behaviour with plain Java.
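For comparison, here is a minimal Java sketch of the check that fails in the stack trace, using only java.io and no Beam classes. It assumes that the failing step boils down to writing the function object to an ObjectOutputStream. SerFn and isJavaSerializable are hypothetical names introduced for illustration; SerFn only stands in for Beam's SerializableFunction. The sketch shows the Java behaviour described above and does not reproduce what the Kotlin compiler generates.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.function.Function;

public class LambdaSerializationCheck {

    // Hypothetical stand-in for Beam's SerializableFunction: a functional
    // interface that also extends Serializable.
    interface SerFn<I, O> extends Serializable {
        O apply(I input);
    }

    // Returns true if Java serialization accepts the object, false if it
    // is rejected with a NotSerializableException.
    static boolean isJavaSerializable(Object candidate) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(candidate);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // Lambda whose target type extends Serializable.
        SerFn<String, Integer> serializableLambda = s -> s.length();
        // Lambda whose target type (java.util.function.Function) does not.
        Function<String, Integer> plainLambda = s -> s.length();

        System.out.println("SerFn lambda serializable:    " + isJavaSerializable(serializableLambda));
        System.out.println("Function lambda serializable: " + isJavaSerializable(plainLambda));
    }
}
```

Running the same kind of check on the function object that the Kotlin code passes to via(...) may help tell whether the problem lies in how the lambda is compiled rather than in Beam itself.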
To be precise: if I replace the FlatMapElements/TypeDescriptor/lambda combination with a ParDo.of(DoFn), it works fine, but in some cases, for conciseness and readability, I want to use the built-in MapElements and FlatMapElements with lambda expressions.
Thanks in advance for your help.
I ran the same test in a small project with only the required dependencies (Kotlin 1.5.0 and Beam 2.27.0 with Maven) and I get exactly the same issue.
There is also a related Stack Overflow question: https://stackoverflow.com/questions/67341499/flatmapelement-kotlin-beam-non-serializable-lambda