Details
Bug
Status: Resolved
P3
Resolution: Won't Fix
None
windows 10 (dataflow runner)
Description
I have a Dataflow job written with Apache Beam. It loads data from one table into another. The target table is written with the WRITE_TRUNCATE disposition, and because it contains sensitive data it is configured with a KMS key. With the code below I am trying to fetch the KMS key associated with the table and set it on BigQueryIO.write when writing data into the table.
{{finalOutput.apply("Write success rows to Sensitive BigQuery",
    BigQueryIO.writeTableRows()
        .withoutValidation()
        .to(options.getTargetTable())
        .withKmsKey(NestedValueProvider.of(options.getTargetTable(), new FetchingKMSKey(options.getProject())).toString())
        .withoutValidation()
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));}}
{{public class FetchingKMSKey implements SerializableFunction<String, String> {
    private static final BigqueryClient BQ_CLIENT = new BigqueryClient("dataMainPipeline");
    public static final Logger LOG = LoggerFactory.getLogger(FetchingKMSKey.class);
    Table table;
    String tableName;
    String project;

    public FetchingKMSKey(String project) {
        this.project = project;
    }

    public String apply(String tableName) {
        // The table name is split on "." into dataset (name[0]) and table (name[1]).
        String[] name = tableName.split(Pattern.quote("."));
        try {
            table = BQ_CLIENT.getTableResource(project, name[0], name[1]);
        } catch (IOException e) {
            LOG.error(String.format("exception occurred: %s", e.getMessage()));
        } catch (InterruptedException e) {
            LOG.error(String.format("exception occurred: %s", e.getMessage()));
        }
        String kmsKey = table.getEncryptionConfiguration().getKmsKeyName();
        return kmsKey;
    }
}}}
After creating a template, running the job fails with this error:
{{"location" : "US",
"errors" : [
{ "message" : "The KMS key does not contain a location.", "reason" : "invalid" }],}}
I am completely stuck. Could anyone help?
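As a debugging aid for the error above, here is a minimal sketch that checks whether a string has the shape of a full Cloud KMS key resource name (projects/.../locations/.../keyRings/.../cryptoKeys/...). The class name KmsKeyNameCheck, the method locationOf, the sample key, and the placeholder provider string are all hypothetical and not part of Beam or the BigQuery client. One possibility worth checking, stated as an assumption rather than a confirmed diagnosis: calling toString() on a ValueProvider while the template is being built may yield a description of the provider instead of the resolved key, and such a string would have no location segment.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper for illustration only; not part of Beam or BigQuery.
final class KmsKeyNameCheck {
    // Layout of a Cloud KMS key resource name.
    private static final Pattern KEY_NAME = Pattern.compile(
        "projects/([^/]+)/locations/([^/]+)/keyRings/([^/]+)/cryptoKeys/([^/]+)");

    /** Returns the location segment, or null if the string is not a full key name. */
    static String locationOf(String candidate) {
        if (candidate == null) {
            return null;
        }
        Matcher m = KEY_NAME.matcher(candidate);
        return m.matches() ? m.group(2) : null;
    }

    public static void main(String[] args) {
        // Invented sample values, used only to exercise the check.
        String fullKeyName =
            "projects/my-project/locations/us/keyRings/my-ring/cryptoKeys/my-key";
        String providerDescription = "NestedValueProvider{...}"; // placeholder text

        System.out.println(locationOf(fullKeyName));         // prints "us"
        System.out.println(locationOf(providerDescription)); // prints "null"
    }
}
```

Logging the exact string passed to withKmsKey and running it through a check like this would show whether the value reaching BigQuery is a real key name or something else.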