package io.trino.execution;

import com.google.common.base.Ticker;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import io.airlift.concurrent.Threads;
import io.airlift.configuration.secrets.SecretsResolver;
import io.airlift.slice.Slice;
import io.airlift.stats.CounterStat;
import io.airlift.stats.TestingGcMonitor;
import io.airlift.tracing.Tracing;
import io.airlift.units.DataSize;
import io.airlift.units.Duration;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.trace.Span;
import io.trino.SessionTestUtils;
import io.trino.exchange.ExchangeManagerRegistry;
import io.trino.execution.DynamicFiltersCollector;
import io.trino.execution.buffer.BufferResult;
import io.trino.execution.buffer.BufferState;
import io.trino.execution.buffer.PagesSerdeUtil;
import io.trino.execution.buffer.PipelinedOutputBuffers;
import io.trino.execution.executor.TaskExecutor;
import io.trino.execution.executor.timesharing.TimeSharingTaskExecutor;
import io.trino.memory.MemoryPool;
import io.trino.memory.QueryContext;
import io.trino.operator.TaskContext;
import io.trino.spi.QueryId;
import io.trino.spi.predicate.Domain;
import io.trino.spi.type.BigintType;
import io.trino.spiller.SpillSpaceTracker;
import io.trino.testing.TestingSession;
import io.trino.testing.assertions.Assert;
import java.net.URI;
import java.util.Optional;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import org.assertj.core.api.AbstractComparableAssert;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.parallel.Execution;
import org.junit.jupiter.api.parallel.ExecutionMode;

@Execution(ExecutionMode.CONCURRENT)
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
/* loaded from: input_file:io/trino/execution/TestSqlTask.class */
public class TestSqlTask {
    public static final PipelinedOutputBuffers.OutputBufferId OUT = new PipelinedOutputBuffers.OutputBufferId(0);
    private TaskExecutor taskExecutor;
    private ScheduledExecutorService taskNotificationExecutor;
    private ScheduledExecutorService driverYieldExecutor;
    private ScheduledExecutorService driverTimeoutExecutor;
    private SqlTaskExecutionFactory sqlTaskExecutionFactory;
    private final AtomicInteger nextTaskId = new AtomicInteger();

    @BeforeAll
    public void setUp() {
        this.taskExecutor = new TimeSharingTaskExecutor(8, 16, 3, 4, Ticker.systemTicker());
        this.taskExecutor.start();
        this.taskNotificationExecutor = Executors.newScheduledThreadPool(10, Threads.threadsNamed("task-notification-%s"));
        this.driverYieldExecutor = Executors.newScheduledThreadPool(2, Threads.threadsNamed("driver-yield-%s"));
        this.driverTimeoutExecutor = Executors.newScheduledThreadPool(2, Threads.threadsNamed("driver-timeout-%s"));
        this.sqlTaskExecutionFactory = new SqlTaskExecutionFactory(this.taskNotificationExecutor, this.taskExecutor, TaskTestUtils.createTestingPlanner(), TaskTestUtils.createTestSplitMonitor(), Tracing.noopTracer(), new TaskManagerConfig());
    }

    @AfterAll
    public void destroy() {
        this.taskExecutor.stop();
        this.taskExecutor = null;
        this.taskNotificationExecutor.shutdownNow();
        this.driverYieldExecutor.shutdown();
        this.driverTimeoutExecutor.shutdown();
        this.sqlTaskExecutionFactory = null;
    }

    @Timeout(30)
    @Test
    public void testEmptyQuery() throws Exception {
        SqlTask createInitialTask = createInitialTask();
        TaskInfo updateTask = createInitialTask.updateTask(SessionTestUtils.TEST_SESSION, Span.getInvalid(), Optional.of(TaskTestUtils.PLAN_FRAGMENT), ImmutableList.of(), PipelinedOutputBuffers.createInitial(PipelinedOutputBuffers.BufferType.PARTITIONED).withNoMoreBufferIds(), ImmutableMap.of(), false);
        Assertions.assertThat(updateTask.taskStatus().getState()).isEqualTo(TaskState.RUNNING);
        Assertions.assertThat(updateTask.taskStatus().getVersion()).isEqualTo(0L);
        TaskInfo taskInfo = createInitialTask.getTaskInfo();
        Assertions.assertThat(taskInfo.taskStatus().getState()).isEqualTo(TaskState.RUNNING);
        Assertions.assertThat(taskInfo.taskStatus().getVersion()).isEqualTo(0L);
        Assertions.assertThat(createInitialTask.updateTask(SessionTestUtils.TEST_SESSION, Span.getInvalid(), Optional.of(TaskTestUtils.PLAN_FRAGMENT), ImmutableList.of(new SplitAssignment(TaskTestUtils.TABLE_SCAN_NODE_ID, ImmutableSet.of(), true)), PipelinedOutputBuffers.createInitial(PipelinedOutputBuffers.BufferType.PARTITIONED).withNoMoreBufferIds(), ImmutableMap.of(), false).taskStatus().getState()).isEqualTo(TaskState.FINISHED);
        Assertions.assertThat(((TaskInfo) createInitialTask.getTaskInfo(0L).get()).taskStatus().getState()).isEqualTo(TaskState.FINISHED);
    }

