Apache Gobblin / GOBBLIN-71

KafkaSource isn't dynamically creating mappers

Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved

    Description

      According to the documentation, when the mr.job.max.mappers property isn't set in the job conf file, Gobblin creates one mapper per work unit. In practice, however, it defaults to 100 mappers, and when there are fewer than 100 work units, the commit method running on the empty/idle mappers triggers the error below. Is it possible to enable the dynamic mapper creation described in the documentation? If not, and the documentation is wrong, how can we deal with the following exception appearing in the logs of the empty mappers:

      2016-12-07 17:12:50,378 WARN [main] org.apache.hadoop.metrics2.impl.MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-maptask.properties,hadoop-metrics2.properties
      2016-12-07 17:12:50,510 INFO [main] org.apache.hadoop.metrics2.impl.MetricsSystemImpl: Scheduled snapshot period at 10 second(s).
      2016-12-07 17:12:50,510 INFO [main] org.apache.hadoop.metrics2.impl.MetricsSystemImpl: MapTask metrics system started
      2016-12-07 17:12:50,527 INFO [main] org.apache.hadoop.mapred.YarnChild: Executing with tokens:
      2016-12-07 17:12:50,528 INFO [main] org.apache.hadoop.mapred.YarnChild: Kind: mapreduce.job, Service: job_1473377625712_2174, Ident: (org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier@4194e3ee)
      2016-12-07 17:12:50,695 INFO [main] org.apache.hadoop.mapred.YarnChild: Sleeping for 0ms before retrying again. Got null now.
      2016-12-07 17:12:51,166 INFO [main] org.apache.hadoop.mapred.YarnChild: mapreduce.cluster.local.dir for child: /hadoop/yarn/local/usercache/airflow/appcache/application_1473377625712_2174,/hadoop/yarn/local/usercache/airflow/appcache/application_1473377625712_2174,hadoop/yarn/local/usercache/airflow/appcache/application_1473377625712_2174
      2016-12-07 17:12:51,833 INFO [main] org.apache.hadoop.conf.Configuration.deprecation: session.id is deprecated. Instead, use dfs.metrics.session-id
      2016-12-07 17:12:52,491 INFO [main] org.apache.hadoop.mapred.Task: Using ResourceCalculatorProcessTree : [ ]
      2016-12-07 17:12:52,790 INFO [main] org.apache.hadoop.mapred.MapTask: Processing split: gobblin.runtime.mapreduce.GobblinWorkUnitsInputFormat$GobblinSplit@3d918345
      2016-12-07 17:12:52,933 INFO [TaskExecutor STARTING] gobblin.runtime.TaskExecutor: Starting the task executor
      2016-12-07 17:12:52,934 INFO [MRTaskStateTracker STARTING] gobblin.runtime.mapreduce.MRTaskStateTracker: Starting the task state tracker
      2016-12-07 17:12:52,976 WARN [main] gobblin.runtime.GobblinMultiTaskAttempt: No work units to run in container attempt_1473377625712_2174_m_000003_0
      2016-12-07 17:12:52,976 INFO [main] gobblin.runtime.AbstractJobLauncher: Will commit tasks directly.
      2016-12-07 17:12:52,976 INFO [main] gobblin.runtime.mapreduce.MRJobLauncher: Starting the clean-up steps.
      2016-12-07 17:12:52,978 INFO [TaskExecutor STOPPING] gobblin.runtime.TaskExecutor: Stopping the task executor
      2016-12-07 17:12:52,978 INFO [MRTaskStateTracker STOPPING] gobblin.runtime.mapreduce.MRTaskStateTracker: Stopping the task state tracker
      2016-12-07 17:12:52,978 INFO [MRTaskStateTracker STOPPING] gobblin.runtime.mapreduce.MRTaskStateTracker: Attempting to shutdown ExecutorService: java.util.concurrent.ScheduledThreadPoolExecutor@5868f6a9[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
      2016-12-07 17:12:52,978 INFO [TaskExecutor STOPPING] gobblin.runtime.TaskExecutor: Attempting to shutdown ExecutorService: java.util.concurrent.ThreadPoolExecutor@32cf8dd6[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
      2016-12-07 17:12:52,978 INFO [MRTaskStateTracker STOPPING] gobblin.runtime.mapreduce.MRTaskStateTracker: Successfully shutdown ExecutorService: java.util.concurrent.ScheduledThreadPoolExecutor@5868f6a9[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
      2016-12-07 17:12:52,978 INFO [TaskExecutor STOPPING] gobblin.runtime.TaskExecutor: Successfully shutdown ExecutorService: java.util.concurrent.ThreadPoolExecutor@32cf8dd6[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
      2016-12-07 17:12:52,978 INFO [TaskExecutor STOPPING] gobblin.runtime.TaskExecutor: Attempting to shutdown ExecutorService: java.util.concurrent.ScheduledThreadPoolExecutor@5319fe97[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
      2016-12-07 17:12:52,978 INFO [TaskExecutor STOPPING] gobblin.runtime.TaskExecutor: Successfully shutdown ExecutorService: java.util.concurrent.ScheduledThreadPoolExecutor@5319fe97[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
      2016-12-07 17:12:52,979 INFO [TaskExecutor STOPPING] gobblin.runtime.TaskExecutor: Attempting to shutdown ExecutorService: java.util.concurrent.ThreadPoolExecutor@44d20052[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
      2016-12-07 17:12:52,979 INFO [TaskExecutor STOPPING] gobblin.runtime.TaskExecutor: Successfully shutdown ExecutorService: java.util.concurrent.ThreadPoolExecutor@44d20052[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
      2016-12-07 17:12:52,979 WARN [main] org.apache.hadoop.mapred.YarnChild: Exception running child : java.lang.NullPointerException
      at gobblin.runtime.GobblinMultiTaskAttempt.commit(GobblinMultiTaskAttempt.java:102)
      at gobblin.runtime.AbstractJobLauncher.runAndOptionallyCommitTaskAttempt(AbstractJobLauncher.java:676)
      at gobblin.runtime.AbstractJobLauncher.runWorkUnits(AbstractJobLauncher.java:655)
      at gobblin.runtime.mapreduce.MRJobLauncher$TaskRunner.run(MRJobLauncher.java:602)
      at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:784)
      at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
      at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:163)
      at java.security.AccessController.doPrivileged(Native Method)
      at javax.security.auth.Subject.doAs(Subject.java:415)
      at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1628)
      at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)
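
      For reference, a minimal sketch of the relevant job conf lines (the property names come from the Gobblin documentation; the topic, broker, and mapper-count values below are placeholders, not the actual values from this job):

      # Kafka extract (illustrative placeholder values)
      job.name=KafkaPullJob
      source.class=gobblin.source.extractor.extract.kafka.KafkaSimpleSource
      kafka.brokers=broker1.example.com:9092
      topic.whitelist=example_topic

      # Possible workaround: pin the mapper count explicitly rather than
      # relying on the documented one-mapper-per-work-unit behavior.
      mr.job.max.mappers=20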

      Github Url : https://github.com/linkedin/gobblin/issues/1443
      Github Reporter : adammac1991
      Github Assignee : shirshanka
      Github Created At : 2016-12-08T17:28:47Z
      Github Updated At : 2017-04-14T23:25:13Z

      Comments


      ydai1124 wrote on 2017-04-14T23:25:13Z : Hi @adammac1991, I believe the exception you saw has been fixed in the latest version. Can you try again?
      As for the dynamic number of mappers, the Kafka source has its own packing algorithm, and it also creates empty work units for partitions found in the previous state store. Can you send us your job config file and the driver-side log?

      Github Url : https://github.com/linkedin/gobblin/issues/1443#issuecomment-294254955
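
      For reference, a minimal sketch of the kind of Kafka pull job config being asked for here (property names follow the Gobblin Kafka quickstart; all values are placeholders, not taken from the reporter's job):

      job.name=GobblinKafkaToHdfs
      job.group=GobblinKafka
      job.lock.enabled=false

      kafka.brokers=broker1.example.com:9092
      source.class=gobblin.source.extractor.extract.kafka.KafkaSimpleSource
      extract.namespace=gobblin.extract.kafka
      topic.whitelist=example_topic
      bootstrap.with.offset=earliest

      writer.builder.class=gobblin.writer.SimpleDataWriterBuilder
      writer.file.path.type=tablename
      writer.destination.type=HDFS
      writer.output.format=txt
      data.publisher.type=gobblin.publisher.BaseDataPublisher

      # Bounds how many mappers the Kafka work-unit packer fills; when left
      # unset, it appears to fall back to a fixed default (100 in the
      # reporter's observation) rather than one mapper per work unit.
      mr.job.max.mappers=10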

People

    • Assignee: Unassigned
    • Reporter: Abhishek Tiwari (abti)
    • Votes: 0
    • Watchers: 1