Beam / BEAM-14135

BigQuery Storage API insert with writeResult retry and write to error table

Details

    • Type: Improvement
    • Status: Open
    • Priority: P1
    • Resolution: Unresolved
    • Component: io-java-gcp

    Description

      I’m currently using the legacy BigQuery streaming insert on a streaming pipeline (not using Streaming Engine), like this:

      bqWriter = bqWriter
              .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
              .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
              .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
              .withExtendedErrorInfo();
      bqErrorHandler = (writeResult, eventsProcessingOptions1) ->
              writeResult.getFailedInsertsWithErr().apply("BQ-insert-error-write",
                      HandleInsertError.of());

      and in HandleInsertError we process the BigQueryInsertError, add some metadata, and write it to the desired BigQuery error table:

       

      @Override
      public PCollection<Void> expand(PCollection<BigQueryInsertError> input) {
          return input
                  .apply("transform-err-table-row", ParDo.of(new DoFn<BigQueryInsertError, KV<TableRow, TableDestination>>() {
                      @ProcessElement
                      public void processElement(ProcessContext c) {
                          BigQueryInsertError bigQueryInsertError = c.element();
                          // enrich the failed row with the insert error and some metadata
                          TableRow convertedRow = new TableRow();
                          convertedRow.set("error", bigQueryInsertError.getError().toString());
                          convertedRow.set("t", CommonConverter.convertDate(new Date()));
                          convertedRow.set(UUID, bigQueryInsertError.getRow().get(UUID));
                          // route the row to the tenant's error table, derived from the
                          // failing destination's project and dataset
                          TableDestination tableDestination = BqUtil.getTableDestination(
                                  bigQueryInsertError.getTable().getProjectId(),
                                  bigQueryInsertError.getTable().getDatasetId(), errorTable);
                          c.output(KV.of(convertedRow, tableDestination));
                      }
                  }))
                  .apply(new BqInsertError());
      }
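
      For reference, BqInsertError routes each KV to the TableDestination it carries. A simplified sketch of the write it performs (not the exact production code) would look like this:

      // Simplified sketch of BqInsertError: each element carries its own
      // TableDestination, so a dynamic destination function is passed to BigQueryIO.
      public PCollection<Void> expand(PCollection<KV<TableRow, TableDestination>> input) {
          input.apply("write-error-rows",
                  BigQueryIO.<KV<TableRow, TableDestination>>write()
                          // pick the target table from the element itself
                          .to((ValueInSingleWindow<KV<TableRow, TableDestination>> v) ->
                                  v.getValue().getValue())
                          // the row to insert is the key of the KV
                          .withFormatFunction(KV::getKey)
                          .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
                          .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
          // return an empty PCollection<Void> to satisfy the transform signature
          return input.getPipeline().apply("done", Create.empty(VoidCoder.of()));
      }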

      I’m trying to change the write method to the new Storage Write API:

      .withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API);

       

      but I get this error:

      When writing an unbounded PCollection via FILE_LOADS or STORAGE_API_WRITES, triggering frequency must be specified

      even though the documentation indicates that the triggering frequency is only relevant to the FILE_LOADS method:
      https://beam.apache.org/releases/javadoc/2.36.0/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.Write.html#withTriggeringFrequency-org.joda.time.Duration-

       

      After I added the triggering frequency and NumStorageWriteApiStreams, I’m getting this error:

      Cannot use getFailedInsertsWithErr as this WriteResult does not use extended errors. Use getFailedInserts instead

       

      But the difference between these functions is that getFailedInsertsWithErr returns a PCollection<BigQueryInsertError>,
      which gives us two features that are not available from the getFailedInserts function, because that one returns a PCollection<TableRow>:

      1. we can get the insert error via bigQueryInsertError.getError()
      2. we can determine the project id and dataset id via
             bigQueryInsertError.getTable().getProjectId() and
             bigQueryInsertError.getTable().getDatasetId();
         we need them because our pipeline is a multi-tenant use case, and getting those parameters any other way would require a lot of overhead (see the sketch after this list).
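
      For illustration, here is roughly what the handler degrades to with getFailedInserts (sketch only; defaultProject/defaultDataset are placeholders for static configuration, which is exactly what breaks our multi-tenant routing):

      // Sketch: with getFailedInserts() the handler only sees the failed TableRow.
      // There is no getError() and no getTable(), so the destination project and
      // dataset must come from static configuration (hypothetical defaultProject /
      // defaultDataset below) instead of from the error element.
      writeResult.getFailedInserts()
              .apply("transform-err-table-row", ParDo.of(new DoFn<TableRow, KV<TableRow, TableDestination>>() {
                  @ProcessElement
                  public void processElement(ProcessContext c) {
                      TableRow failedRow = c.element();
                      TableRow convertedRow = new TableRow();
                      // no equivalent of bigQueryInsertError.getError() here
                      convertedRow.set("t", CommonConverter.convertDate(new Date()));
                      convertedRow.set(UUID, failedRow.get(UUID));
                      TableDestination tableDestination =
                              BqUtil.getTableDestination(defaultProject, defaultDataset, errorTable);
                      c.output(KV.of(convertedRow, tableDestination));
                  }
              }))
              .apply(new BqInsertError());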

      Also, when I try to run it with getFailedInserts like this:

      bqWriter = bqWriter.withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
              .withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API)
              .withTriggeringFrequency(Duration.standardSeconds(5))
              .withNumStorageWriteApiStreams(12);
      bqErrorHandler = (writeResult, eventsProcessingOptions1) ->
              writeResult.getFailedInserts().apply("BQ-insert-error-write",
                      HandleStorageWriteApiInsertError.of());
      I get the following error:
       

      Record-insert retry policies are not supported when using BigQuery load jobs.

       

      but I’m using STORAGE_WRITE_API, which should normally support retryTransientErrors.
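
      The only way I found to get past this validation is to drop the retry policy entirely. A sketch of that configuration follows (whether the Storage Write API path still retries transient errors internally is an assumption I have not verified):

      // Workaround sketch: omit withFailedInsertRetryPolicy(...), since that call
      // is what the validation rejects for non-streaming-insert methods.
      bqWriter = bqWriter
              .withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API)
              .withTriggeringFrequency(Duration.standardSeconds(5))
              .withNumStorageWriteApiStreams(12);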

      So first, I think something is missing in the implementation of that write method that makes the retry feature unsupported,

      and my feature request is to support getFailedInsertsWithErr in the WriteResult when using BigQueryIO.Write.Method.STORAGE_WRITE_API.

      If there is an existing workaround for this today, that would be great, because switching the write method significantly cuts our costs.
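
      One interim idea we are considering is to embed the tenant's project and dataset in each row, so the error handler can recover the destination from the failed TableRow itself. A sketch (the "tenant_project" / "tenant_dataset" field names are hypothetical, and the destination schema would have to tolerate or strip them):

      // Sketch of a possible interim workaround: recover the destination from
      // fields embedded in the row itself. "tenant_project" and "tenant_dataset"
      // are hypothetical field names, not something Beam provides.
      TableRow failedRow = c.element();
      TableDestination tableDestination = BqUtil.getTableDestination(
              (String) failedRow.get("tenant_project"),
              (String) failedRow.get("tenant_dataset"),
              errorTable);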

      Thanks!

      Attachments

        Activity

          People

            Assignee: Unassigned
            Reporter: Yoni Bendayan (yoni.be)
            Votes: 0
            Watchers: 4

            Dates

              Created:
              Updated: