Details
Type: Improvement
Status: Open
Priority: P1
Resolution: Unresolved
Description
I’m currently using the legacy BigQuery insert method in a streaming pipeline (without Streaming Engine), like this:
bqWriter = bqWriter.withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
.withExtendedErrorInfo();
bqErrorHandler = (writeResult, eventsProcessingOptions1) ->
writeResult.getFailedInsertsWithErr().apply("BQ-insert-error-write",
HandleInsertError.of());
In HandleInsertError, we take each BigQueryInsertError, add some metadata, and write it to the designated BigQuery error table:
@Override
public PCollection<Void> expand(PCollection<BigQueryInsertError> input) {
    return input
        .apply("transform-err-table-row",
            ParDo.of(new DoFn<BigQueryInsertError, KV<TableRow, TableDestination>>() {
                @ProcessElement
                public void processElement(ProcessContext c) {
                    BigQueryInsertError bigQueryInsertError = c.element();
                    TableRow convertedRow = new TableRow();
                    convertedRow.set("error", bigQueryInsertError.getError().toString());
                    convertedRow.set("t", CommonConverter.convertDate(new Date()));
                    convertedRow.set(UUID, bigQueryInsertError.getRow().get(UUID));
                    TableDestination tableDestination = BqUtil.getTableDestination(
                        bigQueryInsertError.getTable().getProjectId(),
                        bigQueryInsertError.getTable().getDatasetId(),
                        errorTable);
                    c.output(KV.of(convertedRow, tableDestination));
                }
            }))
        .apply(new BqInsertError());
}
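To make the per-tenant routing in HandleInsertError concrete, here is a minimal plain-Java sketch of the same two steps (build the error row, then pick the tenant's error table) without Beam types. It assumes a hypothetical InsertFailure class standing in for the BigQueryInsertError fields we read, a Map in place of TableRow, an ISO-8601 timestamp in place of CommonConverter.convertDate, a UUID column literally named "uuid", and that BqUtil.getTableDestination resolves to a "project:dataset.table" spec. All of these are illustrative assumptions, not our actual helpers.

```java
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for the BigQueryInsertError fields used above. */
final class InsertFailure {
    final String projectId;        // from getTable().getProjectId()
    final String datasetId;        // from getTable().getDatasetId()
    final Map<String, Object> row; // from getRow()
    final String error;            // from getError().toString()

    InsertFailure(String projectId, String datasetId, Map<String, Object> row, String error) {
        this.projectId = projectId;
        this.datasetId = datasetId;
        this.row = row;
        this.error = error;
    }
}

final class ErrorRouting {
    // Assumption: the value of the UUID constant in the original code.
    static final String UUID_FIELD = "uuid";

    /** Builds the error-table row: error text, processing timestamp, and the failed row's UUID. */
    static Map<String, Object> toErrorRow(InsertFailure f, Instant now) {
        Map<String, Object> out = new HashMap<>();
        out.put("error", f.error);
        out.put("t", now.toString()); // ISO-8601, standing in for CommonConverter.convertDate
        out.put(UUID_FIELD, f.row.get(UUID_FIELD));
        return out;
    }

    /** Error table in the failing tenant's own project and dataset, as "project:dataset.table". */
    static String errorTableSpec(InsertFailure f, String errorTable) {
        return f.projectId + ":" + f.datasetId + "." + errorTable;
    }
}
```

The key point is that errorTableSpec needs nothing beyond the error object itself, which is exactly what getFailedInsertsWithErr provides today.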
I’m trying to switch the write method to the new Storage Write API
.withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API);
but I get this error:
When writing an unbounded PCollection via FILE_LOADS or STORAGE_API_WRITES, triggering frequency must be specified
even though the documentation indicates that the triggering frequency is relevant to the FILE_LOADS method:
https://beam.apache.org/releases/javadoc/2.36.0/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.Write.html#withTriggeringFrequency-org.joda.time.Duration-
After adding the triggering frequency and the number of Storage Write API streams (withNumStorageWriteApiStreams), I get this error:
Cannot use getFailedInsertsWithErr as this WriteResult does not use extended errors. Use getFailedInserts instead
The difference between these two methods is that getFailedInsertsWithErr returns a PCollection<BigQueryInsertError>, while getFailedInserts returns a PCollection<TableRow>, so getFailedInserts loses two things we rely on:
1. the insert error itself, via bigQueryInsertError.getError()
2. the destination table's project ID and dataset ID, via
bigQueryInsertError.getTable().getProjectId(),
bigQueryInsertError.getTable().getDatasetId()
We need both because our pipeline is multi-tenant, and getting those parameters any other way would add a lot of overhead.
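For illustration, this is roughly what the "any other way" path looks like if getFailedInserts only hands us a TableRow: every failed row has to be mapped back to its tenant's project and dataset through a lookup we would have to build and maintain ourselves. A minimal sketch, assuming a hypothetical tenant_id column present in every row and a hypothetical TenantLocation lookup table; TableRow implements Map<String, Object>, so a plain Map stands in for it here.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Hypothetical: where one tenant's tables live. */
final class TenantLocation {
    final String project;
    final String dataset;

    TenantLocation(String project, String dataset) {
        this.project = project;
        this.dataset = dataset;
    }
}

/** Hypothetical resolver that recovers the tenant from a failed row's contents. */
final class TenantResolver {
    // Assumption: every row carries a "tenant_id" column; the name is illustrative.
    static final String TENANT_FIELD = "tenant_id";

    private final Map<String, TenantLocation> byTenant;

    TenantResolver(Map<String, TenantLocation> byTenant) {
        this.byTenant = new HashMap<>(byTenant); // defensive copy of the lookup table
    }

    /** Returns "project:dataset.table" for the row's tenant, or empty if it cannot be resolved. */
    Optional<String> errorTableSpec(Map<String, Object> failedRow, String errorTable) {
        Object tenant = failedRow.get(TENANT_FIELD);
        if (tenant == null) {
            return Optional.empty(); // row carries no tenant marker
        }
        TenantLocation loc = byTenant.get(tenant.toString());
        if (loc == null) {
            return Optional.empty(); // tenant missing from the lookup
        }
        return Optional.of(loc.project + ":" + loc.dataset + "." + errorTable);
    }
}
```

In a real pipeline the lookup map would typically have to be kept up to date and distributed to workers (for example as a side input), which is the kind of overhead we would like to avoid.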
Also, when I try to run it with getFailedInserts instead, like this:
bqWriter = bqWriter.withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
.withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API)
.withTriggeringFrequency(Duration.standardSeconds(5))
.withNumStorageWriteApiStreams(12);
bqErrorHandler = (writeResult, eventsProcessingOptions1) ->
writeResult.getFailedInserts().apply("BQ-insert-error-write",
HandleStorageWriteApiInsertError.of());
I get the following error:
Record-insert retry policies are not supported when using BigQuery load jobs.
but I’m using STORAGE_WRITE_API, which I would expect to support retryTransientErrors.
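For context on what we expect from the retry policy: as we read Beam's source, InsertRetryPolicy.retryTransientErrors() inspects the reason of each error reported for a failed row and stops retrying when any reason is in a small set of persistent errors (at least "invalid", "invalidQuery" and "notImplemented"; the exact set may differ between Beam versions, so treat it as an assumption). A minimal plain-Java sketch of that decision, with error reasons as plain strings instead of ErrorProto:

```java
import java.util.List;
import java.util.Set;

final class TransientRetrySketch {
    // Assumption: reasons treated as persistent (never worth retrying); verify against your Beam version.
    static final Set<String> PERSISTENT_REASONS = Set.of("invalid", "invalidQuery", "notImplemented");

    /** Retry the row unless at least one of its error reasons is persistent. */
    static boolean shouldRetry(List<String> errorReasons) {
        for (String reason : errorReasons) {
            if (reason != null && PERSISTENT_REASONS.contains(reason)) {
                return false;
            }
        }
        return true;
    }
}
```

Nothing in this per-row decision is specific to streaming inserts, which is why we expected it to carry over to the Storage Write API.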
So, first, I think something is missing in the implementation of this write method that leaves the retry policy unsupported.
Second, as a feature request: please support getFailedInsertsWithErr on the WriteResult when using BigQueryIO.Write.Method.STORAGE_WRITE_API.
If there is an existing workaround for this, it would be great to know, because switching the write method significantly cuts our costs.
Thanks!