Uploaded image for project: 'Beam'
  1. Beam
  2. BEAM-13965

Polymorphic types not supported in PipelineOptions

Details

    • Improvement
    • Status: Resolved
    • P2
    • Resolution: Fixed
    • None
    • 2.38.0
    • sdk-java-core
    • None

    Description

      I noticed that polymorphic types using @JsonTypeInfo and @JsonSubTypes are currently not supported in pipeline options: deserialization fails because the necessary type information is missing. As a workaround, one has to provide a custom serializer/deserializer and handle the type information manually.

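      It looks like the deserialization code path should simply mirror the serialization path: use a custom deserializer only when one is declared for the method, and otherwise let the default Jackson machinery deserialize the value, as in the following patch.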
       

      diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java
      --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java	(revision 53f5a3c1756509b6fab75d3946a9f95247d02184)
      +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java	(date 1645114275316)
      @@ -1725,7 +1725,8 @@
           }
         }
       
      -  private static JsonDeserializer<Object> computeDeserializerForMethod(Method method) {
      +  private static Optional<JsonDeserializer<Object>> computeCustomDeserializerForMethod(
      +      Method method) {
           try {
             BeanProperty prop = createBeanProperty(method);
             AnnotatedMember annotatedMethod = prop.getMember();
      @@ -1736,16 +1737,10 @@
                     .getAnnotationIntrospector()
                     .findDeserializer(annotatedMethod);
       
      -      JsonDeserializer<Object> jsonDeserializer =
      +      return Optional.fromNullable(
                 DESERIALIZATION_CONTEXT
                     .get()
      -              .deserializerInstance(annotatedMethod, maybeDeserializerClass);
      -
      -      if (jsonDeserializer == null) {
      -        jsonDeserializer =
      -            DESERIALIZATION_CONTEXT.get().findContextualValueDeserializer(prop.getType(), prop);
      -      }
      -      return jsonDeserializer;
      +              .deserializerInstance(annotatedMethod, maybeDeserializerClass));
           } catch (JsonMappingException e) {
             throw new RuntimeException(e);
           }
      @@ -1771,11 +1766,12 @@
          * JsonDeserialize} the specified deserializer from the annotation is returned, otherwise the
          * default is returned.
          */
      -  private static JsonDeserializer<Object> getDeserializerForMethod(Method method) {
      +  private static @Nullable JsonDeserializer<Object> getCustomDeserializerForMethod(Method method) {
           return CACHE
               .get()
               .deserializerCache
      -        .computeIfAbsent(method, PipelineOptionsFactory::computeDeserializerForMethod);
      +        .computeIfAbsent(method, PipelineOptionsFactory::computeCustomDeserializerForMethod)
      +        .orNull();
         }
       
         /**
      @@ -1796,10 +1792,13 @@
             return null;
           }
       
      +    JsonDeserializer<Object> jsonDeserializer = getCustomDeserializerForMethod(method);
      +    if (jsonDeserializer == null) {
      +      return DESERIALIZATION_CONTEXT.get().readTreeAsValue(node, method.getReturnType());
      +    }
      +
           JsonParser parser = new TreeTraversingParser(node, MAPPER);
           parser.nextToken();
      -
      -    JsonDeserializer<Object> jsonDeserializer = getDeserializerForMethod(method);
           return jsonDeserializer.deserialize(parser, DESERIALIZATION_CONTEXT.get());
         }
       
      @@ -2055,7 +2054,8 @@
           private final Map<Set<Class<? extends PipelineOptions>>, Registration<?>> combinedCache =
               Maps.newConcurrentMap();
       
      -    private final Map<Method, JsonDeserializer<Object>> deserializerCache = Maps.newConcurrentMap();
      +    private final Map<Method, Optional<JsonDeserializer<Object>>> deserializerCache =
      +        Maps.newConcurrentMap();
       
           private final Map<Method, Optional<JsonSerializer<Object>>> serializerCache =
               Maps.newConcurrentMap();
      
      

      Attachments

        Issue Links

          Activity

            People

              mosche Moritz Mack
              mosche Moritz Mack
              Votes:
              0 Vote for this issue
              Watchers:
              1 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved:

                Time Tracking

                  Estimated:
                  Original Estimate - Not Specified
                  Not Specified
                  Remaining:
                  Remaining Estimate - 0h
                  0h
                  Logged:
                  Time Spent - 0.5h
                  0.5h