Generation task prioritization (loaded > unloaded > ungenerated)

This commit is contained in:
s809
2023-08-13 18:59:08 +05:00
parent 271b193543
commit 0c155ac8cd
22 changed files with 395 additions and 109 deletions
@@ -161,7 +161,7 @@ public class FullDataPointIdMap
}
public String serialize(ILevelWrapper levelWrapper) { return this.biome.serialize(levelWrapper) + SEPARATOR_STRING + this.blockState.serialize(); }
public String serialize(ILevelWrapper levelWrapper) { return this.biome.serialize(levelWrapper) + SEPARATOR_STRING + this.blockState.serialize(levelWrapper); }
public static Entry deserialize(String str, ILevelWrapper levelWrapper) throws IOException, InterruptedException
{
@@ -178,7 +178,7 @@ public class FullDataPointIdMap
}
IBiomeWrapper biome = WRAPPER_FACTORY.deserializeBiomeWrapper(stringArray[0], levelWrapper);
IBlockStateWrapper blockState = WRAPPER_FACTORY.deserializeBlockStateWrapper(stringArray[1]);
IBlockStateWrapper blockState = WRAPPER_FACTORY.deserializeBlockStateWrapper(stringArray[1], levelWrapper);
return new Entry(biome, blockState);
}
@@ -43,6 +43,18 @@ public class FullDataFileHandler implements IFullDataSourceProvider
private final ConcurrentHashMap<DhSectionPos, File> unloadedFiles = new ConcurrentHashMap<>();
private final ConcurrentHashMap<DhSectionPos, FullDataMetaFile> fileBySectionPos = new ConcurrentHashMap<>();
public void ForEachFile(Consumer<FullDataMetaFile> consumer) { this.fileBySectionPos.values().forEach(consumer); }
public Map<DhSectionPos, Integer> getLoadStates(Iterable<DhSectionPos> posList)
{
HashMap<DhSectionPos, Integer> map = new HashMap<>();
for (DhSectionPos pos : posList)
{
map.put(pos,
fileBySectionPos.containsKey(pos) ? 3 // Loaded
: unloadedFiles.containsKey(pos) ? 2 // Unloaded
: 1); // Not generated
}
return map;
}
private LinkedList<Consumer<IFullDataSource>> onUpdatedListeners = new LinkedList<>();
@@ -11,6 +11,8 @@ import com.seibel.distanthorizons.core.multiplayer.ClientNetworkState;
import com.seibel.distanthorizons.core.network.exceptions.RateLimitedException;
import com.seibel.distanthorizons.core.network.messages.FullDataSourceRequestMessage;
import com.seibel.distanthorizons.core.network.messages.FullDataSourceResponseMessage;
import com.seibel.distanthorizons.core.network.messages.GenTaskPriorityRequestMessage;
import com.seibel.distanthorizons.core.network.messages.GenTaskPriorityResponseMessage;
import com.seibel.distanthorizons.core.pos.DhBlockPos2D;
import com.seibel.distanthorizons.core.pos.DhChunkPos;
import com.seibel.distanthorizons.core.pos.DhLodPos;
@@ -25,9 +27,11 @@ import org.apache.logging.log4j.Logger;
import javax.annotation.CheckForNull;
import java.awt.*;
import java.util.*;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.stream.Collectors;
public class WorldRemoteGenerationQueue implements IWorldGenerationQueue, IDebugRenderable
{
@@ -42,6 +46,9 @@ public class WorldRemoteGenerationQueue implements IWorldGenerationQueue, IDebug
private final Semaphore pendingTasksSemaphore = new Semaphore(Short.MAX_VALUE, true);
private int pendingTasks() { return Short.MAX_VALUE - pendingTasksSemaphore.availablePermits(); }
private CompletableFuture<?> genTaskPriorityRequest = CompletableFuture.completedFuture(null);
private final Semaphore genTaskPriorityRequestSemaphore = new Semaphore(1, true);
private final F3Screen.NestedMessage f3Message = new F3Screen.NestedMessage(this::f3Log);
private final AtomicInteger finishedRequests = new AtomicInteger();
private final AtomicInteger totalRequests = new AtomicInteger();
@@ -73,17 +80,62 @@ public class WorldRemoteGenerationQueue implements IWorldGenerationQueue, IDebug
return entry.future;
}
private int posDistanceSquared(DhBlockPos2D targetPos, DhSectionPos pos)
{
return (int) pos.getCenter().getCenterBlockPos().distSquared(targetPos);
}
@Override
public void runCurrentGenTasksUntilBusy(DhBlockPos2D targetPos)
{
while (generatorClosingFuture == null
&& networkState.client().isReady()
&& !waitingTasks.isEmpty()
if (generatorClosingFuture != null || !networkState.getClient().isReady()) return;
while (!waitingTasks.isEmpty()
&& pendingTasks() < this.networkState.config.fullDataRequestRateLimit
&& pendingTasksSemaphore.tryAcquire())
{
sendNewRequest(targetPos);
}
if (genTaskPriorityRequestSemaphore.tryAcquire()) {
List<DhSectionPos> posList = waitingTasks.entrySet().stream()
.filter(task -> task.getValue().request == null && task.getValue().priority == 0)
.sorted((x, y) -> posDistanceSquared(targetPos, x.getKey()) - posDistanceSquared(targetPos, y.getKey()))
.limit(this.networkState.config.fullDataRequestRateLimit)
.map(Map.Entry::getKey)
.collect(Collectors.toList());
if (posList.isEmpty()) {
genTaskPriorityRequestSemaphore.release();
return;
};
CompletableFuture<GenTaskPriorityResponseMessage> request = this.networkState.getClient().sendRequest(new GenTaskPriorityRequestMessage(posList));
genTaskPriorityRequest = request;
request.handleAsync((response, throwable) -> {
try
{
if (throwable != null)
throw throwable;
for (Map.Entry<DhSectionPos, Integer> mapEntry : response.posList.entrySet())
{
WorldGenQueueEntry entry = waitingTasks.get(mapEntry.getKey());
if (entry != null)
entry.priority = mapEntry.getValue();
}
}
catch (ChannelException | CancellationException ignored)
{
}
catch (Throwable e)
{
LOGGER.error("Error while fetching gen task priorities", e);
}
genTaskPriorityRequestSemaphore.release();
return null;
});
}
}
@Override
@@ -106,10 +158,10 @@ public class WorldRemoteGenerationQueue implements IWorldGenerationQueue, IDebug
Map.Entry<DhSectionPos, WorldGenQueueEntry> mapEntry = waitingTasks.entrySet().stream()
.filter(task -> task.getValue().request == null)
.reduce(null, (a, b)
-> a != null
&& a.getKey().getCenter().getCenterBlockPos().distSquared(targetPos)
< b.getKey().getCenter().getCenterBlockPos().distSquared(targetPos)
? a : b);
-> a == null
|| b.getValue().priority > a.getValue().priority
|| posDistanceSquared(targetPos, b.getKey()) < posDistanceSquared(targetPos, a.getKey())
? b : a);
if (mapEntry == null)
{
pendingTasksSemaphore.release();
@@ -119,69 +171,71 @@ public class WorldRemoteGenerationQueue implements IWorldGenerationQueue, IDebug
DhSectionPos sectionPos = mapEntry.getKey();
WorldGenQueueEntry entry = mapEntry.getValue();
entry.request = this.networkState.client().sendRequest(new FullDataSourceRequestMessage(sectionPos));
entry.request.handle((response, throwable) ->
{
entry.request = null;
pendingTasksSemaphore.release();
finishedRequests.incrementAndGet();
CompletableFuture<FullDataSourceResponseMessage> request = this.networkState.getClient().sendRequest(new FullDataSourceRequestMessage(sectionPos));
entry.request = request;
request.handleAsync((response, throwable) ->
{
pendingTasksSemaphore.release();
finishedRequests.incrementAndGet();
try
{
if (throwable != null)
throw throwable;
waitingTasks.remove(sectionPos);
LOGGER.debug("FullDataSourceResponseMessage " + sectionPos);
CompleteFullDataSource fullDataSource = response.getFullDataSource(sectionPos, level);
// FIXME Add dimension context to request instead
// Check is dimension has been switched - received data may no longer be relevant
if (fullDataSource == null)
throw new CancellationException();
Consumer<ChunkSizedFullDataAccessor> chunkDataConsumer = entry.tracker.getChunkDataConsumer();
// FIXME Why keeping a reference in first place
if (chunkDataConsumer == null)
return entry.future.cancel(false);
sectionPos.forEachChildAtLevel(LodUtil.CHUNK_DETAIL_LEVEL, childPos -> {
ChunkSizedFullDataAccessor accessor = new ChunkSizedFullDataAccessor(new DhChunkPos(childPos.sectionX, childPos.sectionZ));
try
{
if (throwable != null)
throw throwable;
waitingTasks.remove(sectionPos);
LOGGER.debug("FullDataSourceResponseMessage " + sectionPos);
CompleteFullDataSource fullDataSource = response.getFullDataSource(sectionPos, level);
// Check is dimension has been switched - received data may no longer be relevant
if (fullDataSource == null)
throw new CancellationException();
Consumer<ChunkSizedFullDataAccessor> chunkDataConsumer = entry.tracker.getChunkDataConsumer();
// FIXME Who decided it was a good idea to use weak references for cancellation purposes?
if (chunkDataConsumer == null)
return entry.future.cancel(false);
sectionPos.forEachChildAtLevel(LodUtil.CHUNK_DETAIL_LEVEL, childPos -> {
ChunkSizedFullDataAccessor accessor = new ChunkSizedFullDataAccessor(new DhChunkPos(childPos.sectionX, childPos.sectionZ));
int detailLevelDifference = sectionPos.sectionDetailLevel - childPos.sectionDetailLevel;
int childRelativeX = childPos.sectionX - sectionPos.sectionX * BitShiftUtil.powerOfTwo(detailLevelDifference);
int childRelativeZ = childPos.sectionZ - sectionPos.sectionZ * BitShiftUtil.powerOfTwo(detailLevelDifference);
fullDataSource.subView(
LodUtil.CHUNK_WIDTH,
childRelativeX * LodUtil.CHUNK_WIDTH,
childRelativeZ * LodUtil.CHUNK_WIDTH
).shadowCopyTo(accessor);
chunkDataConsumer.accept(accessor);
});
}
catch (ChannelException | RateLimitedException e)
{
if (e instanceof RateLimitedException)
LOGGER.warn("Rate limited by server, re-queueing task ["+sectionPos+"]: "+e.getMessage());
finishedRequests.decrementAndGet();
}
catch (CancellationException ignored)
{
finishedRequests.decrementAndGet();
totalRequests.decrementAndGet();
}
catch (Throwable e)
{
LOGGER.error("Error while fetching full data source", e);
failedRequests.incrementAndGet();
return entry.future.complete(WorldGenResult.CreateFail());
}
int detailLevelDifference = sectionPos.sectionDetailLevel - childPos.sectionDetailLevel;
int childRelativeX = childPos.sectionX - sectionPos.sectionX * BitShiftUtil.powerOfTwo(detailLevelDifference);
int childRelativeZ = childPos.sectionZ - sectionPos.sectionZ * BitShiftUtil.powerOfTwo(detailLevelDifference);
return entry.future.complete(WorldGenResult.CreateSuccess(sectionPos));
fullDataSource.subView(
LodUtil.CHUNK_WIDTH,
childRelativeX * LodUtil.CHUNK_WIDTH,
childRelativeZ * LodUtil.CHUNK_WIDTH
).shadowCopyTo(accessor);
chunkDataConsumer.accept(accessor);
});
}
catch (ChannelException | RateLimitedException e)
{
if (e instanceof RateLimitedException)
LOGGER.warn("Rate limited by server, re-queueing task [" + sectionPos + "]: " + e.getMessage());
entry.request = null;
finishedRequests.decrementAndGet();
}
catch (CancellationException ignored)
{
finishedRequests.decrementAndGet();
totalRequests.decrementAndGet();
}
catch (Throwable e)
{
LOGGER.error("Error while fetching full data source", e);
failedRequests.incrementAndGet();
return entry.future.complete(WorldGenResult.CreateFail());
}
return entry.future.complete(WorldGenResult.CreateSuccess(sectionPos));
});
}
private String[] f3Log()
@@ -197,6 +251,11 @@ public class WorldRemoteGenerationQueue implements IWorldGenerationQueue, IDebug
public CompletableFuture<Void> startClosing(boolean cancelCurrentGeneration, boolean alsoInterruptRunning)
{
return this.generatorClosingFuture = CompletableFuture.runAsync(() -> {
while (!genTaskPriorityRequestSemaphore.tryAcquire())
{
genTaskPriorityRequest.cancel(false);
}
while (!pendingTasksSemaphore.tryAcquire(Short.MAX_VALUE))
{
for (WorldGenQueueEntry entry : this.waitingTasks.values())
@@ -231,8 +290,12 @@ public class WorldRemoteGenerationQueue implements IWorldGenerationQueue, IDebug
{
public final CompletableFuture<WorldGenResult> future;
public final IWorldGenTaskTracker tracker;
// Higher value = higher priority.
// Priority of 0 is reserved for unassigned value
public int priority = 0;
@CheckForNull
public CompletableFuture<FullDataSourceResponseMessage> request;
public CompletableFuture<?> request;
public WorldGenQueueEntry(CompletableFuture<WorldGenResult> future, IWorldGenTaskTracker tracker)
{
@@ -8,7 +8,6 @@ import com.seibel.distanthorizons.core.file.structure.AbstractSaveStructure;
import com.seibel.distanthorizons.core.generation.WorldRemoteGenerationQueue;
import com.seibel.distanthorizons.core.logging.DhLoggerBuilder;
import com.seibel.distanthorizons.core.multiplayer.ClientNetworkState;
import com.seibel.distanthorizons.core.network.NetworkClient;
import com.seibel.distanthorizons.core.pos.DhBlockPos;
import com.seibel.distanthorizons.core.pos.DhBlockPos2D;
import com.seibel.distanthorizons.core.pos.DhSectionPos;
@@ -83,7 +82,7 @@ public class DhClientLevel extends DhLevel implements IDhClientLevel
public void doWorldGen()
{
boolean isClientUsable = networkState != null && !networkState.client().isClosed();
boolean isClientUsable = networkState != null && !networkState.getClient().isClosed();
boolean shouldDoWorldGen = isClientUsable && clientside.isRendering();
boolean isWorldGenRunning = worldGenModule.isWorldGenRunning();
if (shouldDoWorldGen && !isWorldGenRunning)
@@ -13,9 +13,7 @@ import com.seibel.distanthorizons.core.multiplayer.RemotePlayerConnectionHandler
import com.seibel.distanthorizons.core.network.ScopedNetworkEventSource;
import com.seibel.distanthorizons.core.network.NetworkServer;
import com.seibel.distanthorizons.core.network.exceptions.RateLimitedException;
import com.seibel.distanthorizons.core.network.messages.CancelMessage;
import com.seibel.distanthorizons.core.network.messages.FullDataSourceRequestMessage;
import com.seibel.distanthorizons.core.network.messages.FullDataSourceResponseMessage;
import com.seibel.distanthorizons.core.network.messages.*;
import com.seibel.distanthorizons.core.pos.DhBlockPos2D;
import com.seibel.distanthorizons.core.pos.DhLodPos;
import com.seibel.distanthorizons.core.pos.DhSectionPos;
@@ -65,6 +63,12 @@ public class DhServerLevel extends DhLevel implements IDhServerLevel
private void registerNetworkHandlers()
{
// TODO implement transparent message handling restriction by level
// workaround:
// ServerPlayerState serverPlayerState = remotePlayerConnectionHandler.getConnectedPlayer(msg);
// if (serverPlayerState.serverPlayer.getLevel() != this.serverLevelWrapper)
// return;
this.eventSource.registerHandler(FullDataSourceRequestMessage.class, msg ->
{
ServerPlayerState serverPlayerState = remotePlayerConnectionHandler.getConnectedPlayer(msg);
@@ -100,6 +104,16 @@ public class DhServerLevel extends DhLevel implements IDhServerLevel
}
});
this.eventSource.registerHandler(GenTaskPriorityRequestMessage.class, msg -> {
ServerPlayerState serverPlayerState = remotePlayerConnectionHandler.getConnectedPlayer(msg);
if (serverPlayerState.serverPlayer.getLevel() != this.serverLevelWrapper)
return;
msg.sendResponse(new GenTaskPriorityResponseMessage(
this.serverside.dataFileHandler.getLoadStates(msg.posList)
));
});
this.eventSource.registerHandler(CancelMessage.class, msg ->
{
IncompleteDataSourceEntry entry = this.fullDataRequests.remove(msg.futureId);
@@ -2,6 +2,7 @@ package com.seibel.distanthorizons.core.multiplayer;
import com.seibel.distanthorizons.core.config.Config;
import com.seibel.distanthorizons.core.logging.DhLoggerBuilder;
import com.seibel.distanthorizons.core.network.IClientRequestHandler;
import com.seibel.distanthorizons.core.network.ScopedNetworkEventSource;
import com.seibel.distanthorizons.core.network.NetworkClient;
import com.seibel.distanthorizons.core.network.messages.AckMessage;
@@ -17,29 +18,40 @@ public class ClientNetworkState implements Closeable
{
protected static final Logger LOGGER = DhLoggerBuilder.getLogger();
private final ScopedNetworkEventSource<NetworkClient> eventSource;
private final NetworkClient client;
private final UUID playerUUID;
public MultiplayerConfig config = new MultiplayerConfig();
public NetworkClient client() { return this.eventSource.parent; }
/**
* Returns the client used by this instance. <p>
* If you need to subscribe to any packet events, create an instance of {@link ScopedNetworkEventSource} using the returned instance.
*/
public IClientRequestHandler getClient() { return this.client; }
/**
* Constructs a new instance.
*
* @param networkClient Client to use. It is assumed that this client will be at full control by this instance.
* @param playerUUID UUID of a player connected
*/
public ClientNetworkState(NetworkClient networkClient, UUID playerUUID)
{
this.eventSource = new ScopedNetworkEventSource<>(networkClient);
this.client = networkClient;
this.playerUUID = playerUUID;
this.registerNetworkHandlers();
this.client().startConnecting();
this.client.startConnecting();
}
private void registerNetworkHandlers()
{
this.client().registerHandler(HelloMessage.class, helloMessage ->
this.client.registerHandler(HelloMessage.class, helloMessage ->
{
LOGGER.info("Connected to server: "+helloMessage.getChannelContext().channel().remoteAddress());
this.client().<AckMessage>sendRequest(new PlayerUUIDMessage(playerUUID))
.thenCompose(ack -> this.client().<RemotePlayerConfigMessage>sendRequest(new RemotePlayerConfigMessage(new MultiplayerConfig()
this.getClient().<AckMessage>sendRequest(new PlayerUUIDMessage(playerUUID))
.thenCompose(ack -> this.getClient().<RemotePlayerConfigMessage>sendRequest(new RemotePlayerConfigMessage(new MultiplayerConfig()
{{
renderDistance = Config.Client.Advanced.Graphics.Quality.lodChunkRenderDistance.get();
fullDataRequestRateLimit = Config.Client.Advanced.Multiplayer.serverNetworkingRateLimit.get();
}})))
.thenAccept(msg -> {
@@ -54,7 +66,6 @@ public class ClientNetworkState implements Closeable
public void close()
{
this.eventSource.close();
this.client().close();
this.client.close();
}
}
@@ -0,0 +1,19 @@
package com.seibel.distanthorizons.core.network;
import com.seibel.distanthorizons.core.network.protocol.FutureTrackableNetworkMessage;
import java.util.concurrent.CompletableFuture;
public interface IClientRequestHandler
{
/** Indicates whether the client is initialized and not started connecting yet. */
boolean isInitialState();
/** Indicates whether the client is closed(-ing) and should not be used. */
boolean isClosed();
/** Indicates whether the connection is established and first message is sent. */
boolean isReady();
/** Sends a new request. */
<TResponse extends FutureTrackableNetworkMessage> CompletableFuture<TResponse> sendRequest(FutureTrackableNetworkMessage msg);
}
@@ -19,7 +19,7 @@ import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
public class NetworkClient extends NetworkEventSource implements AutoCloseable
public class NetworkClient extends NetworkEventSource implements IClientRequestHandler, AutoCloseable
{
private static final Logger LOGGER = DhLoggerBuilder.getLogger();
@@ -102,6 +102,8 @@ public class NetworkClient extends NetworkEventSource implements AutoCloseable
// FIXME sometimes this causes the MC connection to crash
// this might happen if the URL can't be converted to a IP (IE UnknownHostException)
ChannelFuture connectFuture = this.clientBootstrap.connect(this.address);
this.channel = connectFuture.channel();
connectFuture.addListener((ChannelFuture channelFuture) ->
{
if (!channelFuture.isSuccess())
@@ -114,7 +116,6 @@ public class NetworkClient extends NetworkEventSource implements AutoCloseable
ready = true;
});
this.channel = connectFuture.channel();
this.channel.closeFuture().addListener((ChannelFuture channelFuture) ->
{
ready = false;
@@ -66,7 +66,10 @@ public abstract class NetworkEventSource
if (!handled)
{
LOGGER.warn("Unhandled message type: " + message.getClass().getSimpleName());
String error = "Unhandled message type: " + message.getClass().getSimpleName();
if (message instanceof FutureTrackableNetworkMessage)
error += ", future id: " + ((FutureTrackableNetworkMessage) message).futureId;
LOGGER.warn(error);
}
}
@@ -14,9 +14,9 @@ public class CloseReasonMessage extends NetworkMessage
public CloseReasonMessage(String reason) { this.reason = reason; }
@Override
public void encode(ByteBuf out) { INetworkObject.encodeString(this.reason, out); }
public void encode(ByteBuf out) { encodeString(this.reason, out); }
@Override
public void decode(ByteBuf in) { this.reason = INetworkObject.decodeString(in); }
public void decode(ByteBuf in) { this.reason = decodeString(in); }
}
@@ -27,13 +27,13 @@ public class ExceptionMessage extends FutureTrackableNetworkMessage
@Override protected void encode0(ByteBuf out)
{
out.writeInt(exceptionMap.indexOf(exception.getClass()));
INetworkObject.encodeString(exception.getMessage(), out);
encodeString(exception.getMessage(), out);
}
@Override protected void decode0(ByteBuf in) throws Exception
{
int id = in.readInt();
String message = INetworkObject.decodeString(in);
String message = decodeString(in);
exception = exceptionMap.get(id).getDeclaredConstructor(String.class).newInstance(message);
}
}
@@ -1,14 +1,10 @@
package com.seibel.distanthorizons.core.network.messages;
import com.google.common.collect.MapMaker;
import com.seibel.distanthorizons.core.level.DhClientLevel;
import com.seibel.distanthorizons.core.network.protocol.FutureTrackableNetworkMessage;
import com.seibel.distanthorizons.core.network.protocol.INetworkObject;
import com.seibel.distanthorizons.core.pos.DhSectionPos;
import io.netty.buffer.ByteBuf;
import java.util.concurrent.ConcurrentMap;
public class FullDataSourceRequestMessage extends FutureTrackableNetworkMessage
{
public DhSectionPos dhSectionPos;
@@ -28,6 +24,6 @@ public class FullDataSourceRequestMessage extends FutureTrackableNetworkMessage
@Override
public void decode0(ByteBuf in)
{
dhSectionPos = INetworkObject.decode(new DhSectionPos((byte)0, 0, 0), in);
dhSectionPos = INetworkObject.decodeStatic(DhSectionPos.zero(), in);
}
}
@@ -0,0 +1,31 @@
package com.seibel.distanthorizons.core.network.messages;
import com.seibel.distanthorizons.core.network.protocol.FutureTrackableNetworkMessage;
import com.seibel.distanthorizons.core.pos.DhSectionPos;
import io.netty.buffer.ByteBuf;
import java.util.ArrayList;
import java.util.List;
public class GenTaskPriorityRequestMessage extends FutureTrackableNetworkMessage
{
public List<DhSectionPos> posList = new ArrayList<>();
public GenTaskPriorityRequestMessage() { }
public GenTaskPriorityRequestMessage(List<DhSectionPos> posList)
{
this.posList = posList;
}
@Override
protected void encode0(ByteBuf out)
{
encodeCollection(out, posList);
}
@Override
protected void decode0(ByteBuf in)
{
decodeCollection(in, posList, DhSectionPos::zero);
}
}
@@ -0,0 +1,32 @@
package com.seibel.distanthorizons.core.network.messages;
import com.seibel.distanthorizons.core.network.protocol.FutureTrackableNetworkMessage;
import com.seibel.distanthorizons.core.pos.DhSectionPos;
import io.netty.buffer.ByteBuf;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class GenTaskPriorityResponseMessage extends FutureTrackableNetworkMessage
{
public Map<DhSectionPos, Integer> posList = new HashMap<>();
public GenTaskPriorityResponseMessage() { }
public GenTaskPriorityResponseMessage(Map<DhSectionPos, Integer> posList)
{
this.posList = posList;
}
@Override
protected void encode0(ByteBuf out)
{
encodeCollection(out, posList.entrySet());
}
@Override
protected void decode0(ByteBuf in)
{
decodeMap(in, posList, DhSectionPos::zero, () -> 0);
}
}
@@ -18,6 +18,6 @@ public class RemotePlayerConfigMessage extends FutureTrackableNetworkMessage
public void encode0(ByteBuf out) { this.payload.encode(out); }
@Override
public void decode0(ByteBuf in) { this.payload = INetworkObject.decode(new MultiplayerConfig(), in); }
public void decode0(ByteBuf in) { this.payload = INetworkObject.decodeStatic(new MultiplayerConfig(), in); }
}
@@ -3,11 +3,13 @@ package com.seibel.distanthorizons.core.network.protocol;
import com.seibel.distanthorizons.core.network.messages.ExceptionMessage;
import io.netty.buffer.ByteBuf;
import java.util.concurrent.atomic.AtomicInteger;
public abstract class FutureTrackableNetworkMessage extends NetworkMessage
{
private static int lastId = 0;
private static final AtomicInteger lastId = new AtomicInteger();
// Only low 32 bits are sent (high bits are used for identifying a channel this request was sent from by remote peer)
public long futureId = lastId++;
public long futureId = lastId.incrementAndGet();
public void sendResponse(FutureTrackableNetworkMessage responseMessage)
{
@@ -3,6 +3,10 @@ package com.seibel.distanthorizons.core.network.protocol;
import io.netty.buffer.ByteBuf;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.*;
public interface INetworkObject
{
@@ -10,22 +14,109 @@ public interface INetworkObject
void decode(ByteBuf in);
static <T extends INetworkObject> T decode(T obj, ByteBuf inputByteBuf)
static <T extends INetworkObject> T decodeStatic(T obj, ByteBuf inputByteBuf)
{
obj.decode(inputByteBuf);
return obj;
}
static void encodeString(String inputString, ByteBuf outputByteBuf)
default void encodeString(String inputString, ByteBuf outputByteBuf)
{
outputByteBuf.writeShort(inputString.length());
outputByteBuf.writeBytes(inputString.getBytes(StandardCharsets.UTF_8));
byte[] bytes = inputString.getBytes(StandardCharsets.UTF_8);
outputByteBuf.writeShort(bytes.length);
outputByteBuf.writeBytes(bytes);
}
static String decodeString(ByteBuf inputByteBuf)
default String decodeString(ByteBuf inputByteBuf)
{
int length = inputByteBuf.readShort();
return inputByteBuf.readBytes(length).toString(StandardCharsets.UTF_8);
}
default <T> void encodeCollection(ByteBuf outputByteBuf, Collection<T> collection)
{
outputByteBuf.writeInt(collection.size());
Codec codec = null;
for (T item : collection)
{
if (codec == null)
codec = Codec.getCodec(item.getClass());
codec.encode.accept(item, outputByteBuf);
}
}
default <T> void decodeCollection(ByteBuf inputByteBuf, Collection<T> collection, Supplier<T> innerValueConstructor)
{
int size = inputByteBuf.readInt();
Codec codec = null;
for (int i = 0; i < size; i++)
{
T item = innerValueConstructor.get();
if (codec == null)
codec = Codec.getCodec(item.getClass());
item = (T) codec.decode.apply(item, inputByteBuf);
collection.add(item);
}
}
default <K, V> void decodeMap(ByteBuf inputByteBuf, Map<K, V> map, Supplier<K> keySupplier, Supplier<V> valueSupplier)
{
ArrayList<Map.Entry<K, V>> entryList = new ArrayList<>();
decodeCollection(inputByteBuf, entryList, () -> new AbstractMap.SimpleEntry<>(keySupplier.get(), valueSupplier.get()));
for (Map.Entry<K, V> entry : entryList)
map.put(entry.getKey(), entry.getValue());
}
/** Should only be used for non-editable classes;
* otherwise, you may want to implement {@link INetworkObject} and use its method where applicable. */
class Codec
{
private static final ConcurrentMap<Class<?>, Codec> codecMap = new ConcurrentHashMap<Class<?>, Codec>()
{{
// Primitives must be added manually here
put(Integer.class, new Codec((obj, out) -> out.writeInt((int)obj), (obj, in) -> in.readInt()));
put(INetworkObject.class, new Codec(INetworkObject::encode, INetworkObject::decodeStatic));
put(Map.Entry.class, new Codec(
(obj, out) -> {
Map.Entry<?, ?> entry = (Entry<?, ?>) obj;
getCodec(entry.getKey().getClass()).encode.accept(entry.getKey(), out);
getCodec(entry.getValue().getClass()).encode.accept(entry.getValue(), out);
},
(obj, in) -> {
Map.Entry<?, ?> entry = (Entry<?, ?>) obj;
return new SimpleEntry<>(
getCodec(entry.getKey().getClass()).decode.apply(entry.getKey(), in),
getCodec(entry.getValue().getClass()).decode.apply(entry.getValue(), in)
);
}
));
}};
public final BiConsumer<Object, ByteBuf> encode;
public final BiFunction<Object, ByteBuf, Object> decode;
public <T> Codec(BiConsumer<T, ByteBuf> encode, BiFunction<T, ByteBuf, T> decode)
{
this.encode = (BiConsumer<Object, ByteBuf>) encode;
this.decode = (BiFunction<Object, ByteBuf, Object>) decode;
}
public static Codec getCodec(Class<?> clazz) {
return codecMap.computeIfAbsent(clazz, ignored -> {
for (Map.Entry<Class<?>, Codec> entry : codecMap.entrySet())
{
if (entry.getKey().isAssignableFrom(clazz))
return entry.getValue();
}
throw new AssertionError("Class has no compatible codec: "+clazz.getSimpleName());
});
}
}
}
@@ -12,7 +12,7 @@ public class MessageDecoder extends ByteToMessageDecoder
protected void decode(ChannelHandlerContext channelContext, ByteBuf inputByteBuf, List<Object> outputDecodedObjectList)
{
NetworkMessage message = MessageRegistry.INSTANCE.createMessage(inputByteBuf.readShort());
outputDecodedObjectList.add(INetworkObject.decode(message, inputByteBuf));
outputDecodedObjectList.add(INetworkObject.decodeStatic(message, inputByteBuf));
}
}
@@ -21,18 +21,27 @@ public class MessageRegistry
{
// Note: Messages must have parameterless constructors
// Keep messages below intact so client/server can disconnect if version does not match
// Opening & closing connection
// These messages should be compatible with any previous protocol versions
this.registerMessage(HelloMessage.class, HelloMessage::new);
this.registerMessage(CloseReasonMessage.class, CloseReasonMessage::new);
// Define your messages after this line
// Core
this.registerMessage(AckMessage.class, AckMessage::new);
this.registerMessage(CancelMessage.class, CancelMessage::new);
this.registerMessage(ExceptionMessage.class, ExceptionMessage::new);
// ID & config
this.registerMessage(PlayerUUIDMessage.class, PlayerUUIDMessage::new);
this.registerMessage(RemotePlayerConfigMessage.class, RemotePlayerConfigMessage::new);
// Full data requests
this.registerMessage(FullDataSourceRequestMessage.class, FullDataSourceRequestMessage::new);
this.registerMessage(FullDataSourceResponseMessage.class, FullDataSourceResponseMessage::new);
// Generation task prioritization
this.registerMessage(GenTaskPriorityRequestMessage.class, GenTaskPriorityRequestMessage::new);
this.registerMessage(GenTaskPriorityResponseMessage.class, GenTaskPriorityResponseMessage::new);
}
@@ -49,6 +49,8 @@ public class DhSectionPos implements INetworkObject
public static DhSectionPos zero() { return new DhSectionPos((byte) 0, 0, 0); };
public DhSectionPos(byte sectionDetailLevel, int sectionX, int sectionZ)
{
this.sectionDetailLevel = sectionDetailLevel;
@@ -39,7 +39,7 @@ public interface IWrapperFactory extends IBindable
{
AbstractBatchGenerationEnvironmentWrapper createBatchGenerator(IDhLevel targetLevel);
IBiomeWrapper deserializeBiomeWrapper(String str, ILevelWrapper levelWrapper) throws IOException;
IBlockStateWrapper deserializeBlockStateWrapper(String str) throws IOException;
IBlockStateWrapper deserializeBlockStateWrapper(String str, ILevelWrapper levelWrapper) throws IOException;
IBlockStateWrapper getAirBlockStateWrapper();
/**
@@ -1,11 +1,12 @@
package com.seibel.distanthorizons.core.wrapperInterfaces.block;
import com.seibel.distanthorizons.api.interfaces.block.IDhApiBlockStateWrapper;
import com.seibel.distanthorizons.core.wrapperInterfaces.world.ILevelWrapper;
/** A Minecraft version independent way of handling Blocks. */
public interface IBlockStateWrapper extends IDhApiBlockStateWrapper
{
String serialize();
String serialize(ILevelWrapper levelWrapper);
/**
* Returning a value of 0 means the block is completely transparent. <br.