Beam / BEAM-10100

FileIO writeDynamic with AvroIO.sink not writing all data

Details

    • Type: Bug
    • Status: Open
    • Priority: P1
    • Resolution: Unresolved
    • Affects Version/s: 2.16.0, 2.17.0, 2.18.0, 2.19.0, 2.20.0, 2.21.0, 2.22.0
    • Fix Version/s: None
    • Environment: macOS Catalina, tested with SparkRunner (Spark 2.4.5)
    • Flags: Important

    Description

      FileIO.writeDynamic() with AvroIO.sink() does not write all records in the following pipeline. The number of records written varies between runs, but records are dropped consistently. This happens even with a very small test dataset: 6 records, which should produce 3 directories.

      // Requires: import static org.apache.beam.sdk.io.FileIO.Write.defaultNaming;

      // Initialise pipeline
      Pipeline p = Pipeline.create(options);
      PCollection<KV<String, AvroRecord>> records = p
          .apply(TextIO.read().from("/tmp/input.csv"))
          // Key each parsed record by its dataset ID
          .apply(ParDo.of(new StringToDatasetIDAvroRecordFcn()));

      // Write out Avro files, one directory per dataset
      records.apply("Write avro file per dataset", FileIO.<String, KV<String, AvroRecord>>writeDynamic()
          .by(KV::getKey)
          .via(Contextful.fn(KV::getValue), Contextful.fn(x -> AvroIO.sink(AvroRecord.class).withCodec(BASE_CODEC)))
          .to(options.getTargetPath())
          .withDestinationCoder(StringUtf8Coder.of())
          .withNaming(key -> defaultNaming(key + "/export", PipelinesVariables.Pipeline.AVRO_EXTENSION)));

      p.run().waitUntilFinish();
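      To quantify how many records go missing, the expected per-directory counts can be computed from the input CSV and compared with what the Avro files actually contain. The sketch below assumes, hypothetically, that the dataset ID is the first comma-separated field of each line; StringToDatasetIDAvroRecordFcn is not shown in this report, so its real key extraction may differ. ExpectedCounts and countByKey are illustrative names, not part of Beam.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper (not part of Beam): computes how many records each
// dataset directory should contain after the writeDynamic step.
// ASSUMPTION: the dataset ID is the first comma-separated field of a line.
public class ExpectedCounts {

    public static Map<String, Integer> countByKey(List<String> csvLines) {
        // TreeMap keeps the dataset IDs sorted for readable output
        Map<String, Integer> counts = new TreeMap<>();
        for (String line : csvLines) {
            if (line.trim().isEmpty()) {
                continue; // ignore blank lines
            }
            String key = line.split(",", 2)[0];
            counts.merge(key, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // Illustrative input: 6 records spread over 3 dataset IDs
        List<String> lines = List.of(
            "ds1,a", "ds1,b", "ds2,c", "ds2,d", "ds3,e", "ds3,f");
        System.out.println(countByKey(lines)); // prints {ds1=2, ds2=2, ds3=2}
    }
}
```

      Against the real input, the lines could be loaded with Files.readAllLines(Paths.get("/tmp/input.csv")) and passed to countByKey.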
      

      If I replace AvroIO.sink() with TextIO.sink() (and change the initial mapping function to emit KV<String, String>), the correct number of records is written to each directory. This works consistently across runs.

      For example:

      // Requires: import static org.apache.beam.sdk.io.FileIO.Write.defaultNaming;

      // Initialise pipeline
      Pipeline p = Pipeline.create(options);

      PCollection<KV<String, String>> records = p
          .apply(TextIO.read().from("/tmp/input.csv"))
          .apply(ParDo.of(new StringToDatasetIDKVFcn()));

      // Write out CSV files, one directory per dataset
      records.apply("Write CSV file per dataset", FileIO.<String, KV<String, String>>writeDynamic()
          .by(KV::getKey)
          .via(Contextful.fn(KV::getValue), TextIO.sink())
          .to(options.getTargetPath())
          .withDestinationCoder(StringUtf8Coder.of())
          .withNaming(datasetID -> defaultNaming(datasetID + "/export", ".csv")));

      p.run().waitUntilFinish();
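      The records that actually landed in each dataset directory can then be counted after a run and compared with the expected counts. This stdlib-only sketch walks the target path and counts lines in every file ending with a given suffix, grouped by parent directory, which suits the CSV output of TextIO.sink(). It assumes one record per line; the Avro output is binary, so its records would instead have to be counted by opening each file with Avro's DataFileReader, which is not shown here. WrittenCounts and countLinesPerDirectory are hypothetical names.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical helper (not part of Beam): counts the records written per
// dataset directory, ASSUMING one record per line (true for TextIO.sink()).
public class WrittenCounts {

    public static Map<String, Long> countLinesPerDirectory(Path targetPath, String suffix)
            throws IOException {
        // Collect every regular file below targetPath whose name ends with suffix
        List<Path> files;
        try (Stream<Path> walk = Files.walk(targetPath)) {
            files = walk.filter(Files::isRegularFile)
                    .filter(p -> p.getFileName().toString().endsWith(suffix))
                    .collect(Collectors.toList());
        }

        // Sum line counts per parent directory, keyed relative to targetPath
        // (e.g. "ds1" for a file under <targetPath>/ds1/)
        Map<String, Long> counts = new TreeMap<>();
        for (Path file : files) {
            String dir = targetPath.relativize(file.getParent()).toString();
            try (Stream<String> lines = Files.lines(file)) {
                counts.merge(dir, lines.count(), Long::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) throws IOException {
        // args[0]: the pipeline's target path; args[1]: file suffix such as ".csv"
        System.out.println(countLinesPerDirectory(Paths.get(args[0]), args[1]));
    }
}
```

      Summing over all files in a directory means the count stays correct however many shards the sink produces per destination.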
      

      cc timrobertson100


          People

            Assignee: Unassigned
            Reporter: Dave Martin (djtfmartin)
            Votes: 0
            Watchers: 8
