Details
Type: Bug
Status: Open
Priority: P3
Resolution: Unresolved
Description
Template parameters don't work if they are only used in DoFns and not anywhere else in main().
Sample pipeline:
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.options.PipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.options.ValueProvider;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.ParDo;

    public class BugPipeline {
      public interface Options extends PipelineOptions {
        ValueProvider<String> getFoo();
        void setFoo(ValueProvider<String> foo);
      }

      public static void main(String[] args) throws Exception {
        Options options = PipelineOptionsFactory.fromArgs(args).as(Options.class);
        Pipeline p = Pipeline.create(options);
        p.apply(Create.of(1))
            .apply(ParDo.of(new DoFn<Integer, String>() {
              @ProcessElement
              public void processElement(ProcessContext context) {
                // The only place "foo" is read: inside the DoFn, at execution time.
                System.out.println(context.getPipelineOptions().as(Options.class).getFoo());
              }
            }));
        p.run();
      }
    }
Option "foo" is not used anywhere other than the DoFn. To reproduce the problem, create the template and then launch a job from it:
    $ java BugPipeline --project=$PROJECT --stagingLocation=$STAGING --templateLocation=$TEMPLATE --runner=DataflowRunner
    $ gcloud dataflow jobs run $NAME --gcs-location=$TEMPLATE --parameters=foo=bar
The gcloud command fails with this error:
    ERROR: (gcloud.dataflow.jobs.run) INVALID_ARGUMENT: (2621bec26c2488b7): The workflow could not be created. Causes: (2621bec26c248dba): Found unexpected parameters: ['foo' (perhaps you meant 'zone')]
    - '@type': type.googleapis.com/google.rpc.DebugInfo
      detail: "(2621bec26c2488b7): The workflow could not be created. Causes: (2621bec26c248dba): Found unexpected parameters: ['foo' (perhaps you meant 'zone')]"
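The underlying problem is that ProxyInvocationHandler.java only adds options that have been "invoked" (that is, whose getters were called while the pipeline was being constructed) to the pipeline options map in the job object. A getter that is called only inside a DoFn runs later, on the workers, so "foo" never makes it into that map. The relevant code: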
https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ProxyInvocationHandler.java#L159
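The mechanism can be illustrated with a small plain-Java model. This is not Beam's ProxyInvocationHandler: the class InvokedOptionsModel, its RecordingHandler, and the getFoo/getInput getters are hypothetical, and plain String getters stand in for ValueProvider<String>. The model assumes, as described above, that a property is recorded only when its getter is called; a java.lang.reflect.Proxy plays the role of the options object.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.Map;
import java.util.TreeMap;

public class InvokedOptionsModel {
  // Hypothetical options interface: "foo" is read only in a DoFn, "input" is read in main().
  public interface Options {
    String getFoo();
    String getInput();
  }

  // Simplified handler that records a property only when its getter is invoked,
  // mirroring the behavior the issue attributes to ProxyInvocationHandler.
  static final class RecordingHandler implements InvocationHandler {
    final Map<String, Object> recorded = new TreeMap<>();

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) {
      String name = method.getName();
      if (name.startsWith("get") && name.length() > 3) {
        String property = Character.toLowerCase(name.charAt(3)) + name.substring(4);
        // Placeholder for a runtime-provided value that is not known yet.
        recorded.putIfAbsent(property, "<runtime>");
        return null;
      }
      throw new UnsupportedOperationException(name);
    }
  }

  // Returns the options that would be serialized into the job at construction time.
  public static Map<String, Object> serializedOptions(boolean mainReadsFoo) {
    RecordingHandler handler = new RecordingHandler();
    Options options = (Options) Proxy.newProxyInstance(
        Options.class.getClassLoader(), new Class<?>[] {Options.class}, handler);
    options.getInput(); // read in main() during construction
    if (mainReadsFoo) {
      options.getFoo();
    }
    // A DoFn calling getFoo() runs later on the workers, after this map is serialized.
    return handler.recorded;
  }

  public static void main(String[] args) {
    System.out.println(serializedOptions(false).keySet()); // [input]
    System.out.println(serializedOptions(true).keySet()); // [foo, input]
  }
}
```

Running main prints [input] and then [foo, input]: in this model, "foo" reaches the construction-time map only when main() itself calls getFoo().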
One way to solve it is to save all ValueProvider-typed parameters in the pipeline options section. Alternatively, a registration mechanism could be introduced.
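The first proposed fix (recording every ValueProvider-typed option, whether or not its getter was invoked) could start from a reflection scan like the sketch below. It is a hypothetical illustration, not a Beam patch: the ValueProvider interface here is a local stand-in for org.apache.beam.sdk.options.ValueProvider, and ValueProviderScan and valueProviderProperties are invented names.

```java
import java.lang.reflect.Method;
import java.util.Set;
import java.util.TreeSet;

public class ValueProviderScan {
  // Hypothetical stand-in for Beam's ValueProvider<T>; only its type matters here.
  public interface ValueProvider<T> {
    T get();
  }

  // Hypothetical options interface mirroring the issue's Options.
  public interface Options {
    ValueProvider<String> getFoo();
    void setFoo(ValueProvider<String> foo);
    String getJobLabel();
    void setJobLabel(String label);
  }

  // Collects every property whose getter returns a ValueProvider, regardless of
  // whether the getter was ever called, so each could be written into the job's options.
  public static Set<String> valueProviderProperties(Class<?> optionsInterface) {
    Set<String> names = new TreeSet<>();
    for (Method m : optionsInterface.getMethods()) {
      String name = m.getName();
      boolean isGetter = name.startsWith("get") && name.length() > 3 && m.getParameterCount() == 0;
      if (isGetter && ValueProvider.class.isAssignableFrom(m.getReturnType())) {
        names.add(Character.toLowerCase(name.charAt(3)) + name.substring(4));
      }
    }
    return names;
  }

  public static void main(String[] args) {
    System.out.println(valueProviderProperties(Options.class)); // [foo]
  }
}
```

Because Class.getMethods() also returns getters inherited from parent interfaces, ValueProvider options declared on a base options interface would be collected as well.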
A current workaround is to annotate the parameter's getter with @Validation.Required, which calls invoke() behind the scenes.
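The workaround works because validation calls the annotated getter during construction. A hypothetical plain-Java model of that side effect: Required stands in for @Validation.Required, and the validation loop is a simplified stand-in for Beam's validator, not its actual code. It assumes, as the workaround implies, that the getter for an unset runtime parameter returns a non-null placeholder, so validation passes while the call is still recorded.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.Set;
import java.util.TreeSet;

public class RequiredWorkaroundModel {
  // Hypothetical stand-in for @Validation.Required; runtime retention makes it visible to reflection.
  @Retention(RetentionPolicy.RUNTIME)
  @Target(ElementType.METHOD)
  public @interface Required {}

  // Hypothetical options: "foo" is only read inside a DoFn, but is marked required.
  public interface Options {
    @Required
    String getFoo();

    String getOther();
  }

  // Returns the properties whose getters were invoked during a validation pass.
  public static Set<String> invokedDuringValidation() {
    Set<String> invoked = new TreeSet<>();
    Options options = (Options) Proxy.newProxyInstance(
        Options.class.getClassLoader(),
        new Class<?>[] {Options.class},
        (proxy, method, args) -> {
          // Assumes every call is a getter, which holds for the calls made below.
          String name = method.getName();
          invoked.add(Character.toLowerCase(name.charAt(3)) + name.substring(4));
          // Non-null placeholder, standing in for an unresolved runtime value.
          return "<runtime>";
        });
    try {
      // Validation pass: call each @Required getter and reject null values.
      for (Method m : Options.class.getMethods()) {
        if (m.isAnnotationPresent(Required.class) && m.invoke(options) == null) {
          throw new IllegalArgumentException("Missing required option: " + m.getName());
        }
      }
    } catch (ReflectiveOperationException e) {
      throw new IllegalStateException(e);
    }
    return invoked;
  }

  public static void main(String[] args) {
    System.out.println(invokedDuringValidation()); // [foo]
  }
}
```

Only foo is recorded: the validation pass called its @Required getter even though no construction code reads it, while getOther(), which nothing calls, stays unrecorded.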