Beam / BEAM-7446

Reading from a Cloud Pub/Sub topic inside Python's DirectRunner results in a "too many open files" error

Details

    Description

      Reading from a Cloud Pub/Sub topic inside Python's DirectRunner results in a "too many open files" error within 2 minutes for a busy topic.

      I am not exactly sure, but it appears that "pubsub.SubscriberClient()" creates a gRPC channel which must be closed explicitly after we are done pulling messages from it. This may be due to a change introduced in grpc v1.12.0 ("a new grpc.Channel.close method is introduced and correct use of gRPC Python now requires that channels be closed after use"). If the underlying gRPC channel is not closed and many "pubsub.SubscriberClient" instances are created, the system may run out of available file handles, resulting in "too many open files" errors. A similar issue is reported here: https://github.com/googleapis/google-cloud-python/issues/5523.
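      The failure mode itself can be illustrated without Pub/Sub or gRPC at all: any object that holds an OS file descriptor keeps that descriptor allocated until it is explicitly closed. The following stdlib-only sketch (plain sockets standing in for gRPC channels; not Beam or Pub/Sub code) shows how repeatedly creating such objects without closing them consumes descriptors:

      ```python
      import socket

      # Each socket holds one OS file descriptor, analogous to the gRPC
      # channel created by each pubsub.SubscriberClient(). Descriptors are
      # only released when close() is called (or at garbage collection,
      # which may come far too late on a busy streaming pipeline).
      socks = [socket.socket() for _ in range(50)]

      open_fds = [s.fileno() for s in socks]  # valid descriptors while open
      assert all(fd >= 0 for fd in open_fds)

      for s in socks:
          s.close()  # releases the descriptor back to the OS

      # After close(), fileno() returns -1: the descriptor has been freed.
      assert all(s.fileno() == -1 for s in socks)
      ```

      With a default Linux soft limit of 1024 descriptors per process, a loop that creates one unclosed channel per pull exhausts the limit within minutes, matching the numbers in the output below.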

      The issue can be reproduced by running the following file:

      # directrunner_streaming_tmof.py
      from __future__ import print_function
      
      import multiprocessing
      import os
      import subprocess
      import time
      
      import apache_beam as beam
      from google.cloud import pubsub_v1
      
      
      def count_open_files():
          """Count the number of files opened by current process."""
          pid = multiprocessing.current_process().pid
          lsof_out = subprocess.check_output(["lsof", "-p", str(pid)]).decode()  # decode bytes on Python 3
          num_open_files = len(lsof_out.strip().split("\n")) - 1
          return num_open_files
      
      
      def start_streaming_pipeline(project_id, subscription_path):
          """Create a simple streaming pipeline."""
          runner = beam.runners.direct.DirectRunner()
          pipeline_options = beam.pipeline.PipelineOptions(project=project_id, streaming=True)
          taxirides_pc = (
              beam.Pipeline(runner=runner, options=pipeline_options)
              | "Read" >> beam.io.ReadFromPubSub(subscription=subscription_path)
          )
          results = taxirides_pc.pipeline.run()
          return results
      
      
      def monitor():
          """Periodically print the number of open files."""
          start_time = time.time()
          for _ in range(20):
              num_open_files = count_open_files()
              time_elapsed = time.time() - start_time
              print(
                  "Time elapsed: {:<3s}s, Number of open files: {}".format(
                      str(round(time_elapsed, 0)), num_open_files
                  )
              )
              if num_open_files > 1000:
                  break
              time.sleep(5)
      
      
      if __name__ == "__main__":
          project_id = "{project_id}"
          topic_path = "projects/pubsub-public-data/topics/taxirides-realtime"
      
          client = pubsub_v1.SubscriberClient()
          subscription_path = client.subscription_path(project_id, "taxirides-realtime-sub")
      
          subscription = client.create_subscription(subscription_path, topic_path)
          print("Subscription created: {}".format(subscription_path))
          try:
              results = start_streaming_pipeline(project_id, subscription_path)
              monitor()
          finally:
              client.delete_subscription(subscription_path)
              print("Subscription deleted: {}".format(subscription_path))
      
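      As a side note, on Linux the open-descriptor count polled above can also be read directly from /proc, which avoids forking an lsof subprocess every five seconds. A sketch (Linux-specific; /proc/self/fd does not exist on macOS or Windows):

      ```python
      import os

      def count_open_files():
          """Count this process's open file descriptors (Linux only).

          Equivalent to the lsof-based helper in the repro script, but
          reads /proc/self/fd directly instead of spawning a subprocess.
          """
          return len(os.listdir("/proc/self/fd"))

      num_open_files = count_open_files()
      ```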

      Currently, the output from running this script looks something like:

      Subscription created: projects/project_id/subscriptions/taxirides-realtime-sub
      Time elapsed: 0.0s, Number of open files: 160
      Time elapsed: 5.0s, Number of open files: 179
      Time elapsed: 11.0s, Number of open files: 247
      Time elapsed: 16.0s, Number of open files: 339
      Time elapsed: 21.0s, Number of open files: 436
      Time elapsed: 26.0s, Number of open files: 523
      Time elapsed: 31.0s, Number of open files: 615
      Time elapsed: 36.0s, Number of open files: 713
      Time elapsed: 41.0s, Number of open files: 809
      Time elapsed: 46.0s, Number of open files: 903
      Time elapsed: 52.0s, Number of open files: 999
      Time elapsed: 57.0s, Number of open files: 1095
      Subscription deleted: projects/project_id/subscriptions/taxirides-realtime-sub
      WARNING:root:The DirectPipelineResult is being garbage-collected while the DirectRunner is still running the corresponding pipeline. This may lead to incomplete execution of the pipeline if the main thread exits before pipeline completion. Consider using result.wait_until_finish() to wait for completion of pipeline execution.
      
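      If the diagnosis above is right, the fix on the Beam side is to close each client's channel deterministically once pulling is done, e.g. with try/finally or contextlib.closing. A minimal sketch of the pattern follows; FakeChannel is a hypothetical stand-in, because the attribute through which google-cloud-pubsub exposes the underlying gRPC channel varies across library versions:

      ```python
      import contextlib

      class FakeChannel(object):
          """Hypothetical stand-in for the gRPC channel owned by a
          SubscriberClient; only the close() method matters here."""
          def __init__(self):
              self.closed = False

          def close(self):
              self.closed = True

      channel = FakeChannel()
      with contextlib.closing(channel):
          pass  # ... pull messages here ...

      # The channel (and the file descriptor it holds) is released as soon
      # as the block exits, instead of waiting for garbage collection.
      assert channel.closed
      ```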


      People

        Assignee: Unassigned
        Reporter: Alexey Strokach (ostrokach)
