minecraft-source/src/net/minecraft/world/level/chunk/storage/IOWorker.java

210 lines
7.2 KiB
Java

package net.minecraft.world.level.chunk.storage;
import org.apache.logging.log4j.LogManager;
import java.util.function.BiConsumer;
import java.util.Iterator;
import java.util.concurrent.locks.LockSupport;
import java.util.function.Function;
import javax.annotation.Nullable;
import java.util.concurrent.CompletionException;
import java.io.IOException;
import net.minecraft.nbt.CompoundTag;
import com.google.common.collect.Maps;
import com.google.common.collect.Queues;
import java.util.concurrent.CompletableFuture;
import net.minecraft.world.level.ChunkPos;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.logging.log4j.Logger;
public class IOWorker implements AutoCloseable {
private static final Logger LOGGER;
private final Thread thread;
private final AtomicBoolean shutdownRequested;
private final Queue<Runnable> inbox;
private final RegionFileStorage storage;
private final Map<ChunkPos, PendingStore> pendingWrites;
private boolean running;
private CompletableFuture<Void> shutdownListener;
IOWorker(final RegionFileStorage cba, final String string) {
this.shutdownRequested = new AtomicBoolean();
this.inbox = Queues.newConcurrentLinkedQueue();
this.pendingWrites = Maps.newLinkedHashMap();
this.running = true;
this.shutdownListener = new CompletableFuture<Void>();
this.storage = cba;
(this.thread = new Thread(this::loop)).setName(string + " IO worker");
this.thread.start();
}
public CompletableFuture<Void> store(final ChunkPos bje, final CompoundTag jt) {
final PendingStore a5;
return this.<Void>submitTask(completableFuture -> () -> {
a5 = this.pendingWrites.computeIfAbsent(bje, bje -> new PendingStore());
a5.data = jt;
a5.result.whenComplete((void2, throwable) -> {
if (throwable != null) {
completableFuture.completeExceptionally(throwable);
}
else {
completableFuture.complete(null);
}
});
});
}
@Nullable
public CompoundTag load(final ChunkPos bje) throws IOException {
final PendingStore a4;
CompoundTag jt5;
final CompletableFuture<CompoundTag> completableFuture2 = this.<CompoundTag>submitTask(completableFuture -> () -> {
a4 = this.pendingWrites.get(bje);
if (a4 != null) {
completableFuture.complete(a4.data);
}
else {
try {
jt5 = this.storage.read(bje);
completableFuture.complete(jt5);
}
catch (Exception exception5) {
IOWorker.LOGGER.warn("Failed to read chunk {}", bje, exception5);
completableFuture.completeExceptionally(exception5);
}
}
return;
});
try {
return completableFuture2.join();
}
catch (CompletionException completionException4) {
if (completionException4.getCause() instanceof IOException) {
throw (IOException)completionException4.getCause();
}
throw completionException4;
}
}
private CompletableFuture<Void> shutdown() {
return this.<Void>submitTask(completableFuture -> () -> {
this.running = false;
this.shutdownListener = completableFuture;
});
}
public CompletableFuture<Void> synchronize() {
final CompletableFuture<Void> completableFuture2;
return this.<Void>submitTask(completableFuture -> () -> {
completableFuture2 = CompletableFuture.allOf(this.pendingWrites.values().stream().map(a -> a.result).<CompletableFuture<?>>toArray(CompletableFuture[]::new));
completableFuture2.whenComplete((object, throwable) -> completableFuture.complete(null));
});
}
private <T> CompletableFuture<T> submitTask(final Function<CompletableFuture<T>, Runnable> function) {
final CompletableFuture<T> completableFuture3 = new CompletableFuture<T>();
this.inbox.add(function.apply(completableFuture3));
LockSupport.unpark(this.thread);
return completableFuture3;
}
private void waitForQueueNonEmpty() {
LockSupport.park("waiting for tasks");
}
private void loop() {
try {
while (this.running) {
final boolean boolean2 = this.processInbox();
final boolean boolean3 = this.storePendingChunk();
if (!boolean2 && !boolean3) {
this.waitForQueueNonEmpty();
}
}
this.processInbox();
this.storeRemainingPendingChunks();
}
finally {
this.closeStorage();
}
}
private boolean storePendingChunk() {
final Iterator<Map.Entry<ChunkPos, PendingStore>> iterator2 = this.pendingWrites.entrySet().iterator();
if (!iterator2.hasNext()) {
return false;
}
final Map.Entry<ChunkPos, PendingStore> entry3 = iterator2.next();
iterator2.remove();
this.runStore(entry3.getKey(), entry3.getValue());
return true;
}
private void storeRemainingPendingChunks() {
this.pendingWrites.forEach(this::runStore);
this.pendingWrites.clear();
}
private void runStore(final ChunkPos bje, final PendingStore a) {
try {
this.storage.write(bje, a.data);
a.result.complete(null);
}
catch (Exception exception4) {
IOWorker.LOGGER.error("Failed to store chunk {}", bje, exception4);
a.result.completeExceptionally(exception4);
}
}
private void closeStorage() {
try {
this.storage.close();
this.shutdownListener.complete(null);
}
catch (Exception exception2) {
IOWorker.LOGGER.error("Failed to close storage", (Throwable)exception2);
this.shutdownListener.completeExceptionally(exception2);
}
}
private boolean processInbox() {
boolean boolean2 = false;
Runnable runnable3;
while ((runnable3 = this.inbox.poll()) != null) {
boolean2 = true;
runnable3.run();
}
return boolean2;
}
@Override
public void close() throws IOException {
if (!this.shutdownRequested.compareAndSet(false, true)) {
return;
}
try {
this.shutdown().join();
}
catch (CompletionException completionException2) {
if (completionException2.getCause() instanceof IOException) {
throw (IOException)completionException2.getCause();
}
throw completionException2;
}
}
static {
LOGGER = LogManager.getLogger();
}
static class PendingStore {
private CompoundTag data;
private final CompletableFuture<Void> result;
private PendingStore() {
this.result = new CompletableFuture<Void>();
}
}
}