  Beam / BEAM-10998

Write just one file per window with WriteToFiles transform

Details

    • Type: Bug
    • Status: Open
    • Priority: P3
    • Resolution: Unresolved
    • Affects Version/s: 2.24.0
    • Fix Version/s: None
    • Component/s: io-py-files, sdk-py-core
    • Labels: None
    • Important

    Description

       In this case, all messages from the Pub/Sub topic need to accumulate in one text file per window. However, WriteToFiles produces many files per window instead of one, even with shards=1.

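      import json

      import apache_beam as beam
      from apache_beam.io import fileio
      from apache_beam.transforms import trigger
      from apache_beam.transforms.trigger import AccumulationMode
      from apache_beam.transforms.window import FixedWindows

      # p, known_args and parse_json are not shown in this report.
      input = (p
               | 'ReadData' >> beam.io.ReadFromPubSub(topic=known_args.input_topic).with_output_types(bytes)
               | 'Decode' >> beam.Map(lambda x: x.decode('utf-8'))
               | 'Parse' >> beam.Map(parse_json)
               # Assign each element to a 60-second fixed window.
               | 'Data w' >> beam.WindowInto(
                   FixedWindows(60),
                   trigger=trigger.AfterWatermark(),
                   accumulation_mode=AccumulationMode.DISCARDING
               )
               | 'Group elements into windows' >> beam.Reshuffle()
               )

      event_data = (input
                    | 'Filter events' >> beam.Filter(lambda x: x['t'] == 'event')
                    | 'Encode' >> beam.Map(lambda x: json.dumps(x))
                    # shards=1 is expected to yield a single file per window.
                    | 'Write to files' >> fileio.WriteToFiles(
                        path='gs://some/gcs/bucket/',
                        file_naming=fileio.default_file_naming(
                            prefix='events',
                            suffix='.txt'
                        ),
                        shards=1
                    )
                    )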
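
      To make the expected result concrete, here is a minimal pure-Python sketch (no Beam involved) of what "one file per window" would mean for 60-second fixed windows. It only models the desired output. The helper names, the sample records, and the file-name pattern are hypothetical and are not what fileio.default_file_naming actually produces.

```python
from collections import defaultdict

WINDOW_SIZE = 60  # seconds, mirroring FixedWindows(60) in the report


def window_start(timestamp, size=WINDOW_SIZE):
    """Return the start of the fixed window that contains `timestamp`."""
    return timestamp - (timestamp % size)


def group_by_window(records, size=WINDOW_SIZE):
    """Group (timestamp, line) pairs into one list of lines per window."""
    windows = defaultdict(list)
    for timestamp, line in records:
        windows[window_start(timestamp, size)].append(line)
    return dict(windows)


def expected_files(records, prefix="events", suffix=".txt", size=WINDOW_SIZE):
    """Map one illustrative file name per window to that window's contents."""
    files = {}
    for start, lines in sorted(group_by_window(records, size).items()):
        # Hypothetical naming scheme, used only to tell the windows apart.
        name = f"{prefix}-{start}-{start + size}{suffix}"
        files[name] = "\n".join(lines) + "\n"
    return files


# Hypothetical sample: (event time in seconds, encoded JSON line).
sample = [
    (5, '{"t": "event", "id": 1}'),
    (42, '{"t": "event", "id": 2}'),
    (61, '{"t": "event", "id": 3}'),
    (130, '{"t": "event", "id": 4}'),
]

for name, content in expected_files(sample).items():
    print(name, len(content.splitlines()), "line(s)")
```

      Under these assumptions, the two records at 5 s and 42 s share the window [0, 60) and would land in the same file, while the records at 61 s and 130 s each get their own. The report describes more than one file being written for a single window.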
      

          People

            Assignee: Unassigned
            Reporter: andy_ap Andrey
            Votes: 1
            Watchers: 4
