BEAM-8303

Filesystems not properly registered using FileIO.write() on FlinkRunner

Details

    Description

      I’m getting the following error when attempting to use the FileIO APIs (beam-2.15.0) and integrating with AWS S3. I have set up the PipelineOptions with all the relevant AWS options, so the filesystem registry *should* be properly seeded by the time the graph is compiled and executed:

       java.lang.IllegalArgumentException: No filesystem found for scheme s3
          at org.apache.beam.sdk.io.FileSystems.getFileSystemInternal(FileSystems.java:456)
          at org.apache.beam.sdk.io.FileSystems.matchNewResource(FileSystems.java:526)
          at org.apache.beam.sdk.io.FileBasedSink$FileResultCoder.decode(FileBasedSink.java:1149)
          at org.apache.beam.sdk.io.FileBasedSink$FileResultCoder.decode(FileBasedSink.java:1105)
          at org.apache.beam.sdk.coders.Coder.decode(Coder.java:159)
          at org.apache.beam.sdk.transforms.join.UnionCoder.decode(UnionCoder.java:83)
          at org.apache.beam.sdk.transforms.join.UnionCoder.decode(UnionCoder.java:32)
          at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:543)
          at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:534)
          at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:480)
          at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.deserialize(CoderTypeSerializer.java:93)
          at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
          at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:106)
          at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:72)
          at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
          at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
          at org.apache.flink.runtime.operators.FlatMapDriver.run(FlatMapDriver.java:107)
          at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
          at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
          at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
          at java.lang.Thread.run(Thread.java:748)
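
      The options setup on the driver looks roughly like this (a sketch, not my exact code; AwsOptions comes from the beam-sdks-java-io-amazon-web-services module, whose registrar supplies the s3 scheme, and args is the usual String[] from main()):

       import org.apache.beam.sdk.io.FileSystems;
       import org.apache.beam.sdk.io.aws.options.AwsOptions;
       import org.apache.beam.sdk.options.PipelineOptionsFactory;

       // Parse the command line into options; AwsOptions carries the region
       // and credentials that the S3 filesystem needs.
       AwsOptions options = PipelineOptionsFactory.fromArgs(args)
               .withValidation()
               .as(AwsOptions.class);
       options.setAwsRegion("us-east-1"); // example region

       // Seed the FileSystems registry in this JVM. The trace above suggests
       // the equivalent step never ran on the Flink task manager before
       // FileResultCoder.decode() tried to resolve the s3 scheme.
       FileSystems.setDefaultPipelineOptions(options);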
       

      For reference, the write code resembles this:

       FileIO.Write<Void, GenericRecord> write = FileIO.<GenericRecord>write()
                      .via(ParquetIO.sink(schema))
                      .to(options.getOutputDir()) // will be something like: s3://<bucket>/<path>
                      .withSuffix(".parquet");

      records.apply(String.format("Write(%s)", options.getOutputDir()), write);

      The issue does not appear to be related to ParquetIO.sink(): I can reliably reproduce it with JSON-formatted records and TextIO.sink() as well. Moreover, AvroIO is affected if the withWindowedWrites() option is added.
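
      For illustration, the TextIO variant looks roughly like this (a sketch; jsonRecords stands in for a hypothetical PCollection<String> of JSON lines):

       import org.apache.beam.sdk.io.FileIO;
       import org.apache.beam.sdk.io.TextIO;

       // Same shape as the Parquet write above, but emitting JSON lines
       // through TextIO.sink(); this fails with the same
       // "No filesystem found for scheme s3" error.
       FileIO.Write<Void, String> jsonWrite = FileIO.<String>write()
               .via(TextIO.sink())
               .to(options.getOutputDir()) // still an s3://<bucket>/<path> URI
               .withSuffix(".json");

       jsonRecords.apply(String.format("Write(%s)", options.getOutputDir()), jsonWrite);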

      Just trying some different knobs, I went ahead and set the following option:

      write = write.withNoSpilling();

      This actually seemed to fix the issue, only for it to re-emerge as I scaled up the dataset size. The stack trace, while very similar, reads:

       java.lang.IllegalArgumentException: No filesystem found for scheme s3
          at org.apache.beam.sdk.io.FileSystems.getFileSystemInternal(FileSystems.java:456)
          at org.apache.beam.sdk.io.FileSystems.matchNewResource(FileSystems.java:526)
          at org.apache.beam.sdk.io.FileBasedSink$FileResultCoder.decode(FileBasedSink.java:1149)
          at org.apache.beam.sdk.io.FileBasedSink$FileResultCoder.decode(FileBasedSink.java:1105)
          at org.apache.beam.sdk.coders.Coder.decode(Coder.java:159)
          at org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:82)
          at org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:36)
          at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:543)
          at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:534)
          at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:480)
          at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.deserialize(CoderTypeSerializer.java:93)
          at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
          at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:106)
          at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:72)
          at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
          at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
          at org.apache.flink.runtime.operators.NoOpDriver.run(NoOpDriver.java:94)
          at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
          at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
          at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
          at java.lang.Thread.run(Thread.java:748)

       

      And lastly, I tried adding the following deprecated option (both with and without withNoSpilling()):

       write = write.withIgnoreWindowing(); 

      This seemed to fix the issue altogether, but aside from having to rely on a deprecated feature, the bigger question remains: why?

       

      In reading through some of the source, it appears to be a common pattern for operators to manually register the pipeline options, seeding the filesystem registry during the setup phase of the operator lifecycle, e.g.: https://github.com/apache/beam/blob/release-2.15.0/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java#L304-L313
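
      Paraphrased (abbreviated, not a verbatim copy of DoFnOperator), the pattern amounts to re-seeding the registry on each task manager from the options that were serialized with the operator:

       import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
       import org.apache.beam.sdk.io.FileSystems;

       // In the operator's setup/open, before any elements are decoded;
       // serializedOptions is the SerializablePipelineOptions shipped from
       // the driver with the serialized operator.
       FileSystems.setDefaultPipelineOptions(serializedOptions.get());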

       

      Is it possible that I have hit upon a couple of scenarios where that registration has not taken place?


              People

                Assignee: mxm Maximilian Michels
                Reporter: preston Preston Koprivica
                Votes: 0
                Watchers: 7


                Time Tracking

                  Original Estimate: Not Specified
                  Remaining Estimate: 0h
                  Time Spent: 2h 50m