package io.trino.execution.executor.scheduler;

import com.google.common.util.concurrent.AbstractFuture;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import io.airlift.testing.TestingTicker;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.assertj.core.api.AbstractBooleanAssert;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

/* loaded from: input_file:io/trino/execution/executor/scheduler/TestFairScheduler.class */
public class TestFairScheduler {

    /* loaded from: input_file:io/trino/execution/executor/scheduler/TestFairScheduler$TestFuture.class */
    private static class TestFuture extends AbstractFuture<Void> {
        private final CountDownLatch listenerAdded = new CountDownLatch(1);

        private TestFuture() {
        }

        public void addListener(Runnable runnable, Executor executor) {
            super.addListener(runnable, executor);
            this.listenerAdded.countDown();
        }

        public boolean set(Void r4) {
            return super.set(r4);
        }

        public void awaitListenerAdded() throws InterruptedException {
            this.listenerAdded.await();
        }
    }

    @Test
    public void testBasic() throws ExecutionException, InterruptedException {
        FairScheduler newInstance = FairScheduler.newInstance(1);
        try {
            Group createGroup = newInstance.createGroup("G1");
            AtomicBoolean atomicBoolean = new AtomicBoolean();
            newInstance.submit(createGroup, 1, schedulerContext -> {
                atomicBoolean.set(true);
            }).get();
            ((AbstractBooleanAssert) Assertions.assertThat(atomicBoolean.get()).describedAs("Ran task", new Object[0])).isTrue();
            if (newInstance != null) {
                newInstance.close();
            }
        } catch (Throwable th) {
            if (newInstance != null) {
                try {
                    newInstance.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Timeout(5)
    @Test
    public void testYield() throws ExecutionException, InterruptedException {
        TestingTicker testingTicker = new TestingTicker();
        FairScheduler newInstance = FairScheduler.newInstance(1, testingTicker);
        try {
            Group createGroup = newInstance.createGroup("G");
            CountDownLatch countDownLatch = new CountDownLatch(1);
            AtomicBoolean atomicBoolean = new AtomicBoolean();
            ListenableFuture submit = newInstance.submit(createGroup, 1, schedulerContext -> {
                countDownLatch.countDown();
                while (!atomicBoolean.get() && schedulerContext.maybeYield()) {
                }
            });
            countDownLatch.await();
            ListenableFuture submit2 = newInstance.submit(createGroup, 2, schedulerContext2 -> {
                atomicBoolean.set(true);
            });
            while (!submit2.isDone()) {
                testingTicker.increment(FairScheduler.QUANTUM_NANOS * 2, TimeUnit.NANOSECONDS);
            }
            submit.get();
            if (newInstance != null) {
                newInstance.close();
            }
        } catch (Throwable th) {
            if (newInstance != null) {
                try {
                    newInstance.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test
    public void testBlocking() throws InterruptedException, ExecutionException {
        FairScheduler newInstance = FairScheduler.newInstance(1);
        try {
            Group createGroup = newInstance.createGroup("G");
            CountDownLatch countDownLatch = new CountDownLatch(1);
            CountDownLatch countDownLatch2 = new CountDownLatch(1);
            CountDownLatch countDownLatch3 = new CountDownLatch(1);
            AtomicBoolean atomicBoolean = new AtomicBoolean();
            SettableFuture create = SettableFuture.create();
            ListenableFuture submit = newInstance.submit(createGroup, 1, schedulerContext -> {
                try {
                    countDownLatch.countDown();
                    countDownLatch2.await();
                    ((AbstractBooleanAssert) Assertions.assertThat(atomicBoolean.get()).describedAs("Task 2 run", new Object[0])).isFalse();
                    schedulerContext.block(create);
                    ((AbstractBooleanAssert) Assertions.assertThat(atomicBoolean.get()).describedAs("Task 2 run", new Object[0])).isTrue();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e);
                }
            });
            countDownLatch.await();
            ListenableFuture submit2 = newInstance.submit(createGroup, 2, schedulerContext2 -> {
                countDownLatch3.countDown();
                atomicBoolean.set(true);
            });
            countDownLatch2.countDown();
            countDownLatch3.await();
            create.set((Object) null);
            submit.get();
            submit2.get();
            if (newInstance != null) {
                newInstance.close();
            }
        } catch (Throwable th) {
            if (newInstance != null) {
                try {
                    newInstance.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test
    public void testCancelWhileYielding() throws InterruptedException, ExecutionException {
        TestingTicker testingTicker = new TestingTicker();
        FairScheduler newInstance = FairScheduler.newInstance(1, testingTicker);
        try {
            Group createGroup = newInstance.createGroup("G");
            CountDownLatch countDownLatch = new CountDownLatch(1);
            CountDownLatch countDownLatch2 = new CountDownLatch(1);
            CountDownLatch countDownLatch3 = new CountDownLatch(1);
            ListenableFuture submit = newInstance.submit(createGroup, 1, schedulerContext -> {
                try {
                    countDownLatch.countDown();
                    countDownLatch2.await();
                    countDownLatch3.await();
                    ((AbstractBooleanAssert) Assertions.assertThat(schedulerContext.maybeYield()).describedAs("Cancelled while yielding", new Object[0])).isFalse();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e);
                }
            });
            countDownLatch.await();
            newInstance.pause();
            testingTicker.increment(FairScheduler.QUANTUM_NANOS * 2, TimeUnit.NANOSECONDS);
            countDownLatch2.countDown();
            newInstance.removeGroup(createGroup);
            countDownLatch3.countDown();
            submit.get();
            if (newInstance != null) {
                newInstance.close();
            }
        } catch (Throwable th) {
            if (newInstance != null) {
                try {
                    newInstance.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test
    public void testCancelWhileBlocking() throws InterruptedException, ExecutionException {
        FairScheduler newInstance = FairScheduler.newInstance(1, new TestingTicker());
        try {
            Group createGroup = newInstance.createGroup("G");
            CountDownLatch countDownLatch = new CountDownLatch(1);
            TestFuture testFuture = new TestFuture();
            ListenableFuture submit = newInstance.submit(createGroup, 1, schedulerContext -> {
                countDownLatch.countDown();
                ((AbstractBooleanAssert) Assertions.assertThat(schedulerContext.block(testFuture)).describedAs("Cancelled while blocking", new Object[0])).isFalse();
            });
            countDownLatch.await();
            testFuture.awaitListenerAdded();
            newInstance.removeGroup(createGroup);
            submit.get();
            if (newInstance != null) {
                newInstance.close();
            }
        } catch (Throwable th) {
            if (newInstance != null) {
                try {
                    newInstance.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test
    public void testCleanupAfterFinish() throws InterruptedException, ExecutionException {
        FairScheduler newInstance = FairScheduler.newInstance(1, new TestingTicker());
        try {
            Group createGroup = newInstance.createGroup("G");
            AtomicInteger atomicInteger = new AtomicInteger();
            newInstance.submit(createGroup, 1, schedulerContext -> {
                atomicInteger.incrementAndGet();
            }).get();
            Assertions.assertThat(atomicInteger.get()).isEqualTo(1);
            Assertions.assertThat(newInstance.getTasks(createGroup)).isEmpty();
            if (newInstance != null) {
                newInstance.close();
            }
        } catch (Throwable th) {
            if (newInstance != null) {
                try {
                    newInstance.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }
}
