package io.trino.execution.scheduler;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMultimap;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import io.airlift.log.Logger;
import io.trino.annotation.NotThreadSafe;
import io.trino.execution.RemoteTask;
import io.trino.execution.TableExecuteContextManager;
import io.trino.execution.scheduler.ScheduleResult;
import io.trino.metadata.InternalNode;
import io.trino.server.DynamicFilterService;
import io.trino.split.SplitSource;
import io.trino.sql.planner.plan.PlanNodeId;
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.BooleanSupplier;

/**
 * {@link StageScheduler} for a stage that reads from more than one split source.
 *
 * <p>One {@link SourceScheduler} is created per split source and kept in a queue. Each call to
 * {@link #schedule()} drives the scheduler at the head of the queue; once that scheduler reports
 * that it is finished and not blocked, it is closed and removed, and scheduling moves on to the
 * next one. All source schedulers share the same {@link PartitionIdAllocator} and the same map of
 * already-scheduled tasks.
 */
@NotThreadSafe
public class MultiSourcePartitionedScheduler implements StageScheduler {
    private static final Logger log = Logger.get(MultiSourcePartitionedScheduler.class);

    private final StageExecution stageExecution;
    private final Queue<SourceScheduler> sourceSchedulers;
    private final DynamicFilterService dynamicFilterService;
    private final SplitPlacementPolicy splitPlacementPolicy;
    // Shared with every source scheduler created in the constructor.
    private final Map<InternalNode, RemoteTask> scheduledTasks = new HashMap<>();
    private final PartitionIdAllocator partitionIdAllocator = new PartitionIdAllocator();

    /**
     * Creates a scheduler with one {@link SourceScheduler} per entry of {@code splitSources}.
     *
     * @param stageExecution the stage being scheduled; must not be null
     * @param splitSources split sources keyed by plan node id; must not be null and must contain
     *     more than one entry
     * @param splitPlacementPolicy placement policy, also used to pick the node in {@link #start()};
     *     must not be null
     * @param i forwarded unchanged to each source scheduler (NOTE(review): presumably the split
     *     batch size -- confirm against the factory signature)
     * @param dynamicFilterService consulted in {@link #start()}; must not be null
     * @param tableExecuteContextManager forwarded unchanged to each source scheduler
     * @param booleanSupplier forwarded unchanged to each source scheduler
     * @throws NullPointerException if a required argument is null
     * @throws IllegalArgumentException if {@code splitSources} has fewer than two entries
     */
    public MultiSourcePartitionedScheduler(StageExecution stageExecution, Map<PlanNodeId, SplitSource> splitSources, SplitPlacementPolicy splitPlacementPolicy, int i, DynamicFilterService dynamicFilterService, TableExecuteContextManager tableExecuteContextManager, BooleanSupplier booleanSupplier) {
        Objects.requireNonNull(splitSources, "splitSources is null");
        Preconditions.checkArgument(splitSources.size() > 1, "It is expected that there will be more than one split sources");

        // Validate before any source scheduler is created, so that a null argument is reported
        // up front instead of after schedulers have already been constructed.
        this.stageExecution = Objects.requireNonNull(stageExecution, "stageExecution is null");
        this.dynamicFilterService = Objects.requireNonNull(dynamicFilterService, "dynamicFilterService is null");
        this.splitPlacementPolicy = Objects.requireNonNull(splitPlacementPolicy, "splitPlacementPolicy is null");

        ImmutableList.Builder<SourceScheduler> schedulers = ImmutableList.builder();
        for (Map.Entry<PlanNodeId, SplitSource> source : splitSources.entrySet()) {
            schedulers.add(SourcePartitionedScheduler.newSourcePartitionedSchedulerAsSourceScheduler(
                    stageExecution,
                    source.getKey(),
                    source.getValue(),
                    splitPlacementPolicy,
                    i,
                    dynamicFilterService,
                    tableExecuteContextManager,
                    booleanSupplier,
                    partitionIdAllocator,
                    scheduledTasks));
        }
        this.sourceSchedulers = new ArrayDeque<>(schedulers.build());
    }