    @Timeout(30)
    @Test
    public void testSimpleQuery() throws Exception {
        SqlTask createInitialTask = createInitialTask();
        Assertions.assertThat(createInitialTask.getTaskStatus().getState()).isEqualTo(TaskState.RUNNING);
        Assertions.assertThat(createInitialTask.getTaskStatus().getVersion()).isEqualTo(0L);
        createInitialTask.updateTask(SessionTestUtils.TEST_SESSION, Span.getInvalid(), Optional.of(TaskTestUtils.PLAN_FRAGMENT), ImmutableList.of(new SplitAssignment(TaskTestUtils.TABLE_SCAN_NODE_ID, ImmutableSet.of(TaskTestUtils.SPLIT), true)), PipelinedOutputBuffers.createInitial(PipelinedOutputBuffers.BufferType.PARTITIONED).withBuffer(OUT, 0).withNoMoreBufferIds(), ImmutableMap.of(), false);
        TaskInfo taskInfo = (TaskInfo) createInitialTask.getTaskInfo(0L).get();
        Assertions.assertThat(taskInfo.taskStatus().getState()).isEqualTo(TaskState.FLUSHING);
        Assertions.assertThat(taskInfo.taskStatus().getVersion()).isEqualTo(1L);
        Assertions.assertThat(createInitialTask.getTaskInfo(0L).isDone()).isTrue();
        BufferResult bufferResult = (BufferResult) createInitialTask.getTaskResults(OUT, 0L, DataSize.of(1L, DataSize.Unit.MEGABYTE)).get();
        Assertions.assertThat(bufferResult.isBufferComplete()).isFalse();
        Assertions.assertThat(bufferResult.getSerializedPages()).hasSize(1);
        Assertions.assertThat(PagesSerdeUtil.getSerializedPagePositionCount((Slice) bufferResult.getSerializedPages().get(0))).isEqualTo(1);
        boolean z = true;
        while (z) {
            bufferResult = (BufferResult) createInitialTask.getTaskResults(OUT, bufferResult.getToken() + bufferResult.getSerializedPages().size(), DataSize.of(1L, DataSize.Unit.MEGABYTE)).get();
            z = !bufferResult.isBufferComplete();
        }
        Assertions.assertThat(bufferResult.getSerializedPages()).isEmpty();
        TaskInfo destroyTaskResults = createInitialTask.destroyTaskResults(OUT);
        Assertions.assertThat(destroyTaskResults.outputBuffers().getState()).isEqualTo(BufferState.FINISHED);
        Assertions.assertThat(((TaskInfo) createInitialTask.getTaskInfo(destroyTaskResults.taskStatus().getVersion()).get()).taskStatus().getState()).isEqualTo(TaskState.FINISHED);
        Assertions.assertThat(createInitialTask.getTaskInfo(100L).isDone()).isTrue();
        Assertions.assertThat(createInitialTask.getTaskInfo().taskStatus().getState()).isEqualTo(TaskState.FINISHED);
    }

