package io.trino.server;

import com.google.common.base.Ticker;
import io.airlift.bootstrap.LifeCycleManager;
import io.airlift.bootstrap.LifeCycleMethodsMap;
import io.airlift.testing.TestingTicker;
import io.airlift.units.Duration;
import io.trino.execution.StageId;
import io.trino.execution.StateMachine;
import io.trino.execution.TaskId;
import io.trino.execution.TaskInfo;
import io.trino.execution.TaskState;
import io.trino.metadata.NodeState;
import io.trino.operator.TaskStats;
import io.trino.server.NodeStateManager;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.lang.runtime.ObjectMethods;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import org.assertj.core.api.Assertions;
import org.joda.time.DateTime;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.testcontainers.shaded.org.awaitility.Awaitility;

/* loaded from: input_file:io/trino/server/TestNodeStateManager.class */
class TestNodeStateManager {
    /** Grace period, in milliseconds, passed to {@code createNodeStateManager} by {@code setUp}. */
    public static final int GRACE_PERIOD_MILLIS = 200;

    // Per-test collaborators; all of them are (re)assigned in setUp().
    private FakeScheduledExecutorService executor;
    private NodeStateManager nodeStateManager;
    private TestingTicker ticker;
    private TestingShutdownAction shutdownAction;

    // Task list handed to the NodeStateManager through a supplier lambda; tests swap the
    // contents with tasks.set(...). The reference itself is never reassigned, hence final.
    private final AtomicReference<List<TaskInfo>> tasks = new AtomicReference<>(new ArrayList<>());
    private TestTaskObservable sqlTasksObservable;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/trino/server/TestNodeStateManager$FakeScheduledExecutorService.class */
    /**
     * Minimal {@link ScheduledExecutorService} stand-in driven by an injected {@link Ticker}.
     *
     * <p>Only {@link #schedule(Runnable, long, TimeUnit)} records work. Recorded jobs are not
     * executed until {@link #run()} is called explicitly, at which point every job whose due
     * time is not after the ticker's current reading is executed. All other methods are inert
     * stubs that return {@code null}, {@code false} or an empty list.
     */
    public static class FakeScheduledExecutorService implements ScheduledExecutorService {
        private final Ticker clock;
        private final List<Job> jobs = new ArrayList<>();

        /**
         * @param ticker time source used both to stamp scheduled jobs and to decide which are due
         */
        public FakeScheduledExecutorService(Ticker ticker) {
            this.clock = ticker;
        }

        /**
         * Records {@code runnable} as due at the ticker's current reading plus the given delay.
         *
         * @return always {@code null}; no future is produced for the recorded job
         */
        @Override
        public ScheduledFuture<?> schedule(Runnable runnable, long j, TimeUnit timeUnit) {
            jobs.add(new Job(runnable, clock.read() + timeUnit.toNanos(j)));
            return null;
        }

        /** Not supported by this fake: the callable is dropped and {@code null} is returned. */
        @Override
        public <V> ScheduledFuture<V> schedule(Callable<V> callable, long j, TimeUnit timeUnit) {
            return null;
        }

        /** Not supported by this fake: nothing is scheduled and {@code null} is returned. */
        @Override
        public ScheduledFuture<?> scheduleAtFixedRate(Runnable runnable, long j, long j2, TimeUnit timeUnit) {
            return null;
        }

        /** Not supported by this fake: nothing is scheduled and {@code null} is returned. */
        @Override
        public ScheduledFuture<?> scheduleWithFixedDelay(Runnable runnable, long j, long j2, TimeUnit timeUnit) {
            return null;
        }

        /**
         * Removes every job that is due at the ticker's current reading and executes those jobs,
         * ordered by due time, sequentially on one newly started thread. The method returns
         * without waiting for that thread to finish.
         */
        void run() {
            long now = clock.read();
            List<Job> due = jobs.stream()
                    .filter(job -> job.time() <= now)
                    .sorted(Comparator.comparingLong(Job::time))
                    .toList();
            jobs.removeAll(due);
            new Thread(() -> due.forEach(job -> job.command().run())).start();
        }

        /** No-op. */
        @Override
        public void shutdown() {
        }

        /** @return always an empty list */
        @Override
        public List<Runnable> shutdownNow() {
            return List.of();
        }

        /** @return always {@code false} */
        @Override
        public boolean isShutdown() {
            return false;
        }

        /** @return always {@code false} */
        @Override
        public boolean isTerminated() {
            return false;
        }

        /** @return always {@code false}, without blocking */
        @Override
        public boolean awaitTermination(long j, TimeUnit timeUnit) throws InterruptedException {
            return false;
        }

        /** Not supported by this fake: the task is dropped and {@code null} is returned. */
        @Override
        public <T> Future<T> submit(Callable<T> callable) {
            return null;
        }

