Details
- Type: Bug
- Status: Resolved
- Priority: Blocker
- Resolution: Fixed
- Affects Version/s: 0.6.0, 0.7.0
Description
I have a NiFi instance that has been running for about a week and has deadlocked at least 3 times in that time. By "deadlock" I mean the whole NiFi instance stops making any progress on FlowFiles. I looked at the stack trace, and a lot of threads are stuck doing tasks in the PersistentProvenanceRepository. Looking at the code, I think this is what is happening:
There is a ReadWriteLock where all the readers are waiting on a writer. The writer is in this loop:
while (journalFileCount > journalCountThreshold || repoSize > sizeThreshold) {
    // if a shutdown happens while we are in this loop, kill the rollover thread and break
    if (this.closed.get()) {
        if (future != null) {
            future.cancel(true);
        }
        break;
    }

    if (repoSize > sizeThreshold) {
        logger.debug("Provenance Repository has exceeded its size threshold; will trigger purging of oldest events");
        purgeOldEvents();

        journalFileCount = getJournalCount();
        repoSize = getSize(getLogFiles(), 0L);
        continue;
    } else {
        // if we are constrained by the number of journal files rather than the size of the repo,
        // then we will just sleep a bit because another thread is already actively merging the journals,
        // due to the runnable that we scheduled above
        try {
            Thread.sleep(100L);
        } catch (final InterruptedException ie) {
        }
    }

    logger.debug("Provenance Repository is still behind. Keeping flow slowed down "
        + "to accommodate. Currently, there are {} journal files ({} bytes) and "
        + "threshold for blocking is {} ({} bytes)",
        journalFileCount, repoSize, journalCountThreshold, sizeThreshold);

    journalFileCount = getJournalCount();
    repoSize = getSize(getLogFiles(), 0L);
}

logger.info("Provenance Repository has now caught up with rolling over journal files. Current number of "
    + "journal files to be rolled over is {}", journalFileCount);
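To illustrate why this stalls the whole instance, here is a minimal standalone sketch (not NiFi code; the class and method names are mine) of the lock interaction: one thread sleeps while holding the write lock, and any thread that needs the read lock cannot acquire it.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class WriterStallsReaders {

    // Returns whether a reader can get the read lock while another
    // thread sleeps inside the write lock, as the rollover loop does.
    static boolean readerCanAcquireWhileWriterSleeps() throws InterruptedException {
        ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
        CountDownLatch writerHasLock = new CountDownLatch(1);

        Thread writer = new Thread(() -> {
            lock.writeLock().lock();
            try {
                writerHasLock.countDown();
                Thread.sleep(500); // stands in for the Thread.sleep(100L) backpressure loop
            } catch (InterruptedException ignored) {
            } finally {
                lock.writeLock().unlock();
            }
        });
        writer.start();
        writerHasLock.await();

        // Every flow thread that needs the read lock is stuck here.
        boolean acquired = lock.readLock().tryLock(100, TimeUnit.MILLISECONDS);
        if (acquired) {
            lock.readLock().unlock();
        }
        writer.join();
        return acquired;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("reader acquired lock: " + readerCanAcquireWhileWriterSleeps());
    }
}
```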
My NiFi is stuck at that sleep indefinitely. The reason it cannot move forward is that the thread doing the merge is stopped. The merge thread is blocked at:
accepted = eventQueue.offer(new Tuple<>(record, blockIndex), 10, TimeUnit.MILLISECONDS);
so the queue is full.
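This failure mode is easy to reproduce in isolation: once a bounded queue is full and nothing is draining it, every timed offer returns false, forever. A minimal sketch (not NiFi code; the queue size and values are made up):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class OfferOnFullQueue {
    public static void main(String[] args) throws InterruptedException {
        // A small bounded queue with no consumer draining it,
        // mirroring eventQueue once the indexing callables have exited.
        BlockingQueue<Integer> eventQueue = new ArrayBlockingQueue<>(2);
        eventQueue.offer(1);
        eventQueue.offer(2); // queue is now full

        // With nobody polling, the timed offer waits out its timeout and fails,
        // and the merge loop that retries it can never make progress.
        boolean accepted = eventQueue.offer(3, 10, TimeUnit.MILLISECONDS);
        System.out.println("accepted = " + accepted);
    }
}
```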
What I believe happened is that the callables created here:
final Callable<Object> callable = new Callable<Object>() {
    @Override
    public Object call() throws IOException {
        while (!eventQueue.isEmpty() || !finishedAdding.get()) {
            final Tuple<StandardProvenanceEventRecord, Integer> tuple;
            try {
                tuple = eventQueue.poll(10, TimeUnit.MILLISECONDS);
            } catch (final InterruptedException ie) {
                continue;
            }

            if (tuple == null) {
                continue;
            }

            indexingAction.index(tuple.getKey(), indexWriter, tuple.getValue());
        }

        return null;
    }
finish before the offer adds its first event, because I do not see any Index Provenance Events threads. My guess is that the while loop condition is wrong and should be && instead of ||.
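For reference, the two candidate conditions disagree exactly when the queue is momentarily empty but the producer has not finished adding. A minimal comparison (hypothetical helper methods, not NiFi code):

```java
public class LoopConditionCheck {
    // The consumer keeps running while these return true.
    static boolean keepRunningOr(boolean queueEmpty, boolean finishedAdding) {
        return !queueEmpty || !finishedAdding; // condition as written in the source
    }

    static boolean keepRunningAnd(boolean queueEmpty, boolean finishedAdding) {
        return !queueEmpty && !finishedAdding; // condition the report suggests
    }

    public static void main(String[] args) {
        // Queue empty, producer still adding:
        System.out.println("|| keeps running: " + keepRunningOr(true, false));  // true
        System.out.println("&& keeps running: " + keepRunningAnd(true, false)); // false
    }
}
```

With `&&`, the consumer exits as soon as it sees an empty queue, even if the producer is still adding; with `||`, it stays alive until the producer has finished and the queue has drained.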
I upped the thread count for index creation from 1 to 3 to see if that helps. I can report back later this week.