    @Test
    public void testCancel() {
        SqlTask createInitialTask = createInitialTask();
        TaskInfo updateTask = createInitialTask.updateTask(SessionTestUtils.TEST_SESSION, Span.getInvalid(), Optional.of(TaskTestUtils.PLAN_FRAGMENT), ImmutableList.of(), PipelinedOutputBuffers.createInitial(PipelinedOutputBuffers.BufferType.PARTITIONED).withBuffer(OUT, 0).withNoMoreBufferIds(), ImmutableMap.of(), false);
        Assertions.assertThat(updateTask.taskStatus().getState()).isEqualTo(TaskState.RUNNING);
        Assertions.assertThat(updateTask.stats().getEndTime()).isNull();
        TaskInfo taskInfo = createInitialTask.getTaskInfo();
        Assertions.assertThat(taskInfo.taskStatus().getState()).isEqualTo(TaskState.RUNNING);
        Assertions.assertThat(taskInfo.stats().getEndTime()).isNull();
        TaskInfo cancel = createInitialTask.cancel();
        Assertions.assertThat(cancel.taskStatus().getState().isTerminatingOrDone()).isTrue();
        int i = 1;
        while (!cancel.taskStatus().getState().isDone() && i < 3) {
            cancel = (TaskInfo) Futures.getUnchecked(createInitialTask.getTaskInfo(cancel.taskStatus().getVersion()));
            i++;
        }
        ((AbstractComparableAssert) Assertions.assertThat(cancel.taskStatus().getState()).describedAs("Failed to see CANCELED after " + i + " attempts", new Object[0])).isEqualTo(TaskState.CANCELED);
        Assertions.assertThat(cancel.stats().getEndTime()).isNotNull();
        TaskInfo taskInfo2 = createInitialTask.getTaskInfo();
        Assertions.assertThat(taskInfo2.taskStatus().getState()).isEqualTo(TaskState.CANCELED);
        Assertions.assertThat(taskInfo2.stats().getEndTime()).isNotNull();
    }

    @Timeout(30)
    @Test
    public void testAbort() throws Exception {
        SqlTask createInitialTask = createInitialTask();
        Assertions.assertThat(createInitialTask.getTaskStatus().getState()).isEqualTo(TaskState.RUNNING);
        Assertions.assertThat(createInitialTask.getTaskStatus().getVersion()).isEqualTo(0L);
        createInitialTask.updateTask(SessionTestUtils.TEST_SESSION, Span.getInvalid(), Optional.of(TaskTestUtils.PLAN_FRAGMENT), ImmutableList.of(new SplitAssignment(TaskTestUtils.TABLE_SCAN_NODE_ID, ImmutableSet.of(TaskTestUtils.SPLIT), true)), PipelinedOutputBuffers.createInitial(PipelinedOutputBuffers.BufferType.PARTITIONED).withBuffer(OUT, 0).withNoMoreBufferIds(), ImmutableMap.of(), false);
        TaskInfo taskInfo = (TaskInfo) createInitialTask.getTaskInfo(0L).get();
        Assertions.assertThat(taskInfo.taskStatus().getState()).isEqualTo(TaskState.FLUSHING);
        Assertions.assertThat(taskInfo.taskStatus().getVersion()).isEqualTo(1L);
        createInitialTask.destroyTaskResults(OUT);
        Assertions.assertThat(((TaskInfo) createInitialTask.getTaskInfo(taskInfo.taskStatus().getVersion()).get()).taskStatus().getState()).isEqualTo(TaskState.FINISHED);
        Assertions.assertThat(createInitialTask.getTaskInfo().taskStatus().getState()).isEqualTo(TaskState.FINISHED);
    }

    @Test
    public void testBufferCloseOnFinish() throws Exception {
        SqlTask createInitialTask = createInitialTask();
        PipelinedOutputBuffers withNoMoreBufferIds = PipelinedOutputBuffers.createInitial(PipelinedOutputBuffers.BufferType.PARTITIONED).withBuffer(OUT, 0).withNoMoreBufferIds();
        TaskTestUtils.updateTask(createInitialTask, TaskTestUtils.EMPTY_SPLIT_ASSIGNMENTS, withNoMoreBufferIds);
        ListenableFuture taskResults = createInitialTask.getTaskResults(OUT, 0L, DataSize.of(1L, DataSize.Unit.MEGABYTE));
        Assertions.assertThat(taskResults.isDone()).isFalse();
        TaskTestUtils.updateTask(createInitialTask, ImmutableList.of(new SplitAssignment(TaskTestUtils.TABLE_SCAN_NODE_ID, ImmutableSet.of(), true)), withNoMoreBufferIds);
        createInitialTask.destroyTaskResults(OUT);
        taskResults.get(1L, TimeUnit.SECONDS);
        ListenableFuture taskResults2 = createInitialTask.getTaskResults(OUT, 0L, DataSize.of(1L, DataSize.Unit.MEGABYTE));
        Assertions.assertThat(taskResults2.isDone()).isTrue();
        Assertions.assertThat(((BufferResult) taskResults2.get()).isBufferComplete()).isTrue();
    }

