package io.trino.execution;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.Ordering;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.errorprone.annotations.ThreadSafe;
import com.google.inject.Inject;
import io.airlift.concurrent.SetThreadName;
import io.airlift.concurrent.ThreadPoolExecutorMBean;
import io.airlift.concurrent.Threads;
import io.airlift.log.Logger;
import io.airlift.units.DataSize;
import io.airlift.units.Duration;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.context.Context;
import io.trino.ExceededCpuLimitException;
import io.trino.ExceededScanLimitException;
import io.trino.Session;
import io.trino.SystemSessionProperties;
import io.trino.execution.QueryExecution;
import io.trino.execution.StateMachine;
import io.trino.memory.ClusterMemoryManager;
import io.trino.server.BasicQueryInfo;
import io.trino.server.ResultQueryInfo;
import io.trino.server.protocol.Slug;
import io.trino.spi.QueryId;
import io.trino.spi.StandardErrorCode;
import io.trino.spi.TrinoException;
import io.trino.sql.planner.Plan;
import io.trino.tracing.ScopedSpan;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.weakref.jmx.Managed;
import org.weakref.jmx.Nested;

@ThreadSafe
/* loaded from: input_file:io/trino/execution/SqlQueryManager.class */
public class SqlQueryManager implements QueryManager {
    private static final Logger log = Logger.get(SqlQueryManager.class);
    private final ClusterMemoryManager memoryManager;
    private final Tracer tracer;
    private final QueryTracker<QueryExecution> queryTracker;
    private final Duration maxQueryCpuTime;
    private final Optional<DataSize> maxQueryScanPhysicalBytes;
    private final ExecutorService queryExecutor = Executors.newCachedThreadPool(Threads.threadsNamed("query-scheduler-%s"));
    private final ThreadPoolExecutorMBean queryExecutorMBean = new ThreadPoolExecutorMBean((ThreadPoolExecutor) this.queryExecutor);
    private final ScheduledExecutorService queryManagementExecutor;
    private final ThreadPoolExecutorMBean queryManagementExecutorMBean;

    @Inject
    public SqlQueryManager(ClusterMemoryManager clusterMemoryManager, Tracer tracer, QueryManagerConfig queryManagerConfig) {
        this.memoryManager = (ClusterMemoryManager) Objects.requireNonNull(clusterMemoryManager, "memoryManager is null");
        this.tracer = (Tracer) Objects.requireNonNull(tracer, "tracer is null");
        this.maxQueryCpuTime = queryManagerConfig.getQueryMaxCpuTime();
        this.maxQueryScanPhysicalBytes = queryManagerConfig.getQueryMaxScanPhysicalBytes();
        this.queryManagementExecutor = Executors.newScheduledThreadPool(queryManagerConfig.getQueryManagerExecutorPoolSize(), Threads.threadsNamed("query-management-%s"));
        this.queryManagementExecutorMBean = new ThreadPoolExecutorMBean((ThreadPoolExecutor) this.queryManagementExecutor);
        this.queryTracker = new QueryTracker<>(queryManagerConfig, this.queryManagementExecutor);
    }

    @PostConstruct
    public void start() {
        this.queryTracker.start();
        this.queryManagementExecutor.scheduleWithFixedDelay(() -> {
            try {
                enforceMemoryLimits();
            } catch (Throwable th) {
                log.error(th, "Error enforcing memory limits");
            }
            try {
                enforceCpuLimits();
            } catch (Throwable th2) {
                log.error(th2, "Error enforcing query CPU time limits");
            }
            try {
                enforceScanLimits();
            } catch (Throwable th3) {
                log.error(th3, "Error enforcing query scan bytes limits");
            }
        }, 1L, 1L, TimeUnit.SECONDS);
    }

    @PreDestroy
    public void stop() {
        this.queryTracker.stop();
        this.queryManagementExecutor.shutdownNow();
        this.queryExecutor.shutdownNow();
    }

