package io.trino.operator;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.errorprone.annotations.ThreadSafe;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import io.airlift.http.client.HttpClient;
import io.airlift.log.Logger;
import io.airlift.slice.Slice;
import io.airlift.stats.TDigest;
import io.airlift.units.DataSize;
import io.airlift.units.Duration;
import io.trino.FeaturesConfig;
import io.trino.execution.TaskFailureListener;
import io.trino.execution.TaskId;
import io.trino.memory.context.LocalMemoryContext;
import io.trino.operator.HttpPageBufferClient;
import io.trino.operator.WorkProcessor;
import io.trino.plugin.base.metrics.TDigestHistogram;
import jakarta.annotation.Nullable;
import java.io.Closeable;
import java.net.URI;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

@ThreadSafe
/* loaded from: input_file:io/trino/operator/DirectExchangeClient.class */
public class DirectExchangeClient implements Closeable {
    private static final Logger log;
    private final String selfAddress;
    private final FeaturesConfig.DataIntegrityVerification dataIntegrityVerification;
    private final DataSize maxResponseSize;
    private final int concurrentRequestMultiplier;
    private final Duration maxErrorDuration;
    private final boolean acknowledgePages;
    private final HttpClient httpClient;
    private final ScheduledExecutorService scheduledExecutor;

    @GuardedBy("this")
    private boolean noMoreLocations;
    private final DirectExchangeBuffer buffer;

    @GuardedBy("this")
    private long successfulRequests;

    @GuardedBy("this")
    private long averageBytesPerRequest;

    @GuardedBy("this")
    private boolean closed;

    @Nullable
    @GuardedBy("memoryContextLock")
    private LocalMemoryContext memoryContext;
    private final Executor pageBufferClientCallbackExecutor;
    private final TaskFailureListener taskFailureListener;
    static final /* synthetic */ boolean $assertionsDisabled;
    private final Map<URI, HttpPageBufferClient> allClients = new ConcurrentHashMap();

    @GuardedBy("this")
    private final Set<HttpPageBufferClient> queuedClients = new LinkedHashSet();

    @GuardedBy("this")
    private final Set<HttpPageBufferClient> runningClients = new LinkedHashSet();
    private final Set<HttpPageBufferClient> completedClients = Sets.newConcurrentHashSet();

    @GuardedBy("this")
    private final TDigest requestDuration = new TDigest();
    private final ReadWriteLock memoryContextLock = new ReentrantReadWriteLock();
    private final Lock memoryContextReadLock = this.memoryContextLock.readLock();
    private final Lock memoryContextWriteLock = this.memoryContextLock.writeLock();

    /* loaded from: input_file:io/trino/operator/DirectExchangeClient$ExchangeClientCallback.class */
    private class ExchangeClientCallback implements HttpPageBufferClient.ClientCallback {
        private ExchangeClientCallback() {
        }

        @Override // io.trino.operator.HttpPageBufferClient.ClientCallback
        public boolean addPages(HttpPageBufferClient httpPageBufferClient, List<Slice> list) {
            Objects.requireNonNull(httpPageBufferClient, "client is null");
            Objects.requireNonNull(list, "pages is null");
            return DirectExchangeClient.this.addPages(httpPageBufferClient, list);
        }

        @Override // io.trino.operator.HttpPageBufferClient.ClientCallback
        public void requestComplete(HttpPageBufferClient httpPageBufferClient) {
            Objects.requireNonNull(httpPageBufferClient, "client is null");
            DirectExchangeClient.this.requestComplete(httpPageBufferClient);
        }

        @Override // io.trino.operator.HttpPageBufferClient.ClientCallback
        public void clientFinished(HttpPageBufferClient httpPageBufferClient) {
            DirectExchangeClient.this.clientFinished(httpPageBufferClient);
        }

        @Override // io.trino.operator.HttpPageBufferClient.ClientCallback
        public void clientFailed(HttpPageBufferClient httpPageBufferClient, Throwable th) {
            Objects.requireNonNull(httpPageBufferClient, "client is null");
            Objects.requireNonNull(th, "cause is null");
            DirectExchangeClient.this.clientFailed(httpPageBufferClient, th);
        }
    }

