package com.huaweicloud.dis.producer.internals;

import com.huaweicloud.dis.core.handler.AsyncHandler;
import com.huaweicloud.dis.iface.data.request.PutRecordsRequest;
import com.huaweicloud.dis.iface.data.response.PutRecordsResult;
import com.huaweicloud.dis.util.CopyOnWriteMap;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/huaweicloud/dis/producer/internals/RecordAccumulator.class */
public final class RecordAccumulator {
    private static final Logger log = LoggerFactory.getLogger(RecordAccumulator.class);

    // Limits supplied through the constructor. They are not read by any of the
    // methods whose bodies are available in this file.
    // NOTE(review): presumably consumed by append(), whose body is unavailable - confirm.
    private final long maxBatchSize;
    private final int maxBatchCount;
    private final long maxBufferSize;
    private final int maxBufferCount;

    /** Threshold compared against {@code ProducerBatch.waitedTimeMs} in {@link #drain}. */
    private final long retryBackoffMs;

    /** When true, {@link #drain} skips the partitions listed by its caller. */
    private boolean orderByPartition;

    /** Decremented by one in {@link #batchIsDone}; reported in the drain debug log. */
    private AtomicInteger bufferCount = new AtomicInteger(0);

    /** Reduced by the batch's total byte size in {@link #batchIsDone}; reported in the drain debug log. */
    private AtomicLong bufferSize = new AtomicLong(0);

    /** Futures registered through {@link #setOnSendingPartitionFuture}, keyed by partition. */
    private Map<StreamPartition, Future<PutRecordsResult>> onSendingPartitions =
            Collections.synchronizedMap(new HashMap<StreamPartition, Future<PutRecordsResult>>());

    /** Set to true by {@link #close()}. */
    private volatile boolean closed = false;

    /** Incremented by {@link #beginFlush()} and read by {@code flushInProgress()}. */
    private final AtomicInteger flushesInProgress = new AtomicInteger(0);

    /** Read by {@code appendsInProgress()}; never modified in the visible code. */
    private final AtomicInteger appendsInProgress = new AtomicInteger(0);

    /** One deque of batches per partition; each deque doubles as the monitor guarding its own contents. */
    private final ConcurrentMap<StreamPartition, Deque<ProducerBatch>> batches = new CopyOnWriteMap<>();

    /* loaded from: input_file:com/huaweicloud/dis/producer/internals/RecordAccumulator$RecordAppendResult.class */
    /**
     * Immutable value holder describing the outcome of an append attempt.
     */
    public static final class RecordAppendResult {
        /** Future handed back by the batch that accepted the record. */
        public final FutureRecordsMetadata future;

        /** Flag supplied by the creator; {@code tryAppend} sets it when the deque has more than one batch or the last batch is full. */
        public final boolean batchIsFull;

        /** Flag supplied by the creator; {@code tryAppend} always passes {@code false}. */
        public final boolean newBatchCreated;

        /**
         * @param futureRecordsMetadata value stored in {@link #future}
         * @param z                     value stored in {@link #batchIsFull}
         * @param z2                    value stored in {@link #newBatchCreated}
         */
        public RecordAppendResult(FutureRecordsMetadata futureRecordsMetadata, boolean z, boolean z2) {
            batchIsFull = z;
            newBatchCreated = z2;
            future = futureRecordsMetadata;
        }
    }

    /**
     * Creates an accumulator with the given limits.
     *
     * @param j  stored as {@code maxBatchSize}
     * @param i  stored as {@code maxBatchCount}
     * @param j2 stored as {@code maxBufferSize}
     * @param i2 stored as {@code maxBufferCount}
     * @param j3 stored as {@code retryBackoffMs}
     * @param z  stored as {@code orderByPartition}
     */
    public RecordAccumulator(long j, int i, long j2, int i2, long j3, boolean z) {
        // Per-batch limits.
        maxBatchSize = j;
        maxBatchCount = i;
        // Whole-buffer limits.
        maxBufferSize = j2;
        maxBufferCount = i2;
        // Drain behaviour.
        retryBackoffMs = j3;
        orderByPartition = z;
    }

    /*  JADX ERROR: NullPointerException in pass: AttachTryCatchVisitor
        java.lang.NullPointerException: Cannot invoke "String.charAt(int)" because "obj" is null
        	at jadx.core.utils.Utils.cleanObjectName(Utils.java:38)
        	at jadx.core.dex.instructions.args.ArgType.object(ArgType.java:86)
        	at jadx.core.dex.info.ClassInfo.fromName(ClassInfo.java:42)
        	at jadx.core.dex.visitors.AttachTryCatchVisitor.convertToHandlers(AttachTryCatchVisitor.java:113)
        	at jadx.core.dex.visitors.AttachTryCatchVisitor.initTryCatches(AttachTryCatchVisitor.java:54)
        	at jadx.core.dex.visitors.AttachTryCatchVisitor.visit(AttachTryCatchVisitor.java:42)
        */
    /**
     * Placeholder for the record-append entry point.
     *
     * <p>The decompiler could not recover this method, so the original logic is NOT
     * present in this file: the body below unconditionally throws
     * {@link UnsupportedOperationException}. Parameter names ({@code r9} .. {@code r14})
     * are register names emitted by the decompiler, not the original identifiers.
     *
     * <p>NOTE(review): restore the real implementation from the original sources or
     * bytecode before using this class; nothing about the intended behaviour can be
     * confirmed from this stub.
     *
     * @throws UnsupportedOperationException always, in this decompiled form
     * @throws java.lang.InterruptedException declared by the original signature; never thrown by this stub
     */
    public com.huaweicloud.dis.producer.internals.RecordAccumulator.RecordAppendResult append(com.huaweicloud.dis.producer.internals.StreamPartition r9, long r10, com.huaweicloud.dis.iface.data.request.PutRecordsRequest r12, com.huaweicloud.dis.core.handler.AsyncHandler<com.huaweicloud.dis.iface.data.response.PutRecordsResult> r13, long r14) throws java.lang.InterruptedException {
        /*
            Method dump skipped, instructions count: 689
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: com.huaweicloud.dis.producer.internals.RecordAccumulator.append(com.huaweicloud.dis.producer.internals.StreamPartition, long, com.huaweicloud.dis.iface.data.request.PutRecordsRequest, com.huaweicloud.dis.core.handler.AsyncHandler, long):com.huaweicloud.dis.producer.internals.RecordAccumulator$RecordAppendResult");
    }

    /**
     * Attempts to add a record to the last batch of {@code deque}.
     *
     * @param j                 forwarded unchanged to {@code ProducerBatch.tryAppend}
     * @param putRecordsRequest request forwarded to the batch
     * @param asyncHandler      handler forwarded to the batch
     * @param deque             deque whose last batch is tried
     * @return the append result, or {@code null} when the deque is empty or the last
     *         batch rejected the record (in which case that batch is closed for
     *         further appends)
     */
    private RecordAppendResult tryAppend(long j, PutRecordsRequest putRecordsRequest, AsyncHandler<PutRecordsResult> asyncHandler, Deque<ProducerBatch> deque) {
        ProducerBatch tail = deque.peekLast();
        if (tail != null) {
            FutureRecordsMetadata accepted = tail.tryAppend(j, putRecordsRequest, asyncHandler);
            if (accepted == null) {
                // The batch refused the record: seal it so nothing more is appended to it.
                tail.closeForRecordAppends();
            } else {
                // Reported as full when an older batch is still queued or the tail itself is full.
                boolean full = deque.size() > 1 || tail.isFull();
                return new RecordAppendResult(accepted, full, false);
            }
        }
        return null;
    }

    /**
     * Puts a batch back at the head of its partition's deque.
     *
     * <p>The batch is first notified through {@code reenqueued(j)}; the deque for the
     * batch's partition is created if it does not exist yet.
     *
     * @param producerBatch batch to put back
     * @param j             value forwarded to {@code ProducerBatch.reenqueued}
     */
    public void reenqueue(ProducerBatch producerBatch, long j) {
        producerBatch.reenqueued(j);
        final Deque<ProducerBatch> queue = getOrCreateDeque(producerBatch.getTp());
        // The deque is its own lock, as everywhere else in this class.
        synchronized (queue) {
            queue.addFirst(producerBatch);
        }
    }

    /**
     * Reports whether any partition deque still holds at least one batch.
     *
     * @return {@code true} as soon as a non-empty deque is found, {@code false} otherwise
     */
    public boolean hasUndrained() {
        for (Map.Entry<StreamPartition, Deque<ProducerBatch>> entry : this.batches.entrySet()) {
            Deque<ProducerBatch> queue = entry.getValue();
            boolean pending;
            // Each deque is inspected under its own monitor.
            synchronized (queue) {
                pending = !queue.isEmpty();
            }
            if (pending) {
                return true;
            }
        }
        return false;
    }

    /**
     * Records {@code future} for {@code streamPartition}, replacing any earlier entry
     * for the same partition.
     *
     * @param streamPartition partition used as the map key
     * @param future          future stored for that partition
     */
    public void setOnSendingPartitionFuture(StreamPartition streamPartition, Future<PutRecordsResult> future) {
        onSendingPartitions.put(streamPartition, future);
    }

    /**
     * Collects at most one batch per partition that is ready to be sent.
     *
     * <p>For each partition the head batch is examined under the deque's monitor. It is
     * considered due when {@code waitedTimeMs(j)} has reached {@code retryBackoffMs} or the
     * batch reports itself full. A due batch that is empty is not removed; it is only
     * notified via {@code reenqueued(j)}. A due, non-empty batch is removed from the deque,
     * added to the result and notified via {@code drained(j)}.
     *
     * @param j                    value forwarded to {@code waitedTimeMs}, {@code reenqueued}
     *                             and {@code drained} (presumably the current time in ms - confirm with caller)
     * @param copyOnWriteArrayList partitions to skip; only honoured when {@code orderByPartition} is true
     * @return the drained batches, possibly empty, never {@code null}
     */
    public List<ProducerBatch> drain(long j, CopyOnWriteArrayList<StreamPartition> copyOnWriteArrayList) {
        List<ProducerBatch> ready = new ArrayList<>();
        for (Map.Entry<StreamPartition, Deque<ProducerBatch>> entry : this.batches.entrySet()) {
            // In ordered mode, partitions listed by the caller are left alone this round.
            if (this.orderByPartition && copyOnWriteArrayList.contains(entry.getKey())) {
                continue;
            }
            Deque<ProducerBatch> queue = entry.getValue();
            synchronized (queue) {
                ProducerBatch head = queue.peekFirst();
                if (head == null) {
                    continue;
                }
                boolean due = head.waitedTimeMs(j) >= this.retryBackoffMs || head.isFull();
                if (!due) {
                    continue;
                }
                if (head.isEmpty()) {
                    // Nothing to send: keep the batch queued and only notify it.
                    head.reenqueued(j);
                    continue;
                }
                ProducerBatch drained = queue.pollFirst();
                ready.add(drained);
                drained.drained(j);
                log.debug("Drain batch({} records) success, currentBufferCount is {}, currentBufferSize is {}, queueSize {}.", new Object[]{drained.getRelativeOffset(), this.bufferCount.get(), this.bufferSize.get(), queue.size()});
            }
        }
        return ready;
    }

    /**
     * Looks up the deque registered for a partition.
     *
     * @param streamPartition partition to look up
     * @return whatever the {@code batches} map holds for that partition
     */
    private Deque<ProducerBatch> getDeque(StreamPartition streamPartition) {
        Deque<ProducerBatch> existing = batches.get(streamPartition);
        return existing;
    }

    /**
     * Returns the deque for a partition, registering a new empty one if none exists.
     *
     * <p>When two threads race to create the deque, {@code putIfAbsent} decides the
     * winner and both callers receive the same registered instance.
     *
     * @param streamPartition partition whose deque is wanted
     * @return the deque registered for {@code streamPartition}
     */
    private Deque<ProducerBatch> getOrCreateDeque(StreamPartition streamPartition) {
        Deque<ProducerBatch> existing = this.batches.get(streamPartition);
        if (existing != null) {
            return existing;
        }
        Deque<ProducerBatch> created = new ArrayDeque<>();
        Deque<ProducerBatch> raced = this.batches.putIfAbsent(streamPartition, created);
        // A non-null result means another thread registered its deque first; use that one.
        return raced == null ? created : raced;
    }

    /**
     * @return {@code true} while the flush counter is above zero
     */
    boolean flushInProgress() {
        int activeFlushes = flushesInProgress.get();
        return activeFlushes > 0;
    }

    /**
     * Exposes the partition-to-deque map as a read-only view.
     *
     * <p>Only the map itself is wrapped; the deques it contains are the live instances.
     *
     * @return an unmodifiable view over the internal map
     */
    public Map<StreamPartition, Deque<ProducerBatch>> batches() {
        Map<StreamPartition, Deque<ProducerBatch>> readOnlyView = Collections.unmodifiableMap(batches);
        return readOnlyView;
    }

    /**
     * Registers the start of a flush by bumping the flush counter by one.
     */
    public void beginFlush() {
        flushesInProgress.incrementAndGet();
    }

    /**
     * @return {@code true} while the append counter is above zero
     */
    private boolean appendsInProgress() {
        int activeAppends = appendsInProgress.get();
        return activeAppends > 0;
    }

    /**
     * Marks this accumulator as closed by setting the volatile {@code closed} flag.
     */
    public void close() {
        closed = true;
    }

    /**
     * Releases the buffer accounting held by a finished batch.
     *
     * <p>Under the partition deque's monitor, the buffer count drops by one, the buffer
     * size drops by the batch's total byte size, and one thread waiting on the deque is
     * notified. Nothing happens when no deque is registered for the batch's partition.
     *
     * <p>NOTE(review): no waiting code is visible in this file; the waiters are
     * presumably in {@code append}, whose body is unavailable here - confirm.
     *
     * @param producerBatch the batch that has completed
     */
    public void batchIsDone(ProducerBatch producerBatch) {
        final Deque<ProducerBatch> queue = getDeque(producerBatch.getTp());
        if (queue == null) {
            return;
        }
        synchronized (queue) {
            bufferCount.decrementAndGet();
            bufferSize.addAndGet(-producerBatch.getTotolByteSize());
            queue.notify();
        }
    }
}
