diff --git a/core/src/main/java/com/seibel/distanthorizons/core/multiplayer/client/ClientCongestionControl.java b/core/src/main/java/com/seibel/distanthorizons/core/multiplayer/client/ClientCongestionControl.java new file mode 100644 index 000000000..6c80f91b0 --- /dev/null +++ b/core/src/main/java/com/seibel/distanthorizons/core/multiplayer/client/ClientCongestionControl.java @@ -0,0 +1,80 @@ +package com.seibel.distanthorizons.core.multiplayer.client; + +import com.seibel.distanthorizons.core.network.messages.fullData.FullDataSplitMessage; + +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.BooleanSupplier; +import java.util.function.IntConsumer; +import java.util.function.IntSupplier; + +public class ClientCongestionControl +{ + private static final double ADDITIVE_INCREASE = 50000; + private static final double MULTIPLICATIVE_DECREASE = 0.75; + private static final long INTERVAL_MS = 1000; + + private final AtomicLong bytesReceived = new AtomicLong(0); + private volatile double lastIntervalThroughput = 0; + + private double currentRate = 50000; + private long lastAdjustTime = System.currentTimeMillis(); + + private final IntSupplier currentMaxRateSupplier; + private final BooleanSupplier hasIncompleteBuffersSupplier; + private final IntConsumer rateUpdateConsumer; + + + public ClientCongestionControl( + IntSupplier currentMaxRateSupplier, + BooleanSupplier hasIncompleteBuffersSupplier, + IntConsumer rateUpdateConsumer + ) + { + this.currentMaxRateSupplier = currentMaxRateSupplier; + this.hasIncompleteBuffersSupplier = hasIncompleteBuffersSupplier; + this.rateUpdateConsumer = rateUpdateConsumer; + } + + + public void onPayloadReceived(FullDataSplitMessage message) + { + this.bytesReceived.addAndGet(message.buffer.readableBytes()); + + long now = System.currentTimeMillis(); + if (now - this.lastAdjustTime >= INTERVAL_MS) + { + this.adjustRate(now); + } + } + + private void adjustRate(long now) + { + double throughput = this.bytesReceived.getAndSet(0); + + if (throughput != 0 && throughput >= this.lastIntervalThroughput) + { + this.currentRate = Math.min(this.currentRate, this.currentMaxRateSupplier.getAsInt() * 1000) + ADDITIVE_INCREASE; + } + else if (this.hasIncompleteBuffersSupplier.getAsBoolean()) + { + this.currentRate *= MULTIPLICATIVE_DECREASE; + throughput *= MULTIPLICATIVE_DECREASE; + + if (this.currentRate < 1) + { + this.currentRate = 1; + } + } + + this.lastIntervalThroughput = throughput; + this.lastAdjustTime = now; + + this.rateUpdateConsumer.accept((int) this.currentRate / 1000); + } + + public double getCurrentRate() + { + return this.currentRate; + } + +} diff --git a/core/src/main/java/com/seibel/distanthorizons/core/multiplayer/client/ClientNetworkState.java b/core/src/main/java/com/seibel/distanthorizons/core/multiplayer/client/ClientNetworkState.java index f3abf60cd..705420e88 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/multiplayer/client/ClientNetworkState.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/multiplayer/client/ClientNetworkState.java @@ -56,6 +56,12 @@ public class ClientNetworkState implements Closeable private long serverTimeOffset = 0; public long getServerTimeOffset() { return this.serverTimeOffset; } + private final ClientCongestionControl congestionControl = new ClientCongestionControl( + () -> this.sessionConfig.getMaxDataTransferSpeed(), + this.fullDataPayloadReceiver::hasIncompleteBuffers, + x -> this.sendConfigMessage(false) + ); + //=============// @@ -116,6 +122,7 @@ public class ClientNetworkState implements Closeable }); this.networkSession.registerHandler(FullDataSplitMessage.class, this.fullDataPayloadReceiver::receiveChunk); + this.networkSession.registerHandler(FullDataSplitMessage.class, this.congestionControl::onPayloadReceived); } } @@ -127,10 +134,18 @@ public class ClientNetworkState implements Closeable - public void sendConfigMessage() + public void sendConfigMessage() { this.sendConfigMessage(true); } + public void sendConfigMessage(boolean blocking) { - this.configReceived = false; - this.getSession().sendMessage(new SessionConfigMessage(new SessionConfig())); + SessionConfig sessionConfig = new SessionConfig(); + sessionConfig.overrideValue(Config.Server.maxDataTransferSpeed, (int) this.congestionControl.getCurrentRate() / 1000); + + if (blocking) + { + this.configReceived = false; + } + + this.getSession().sendMessage(new SessionConfigMessage(sessionConfig)); } diff --git a/core/src/main/java/com/seibel/distanthorizons/core/multiplayer/config/SessionConfig.java b/core/src/main/java/com/seibel/distanthorizons/core/multiplayer/config/SessionConfig.java index 25a38bfc1..b8e3f1058 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/multiplayer/config/SessionConfig.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/multiplayer/config/SessionConfig.java @@ -18,7 +18,7 @@ public class SessionConfig implements INetworkObject private static final LinkedHashMap CONFIG_ENTRIES = new LinkedHashMap<>(); - private final LinkedHashMap values = new LinkedHashMap<>(); + private final HashMap values = new HashMap<>(); public SessionConfig constrainingConfig; @@ -106,7 +106,7 @@ public class SessionConfig implements INetworkObject // internal getters // //==================// - private T getValue(ConfigEntry configEntry) { return this.getValue(configEntry.getChatCommandName()); } + public T getValue(ConfigEntry configEntry) { return this.getValue(configEntry.getChatCommandName()); } @SuppressWarnings("unchecked") private T getValue(String name) { @@ -123,6 +123,12 @@ public class SessionConfig implements INetworkObject : value); } + public void overrideValue(ConfigEntry configEntry, T value) { this.overrideValue(configEntry.getChatCommandName(), value); } + private void overrideValue(String name, Object value) + { + this.values.put(name, value); + } + private Map getValues() { return CONFIG_ENTRIES.keySet().stream().collect(Collectors.toMap( diff --git a/core/src/main/java/com/seibel/distanthorizons/core/multiplayer/fullData/FullDataPayloadReceiver.java b/core/src/main/java/com/seibel/distanthorizons/core/multiplayer/fullData/FullDataPayloadReceiver.java index f0944b1b5..0445d4d30 100644 --- a/core/src/main/java/com/seibel/distanthorizons/core/multiplayer/fullData/FullDataPayloadReceiver.java +++ b/core/src/main/java/com/seibel/distanthorizons/core/multiplayer/fullData/FullDataPayloadReceiver.java @@ -35,6 +35,7 @@ public class FullDataPayloadReceiver implements AutoCloseable .build().asMap(); + public boolean hasIncompleteBuffers() { return !this.buffersById.isEmpty(); } @Override public void close()