Start rework on the generationQueue system so that it hooks directly into base DataFileHandler instead of watching for PlaceHolder obj.

This also means a hyper complex concurrent customized quad tree implementation... So, great.
This commit is contained in:
TomTheFurry
2022-08-14 23:39:46 +08:00
parent d3732306a6
commit 1fc6487374
14 changed files with 594 additions and 34 deletions
@@ -2,6 +2,7 @@ package com.seibel.lod.core.a7;
import com.seibel.lod.core.a7.datatype.PlaceHolderRenderSource;
@Deprecated
public interface PlaceHolderQueue {
void track(PlaceHolderRenderSource source); // Note: Implementation should only track a weak reference to the source.
}
@@ -0,0 +1,111 @@
package com.seibel.lod.core.a7.datatype.full;
import com.seibel.lod.core.a7.datatype.LodDataSource;
import com.seibel.lod.core.a7.datatype.full.accessor.SingleFullArrayView;
import com.seibel.lod.core.a7.pos.DhLodPos;
import com.seibel.lod.core.a7.pos.DhSectionPos;
import com.seibel.lod.core.a7.save.io.file.IDataSourceProvider;
import com.seibel.lod.core.logging.DhLoggerBuilder;
import com.seibel.lod.core.util.LodUtil;
import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
import java.util.concurrent.CompletableFuture;
public class FullDataDownSampler {
private static final Logger LOGGER = DhLoggerBuilder.getLogger();
public static CompletableFuture<LodDataSource> createDownSamplingFuture(DhSectionPos newTarget, IDataSourceProvider provider) {
// TODO: Make this future somehow run with lowest priority (to ensure ram usage stays low)
return createDownSamplingFuture(FullDataSource.createEmpty(newTarget), provider);
}
public static CompletableFuture<LodDataSource> createDownSamplingFuture(FullDataSource target, IDataSourceProvider provider) {
int sectionSizeNeeded = 1 << target.getDataDetail();
ArrayList<CompletableFuture<LodDataSource>> futures;
DhLodPos basePos = target.getSectionPos().getSectionBBoxPos().getCorner(FullDataSource.SECTION_SIZE_OFFSET);
if (sectionSizeNeeded <= FullDataSource.SECTION_SIZE_OFFSET) {
futures = new ArrayList<>(sectionSizeNeeded * sectionSizeNeeded);
for (int ox = 0; ox < sectionSizeNeeded; ox++) {
for (int oz = 0; oz < sectionSizeNeeded; oz++) {
CompletableFuture<LodDataSource> future = provider.read(new DhSectionPos(
FullDataSource.SECTION_SIZE_OFFSET, basePos.x + ox, basePos.z + oz));
future = future.whenComplete((source, ex) -> {
if (ex == null && source != null && source instanceof FullDataSource) {
downSample(target, (FullDataSource) source);
} else if (ex != null) {
LOGGER.error("Error while down sampling", ex);
}
});
futures.add(future);
}
}
} else {
futures = new ArrayList<>(FullDataSource.SECTION_SIZE * FullDataSource.SECTION_SIZE);
int multiplier = sectionSizeNeeded / FullDataSource.SECTION_SIZE;
for (int ox = 0; ox < FullDataSource.SECTION_SIZE; ox++) {
for (int oz = 0; oz < FullDataSource.SECTION_SIZE; oz++) {
CompletableFuture<LodDataSource> future = provider.read(new DhSectionPos(
FullDataSource.SECTION_SIZE_OFFSET, basePos.x + ox * multiplier, basePos.z + oz * multiplier));
future = future.whenComplete((source, ex) -> {
if (ex == null && source != null && source instanceof FullDataSource) {
downSample(target, (FullDataSource) source);
} else if (ex != null) {
LOGGER.error("Error while down sampling", ex);
}
});
futures.add(future);
}
}
}
return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).thenApply(v -> target);
}
public static void downSample(FullDataSource target, FullDataSource source) {
LodUtil.assertTrue(target.getSectionPos().overlaps(source.getSectionPos()));
LodUtil.assertTrue(target.getDataDetail() > source.getDataDetail());
byte detailDiff = (byte) (target.getDataDetail() - source.getDataDetail());
DhSectionPos trgPos = target.getSectionPos();
DhSectionPos srcPos = source.getSectionPos();
if (detailDiff >= FullDataSource.SECTION_SIZE_OFFSET) {
// The source occupies only 1 datapoint in the target
// FIXME: TEMP method for down-sampling: take only the corner column
int sourceSectionPerTargetData = 1 << (detailDiff - FullDataSource.SECTION_SIZE_OFFSET);
if (srcPos.sectionX % sourceSectionPerTargetData != 0 || srcPos.sectionZ % sourceSectionPerTargetData != 0) {
return;
}
DhLodPos trgOffset = trgPos.getCorner(target.getDataDetail());
DhLodPos srcOffset = srcPos.getSectionBBoxPos().convertUpwardsTo(target.getDataDetail());
int offsetX = trgOffset.x - srcOffset.x;
int offsetZ = trgOffset.z - srcOffset.z;
LodUtil.assertTrue(offsetX >= 0 && offsetX < FullDataSource.SECTION_SIZE
&& offsetZ >= 0 && offsetZ < FullDataSource.SECTION_SIZE);
target.isEmpty = false;
source.get(0,0).deepCopyTo(target.get(offsetX, offsetZ));
} else if (detailDiff > 0) {
// The source occupies multiple data-points in the target
int srcDataPerTrgData = 1 << detailDiff;
int overlappedTrgDataSize = FullDataSource.SECTION_SIZE / srcDataPerTrgData;
DhLodPos trgOffset = trgPos.getCorner(target.getDataDetail());
DhLodPos srcOffset = srcPos.getSectionBBoxPos().getCorner(target.getDataDetail());
int offsetX = trgOffset.x - srcOffset.x;
int offsetZ = trgOffset.z - srcOffset.z;
LodUtil.assertTrue(offsetX >= 0 && offsetX < FullDataSource.SECTION_SIZE
&& offsetZ >= 0 && offsetZ < FullDataSource.SECTION_SIZE);
target.isEmpty = false;
for (int ox = 0; ox < overlappedTrgDataSize; ox++) {
for (int oz = 0; oz < overlappedTrgDataSize; oz++) {
SingleFullArrayView column = target.get(ox + offsetX, oz + offsetZ);
column.downsampleFrom(source.subView(srcDataPerTrgData, ox * srcDataPerTrgData, oz * srcDataPerTrgData));
}
}
} else {
LodUtil.assertNotReach();
}
}
}
@@ -0,0 +1,12 @@
package com.seibel.lod.core.a7.datatype.full;
import com.seibel.lod.core.a7.pos.DhSectionPos;
public class SampledDataSource extends FullDataSource {
private boolean[] isGenerated;
protected SampledDataSource(DhSectionPos sectionPos) {
super(sectionPos);
}
}
@@ -0,0 +1,26 @@
package com.seibel.lod.core.a7.generation;
import com.seibel.lod.core.a7.datatype.LodDataSource;
import com.seibel.lod.core.a7.datatype.full.ChunkSizedData;
import com.seibel.lod.core.a7.pos.DhSectionPos;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;
import java.util.function.Supplier;
public class BlockingGenerationQueue {
final BiConsumer<DhSectionPos, ChunkSizedData> writeConsumer;
public BlockingGenerationQueue(BiConsumer<DhSectionPos, ChunkSizedData> writeConsumer) {
this.writeConsumer = writeConsumer;
}
public CompletableFuture<LodDataSource> generate(DhSectionPos sectPos, Supplier<LodDataSource> creator) {
}
}
@@ -1,12 +1,13 @@
package com.seibel.lod.core.a7.generation;
import com.seibel.lod.core.a7.PlaceHolderQueue;
import com.seibel.lod.core.a7.datatype.LodDataSource;
import com.seibel.lod.core.a7.datatype.PlaceHolderRenderSource;
import com.seibel.lod.core.a7.datatype.full.ChunkSizedData;
import com.seibel.lod.core.a7.datatype.full.FullDataSource;
import com.seibel.lod.core.a7.pos.DhBlockPos2D;
import com.seibel.lod.core.a7.pos.DhLodPos;
import com.seibel.lod.core.a7.pos.DhSectionPos;
import com.seibel.lod.core.a7.save.io.file.IDataSourceProvider;
import com.seibel.lod.core.a7.util.UncheckedInterruptedException;
import com.seibel.lod.core.logging.DhLoggerBuilder;
import com.seibel.lod.core.objects.DHChunkPos;
@@ -16,20 +17,59 @@ import org.apache.logging.log4j.Logger;
import java.lang.ref.WeakReference;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.*;
import java.util.function.BiConsumer;
public class GenerationQueue implements PlaceHolderQueue {
public class GenerationQueue {
static class Request implements Comparable<Request> {
long cachedDist;
final DhSectionPos sectionPos;
Request(DhSectionPos sectionPos) {
this.sectionPos = sectionPos;
}
@Override
public int compareTo(Request o) {
return Long.compare(cachedDist, o.cachedDist);
}
@Override
public int hashCode() {
return sectionPos.hashCode();
}
@Override
public boolean equals(Object obj) {
if (obj instanceof Request) {
return sectionPos.equals(((Request) obj).sectionPos);
}
return false;
}
}
public CompletableFuture<LodDataSource> generate(DhSectionPos sectionPos) {
return CompletableFuture.supplyAsync(() -> {
try {
return generate0(sectionPos);
} catch (Exception e) {
throw new CompletionException(e);
}
});
}
private final Logger logger = DhLoggerBuilder.getLogger();
DhBlockPos2D lastPlayerPos = new DhBlockPos2D(0, 0);
final ConcurrentHashMap<DhSectionPos, WeakReference<PlaceHolderRenderSource>> trackers = new ConcurrentHashMap<>();
final BiConsumer<DhSectionPos, ChunkSizedData> writeConsumer;
final HashSet<DhSectionPos> inProgressSections = new HashSet<>();
final HashSet<Request, CompletableFuture<?>> inProgressSections = new HashSet<>();
public GenerationQueue(BiConsumer<DhSectionPos, ChunkSizedData> writeConsumer) {
this.writeConsumer = writeConsumer;
@@ -4,7 +4,7 @@ import com.seibel.lod.core.a7.generation.GenerationQueue;
import com.seibel.lod.core.a7.generation.IGenerator;
import com.seibel.lod.core.a7.render.LodQuadTree;
import com.seibel.lod.core.a7.util.FileScanner;
import com.seibel.lod.core.a7.save.io.file.LocalDataFileHandler;
import com.seibel.lod.core.a7.save.io.file.DataFileHandler;
import com.seibel.lod.core.a7.save.io.render.RenderFileHandler;
import com.seibel.lod.core.a7.pos.DhBlockPos2D;
import com.seibel.lod.core.a7.render.RenderBufferHandler;
@@ -21,7 +21,6 @@ import com.seibel.lod.core.wrapperInterfaces.minecraft.IMinecraftClientWrapper;
import com.seibel.lod.core.wrapperInterfaces.minecraft.IProfilerWrapper;
import com.seibel.lod.core.wrapperInterfaces.world.IBiomeWrapper;
import com.seibel.lod.core.wrapperInterfaces.world.IClientLevelWrapper;
import com.seibel.lod.core.wrapperInterfaces.world.ILevelWrapper;
import com.seibel.lod.core.wrapperInterfaces.world.IServerLevelWrapper;
import org.apache.logging.log4j.Logger;
@@ -31,7 +30,7 @@ public class DhClientServerLevel implements IClientLevel, IServerLevel {
private static final Logger LOGGER = DhLoggerBuilder.getLogger();
private static final IMinecraftClientWrapper MC_CLIENT = SingletonInjector.INSTANCE.get(IMinecraftClientWrapper.class);
public final LocalSaveStructure save;
public final LocalDataFileHandler dataFileHandler;
public final DataFileHandler dataFileHandler;
public GenerationQueue generationQueue = null;
public RenderFileHandler renderFileHandler = null;
public RenderBufferHandler renderBufferHandler = null; //TODO: Should this be owned by renderer?
@@ -46,7 +45,7 @@ public class DhClientServerLevel implements IClientLevel, IServerLevel {
this.save = save;
save.getDataFolder(level).mkdirs();
save.getRenderCacheFolder(level).mkdirs();
dataFileHandler = new LocalDataFileHandler(this, save.getDataFolder(level));
dataFileHandler = new DataFileHandler(this, save.getDataFolder(level));
FileScanner.scanFile(save, serverLevel, dataFileHandler, null);
LOGGER.info("Started DHLevel for {} with saves at {}", level, save);
}
@@ -1,14 +1,9 @@
package com.seibel.lod.core.a7.level;
import com.seibel.lod.core.a7.datatype.full.ChunkSizedData;
import com.seibel.lod.core.a7.pos.DhBlockPos2D;
import com.seibel.lod.core.a7.pos.DhSectionPos;
import com.seibel.lod.core.a7.util.FileScanner;
import com.seibel.lod.core.a7.save.io.file.LocalDataFileHandler;
import com.seibel.lod.core.a7.save.io.file.DataFileHandler;
import com.seibel.lod.core.a7.save.structure.LocalSaveStructure;
import com.seibel.lod.core.logging.DhLoggerBuilder;
import com.seibel.lod.core.objects.DHChunkPos;
import com.seibel.lod.core.wrapperInterfaces.world.ILevelWrapper;
import com.seibel.lod.core.wrapperInterfaces.world.IServerLevelWrapper;
import org.apache.logging.log4j.Logger;
@@ -18,14 +13,14 @@ public class DhServerLevel implements IServerLevel {
private static final Logger LOGGER = DhLoggerBuilder.getLogger();
public final LocalSaveStructure save;
public final LocalDataFileHandler dataFileHandler;
public final DataFileHandler dataFileHandler;
public final IServerLevelWrapper level;
public DhServerLevel(LocalSaveStructure save, IServerLevelWrapper level) {
this.save = save;
this.level = level;
save.getDataFolder(level).mkdirs();
dataFileHandler = new LocalDataFileHandler(this, save.getDataFolder(level));
dataFileHandler = new DataFileHandler(this, save.getDataFolder(level));
FileScanner.scanFile(save, level, dataFileHandler, null);
LOGGER.info("Started DHLevel for {} with saves at {}", level, save);
}
@@ -1,10 +1,11 @@
package com.seibel.lod.core.a7.pos;
import com.seibel.lod.core.util.LodUtil;
import org.jetbrains.annotations.NotNull;
import java.util.Objects;
public class DhLodPos {
public class DhLodPos implements Comparable<DhLodPos> {
public final byte detail;
public final int x;
public final int z;
@@ -50,6 +51,16 @@ public class DhLodPos {
LodUtil.assertTrue(newDetail >= detail);
return new DhLodPos(newDetail, Math.floorDiv(x, 1<<(newDetail-detail)), Math.floorDiv(z, 1<<(newDetail-detail)));
}
public DhLodPos getChild(int child0to3) {
if (child0to3 < 0 || child0to3 > 3) throw new IllegalArgumentException("child0to3 must be between 0 and 3");
if (detail <= 0) throw new IllegalStateException("detail must be greater than 0");
return new DhLodPos((byte) (detail - 1),
x * 2 + (child0to3 & 1),
z * 2 + ((child0to3 & 2) >> 1));
}
public int getChildIndexOfParent() {
return (x & 1) + ((z & 1) << 1);
}
@Override
public boolean equals(Object o) {
@@ -78,4 +89,9 @@ public class DhLodPos {
if (width.detail < detail) throw new IllegalArgumentException("add called with width.detail < pos detail");
return new DhLodPos(detail, x + width.convertTo(detail).value, z + width.convertTo(detail).value);
}
@Override
public int compareTo(@NotNull DhLodPos o) {
return detail != o.detail ? Integer.compare(detail, o.detail) : x != o.x ? Integer.compare(x, o.x) : Integer.compare(z, o.z);
}
}
@@ -51,6 +51,9 @@ public class DhSectionPos {
sectionX * 2 + (child0to3 & 1),
sectionZ * 2 + ((child0to3 & 2) >> 1));
}
public int getChildIndexOfParent() {
return (sectionX & 1) + ((sectionZ & 1) << 1);
}
public void forEachChild(Consumer<DhSectionPos> callback){
for (int i = 0; i < 4; i++) {
@@ -1,16 +1,13 @@
package com.seibel.lod.core.a7.save.io.file;
import com.google.common.collect.HashMultimap;
import com.seibel.lod.core.a7.datatype.DataSourceLoader;
import com.seibel.lod.core.a7.datatype.LodDataSource;
import com.seibel.lod.core.a7.datatype.full.ChunkSizedData;
import com.seibel.lod.core.a7.datatype.full.FullDataSource;
import com.seibel.lod.core.a7.datatype.full.FullFormat;
import com.seibel.lod.core.a7.level.IServerLevel;
import com.seibel.lod.core.a7.pos.DhLodPos;
import com.seibel.lod.core.a7.pos.DhSectionPos;
import com.seibel.lod.core.logging.DhLoggerBuilder;
import com.seibel.lod.core.objects.DHChunkPos;
import com.seibel.lod.core.util.LodUtil;
import org.apache.logging.log4j.Logger;
@@ -24,8 +21,9 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
public class LocalDataFileHandler implements IDataSourceProvider {
public class DataFileHandler implements IDataSourceProvider {
// Note: Single main thread only for now. May make it multi-thread later, depending on the usage.
private static final Logger LOGGER = DhLoggerBuilder.getLogger();
final ExecutorService fileReaderThread = LodUtil.makeThreadPool(4, "FileReaderThread");
@@ -34,11 +32,14 @@ public class LocalDataFileHandler implements IDataSourceProvider {
final File saveDir;
AtomicInteger topDetailLevel = new AtomicInteger(-1);
final int minDetailLevel = FullDataSource.SECTION_SIZE_OFFSET;
final Function<DhSectionPos, CompletableFuture<LodDataSource>> dataSourceCreator;
public LocalDataFileHandler(IServerLevel level, File saveRootDir) {
public DataFileHandler(IServerLevel level, File saveRootDir,
Function<DhSectionPos, CompletableFuture<LodDataSource>> dataSourceCreator) {
this.saveDir = saveRootDir;
this.level = level;
this.dataSourceCreator = dataSourceCreator;
}
/*
@@ -114,7 +115,26 @@ public class LocalDataFileHandler implements IDataSourceProvider {
topDetailLevel.updateAndGet(v -> Math.max(v, pos.sectionDetail));
DataMetaFile metaFile = files.get(pos);
if (metaFile == null) {
return CompletableFuture.completedFuture(null);
File file = computeDefaultFilePath(pos);
//FIXME: Handle file already exists issue. Possibly by renaming the file.
LodUtil.assertTrue(!file.exists(), "File {} already exist for path {}", file, pos);
CompletableFuture<LodDataSource> gen = new CompletableFuture<>();
DataMetaFile newMetaFile = new DataMetaFile(level, file, pos, gen);
metaFile = files.putIfAbsent(pos, newMetaFile); // This is a CAS with expected null value.
if (metaFile == null) {
dataSourceCreator.apply(pos).handle((source, ex) -> {
if (ex != null) {
LOGGER.error("Failed to create data source for {}", pos, ex);
gen.completeExceptionally(ex);
} else {
gen.complete(source);
}
return null;
});
metaFile = newMetaFile;
} else {
gen.cancel(true);
}
}
return metaFile.loadOrGetCached(fileReaderThread);
}
@@ -142,7 +162,8 @@ public class LocalDataFileHandler implements IDataSourceProvider {
DataMetaFile metaFile = files.get(sectionPos);
if (metaFile != null) { // Fast path: if there is a file for this section, just write to it.
metaFile.addToWriteQueue(chunkData);
} else if (sectionPos.sectionDetail <= minDetailLevel) {
}
/* else if (sectionPos.sectionDetail <= minDetailLevel) {
File file = computeDefaultFilePath(sectionPos);
//FIXME: Handle file already exists issue. Possibly by renaming the file.
LodUtil.assertTrue(!file.exists(), "File {} already exist for path {}", file, sectionPos);
@@ -190,7 +211,7 @@ public class LocalDataFileHandler implements IDataSourceProvider {
metaFile.addToWriteQueue(chunkData);
gen.cancel(true);
}
}
}*/
if (sectionPos.sectionDetail <= topDetailLevel.get()) {
recursiveWrite(sectionPos.getParent(), chunkData);
}
@@ -0,0 +1,17 @@
package com.seibel.lod.core.a7.save.io.file;
import com.seibel.lod.core.a7.datatype.LodDataSource;
import com.seibel.lod.core.a7.generation.GenerationQueue;
import com.seibel.lod.core.a7.level.IServerLevel;
import com.seibel.lod.core.a7.pos.DhSectionPos;
import java.io.File;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
public class GeneratedDataFileHandler extends DataFileHandler {
public GeneratedDataFileHandler(IServerLevel level, File saveRootDir, GenerationQueue queue) {
super(level, saveRootDir, dataSourceCreator);
}
}
@@ -0,0 +1,202 @@
package com.seibel.lod.core.a7.util;
import com.seibel.lod.core.a7.pos.DhLodPos;
import com.seibel.lod.core.util.Atomics;
import com.seibel.lod.core.util.LodUtil;
import com.sun.jna.platform.unix.X11;
import org.apache.commons.lang3.NotImplementedException;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
interface CombinableResult<T> {
T combineWith(T b, T c, T d);
}
public class ConcurrentGenerationPosTree<R extends CombinableResult<R>> {
public class Node {
private final DhLodPos pos;
public final AtomicReference<CompletableFuture<R>> future;
public final AtomicReferenceArray<Node> children = new AtomicReferenceArray<>(4);
private Node(DhLodPos pos, CompletableFuture<R> future) {
this.pos = pos;
this.future = new AtomicReference<>(future);
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Node node = (Node) o;
return pos.equals(node.pos);
}
@Override
public int hashCode() {
return pos.hashCode();
}
}
class RootMap {
private final ConcurrentSkipListMap<DhLodPos, Node> roots = new ConcurrentSkipListMap<>();
private final int topLevel;
RootMap(int topLevel) {
this.topLevel = topLevel;
}
Node get(DhLodPos pos) {
return roots.get(pos);
}
Node swap(DhLodPos pos, Node newRoot) {
return roots.put(pos, newRoot);
}
Node compareNullAndExchange(DhLodPos pos, Node newRoot) {
return roots.putIfAbsent(pos, newRoot);
}
boolean compareNullAndSet(DhLodPos pos, Node newRoot) {
Node oldRoot = roots.putIfAbsent(pos, newRoot);
return oldRoot == null;
}
Node setIfNullAndGet(DhLodPos pos, Node newRoot) {
Node oldRoot = roots.putIfAbsent(pos, newRoot);
return oldRoot == null ? newRoot : oldRoot;
}
Node swapNull(DhLodPos pos) {
return roots.remove(pos);
}
}
private final AtomicReference<RootMap> rootMap = new AtomicReference<>(new RootMap(0));
public ConcurrentGenerationPosTree() {}
@Override
public int hashCode() {
throw new NotImplementedException();
}
@Override
public boolean equals(Object obj) {
throw new NotImplementedException();
}
@Override
protected Object clone() throws CloneNotSupportedException {
throw new NotImplementedException();
}
@Override
public String toString() {
throw new NotImplementedException();
}
// Atomically update and get the generation future
private CompletableFuture<R> checkAndMakeFuture(Node node, BiConsumer<DhLodPos, CompletableFuture<R>> allNullCompleter) {
CompletableFuture<R> future = new CompletableFuture<>();
CompletableFuture<R> casValue = Atomics.compareAndExchange(node.future, null, future);
if (casValue != null) { // cas failed. Existing future. Return it.
return future;
}
// Next, we need to make the future completable.
// We first check for each child connection if it exists. If it does, we store it for a later 'allOf'.
boolean allNull = true;
@SuppressWarnings("unchecked")
CompletableFuture<R>[] childFutures = new CompletableFuture[4];
for (int i = 0; i < 4; i++) {
Node nextChild = node.children.get(i);
if (nextChild != null) { // child node exists. Recursively make or get the child's future.
allNull = false;
childFutures[i] = checkAndMakeFuture(nextChild, allNullCompleter);
}
}
if (allNull) { // all children are null. We can then just run the allNullCompleter in this node.
allNullCompleter.accept(node.pos, future);
} else { // some children exist. We need to wait for some or all of them to complete.
// But before that, we need to create the children node where they are missing.
for (int i = 0; i < 4; i++) {
if (childFutures[i] == null) {
CompletableFuture<R> newChildFuture = new CompletableFuture<>();
node.children.set(i, new Node(node.pos.getChild(i), newChildFuture));
// Since the child is new, we can be sure that it doesn't have any children.
// So, we need to make the new child's future completable by running the allNullCompleter.
// (The above relies on the fact that we did a CAS on the beginning of this method,
// which means that we have unique access to the node and it's links to the children, and that
// no other thread can be concurrently modifying its links)
allNullCompleter.accept(node.pos.getChild(i), newChildFuture);
}
}
// Now, we can wait for all the child futures to complete, and then complete this node's future with
// the combined result of all child futures.
CompletableFuture.allOf(childFutures).thenRun(() ->
future.complete(childFutures[0].join().combineWith(
childFutures[1].join(), childFutures[2].join(), childFutures[3].join())));
}
return future;
}
public CompletableFuture<R> createOrUseExisting(DhLodPos pos, BiConsumer<DhLodPos, CompletableFuture<R>> completer) {
RootMap map = rootMap.get();
if (map.topLevel == pos.detail) {
CompletableFuture<R> future = new CompletableFuture<>();
Node newNode = new Node(pos, future);
Node cas = map.compareNullAndExchange(pos, newNode);
if (cas == null) { // cas succeeded. No existing overlapping node in same detail level.
// Since any lower level nodes should have upper level nodes as parent up to the top level,
// and that there are no same level nodes, we can assume that the new node does not overlap any existing nodes.
// Therefore, we can apply the completer function to the new node, and return the future.
completer.accept(pos, future);
return future;
} else { // cas failed. Existing overlapping node.
// Run the checkAndMakeFuture method on the existing node to update and get the generation future.
return checkAndMakeFuture(cas, completer);
}
} else if (map.topLevel > pos.detail) {
// We need to traverse down the tree with the following rules during the traversal:
// 1. If the next node is not null and has a future, halt and return that future.
// 2. If the next node is not null with no future, continue traversing down the tree.
// 3. if the next node is null, create a new node and CompareExchange it into the current node, and run rule 1/2.
// Note that DO NOT assume that all subsequent nodes will fall into case 3, as someone else can concurrently
// use and modify the newly created node!
// To start, just treat the rootMap as the... well, root, and it's content as the children node.
// We can then traverse down the tree until we reach the target node or hit the 1st case and return prematurely.
// First iteration:
Node currentNode;
Node childNode = map.setIfNullAndGet(pos.convertUpwardsTo((byte) map.topLevel), new Node(pos, null)); // rule 3: if null, create a new node.
CompletableFuture<R> future = childNode.future.get();
if (future != null) { // rule 1: if future is not null, halt and return the future.
return future;
} else { // rule 2: if future is null, continue traversing down the tree.
currentNode = childNode;
// Second and subsequent iterations:
while (currentNode.pos.detail > pos.detail) {
Node newNode = new Node(pos.convertUpwardsTo((byte)(currentNode.pos.detail-1)), null);
childNode = Atomics.compareAndSetThenGet(currentNode.children,
newNode.pos.getChildIndexOfParent(), null, newNode); // rule 3: if null, create a new node.
if (childNode == null) { // rule 1: if future is not null, halt and return the future.
return newNode.future.get();
} else { // rule 2: if future is null, continue traversing down the tree.
currentNode = childNode;
}
}
}
// At this point, we have reached the target node.
LodUtil.assertTrue(currentNode.pos.equals(pos));
// We can now run the checkAndMakeFuture method on the target node to update and get the generation future.
return checkAndMakeFuture(currentNode, completer); // Technically, this will rerun the 1st rule. But code is cleaner this way.
} else { // map.topLevel < pos.detail
// Now, this is the complex case. We need to rebase the tree to the higher detail level.
// For now, this implementation will do a lock based version. However, I will figure out a way to do this without a lock.
// Before we do anything, we ...
// TODO!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
return null;
}
}
}
@@ -136,17 +136,44 @@ public class LazySectionPosTree<T> implements ConcurrentMap<DhLodPos, T> {
switch (child0to3) {
case 0:
child0 = null;
break;
case 1:
child1 = null;
break;
case 2:
child2 = null;
break;
case 3:
child3 = null;
break;
}
LodUtil.assertNotReach();
}
private void setChild(int child0to3, Node child) {
LodUtil.assertTrue(child0to3 >= 0 && child0to3 <= 3);
child.parent = this;
switch (child0to3) {
case 0:
child0 = child;
child.child0to3 = 0;
break;
case 1:
child1 = child;
child.child0to3 = 1;
break;
case 2:
child2 = child;
child.child0to3 = 2;
break;
case 3:
child3 = child;
child.child0to3 = 3;
break;
}
LodUtil.assertNotReach();
}
}
private final ConcurrentSkipListMap<DhLodPos, Node> nodes = new ConcurrentSkipListMap<>();
private ConcurrentSkipListMap<DhLodPos, Node> nodes = new ConcurrentSkipListMap<>();
private byte topLevel = 0;
private AtomicInteger size = new AtomicInteger(0);
public LazySectionPosTree() {}
@@ -201,9 +228,32 @@ public class LazySectionPosTree<T> implements ConcurrentMap<DhLodPos, T> {
return from;
}
private void upcastTreeBase(byte level) {
//TODO
private void upcastTreeBase() {
}
private byte upcastSingeTreeBase() {
byte nextLevel = (byte) (topLevel + 1);
ConcurrentSkipListMap<DhLodPos, Node> newBase = new ConcurrentSkipListMap<>();
nodes.forEach((pos, node) ->
newBase.compute(pos.convertUpwardsTo(nextLevel), (key, old) -> {
if (old == null) {
old = new Node(null, 0, pos.convertUpwardsTo(nextLevel));
}
old.setChild(pos.getChildIndexOfParent(), node);
return old;
})
);
nodes = newBase; // todo: cas operation to here. (Will be block free but not wait free)
topLevel = nextLevel; //todo: atomic???
return nextLevel;
}
private void downcastTreeBase() {
byte prevLevel = (byte) (topLevel - 1);
ConcurrentSkipListMap<DhLodPos, Node> newBase = new ConcurrentSkipListMap<>();
}
@Override
@@ -247,7 +297,7 @@ public class LazySectionPosTree<T> implements ConcurrentMap<DhLodPos, T> {
}
// key.detail > topLevel:
// Rebase the tree
upcastTreeBase(key.detail);
//upcastTreeBase(key.detail);
return nodes.computeIfAbsent(key, k -> new Node(null, 0, key, value)).setValue(value);
}
@@ -348,7 +398,7 @@ public class LazySectionPosTree<T> implements ConcurrentMap<DhLodPos, T> {
}
// key.detail > topLevel:
// Rebase the tree
upcastTreeBase(key.detail);
//upcastTreeBase(key.detail);
return nodes.computeIfAbsent(key, k -> new Node(null, 0, key, null)).setIfAbsent(value);
}
@@ -364,7 +414,7 @@ public class LazySectionPosTree<T> implements ConcurrentMap<DhLodPos, T> {
}
// key.detail > topLevel:
// Rebase the tree
upcastTreeBase(key.detail);
//upcastTreeBase(key.detail);
return nodes.computeIfAbsent(key, k -> new Node(null, 0, key, null)).computeIfAbsent(mappingFunction);
}
@@ -0,0 +1,67 @@
package com.seibel.lod.core.util;
import it.unimi.dsi.fastutil.booleans.BooleanObjectImmutablePair;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicReferenceArray;
public class Atomics {
// While java 8 does have built in atomic operations, there doesn't seem to be any Compare And Exchange operation...
// So here we implement our own.
public static <T> T compareAndExchange(AtomicReference<T> atomic, T expected, T newValue) {
while (true) {
T oldValue = atomic.get();
if (oldValue != expected) return oldValue;
if (atomic.weakCompareAndSet(expected, newValue)) return expected;
}
}
public static <T> BooleanObjectImmutablePair<T> compareAndExchangeWeak(AtomicReference<T> atomic, T expected, T newValue) {
T oldValue = atomic.get();
if (oldValue == expected && atomic.weakCompareAndSet(expected, newValue)) {
return new BooleanObjectImmutablePair<>(true, expected);
} else {
return new BooleanObjectImmutablePair<>(false, oldValue);
}
}
// Additionally, we implement some helper methods for frequently used atomic operations.
// Compare with expected value and set new value if equal. Then return whatever value the atomic now contains.
public static <T> T compareAndSetThenGet(AtomicReference<T> atomic, T expected, T newValue) {
while (true) {
T oldValue = atomic.get();
if (oldValue != expected) return oldValue;
if (atomic.weakCompareAndSet(expected, newValue)) return newValue;
}
}
// Below is the array version of the above.
public static <T> T compareAndExchange(AtomicReferenceArray<T> array, int index, T expected, T newValue) {
while (true) {
T oldValue = array.get(index);
if (oldValue != expected) return oldValue;
if (array.weakCompareAndSet(index, expected, newValue)) return expected;
}
}
public static <T> BooleanObjectImmutablePair<T> compareAndExchangeWeak(AtomicReferenceArray<T> array, int index, T expected, T newValue) {
T oldValue = array.get(index);
if (oldValue == expected && array.weakCompareAndSet(index, expected, newValue)) {
return new BooleanObjectImmutablePair<>(true, expected);
} else {
return new BooleanObjectImmutablePair<>(false, oldValue);
}
}
public static <T> T compareAndSetThenGet(AtomicReferenceArray<T> array, int index, T expected, T newValue) {
while (true) {
T oldValue = array.get(index);
if (oldValue != expected) return oldValue;
if (array.weakCompareAndSet(index, expected, newValue)) return newValue;
}
}
}