    @Override // io.trino.execution.QueryManager
    public List<BasicQueryInfo> getQueries() {
        return (List) this.queryTracker.getAllQueries().stream().map(queryExecution -> {
            try {
                return queryExecution.getBasicQueryInfo();
            } catch (RuntimeException e) {
                return null;
            }
        }).filter((v0) -> {
            return Objects.nonNull(v0);
        }).collect(ImmutableList.toImmutableList());
    }

    @Override // io.trino.execution.QueryManager
    public void setOutputInfoListener(QueryId queryId, Consumer<QueryExecution.QueryOutputInfo> consumer) {
        Objects.requireNonNull(consumer, "listener is null");
        this.queryTracker.getQuery(queryId).setOutputInfoListener(consumer);
    }

    @Override // io.trino.execution.QueryManager
    public void outputTaskFailed(TaskId taskId, Throwable th) {
        this.queryTracker.getQuery(taskId.getQueryId()).outputTaskFailed(taskId, th);
    }

    @Override // io.trino.execution.QueryManager
    public void resultsConsumed(QueryId queryId) {
        this.queryTracker.getQuery(queryId).resultsConsumed();
    }

    @Override // io.trino.execution.QueryManager
    public void addStateChangeListener(QueryId queryId, StateMachine.StateChangeListener<QueryState> stateChangeListener) {
        Objects.requireNonNull(stateChangeListener, "listener is null");
        this.queryTracker.getQuery(queryId).addStateChangeListener(stateChangeListener);
    }

    @Override // io.trino.execution.QueryManager
    public ListenableFuture<QueryState> getStateChange(QueryId queryId, QueryState queryState) {
        return (ListenableFuture) this.queryTracker.tryGetQuery(queryId).map(queryExecution -> {
            return queryExecution.getStateChange(queryState);
        }).orElseGet(() -> {
            return Futures.immediateFailedFuture(new NoSuchElementException());
        });
    }

    @Override // io.trino.execution.QueryManager
    public BasicQueryInfo getQueryInfo(QueryId queryId) {
        return this.queryTracker.getQuery(queryId).getBasicQueryInfo();
    }

    @Override // io.trino.execution.QueryManager
    public QueryInfo getFullQueryInfo(QueryId queryId) throws NoSuchElementException {
        return this.queryTracker.getQuery(queryId).getQueryInfo();
    }

    @Override // io.trino.execution.QueryManager
    public ResultQueryInfo getResultQueryInfo(QueryId queryId) throws NoSuchElementException {
        return this.queryTracker.getQuery(queryId).getResultQueryInfo();
    }

    @Override // io.trino.execution.QueryManager
    public Session getQuerySession(QueryId queryId) throws NoSuchElementException {
        return this.queryTracker.getQuery(queryId).getSession();
    }

    @Override // io.trino.execution.QueryManager
    public Slug getQuerySlug(QueryId queryId) {
        return this.queryTracker.getQuery(queryId).getSlug();
    }

    public Optional<Plan> getQueryPlan(QueryId queryId) {
        return this.queryTracker.getQuery(queryId).getQueryPlan();
    }

    public void addFinalQueryInfoListener(QueryId queryId, StateMachine.StateChangeListener<QueryInfo> stateChangeListener) {
        this.queryTracker.getQuery(queryId).addFinalQueryInfoListener(stateChangeListener);
    }

    @Override // io.trino.execution.QueryManager
    public QueryState getQueryState(QueryId queryId) {
        return this.queryTracker.getQuery(queryId).getState();
    }

    @Override // io.trino.execution.QueryManager
    public boolean hasQuery(QueryId queryId) {
        return this.queryTracker.hasQuery(queryId);
    }

    @Override // io.trino.execution.QueryManager
    public void recordHeartbeat(QueryId queryId) {
        this.queryTracker.tryGetQuery(queryId).ifPresent((v0) -> {
            v0.recordHeartbeat();
        });
    }

