Adaptive data transfer speed
This commit is contained in:
+80
@@ -0,0 +1,80 @@
|
||||
package com.seibel.distanthorizons.core.multiplayer.client;
|
||||
|
||||
import com.seibel.distanthorizons.core.network.messages.fullData.FullDataSplitMessage;
|
||||
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.function.BooleanSupplier;
|
||||
import java.util.function.IntConsumer;
|
||||
import java.util.function.IntSupplier;
|
||||
|
||||
public class ClientCongestionControl
|
||||
{
|
||||
private static final double ADDITIVE_INCREASE = 50000;
|
||||
private static final double MULTIPLICATIVE_DECREASE = 0.75;
|
||||
private static final long INTERVAL_MS = 1000;
|
||||
|
||||
private final AtomicLong bytesReceived = new AtomicLong(0);
|
||||
private volatile double lastIntervalThroughput = 0;
|
||||
|
||||
private double currentRate = 50000;
|
||||
private long lastAdjustTime = System.currentTimeMillis();
|
||||
|
||||
private final IntSupplier currentMaxRateSupplier;
|
||||
private final BooleanSupplier hasIncompleteBuffersSupplier;
|
||||
private final IntConsumer rateUpdateConsumer;
|
||||
|
||||
|
||||
public ClientCongestionControl(
|
||||
IntSupplier currentMaxRateSupplier,
|
||||
BooleanSupplier hasIncompleteBuffersSupplier,
|
||||
IntConsumer rateUpdateConsumer
|
||||
)
|
||||
{
|
||||
this.currentMaxRateSupplier = currentMaxRateSupplier;
|
||||
this.hasIncompleteBuffersSupplier = hasIncompleteBuffersSupplier;
|
||||
this.rateUpdateConsumer = rateUpdateConsumer;
|
||||
}
|
||||
|
||||
|
||||
public void onPayloadReceived(FullDataSplitMessage message)
|
||||
{
|
||||
this.bytesReceived.addAndGet(message.buffer.readableBytes());
|
||||
|
||||
long now = System.currentTimeMillis();
|
||||
if (now - this.lastAdjustTime >= INTERVAL_MS)
|
||||
{
|
||||
this.adjustRate(now);
|
||||
}
|
||||
}
|
||||
|
||||
private void adjustRate(long now)
|
||||
{
|
||||
double throughput = this.bytesReceived.getAndSet(0);
|
||||
|
||||
if (throughput != 0 && throughput >= this.lastIntervalThroughput)
|
||||
{
|
||||
this.currentRate = Math.min(this.currentRate, this.currentMaxRateSupplier.getAsInt() * 1000) + ADDITIVE_INCREASE;
|
||||
}
|
||||
else if (this.hasIncompleteBuffersSupplier.getAsBoolean())
|
||||
{
|
||||
this.currentRate *= MULTIPLICATIVE_DECREASE;
|
||||
throughput *= MULTIPLICATIVE_DECREASE;
|
||||
|
||||
if (this.currentRate < 1)
|
||||
{
|
||||
this.currentRate = 1;
|
||||
}
|
||||
}
|
||||
|
||||
this.lastIntervalThroughput = throughput;
|
||||
this.lastAdjustTime = now;
|
||||
|
||||
this.rateUpdateConsumer.accept((int) this.currentRate / 1000);
|
||||
}
|
||||
|
||||
public double getCurrentRate()
|
||||
{
|
||||
return this.currentRate;
|
||||
}
|
||||
|
||||
}
|
||||
+18
-3
@@ -56,6 +56,12 @@ public class ClientNetworkState implements Closeable
|
||||
private long serverTimeOffset = 0;
|
||||
public long getServerTimeOffset() { return this.serverTimeOffset; }
|
||||
|
||||
private final ClientCongestionControl congestionControl = new ClientCongestionControl(
|
||||
() -> this.sessionConfig.getMaxDataTransferSpeed(),
|
||||
this.fullDataPayloadReceiver::hasIncompleteBuffers,
|
||||
x -> this.sendConfigMessage(false)
|
||||
);
|
||||
|
||||
|
||||
|
||||
//=============//
|
||||
@@ -116,6 +122,7 @@ public class ClientNetworkState implements Closeable
|
||||
});
|
||||
|
||||
this.networkSession.registerHandler(FullDataSplitMessage.class, this.fullDataPayloadReceiver::receiveChunk);
|
||||
this.networkSession.registerHandler(FullDataSplitMessage.class, this.congestionControl::onPayloadReceived);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -127,10 +134,18 @@ public class ClientNetworkState implements Closeable
|
||||
|
||||
|
||||
|
||||
public void sendConfigMessage()
|
||||
public void sendConfigMessage() { this.sendConfigMessage(true); }
|
||||
public void sendConfigMessage(boolean blocking)
|
||||
{
|
||||
this.configReceived = false;
|
||||
this.getSession().sendMessage(new SessionConfigMessage(new SessionConfig()));
|
||||
SessionConfig sessionConfig = new SessionConfig();
|
||||
sessionConfig.overrideValue(Config.Server.maxDataTransferSpeed, (int) this.congestionControl.getCurrentRate() / 1000);
|
||||
|
||||
if (blocking)
|
||||
{
|
||||
this.configReceived = false;
|
||||
}
|
||||
|
||||
this.getSession().sendMessage(new SessionConfigMessage(sessionConfig));
|
||||
}
|
||||
|
||||
|
||||
|
||||
+8
-2
@@ -18,7 +18,7 @@ public class SessionConfig implements INetworkObject
|
||||
private static final LinkedHashMap<String, Entry> CONFIG_ENTRIES = new LinkedHashMap<>();
|
||||
|
||||
|
||||
private final LinkedHashMap<String, Object> values = new LinkedHashMap<>();
|
||||
private final HashMap<String, Object> values = new HashMap<>();
|
||||
public SessionConfig constrainingConfig;
|
||||
|
||||
|
||||
@@ -106,7 +106,7 @@ public class SessionConfig implements INetworkObject
|
||||
// internal getters //
|
||||
//==================//
|
||||
|
||||
private <T> T getValue(ConfigEntry<T> configEntry) { return this.getValue(configEntry.getChatCommandName()); }
|
||||
public <T> T getValue(ConfigEntry<T> configEntry) { return this.getValue(configEntry.getChatCommandName()); }
|
||||
@SuppressWarnings("unchecked")
|
||||
private <T> T getValue(String name)
|
||||
{
|
||||
@@ -123,6 +123,12 @@ public class SessionConfig implements INetworkObject
|
||||
: value);
|
||||
}
|
||||
|
||||
public <T> void overrideValue(ConfigEntry<T> configEntry, T value) { this.overrideValue(configEntry.getChatCommandName(), value); }
|
||||
private void overrideValue(String name, Object value)
|
||||
{
|
||||
this.values.put(name, value);
|
||||
}
|
||||
|
||||
private Map<String, ?> getValues()
|
||||
{
|
||||
return CONFIG_ENTRIES.keySet().stream().collect(Collectors.toMap(
|
||||
|
||||
+1
@@ -35,6 +35,7 @@ public class FullDataPayloadReceiver implements AutoCloseable
|
||||
.build().asMap();
|
||||
|
||||
|
||||
public boolean hasIncompleteBuffers() { return !this.buffersById.isEmpty(); }
|
||||
|
||||
@Override
|
||||
public void close()
|
||||
|
||||
Reference in New Issue
Block a user