169 lines
7.7 KiB
Java
169 lines
7.7 KiB
Java
package net.minecraft.server.level;
|
|
|
|
import org.apache.logging.log4j.LogManager;
|
|
import com.google.common.annotations.VisibleForTesting;
|
|
import com.mojang.datafixers.util.Either;
|
|
import java.util.stream.Stream;
|
|
import net.minecraft.Util;
|
|
import java.util.stream.Collector;
|
|
import java.util.concurrent.CompletableFuture;
|
|
import java.util.Optional;
|
|
import java.util.function.IntConsumer;
|
|
import net.minecraft.world.level.ChunkPos;
|
|
import java.util.function.IntSupplier;
|
|
import com.google.common.collect.Sets;
|
|
import java.util.stream.Collectors;
|
|
import java.util.concurrent.Executor;
|
|
import java.util.List;
|
|
import net.minecraft.util.thread.StrictQueue;
|
|
import net.minecraft.util.thread.ProcessorMailbox;
|
|
import java.util.Set;
|
|
import net.minecraft.util.Unit;
|
|
import java.util.function.Function;
|
|
import net.minecraft.util.thread.ProcessorHandle;
|
|
import java.util.Map;
|
|
import org.apache.logging.log4j.Logger;
|
|
|
|
public class ChunkTaskPriorityQueueSorter implements AutoCloseable, ChunkHolder.LevelChangeListener {
|
|
private static final Logger LOGGER;
|
|
private final Map<ProcessorHandle<?>, ChunkTaskPriorityQueue<? extends Function<ProcessorHandle<Unit>, ?>>> queues;
|
|
private final Set<ProcessorHandle<?>> sleeping;
|
|
private final ProcessorMailbox<StrictQueue.IntRunnable> mailbox;
|
|
|
|
public ChunkTaskPriorityQueueSorter(final List<ProcessorHandle<?>> list, final Executor executor, final int integer) {
|
|
final ChunkTaskPriorityQueue chunkTaskPriorityQueue;
|
|
this.queues = list.stream().collect(Collectors.toMap(Function.identity(), ags -> {
|
|
new ChunkTaskPriorityQueue(ags.name() + "_queue", integer);
|
|
return chunkTaskPriorityQueue;
|
|
}));
|
|
this.sleeping = Sets.newHashSet(list);
|
|
this.mailbox = new ProcessorMailbox<StrictQueue.IntRunnable>(new StrictQueue.FixedPriorityQueue(4), executor, "sorter");
|
|
}
|
|
|
|
public static Message<Runnable> message(final Runnable runnable, final long long2, final IntSupplier intSupplier) {
|
|
return new Message<Runnable>(ags -> () -> {
|
|
runnable.run();
|
|
ags.tell(Unit.INSTANCE);
|
|
}, long2, intSupplier);
|
|
}
|
|
|
|
public static Message<Runnable> message(final ChunkHolder uv, final Runnable runnable) {
|
|
return message(runnable, uv.getPos().toLong(), uv::getQueueLevel);
|
|
}
|
|
|
|
public static Release release(final Runnable runnable, final long long2, final boolean boolean3) {
|
|
return new Release(runnable, long2, boolean3);
|
|
}
|
|
|
|
public <T> ProcessorHandle<Message<T>> getProcessor(final ProcessorHandle<T> ags, final boolean boolean2) {
|
|
return this.mailbox.<ProcessorHandle<Message<T>>>ask(ags3 -> new StrictQueue.IntRunnable(0, () -> {
|
|
this.<T>getQueue(ags);
|
|
ags3.tell(ProcessorHandle.of("chunk priority sorter around " + ags.name(), a -> this.<T>submit(ags, a.task, a.pos, a.level, boolean2)));
|
|
})).join();
|
|
}
|
|
|
|
public ProcessorHandle<Release> getReleaseProcessor(final ProcessorHandle<Runnable> ags) {
|
|
return this.mailbox.<ProcessorHandle<Release>>ask(ags2 -> new StrictQueue.IntRunnable(0, () -> ags2.tell(ProcessorHandle.of("chunk priority sorter around " + ags.name(), b -> this.<Runnable>release(ags, b.pos, b.task, b.clearQueue))))).join();
|
|
}
|
|
|
|
@Override
|
|
public void onLevelChange(final ChunkPos bhd, final IntSupplier intSupplier, final int integer, final IntConsumer intConsumer) {
|
|
final int integer2;
|
|
this.mailbox.tell(new StrictQueue.IntRunnable(0, () -> {
|
|
integer2 = intSupplier.getAsInt();
|
|
this.queues.values().forEach(ux -> ux.resortChunkTasks(integer2, bhd, integer));
|
|
intConsumer.accept(integer);
|
|
}));
|
|
}
|
|
|
|
private <T> void release(final ProcessorHandle<T> ags, final long long2, final Runnable runnable, final boolean boolean4) {
|
|
final ChunkTaskPriorityQueue<Function<ProcessorHandle<Unit>, Object>> ux7;
|
|
this.mailbox.tell(new StrictQueue.IntRunnable(1, () -> {
|
|
ux7 = this.getQueue(ags);
|
|
ux7.release(long2, boolean4);
|
|
if (this.sleeping.remove(ags)) {
|
|
this.pollTask(ux7, (ProcessorHandle<Object>)ags);
|
|
}
|
|
runnable.run();
|
|
}));
|
|
}
|
|
|
|
private <T> void submit(final ProcessorHandle<T> ags, final Function<ProcessorHandle<Unit>, T> function, final long long3, final IntSupplier intSupplier, final boolean boolean5) {
|
|
final ChunkTaskPriorityQueue<Function<ProcessorHandle<Unit>, Object>> ux8;
|
|
final int integer9;
|
|
this.mailbox.tell(new StrictQueue.IntRunnable(2, () -> {
|
|
ux8 = this.getQueue(ags);
|
|
integer9 = intSupplier.getAsInt();
|
|
ux8.submit(Optional.<Function<ProcessorHandle<Unit>, Object>>of((Function<ProcessorHandle<Unit>, Object>)function), long3, integer9);
|
|
if (boolean5) {
|
|
ux8.submit(Optional.<Function<ProcessorHandle<Unit>, Object>>empty(), long3, integer9);
|
|
}
|
|
if (this.sleeping.remove(ags)) {
|
|
this.pollTask(ux8, (ProcessorHandle<Object>)ags);
|
|
}
|
|
}));
|
|
}
|
|
|
|
private <T> void pollTask(final ChunkTaskPriorityQueue<Function<ProcessorHandle<Unit>, T>> ux, final ProcessorHandle<T> ags) {
|
|
final Stream<Either<Function<ProcessorHandle<Unit>, Object>, Runnable>> stream4;
|
|
this.mailbox.tell(new StrictQueue.IntRunnable(3, () -> {
|
|
stream4 = (Stream<Either<Function<ProcessorHandle<Unit>, Object>, Runnable>>)ux.pop();
|
|
if (stream4 == null) {
|
|
this.sleeping.add(ags);
|
|
}
|
|
else {
|
|
Util.sequence(stream4.map(either -> (CompletableFuture)either.map((Function)ags::ask, runnable -> {
|
|
runnable.run();
|
|
return CompletableFuture.<Unit>completedFuture(Unit.INSTANCE);
|
|
})).collect(Collectors.toList())).thenAccept(list -> this.pollTask((ChunkTaskPriorityQueue<Function<ProcessorHandle<Unit>, Object>>)ux, (ProcessorHandle<Object>)ags));
|
|
}
|
|
}));
|
|
}
|
|
|
|
private <T> ChunkTaskPriorityQueue<Function<ProcessorHandle<Unit>, T>> getQueue(final ProcessorHandle<T> ags) {
|
|
final ChunkTaskPriorityQueue<? extends Function<ProcessorHandle<Unit>, ?>> ux3 = this.queues.get(ags);
|
|
if (ux3 == null) {
|
|
throw new IllegalArgumentException("No queue for: " + ags);
|
|
}
|
|
return (ChunkTaskPriorityQueue<Function<ProcessorHandle<Unit>, T>>)ux3;
|
|
}
|
|
|
|
@VisibleForTesting
|
|
public String getDebugStatus() {
|
|
return this.queues.entrySet().stream().map(entry -> entry.getKey().name() + "=[" + ((ChunkTaskPriorityQueue)entry.getValue()).getAcquired().stream().map(long1 -> long1 + ":" + new ChunkPos(long1)).collect(Collectors.joining(",")) + "]").collect(Collectors.joining(",")) + ", s=" + this.sleeping.size();
|
|
}
|
|
|
|
@Override
|
|
public void close() {
|
|
this.queues.keySet().forEach(ProcessorHandle::close);
|
|
}
|
|
|
|
static {
|
|
LOGGER = LogManager.getLogger();
|
|
}
|
|
|
|
public static final class Message<T> {
|
|
private final Function<ProcessorHandle<Unit>, T> task;
|
|
private final long pos;
|
|
private final IntSupplier level;
|
|
|
|
private Message(final Function<ProcessorHandle<Unit>, T> function, final long long2, final IntSupplier intSupplier) {
|
|
this.task = function;
|
|
this.pos = long2;
|
|
this.level = intSupplier;
|
|
}
|
|
}
|
|
|
|
public static final class Release {
|
|
private final Runnable task;
|
|
private final long pos;
|
|
private final boolean clearQueue;
|
|
|
|
private Release(final Runnable runnable, final long long2, final boolean boolean3) {
|
|
this.task = runnable;
|
|
this.pos = long2;
|
|
this.clearQueue = boolean3;
|
|
}
|
|
}
|
|
}
|