Kafka / KAFKA-6767

OffsetCheckpoint write assumes parent directory exists


Details

    • Type: Bug
    • Status: Open
    • Priority: Minor
    • Resolution: Unresolved
    • Affects Version/s: 1.1.0
    • Fix Version/s: None
    • Component/s: streams
    • Labels: None

    Description

      We run Kafka Streams with RocksDB state stores on ephemeral disks (i.e. if an instance dies, it is recreated from scratch rather than reusing the existing RocksDB data).

      We routinely see:

      2018-04-09T19:14:35.004Z WARN <> [chat-0319e3c3-d8b2-4c60-bd69-a8484d8d4435-StreamThread-1] o.a.k.s.p.i.ProcessorStateManager - task [0_11] Failed to write offset checkpoint file to /mnt/mesos/sandbox/storage/chat/0_11/.checkpoint: {}
      java.io.FileNotFoundException: /mnt/mesos/sandbox/storage/chat/0_11/.checkpoint.tmp (No such file or directory)
      at java.io.FileOutputStream.open0(Native Method)
      at java.io.FileOutputStream.open(FileOutputStream.java:270)
      at java.io.FileOutputStream.<init>(FileOutputStream.java:213)
      at java.io.FileOutputStream.<init>(FileOutputStream.java:162)
      at org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:78)
      at org.apache.kafka.streams.processor.internals.ProcessorStateManager.checkpoint(ProcessorStateManager.java:320)
      at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:314)
      at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
      at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:307)
      at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:297)
      at org.apache.kafka.streams.processor.internals.AssignedTasks$1.apply(AssignedTasks.java:67)
      at org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:357)
      at org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:347)
      at org.apache.kafka.streams.processor.internals.TaskManager.commitAll(TaskManager.java:403)
      at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:994)
      at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:811)
      at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:750)
      at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:720)

      Inspecting the state store directory, I can indeed see that chat/0_11 does not exist (although the directories for many other partitions do).
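A minimal, self-contained reproduction of the failure mode, assuming only the JDK: opening new FileOutputStream(File) on a path whose parent directory does not exist throws FileNotFoundException, which is consistent with the top frames of the stack trace above. The class and method names (MissingParentRepro, openFailsWhenTaskDirMissing) are hypothetical, and a fresh temporary directory stands in for the real state directory.

```java
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.file.Files;

public class MissingParentRepro {

    /**
     * Hypothetical helper: tries to open "<stateDir>/<taskId>/.checkpoint.tmp"
     * via new FileOutputStream(File), as in the stack trace, WITHOUT creating
     * the task directory first. Returns true if FileNotFoundException is thrown.
     */
    static boolean openFailsWhenTaskDirMissing(File stateDir, String taskId) throws IOException {
        File taskDir = new File(stateDir, taskId);          // deliberately not created
        File temp = new File(taskDir, ".checkpoint.tmp");
        try (FileOutputStream out = new FileOutputStream(temp)) {
            return false;                                    // opened unexpectedly
        } catch (FileNotFoundException e) {
            System.out.println("Reproduced: " + e.getMessage());
            return true;
        }
    }

    public static void main(String[] args) throws IOException {
        // Empty temp directory standing in for the Streams state directory.
        File stateDir = Files.createTempDirectory("streams-state").toFile();
        System.out.println(openFailsWhenTaskDirMissing(stateDir, "0_11"));
    }
}
```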

      The OffsetCheckpoint.write method appears to open a new temporary checkpoint file without first ensuring that its parent directory exists:

          public void write(final Map<TopicPartition, Long> offsets) throws IOException {
              // if there is no offsets, skip writing the file to save disk IOs
              if (offsets.isEmpty()) {
                  return;
              }

              synchronized (lock) {
                  // write to temp file and then swap with the existing file
                  final File temp = new File(file.getAbsolutePath() + ".tmp");
                  // ... (remainder of the method not included in this excerpt)
              }
          }
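The comment in the excerpt describes a write-to-temp-then-swap pattern. The sketch below illustrates that general technique with only the JDK; it is not Kafka's actual implementation, and the names TempThenSwap and writeAtomically are hypothetical. The temp file is created next to the target so that the final rename stays within one directory (and one file system), which is also why the pattern depends on the parent directory already existing.

```java
import java.io.BufferedWriter;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.nio.charset.StandardCharsets;
import java.nio.file.AtomicMoveNotSupportedException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

public class TempThenSwap {

    /**
     * Writes content to "<target>.tmp" in the same directory, fsyncs it, then
     * renames it over the target, so readers see either the old file or the
     * complete new one. Fails with FileNotFoundException if the parent is gone.
     */
    static void writeAtomically(Path target, String content) throws IOException {
        Path temp = target.resolveSibling(target.getFileName() + ".tmp");
        try (FileOutputStream fos = new FileOutputStream(temp.toFile());
             BufferedWriter writer = new BufferedWriter(
                     new OutputStreamWriter(fos, StandardCharsets.UTF_8))) {
            writer.write(content);
            writer.flush();
            fos.getFD().sync();                              // force bytes to disk before the swap
        }
        try {
            Files.move(temp, target, StandardCopyOption.ATOMIC_MOVE);
        } catch (AtomicMoveNotSupportedException e) {
            // Fall back to a plain replace where an atomic rename is unavailable.
            Files.move(temp, target, StandardCopyOption.REPLACE_EXISTING);
        }
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("checkpoint-demo");
        Path checkpoint = dir.resolve(".checkpoint");
        writeAtomically(checkpoint, "example-offsets\n");   // illustrative content, not the real format
        System.out.print(new String(Files.readAllBytes(checkpoint), StandardCharsets.UTF_8));
    }
}
```

If the parent directory is missing, the FileOutputStream constructor fails before any swap happens, which is consistent with the .checkpoint.tmp path in the reported exception.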

      Either OffsetCheckpoint should create the parent directories when they are missing, or its callers should guarantee that the directory exists before calling write().
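A minimal sketch of the first option, assuming the fix is simply to create any missing parent directories before the temp file is opened. EnsureParentDir, ensureParentExists and writeTemp are hypothetical names; Files.createDirectories is a standard JDK call that does not fail when the directory already exists, so it can be called on every write.

```java
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;

public class EnsureParentDir {

    /**
     * Hypothetical helper: creates the checkpoint file's parent directory and
     * any missing ancestors. A no-op if the directory is already present.
     */
    static void ensureParentExists(File checkpointFile) throws IOException {
        File parent = checkpointFile.getAbsoluteFile().getParentFile();
        if (parent != null) {
            Files.createDirectories(parent.toPath());
        }
    }

    /** Hypothetical writer: opens the temp file only after the parent is guaranteed to exist. */
    static void writeTemp(File checkpointFile, byte[] data) throws IOException {
        ensureParentExists(checkpointFile);
        File temp = new File(checkpointFile.getAbsolutePath() + ".tmp");
        try (FileOutputStream out = new FileOutputStream(temp)) {
            out.write(data);
        }
    }

    public static void main(String[] args) throws IOException {
        File stateDir = Files.createTempDirectory("streams-state").toFile();
        // The 0_11 task directory does not exist yet, mirroring the report.
        File checkpoint = new File(new File(stateDir, "0_11"), ".checkpoint");
        writeTemp(checkpoint, "example\n".getBytes(StandardCharsets.UTF_8));
        System.out.println(new File(checkpoint.getAbsolutePath() + ".tmp").exists());
    }
}
```

The same ensureParentExists call could instead be made by whatever code hands the checkpoint path to OffsetCheckpoint, which corresponds to the second option. One possible argument for the caller-side variant (a judgment, not something this report establishes) is that silently recreating the directory inside write() could hide whatever removed it.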

People

    • Assignee: Unassigned
    • Reporter: Steven Schlansker (stevenschlansker)
    • Votes: 2
    • Watchers: 9