    public DirectExchangeClient(String str, FeaturesConfig.DataIntegrityVerification dataIntegrityVerification, DirectExchangeBuffer directExchangeBuffer, DataSize dataSize, int i, Duration duration, boolean z, HttpClient httpClient, ScheduledExecutorService scheduledExecutorService, LocalMemoryContext localMemoryContext, Executor executor, TaskFailureListener taskFailureListener) {
        this.selfAddress = (String) Objects.requireNonNull(str, "selfAddress is null");
        this.dataIntegrityVerification = (FeaturesConfig.DataIntegrityVerification) Objects.requireNonNull(dataIntegrityVerification, "dataIntegrityVerification is null");
        this.buffer = (DirectExchangeBuffer) Objects.requireNonNull(directExchangeBuffer, "buffer is null");
        this.maxResponseSize = dataSize;
        this.concurrentRequestMultiplier = i;
        this.maxErrorDuration = duration;
        this.acknowledgePages = z;
        this.httpClient = httpClient;
        this.scheduledExecutor = scheduledExecutorService;
        this.memoryContext = localMemoryContext;
        this.pageBufferClientCallbackExecutor = (Executor) Objects.requireNonNull(executor, "pageBufferClientCallbackExecutor is null");
        this.taskFailureListener = (TaskFailureListener) Objects.requireNonNull(taskFailureListener, "taskFailureListener is null");
    }

    public DirectExchangeClientStatus getStatus() {
        DirectExchangeClientStatus directExchangeClientStatus;
        ImmutableList.Builder builder = ImmutableList.builder();
        Iterator<HttpPageBufferClient> it = this.allClients.values().iterator();
        while (it.hasNext()) {
            builder.add(it.next().getStatus());
        }
        ImmutableList build = builder.build();
        synchronized (this) {
            directExchangeClientStatus = new DirectExchangeClientStatus(this.buffer.getRetainedSizeInBytes(), this.buffer.getMaxRetainedSizeInBytes(), this.averageBytesPerRequest, this.successfulRequests, this.buffer.getBufferedPageCount(), this.buffer.getSpilledPageCount(), this.buffer.getSpilledBytes(), this.noMoreLocations, build, new TDigestHistogram(TDigest.copyOf(this.requestDuration)));
        }
        return directExchangeClientStatus;
    }

    public synchronized void addLocation(TaskId taskId, URI uri) {
        Objects.requireNonNull(uri, "location is null");
        if (this.closed) {
            return;
        }
        Preconditions.checkArgument(!this.allClients.containsKey(uri), "location already exist: %s", uri);
        Preconditions.checkState(!this.noMoreLocations, "No more locations already set");
        this.buffer.addTask(taskId);
        HttpPageBufferClient httpPageBufferClient = new HttpPageBufferClient(this.selfAddress, this.httpClient, this.dataIntegrityVerification, this.maxResponseSize, this.maxErrorDuration, this.acknowledgePages, taskId, uri, new ExchangeClientCallback(), this.scheduledExecutor, this.pageBufferClientCallbackExecutor);
        this.allClients.put(uri, httpPageBufferClient);
        this.queuedClients.add(httpPageBufferClient);
        scheduleRequestIfNecessary();
    }

    public synchronized void noMoreLocations() {
        this.noMoreLocations = true;
        this.buffer.noMoreTasks();
        scheduleRequestIfNecessary();
    }

    public WorkProcessor<Slice> pages() {
        return WorkProcessor.create(() -> {
            Slice pollPage = pollPage();
            if (pollPage != null) {
                return WorkProcessor.ProcessState.ofResult(pollPage);
            }
            if (isFinished()) {
                return WorkProcessor.ProcessState.finished();
            }
            ListenableFuture<Void> isBlocked = isBlocked();
            return !isBlocked.isDone() ? WorkProcessor.ProcessState.blocked(isBlocked) : WorkProcessor.ProcessState.yielded();
        });
    }

    private void assertNotHoldsLock() {
        if (!$assertionsDisabled && Thread.holdsLock(this)) {
            throw new AssertionError("Cannot get next page while holding a lock on this");
        }
    }

    @Nullable
    public Slice pollPage() {
        assertNotHoldsLock();
        Slice pollPage = this.buffer.pollPage();
        if (pollPage == null) {
            return null;
        }
        updateRetainedMemory();
        scheduleRequestIfNecessary();
        return pollPage;
    }

