Beam / BEAM-10786

Stateful DoFn with Python SDK and Dataflow runner

Details

    Description

      Hello,

      This is related to: https://issues.apache.org/jira/browse/BEAM-9655

       

      We need stateful DoFns for some of our use cases (and for a migration from the Java SDK to the Python SDK), but this feature does not seem to be fully implemented on the Direct runner and the Dataflow runner.

       

      To get a clearer picture, we created a pipeline (based on the wordcount example) that tests every combination of StateSpec type (Bag, CombiningValue, Timer), mode (Batch, Streaming), and runner (Direct, Dataflow).
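      As a rough illustration of that test matrix, the sketch below enumerates the combinations and builds one command line per run. The script name `test_stateful.py` is taken from the traceback further down, and the `--stream_mode`, `--timer_realtime`, `--timer_watermark`, `--combining_value` and `--bag` flags come from the pipeline code. The helper `build_commands` is hypothetical, and options that Dataflow additionally needs (project, region, temp location) are deliberately left out.

```python
import itertools

# Runner labels from the results table mapped to Beam's --runner values.
RUNNERS = {"Direct": "DirectRunner", "Dataflow": "DataflowRunner"}
# The pipeline treats any value other than 'false' as streaming.
MODES = {"Batch": "false", "Streaming": "true"}
# One flag per stateful DoFn under test (defined in the pipeline's argparse setup).
STATE_FLAGS = ["--timer_realtime", "--timer_watermark", "--combining_value", "--bag"]


def build_commands(script="test_stateful.py"):
    """Return one argv list per (runner, mode, state spec) combination."""
    commands = []
    for runner, stream_mode, flag in itertools.product(
            RUNNERS.values(), MODES.values(), STATE_FLAGS):
        commands.append([
            "python", script,
            "--runner", runner,
            "--stream_mode", stream_mode,
            flag, "true",
        ])
    return commands
```

      Each returned list could be passed to `subprocess.run`, one job per combination, which matches the sixteen rows reported in the results.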

       

      Results:

      Runner    Mode       StateSpec                    Result  Error  JobId
      Direct    Batch      Timer; TimeDomain.REAL_TIME  KO      #1
      Direct    Batch      Timer; TimeDomain.WATERMARK  OK
      Direct    Batch      CombiningValue               OK
      Direct    Batch      Bag                          OK
      Direct    Streaming  Timer; TimeDomain.REAL_TIME  OK
      Direct    Streaming  Timer; TimeDomain.WATERMARK  OK
      Direct    Streaming  CombiningValue               OK
      Direct    Streaming  Bag                          OK
      Dataflow  Batch      Timer; TimeDomain.REAL_TIME  KO      #2     2020-08-20_08_14_07-5985905092341835149
      Dataflow  Batch      Timer; TimeDomain.WATERMARK  KO      #2     2020-08-20_08_14_51-227797524346310138
      Dataflow  Batch      CombiningValue               KO      #2     2020-08-20_08_15_46-14394222017890152995
      Dataflow  Batch      Bag                          KO      #2     2020-08-20_08_17_20-2307047231213658649
      Dataflow  Streaming  Timer; TimeDomain.REAL_TIME  KO      #3     2020-08-20_08_47_37-6883008099159189108
      Dataflow  Streaming  Timer; TimeDomain.WATERMARK  KO      #3     2020-08-20_08_46_48-7341546514472681857
      Dataflow  Streaming  CombiningValue               OK
      Dataflow  Streaming  Bag                          OK
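      To make the failure pattern easier to see, the sketch below groups the failing rows by error number. The tuples are copied from the results table; the short state spec labels and the helper name `failures_by_error` are introduced here for illustration only.

```python
from collections import defaultdict

# (runner, mode, state spec, result, error) as reported in the results table.
RESULTS = [
    ("Direct", "Batch", "Timer/REAL_TIME", "KO", "#1"),
    ("Direct", "Batch", "Timer/WATERMARK", "OK", None),
    ("Direct", "Batch", "CombiningValue", "OK", None),
    ("Direct", "Batch", "Bag", "OK", None),
    ("Direct", "Streaming", "Timer/REAL_TIME", "OK", None),
    ("Direct", "Streaming", "Timer/WATERMARK", "OK", None),
    ("Direct", "Streaming", "CombiningValue", "OK", None),
    ("Direct", "Streaming", "Bag", "OK", None),
    ("Dataflow", "Batch", "Timer/REAL_TIME", "KO", "#2"),
    ("Dataflow", "Batch", "Timer/WATERMARK", "KO", "#2"),
    ("Dataflow", "Batch", "CombiningValue", "KO", "#2"),
    ("Dataflow", "Batch", "Bag", "KO", "#2"),
    ("Dataflow", "Streaming", "Timer/REAL_TIME", "KO", "#3"),
    ("Dataflow", "Streaming", "Timer/WATERMARK", "KO", "#3"),
    ("Dataflow", "Streaming", "CombiningValue", "OK", None),
    ("Dataflow", "Streaming", "Bag", "OK", None),
]


def failures_by_error(results):
    """Group the failing (runner, mode, state spec) combinations by error id."""
    grouped = defaultdict(list)
    for runner, mode, spec, result, error in results:
        if result == "KO":
            grouped[error].append((runner, mode, spec))
    return dict(grouped)
```

      Grouped this way, error #1 covers only the real-time timer on Direct batch, error #2 covers every state spec on Dataflow batch, and error #3 covers both timer domains on Dataflow streaming.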

       

       

       

      Error #1:

      Traceback (most recent call last):
        File "test_stateful.py", line 142, in <module>
          run()
        File "test_stateful.py", line 136, in run
          test_bag = words | 'Test Bag' >> beam.ParDo(TestStatefulBag())
        File "/home/dstackowiak/.pyenv/versions/3.7.6/lib/python3.7/site-packages/apache_beam/pipeline.py", line 555, in __exit__
          self.run().wait_until_finish()
        File "/home/dstackowiak/.pyenv/versions/3.7.6/lib/python3.7/site-packages/apache_beam/pipeline.py", line 534, in run
          return self.runner.run_pipeline(self, self._options)
        File "/home/dstackowiak/.pyenv/versions/3.7.6/lib/python3.7/site-packages/apache_beam/runners/direct/direct_runner.py", line 119, in run_pipeline
          return runner.run_pipeline(pipeline, options)
        File "/home/dstackowiak/.pyenv/versions/3.7.6/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 173, in run_pipeline
          pipeline.to_runner_api(default_environment=self._default_environment))
        File "/home/dstackowiak/.pyenv/versions/3.7.6/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 179, in run_via_runner_api
          self._check_requirements(pipeline_proto)
        File "/home/dstackowiak/.pyenv/versions/3.7.6/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 280, in _check_requirements
          raise NotImplementedError(timer.time_domain)
      NotImplementedError: 2
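
      The bare `2` in this message is most likely the numeric value of the timer's time domain in the runner API proto. The sketch below decodes it without importing Beam. Both lookup tables are assumptions based on `beam_runner_api.proto` and `apache_beam.transforms.timeutil` as I understand them for this SDK generation, so check them against the installed version. `describe_time_domain` is a hypothetical helper.

```python
# Assumption: numeric values of TimeDomain.Enum in beam_runner_api.proto.
RUNNER_API_TIME_DOMAINS = {
    0: "UNSPECIFIED",
    1: "EVENT_TIME",
    2: "PROCESSING_TIME",
}

# Assumption: how the Python SDK's TimeDomain names translate to the runner API.
SDK_TO_RUNNER_API = {
    "WATERMARK": "EVENT_TIME",
    "REAL_TIME": "PROCESSING_TIME",
}


def describe_time_domain(code):
    """Return (runner API name, matching Python SDK names) for a numeric code."""
    name = RUNNER_API_TIME_DOMAINS.get(code, "UNKNOWN")
    sdk_names = sorted(k for k, v in SDK_TO_RUNNER_API.items() if v == name)
    return name, sdk_names
```

      Under these assumptions, `describe_time_domain(2)` points at the processing-time domain, i.e. `TimeDomain.REAL_TIME`, which is consistent with only the real-time timer failing on the Direct runner in batch mode.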

      Error #2:

      Error message from worker: Traceback (most recent call last):
        File "/usr/local/lib/python3.8/site-packages/dataflow_worker/batchworker.py", line 638, in do_work
          work_executor.execute()
        File "/usr/local/lib/python3.8/site-packages/dataflow_worker/executor.py", line 179, in execute
          op.start()
        File "apache_beam/runners/worker/operations.py", line 662, in apache_beam.runners.worker.operations.DoOperation.start
        File "apache_beam/runners/worker/operations.py", line 664, in apache_beam.runners.worker.operations.DoOperation.start
        File "apache_beam/runners/worker/operations.py", line 665, in apache_beam.runners.worker.operations.DoOperation.start
        File "apache_beam/runners/worker/operations.py", line 284, in apache_beam.runners.worker.operations.Operation.start
        File "apache_beam/runners/worker/operations.py", line 290, in apache_beam.runners.worker.operations.Operation.start
        File "apache_beam/runners/worker/operations.py", line 611, in apache_beam.runners.worker.operations.DoOperation.setup
        File "apache_beam/runners/worker/operations.py", line 649, in apache_beam.runners.worker.operations.DoOperation.setup
        File "apache_beam/runners/common.py", line 943, in apache_beam.runners.common.DoFnRunner.__init__
      Exception: Requested execution of a stateful DoFn, but no user state context is available. This likely means that the current runner does not support the execution of stateful DoFns.

      Error #3:

      Error message from worker: java.util.concurrent.ExecutionException: java.lang.RuntimeException: Error received from SDK harness for instruction -1004: Traceback (most recent call last):
        File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 253, in _execute
          response = task()
        File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 310, in <lambda>
          lambda: self.create_worker().do_instruction(request), request)
        File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 479, in do_instruction
          return getattr(self, request_type)(
        File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 515, in process_bundle
          bundle_processor.process_bundle(instruction_id))
        File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 959, in process_bundle
          output_stream = self.timer_data_channel.output_timer_stream(
      AttributeError: 'NoneType' object has no attribute 'output_timer_stream'
        java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
        java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
        org.apache.beam.sdk.util.MoreFutures.get(MoreFutures.java:57)
        org.apache.beam.runners.dataflow.worker.fn.control.RegisterAndProcessBundleOperation.finish(RegisterAndProcessBundleOperation.java:333)
        org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:85)
        org.apache.beam.runners.dataflow.worker.fn.control.BeamFnMapTaskExecutor.execute(BeamFnMapTaskExecutor.java:123)
        org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1369)
        org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1100(StreamingDataflowWorker.java:154)
        org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$7.run(StreamingDataflowWorker.java:1088)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        java.lang.Thread.run(Thread.java:748)
      Caused by: java.lang.RuntimeException: Error received from SDK harness for instruction -1004: Traceback (most recent call last):
        File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 253, in _execute
          response = task()
        File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 310, in <lambda>
          lambda: self.create_worker().do_instruction(request), request)
        File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 479, in do_instruction
          return getattr(self, request_type)(
        File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 515, in process_bundle
          bundle_processor.process_bundle(instruction_id))
        File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 959, in process_bundle
          output_stream = self.timer_data_channel.output_timer_stream(
      AttributeError: 'NoneType' object has no attribute 'output_timer_stream'

       

      Pipeline code:

      
      
      from __future__ import absolute_import
      
      import argparse
      import logging
      import re
      
      from time import time
      
      from past.builtins import unicode
      
      import apache_beam as beam
      from apache_beam.io import ReadFromText
      from apache_beam.io import WriteToText
      from apache_beam.options.pipeline_options import PipelineOptions
      from apache_beam.options.pipeline_options import SetupOptions
      from apache_beam.options.pipeline_options import StandardOptions
      from apache_beam.options.pipeline_options import GoogleCloudOptions
      import apache_beam.coders as coders
      import apache_beam.transforms.userstate as user_state
      from apache_beam.transforms.timeutil import TimeDomain
      from apache_beam.transforms.combiners import CountCombineFn
      import google.auth
      
      class WordExtractingDoFn(beam.DoFn):
        def process(self, element):
          return re.findall(r'[\w\']+', element, re.UNICODE)
      
      class TestStatefulTimerRealTime(beam.DoFn):
        STALE_TIMER = user_state.TimerSpec('stale', TimeDomain.REAL_TIME)
      
        def process(self, word, stale_timer=beam.DoFn.TimerParam(STALE_TIMER)):
          logging.info('Process Timer RealTime')
          stale_timer.set(time()+1)
      
        @user_state.on_timer(STALE_TIMER)
        def stale(self):
          logging.info('OK Timer RealTime')
          yield 1
      
      class TestStatefulTimerWatermark(beam.DoFn):
        STALE_TIMER = user_state.TimerSpec('stale', TimeDomain.WATERMARK)
      
        def process(self, word, w=beam.DoFn.WindowParam, stale_timer=beam.DoFn.TimerParam(STALE_TIMER)):
          logging.info('Process Timer Watermark')
          stale_timer.set(w.end)
      
        @user_state.on_timer(STALE_TIMER)
        def stale(self):
          logging.info('OK Timer Watermark')
          yield 1
      
      class TestStatefulCombiningValue(beam.DoFn):
        COUNT_STATE = user_state.CombiningValueStateSpec('count', coders.VarIntCoder(), CountCombineFn())
      
        def process(self, word, count_state=beam.DoFn.StateParam(COUNT_STATE)):
          logging.info('Process Combining Value : %s' % count_state.read())
          count_state.add(1)
      
      class TestStatefulBag(beam.DoFn):
        BAG_STATE = user_state.BagStateSpec('buffer', coders.VarIntCoder())
      
        def process(self, word, bag_state=beam.DoFn.StateParam(BAG_STATE)):
          # Count buffered items without shadowing the `word` argument.
          logging.info('Process Bag length: %s' % sum(1 for _ in bag_state.read()))
          bag_state.add(word[0])
      
      def run(argv=None, save_main_session=True):
        input_file='gs://dataflow-samples/shakespeare/kinglear.txt'
        input_topic='projects/pubsub-public-data/topics/shakespeare-kinglear'
      
        parser = argparse.ArgumentParser()
        parser.add_argument(
            '--stream_mode',
            dest='stream_mode',
            default='false',
            help='is streaming mode')
        parser.add_argument(
            '--timer_realtime',
            dest='timer_realtime',
            default='false',
            help='Test Stateful Timer; RealTime Domain')
        parser.add_argument(
            '--timer_watermark',
            dest='timer_watermark',
            default='false',
            help='Test Stateful Timer; Watermark Domain')
        parser.add_argument(
            '--combining_value',
            dest='combining_value',
            default='false',
            help='Test Stateful Combining Value')
        parser.add_argument(
            '--bag',
            dest='bag',
            default='false',
            help='Test Stateful Bag')
        known_args, pipeline_args = parser.parse_known_args(argv)
      
        is_streaming = known_args.stream_mode != 'false'
      
        pipeline_options = PipelineOptions(pipeline_args)
        _, pipeline_options.view_as(GoogleCloudOptions).project = google.auth.default()
        pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
        pipeline_options.view_as(StandardOptions).streaming = is_streaming
      
        # The pipeline will be run on exiting the with block.
        with beam.Pipeline(options=pipeline_options) as p:
          if is_streaming:
            words=p | "Read" >> beam.io.ReadFromPubSub(topic=input_topic)
            #words=p | "Read" >> beam.io.ReadFromPubSub(subscription=input_subscription)
          else:
            words=(
              p
              | 'Read' >> ReadFromText(input_file)
              | 'Split' >> beam.ParDo(WordExtractingDoFn()).with_output_types(unicode)
              )
      
          # Set key
          # Key every element with the same key so state is shared across elements.
          words = words | 'SetKey' >> beam.Map(lambda word: (1, word))
      
      
          # TESTS
          if known_args.timer_realtime == 'true':
            test_timer_realtime = words | 'Test timer realTime' >> beam.ParDo(TestStatefulTimerRealTime())
      
          if known_args.timer_watermark == 'true':
            test_timer_watermark = (words 
              | "window" >> beam.WindowInto(beam.window.FixedWindows(1)) 
              | 'Test timer watermark' >> beam.ParDo(TestStatefulTimerWatermark()))
      
          if known_args.combining_value == 'true':
            test_combining_value = words | 'Test combining value' >> beam.ParDo(TestStatefulCombiningValue())
      
          if known_args.bag == 'true':
            test_bag = words | 'Test Bag' >> beam.ParDo(TestStatefulBag())
      
      
      
      if __name__ == '__main__':
        logging.getLogger().setLevel(logging.DEBUG)
        run()
      
      

       

       

            dstackowiak STACKOWIAK Denis