Beam / BEAM-12904

Python DataflowRunner always uses the default app_profile_id when writing to Bigtable with a custom write DoFn

Details

    • Type: Improvement
    • Status: Open
    • Priority: P3
    • Resolution: Unresolved
    • Affects Version/s: 2.28.0, 2.32.0
    • Fix Version/s: None
    • Component/s: io-py-gcp, runner-dataflow
    • Environment:
      Default Python SDK image for environment is apache/beam_python3.7_sdk:2.32.0
      Using provided Python SDK container image: gcr.io/cloud-dataflow/v1beta3/python37:2.32.0

Description

      There are two problems:
      1. apache_beam.io.gcp.bigtableio.WriteToBigTable has no support for custom app profiles at all.
      2. I've added app profile support to a custom DoFn. The app_profile_id is passed correctly, it works on the DirectRunner, and the Dataflow logs even show the correct parameters, but the job still uses the 'default' app_profile_id.
      It's easy to reproduce by passing a non-existent app_profile_id: the DirectRunner crashes with an error, while the DataflowRunner uses 'default' and crashes if the 'default' profile uses multi-cluster routing and/or has transactional writes disabled.
      Bigtable requires single-cluster routing to support transactional writes (read-modify-write, check-and-mutate). That's why, in one case, I need to use a custom app_profile_id.
      Custom write DoFn:
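      The routing constraint above can be checked before a job is launched. Below is a minimal, hypothetical pre-flight check in plain Python: `AppProfileConfig`, `supports_conditional_writes` and `check_profile` are illustrative names, not part of Beam or google-cloud-bigtable, and the routing labels `"SINGLE"` / `"ANY"` are assumed strings. In practice the two fields could be filled from the `AppProfile` objects returned by `instance.list_app_profiles()`, as in the commented-out logging inside the DoFn.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class AppProfileConfig:
    """Hypothetical stand-in for the routing settings of a Bigtable app profile."""
    app_profile_id: str
    routing_policy_type: str  # assumed labels: "SINGLE" or "ANY" (multi-cluster)
    allow_transactional_writes: bool


def supports_conditional_writes(profile: AppProfileConfig) -> bool:
    """True if the profile can serve check-and-mutate / read-modify-write.

    Transactional writes need single-cluster routing with transactional
    writes allowed on the profile.
    """
    return profile.routing_policy_type == "SINGLE" and profile.allow_transactional_writes


def check_profile(profile: AppProfileConfig) -> None:
    """Fail fast with a clear message instead of failing at write time."""
    if not supports_conditional_writes(profile):
        raise ValueError(
            f"App profile {profile.app_profile_id!r} cannot be used for "
            "conditional writes: it needs single-cluster routing with "
            "transactional writes allowed."
        )


default_profile = AppProfileConfig("default", "ANY", allow_transactional_writes=False)
txn_profile = AppProfileConfig("txn-writes", "SINGLE", allow_transactional_writes=True)
print(supports_conditional_writes(default_profile))  # False
print(supports_conditional_writes(txn_profile))  # True
```

      Checking on the launcher only validates the profile that was requested; it cannot detect a worker silently falling back to 'default', which is the behavior this issue reports.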

      from datetime import datetime, timezone
      import logging
      
      import apache_beam as beam
      from apache_beam.metrics import Metrics
      from apache_beam.transforms.display import DisplayDataItem
      from google.cloud.bigtable import Client, row_filters
      
      class BigTableWriteIfNotExistsConditionalFn(beam.DoFn):
        def __init__(self, project_id, instance_id, app_profile_id, table_id, column_family, column: str):
          super(BigTableWriteIfNotExistsConditionalFn, self).__init__()
          self.beam_options = {
              'project_id': project_id,
              'instance_id': instance_id,
              'app_profile_id': app_profile_id,
              'table_id': table_id,
              'column_family': column_family,
              'column': column,
          }
          self.table = None
          self.written = Metrics.counter(self.__class__, 'Written Row')
      
        def __getstate__(self):
          return self.beam_options
      
        def __setstate__(self, options):
          self.beam_options = options
          self.table = None
          self.written = Metrics.counter(self.__class__, 'Written Row')
      
        def start_bundle(self):
          if self.table is None:
            client = Client(project=self.beam_options['project_id'])
            instance = client.instance(self.beam_options['instance_id'])

            # To log the instance's app profiles, create the client with
            # Client(..., admin=True) and uncomment the loop below.
            # for profile in instance.list_app_profiles():
            #   logging.info('Profile name: %s', profile.name)
            #   logging.info('Profile desc: %s', profile.description)
            #   logging.info('Routing policy type: %s', profile.routing_policy_type)
            #   logging.info('Cluster id: %s', profile.cluster_id)
            #   logging.info('Transactional writes: %s', profile.allow_transactional_writes)

            # Request the custom app profile for all data operations on this table.
            self.table = instance.table(
                table_id=self.beam_options['table_id'],
                app_profile_id=self.beam_options['app_profile_id'])
      
        def process(self, kvmessage):
          self.written.inc()

          # Each element is a (row_key, value) tuple.
          row_key, value = kvmessage

          # Match only the target column in the target column family.
          row_filter = row_filters.RowFilterChain(
              filters=[row_filters.FamilyNameRegexFilter(self.beam_options['column_family']),
                       row_filters.ColumnQualifierRegexFilter(self.beam_options['column']),
                      ])
          # Check-and-mutate: with state=False the mutation is applied only when
          # the filter matches nothing, i.e. the cell does not exist yet.
          bt_row = self.table.conditional_row(row_key=row_key, filter_=row_filter)
          bt_row.set_cell(
              column_family_id=self.beam_options['column_family'],
              column=self.beam_options['column'],
              value=value,
              timestamp=datetime.fromtimestamp(0, timezone.utc),
              state=False)
          bt_row.commit()
      
      
        def finish_bundle(self):
          pass
      
        def display_data(self):
          return {
              'projectId': DisplayDataItem(
                  self.beam_options['project_id'], label='Bigtable Project Id'),
              'instanceId': DisplayDataItem(
                  self.beam_options['instance_id'], label='Bigtable Instance Id'),
              'tableId': DisplayDataItem(
                  self.beam_options['table_id'], label='Bigtable Table Id')
          }
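
      Beam serializes the DoFn when the pipeline is built and deserializes it on each worker, so the custom `__getstate__` / `__setstate__` pair decides what reaches the worker. The hypothetical stand-in class below (no Beam or Bigtable dependency) uses plain `pickle` in place of Beam's own serializer to show that `app_profile_id` survives the round trip, while the table handle is reset to `None` and rebuilt later in `start_bundle`.

```python
import pickle


class OptionsOnlyFn:
    """Hypothetical stand-in that mirrors the DoFn's pickling pattern."""

    def __init__(self, project_id, instance_id, app_profile_id, table_id):
        self.beam_options = {
            'project_id': project_id,
            'instance_id': instance_id,
            'app_profile_id': app_profile_id,
            'table_id': table_id,
        }
        self.table = object()  # placeholder for a live table handle

    def __getstate__(self):
        # Only the plain options dict is serialized; the handle is dropped.
        return self.beam_options

    def __setstate__(self, options):
        # On the worker: restore options; the table is rebuilt lazily later.
        self.beam_options = options
        self.table = None


fn = OptionsOnlyFn('my-project', 'my-instance', 'txn-writes', 'my-table')
restored = pickle.loads(pickle.dumps(fn))
print(restored.beam_options['app_profile_id'])  # txn-writes
print(restored.table)  # None
```

      This is consistent with the report that the parameters arrive on Dataflow intact; the values shown are placeholders.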
      

      BigTableWriteIfNotExistsConditionalFn processes Tuple[str, str] messages, where the first string is the Bigtable row key and the second is the cell value.
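
      The write-if-not-exists semantics of `process()` can be modelled without Bigtable. In this sketch the names are hypothetical, an in-memory dict stands in for the table, and the family/column regex filters are treated as exact name matches (an assumption). A message is written only when the predicate finds no existing cell, mirroring `set_cell(..., state=False)` on a conditional row.

```python
from typing import Dict, Iterable, Tuple

# Hypothetical in-memory table: (row_key, family, column) -> value
Cells = Dict[Tuple[str, str, str], str]


def write_if_not_exists(cells: Cells, messages: Iterable[Tuple[str, str]],
                        family: str, column: str) -> int:
    """Simulate check-and-mutate with the mutation on the 'false' branch.

    Returns the number of cells actually written.
    """
    written = 0
    for row_key, value in messages:
        key = (row_key, family, column)
        predicate_matched = key in cells  # does the filter find an existing cell?
        if not predicate_matched:  # state=False: apply only when nothing matched
            cells[key] = value
            written += 1
    return written


store: Cells = {}
n = write_if_not_exists(store, [("k1", "a"), ("k2", "b"), ("k1", "c")], "cf", "col")
print(n)  # 2
print(store[("k1", "cf", "col")])  # a
```

      Note that the DoFn's `written` counter is incremented for every element, including ones skipped because the cell already exists. `ConditionalRow.commit()` returns whether the predicate matched, which could be used to count only real writes.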

People

    • Assignee: Unassigned
    • Reporter: Krzysztof Korzeniewski (mitgath)
    • Votes: 0
    • Watchers: 5

Time Tracking

    • Estimated: Not Specified
    • Remaining: 0h
    • Logged: 3h 10m