package io.trino.server;

import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.Uninterruptibles;
import io.airlift.bootstrap.LifeCycleManager;
import io.airlift.concurrent.Threads;
import io.airlift.log.Logger;
import io.airlift.units.Duration;
import io.trino.execution.StateMachine;
import io.trino.execution.TaskId;
import io.trino.execution.TaskInfo;
import io.trino.execution.TaskState;
import io.trino.metadata.NodeState;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import org.assertj.core.util.VisibleForTesting;

/* loaded from: input_file:io/trino/server/NodeStateManager.class */
public class NodeStateManager {
    private static final Logger log = Logger.get(NodeStateManager.class);
    private static final Duration LIFECYCLE_STOP_TIMEOUT = new Duration(30.0d, TimeUnit.SECONDS);
    private final ScheduledExecutorService shutdownHandler;
    private final ExecutorService lifeCycleStopper;
    private final LifeCycleManager lifeCycleManager;
    private final SqlTasksObservable sqlTasksObservable;
    private final Supplier<List<TaskInfo>> taskInfoSupplier;
    private final boolean isCoordinator;
    private final ShutdownAction shutdownAction;
    private final Duration gracePeriod;
    private final ScheduledExecutorService executor;
    private final AtomicReference<VersionedState> nodeState;
    private final AtomicLong stateVersionProvider;

    /* loaded from: input_file:io/trino/server/NodeStateManager$SqlTasksObservable.class */
    public interface SqlTasksObservable {
        void addStateChangeListener(TaskId taskId, StateMachine.StateChangeListener<TaskState> stateChangeListener);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/trino/server/NodeStateManager$VersionedState.class */
    public class VersionedState {
        private final NodeState state;
        private final long version;

        private VersionedState(NodeState nodeState, long j) {
            this.state = (NodeState) Objects.requireNonNull(nodeState, "state is null");
            this.version = j;
        }

        public VersionedState toActive() {
            return new VersionedState(NodeState.ACTIVE, NodeStateManager.this.nextStateVersion());
        }

        public VersionedState toDraining() {
            return new VersionedState(NodeState.DRAINING, NodeStateManager.this.nextStateVersion());
        }

        public VersionedState toDrained() {
            return new VersionedState(NodeState.DRAINED, NodeStateManager.this.nextStateVersion());
        }

        public VersionedState toShuttingDown() {
            return new VersionedState(NodeState.SHUTTING_DOWN, NodeStateManager.this.nextStateVersion());
        }

        public NodeState state() {
            return this.state;
        }

        public long version() {
            return this.version;
        }

        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            VersionedState versionedState = (VersionedState) obj;
            return this.version == versionedState.version && this.state == versionedState.state;
        }

        public int hashCode() {
            return Objects.hash(this.state, Long.valueOf(this.version));
        }

        public String toString() {
            return String.format("%s-%s", this.state.toString(), Long.valueOf(this.version));
        }
    }

    /* JADX WARN: Illegal instructions before constructor call */
    @com.google.inject.Inject
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    public NodeStateManager(io.trino.execution.SqlTaskManager r9, io.trino.server.ServerConfig r10, io.trino.server.ShutdownAction r11, io.airlift.bootstrap.LifeCycleManager r12) {
        /*
            r8 = this;
            r0 = r8
            r1 = r9
            java.lang.String r2 = "sqlTaskManager is null"
            java.lang.Object r1 = java.util.Objects.requireNonNull(r1, r2)
            io.trino.execution.SqlTaskManager r1 = (io.trino.execution.SqlTaskManager) r1
            r2 = r1
            java.lang.Object r2 = java.util.Objects.requireNonNull(r2)
            void r1 = r1::addStateChangeListener
            r2 = r9
            java.lang.String r3 = "sqlTaskManager is null"
            java.lang.Object r2 = java.util.Objects.requireNonNull(r2, r3)
            io.trino.execution.SqlTaskManager r2 = (io.trino.execution.SqlTaskManager) r2
            r3 = r2
            java.lang.Object r3 = java.util.Objects.requireNonNull(r3)
            void r2 = r2::getAllTaskInfo
            r3 = r10
            r4 = r11
            r5 = r12
            java.lang.String r6 = "drain-handler-%s"
            java.util.concurrent.ThreadFactory r6 = io.airlift.concurrent.Threads.threadsNamed(r6)
            java.util.concurrent.ScheduledExecutorService r6 = java.util.concurrent.Executors.newSingleThreadScheduledExecutor(r6)
            r0.<init>(r1, r2, r3, r4, r5, r6)
            return
        */
        throw new UnsupportedOperationException("Method not decompiled: io.trino.server.NodeStateManager.<init>(io.trino.execution.SqlTaskManager, io.trino.server.ServerConfig, io.trino.server.ShutdownAction, io.airlift.bootstrap.LifeCycleManager):void");
    }

