Apache Gobblin / GOBBLIN-136

How to ensure Kafka log data integrity?


Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved

    Description

      Hi,
      I am running Gobblin against the Kafka logs of a real website. The topic has 128 partitions and I use 128 mappers to run the tasks. Our data volume is very large, so I set the execution time limit of each task to 15 minutes, but the result written to HDFS is incomplete every time. For example, on the first run partition 10 got offsets from 100000 to 90000, but after 15 minutes the HDFS output for partition 10 contained only 23000 records. I therefore expected the second run to begin at offset 100000 + 23000 = 123000, but when I ran it the second time the starting offset was 160000. I want to know why.
      The configuration file is as follows:

      job.name=ZeusLogOnlineJob

      job.group=GobblinKafka
      job.description=Gobblin quick start job for Kafka
      job.lock.enabled=false

      kafka.brokers=####

      topic.whitelist=zeus

      source.class=gobblin.source.extractor.extract.kafka.KafkaSimpleSource
      extract.namespace=gobblin.extract.kafka

      writer.builder.class=gobblin.writer.SimpleDataWriterBuilder
      writer.file.path.type=tablename
      writer.destination.type=HDFS
      writer.output.format=txt

      data.publisher.type=gobblin.publisher.BaseDataPublisher
      mr.job.max.mappers=128

      metrics.reporting.file.enabled=true
      metrics.log.dir=/gobblin-kafka/metrics
      metrics.reporting.file.suffix=txt

      bootstrap.with.offset=earliest

      fs.uri=hdfs://
      writer.fs.uri=hdfs://
      state.store.fs.uri=hdfs://

      mr.job.root.dir=/gobblin-kafka/working
      state.store.dir=/gobblin-kafka/state-store
      task.data.root.dir=/jobs/kafkaetl/gobblin/gobblin-kafka/task-data
      data.publisher.final.dir=/gobblintest/job-output

      extract.limit.enabled=true
      extract.limit.type=time
      extract.limit.time.limit=15
      extract.limit.time.limit.timeunit=minutes


      Github Url : https://github.com/linkedin/gobblin/issues/751
      Github Reporter : zhenglu696
      Github Created At : 2016-02-24T03:18:12Z
      Github Updated At : 2016-03-04T17:23:57Z

      Comments


      stakiar wrote on 2016-02-25T03:11:44Z : Hey @zhenglu696, I am not sure I fully understand the problem. Can you elaborate a little more and provide a few more details? Thanks

      Github Url : https://github.com/linkedin/gobblin/issues/751#issuecomment-188582567


      zhenglu696 wrote on 2016-02-25T07:59:35Z : Hey @sahilTakiar, I have the following questions:

      1. In the configuration file I set an execution time limit for each task, for example fifteen minutes:
      extract.limit.enabled=true
      extract.limit.type=time
      extract.limit.time.limit=15
      extract.limit.time.limit.timeunit=minutes
      I configured the Kafka bootstrap strategy as earliest. If a task does not finish within the 15 minutes, which offset does the second run start pulling from? Does it resume from the last offset the previous run actually reached?
      2. When I pull data from 128 partitions with 128 mappers using the strategy above, the total number of generated files is sometimes not 128. Why is that?

      thank you

      Github Url : https://github.com/linkedin/gobblin/issues/751#issuecomment-188661097


      stakiar wrote on 2016-02-25T18:33:52Z : @zliu41 correct me if I am wrong

      1: The first run will pull from the earliest offsets. Let's say it pulled data from `[earliest, x]`. Assuming the first run completed with no failures, the second run will pull from offsets `(x, y]` automatically (a small sketch after point 2 below illustrates this handoff).
      2: Is it possible that some of the Kafka partitions have no data? This could explain why some map tasks don't write any files.
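      To make that handoff concrete, here is a minimal sketch of the idea. It does not use Gobblin's real classes; the in-memory map standing in for the state store and the offset values (start at 100000, 23000 records pulled) are hypothetical, chosen only to match the numbers in the description:

          import java.util.HashMap;
          import java.util.Map;

          // Minimal sketch, not Gobblin's actual API: models one committed high watermark per partition.
          public class WatermarkHandoffSketch {

              // Stands in for the state store (state.store.dir); keyed by partition id.
              private static final Map<Integer, Long> committedHighWatermark = new HashMap<>();

              // Where a run starts for a partition: resume right after the last committed
              // high watermark if one exists, otherwise bootstrap from the earliest offset.
              static long startOffset(int partition, long earliestOffset) {
                  Long committed = committedHighWatermark.get(partition);
                  return (committed != null) ? committed + 1 : earliestOffset;
              }

              // When a run finishes (or hits its time limit), commit the last offset it actually pulled.
              static void commit(int partition, long lastPulledOffset) {
                  committedHighWatermark.put(partition, lastPulledOffset);
              }

              public static void main(String[] args) {
                  int partition = 10;
                  long earliest = 100000L;                            // hypothetical earliest offset

                  // Run 1 bootstraps from the earliest offset; the time limit stops it after 23000 records.
                  long run1Start = startOffset(partition, earliest);  // 100000
                  long x = run1Start + 23000 - 1;                     // last offset actually pulled
                  commit(partition, x);

                  // Run 2 resumes right after x, regardless of how far the partition has grown since.
                  long run2Start = startOffset(partition, earliest);  // 123000
                  System.out.println("run 1 start = " + run1Start + ", run 2 start = " + run2Start);
              }
          }

      This is only meant to mirror the caveat above: the next run starts from what the previous run committed, not from whatever the partition's newest offset happens to be.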

      Github Url : https://github.com/linkedin/gobblin/issues/751#issuecomment-188921010


      zhenglu696 wrote on 2016-02-26T00:57:36Z : @sahilTakiar @zliu41
      That the first run pulls from the earliest offsets is OK. I think the first run's range is `[earliest, X]`, where X is the newest offset in the partition at the time the task starts. But when I run this task with the 15-minute limit, the task does not finish within that time; e.g. it only pulls `[earliest, Y]`. I want to know where the second run begins: X or Y?
      Thanks!

      Github Url : https://github.com/linkedin/gobblin/issues/751#issuecomment-189059186


      stakiar wrote on 2016-02-26T18:24:35Z : Kafka only deals with offsets, not with timestamps. Each offset corresponds to a single record, so you pull data from offset `x` to offset `y`. Kafka is agnostic to the timestamp of each record. The `extract.limit.time.limit` only controls how long Gobblin tasks will spend trying to consume data from a topic.

      The first run will pull data from the earliest offset to some offset `x`, where `x` is `>` the earliest offset. The value of `x` is determined at runtime. Gobblin will start to pull from the earliest offset: it contacts Kafka and asks for the record corresponding to the earliest offset, processes that record, then requests the earliest offset + 1, then the earliest offset + 2, and so on. It will do this until 15 minutes have elapsed, at which point it will be at some offset we call `x`.

      The second run will pull data from `x` to `y`, where `y` is `>` `x`.
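      To illustrate how `x` falls out of the time limit at runtime, here is a minimal sketch of such a loop. The SingleRecordFetcher interface is hypothetical and only stands in for fetching one record per offset; it is not Gobblin's or Kafka's real consumer API:

          import java.time.Duration;
          import java.time.Instant;

          // Minimal sketch of a time-limited, one-offset-at-a-time pull loop.
          public class TimeLimitedPullSketch {

              // Hypothetical stand-in for a consumer that returns the record at a given offset,
              // or null once the end of the partition is reached.
              interface SingleRecordFetcher {
                  byte[] fetch(long offset);
              }

              // Pulls records starting at startOffset until the configured limit
              // (e.g. extract.limit.time.limit=15 minutes) elapses or the partition is exhausted.
              // Returns the last offset actually pulled, i.e. the "x" the next run resumes after.
              static long pullUntilTimeLimit(SingleRecordFetcher fetcher, long startOffset, Duration limit) {
                  Instant deadline = Instant.now().plus(limit);
                  long offset = startOffset;
                  long lastPulled = startOffset - 1;   // nothing pulled yet
                  while (Instant.now().isBefore(deadline)) {
                      byte[] record = fetcher.fetch(offset);
                      if (record == null) {
                          break;                        // caught up with the end of the partition
                      }
                      // ... write the record to the output file (omitted) ...
                      lastPulled = offset;
                      offset++;
                  }
                  return lastPulled;
              }
          }

      Whether the loop stops because the deadline passed or because the partition was exhausted, whatever offset it reached becomes `x` for the next run.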

      I also suggest taking a look at the Kafka documentation.

      Github Url : https://github.com/linkedin/gobblin/issues/751#issuecomment-189412094


      zhenglu696 wrote on 2016-02-29T07:31:42Z : Hi @sahilTakiar @zliu41,
      Thank you for your answer!
      I also have some questions:
      1. If I set the number of mappers equal to the number of Kafka topic partitions, does each mapper consume one specific partition of the topic? If the number of mappers is larger than the number of partitions, for example 256 mappers, what will the extra mappers do?
      2. Where are the consumed offsets stored? If my job fails, can I manually modify the offsets and consume again, and if so, how? I think they are stored under the state.store.dir directory, but when I open the files there they look like garbled binary. How can I open these files and manually modify the offsets?

      3. Since data is pushed into Kafka continuously as a stream while Gobblin is running, does a mapper record the latest offset of its partition before it starts consuming, consume up to that recorded offset, and then finish? If not, how does a mapper decide that all the data in the partition has been consumed?

      thanks a lot!

      Github Url : https://github.com/linkedin/gobblin/issues/751#issuecomment-190075318


          People

            Assignee: Unassigned
            Reporter: Abhishek Tiwari (abti)
