Beam / BEAM-13379

Incrementing a counter from a Python subthread doesn't seem to do anything

Details

    • Type: Bug
    • Status: Open
    • Priority: P3
    • Resolution: Unresolved
    • Affects Version/s: 2.34.0
    • Fix Version/s: None
    • Component/s: sdk-py-harness
    • Labels: None
    • Environment: Debian 5.10.46-5rodete1 (2021-09-28) x86_64 GNU/Linux

    Description

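      For example, the following pipeline increments one counter on the thread that calls process() and a second counter from a ThreadPoolExecutor worker thread: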

      from concurrent import futures
      import threading
      
      from absl import app
      from absl import logging
      import apache_beam as beam
      
      NAMESPACE = 'METRICS_THREADS_REPRO'
      
      main_thread_counter = beam.metrics.Metrics.counter(
          NAMESPACE, 'main_thread_counter')
      sub_thread_counter = beam.metrics.Metrics.counter(
          NAMESPACE, 'sub_thread_counter')
      
      
      def increment_counter(counter):
        counter.inc()
        logging.info('Incremented counter %s from thread %s',
                     counter.metric_name, threading.current_thread().name)
      
      
      class IncrementCountersFn(beam.DoFn):
      
        def setup(self):
          self.executor = futures.ThreadPoolExecutor()
      
        def process(self, idx):
          increment_counter(main_thread_counter)
          self.executor.submit(increment_counter, sub_thread_counter).result()
          logging.info('Processed %i', idx)
      
      
      def main(argv):
        if len(argv) > 1:
          raise app.UsageError('Too many command-line arguments.')
      
        p = beam.Pipeline()
        _ = (
            p
            | 'Create' >> beam.Create(range(100))
            | 'Process' >> beam.ParDo(IncrementCountersFn()))
      
        result = p.run()
        result.wait_until_finish()
      
        filter_by_namespace = beam.metrics.MetricsFilter().with_namespace(NAMESPACE)
        filtered_metrics = result.metrics().query(filter_by_namespace)
        logging.info('Pipeline finished, metrics logged: %s', filtered_metrics)
      
      if __name__ == '__main__':
        app.run(main)
      

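      Only main_thread_counter appears in the pipeline metrics; sub_thread_counter is missing entirely, even though the log message for each of its increments is emitted: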

      I1203 18:38:56.394423 140078103397056 pipeline.py:48] Pipeline finished, metrics logged: {'counters': [MetricResult(key=MetricKey(step=Process, metric=MetricName(namespace=METRICS_THREADS_REPRO, name=main_thread_counter), labels={}), committed=100, attempted=100)], 'distributions': [], 'gauges': []}
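
One possible explanation, which this report does not confirm, is that the SDK looks up the active metrics container from per-thread state, so an inc() issued on a pool thread finds no container and is silently dropped. The stdlib-only sketch below illustrates that pattern and one workaround: the worker returns its count and the calling thread applies it. Every name in it (_state, inc, work, process_all, sub_thread_counter_via_main) is hypothetical and stands in for Beam internals; none of it is a Beam API.

```python
import threading
from concurrent import futures

# Hypothetical stand-in for per-thread metrics state; NOT a Beam API.
_state = threading.local()


def inc(name, amount=1):
    """Add to the calling thread's container; silently no-op if none is set."""
    container = getattr(_state, 'container', None)
    if container is not None:
        container[name] = container.get(name, 0) + amount


def work():
    """Runs on a pool thread and returns the count instead of relying on inc()."""
    inc('sub_thread_counter')  # dropped: the pool thread has no container
    return 1


def process_all(n):
    """Simulates n process() calls made from a single calling thread."""
    _state.container = {}  # only the calling thread gets a container
    with futures.ThreadPoolExecutor() as executor:
        for _ in range(n):
            inc('main_thread_counter')
            delta = executor.submit(work).result()
            # Workaround: apply the worker's count on the calling thread.
            inc('sub_thread_counter_via_main', delta)
    return dict(_state.container)
```

If that assumption holds, the equivalent change in the repro above would be to call sub_thread_counter.inc() inside process() after .result() returns, rather than inside the submitted function.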
      

          People

            Assignee: Unassigned
            Reporter: Alistair Muldal (alimuldal)