    @Override // io.trino.execution.QueryManager
    public void createQuery(QueryExecution queryExecution) {
        Objects.requireNonNull(queryExecution, "queryExecution is null");
        if (!this.queryTracker.addQuery(queryExecution)) {
            throw new TrinoException(StandardErrorCode.GENERIC_INTERNAL_ERROR, String.format("Query %s already registered", queryExecution.getQueryId()));
        }
        queryExecution.addFinalQueryInfoListener(queryInfo -> {
            this.queryTracker.expireQuery(queryExecution.getQueryId());
        });
        SetThreadName setThreadName = new SetThreadName("Query-" + String.valueOf(queryExecution.getQueryId()));
        try {
            ScopedSpan scopedSpan = ScopedSpan.scopedSpan(this.tracer.spanBuilder("query-start").setParent(Context.current().with(queryExecution.getSession().getQuerySpan())).startSpan());
            try {
                queryExecution.start();
                if (scopedSpan != null) {
                    scopedSpan.close();
                }
                setThreadName.close();
            } finally {
            }
        } catch (Throwable th) {
            try {
                setThreadName.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Override // io.trino.execution.QueryManager
    public void failQuery(QueryId queryId, Throwable th) {
        Objects.requireNonNull(th, "cause is null");
        this.queryTracker.tryGetQuery(queryId).ifPresent(queryExecution -> {
            queryExecution.fail(th);
        });
    }

    @Override // io.trino.execution.QueryManager
    public void cancelQuery(QueryId queryId) {
        log.debug("Cancel query %s", new Object[]{queryId});
        this.queryTracker.tryGetQuery(queryId).ifPresent((v0) -> {
            v0.cancelQuery();
        });
    }

    @Override // io.trino.execution.QueryManager
    public void cancelStage(StageId stageId) {
        Objects.requireNonNull(stageId, "stageId is null");
        log.debug("Cancel stage %s", new Object[]{stageId});
        this.queryTracker.tryGetQuery(stageId.getQueryId()).ifPresent(queryExecution -> {
            queryExecution.cancelStage(stageId);
        });
    }

    @Managed(description = "Query scheduler executor")
    @Nested
    public ThreadPoolExecutorMBean getExecutor() {
        return this.queryExecutorMBean;
    }

    @Managed(description = "Query query management executor")
    @Nested
    public ThreadPoolExecutorMBean getManagementExecutor() {
        return this.queryManagementExecutorMBean;
    }

    @Managed
    @Nested
    public QueryTracker<QueryExecution> getQueryTracker() {
        return this.queryTracker;
    }

    private void enforceMemoryLimits() {
        this.memoryManager.process((List) this.queryTracker.getAllQueries().stream().filter(queryExecution -> {
            return queryExecution.getState() == QueryState.RUNNING;
        }).collect(ImmutableList.toImmutableList()), this::getQueries);
    }

    private void enforceCpuLimits() {
        for (QueryExecution queryExecution : this.queryTracker.getAllQueries()) {
            Duration totalCpuTime = queryExecution.getTotalCpuTime();
            Duration duration = (Duration) Ordering.natural().min(this.maxQueryCpuTime, SystemSessionProperties.getQueryMaxCpuTime(queryExecution.getSession()));
            if (totalCpuTime.compareTo(duration) > 0) {
                queryExecution.fail(new ExceededCpuLimitException(duration));
            }
        }
    }

    private void enforceScanLimits() {
        for (QueryExecution queryExecution : this.queryTracker.getAllQueries()) {
            Optional<DataSize> queryMaxScanPhysicalBytes = SystemSessionProperties.getQueryMaxScanPhysicalBytes(queryExecution.getSession());
            if (this.maxQueryScanPhysicalBytes.isPresent()) {
                queryMaxScanPhysicalBytes = queryMaxScanPhysicalBytes.flatMap(dataSize -> {
                    return this.maxQueryScanPhysicalBytes.map(dataSize -> {
                        return (DataSize) Ordering.natural().min(dataSize, dataSize);
                    });
                }).or(() -> {
                    return this.maxQueryScanPhysicalBytes;
                });
            }
            queryMaxScanPhysicalBytes.ifPresent(dataSize2 -> {
                if (queryExecution.getBasicQueryInfo().getQueryStats().getPhysicalInputDataSize().compareTo(dataSize2) > 0) {
                    queryExecution.fail(new ExceededScanLimitException(dataSize2));
                }
            });
        }
    }
}