    @Test
    public void testBufferCloseOnCancel() throws Exception {
        SqlTask createInitialTask = createInitialTask();
        TaskTestUtils.updateTask(createInitialTask, TaskTestUtils.EMPTY_SPLIT_ASSIGNMENTS, PipelinedOutputBuffers.createInitial(PipelinedOutputBuffers.BufferType.PARTITIONED).withBuffer(OUT, 0).withNoMoreBufferIds());
        ListenableFuture taskResults = createInitialTask.getTaskResults(OUT, 0L, DataSize.of(1L, DataSize.Unit.MEGABYTE));
        Assertions.assertThat(taskResults.isDone()).isFalse();
        createInitialTask.cancel();
        Assertions.assertThat(createInitialTask.getTaskInfo().taskStatus().getState().isTerminatingOrDone()).isTrue();
        taskResults.get(1L, TimeUnit.SECONDS);
        ListenableFuture taskResults2 = createInitialTask.getTaskResults(OUT, 0L, DataSize.of(1L, DataSize.Unit.MEGABYTE));
        Assertions.assertThat(taskResults2.isDone()).isTrue();
        Assertions.assertThat(((BufferResult) taskResults2.get()).isBufferComplete()).isTrue();
    }

    @Timeout(30)
    @Test
    public void testBufferNotCloseOnFail() throws Exception {
        SqlTask createInitialTask = createInitialTask();
        TaskTestUtils.updateTask(createInitialTask, TaskTestUtils.EMPTY_SPLIT_ASSIGNMENTS, PipelinedOutputBuffers.createInitial(PipelinedOutputBuffers.BufferType.PARTITIONED).withBuffer(OUT, 0).withNoMoreBufferIds());
        ListenableFuture taskResults = createInitialTask.getTaskResults(OUT, 0L, DataSize.of(1L, DataSize.Unit.MEGABYTE));
        Assertions.assertThat(taskResults.isDone()).isFalse();
        long version = createInitialTask.getTaskInfo().taskStatus().getVersion();
        createInitialTask.failed(new Exception("test"));
        TaskInfo taskInfo = (TaskInfo) createInitialTask.getTaskInfo(version).get();
        Assertions.assertThat(taskInfo.taskStatus().getState().isTerminatingOrDone()).isTrue();
        Assertions.assertThat(((TaskInfo) createInitialTask.getTaskInfo(taskInfo.taskStatus().getVersion()).get()).taskStatus().getState()).isEqualTo(TaskState.FAILED);
        Assertions.assertThatThrownBy(() -> {
            taskResults.get(1L, TimeUnit.SECONDS);
        }).isInstanceOf(TimeoutException.class).hasMessageContaining("Waited 1 seconds");
        Assertions.assertThat(createInitialTask.getTaskResults(OUT, 0L, DataSize.of(1L, DataSize.Unit.MEGABYTE)).isDone()).isFalse();
    }

    @Timeout(30)
    @Test
    public void testDynamicFilters() throws Exception {
        SqlTask createInitialTask = createInitialTask();
        createInitialTask.updateTask(SessionTestUtils.TEST_SESSION, Span.getInvalid(), Optional.of(TaskTestUtils.PLAN_FRAGMENT_WITH_DYNAMIC_FILTER_SOURCE), ImmutableList.of(new SplitAssignment(TaskTestUtils.TABLE_SCAN_NODE_ID, ImmutableSet.of(TaskTestUtils.SPLIT), false)), PipelinedOutputBuffers.createInitial(PipelinedOutputBuffers.BufferType.PARTITIONED).withBuffer(OUT, 0).withNoMoreBufferIds(), ImmutableMap.of(), false);
        Assertions.assertThat(createInitialTask.getTaskStatus().getDynamicFiltersVersion()).isEqualTo(0L);
        TaskContext taskContextByTaskId = createInitialTask.getQueryContext().getTaskContextByTaskId(createInitialTask.getTaskId());
        ListenableFuture taskStatus = createInitialTask.getTaskStatus(0L);
        Assertions.assertThat(taskStatus.isDone()).isFalse();
        taskContextByTaskId.updateDomains(ImmutableMap.of(TaskTestUtils.DYNAMIC_FILTER_SOURCE_ID, Domain.none(BigintType.BIGINT)));
        Assertions.assertThat(createInitialTask.getTaskStatus().getVersion()).isEqualTo(1L);
        Assertions.assertThat(createInitialTask.getTaskStatus().getDynamicFiltersVersion()).isEqualTo(1L);
        taskStatus.get();
    }