    public boolean isFinished() {
        return this.buffer.isFinished() && this.completedClients.size() == this.allClients.size();
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public synchronized void close() {
        if (this.closed) {
            return;
        }
        this.closed = true;
        Iterator<HttpPageBufferClient> it = this.allClients.values().iterator();
        while (it.hasNext()) {
            closeQuietly(it.next());
        }
        try {
            this.buffer.close();
        } catch (RuntimeException e) {
            log.warn(e, "error closing buffer");
        } finally {
            releaseMemoryContext();
        }
    }

    @VisibleForTesting
    synchronized int scheduleRequestIfNecessary() {
        if ((this.buffer.isFinished() || this.buffer.isFailed()) && this.completedClients.size() == this.allClients.size()) {
            return 0;
        }
        long remainingCapacityInBytes = this.buffer.getRemainingCapacityInBytes();
        if (remainingCapacityInBytes <= 0) {
            return 0;
        }
        long sum = this.runningClients.stream().mapToLong((v0) -> {
            return v0.getAverageRequestSizeInBytes();
        }).sum();
        long j = 0;
        int i = 0;
        Iterator<HttpPageBufferClient> it = this.queuedClients.iterator();
        while (it.hasNext()) {
            HttpPageBufferClient next = it.next();
            if (j >= (remainingCapacityInBytes * this.concurrentRequestMultiplier) - sum) {
                break;
            }
            j += next.getAverageRequestSizeInBytes();
            next.scheduleRequest();
            it.remove();
            this.runningClients.add(next);
            i++;
        }
        return i;
    }

    public ListenableFuture<Void> isBlocked() {
        return this.buffer.isBlocked();
    }

    @VisibleForTesting
    Set<HttpPageBufferClient> getQueuedClients() {
        return this.queuedClients;
    }

    @VisibleForTesting
    Set<HttpPageBufferClient> getRunningClients() {
        return this.runningClients;
    }

    @VisibleForTesting
    Map<URI, HttpPageBufferClient> getAllClients() {
        return this.allClients;
    }

    private boolean addPages(HttpPageBufferClient httpPageBufferClient, List<Slice> list) {
        if (this.completedClients.contains(httpPageBufferClient)) {
            return false;
        }
        long j = 0;
        if (!list.isEmpty()) {
            while (list.iterator().hasNext()) {
                j += r0.next().length();
            }
            this.buffer.addPages(httpPageBufferClient.getRemoteTaskId(), list);
            updateRetainedMemory();
        }
        synchronized (this) {
            if (this.closed || this.buffer.isFinished() || this.buffer.isFailed()) {
                return false;
            }
            this.successfulRequests++;
            this.averageBytesPerRequest = (long) ((((1.0d * this.averageBytesPerRequest) * (this.successfulRequests - 1)) / this.successfulRequests) + (j / this.successfulRequests));
            return true;
        }
    }

    private void updateRetainedMemory() {
        this.memoryContextReadLock.lock();
        try {
            if (this.memoryContext != null) {
                this.memoryContext.setBytes(this.buffer.getRetainedSizeInBytes());
            }
        } finally {
            this.memoryContextReadLock.unlock();
        }
    }

    private void releaseMemoryContext() {
        this.memoryContextWriteLock.lock();
        try {
            if (this.memoryContext != null) {
                this.memoryContext.setBytes(0L);
                this.memoryContext = null;
            }
        } finally {
            this.memoryContextWriteLock.unlock();
        }
    }

    private synchronized void requestComplete(HttpPageBufferClient httpPageBufferClient) {
        this.requestDuration.add(httpPageBufferClient.getLastRequestDurationMillis());
        if (!this.completedClients.contains(httpPageBufferClient) && !this.queuedClients.contains(httpPageBufferClient)) {
            this.queuedClients.add(httpPageBufferClient);
            this.runningClients.remove(httpPageBufferClient);
        }
        scheduleRequestIfNecessary();
    }

    private synchronized void clientFinished(HttpPageBufferClient httpPageBufferClient) {
        Objects.requireNonNull(httpPageBufferClient, "client is null");
        if (this.completedClients.add(httpPageBufferClient)) {
            this.runningClients.remove(httpPageBufferClient);
            this.buffer.taskFinished(httpPageBufferClient.getRemoteTaskId());
        }
        scheduleRequestIfNecessary();
    }

    private synchronized void clientFailed(HttpPageBufferClient httpPageBufferClient, Throwable th) {
        Objects.requireNonNull(httpPageBufferClient, "client is null");
        if (this.completedClients.add(httpPageBufferClient)) {
            this.runningClients.remove(httpPageBufferClient);
            this.buffer.taskFailed(httpPageBufferClient.getRemoteTaskId(), th);
            this.scheduledExecutor.execute(() -> {
                this.taskFailureListener.onTaskFailed(httpPageBufferClient.getRemoteTaskId(), th);
            });
            closeQuietly(httpPageBufferClient);
        }
        scheduleRequestIfNecessary();
    }

    private static void closeQuietly(HttpPageBufferClient httpPageBufferClient) {
        try {
            httpPageBufferClient.close();
        } catch (RuntimeException e) {
        }
    }

    static {
        $assertionsDisabled = !DirectExchangeClient.class.desiredAssertionStatus();
        log = Logger.get(DirectExchangeClient.class);
    }
}
