Description
To reproduce this issue:
1. create a application with a socket dstream
2. start the socket server and start the application
3. restart the socket server
4. the socket dstream will fail to reconnect (it will close the connection after a successful connect)
The main issue should be the status in SocketReceiver and ReceiverSupervisor is incorrect after the reconnect:
In SocketReceiver ::receive() the while loop will never be entered after reconnect since the isStopped will returns true:
val iterator = bytesToObjects(socket.getInputStream())
while(!isStopped && iterator.hasNext)
logInfo("Stopped receiving")
restart("Retrying connecting to " + host + ":" + port)
That is caused by the status flag "receiverState" in ReceiverSupervisor will be set to Stopped when the connection losses, but it is reset after the call of Receiver start method:
def startReceiver(): Unit = synchronized {
try
catch
{ case t: Throwable => stop("Error starting receiver " + streamId, Some(t)) }}