    /**
     * If the dynamic filter service reports that a collecting task is needed for this stage's
     * query and fragment, marks the stage as scheduling and creates one task on a randomly chosen
     * node. Otherwise does nothing.
     */
    @Override // io.trino.execution.scheduler.StageScheduler
    public synchronized void start() {
        boolean collectingTaskNeeded = dynamicFilterService.isCollectingTaskNeeded(
                stageExecution.getStageId().getQueryId(),
                stageExecution.getFragment());
        if (collectingTaskNeeded) {
            stageExecution.beginScheduling();
            scheduleTaskOnRandomNode();
        }
    }

    /**
     * Schedules the source at the head of the queue and advances to the next source for as long
     * as the current one is finished and not blocked.
     *
     * <p>The returned result aggregates the split count and new tasks across every source
     * scheduler consulted during this call. The blocked future and blocked reason are those of
     * the last source scheduler consulted.
     *
     * @return a result that is marked finished only when no source schedulers remain
     */
    @Override // io.trino.execution.scheduler.StageScheduler
    public synchronized ScheduleResult schedule() {
        ImmutableSet.Builder<RemoteTask> newTasks = ImmutableSet.builder();
        ListenableFuture<Void> blocked = Futures.immediateVoidFuture();
        Optional<ScheduleResult.BlockedReason> blockedReason = Optional.empty();
        int splitsScheduled = 0;

        while (!sourceSchedulers.isEmpty()) {
            SourceScheduler current = sourceSchedulers.peek();
            ScheduleResult result = current.schedule();

            splitsScheduled += result.getSplitsScheduled();
            newTasks.addAll(result.getNewTasks());
            blocked = result.getBlocked();
            blockedReason = result.getBlockedReason();

            // The current source still has work to do (or is waiting); stop for now.
            if (!blocked.isDone() || !result.isFinished()) {
                break;
            }

            stageExecution.schedulingComplete(current.getPlanNodeId());
            sourceSchedulers.remove().close();
        }

        boolean finished = sourceSchedulers.isEmpty();
        if (blockedReason.isPresent()) {
            return new ScheduleResult(finished, newTasks.build(), blocked, blockedReason.get(), splitsScheduled);
        }
        return new ScheduleResult(finished, newTasks.build(), splitsScheduled);
    }

    /**
     * Closes every source scheduler still in the queue and then empties the queue. A failure
     * while closing one scheduler is logged and does not prevent the others from being closed.
     */
    @Override // io.trino.execution.scheduler.StageScheduler, java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        for (SourceScheduler scheduler : sourceSchedulers) {
            try {
                scheduler.close();
            } catch (Throwable th) {
                // Best effort: keep closing the remaining schedulers.
                log.warn(th, "Error closing split source");
            }
        }
        sourceSchedulers.clear();
    }

    /**
     * Schedules a task with no initial splits on a node picked uniformly at random from the
     * placement policy's nodes, and records it in the shared scheduled-task map if a task was
     * actually created.
     *
     * @throws IllegalStateException if a task has already been scheduled or no nodes are available
     */
    private void scheduleTaskOnRandomNode() {
        Preconditions.checkState(scheduledTasks.isEmpty(), "Stage task is already scheduled on node");
        List<InternalNode> candidates = splitPlacementPolicy.allNodes();
        Preconditions.checkState(!candidates.isEmpty(), "No nodes available");

        int index = ThreadLocalRandom.current().nextInt(0, candidates.size());
        InternalNode node = candidates.get(index);
        Optional<RemoteTask> task = stageExecution.scheduleTask(node, partitionIdAllocator.getNextId(), ImmutableMultimap.of());
        task.ifPresent(remoteTask -> scheduledTasks.put(node, remoteTask));
    }
}
