Samza / SAMZA-135

LevelDbKeyValueStore leaks memory on putAll

Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 0.6.0
    • Fix Version/s: 0.7.0
    • Component/s: kv
    • Labels: None

    Description

      Calling store.all(); store.close(); in a tight loop shows that memory is being leaked.

      Digging in a bit more: when a KeyValueStore is used with a cache and store.all is called, the cache does:

        def all() = {
          metrics.alls.inc
          flush()
          store.all()
        }
      

      In turn, flush() does:

        def flush() {
          trace("Flushing.")
      
          metrics.flushes.inc
      
          // write out the contents of the dirty list oldest first
          val batch = new java.util.ArrayList[Entry[K, V]](this.dirtyCount)
          for (k <- this.dirty.reverse) {
            val entry = this.cache.get(k)
            entry.dirty = null // not dirty any more
            batch.add(new Entry(k, entry.value))
          }
          store.putAll(batch)
          store.flush
          metrics.flushBatchSize.inc(batch.size)
      
          // reset the dirty list
          this.dirty = new mutable.DoubleLinkedList[K]()
          this.dirtyCount = 0
        }
      

      The store.putAll call in this code invokes LevelDbKeyValueStore.putAll, which has:

        def putAll(entries: java.util.List[Entry[Array[Byte], Array[Byte]]]) {
          val batch = db.createWriteBatch()
          val iter = entries.iterator
          var wrote = 0
          var deletes = 0
          while (iter.hasNext) {
            wrote += 1
            val curr = iter.next()
            if (curr.getValue == null) {
              deletes += 1
              batch.delete(curr.getKey)
            } else {
              val key = curr.getKey
              val value = curr.getValue
              metrics.bytesWritten.inc(key.size + value.size)
              batch.put(key, value)
            }
          }
          db.write(batch)
          batch.close
          metrics.puts.inc(wrote)
          metrics.deletes.inc(deletes)
        }
      

      According to the docs at https://github.com/fusesource/leveldbjni, the batch needs to be closed:

      WriteBatch batch = db.createWriteBatch();
      try {
        batch.delete(bytes("Denver"));
        batch.put(bytes("Tampa"), bytes("green"));
        batch.put(bytes("London"), bytes("red"));
      
        db.write(batch);
      } finally {
        // Make sure you close the batch to avoid resource leaks.
        batch.close();
      }
      

      This should be a one-line fix.
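
      As a rough illustration, the try/finally pattern from the leveldbjni docs could be applied to a batched write in Scala along these lines. This is only a sketch, not the contents of the attached patch: the BatchWrites object, the writeAll helper, and the tuple-based entries are hypothetical stand-ins for LevelDbKeyValueStore.putAll and Samza's Entry type, and the metrics calls are left out. It assumes the org.iq80.leveldb API that leveldbjni implements is on the classpath.

      ```scala
      import org.iq80.leveldb.{DB, WriteBatch}

      object BatchWrites {
        // Hypothetical helper (not Samza's actual method): applies puts and
        // deletes in a single batch. A null value marks a delete, mirroring
        // the convention used by putAll above.
        def writeAll(db: DB, entries: Iterable[(Array[Byte], Array[Byte])]): Unit = {
          val batch: WriteBatch = db.createWriteBatch()
          try {
            for ((key, value) <- entries) {
              if (value == null) batch.delete(key) else batch.put(key, value)
            }
            db.write(batch)
          } finally {
            // Close the batch even if building or writing it throws, so the
            // resources it holds are always released.
            batch.close()
          }
        }
      }
      ```

      Putting close in the finally block means the batch is released on the failure path as well as on the normal one.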

      Attachments

        1. SAMZA-135.0.patch
          0.5 kB
          Chris Riccomini

          People

            Assignee: Chris Riccomini (criccomini)
            Reporter: Chris Riccomini (criccomini)