Add DB leak tracking

This commit is contained in:
James Seibel
2025-02-16 19:34:13 -06:00
parent 276f2adf00
commit 977204abf0
3 changed files with 297 additions and 9 deletions
@@ -23,8 +23,8 @@ import com.seibel.distanthorizons.core.logging.DhLoggerBuilder;
import com.seibel.distanthorizons.core.sql.DatabaseUpdater;
import com.seibel.distanthorizons.core.sql.DbConnectionClosedException;
import com.seibel.distanthorizons.core.sql.dto.IBaseDTO;
import com.seibel.distanthorizons.core.sql.repo.phantoms.AutoClosableTrackingWrapper;
import com.seibel.distanthorizons.core.util.KeyedLockContainer;
import com.seibel.distanthorizons.core.util.ThreadUtil;
import org.apache.logging.log4j.Logger;
import org.jetbrains.annotations.Nullable;
@@ -33,8 +33,7 @@ import java.io.IOException;
import java.sql.*;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
/**
@@ -53,8 +52,6 @@ public abstract class AbstractDhRepo<TKey, TDTO extends IBaseDTO<TKey>> implemen
private static final ConcurrentHashMap<String, Connection> CONNECTIONS_BY_CONNECTION_STRING = new ConcurrentHashMap<>();
private static final ConcurrentHashMap<AbstractDhRepo<?, ?>, String> ACTIVE_CONNECTION_STRINGS_BY_REPO = new ConcurrentHashMap<>();
private static final ThreadPoolExecutor WAL_FLUSH_THREAD = ThreadUtil.makeSingleDaemonThreadPool("Abstract Repo WAL Flush");
private static final AtomicBoolean FLUSH_THREAD_QUEUED = new AtomicBoolean(false);
private final String connectionString;
@@ -63,6 +60,8 @@ public abstract class AbstractDhRepo<TKey, TDTO extends IBaseDTO<TKey>> implemen
public final String databaseType;
public final File databaseFile;
public final Set<AutoClosableTrackingWrapper> openClosables = ConcurrentHashMap.newKeySet();
public final Class<? extends TDTO> dtoClass;
protected final KeyedLockContainer<TKey> saveLockContainer = new KeyedLockContainer<>();
@@ -356,7 +355,7 @@ public abstract class AbstractDhRepo<TKey, TDTO extends IBaseDTO<TKey>> implemen
// Note: this can only handle 1 command at a time
boolean resultSetPresent = statement.execute(sql);
try (ResultSet resultSet = statement.getResultSet())
try (ResultSet resultSet = AutoClosableTrackingWrapper.wrap(ResultSet.class, statement.getResultSet(), this.openClosables))
{
return this.convertResultSetToDictionaryList(resultSet, resultSetPresent);
}
@@ -405,7 +404,8 @@ public abstract class AbstractDhRepo<TKey, TDTO extends IBaseDTO<TKey>> implemen
boolean resultSetPresent = statement.execute();
if (resultSetPresent)
{
return statement.getResultSet();
ResultSet resultSet = statement.getResultSet();
return AutoClosableTrackingWrapper.wrap(ResultSet.class, resultSet, this.openClosables);
}
else
{
@@ -443,7 +443,7 @@ public abstract class AbstractDhRepo<TKey, TDTO extends IBaseDTO<TKey>> implemen
{
PreparedStatement statement = this.connection.prepareStatement(sql);
statement.setQueryTimeout(TIMEOUT_SECONDS);
return statement;
return AutoClosableTrackingWrapper.wrap(PreparedStatement.class, statement, this.openClosables);
}
catch(SQLException e)
{
@@ -528,6 +528,32 @@ public abstract class AbstractDhRepo<TKey, TDTO extends IBaseDTO<TKey>> implemen
{
CONNECTIONS_BY_CONNECTION_STRING.remove(this.connectionString);
// log any leaked objects
int openClosableCount = this.openClosables.size();
if (openClosableCount != 0)
{
LOGGER.warn("[" + openClosableCount + "] objects not closed for repo [" + this.getClass().getSimpleName() + "]-[" + this.getTableName() + "] with connection: [" + this.connectionString + "]. A memory leak may be present and closing this connection may take longer than normal.");
// header
StringBuilder stringBuilder = new StringBuilder();
stringBuilder.append("Unclosed objects: \n");
// leaked objects
HashMap<String, AtomicInteger> unclosedObjectCountsByString = this.getUnclosedObjectStringsAndCounts();
for (String objString : unclosedObjectCountsByString.keySet())
{
AtomicInteger countRef = unclosedObjectCountsByString.get(objString);
if (countRef != null)
{
stringBuilder.append("[" + countRef.get() + "] - [" + objString + "] \n");
}
}
LOGGER.warn(stringBuilder.toString());
}
if (!this.connection.isClosed())
{
LOGGER.info("Closing database connection: [" + this.connectionString + "]...");
@@ -622,6 +648,49 @@ public abstract class AbstractDhRepo<TKey, TDTO extends IBaseDTO<TKey>> implemen
return list;
}
/** used for logging leaked objects */
public HashMap<String, AtomicInteger> getUnclosedObjectStringsAndCounts()
{
HashMap<String, AtomicInteger> closableCountsByToString = new HashMap<>();
for (AutoClosableTrackingWrapper closableWrapper : this.openClosables)
{
// custom to-strings for better merging
String str = closableWrapper.wrappedClosable.getClass().getSimpleName();
if (closableWrapper.wrappedClosable instanceof ResultSet)
{
str += " @ " + closableWrapper.wrappedClosable.toString();
}
else if (closableWrapper.wrappedClosable instanceof PreparedStatement)
{
String sql = closableWrapper.wrappedClosable.toString();
int parametersIndex = sql.indexOf("\n parameters="); // remove Sqlite parameters so queries aren't separated by properties
if (parametersIndex != -1)
{
sql = sql.substring(0, parametersIndex);
}
str += " @ " + sql;
}
else
{
str += " @ " + closableWrapper.wrappedClosable.toString();
}
closableCountsByToString.compute(str, (stringVal, countRef) ->
{
if (countRef == null)
{
countRef = new AtomicInteger(0);
}
countRef.incrementAndGet();
return countRef;
});
}
return closableCountsByToString;
}
//==================//
@@ -0,0 +1,93 @@
package com.seibel.distanthorizons.core.sql.repo.phantoms;
import com.seibel.distanthorizons.coreapi.ModInfo;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.Set;
/**
* This is used to detect leaks
* with our JDBC implementation, specifically
* to make sure all {@link AutoCloseable} objects
* are inside try-finally resources.
*/
public class AutoClosableTrackingWrapper implements InvocationHandler
{
//private static final Logger LOGGER = DhLoggerBuilder.getLogger();
/**
* should be enabled during development to
* notify if any resources are being leaked.
*/
public static final boolean TRACK_WRAPPERS = ModInfo.IS_DEV_BUILD;
@NotNull
public final AutoCloseable wrappedClosable;
@NotNull
public final Set<AutoClosableTrackingWrapper> parentTrackingSet;
//==============//
// constructors //
//==============//
@Nullable
public static <TStatic extends AutoCloseable> TStatic wrap(
@NotNull Class<?> clazz,
@Nullable TStatic wrappedClosable,
@NotNull Set<AutoClosableTrackingWrapper> trackingSet)
{
if (!TRACK_WRAPPERS)
{
return wrappedClosable;
}
// done to prevent null pointers
if (wrappedClosable == null)
{
return null;
}
return (TStatic) Proxy.newProxyInstance(
wrappedClosable.getClass().getClassLoader(),
new Class[]{ clazz },
new AutoClosableTrackingWrapper(wrappedClosable, trackingSet)
);
}
private AutoClosableTrackingWrapper(@NotNull AutoCloseable wrappedClosable, @NotNull Set<AutoClosableTrackingWrapper> trackingSet)
{
this.wrappedClosable = wrappedClosable;
this.parentTrackingSet = trackingSet;
this.parentTrackingSet.add(this);
}
//============//
// reflection //
//============//
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable
{
// Track the close() method
if ("close".equals(method.getName()))
{
this.wrappedClosable.close();
this.parentTrackingSet.remove(this);
return null;
}
// Delegate all other methods to the wrapped object
return method.invoke(this.wrappedClosable, args);
}
}
+127 -1
View File
@@ -19,9 +19,11 @@
package tests;
import com.seibel.distanthorizons.core.logging.DhLoggerBuilder;
import com.seibel.distanthorizons.core.pos.DhChunkPos;
import com.seibel.distanthorizons.core.sql.DatabaseUpdater;
import com.seibel.distanthorizons.core.sql.repo.AbstractDhRepo;
import org.apache.logging.log4j.Logger;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -34,13 +36,14 @@ import java.io.File;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Map;
/**
* Validates {@link AbstractDhRepo} is set up correctly.
*/
public class DhRepoSqliteTest
{
private static final Logger LOGGER = DhLoggerBuilder.getLogger();
public static String DATABASE_TYPE = "jdbc:sqlite";
public static String DB_FILE_NAME = "test.sqlite";
@@ -208,5 +211,128 @@ public class DhRepoSqliteTest
}
}
/**
* leak detection is done to make sure {@link ResultSet} and {@link PreparedStatement}'s
* are properly cleaned up.
*/
@Test
public void testRepoLeakDetection()
{
TestPrimaryKeyRepo primaryKeyRepo = null;
try
{
primaryKeyRepo = new TestPrimaryKeyRepo(DATABASE_TYPE, new File(DB_FILE_NAME));
int insertCount = 10;
int readCount = 10;
Assert.assertEquals(0, primaryKeyRepo.openClosables.size());
//=============================//
// correctly closed statements //
//=============================//
{
// insert
for (int i = 0; i < insertCount; i++)
{
TestSingleKeyDto insertDto = new TestSingleKeyDto(i, "a", 0L, (byte) 0);
try (PreparedStatement statement = primaryKeyRepo.createInsertStatement(insertDto))
{
primaryKeyRepo.query(statement);
if (i % 1_000 == 0)
{
System.out.println(i + " / " + insertCount);
}
}
}
Assert.assertEquals("Insert leaks", 0, primaryKeyRepo.openClosables.size());
// read
TestSingleKeyDto expectedReadDto = new TestSingleKeyDto(1, "a", 0L, (byte) 0);
for (int i = 0; i < readCount; i++)
{
try (PreparedStatement statement = primaryKeyRepo.createSelectStatementByKey(1);
ResultSet resultSet = primaryKeyRepo.query(statement))
{
TestSingleKeyDto readDto = primaryKeyRepo.convertResultSetToDto(resultSet);
Assert.assertEquals(expectedReadDto.id, readDto.id);
if (i % 1_000 == 0)
{
System.out.println(i + " / " + readCount);
}
}
}
Assert.assertEquals("read leaks", 0, primaryKeyRepo.openClosables.size());
}
//===================//
// leaked statements //
//===================//
{
// nuke the DB so we can insert without worries
primaryKeyRepo.deleteAll();
// insert
for (int i = 0; i < insertCount; i++)
{
TestSingleKeyDto insertDto = new TestSingleKeyDto(i, "a", 0L, (byte) 0);
PreparedStatement statement = primaryKeyRepo.createInsertStatement(insertDto);
primaryKeyRepo.query(statement);
if (i % 1_000 == 0)
{
System.out.println(i + " / " + insertCount);
}
}
Assert.assertNotEquals(0, primaryKeyRepo.openClosables.size());
primaryKeyRepo.openClosables.clear();
// read
for (int i = 0; i < readCount; i++)
{
PreparedStatement statement = primaryKeyRepo.createSelectStatementByKey(1);
ResultSet resultSet = primaryKeyRepo.query(statement);
TestSingleKeyDto readDto = primaryKeyRepo.convertResultSetToDto(resultSet);
Assert.assertEquals(1, readDto.id);
if (i % 1_000 == 0)
{
System.out.println(i + " / " + readCount);
}
}
Assert.assertNotEquals(0, primaryKeyRepo.openClosables.size());
}
}
catch (SQLException e)
{
Assert.fail(e.getMessage());
}
finally
{
if (primaryKeyRepo != null)
{
primaryKeyRepo.close();
}
}
}
}