After a connection failure from the reducer to the node manager, shuffles started to hang with the following message:
There are two problems that leads to the hang.
When a reducer has an issue connecting to the node manager, copyFromHost may call putBackKnownMapOutput on the same task attempt multiple times.
There are two call sites of putBackKnownMapOutput in copyFromHost since
1. In the finally block of copyFromHost
2. In the catch block of openShuffleUrl.
When openShuffleUrl fails to connect from the catch block in copyFromHost, it returns null.
By the time openShuffleUrl returns null, putBackKnownMapOutput would have been called already for all remaining map outputs.
However, the finally block calls putBackKnownMapOutput one more time on the map outputs.
Problem 2. Problem 1 causes a leak in MergeManager.
The problem occurs when multiple fetchers get the same set of map attempt outputs to fetch.
Different fetchers reserves memory from MergeManager in Fetcher.copyMapOutput for the same map outputs.
When the fetch succeeds, only the first map output gets committed through ShuffleSchedulerImpl.copySucceeded -> InMemoryMapOutput.commit, because commit() is gated by !finishedMaps[mapIndex].
This may lead to a condition where usedMemory > memoryLimit, while commitMemory < mergeThreshold.
This gets the MergeManager into a deadlock where a merge is never triggered while MergeManager cannot reserve additional space for map outputs.