    @VisibleForTesting
    public NodeStateManager(SqlTasksObservable sqlTasksObservable, Supplier<List<TaskInfo>> supplier, ServerConfig serverConfig, ShutdownAction shutdownAction, LifeCycleManager lifeCycleManager, ScheduledExecutorService scheduledExecutorService) {
        this.shutdownHandler = Executors.newSingleThreadScheduledExecutor(Threads.threadsNamed("shutdown-handler-%s"));
        this.lifeCycleStopper = Executors.newSingleThreadExecutor(Threads.threadsNamed("lifecycle-stopper-%s"));
        this.nodeState = new AtomicReference<>(new VersionedState(NodeState.ACTIVE, 0L));
        this.stateVersionProvider = new AtomicLong(0L);
        this.sqlTasksObservable = (SqlTasksObservable) Objects.requireNonNull(sqlTasksObservable, "sqlTasksObservable is null");
        this.taskInfoSupplier = (Supplier) Objects.requireNonNull(supplier, "taskInfoSupplier is null");
        this.shutdownAction = (ShutdownAction) Objects.requireNonNull(shutdownAction, "shutdownAction is null");
        this.lifeCycleManager = (LifeCycleManager) Objects.requireNonNull(lifeCycleManager, "lifeCycleManager is null");
        this.isCoordinator = serverConfig.isCoordinator();
        this.gracePeriod = serverConfig.getGracePeriod();
        this.executor = (ScheduledExecutorService) Objects.requireNonNull(scheduledExecutorService, "executor is null");
    }

    public NodeState getServerState() {
        return this.nodeState.get().state();
    }

    public synchronized void transitionState(NodeState nodeState) {
        VersionedState versionedState = this.nodeState.get();
        if (versionedState.state() == nodeState) {
            return;
        }
        switch (nodeState) {
            case ACTIVE:
                if (versionedState.state() == NodeState.DRAINING && this.nodeState.compareAndSet(versionedState, versionedState.toActive())) {
                    return;
                }
                if (versionedState.state() == NodeState.DRAINED && this.nodeState.compareAndSet(versionedState, versionedState.toActive())) {
                    return;
                }
                break;
            case SHUTTING_DOWN:
                if (versionedState.state() == NodeState.DRAINED && this.nodeState.compareAndSet(versionedState, versionedState.toShuttingDown())) {
                    requestTerminate();
                    return;
                } else {
                    requestGracefulShutdown();
                    this.nodeState.set(versionedState.toShuttingDown());
                    return;
                }
            case DRAINING:
                if (versionedState.state() == NodeState.ACTIVE && this.nodeState.compareAndSet(versionedState, versionedState.toDraining())) {
                    requestDrain();
                    return;
                }
                break;
            case DRAINED:
                throw new IllegalStateException(String.format("Invalid state transition from %s to %s, transition to DRAINED is internal only", versionedState, nodeState));
            case INACTIVE:
                throw new IllegalStateException(String.format("Invalid state transition from %s to %s, INACTIVE is not a valid internal state", versionedState, nodeState));
        }
        throw new IllegalStateException(String.format("Invalid state transition from %s to %s", versionedState, nodeState));
    }

    private long nextStateVersion() {
        return this.stateVersionProvider.incrementAndGet();
    }

    private synchronized void requestDrain() {
        log.debug("Drain requested, NodeState: %s", new Object[]{getServerState()});
        if (this.isCoordinator) {
            throw new UnsupportedOperationException("Cannot drain coordinator");
        }
        VersionedState versionedState = this.nodeState.get();
        this.executor.schedule(() -> {
            drain(versionedState);
        }, this.gracePeriod.toMillis(), TimeUnit.MILLISECONDS);
    }

    private void requestTerminate() {
        log.info("Immediate Shutdown requested");
        if (this.isCoordinator) {
            throw new UnsupportedOperationException("Cannot shutdown coordinator");
        }
        this.shutdownHandler.schedule(this::terminate, 0L, TimeUnit.MILLISECONDS);
    }

    private void requestGracefulShutdown() {
        log.info("Shutdown requested");
        if (this.isCoordinator) {
            throw new UnsupportedOperationException("Cannot shutdown coordinator");
        }
        VersionedState versionedState = this.nodeState.get();
        this.shutdownHandler.schedule(() -> {
            shutdown(versionedState);
        }, this.gracePeriod.toMillis(), TimeUnit.MILLISECONDS);
    }

    private void shutdown(VersionedState versionedState) {
        waitActiveTasksToFinish(versionedState);
        terminate();
    }

