Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-25416

Build unified Parquet BulkFormat for both Table API and DataStream API

    XMLWordPrintableJSON

Details

    Description

      Background information

      Current AvroParquet implementation AvroParquetRecordFormat uses the high level API ParquetReader that does not provide offset information, which turns out the restoreReader logic has big room to improve.

      Beyond AvroParquetRecordFormat there is another format implementation ParquetVectorizedInputFormat w.r.t. the parquet which is coupled tightly with the Table API.

      It would be better to provide an unified Parquet BulkFormat with one implementation that can support both Table API and DataStream API.

       

      Some thoughts

      Use the low level API ParquetFileReader with BulkFormat directly like 'ParquetVectorizedInputFormat' did instead of with StreamFormat for the following reasons:

      • the read logic is built in the internal low level class InternalParquetRecordReader with package private visibility in parquet-hadoop lib which uses another low level class ParquetFileReader internally. This makes the implementation of StreamFormat very complicated. I think the design idea of StreamFormat is to simplify the implementation. They do not seem to work together.
      • ParquetFileReaderreads data in batch mode, i.e. PageReadStore pages = reader.readNextFilteredRowGroup();. If we build these logic into StreamFormat(AvroParquetRecordFormat in this case), AvroParquetRecordFormat has to take over the role InternalParquetRecordReader does, including but not limited to
        1. read PageReadStore in batch mode.
        2. manage PageReadStore, i.e. read next page when all records in the current page have been consumed and cache it.
        3. manage the read index within the current PageReadStore because StreamFormat has its own setting for read size, etc.
          All of these make AvroParquetRecordFormat become the BulkFormat instead of StreamFormat
      • StreamFormat can only be used via StreamFormatAdapter, which means everything we will do with the low level APIs for parquet-hadoop lib should have no conflict with the built-in logic provided by StreamFormatAdapter.

      Now we could see if we build these logics into a StreamFormat implementation, i.e. AvroParquetRecordFormat, all convenient built-in logic provided by the StreamFormatAdapter turns into obstacles. There is also a violation of single responsibility principle, i.e. AvroParquetRecordFormat }}will take some responsibility of {{{}BulkFormat. These might be the reasons why 'ParquetVectorizedInputFormat' implemented BulkFormat instead of StreamFormat.

      In order to build a unified parquet implementation for both Table API and DataStream API, it makes more sense to consider building these code into a BulkFormat implementation class. Since the output data types are different, RowData vs. Avro, extra converter logic should be introduced into the architecture design. Depending on how complicated the issue will be and how big the impact it will have on the current code base, a new FLIP might be required. 

      Following code piece were suggested by Arvid Heise for the next optimized AvroParquetReader:

      // Injected
                  GenericData model = GenericData.get();
                  org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
      
                  // Low level reader - fetch metadata
                  ParquetFileReader reader = null;
                  MessageType fileSchema = reader.getFileMetaData().getSchema();
                  Map<String, String> metaData = reader.getFileMetaData().getKeyValueMetaData();
      
                  // init Avro specific things
                  AvroReadSupport<T> readSupport = new AvroReadSupport<>(model);
                  ReadSupport.ReadContext readContext =
                          readSupport.init(
                                  new InitContext(
                                        conf,
                                          metaData.entrySet().stream()
                                                  .collect(Collectors.toMap(e -> e.getKey(), e -> Collections.singleton(e.getValue()))),
                                          fileSchema));
                  RecordMaterializer<T> recordMaterializer = readSupport.prepareForRead(conf, metaData, fileSchema, readContext);
                  MessageType requestedSchema = readContext.getRequestedSchema();
      
                  // prepare record reader
                  ColumnIOFactory columnIOFactory = new ColumnIOFactory(reader.getFileMetaData().getCreatedBy());
                  MessageColumnIO columnIO = columnIOFactory.getColumnIO(requestedSchema, fileSchema, true);
      
                  // for recovery
                  while (...) {
                    reader.skipNextRowGroup();
                  }
      
                  // for reading
                  PageReadStore pages;
                  for (int block = 0; (pages = reader.readNextRowGroup()) != null; block++) {
                      RecordReader<T> recordReader = columnIO.getRecordReader(pages, recordMaterializer);
                      for (int i = 0; i < pages.getRowCount(); i++) {
                          T record = recordReader.read();
                          emit record;
                      }
                  } 

      Required features

      Attachments

        Issue Links

          Activity

            People

              jingge Jing Ge
              jingge Jing Ge
              Votes:
              0 Vote for this issue
              Watchers:
              3 Start watching this issue

              Dates

                Created:
                Updated: