package com.google.cloud.bigquery.storage.v1;

import com.google.api.core.ApiFuture;
import com.google.api.core.NanoClock;
import com.google.api.core.SettableApiFuture;
import com.google.api.gax.batching.FlowController;
import com.google.api.gax.retrying.ExponentialRetryAlgorithm;
import com.google.api.gax.retrying.RetrySettings;
import com.google.api.gax.retrying.TimedAttemptSettings;
import com.google.api.gax.rpc.FixedHeaderProvider;
import com.google.auto.value.AutoValue;
import com.google.cloud.bigquery.storage.v1.AppendRowsRequest;
import com.google.cloud.bigquery.storage.v1.BigQueryWriteSettings;
import com.google.cloud.bigquery.storage.v1.Exceptions;
import com.google.cloud.bigquery.storage.v1.RequestProfiler;
import com.google.cloud.bigquery.storage.v1.StreamConnection;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.Uninterruptibles;
import com.google.protobuf.Int64Value;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.opentelemetry.api.common.Attributes;
import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.util.Comparator;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:com/google/cloud/bigquery/storage/v1/ConnectionWorker.class */
public class ConnectionWorker implements AutoCloseable {
    // Set by the constructor from its Duration argument; defaults to 5 minutes when null is passed.
    private final Duration maxRetryDuration;
    // Current stream name. Not final: appendLoop() overwrites it when a request targets a different stream.
    private String streamName;
    // Write location; stays null when the constructor receives a null or empty location.
    private String location;
    // Writer schema of the most recent request seen by appendLoop(); null until the first request is processed.
    private ProtoSchema writerSchema;
    // Limits on outstanding requests / bytes, checked in appendInternal() and maybeWaitForInflightQuota().
    private final long maxInflightRequests;
    private final long maxInflightBytes;
    // When ThrowException, appendInternal() throws as soon as a limit would be reached.
    private final FlowController.LimitExceededBehavior limitExceededBehavior;
    private final String traceId;
    // Passed to every StreamConnection created by resetConnection(); may be null.
    private String compressorName;

    // Requests accepted by appendInternal() that the append loop has not picked up yet.
    @GuardedBy("lock")
    private final Deque<AppendRequestAndResponse> waitingRequestQueue;

    // Requests the append loop has picked up for sending; moved back to the waiting queue on reconnect.
    @GuardedBy("lock")
    private final Deque<AppendRequestAndResponse> inflightRequestQueue;

    @GuardedBy("lock")
    private TableSchemaAndTimestamp updatedSchema;
    // Client created in the constructor and closed in close().
    private BigQueryWriteClient client;
    // Current connection; null until the first resetConnection() call.
    private StreamConnection streamConnection;
    // Thread running appendLoop(); started at the end of the constructor.
    private Thread appendThread;
    // When null, per-request retry state is not created and backoff waiting is skipped.
    private final RetrySettings retrySettings;
    private final RequestProfiler.RequestProfilerHook requestProfilerHook;
    private final TelemetryMetrics telemetryMetrics;
    // When false, appendLoop() clears the write stream on requests that are not first on a connection/destination.
    private final Boolean isMultiplexing;
    // Note: logs under StreamWriter's logger name, not ConnectionWorker's.
    private static final Logger log = Logger.getLogger(StreamWriter.class.getName());
    // Longest time (ms) maybeWaitForInflightQuota() waits before giving up.
    private static long INFLIGHT_QUOTA_MAX_WAIT_TIME_MILLI = 300000;
    // Longest time a sent request may wait for its callback before the append loop throws.
    static Duration MAXIMUM_REQUEST_CALLBACK_WAIT_TIME = Duration.ofMinutes(5);
    // Matches the leading "projects/<id>/" segment of a stream name.
    private static String projectMatching = "projects/[^/]+/";
    private static Pattern streamPatternProject = Pattern.compile(projectMatching);
    // Matches a table's default stream, with or without the "streams/" segment.
    static final Pattern DEFAULT_STREAM_PATTERN = Pattern.compile("projects/([^/]+)/datasets/([^/]+)/tables/([^/]+)/(streams/)?_default$");
    // Captures the "projects/.../datasets/.../tables/..." prefix of a stream name (group 1).
    private static String tableMatching = "(projects/[^/]+/datasets/[^/]+/tables/[^/]+)/";
    private static Pattern streamPatternTable = Pattern.compile(tableMatching);
    // Single-thread pool; close() shuts it down as the "user callback thread pool".
    private ExecutorService threadPool = Executors.newFixedThreadPool(1);

    // Count of requests currently accounted against maxInflightRequests.
    @GuardedBy("lock")
    private long inflightRequests = 0;

    // Sum of messageSize for requests currently accounted against maxInflightBytes.
    @GuardedBy("lock")
    private long inflightBytes = 0;

    // Input to calculateSleepTimeMilli() when resetConnection() replaces an existing connection.
    @GuardedBy("lock")
    private long conectionRetryCountWithoutCallback = 0;

    @GuardedBy("lock")
    private long connectionRetryStartTime = 0;

    // Set to true by the append loop just before it (re)connects.
    @GuardedBy("lock")
    private boolean streamConnectionIsConnected = false;

    @GuardedBy("lock")
    private boolean inflightCleanuped = false;

    // Set by close(); appendInternal() rejects new requests once it is true.
    @GuardedBy("lock")
    private boolean userClosed = false;

    // Non-null once the connection has failed terminally; appendInternal() then rejects new requests.
    @GuardedBy("lock")
    private Throwable connectionFinalStatus = null;
    // Stream names the append loop has sent a "first" request for.
    private final Set<String> destinationSet = ConcurrentHashMap.newKeySet();
    // Whole seconds the most recent successful quota wait took; see maybeWaitForInflightQuota().
    private final AtomicLong inflightWaitSec = new AtomicLong(0);
    // Random identifier for this worker, used in log messages and exceptions.
    private final String writerId = UUID.randomUUID().toString();
    // Test hooks: when the exception is non-null the append loop sleeps for the given time and throws it
    // instead of connecting.
    private RuntimeException testOnlyRunTimeExceptionInAppendLoop = null;
    private long testOnlyAppendLoopSleepTime = 0;

    // Reset to 0 by the append loop whenever it reconnects.
    @GuardedBy("lock")
    private int responsesToIgnore = 0;
    // Guards the @GuardedBy("lock") state above; the two conditions below belong to it.
    private Lock lock = new ReentrantLock();
    // Signalled when a request is added to waitingRequestQueue; awaited by the append loop.
    private Condition hasMessageInWaitingQueue = this.lock.newCondition();
    // Awaited by maybeWaitForInflightQuota() while limits are exceeded.
    private Condition inflightReduced = this.lock.newCondition();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/google/cloud/bigquery/storage/v1/ConnectionWorker$AppendRequestAndResponse.class */
    /**
     * Bundles one append request with the future returned to the caller and the
     * per-request bookkeeping (size, retry state, send timestamp) that travels with it.
     */
    public static final class AppendRequestAndResponse {
        /** The request to send. */
        final AppendRowsRequest message;
        /** Serialized size of the request's proto rows, used for in-flight byte accounting. */
        final long messageSize;
        /** Identifier passed to the request profiler hooks. */
        String requestUniqueId;
        /** Retry algorithm for this request, or null when no retry settings were supplied. */
        ExponentialRetryAlgorithm retryAlgorithm;
        /** The writer that issued this request. */
        final StreamWriter streamWriter;
        /** When the request was handed to the connection; null until then. */
        Instant requestSendTimeStamp;
        /** Future completed with the response or failure of this request. */
        final SettableApiFuture<AppendRowsResponse> appendResult = SettableApiFuture.create();
        /** Earliest instant at which the request may be sent; initialised to construction time. */
        Instant blockMessageSendDeadline = Instant.now();
        Integer retryCount = 0;
        TimedAttemptSettings attemptSettings;

        /**
         * @param appendRowsRequest the request to wrap; its proto rows determine {@code messageSize}
         * @param streamWriter the writer issuing the request
         * @param retrySettings retry configuration, or null to disable the retry algorithm
         * @param str unique id of the request
         */
        public AppendRequestAndResponse(AppendRowsRequest appendRowsRequest, StreamWriter streamWriter, RetrySettings retrySettings, String str) {
            this.message = appendRowsRequest;
            this.messageSize = appendRowsRequest.getProtoRows().getSerializedSize();
            this.streamWriter = streamWriter;
            this.requestUniqueId = str;
            this.retryAlgorithm = retrySettings == null
                    ? null
                    : new ExponentialRetryAlgorithm(retrySettings, NanoClock.getDefaultClock());
        }

        /** Stamps the request with the current time as its send time. */
        void setRequestSendQueueTime() {
            requestSendTimeStamp = Instant.now();
        }
    }

    /**
     * Snapshot of how loaded a connection is: in-flight bytes and request count, number of
     * destinations, and the configured in-flight limits.
     */
    @AutoValue
    public static abstract class Load {
        // Fractions of the configured limits above which a connection counts as overwhelmed.
        private static double overwhelmedInflightCount = 0.2d;
        private static double overwhelmedInflightBytes = 0.2d;

        /**
         * Orders loads by in-flight bytes in 1024-byte buckets, then by in-flight request count in
         * buckets of 100, then by destination count.
         *
         * <p>The first key extractor is explicitly typed: in a chained call an implicitly typed
         * lambda is resolved against {@code Object} and does not compile. Keys are compared as
         * {@code long} so large values are not narrowed to {@code int}.
         */
        public static final Comparator<Load> LOAD_COMPARATOR =
                Comparator.comparingLong((Load load) -> load.inFlightRequestsBytes() / 1024)
                        .thenComparingLong(load -> load.inFlightRequestsCount() / 100)
                        .thenComparingLong(Load::destinationCount);

        /** Same ordering as {@link #LOAD_COMPARATOR} but without bucketing. */
        public static final Comparator<Load> TEST_LOAD_COMPARATOR =
                Comparator.comparingLong((Load load) -> load.inFlightRequestsBytes())
                        .thenComparingLong(Load::inFlightRequestsCount)
                        .thenComparingLong(Load::destinationCount);

        /** Bytes currently in flight. */
        public abstract long inFlightRequestsBytes();

        /** Requests currently in flight. */
        public abstract long inFlightRequestsCount();

        /** Number of destinations. */
        public abstract long destinationCount();

        /** Configured in-flight byte limit. */
        public abstract long maxInflightBytes();

        /** Configured in-flight request limit. */
        public abstract long maxInflightCount();

        /**
         * Creates a load snapshot. Arguments are forwarded positionally to the generated
         * AutoValue constructor.
         */
        static Load create(long j, long j2, long j3, long j4, long j5) {
            return new AutoValue_ConnectionWorker_Load(j, j2, j3, j4, j5);
        }

        /**
         * Returns true when either the in-flight request count or the in-flight bytes exceed
         * their threshold fraction of the corresponding limit.
         */
        public boolean isOverwhelmed() {
            boolean tooManyRequests =
                    ((double) inFlightRequestsCount()) > overwhelmedInflightCount * ((double) maxInflightCount());
            return tooManyRequests
                    || ((double) inFlightRequestsBytes()) > overwhelmedInflightBytes * ((double) maxInflightBytes());
        }

        /** Overrides the byte threshold fraction used by {@link #isOverwhelmed()}. */
        @VisibleForTesting
        public static void setOverwhelmedBytesThreshold(double d) {
            overwhelmedInflightBytes = d;
        }

