Limit number of retries on request errors
Add a delay if rate limit is hit
This commit is contained in:
+37
-16
@@ -44,6 +44,8 @@ public abstract class AbstractFullDataRequestQueue implements IDebugRenderable,
|
||||
|
||||
private static final Timer TASK_FINISH_TIMER = TimerUtil.CreateTimer("RequestTaskFinishTimer");
|
||||
|
||||
private static final int MAX_RETRY_ATTEMPTS = 3;
|
||||
|
||||
protected static final long SHUTDOWN_TIMEOUT_SECONDS = 5;
|
||||
|
||||
public final ClientNetworkState networkState;
|
||||
@@ -85,6 +87,16 @@ public abstract class AbstractFullDataRequestQueue implements IDebugRenderable,
|
||||
LodUtil.assertTrue(DhSectionPos.getDetailLevel(sectionPos) == DhSectionPos.SECTION_MINIMUM_DETAIL_LEVEL, "Only highest-detail sections are allowed.");
|
||||
|
||||
RequestQueueEntry entry = new RequestQueueEntry(chunkDataConsumer, clientTimestamp);
|
||||
entry.future.whenComplete((result, throwable) ->
|
||||
{
|
||||
this.waitingTasks.remove(sectionPos);
|
||||
|
||||
this.finishedRequests.incrementAndGet();
|
||||
if (!result || throwable != null)
|
||||
{
|
||||
this.failedRequests.incrementAndGet();
|
||||
}
|
||||
});
|
||||
this.waitingTasks.put(sectionPos, entry);
|
||||
return entry.future;
|
||||
}
|
||||
@@ -161,12 +173,9 @@ public abstract class AbstractFullDataRequestQueue implements IDebugRenderable,
|
||||
request.handleAsync((response, throwable) ->
|
||||
{
|
||||
this.pendingTasksSemaphore.release();
|
||||
this.finishedRequests.incrementAndGet();
|
||||
|
||||
try
|
||||
{
|
||||
this.waitingTasks.remove(sectionPos);
|
||||
|
||||
if (throwable != null)
|
||||
{
|
||||
throw throwable;
|
||||
@@ -187,26 +196,37 @@ public abstract class AbstractFullDataRequestQueue implements IDebugRenderable,
|
||||
catch (InvalidLevelException | RequestRejectedException ignored)
|
||||
{
|
||||
// We're too late / some cases might trigger a bunch of expected rejections
|
||||
return entry.future.complete(false);
|
||||
}
|
||||
catch (SessionClosedException | RateLimitedException e)
|
||||
catch (SessionClosedException | CancellationException ignored)
|
||||
{
|
||||
if (e instanceof RateLimitedException)
|
||||
{
|
||||
LOGGER.warn("Rate limited by server, re-queueing task [" + sectionPos + "]: " + e.getMessage());
|
||||
}
|
||||
// Triggered when level is unloaded
|
||||
return entry.future.cancel(false);
|
||||
}
|
||||
catch (RateLimitedException e)
|
||||
{
|
||||
LOGGER.warn("Rate limited by server, re-queueing task [" + sectionPos + "]: " + e.getMessage());
|
||||
|
||||
// Skip 1 second
|
||||
this.rateLimiter.acquireOrDrain(Integer.MAX_VALUE);
|
||||
entry.request = null;
|
||||
this.finishedRequests.decrementAndGet();
|
||||
}
|
||||
catch (CancellationException ignored)
|
||||
{
|
||||
this.finishedRequests.decrementAndGet();
|
||||
return null;
|
||||
}
|
||||
catch (Throwable e)
|
||||
{
|
||||
LOGGER.error("Error while fetching full data source", e);
|
||||
this.failedRequests.incrementAndGet();
|
||||
return entry.future.complete(false);
|
||||
entry.retryAttempts--;
|
||||
LOGGER.error("Error while fetching full data source, attempts left: {} / {}", entry.retryAttempts, MAX_RETRY_ATTEMPTS, e);
|
||||
|
||||
// Retry logic
|
||||
if (entry.retryAttempts > 0)
|
||||
{
|
||||
entry.request = null;
|
||||
return null;
|
||||
}
|
||||
else
|
||||
{
|
||||
return entry.future.complete(false);
|
||||
}
|
||||
}
|
||||
|
||||
// Hack to work around a race condition
|
||||
@@ -291,6 +311,7 @@ public abstract class AbstractFullDataRequestQueue implements IDebugRenderable,
|
||||
|
||||
@CheckForNull
|
||||
public CompletableFuture<?> request;
|
||||
public int retryAttempts = MAX_RETRY_ATTEMPTS;
|
||||
|
||||
public RequestQueueEntry(
|
||||
Consumer<FullDataSourceV2> chunkDataConsumer,
|
||||
|
||||
+3
-11
@@ -40,20 +40,12 @@ public class SupplierBasedRateLimiter<T>
|
||||
{
|
||||
this.rateLimiter.setRate(this.maxRateSupplier.get());
|
||||
|
||||
if (this.rateLimiter.tryAcquire(permits))
|
||||
{
|
||||
return permits;
|
||||
}
|
||||
|
||||
int acquired = 0;
|
||||
while ((permits /= 2) > 0)
|
||||
while (permits > 0 && this.rateLimiter.tryAcquire())
|
||||
{
|
||||
if (this.rateLimiter.tryAcquire(permits))
|
||||
{
|
||||
acquired += permits;
|
||||
}
|
||||
acquired++;
|
||||
permits--;
|
||||
}
|
||||
|
||||
return acquired;
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user