re-implement data source pooling

This commit is contained in:
James Seibel
2024-03-21 22:07:35 -05:00
parent 18ad241025
commit 5d50775932
8 changed files with 118 additions and 71 deletions
@@ -386,6 +386,15 @@ public class FullDataSourceV1 implements IDataSource<IDhLevel>
public void setIdMapping(FullDataPointIdMap mappings) { this.mapping.mergeAndReturnRemappedEntityIds(mappings); }
//==================//
// override methods //
//==================//
@Override
public void close() throws Exception
{ /* not currently needed */ }
//================//
// helper classes //
@@ -903,63 +903,84 @@ public class FullDataSourceV2 implements IDataSource<IDhLevel>
// pooling //
//=========//
// TODO add pooled data sources
private static class Pooling
@Override
public void close() throws Exception
{
/** used when pooling data sources */
private final ArrayList<FullDataSourceV1> cachedSources = new ArrayList<>();
private final ReentrantLock cacheLock = new ReentrantLock();
/** @return null if no pooled source exists */
public FullDataSourceV1 tryGetPooledSource()
returnPooledDataSource(this);
}
/** used when pooling data sources */
private static final ArrayList<FullDataSourceV2> CACHED_SOURCES = new ArrayList<>();
private static final ReentrantLock CACHE_LOCK = new ReentrantLock();
/** @return an empty data source if non are cached */
public static FullDataSourceV2 getPooledSource(DhSectionPos pos, boolean clearData)
{
try
{
try
{
this.cacheLock.lock();
int index = this.cachedSources.size() - 1;
if (index == -1)
{
return null;
}
else
{
return this.cachedSources.remove(index);
}
}
finally
{
this.cacheLock.unlock();
}
}
/**
* Doesn't have to be called, if a data source isn't returned, nothing will be leaked.
* It just means a new source must be constructed next time {@link Pooling#tryGetPooledSource} is called.
*/
public void returnPooledDataSource(FullDataSourceV1 dataSource)
{
if (dataSource == null)
{
return;
}
else if (this.cachedSources.size() > 25)
{
return;
}
CACHE_LOCK.lock();
try
int index = CACHED_SOURCES.size() - 1;
if (index == -1)
{
this.cacheLock.lock();
this.cachedSources.add(dataSource);
// no pooled sources exist
return createEmpty(pos);
}
finally
else
{
this.cacheLock.unlock();
FullDataSourceV2 dataSource = CACHED_SOURCES.remove(index);
dataSource.pos = pos;
if (clearData)
{
dataSource.mapping.clear(pos);
for (int i = 0; i < dataSource.dataPoints.length; i++)
{
if (dataSource.dataPoints[i] != null)
{
dataSource.dataPoints[i].clear();
}
}
Arrays.fill(dataSource.columnGenerationSteps, (byte) 0);
}
return dataSource;
}
}
finally
{
CACHE_LOCK.unlock();
}
}
/**
* Doesn't have to be called, if a data source isn't returned, nothing will be leaked.
* It just means a new source must be constructed next time {@link FullDataSourceV2#getPooledSource} is called.
*/
public static void returnPooledDataSource(FullDataSourceV2 dataSource)
{
if (dataSource == null)
{
return;
}
else if (CACHED_SOURCES.size() > 25)
{
return;
}
try
{
CACHE_LOCK.lock();
CACHED_SOURCES.add(dataSource);
}
finally
{
CACHE_LOCK.unlock();
}
}
@@ -477,6 +477,10 @@ public class ColumnRenderSource implements IDataSource<IDhClientLevel>
return stringBuilder.toString();
}
@Override
public void close() throws Exception
{ /* not currently needed */ }
//==============//
@@ -222,21 +222,23 @@ public abstract class AbstractNewDataSourceHandler
// get or create the data source
TDataSource recipientDataSource = this.get(updatePos);
boolean dataModified = recipientDataSource.update(inputData, this.level);
if (dataModified)
try (TDataSource recipientDataSource = this.get(updatePos))
{
// save the updated data to the database
TDTO dto = this.createDtoFromDataSource(recipientDataSource);
this.repo.save(dto);
boolean dataModified = recipientDataSource.update(inputData, this.level);
for (IDataSourceUpdateFunc<TDataSource> listener : this.dateSourceUpdateListeners)
if (dataModified)
{
if (listener != null)
// save the updated data to the database
TDTO dto = this.createDtoFromDataSource(recipientDataSource);
this.repo.save(dto);
for (IDataSourceUpdateFunc<TDataSource> listener : this.dateSourceUpdateListeners)
{
listener.OnDataSourceUpdated(recipientDataSource);
if (listener != null)
{
listener.OnDataSourceUpdated(recipientDataSource);
}
}
}
}
@@ -11,11 +11,13 @@ import com.seibel.distanthorizons.core.util.objects.dataStreams.DhDataOutputStre
import java.io.IOException;
/**
* Base for all data sources.
* Base for all data sources. <br><br>
*
* AutoCloseable Can be implemented to allow for disposing of pooled data sources. <br><br>
*
* @param <TDhLevel> there are times when we need specifically a client level vs a more generic level
*/
public interface IDataSource<TDhLevel extends IDhLevel> extends IBaseDTO<DhSectionPos>
public interface IDataSource<TDhLevel extends IDhLevel> extends IBaseDTO<DhSectionPos>, AutoCloseable
{
DhSectionPos getSectionPos();
@@ -156,17 +156,17 @@ public class FullDataSourceProviderV2
@Override
protected FullDataSourceV2 createDataSourceFromDto(FullDataSourceV2DTO dto) throws InterruptedException, IOException
{ return dto.createDataSource(this.level.getLevelWrapper()); }
{ return dto.createPooledDataSource(this.level.getLevelWrapper()); }
@Override
protected FullDataSourceV2 createNewDataSourceFromExistingDtos(DhSectionPos pos)
{
// TODO maybe just set children update flags to true?
// TODO is any special logic necessary? All DTOs should be generated using their children via the update system anyway
return FullDataSourceV2.createEmpty(pos);
return FullDataSourceV2.getPooledSource(pos, true);
}
@Override
protected FullDataSourceV2 makeEmptyDataSource(DhSectionPos pos) { return FullDataSourceV2.createEmpty(pos); }
protected FullDataSourceV2 makeEmptyDataSource(DhSectionPos pos) { return FullDataSourceV2.getPooledSource(pos, true); }
@@ -248,9 +248,11 @@ public class FullDataSourceProviderV2
childReadLock.lock();
this.lockedPosSet.add(childPos);
FullDataSourceV2 dataSource = this.get(childPos);
this.updateDataSourceAtPos(parentUpdatePos, dataSource, false);
this.repo.setApplyToParent(childPos, false);
try (FullDataSourceV2 dataSource = this.get(childPos))
{
this.updateDataSourceAtPos(parentUpdatePos, dataSource, false);
this.repo.setApplyToParent(childPos, false);
}
}
catch (Exception e)
{
@@ -92,8 +92,12 @@ public class RenderSourceFileHandler extends AbstractLegacyDataSourceHandler<Col
{
ColumnRenderSource renderDataSource;
FullDataSourceV2 fullDataSource = this.fullDataSourceProvider.get(pos);
renderDataSource = FullDataToRenderDataTransformer.transformFullDataToRenderSource(fullDataSource, this.level);
try (FullDataSourceV2 fullDataSource = this.fullDataSourceProvider.get(pos))
{
renderDataSource = FullDataToRenderDataTransformer.transformFullDataToRenderSource(fullDataSource, this.level);
}
catch (Exception e) { throw new RuntimeException(e); }
return renderDataSource;
}
@@ -113,8 +113,11 @@ public class FullDataSourceV2DTO implements IBaseDTO<DhSectionPos>
// data source population //
//========================//
public FullDataSourceV2 createDataSource(@NotNull ILevelWrapper levelWrapper) throws IOException, InterruptedException
{ return this.populateDataSource(FullDataSourceV2.createEmpty(this.pos), levelWrapper); }
public FullDataSourceV2 createPooledDataSource(@NotNull ILevelWrapper levelWrapper) throws IOException, InterruptedException
{
FullDataSourceV2 dataSource = FullDataSourceV2.getPooledSource(this.pos, false);
return this.populateDataSource(dataSource, levelWrapper);
}
public FullDataSourceV2 populateDataSource(FullDataSourceV2 dataSource, @NotNull ILevelWrapper levelWrapper) throws IOException, InterruptedException
{ return this.internalPopulateDataSource(dataSource, levelWrapper, false); }