        /** Not supported by this fake: the task is dropped and {@code null} is returned. */
        @Override
        public <T> Future<T> submit(Runnable runnable, T t) {
            return null;
        }

        /** Not supported by this fake: the task is dropped and {@code null} is returned. */
        @Override
        public Future<?> submit(Runnable runnable) {
            return null;
        }

        /** Not supported by this fake: no task is run and an empty list is returned. */
        @Override
        public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> collection) throws InterruptedException {
            return List.of();
        }

        /** Not supported by this fake: no task is run and an empty list is returned. */
        @Override
        public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> collection, long j, TimeUnit timeUnit) throws InterruptedException {
            return List.of();
        }

        /** Not supported by this fake: no task is run and {@code null} is returned. */
        @Override
        public <T> T invokeAny(Collection<? extends Callable<T>> collection) throws InterruptedException, ExecutionException {
            return null;
        }

        /** Not supported by this fake: no task is run and {@code null} is returned. */
        @Override
        public <T> T invokeAny(Collection<? extends Callable<T>> collection, long j, TimeUnit timeUnit) throws InterruptedException, ExecutionException, TimeoutException {
            return null;
        }

        /** No-op: the runnable is dropped. */
        @Override
        public void execute(Runnable runnable) {
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/trino/server/TestNodeStateManager$Job.class */
    /**
     * A unit of work recorded by the fake executor together with the time at which it
     * becomes due.
     *
     * <p>Declared as a record: a class may not extend {@link Record} directly, and the
     * compiler supplies {@code equals}, {@code hashCode}, {@code toString} and the
     * {@code command()} / {@code time()} accessors over both components.
     *
     * @param command the work to execute
     * @param time the due time, on the same scale as the ticker readings it is compared against
     */
    public record Job(Runnable command, long time) {
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/trino/server/TestNodeStateManager$TestTaskObservable.class */
    /**
     * {@link NodeStateManager.SqlTasksObservable} implementation that simply remembers the
     * listener registered for each task, so a test can look a listener up by task id and
     * invoke it by hand.
     */
    public static class TestTaskObservable implements NodeStateManager.SqlTasksObservable {
        // Latest listener registered per task id; a later registration for the same id replaces the earlier one.
        final Map<TaskId, StateMachine.StateChangeListener<TaskState>> tasks = new HashMap<>();

        private TestTaskObservable() {
        }

        /**
         * Stores {@code stateChangeListener} under {@code taskId}.
         *
         * @param taskId the task the listener is registered for
         * @param stateChangeListener the listener to remember
         */
        public void addStateChangeListener(TaskId taskId, StateMachine.StateChangeListener<TaskState> stateChangeListener) {
            tasks.put(taskId, stateChangeListener);
        }

        /**
         * @return the live (not copied) map of registered listeners keyed by task id
         */
        Map<TaskId, StateMachine.StateChangeListener<TaskState>> getTasks() {
            return tasks;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/trino/server/TestNodeStateManager$TestingShutdownAction.class */
    /**
     * {@link ShutdownAction} that records whether {@link #onShutdown()} has been invoked
     * instead of performing any real shutdown.
     */
    public static class TestingShutdownAction implements ShutdownAction {
        // volatile: the tests poll isShuttingDown() through Awaitility while waiting for
        // onShutdown() to be invoked, so the write must be visible across threads.
        private volatile boolean shuttingDown;

        private TestingShutdownAction() {
        }

        /** Marks this action as triggered. */
        public void onShutdown() {
            shuttingDown = true;
        }

        /**
         * @return {@code true} once {@link #onShutdown()} has been called
         */
        public boolean isShuttingDown() {
            return shuttingDown;
        }
    }

    // Package-private no-arg constructor with an empty body; the per-test fields are assigned in setUp().
    TestNodeStateManager() {
    }

    /**
     * Creates fresh collaborators and a node state manager configured with
     * {@link #GRACE_PERIOD_MILLIS} before each test.
     */
    @BeforeEach
    public void setUp() {
        // The fake executor takes the ticker as its time source, so the ticker is created first.
        ticker = new TestingTicker();
        executor = new FakeScheduledExecutorService(ticker);
        sqlTasksObservable = new TestTaskObservable();
        shutdownAction = new TestingShutdownAction();
        // Built last: createNodeStateManager reads the fields assigned above.
        nodeStateManager = createNodeStateManager(GRACE_PERIOD_MILLIS);
    }

    /** A freshly created manager reports {@code ACTIVE}. */
    @Test
    void testDefaultServerState() {
        NodeState initialState = nodeStateManager.getServerState();
        Assertions.assertThat(initialState).isEqualTo(NodeState.ACTIVE);
    }

    /**
     * Requesting {@code DRAINING} is reflected immediately; after the fake clock is advanced
     * and the fake executor is run, the state is expected to reach {@code DRAINED} within a second.
     */
    @Test
    void testDrain() {
        nodeStateManager.transitionState(NodeState.DRAINING);
        Assertions.assertThat(nodeStateManager.getServerState()).isEqualTo(NodeState.DRAINING);

        // Advance the fake clock by one second (more than GRACE_PERIOD_MILLIS) and
        // execute whatever jobs have become due.
        ticker.increment(1L, TimeUnit.SECONDS);
        executor.run();

        Awaitility.await()
                .atMost(1L, TimeUnit.SECONDS)
                .untilAsserted(() -> Assertions.assertThat(nodeStateManager.getServerState()).isEqualTo(NodeState.DRAINED));
    }

    /**
     * Going from {@code ACTIVE} to {@code SHUTTING_DOWN} is reflected immediately, and the
     * shutdown action is expected to be triggered within 500 ms.
     */
    @Test
    void testTransitionToShuttingDown() {
        Assertions.assertThat(nodeStateManager.getServerState()).isEqualTo(NodeState.ACTIVE);

        nodeStateManager.transitionState(NodeState.SHUTTING_DOWN);
        Assertions.assertThat(nodeStateManager.getServerState()).isEqualTo(NodeState.SHUTTING_DOWN);

        Awaitility.await()
                .atMost(500L, TimeUnit.MILLISECONDS)
                .until(() -> {
                    return shutdownAction.isShuttingDown();
                });
    }

    /**
     * Once the shutdown action has been triggered, asking for {@code ACTIVE} again must fail
     * with an {@link IllegalStateException} mentioning an invalid state transition.
     */
    @Test
    void testCannotReactivateShuttingDown() {
        Assertions.assertThat(nodeStateManager.getServerState()).isEqualTo(NodeState.ACTIVE);

        nodeStateManager.transitionState(NodeState.SHUTTING_DOWN);
        Assertions.assertThat(nodeStateManager.getServerState()).isEqualTo(NodeState.SHUTTING_DOWN);

        Awaitility.await()
                .atMost(500L, TimeUnit.MILLISECONDS)
                .until(() -> {
                    return shutdownAction.isShuttingDown();
                });

        Assertions.assertThatThrownBy(() -> nodeStateManager.transitionState(NodeState.ACTIVE))
                .isInstanceOf(IllegalStateException.class)
                .hasMessageContaining("Invalid state transition");
    }

    /**
     * Drains the node first, then requests {@code SHUTTING_DOWN} and expects the shutdown
     * action to be triggered within 100 ms.
     */
    @Test
    void testImmediateTransitionToShuttingDownWhenDrained() {
        Assertions.assertThat(nodeStateManager.getServerState()).isEqualTo(NodeState.ACTIVE);

        nodeStateManager.transitionState(NodeState.DRAINING);
        Assertions.assertThat(nodeStateManager.getServerState()).isEqualTo(NodeState.DRAINING);

        // Advance the fake clock by one second (more than GRACE_PERIOD_MILLIS) and
        // execute whatever jobs have become due.
        ticker.increment(1L, TimeUnit.SECONDS);
        executor.run();

        // Bounded at one second, the same limit testDrain uses for this identical wait;
        // the previous limit of 500 *seconds* would stall a failing run for over eight minutes.
        Awaitility.await()
                .atMost(1L, TimeUnit.SECONDS)
                .untilAsserted(() -> Assertions.assertThat(nodeStateManager.getServerState()).isEqualTo(NodeState.DRAINED));

        nodeStateManager.transitionState(NodeState.SHUTTING_DOWN);
        Assertions.assertThat(nodeStateManager.getServerState()).isEqualTo(NodeState.SHUTTING_DOWN);

        // Poll every millisecond so the tight 100 ms budget is not consumed by the poll interval.
        Awaitility.await()
                .pollInterval(1L, TimeUnit.MILLISECONDS)
                .atMost(100L, TimeUnit.MILLISECONDS)
                .until(() -> {
                    return shutdownAction.isShuttingDown();
                });
    }

    /**
     * With one task reported by the task supplier, draining is expected to register a state
     * listener for that task; after the task list is emptied and the listener is told the
     * task {@code FINISHED}, the state is expected to reach {@code DRAINED} within a second.
     *
     * @throws URISyntaxException declared because the task's URI is built with {@code new URI("")}
     */
    @Test
    void testWaitActiveTasksToFinish() throws URISyntaxException {
        List<TaskInfo> activeTasks = new ArrayList<>();
        TaskInfo task = TaskInfo.createInitialTask(new TaskId(new StageId("query1", 1), 1, 1), new URI(""), "1", false, Optional.empty(), new TaskStats(DateTime.now(), (DateTime) null));
        activeTasks.add(task);
        tasks.set(activeTasks);

        nodeStateManager.transitionState(NodeState.DRAINING);
        Assertions.assertThat(nodeStateManager.getServerState()).isEqualTo(NodeState.DRAINING);

        // Advance the fake clock by one second (more than GRACE_PERIOD_MILLIS) and
        // execute whatever jobs have become due.
        ticker.increment(1L, TimeUnit.SECONDS);
        executor.run();

        // Wait until exactly one listener has been registered with the observable.
        Awaitility.await()
                .atMost(1L, TimeUnit.SECONDS)
                .until(() -> sqlTasksObservable.getTasks().size() == 1);

        // Report no remaining tasks, then notify the registered listener that the task finished.
        tasks.set(Collections.emptyList());
        sqlTasksObservable.getTasks().get(task.taskStatus().getTaskId()).stateChanged(TaskState.FINISHED);

        Awaitility.await()
                .atMost(1L, TimeUnit.SECONDS)
                .untilAsserted(() -> Assertions.assertThat(nodeStateManager.getServerState()).isEqualTo(NodeState.DRAINED));
    }

    /**
     * Drains, finishes the only task, reactivates the node, restores the task list and drains
     * again. While the task list is non-empty the state must stay {@code DRAINING}: the final
     * wait fails fast if {@code DRAINED} is ever observed.
     *
     * @throws URISyntaxException declared because the task's URI is built with {@code new URI("")}
     * @throws InterruptedException if the fixed 200 ms sleep is interrupted
     */
    @Test
    void testDrainToActiveToDrain() throws URISyntaxException, InterruptedException {
        // Replace the manager built in setUp with one using a 400 ms grace period.
        nodeStateManager = createNodeStateManager(400);

        List<TaskInfo> activeTasks = new ArrayList<>();
        TaskInfo task = TaskInfo.createInitialTask(new TaskId(new StageId("query1", 1), 1, 1), new URI(""), "1", false, Optional.empty(), new TaskStats(DateTime.now(), (DateTime) null));
        activeTasks.add(task);
        tasks.set(activeTasks);

        nodeStateManager.transitionState(NodeState.DRAINING);
        Assertions.assertThat(nodeStateManager.getServerState()).isEqualTo(NodeState.DRAINING);

        // Advance the fake clock by two seconds (more than the 400 ms grace period) and
        // execute whatever jobs have become due.
        ticker.increment(2L, TimeUnit.SECONDS);
        executor.run();

        // Wait until exactly one listener has been registered with the observable.
        Awaitility.await()
                .atMost(1L, TimeUnit.SECONDS)
                .until(() -> sqlTasksObservable.getTasks().size() == 1);

        // Report no remaining tasks, then notify the registered listener that the task finished.
        tasks.set(Collections.emptyList());
        sqlTasksObservable.getTasks().get(task.taskStatus().getTaskId()).stateChanged(TaskState.FINISHED);
        Thread.sleep(200L);

        // Reactivate, make the task visible again, and start a second drain.
        nodeStateManager.transitionState(NodeState.ACTIVE);
        tasks.set(activeTasks);
        nodeStateManager.transitionState(NodeState.DRAINING);

        // The state has to remain DRAINING for 800 ms (checked for at most 1500 ms);
        // observing DRAINED at any point aborts the wait with the message below.
        Awaitility.await()
                .during(800L, TimeUnit.MILLISECONDS)
                .atMost(1500L, TimeUnit.MILLISECONDS)
                .failFast("NodeState should never be drained, while there are still activeTasks", () -> {
                    return nodeStateManager.getServerState().equals(NodeState.DRAINED);
                })
                .until(() -> {
                    return nodeStateManager.getServerState().equals(NodeState.DRAINING);
                });
    }

    /**
     * Builds a {@link NodeStateManager} for a non-coordinator node from the test's current
     * collaborators (task observable, task list, shutdown action and fake executor).
     *
     * @param i grace period in milliseconds
     * @return the newly created manager
     */
    private NodeStateManager createNodeStateManager(int i) {
        ServerConfig config = new ServerConfig();
        config.setCoordinator(false);
        config.setGracePeriod(new Duration(i, TimeUnit.MILLISECONDS));

        LifeCycleManager lifeCycleManager = new LifeCycleManager(Collections.emptyList(), (LifeCycleMethodsMap) null);
        // The lambda re-reads the AtomicReference on every call, so tests can swap the task list later.
        return new NodeStateManager(sqlTasksObservable, () -> tasks.get(), config, shutdownAction, lifeCycleManager, executor);
    }
}