        /** Overrides the request-count threshold fraction used by {@link #isOverwhelmed()}. */
        @VisibleForTesting
        public static void setOverwhelmedCountsThreshold(double d) {
            overwhelmedInflightCount = d;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /** Pairs an updated table schema with a timestamp. */
    @AutoValue
    /* loaded from: input_file:com/google/cloud/bigquery/storage/v1/ConnectionWorker$TableSchemaAndTimestamp.class */
    public static abstract class TableSchemaAndTimestamp {
        /* JADX INFO: Access modifiers changed from: package-private */
        /** Timestamp associated with the schema (unit not visible here — presumably nanoseconds or milliseconds; confirm against callers). */
        public abstract long updateTimeStamp();

        /* JADX INFO: Access modifiers changed from: package-private */
        /** The updated table schema. */
        public abstract TableSchema updatedSchema();

        /* JADX INFO: Access modifiers changed from: package-private */
        /** Creates an instance via the generated AutoValue class; {@code j} is the timestamp. */
        public static TableSchemaAndTimestamp create(long j, TableSchema tableSchema) {
            return new AutoValue_ConnectionWorker_TableSchemaAndTimestamp(j, tableSchema);
        }
    }

    /**
     * Tells whether the whole of {@code str} matches {@code DEFAULT_STREAM_PATTERN}.
     *
     * @param str stream name to test
     * @return {@code Boolean.TRUE} on a full match, {@code Boolean.FALSE} otherwise
     */
    public static Boolean isDefaultStreamName(String str) {
        Matcher defaultStreamMatcher = DEFAULT_STREAM_PATTERN.matcher(str);
        return defaultStreamMatcher.matches();
    }

    /**
     * Returns the size limit, in bytes, that {@code appendInternal} enforces on a single
     * request's serialized rows.
     */
    public static long getApiMaxRequestBytes() {
        final long maxRequestBytes = 20_000_000L;
        return maxRequestBytes;
    }

    /**
     * Returns the first {@code projects/<id>/} segment found in the stream name, trailing slash
     * included.
     *
     * @param str stream name to search
     * @throws IllegalStateException if no such segment is present
     */
    static String extractProjectName(String str) {
        Matcher projectMatcher = streamPatternProject.matcher(str);
        if (!projectMatcher.find()) {
            throw new IllegalStateException(String.format("The passed in stream name does not match standard format %s", str));
        }
        return projectMatcher.group();
    }

    /**
     * Builds the routing value {@code projects/<id>/locations/<location>}.
     *
     * @param str stream name the project segment is taken from
     * @param str2 location appended after {@code locations/}
     * @throws IllegalStateException if the stream name has no project segment
     */
    static String getRoutingHeader(String str, String str2) {
        String projectPrefix = extractProjectName(str);
        return projectPrefix + "locations/" + str2;
    }

    /**
     * Returns the {@code projects/.../datasets/.../tables/...} prefix of the current stream
     * name, or an empty string when the stream name does not contain one.
     */
    private String getTableName() {
        Matcher tableMatcher = streamPatternTable.matcher(this.streamName);
        if (tableMatcher.find()) {
            return tableMatcher.group(1);
        }
        return "";
    }

    /** Returns the current value of the connected flag, read under the lock. */
    public boolean hasActiveConnection() {
        this.lock.lock();
        try {
            return this.streamConnectionIsConnected;
        } finally {
            this.lock.unlock();
        }
    }

    /** Returns the number of requests in the in-flight queue, read under the lock. */
    public int getInflightRequestQueueLength() {
        lock.lock();
        try {
            int inflightCount = inflightRequestQueue.size();
            return inflightCount;
        } finally {
            lock.unlock();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /** Returns the attributes reported by this worker's telemetry metrics. */
    @VisibleForTesting
    public Attributes getTelemetryAttributes() {
        Attributes attributes = telemetryMetrics.getTelemetryAttributes();
        return attributes;
    }

    /**
     * Creates the worker, its client, and starts the background append thread.
     *
     * @param str stream name
     * @param str2 write location; ignored when null or empty
     * @param protoSchema writer schema; must not be null
     * @param j maximum number of in-flight requests
     * @param j2 maximum number of in-flight bytes
     * @param duration maximum retry duration; 5 minutes when null
     * @param limitExceededBehavior what to do when an in-flight limit is reached
     * @param str3 trace id
     * @param str4 compressor name, may be null
     * @param bigQueryWriteSettings settings used to create the client
     * @param retrySettings retry configuration, may be null
     * @param z forwarded to the RequestProfilerHook constructor
     * @param z2 forwarded to the TelemetryMetrics constructor
     * @param z3 whether this worker is multiplexing
     * @throws StatusRuntimeException with INVALID_ARGUMENT when {@code protoSchema} is null
     * @throws IOException declared by the signature; presumably raised by client creation — confirm
     */
    public ConnectionWorker(String str, String str2, ProtoSchema protoSchema, long j, long j2, Duration duration, FlowController.LimitExceededBehavior limitExceededBehavior, String str3, @Nullable String str4, BigQueryWriteSettings bigQueryWriteSettings, RetrySettings retrySettings, boolean z, boolean z2, boolean z3) throws IOException {
        this.location = null;
        this.compressorName = null;
        this.streamName = str;
        // An empty location is treated the same as no location.
        if (str2 != null && !str2.isEmpty()) {
            this.location = str2;
        }
        this.maxRetryDuration = duration != null ? duration : Duration.ofMinutes(5L);
        if (protoSchema == null) {
            throw new StatusRuntimeException(Status.fromCode(Status.Code.INVALID_ARGUMENT).withDescription("Writer schema must be provided when building this writer."));
        }
        this.maxInflightRequests = j;
        this.maxInflightBytes = j2;
        this.limitExceededBehavior = limitExceededBehavior;
        this.traceId = str3;
        this.waitingRequestQueue = new LinkedList();
        this.inflightRequestQueue = new LinkedList();
        this.compressorName = str4;
        this.retrySettings = retrySettings;
        this.requestProfilerHook = new RequestProfiler.RequestProfilerHook(z);
        this.telemetryMetrics = new TelemetryMetrics(this, z2, getTableName(), this.writerId, str3);
        this.isMultiplexing = Boolean.valueOf(z3);
        // Copy the existing headers and add the routing header: keyed on the stream when no
        // location is set, otherwise on project + location.
        HashMap hashMap = new HashMap();
        hashMap.putAll(bigQueryWriteSettings.m8toBuilder().getHeaderProvider().getHeaders());
        if (this.location == null) {
            hashMap.put("x-goog-request-params", "write_stream=" + this.streamName);
        } else {
            hashMap.put("x-goog-request-params", "write_location=" + getRoutingHeader(this.streamName, this.location));
        }
        // NOTE(review): the settings rebuilt with the extra header are discarded here, and the
        // client below is created from the unmodified bigQueryWriteSettings — confirm whether the
        // routing header is meant to reach the client.
        ((BigQueryWriteSettings.Builder) bigQueryWriteSettings.m8toBuilder().setHeaderProvider(FixedHeaderProvider.create(hashMap))).m9build();
        this.client = BigQueryWriteClient.create(bigQueryWriteSettings);
        this.appendThread = new Thread(new Runnable() { // from class: com.google.cloud.bigquery.storage.v1.ConnectionWorker.1
            @Override // java.lang.Runnable
            public void run() {
                ConnectionWorker.this.appendLoop();
            }
        });
        // If the append loop dies with an exception: record it as the final status, move every
        // still-waiting request to the in-flight queue, and run cleanup.
        this.appendThread.setUncaughtExceptionHandler((thread, th) -> {
            log.warning("Exception thrown from append loop, thus stream writer is shutdown due to exception: " + th.toString());
            this.lock.lock();
            try {
                this.connectionFinalStatus = th;
                while (!this.waitingRequestQueue.isEmpty()) {
                    this.inflightRequestQueue.addLast(this.waitingRequestQueue.pollFirst());
                }
                cleanupConnectionAndRequests(true);
            } finally {
                this.lock.unlock();
            }
        });
        this.appendThread.start();
    }

    /**
     * Replaces the current stream connection with a fresh one.
     *
     * <p>When a previous connection exists it is closed first, and the calling thread then
     * sleeps for {@code calculateSleepTimeMilli(conectionRetryCountWithoutCallback)} milliseconds
     * before the new connection is created. Responses and completion of the new connection are
     * routed to {@code requestCallback} and {@code doneCallback}.
     */
    private void resetConnection() {
        log.info("Start connecting stream: " + this.streamName + " id: " + this.writerId);
        this.telemetryMetrics.recordConnectionStart();

        StreamConnection previous = this.streamConnection;
        if (previous != null) {
            previous.close();
            long backoffMillis = calculateSleepTimeMilli(this.conectionRetryCountWithoutCallback);
            Uninterruptibles.sleepUninterruptibly(backoffMillis, TimeUnit.MILLISECONDS);
        }

        StreamConnection.RequestCallback onResponse = new StreamConnection.RequestCallback() {
            @Override
            public void run(AppendRowsResponse appendRowsResponse) {
                ConnectionWorker.this.requestCallback(appendRowsResponse);
            }
        };
        StreamConnection.DoneCallback onDone = new StreamConnection.DoneCallback() {
            @Override
            public void run(Throwable th) {
                ConnectionWorker.this.doneCallback(th);
            }
        };
        this.streamConnection = new StreamConnection(this.client, onResponse, onDone, this.compressorName);

        log.info("Finish connecting stream: " + this.streamName + " id: " + this.writerId);
    }

    /**
     * Returns true while the request's send deadline is still in the future. Always false when
     * no retry settings are configured.
     */
    @GuardedBy("lock")
    private boolean shouldWaitForBackoff(AppendRequestAndResponse appendRequestAndResponse) {
        boolean stillBlocked = this.retrySettings != null
                && Instant.now().isBefore(appendRequestAndResponse.blockMessageSendDeadline);
        if (stillBlocked) {
            log.fine(String.format("Waiting for wait queue to unblock at %s for retry # %s", appendRequestAndResponse.blockMessageSendDeadline, appendRequestAndResponse.retryCount));
            return true;
        }
        return false;
    }

    /**
     * Blocks until the request's backoff deadline has passed, re-checking every 100 ms.
     *
     * <p>The wait uses a throwaway condition on the lock, so the lock is released while
     * waiting. The whole wait is recorded as a RETRY_BACKOFF profiler operation.
     *
     * @throws IllegalStateException if the thread is interrupted while waiting
     */
    private void waitForBackoffIfNecessary(AppendRequestAndResponse appendRequestAndResponse) {
        this.lock.lock();
        this.requestProfilerHook.startOperation(RequestProfiler.OperationName.RETRY_BACKOFF, appendRequestAndResponse.requestUniqueId);
        try {
            // Nothing ever signals this condition; it only serves as a timed, lock-releasing sleep.
            Condition backoffTimer = this.lock.newCondition();
            while (shouldWaitForBackoff(appendRequestAndResponse)) {
                backoffTimer.await(100L, TimeUnit.MILLISECONDS);
            }
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        } finally {
            this.requestProfilerHook.endOperation(RequestProfiler.OperationName.RETRY_BACKOFF, appendRequestAndResponse.requestUniqueId);
            this.lock.unlock();
        }
    }

    /** Re-queues the request at the head of the waiting queue. */
    @GuardedBy("lock")
    private void addMessageToFrontOfWaitingQueue(AppendRequestAndResponse appendRequestAndResponse) {
        final boolean atFront = true;
        addMessageToWaitingQueue(appendRequestAndResponse, atFront);
    }

    /** Re-queues the request at the tail of the waiting queue. */
    @GuardedBy("lock")
    private void addMessageToBackOfWaitingQueue(AppendRequestAndResponse appendRequestAndResponse) {
        final boolean atFront = false;
        addMessageToWaitingQueue(appendRequestAndResponse, atFront);
    }

    /**
     * Puts a request on the waiting queue and accounts for it: bumps the in-flight request and
     * byte counters, signals the append loop, and starts the WAIT_QUEUE profiler operation.
     *
     * @param appendRequestAndResponse request to enqueue
     * @param z true to insert at the head of the queue, false to append at the tail
     */
    @GuardedBy("lock")
    private void addMessageToWaitingQueue(AppendRequestAndResponse appendRequestAndResponse, boolean z) {
        this.inflightRequests += 1;
        this.inflightBytes = this.inflightBytes + appendRequestAndResponse.messageSize;
        this.hasMessageInWaitingQueue.signal();
        this.requestProfilerHook.startOperation(RequestProfiler.OperationName.WAIT_QUEUE, appendRequestAndResponse.requestUniqueId);
        if (!z) {
            this.waitingRequestQueue.addLast(appendRequestAndResponse);
            return;
        }
        this.waitingRequestQueue.addFirst(appendRequestAndResponse);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Builds an append request for the given rows on behalf of {@code streamWriter} and queues
     * it on this connection.
     *
     * @param streamWriter the writer issuing the append; must not be null
     * @param protoRows rows to append
     * @param j offset to write at; omitted from the request when negative
     * @param str unique id of the request
     * @return a future completed with the append response or failure
     * @throws NullPointerException if {@code streamWriter} is null
     * @throws StatusRuntimeException with INVALID_ARGUMENT when the writer's location (or, if this
     *     connection has no location, its stream name) does not match this connection
     */
    public ApiFuture<AppendRowsResponse> append(StreamWriter streamWriter, ProtoRows protoRows, long j, String str) {
        // Validate before the first dereference; previously this check came after the writer
        // had already been used, so it could never trigger.
        Preconditions.checkNotNull(streamWriter);
        if (this.location != null && !this.location.equals(streamWriter.getLocation())) {
            throw new StatusRuntimeException(Status.fromCode(Status.Code.INVALID_ARGUMENT).withDescription("StreamWriter with location " + streamWriter.getLocation() + " is scheduled to use a connection with location " + this.location));
        }
        if (this.location == null && !streamWriter.getStreamName().equals(this.streamName)) {
            throw new StatusRuntimeException(Status.fromCode(Status.Code.INVALID_ARGUMENT).withDescription("StreamWriter with stream name " + streamWriter.getStreamName() + " is scheduled to use a connection with stream name " + this.streamName));
        }
        AppendRowsRequest.Builder newBuilder = AppendRowsRequest.newBuilder();
        newBuilder.setProtoRows(AppendRowsRequest.ProtoData.newBuilder().setWriterSchema(streamWriter.getProtoSchema()).setRows(protoRows).build());
        if (j >= 0) {
            newBuilder.setOffset(Int64Value.of(j));
        }
        newBuilder.setWriteStream(streamWriter.getStreamName());
        newBuilder.putAllMissingValueInterpretations(streamWriter.getMissingValueInterpretationMap());
        // Only set the default interpretation when the writer specifies one.
        if (streamWriter.getDefaultValueInterpretation() != AppendRowsRequest.MissingValueInterpretation.MISSING_VALUE_INTERPRETATION_UNSPECIFIED) {
            newBuilder.setDefaultMissingValueInterpretation(streamWriter.getDefaultValueInterpretation());
        }
        return appendInternal(streamWriter, newBuilder.build(), str);
    }

    /** Returns whether {@code close()} has been called, read under the lock. */
    Boolean isUserClosed() {
        lock.lock();
        try {
            boolean closedByUser = userClosed;
            return closedByUser;
        } finally {
            lock.unlock();
        }
    }

    /** Returns this connection's location, or null when none was configured. */
    String getWriteLocation() {
        return location;
    }

    /**
     * Validates a request, accounts for it against the in-flight limits, and places it on the
     * waiting queue for the append loop.
     *
     * <p>Rejections that are reported through the returned future (rather than thrown): request
     * too large, worker already closed by the user, connection in a terminal failure state.
     *
     * @param streamWriter the writer issuing the request
     * @param appendRowsRequest the fully built request
     * @param str unique id of the request
     * @return the future that will carry the response or failure
     * @throws Exceptions.InflightRequestsLimitExceededException when the behavior is
     *     ThrowException and the request-count limit would be reached
     * @throws Exceptions.InflightBytesLimitExceededException when the behavior is ThrowException
     *     and the byte limit would be reached
     * @throws StatusRuntimeException with CANCELLED when waiting for quota is interrupted or
     *     times out
     */
    private ApiFuture<AppendRowsResponse> appendInternal(StreamWriter streamWriter, AppendRowsRequest appendRowsRequest, String str) {
        AppendRequestAndResponse requestWrapper = new AppendRequestAndResponse(appendRowsRequest, streamWriter, this.retrySettings, str);
        if (requestWrapper.messageSize > getApiMaxRequestBytes()) {
            requestWrapper.appendResult.setException(new StatusRuntimeException(Status.fromCode(Status.Code.INVALID_ARGUMENT).withDescription("MessageSize is too large. Max allow: " + getApiMaxRequestBytes() + " Actual: " + requestWrapper.messageSize)));
            return requestWrapper.appendResult;
        }
        this.lock.lock();
        try {
            if (this.userClosed) {
                requestWrapper.appendResult.setException(new Exceptions.StreamWriterClosedException(Status.fromCode(Status.Code.FAILED_PRECONDITION).withDescription("Connection is already closed during append"), this.streamName, this.writerId));
                return requestWrapper.appendResult;
            }
            if (this.limitExceededBehavior == FlowController.LimitExceededBehavior.ThrowException) {
                if (this.inflightRequests + 1 >= this.maxInflightRequests) {
                    throw new Exceptions.InflightRequestsLimitExceededException(this.writerId, this.maxInflightRequests);
                }
                if (this.inflightBytes + requestWrapper.messageSize >= this.maxInflightBytes) {
                    throw new Exceptions.InflightBytesLimitExceededException(this.writerId, this.maxInflightBytes);
                }
            }
            if (this.connectionFinalStatus != null) {
                String finalStatus = this.connectionFinalStatus.toString();
                // Unavailable errors get an extra hint that a retry with backoff may succeed.
                String reason = finalStatus.contains("com.google.api.gax.rpc.UnavailableException")
                        ? finalStatus + ". This is a most likely a transient condition and may be corrected by retrying with a backoff."
                        : finalStatus;
                requestWrapper.appendResult.setException(new Exceptions.StreamWriterClosedException(Status.fromCode(Status.Code.FAILED_PRECONDITION).withDescription("Connection is closed due to " + reason), this.streamName, this.writerId));
                return requestWrapper.appendResult;
            }
            this.requestProfilerHook.startOperation(RequestProfiler.OperationName.WAIT_QUEUE, str);
            this.inflightRequests++;
            this.inflightBytes += requestWrapper.messageSize;
            this.waitingRequestQueue.addLast(requestWrapper);
            this.hasMessageInWaitingQueue.signal();
            this.requestProfilerHook.startOperation(RequestProfiler.OperationName.WAIT_INFLIGHT_QUOTA, str);
            try {
                maybeWaitForInflightQuota();
            } catch (StatusRuntimeException e) {
                // The quota wait releases the lock, so by now other requests may sit behind this
                // one, or the append loop may already have taken it. Remove exactly this request
                // rather than whatever happens to be last, and only give its quota back if it
                // was still waiting. If it already left the queue its accounting is left alone,
                // on the assumption that whoever completes it releases the quota — TODO confirm
                // against the response/cleanup path.
                if (this.waitingRequestQueue.removeLastOccurrence(requestWrapper)) {
                    this.inflightRequests--;
                    this.inflightBytes -= requestWrapper.messageSize;
                }
                throw e;
            }
            this.requestProfilerHook.endOperation(RequestProfiler.OperationName.WAIT_INFLIGHT_QUOTA, str);
            return requestWrapper.appendResult;
        } finally {
            this.lock.unlock();
        }
    }

    /**
     * Blocks while either in-flight limit is reached, waking every 100 ms (or when
     * {@code inflightReduced} is signalled) to re-check. On success records the whole seconds
     * spent waiting in {@code inflightWaitSec}.
     *
     * @throws StatusRuntimeException with CANCELLED if the thread is interrupted, or if the total
     *     wait exceeds {@code INFLIGHT_QUOTA_MAX_WAIT_TIME_MILLI}
     */
    @GuardedBy("lock")
    private void maybeWaitForInflightQuota() {
        final long waitStartMillis = System.currentTimeMillis();
        while (this.inflightRequests >= this.maxInflightRequests || this.inflightBytes >= this.maxInflightBytes) {
            try {
                this.inflightReduced.await(100L, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                log.warning("Interrupted while waiting for inflight quota. Stream: " + this.streamName + " Error: " + e.toString());
                throw new StatusRuntimeException(Status.fromCode(Status.Code.CANCELLED).withCause(e).withDescription("Interrupted while waiting for quota."));
            }
            long waitedMillis = System.currentTimeMillis() - waitStartMillis;
            if (waitedMillis > INFLIGHT_QUOTA_MAX_WAIT_TIME_MILLI) {
                throw new StatusRuntimeException(Status.fromCode(Status.Code.CANCELLED).withDescription(String.format("Interrupted while waiting for quota due to long waiting time %sms", waitedMillis)));
            }
        }
        this.inflightWaitSec.set((System.currentTimeMillis() - waitStartMillis) / 1000);
    }

    @VisibleForTesting
    static long calculateSleepTimeMilli(long j) {
        // Exponential backoff in milliseconds: 50 * 2^j, capped at 60 seconds.
        final double capMillis = 60000.0d;
        double uncappedMillis = 50.0d * Math.pow(2.0d, j);
        return (long) Math.min(uncappedMillis, capMillis);
    }

    /**
     * Test hook: sets how long (ms) the append loop sleeps before throwing the injected
     * test exception.
     */
    @VisibleForTesting
    void setTestOnlyAppendLoopSleepTime(long j) {
        testOnlyAppendLoopSleepTime = j;
    }

    /**
     * Test hook: when non-null, the append loop throws this exception instead of connecting.
     */
    @VisibleForTesting
    void setTestOnlyRunTimeExceptionInAppendLoop(RuntimeException runtimeException) {
        testOnlyRunTimeExceptionInAppendLoop = runtimeException;
    }

    /** Returns the whole seconds recorded by the most recent successful in-flight quota wait. */
    public long getInflightWaitSeconds() {
        return inflightWaitSec.get();
    }

    /** Returns the random identifier generated for this worker. */
    public String getWriterId() {
        return writerId;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /** Returns true once a final connection status has been recorded, read under the lock. */
    public boolean isConnectionInUnrecoverableState() {
        lock.lock();
        try {
            boolean hasFinalStatus = connectionFinalStatus != null;
            return hasFinalStatus;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Marks the worker as closed by the user, waits for the append thread to finish, then closes
     * the client and shuts down the callback thread pool.
     *
     * <p>The lock is held only while setting the closed flag. The append thread takes the same
     * lock on every iteration, so holding it across {@code join()} would block that thread
     * forever and this method would never return.
     *
     * @throws IllegalStateException if interrupted while waiting for the thread pool to terminate
     */
    @Override // java.lang.AutoCloseable
    public void close() {
        log.info("User closing stream: " + this.streamName);
        this.lock.lock();
        try {
            this.userClosed = true;
        } finally {
            this.lock.unlock();
        }
        log.info("Waiting for append thread to finish. Stream: " + this.streamName + " id: " + this.writerId);
        try {
            this.appendThread.join();
        } catch (InterruptedException e) {
            // Best effort: keep going so the client and thread pool are still released.
            log.warning("Append handler join is interrupted. Stream: " + this.streamName + " id: " + this.writerId + " Error: " + e.toString());
        }
        this.client.close();
        try {
            this.client.awaitTermination(150L, TimeUnit.SECONDS);
        } catch (InterruptedException e2) {
            log.warning("Client await termination timeout in writer id " + this.writerId);
        }
        try {
            log.fine("Begin shutting down user callback thread pool for stream " + this.streamName + " id: " + this.writerId);
            this.threadPool.shutdown();
            this.threadPool.awaitTermination(3L, TimeUnit.MINUTES);
            log.info("User close finishes for stream " + this.streamName);
        } catch (InterruptedException e3) {
            log.warning("Close on thread pool for " + this.streamName + " id: " + this.writerId + " is interrupted with exception: " + e3.toString());
            throw new IllegalStateException("Thread pool shutdown is interrupted for stream " + this.streamName, e3);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Body of the append thread. Until {@code waitingQueueDrained()} reports true, each
     * iteration:
     *
     * <ol>
     *   <li>under the lock, waits up to 100 ms for new work, fails if the oldest in-flight
     *       request has waited too long for its callback, moves in-flight requests back to the
     *       front of the waiting queue when a (re)connect is needed, and then moves every
     *       waiting request (after its backoff) to the in-flight queue and a local queue;
     *   <li>outside the lock, (re)connects if needed and sends the local queue in order.
     * </ol>
     *
     * <p>The first request on a connection, and the first after a stream or schema switch,
     * keeps its write stream and writer schema and gets the trace id; later requests have the
     * writer schema cleared, and the write stream too unless multiplexing.
     *
     * <p>Every lock acquisition is released in a {@code finally}, exactly once. When the loop
     * ends, connection and requests are cleaned up.
     */
    public void appendLoop() {
        Deque<AppendRequestAndResponse> localQueue = new LinkedList<>();
        boolean streamNeedsConnecting = false;
        boolean isFirstRequestInConnection = true;
        while (!waitingQueueDrained()) {
            this.lock.lock();
            try {
                this.hasMessageInWaitingQueue.await(100L, TimeUnit.MILLISECONDS);
                // Error out the loop if the oldest sent request has waited too long for a callback.
                if (this.inflightRequestQueue.size() > 0) {
                    Instant sendInstant = this.inflightRequestQueue.getFirst().requestSendTimeStamp;
                    if (sendInstant != null) {
                        throwIfWaitCallbackTooLong(sendInstant);
                    }
                }
                // Only (re)connect when disconnected and no terminal failure has been recorded.
                streamNeedsConnecting = !this.streamConnectionIsConnected && this.connectionFinalStatus == null;
                if (streamNeedsConnecting) {
                    // Requests sent on the old connection must be re-sent ahead of newer ones:
                    // walk the in-flight queue backwards and prepend to the waiting queue.
                    while (!this.inflightRequestQueue.isEmpty()) {
                        AppendRequestAndResponse resend = this.inflightRequestQueue.pollLast();
                        this.requestProfilerHook.endOperation(RequestProfiler.OperationName.RESPONSE_LATENCY, resend.requestUniqueId);
                        resend.requestSendTimeStamp = null;
                        this.requestProfilerHook.startOperation(RequestProfiler.OperationName.WAIT_QUEUE, resend.requestUniqueId);
                        this.waitingRequestQueue.addFirst(resend);
                    }
                    this.responsesToIgnore = 0;
                }
                while (!this.waitingRequestQueue.isEmpty()) {
                    AppendRequestAndResponse next = this.waitingRequestQueue.pollFirst();
                    this.requestProfilerHook.endOperation(RequestProfiler.OperationName.WAIT_QUEUE, next.requestUniqueId);
                    waitForBackoffIfNecessary(next);
                    this.inflightRequestQueue.add(next);
                    localQueue.addLast(next);
                }
            } catch (InterruptedException e) {
                log.warning("Interrupted while waiting for message. Stream: " + this.streamName + " id: " + this.writerId + " Error: " + e.toString());
            } finally {
                this.lock.unlock();
            }

            if (localQueue.isEmpty()) {
                continue;
            }

            if (streamNeedsConnecting) {
                // Mark connected before connecting: once connected, the flag may be cleared by a
                // callback at any time, and that must not be overwritten afterwards.
                this.lock.lock();
                try {
                    this.streamConnectionIsConnected = true;
                } finally {
                    this.lock.unlock();
                }
                if (this.testOnlyRunTimeExceptionInAppendLoop != null) {
                    Uninterruptibles.sleepUninterruptibly(this.testOnlyAppendLoopSleepTime, TimeUnit.MILLISECONDS);
                    throw this.testOnlyRunTimeExceptionInAppendLoop;
                }
                resetConnection();
                isFirstRequestInConnection = true;
            }

            while (!localQueue.isEmpty()) {
                localQueue.peekFirst().setRequestSendQueueTime();
                AppendRequestAndResponse wrapper = localQueue.pollFirst();
                AppendRowsRequest originalRequest = wrapper.message;
                String requestUniqueId = wrapper.requestUniqueId;
                AppendRowsRequest.Builder requestBuilder = originalRequest.toBuilder();
                // The first writer schema seen becomes the baseline for switch detection.
                if (this.writerSchema == null) {
                    this.writerSchema = originalRequest.getProtoRows().getWriterSchema();
                }
                // A different non-empty stream name, or a different writer schema, is treated
                // like the first request on a connection.
                if ((!originalRequest.getWriteStream().isEmpty() && !this.streamName.isEmpty() && !originalRequest.getWriteStream().equals(this.streamName)) || (originalRequest.getProtoRows().hasWriterSchema() && !originalRequest.getProtoRows().getWriterSchema().equals(this.writerSchema))) {
                    this.streamName = originalRequest.getWriteStream();
                    this.telemetryMetrics.refreshOpenTelemetryTableNameAttributes(getTableName());
                    this.writerSchema = originalRequest.getProtoRows().getWriterSchema();
                    isFirstRequestInConnection = true;
                }
                if (isFirstRequestInConnection) {
                    this.destinationSet.add(this.streamName);
                    requestBuilder.setTraceId(wrapper.streamWriter.getFullTraceId());
                } else {
                    if (!this.isMultiplexing.booleanValue()) {
                        requestBuilder.clearWriteStream();
                    }
                    requestBuilder.getProtoRowsBuilder().clearWriterSchema();
                }
                isFirstRequestInConnection = false;
                this.requestProfilerHook.startOperation(RequestProfiler.OperationName.RESPONSE_LATENCY, requestUniqueId);
                this.streamConnection.send(requestBuilder.build());
            }
        }
        cleanupConnectionAndRequests(false);
    }

    /**
     * Closes the stream connection, if there is one, and then fails every request that is still in
     * flight.
     *
     * @param z when {@code false}, waits up to three minutes for the done callback after closing the
     *     connection; when {@code true}, that wait is skipped
     */
    private void cleanupConnectionAndRequests(boolean z) {
        // String concatenation renders a null reference as "null" and otherwise calls toString().
        log.info("Cleanup starts. Stream: " + this.streamName + " id: " + this.writerId + " userClose: " + this.userClosed + " final exception: " + this.connectionFinalStatus);

        boolean hasConnection = this.streamConnection != null;
        if (hasConnection) {
            this.streamConnection.close();
        }
        if (hasConnection && !z) {
            waitForDoneCallback(3L, TimeUnit.MINUTES);
        }

        log.info("Stream connection is fully closed. Cleaning up inflight requests. Stream: " + this.streamName + " id: " + this.writerId);
        cleanupInflightRequests();
        log.info("Append thread is done. Stream: " + this.streamName + " id: " + this.writerId);
    }

    /**
     * Fails when the time elapsed since {@code instant} is strictly greater than
     * {@code MAXIMUM_REQUEST_CALLBACK_WAIT_TIME}.
     *
     * @param instant the point in time the wait is measured from
     * @throws Exceptions.MaximumRequestCallbackWaitTimeExceededException if the limit is exceeded
     */
    private void throwIfWaitCallbackTooLong(Instant instant) {
        Duration elapsed = Duration.between(instant, Instant.now());
        boolean withinLimit = elapsed.compareTo(MAXIMUM_REQUEST_CALLBACK_WAIT_TIME) <= 0;
        if (withinLimit) {
            return;
        }
        throw new Exceptions.MaximumRequestCallbackWaitTimeExceededException(elapsed, this.writerId, MAXIMUM_REQUEST_CALLBACK_WAIT_TIME);
    }

    /**
     * Reports whether the worker is shutting down with nothing left to send.
     *
     * @return {@code true} when either the user closed the worker or a final connection status is
     *     set, and the waiting request queue is empty; {@code false} otherwise
     */
    private boolean waitingQueueDrained() {
        this.lock.lock();
        try {
            boolean shuttingDown = this.userClosed || this.connectionFinalStatus != null;
            // The queue is only consulted once shutdown has been established.
            return shuttingDown && this.waitingRequestQueue.isEmpty();
        } finally {
            this.lock.unlock();
        }
    }

    /**
     * Polls until the stream connection reports itself as disconnected, or until the timeout
     * elapses.
     *
     * <p>If the timeout elapses first, a warning is logged and, when no final status has been
     * recorded yet, {@code connectionFinalStatus} is set to a CANCELLED status.
     *
     * <p>The lock is acquired and released exactly once per check. It is not held while sleeping.
     *
     * @param j maximum time to wait
     * @param timeUnit unit of {@code j}
     */
    private void waitForDoneCallback(long j, TimeUnit timeUnit) {
        log.fine("Waiting for done callback from stream connection. Stream: " + this.streamName + " id: " + this.writerId);
        long deadlineNanos = System.nanoTime() + timeUnit.toNanos(j);
        // Compare by subtraction: nanoTime values may wrap, so a direct <= is not overflow-safe.
        while (System.nanoTime() - deadlineNanos <= 0) {
            this.lock.lock();
            try {
                if (!this.streamConnectionIsConnected) {
                    return;
                }
            } finally {
                this.lock.unlock();
            }
            // Sleep outside the lock so callbacks that need the lock can make progress.
            Uninterruptibles.sleepUninterruptibly(100L, TimeUnit.MILLISECONDS);
        }
        this.lock.lock();
        try {
            log.warning("Donecallback is not triggered within timeout frame for writer " + this.writerId);
            if (this.connectionFinalStatus == null) {
                this.connectionFinalStatus = new StatusRuntimeException(Status.fromCode(Status.Code.CANCELLED).withDescription("Timeout waiting for DoneCallback."));
            }
        } finally {
            this.lock.unlock();
        }
    }

    /**
     * Drains the inflight request queue and fails every drained request.
     *
     * <p>The first drained request is failed with {@code connectionFinalStatus} when one is set,
     * otherwise with a FAILED_PRECONDITION {@code StreamWriterClosedException}. Every later request
     * is failed with an ABORTED {@code StreamWriterClosedException} asking the caller to retry.
     *
     * <p>The queue is drained and {@code inflightCleanuped} is set while holding the lock. The
     * futures are completed after the lock is released, so that whatever runs on completion does
     * not execute under the lock.
     */
    private void cleanupInflightRequests() {
        Throwable finalStatus = new Exceptions.StreamWriterClosedException(Status.fromCode(Status.Code.FAILED_PRECONDITION).withDescription("Connection is already closed, cleanup inflight request"), this.streamName, this.writerId);
        Deque<AppendRequestAndResponse> localQueue = new LinkedList<>();
        this.lock.lock();
        try {
            if (this.connectionFinalStatus != null) {
                finalStatus = this.connectionFinalStatus;
            }
            while (!this.inflightRequestQueue.isEmpty()) {
                localQueue.addLast(pollFirstInflightRequestQueue());
            }
            this.inflightCleanuped = true;
        } finally {
            this.lock.unlock();
        }
        log.fine("Cleaning " + localQueue.size() + " inflight requests with error: " + finalStatus + " for Stream " + this.streamName + " id: " + this.writerId);
        boolean first = true;
        while (!localQueue.isEmpty()) {
            AppendRequestAndResponse request = localQueue.pollFirst();
            if (first) {
                // Only the head of the queue receives the actual final status.
                request.appendResult.setException(finalStatus);
                first = false;
            } else {
                request.appendResult.setException(new Exceptions.StreamWriterClosedException(Status.fromCode(Status.Code.ABORTED).withDescription("Connection is aborted due to an unrecoverable failure of another request sharing the connection. Please retry this request."), this.streamName, this.writerId));
            }
        }
    }

    /**
     * Re-queues a failed request for another attempt when retries are configured and the error
     * code is eligible.
     *
     * <p>A code is eligible when {@code isConnectionErrorRetriable} accepts it or it is
     * RESOURCE_EXHAUSTED. When the request targets a default stream or carries no offset, only that
     * request is put back at the front of the waiting queue. Otherwise every request still in
     * flight is moved back as well, and {@code responsesToIgnore} is incremented once per moved
     * request.
     *
     * @param code status code of the in-stream error
     * @param appendRequestAndResponse the request that received the error
     * @return {@code true} if the request was re-queued; {@code false} if retries are disabled, the
     *     code is not eligible, or the maximum attempt count was reached
     */
    private Boolean retryOnRetryableError(Status.Code code, AppendRequestAndResponse appendRequestAndResponse) {
        if (this.retrySettings == null || this.retrySettings.getMaxAttempts() == 0) {
            return false;
        }
        boolean eligibleCode = isConnectionErrorRetriable(code) || code == Status.Code.RESOURCE_EXHAUSTED;
        if (!eligibleCode) {
            return false;
        }
        if (appendRequestAndResponse.retryCount.intValue() >= this.retrySettings.getMaxAttempts()) {
            log.info(String.format("Max retry count reached for message in stream %s at offset %d.  Retry count: %d", this.streamName, Long.valueOf(appendRequestAndResponse.message.getOffset().getValue()), appendRequestAndResponse.retryCount));
            return false;
        }

        this.lock.lock();
        try {
            appendRequestAndResponse.retryCount = Integer.valueOf(appendRequestAndResponse.retryCount.intValue() + 1);

            if (this.retrySettings != null && useBackoffForError(code, this.streamName)) {
                TimedAttemptSettings previousAttempt = appendRequestAndResponse.attemptSettings;
                if (previousAttempt == null) {
                    previousAttempt = appendRequestAndResponse.retryAlgorithm.createFirstAttempt();
                }
                appendRequestAndResponse.attemptSettings = appendRequestAndResponse.retryAlgorithm.createNextAttempt(previousAttempt);
                appendRequestAndResponse.blockMessageSendDeadline = Instant.now().plusMillis(appendRequestAndResponse.attemptSettings.getRetryDelay().toMillis());
                log.info("Messages blocked for retry for " + Duration.between(Instant.now(), appendRequestAndResponse.blockMessageSendDeadline) + " until " + appendRequestAndResponse.blockMessageSendDeadline);
            }

            // -1 stands for "no offset on the message".
            long offset = -1L;
            if (appendRequestAndResponse.message.hasOffset()) {
                offset = appendRequestAndResponse.message.getOffset().getValue();
            }

            if (isDefaultStreamName(this.streamName).booleanValue() || offset == -1L) {
                log.info(String.format("Retrying default stream message in stream %s for in-stream error: %s, retry count: %s", this.streamName, code, appendRequestAndResponse.retryCount));
                addMessageToFrontOfWaitingQueue(appendRequestAndResponse);
                return true;
            }

            log.info(String.format("Retrying exclusive message in stream %s at offset %d for in-stream error: %s, retry count: %s", this.streamName, Long.valueOf(appendRequestAndResponse.message.getOffset().getValue()), code, appendRequestAndResponse.retryCount));
            // Move the newest inflight request first so the waiting queue ends up in the original order.
            while (!this.inflightRequestQueue.isEmpty()) {
                addMessageToFrontOfWaitingQueue(pollLastInflightRequestQueue());
                this.responsesToIgnore++;
            }
            addMessageToFrontOfWaitingQueue(appendRequestAndResponse);
            return true;
        } finally {
            this.lock.unlock();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Handles one response from the stream connection.
     *
     * <p>Under the lock it:
     *
     * <ul>
     *   <li>drops the response if {@code responsesToIgnore} is positive, decrementing the counter;
     *   <li>records an updated schema, if the response carries one;
     *   <li>resets the connection retry counters;
     *   <li>removes the oldest inflight request, or, if the inflight queue is empty and was not
     *       cleaned up, records a FAILED_PRECONDITION final status.
     * </ul>
     *
     * <p>The lock is released exactly once, before telemetry is recorded, the retry decision is
     * made, and the completion task is submitted to the thread pool.
     *
     * @param appendRowsResponse the response received from the server
     */
    public void requestCallback(AppendRowsResponse appendRowsResponse) {
        if (appendRowsResponse.hasUpdatedSchema()) {
            log.fine(String.format("Got response with schema updated (omitting updated schema in response here): %s writer id %s", appendRowsResponse.toBuilder().clearUpdatedSchema().build().toString(), this.writerId));
        }
        final AppendRequestAndResponse requestWrapper;
        this.lock.lock();
        try {
            if (this.responsesToIgnore > 0) {
                if (appendRowsResponse.hasError()) {
                    log.fine(String.format("Ignoring response in stream %s at offset %s.", this.streamName, appendRowsResponse));
                } else {
                    log.warning(String.format("Unexpected successful response in stream %s at offset %s.  Due to a previous retryable error being inflight, this message is being ignored.", this.streamName, appendRowsResponse.getAppendResult().getOffset()));
                }
                this.responsesToIgnore--;
                return;
            }
            if (appendRowsResponse.hasUpdatedSchema()) {
                this.updatedSchema = TableSchemaAndTimestamp.create(System.nanoTime(), appendRowsResponse.getUpdatedSchema());
            }
            // A response arrived, so the connection-level retry bookkeeping starts over.
            if (this.conectionRetryCountWithoutCallback != 0) {
                this.conectionRetryCountWithoutCallback = 0L;
            }
            if (this.connectionRetryStartTime != 0) {
                this.connectionRetryStartTime = 0L;
            }
            if (this.inflightRequestQueue.isEmpty()) {
                if (this.inflightCleanuped) {
                    // The inflight queue was already drained by cleanup; nothing to complete.
                    return;
                }
                log.log(Level.WARNING, "Unexpected: request callback called on an empty inflight queue.");
                this.connectionFinalStatus = new StatusRuntimeException(Status.fromCode(Status.Code.FAILED_PRECONDITION).withDescription("Request callback called on an empty inflight queue."));
                return;
            }
            Instant sendInstant = this.inflightRequestQueue.getFirst().requestSendTimeStamp;
            if (sendInstant != null) {
                this.telemetryMetrics.recordNetworkLatency(Duration.between(sendInstant, Instant.now()));
            }
            requestWrapper = pollFirstInflightRequestQueue();
            this.requestProfilerHook.endOperation(RequestProfiler.OperationName.RESPONSE_LATENCY, requestWrapper.requestUniqueId);
        } finally {
            this.lock.unlock();
        }

        // Status.fromCodeValue tolerates out-of-range code values instead of throwing.
        Status.Code responseCode = appendRowsResponse.hasError() ? Status.fromCodeValue(appendRowsResponse.getError().getCode()).getCode() : Status.Code.OK;
        this.telemetryMetrics.recordResponse(requestWrapper.messageSize, requestWrapper.message.getProtoRows().getRows().getSerializedRowsCount(), responseCode.toString(), requestWrapper.retryCount.intValue() > 0);

        // The retry decision is made on this thread, before the result is handed to the thread pool.
        if (appendRowsResponse.hasError() && retryOnRetryableError(responseCode, requestWrapper).booleanValue()) {
            log.info("Attempting to retry on error: " + appendRowsResponse.getError().toString());
            return;
        }

        this.threadPool.submit(() -> {
            try {
                if (appendRowsResponse.hasError()) {
                    StatusRuntimeException storageException = Exceptions.toStorageException(appendRowsResponse.getError(), null);
                    log.fine(String.format("Got error message: %s", appendRowsResponse.toString()));
                    if (storageException != null) {
                        requestWrapper.appendResult.setException(storageException);
                    } else if (appendRowsResponse.getRowErrorsCount() > 0) {
                        HashMap<Integer, String> rowIndexToErrorMessage = new HashMap<>();
                        for (int i = 0; i < appendRowsResponse.getRowErrorsCount(); i++) {
                            RowError rowError = appendRowsResponse.getRowErrors(i);
                            rowIndexToErrorMessage.put(Integer.valueOf(Math.toIntExact(rowError.getIndex())), rowError.getMessage());
                        }
                        requestWrapper.appendResult.setException(new Exceptions.AppendSerializationError(appendRowsResponse.getError().getCode(), appendRowsResponse.getError().getMessage(), this.streamName, rowIndexToErrorMessage));
                    } else {
                        requestWrapper.appendResult.setException(new StatusRuntimeException(Status.fromCodeValue(appendRowsResponse.getError().getCode()).withDescription(appendRowsResponse.getError().getMessage())));
                    }
                } else {
                    requestWrapper.appendResult.set(appendRowsResponse);
                }
            } finally {
                this.requestProfilerHook.endOperation(RequestProfiler.OperationName.TOTAL_LATENCY, requestWrapper.requestUniqueId);
            }
        });
    }

    /**
     * Tells whether a status code is one of the codes treated as retriable: ABORTED, UNAVAILABLE,
     * CANCELLED, INTERNAL or DEADLINE_EXCEEDED.
     *
     * @param code the status code to classify
     * @return {@code true} for one of the codes listed above, {@code false} otherwise
     */
    private boolean isConnectionErrorRetriable(Status.Code code) {
        if (code == Status.Code.ABORTED) {
            return true;
        }
        if (code == Status.Code.UNAVAILABLE) {
            return true;
        }
        if (code == Status.Code.CANCELLED) {
            return true;
        }
        if (code == Status.Code.INTERNAL) {
            return true;
        }
        return code == Status.Code.DEADLINE_EXCEEDED;
    }

    /**
     * Decides whether a retry for the given error should be delayed with backoff.
     *
     * @param code the status code of the error
     * @param str the stream name the error occurred on
     * @return {@code true} for INTERNAL on a default stream, or for RESOURCE_EXHAUSTED on any stream
     */
    private boolean useBackoffForError(Status.Code code, String str) {
        boolean internalOnDefaultStream = isDefaultStreamName(str).booleanValue() && code == Status.Code.INTERNAL;
        if (internalOnDefaultStream) {
            return true;
        }
        return code == Status.Code.RESOURCE_EXHAUSTED;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v20, types: [com.google.cloud.bigquery.storage.v1.Exceptions$StorageException] */
    public void doneCallback(Throwable th) {
        log.info("Received done callback. Stream: " + this.streamName + " worker id: " + this.writerId + " Final status: " + th.toString());
        boolean contains = th.toString().contains("Closing the stream because it has been inactive");
        this.lock.lock();
        try {
            this.streamConnectionIsConnected = false;
            this.telemetryMetrics.recordConnectionEnd(Status.Code.values()[Status.fromThrowable(th).getCode().ordinal()].toString());
            if (this.connectionFinalStatus == null) {
                if (!contains && this.connectionRetryStartTime == 0) {
                    this.connectionRetryStartTime = System.currentTimeMillis();
                }
                if (isConnectionErrorRetriable(Status.fromThrowable(th).getCode()) && !this.userClosed && (((float) this.maxRetryDuration.toMillis()) == 0.0f || contains || System.currentTimeMillis() - this.connectionRetryStartTime <= this.maxRetryDuration.toMillis())) {
                    if (!contains) {
                        this.conectionRetryCountWithoutCallback++;
                        this.telemetryMetrics.recordConnectionStartWithRetry();
                    }
                    log.info("Connection is going to be reestablished with the next request. Retriable error " + th.toString() + " received, retry count " + this.conectionRetryCountWithoutCallback + ", millis left to retry " + (this.maxRetryDuration.toMillis() - (this.connectionRetryStartTime > 0 ? System.currentTimeMillis() - this.connectionRetryStartTime : 0L)) + ", for stream " + this.streamName + " id:" + this.writerId);
                } else {
                    ?? storageException = Exceptions.toStorageException(th);
                    this.connectionFinalStatus = storageException != 0 ? storageException : th;
                    log.info("Connection finished with error " + th.toString() + " for stream " + this.streamName + " with write id: " + this.writerId + ", millis left to retry was " + (this.maxRetryDuration.toMillis() - (System.currentTimeMillis() - this.connectionRetryStartTime)));
                }
            }
        } finally {
            this.lock.unlock();
        }
    }

    /**
     * Removes one request from the inflight queue and updates the inflight accounting.
     *
     * <p>Clears the request's send timestamp, decrements the inflight request count and byte total,
     * and signals {@code inflightReduced}.
     *
     * @param z {@code true} to remove from the tail of the queue, {@code false} to remove from the
     *     head
     * @return the removed request
     */
    @GuardedBy("lock")
    private AppendRequestAndResponse pollInflightRequestQueue(boolean z) {
        AppendRequestAndResponse removed;
        if (z) {
            removed = this.inflightRequestQueue.pollLast();
        } else {
            removed = this.inflightRequestQueue.poll();
        }
        removed.requestSendTimeStamp = null;
        this.inflightRequests--;
        this.inflightBytes -= removed.messageSize;
        this.inflightReduced.signal();
        return removed;
    }

    /**
     * Removes and returns the request at the tail of the inflight queue by delegating to
     * {@code pollInflightRequestQueue(true)}.
     *
     * @return the removed request
     */
    @GuardedBy("lock")
    private AppendRequestAndResponse pollLastInflightRequestQueue() {
        return pollInflightRequestQueue(true);
    }

    /**
     * Removes and returns the request at the head of the inflight queue by delegating to
     * {@code pollInflightRequestQueue(false)}.
     *
     * @return the removed request
     */
    @GuardedBy("lock")
    private AppendRequestAndResponse pollFirstInflightRequestQueue() {
        return pollInflightRequestQueue(false);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Returns the current value of {@code updatedSchema}.
     *
     * <p>NOTE(review): this method synchronizes on the instance monitor, while the write to
     * {@code updatedSchema} in {@code requestCallback} happens under {@code lock}. Confirm that
     * this mismatch is intended.
     *
     * @return the stored schema-and-timestamp value, as last assigned
     */
    public synchronized TableSchemaAndTimestamp getUpdatedSchema() {
        return this.updatedSchema;
    }

    /**
     * Builds a {@code Load} from the current inflight byte total, the inflight request count, the
     * number of entries in {@code destinationSet}, and the configured inflight byte and request
     * maxima.
     *
     * <p>NOTE(review): the fields are read here without acquiring {@code lock} — confirm that a
     * possibly inconsistent snapshot is acceptable to callers.
     *
     * @return a newly created {@code Load} describing this worker
     */
    public Load getLoad() {
        return Load.create(this.inflightBytes, this.inflightRequests, this.destinationSet.size(), this.maxInflightBytes, this.maxInflightRequests);
    }

    /**
     * Test hook that overwrites the static {@code INFLIGHT_QUOTA_MAX_WAIT_TIME_MILLI} value.
     *
     * @param j the new value; presumably milliseconds, going by the field name
     */
    @VisibleForTesting
    static void setMaxInflightQueueWaitTime(long j) {
        INFLIGHT_QUOTA_MAX_WAIT_TIME_MILLI = j;
    }

    /**
     * Test hook that overwrites the static {@code MAXIMUM_REQUEST_CALLBACK_WAIT_TIME} value, which
     * {@code throwIfWaitCallbackTooLong} compares elapsed time against.
     *
     * @param duration the new limit
     */
    @VisibleForTesting
    static void setMaxInflightRequestWaitTime(Duration duration) {
        MAXIMUM_REQUEST_CALLBACK_WAIT_TIME = duration;
    }
}
