Beam / BEAM-7608

v1new ReadFromDatastore skips entities

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Blocker
    • Resolution: Fixed
    • Affects Version/s: 2.13.0
    • Fix Version/s: 2.15.0
    • Component/s: io-py-gcp
    • Labels: None
    • Environment: MacOS 10.14.5, Python 2.7

      Description

      A simple map over a Datastore kind in the local emulator, using the new v1new.datastoreio.ReadFromDatastore transform, skips entities.

      The kind has 1516 entities. When I map over it using the old ReadFromDatastore transform, all of them are processed: I can map each entity to its id and write the ids to a text file.

      But the new transform only maps 365 entities. There is no error. The tail of the standard output is:

      INFO:root:Latest stats timestamp for kind face_apilog is 2019-06-18 08:15:21+00:00
      INFO:root:Estimated size bytes for query: 116188
      INFO:root:Splitting the query into 12 splits
      INFO:root:Running (((GetEntities/Reshuffle/ReshufflePerKey/GroupByKey/Read)(ref_AppliedPTransform_GetEntities/Reshuffle/ReshufflePerKey/FlatMap(restore_timestamps)_14))((ref_AppliedPTransform_GetEntities/Reshuffle/RemoveRandomKeys_15)(ref_AppliedPTransform_GetEntities/Read_16)))((ref_AppliedPTransform_MapToId_17)((ref_AppliedPTransform_WriteToFile/Write/WriteImpl/WriteBundles_24)((ref_AppliedPTransform_WriteToFile/Write/WriteImpl/Pair_25)((ref_AppliedPTransform_WriteToFile/Write/WriteImpl/WindowInto(WindowIntoFn)_26)(WriteToFile/Write/WriteImpl/GroupByKey/Write)))))
      INFO:root:Running (WriteToFile/Write/WriteImpl/GroupByKey/Read)((ref_AppliedPTransform_WriteToFile/Write/WriteImpl/Extract_31)(ref_PCollection_PCollection_20/Write))
      INFO:root:Running (ref_PCollection_PCollection_12/Read)((ref_AppliedPTransform_WriteToFile/Write/WriteImpl/PreFinalize_32)(ref_PCollection_PCollection_21/Write))
      INFO:root:Running (ref_PCollection_PCollection_12/Read)+(ref_AppliedPTransform_WriteToFile/Write/WriteImpl/FinalizeWrite_33)
      INFO:root:Starting finalize_write threads with num_shards: 1 (skipped: 0), batches: 1, num_threads: 1
      INFO:root:Renamed 1 shards in 0.12 seconds.

       

      The code for the job on the new transform is:

       

       

      from __future__ import absolute_import
      
      import logging
      import os
      import sys
      
      import apache_beam as beam
      from apache_beam.io.gcp.datastore.v1new.datastoreio import ReadFromDatastore
      from apache_beam.io.gcp.datastore.v1new.types import Query
      
      # TODO: should be set outside of python process
      os.environ['DATASTORE_EMULATOR_HOST'] = 'localhost:8085'
      
      
      def map_to_id(element):
          face_log_id = element.to_client_entity().id
          return face_log_id


      def run(argv=None):
          p = beam.Pipeline(argv=argv)

          project = 'dev'

          (p
           | 'GetEntities' >> ReadFromDatastore(Query(kind='face_apilog', project=project))
           | 'MapToId' >> beam.Map(map_to_id)
           | 'WriteToFile' >> beam.io.WriteToText('result')
          )

          p.run().wait_until_finish()


      if __name__ == '__main__':
          logging.getLogger().setLevel(logging.INFO)
          run(sys.argv)

       

      For comparison, the code for the job on the old transform is:

       

      from __future__ import absolute_import
      
      import logging
      import os
      import sys
      
      import apache_beam as beam
      from apache_beam.io.gcp.datastore.v1.datastoreio import ReadFromDatastore
      from google.cloud.proto.datastore.v1 import query_pb2
      
      # TODO: should be set outside of python process
      os.environ['DATASTORE_EMULATOR_HOST'] = 'localhost:8085'
      
      
      def map_to_id(element):
          face_log_id = element.key.path[-1].id
          return face_log_id


      def run(argv=None):
          p = beam.Pipeline(argv=argv)

          project = 'dev'

          query = query_pb2.Query()
          query.kind.add().name = 'face_apilog'

          (p
           | 'GetEntities' >> ReadFromDatastore(project=project, query=query)
           # TODO: ParDo???
           | 'MapToId' >> beam.Map(map_to_id)
           | 'WriteToFile' >> beam.io.WriteToText('result')
          )

          p.run().wait_until_finish()


      if __name__ == '__main__':
          logging.getLogger().setLevel(logging.INFO)
          run(sys.argv)
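
      To pin down which entities the new transform skips, the outputs of the two jobs can be diffed. The following is only a sketch, not part of the original report: it assumes each job writes one id per line and that the two jobs were changed to use distinct output prefixes. The prefixes result_v1 and result_v1new, and the helper names read_ids and missing_ids, are hypothetical. With a single shard, WriteToText('result') produces a file named like result-00000-of-00001, hence the glob patterns.

```python
import glob


def read_ids(pattern):
    """Collect the non-empty lines from every file matching a glob pattern."""
    ids = set()
    for path in glob.glob(pattern):
        with open(path) as handle:
            ids.update(line.strip() for line in handle if line.strip())
    return ids


def missing_ids(old_pattern, new_pattern):
    """Return ids written by the old job but absent from the new job, sorted."""
    return sorted(read_ids(old_pattern) - read_ids(new_pattern))


if __name__ == '__main__':
    # Hypothetical prefixes: assumes the jobs above were edited to write to
    # 'result_v1' and 'result_v1new' rather than both writing to 'result'.
    skipped = missing_ids('result_v1-*', 'result_v1new-*')
    print('%d entities missing from the v1new output' % len(skipped))
```

      If the counts reported above hold, this should list 1516 - 365 = 1151 ids, which may help show whether the skipped entities cluster around particular query splits.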

       

              People

              • Assignee: Udi Meiri (udim)
              • Reporter: Jacob Gur (jacobg)
              • Votes: 1
              • Watchers: 3

                  Time Tracking

                  • Original Estimate: Not Specified
                  • Remaining Estimate: 0h
                  • Time Spent: 1h 40m