Close network request tacker properly
This commit is contained in:
+10
-10
@@ -44,16 +44,7 @@ public class NetworkRequestTracker
|
||||
public CompletableFuture<TResponse> sendRequest(Channel channel, IFutureTrackableNetworkMessage<TKey> msg)
|
||||
{
|
||||
if (knownChannels.add(channel))
|
||||
{
|
||||
channel.closeFuture().addListener(closeFuture ->
|
||||
{
|
||||
pendingFutures.row(channel).values().removeIf(responseFuture ->
|
||||
{
|
||||
responseFuture.completeExceptionally(closeFuture.cause());
|
||||
return true;
|
||||
});
|
||||
});
|
||||
}
|
||||
channel.closeFuture().addListener(closeFuture -> completeAllExceptionally(channel, closeFuture.cause()));
|
||||
|
||||
CompletableFuture<TResponse> responseFuture = new CompletableFuture<>();
|
||||
pendingFutures.put(channel, msg.getRequestKey(), responseFuture);
|
||||
@@ -66,8 +57,17 @@ public class NetworkRequestTracker
|
||||
return responseFuture;
|
||||
}
|
||||
|
||||
private void completeAllExceptionally(Channel channel, Throwable cause) {
|
||||
for (CompletableFuture<TResponse> responseFuture : pendingFutures.row(channel).values()) {
|
||||
responseFuture.completeExceptionally(cause);
|
||||
};
|
||||
pendingFutures.row(channel).clear();
|
||||
}
|
||||
|
||||
@Override public void close()
|
||||
{
|
||||
this.eventSource.close();
|
||||
for (Channel channel : pendingFutures.rowKeySet())
|
||||
this.completeAllExceptionally(channel, new Exception(this.getClass().getSimpleName()+" is closed."));
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user