    private void terminate() {
        try {
            this.lifeCycleStopper.submit(() -> {
                this.lifeCycleManager.stop();
                return null;
            }).get(LIFECYCLE_STOP_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            log.warn(e, "Interrupted while waiting for the life cycle to stop");
            Thread.currentThread().interrupt();
        } catch (ExecutionException e2) {
            log.warn(e2, "Problem stopping the life cycle");
        } catch (TimeoutException e3) {
            log.warn(e3, "Timed out waiting for the life cycle to stop");
        }
        this.shutdownAction.onShutdown();
    }

    private void drain(VersionedState versionedState) {
        if (this.nodeState.get() == versionedState) {
            waitActiveTasksToFinish(versionedState);
        }
        drainingComplete(versionedState);
    }

    private synchronized void drainingComplete(VersionedState versionedState) {
        if (this.nodeState.compareAndSet(versionedState, versionedState.toDrained())) {
            log.info("Worker State change: DRAINING -> DRAINED, server can be safely SHUT DOWN.");
        } else {
            log.info("Worker State change: %s, expected: %s, will not transition to DRAINED", new Object[]{this.nodeState.get(), versionedState});
        }
    }

    private void waitActiveTasksToFinish(VersionedState versionedState) {
        while (this.nodeState.get() == versionedState) {
            List<TaskInfo> activeTasks = getActiveTasks();
            log.info("Waiting for %s active tasks to finish", new Object[]{Integer.valueOf(activeTasks.size())});
            if (activeTasks.isEmpty()) {
                break;
            } else {
                waitTasksToFinish(activeTasks, versionedState);
            }
        }
        if (this.nodeState.get() == versionedState) {
            Uninterruptibles.sleepUninterruptibly(this.gracePeriod.toMillis(), TimeUnit.MILLISECONDS);
        }
    }

    /* JADX WARN: Code restructure failed: missing block: B:11:0x005e, code lost:
    
        io.trino.server.NodeStateManager.log.info("Wait for tasks interrupted by state change, worker is no longer draining.");
     */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    private void waitTasksToFinish(java.util.List<io.trino.execution.TaskInfo> r6, io.trino.server.NodeStateManager.VersionedState r7) {
        /*
            r5 = this;
            java.util.concurrent.CountDownLatch r0 = new java.util.concurrent.CountDownLatch
            r1 = r0
            r2 = r6
            int r2 = r2.size()
            r1.<init>(r2)
            r8 = r0
            r0 = r6
            java.util.Iterator r0 = r0.iterator()
            r9 = r0
        L16:
            r0 = r9
            boolean r0 = r0.hasNext()
            if (r0 == 0) goto L48
            r0 = r9
            java.lang.Object r0 = r0.next()
            io.trino.execution.TaskInfo r0 = (io.trino.execution.TaskInfo) r0
            r10 = r0
            r0 = r5
            io.trino.server.NodeStateManager$SqlTasksObservable r0 = r0.sqlTasksObservable
            r1 = r10
            io.trino.execution.TaskStatus r1 = r1.taskStatus()
            io.trino.execution.TaskId r1 = r1.getTaskId()
            r2 = r10
            r3 = r8
            void r2 = (v2) -> { // io.trino.execution.StateMachine.StateChangeListener.stateChanged(java.lang.Object):void
                lambda$waitTasksToFinish$3(r2, r3, v2);
            }
            r0.addStateChangeListener(r1, r2)
            goto L16
        L48:
            r0 = r8
            r1 = 1
            java.util.concurrent.TimeUnit r2 = java.util.concurrent.TimeUnit.SECONDS     // Catch: java.lang.InterruptedException -> L6d
            boolean r0 = r0.await(r1, r2)     // Catch: java.lang.InterruptedException -> L6d
            if (r0 != 0) goto L6a
            r0 = r5
            java.util.concurrent.atomic.AtomicReference<io.trino.server.NodeStateManager$VersionedState> r0 = r0.nodeState     // Catch: java.lang.InterruptedException -> L6d
            java.lang.Object r0 = r0.get()     // Catch: java.lang.InterruptedException -> L6d
            r1 = r7
            if (r0 == r1) goto L48
            io.airlift.log.Logger r0 = io.trino.server.NodeStateManager.log     // Catch: java.lang.InterruptedException -> L6d
            java.lang.String r1 = "Wait for tasks interrupted by state change, worker is no longer draining."
            r0.info(r1)     // Catch: java.lang.InterruptedException -> L6d
            goto L6a
        L6a:
            goto L7e
        L6d:
            r9 = move-exception
            io.airlift.log.Logger r0 = io.trino.server.NodeStateManager.log
            java.lang.String r1 = "Interrupted while waiting for all tasks to finish"
            r0.warn(r1)
            java.lang.Thread r0 = java.lang.Thread.currentThread()
            r0.interrupt()
        L7e:
            return
        */
        throw new UnsupportedOperationException("Method not decompiled: io.trino.server.NodeStateManager.waitTasksToFinish(java.util.List, io.trino.server.NodeStateManager$VersionedState):void");
    }

    private List<TaskInfo> getActiveTasks() {
        return (List) this.taskInfoSupplier.get().stream().filter(taskInfo -> {
            return !taskInfo.taskStatus().getState().isDone();
        }).collect(ImmutableList.toImmutableList());
    }
}