    @Timeout(30)
    @Test
    public void testDynamicFilterFetchAfterTaskDone() throws Exception {
        SqlTask createInitialTask = createInitialTask();
        PipelinedOutputBuffers withNoMoreBufferIds = PipelinedOutputBuffers.createInitial(PipelinedOutputBuffers.BufferType.PARTITIONED).withBuffer(OUT, 0).withNoMoreBufferIds();
        createInitialTask.updateTask(SessionTestUtils.TEST_SESSION, Span.getInvalid(), Optional.of(TaskTestUtils.PLAN_FRAGMENT_WITH_DYNAMIC_FILTER_SOURCE), ImmutableList.of(new SplitAssignment(TaskTestUtils.TABLE_SCAN_NODE_ID, ImmutableSet.of(), false)), withNoMoreBufferIds, ImmutableMap.of(), false);
        Assertions.assertThat(createInitialTask.getTaskStatus().getDynamicFiltersVersion()).isEqualTo(0L);
        TaskTestUtils.updateTask(createInitialTask, ImmutableList.of(new SplitAssignment(TaskTestUtils.TABLE_SCAN_NODE_ID, ImmutableSet.of(), true)), withNoMoreBufferIds);
        TaskInfo destroyTaskResults = createInitialTask.destroyTaskResults(OUT);
        Assertions.assertThat(destroyTaskResults.outputBuffers().getState()).isEqualTo(BufferState.FINISHED);
        Assert.assertEventually(new Duration(10.0d, TimeUnit.SECONDS), () -> {
            TaskStatus taskStatus = (TaskStatus) createInitialTask.getTaskStatus(destroyTaskResults.taskStatus().getVersion()).get();
            Assertions.assertThat(taskStatus.getState()).isEqualTo(TaskState.FINISHED);
            Assertions.assertThat(taskStatus.getDynamicFiltersVersion()).isEqualTo(1L);
        });
        DynamicFiltersCollector.VersionedDynamicFilterDomains acknowledgeAndGetNewDynamicFilterDomains = createInitialTask.acknowledgeAndGetNewDynamicFilterDomains(0L);
        Assertions.assertThat(acknowledgeAndGetNewDynamicFilterDomains.getVersion()).isEqualTo(1L);
        Assertions.assertThat(acknowledgeAndGetNewDynamicFilterDomains.getDynamicFilterDomains()).isEqualTo(ImmutableMap.of(TaskTestUtils.DYNAMIC_FILTER_SOURCE_ID, Domain.none(BigintType.BIGINT)));
    }

    private SqlTask createInitialTask() {
        TaskId taskId = new TaskId(new StageId("query", 0), this.nextTaskId.incrementAndGet(), 0);
        URI create = URI.create("fake://task/" + String.valueOf(taskId));
        QueryContext queryContext = new QueryContext(new QueryId("query"), DataSize.of(1L, DataSize.Unit.MEGABYTE), new MemoryPool(DataSize.of(1L, DataSize.Unit.GIGABYTE)), new TestingGcMonitor(), this.taskNotificationExecutor, this.driverYieldExecutor, this.driverTimeoutExecutor, DataSize.of(1L, DataSize.Unit.MEGABYTE), new SpillSpaceTracker(DataSize.of(1L, DataSize.Unit.GIGABYTE)));
        queryContext.addTaskContext(new TaskStateMachine(taskId, this.taskNotificationExecutor), TestingSession.testSessionBuilder().build(), () -> {
        }, false, false);
        return SqlTask.createSqlTask(taskId, create, "fake", queryContext, Tracing.noopTracer(), this.sqlTaskExecutionFactory, this.taskNotificationExecutor, sqlTask -> {
        }, DataSize.of(32L, DataSize.Unit.MEGABYTE), DataSize.of(200L, DataSize.Unit.MEGABYTE), new ExchangeManagerRegistry(OpenTelemetry.noop(), Tracing.noopTracer(), new SecretsResolver(ImmutableMap.of())), new CounterStat());
    }
}
