package com.huaweicloud.dis.producer.internals;

import com.huaweicloud.dis.DISAsync;
import com.huaweicloud.dis.core.handler.AsyncHandler;
import com.huaweicloud.dis.core.util.StringUtils;
import com.huaweicloud.dis.exception.DISClientException;
import com.huaweicloud.dis.iface.data.request.PutRecordsRequest;
import com.huaweicloud.dis.iface.data.response.PutRecordsResult;
import com.huaweicloud.dis.iface.data.response.PutRecordsResultEntry;
import com.huaweicloud.dis.util.Utils;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.LockSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/huaweicloud/dis/producer/internals/Sender.class */
/**
 * Background I/O thread of the DIS producer.
 *
 * <p>While running, the thread repeatedly asks the {@link RecordAccumulator} for batches via
 * {@code drain(...)} and submits each one with {@link DISAsync#putRecordsAsync}. Completion is
 * reported asynchronously through an {@link AsyncHandler}; the handler updates the static
 * statistics counters, completes the batch and releases the per-batch bookkeeping
 * (in-flight counter and the list of partitions currently being sent).
 *
 * <p>Thread-safety: {@link #close(long)}, {@link #wakeup()} and {@link #flush()} are intended to
 * be called from threads other than the sender thread, so all state shared with those callers is
 * either {@code volatile} or held in a concurrent/atomic container.
 */
public class Sender extends Thread {
    private static final Logger log = LoggerFactory.getLogger(Sender.class);

    /** Idle polling interval, in milliseconds, used when no retry backoff is configured. */
    private static final int DEFAULT_SENDER_POLLING_MS = 50;

    private static final long NANOS_PER_MILLI = 1000000L;

    /** Sleep between two state checks while {@link #close(long)} is waiting, in milliseconds. */
    private static final long CLOSE_POLL_INTERVAL_MS = 5L;

    /** Number of consecutive "nothing pending" checks required before {@link #close(long)} returns. */
    private static final int IDLE_CHECKS_BEFORE_CLOSE = 3;

    // Process-wide statistics. Kept public and non-final because they are part of the class's
    // existing public surface.
    public static AtomicLong totalSendCount = new AtomicLong();
    public static AtomicLong totalSendSuccessCount = new AtomicLong();
    public static AtomicLong totalSendFailedCount = new AtomicLong();
    public static AtomicLong totalSendTimes = new AtomicLong();
    public static AtomicLong totalSendSuccessTimes = new AtomicLong();
    public static AtomicLong totalSendFailedTimes = new AtomicLong();
    public static AtomicLong totalQueryTimes = new AtomicLong();

    private final DISAsync client;
    private final RecordAccumulator accumulator;
    private final long retryBackoffMs;

    /** Cleared by {@link #close(long)} to make the run loop terminate. */
    private volatile boolean running;

    // NOTE(review): forceClose and flushId are neither read nor written anywhere in this class.
    private volatile boolean forceClose;
    private final AtomicInteger flushId = new AtomicInteger();

    /** Number of requests submitted to the client whose handler has not completed yet. */
    private final AtomicLong inFlightRequestCount = new AtomicLong(0);

    /**
     * Set by the sender thread when it has left the run loop and polled by {@link #close(long)}
     * from another thread, hence {@code volatile}.
     */
    private volatile boolean exitRunLoop = false;

    /** Partitions that currently have a request in flight; passed to {@code accumulator.drain}. */
    private final CopyOnWriteArrayList<StreamPartition> onSendingStreamPartitions = new CopyOnWriteArrayList<>();

    /**
     * Creates the sender thread (it is not started here).
     *
     * @param client async DIS client used to submit batches
     * @param accumulator source of batches to send
     * @param retryBackoffMs pause between two polling rounds in milliseconds; when {@code <= 0}
     *     a default polling interval is used instead
     */
    public Sender(DISAsync client, RecordAccumulator accumulator, long retryBackoffMs) {
        setName("Sender Thread");
        this.client = client;
        this.accumulator = accumulator;
        this.retryBackoffMs = retryBackoffMs;
        this.running = true;
    }

    /** Main loop: keeps polling and sending until {@link #close(long)} clears the running flag. */
    @Override // java.lang.Thread, java.lang.Runnable
    public void run() {
        log.debug("Starting DIS producer I/O thread.");
        try {
            while (this.running) {
                try {
                    totalQueryTimes.incrementAndGet();
                    run(System.currentTimeMillis());
                } catch (Exception e) {
                    log.error("Uncaught error in DIS producer I/O thread: ", e);
                }
            }
        } finally {
            // Always publish loop termination, even if an Error escapes, so that close() does not
            // wait for its full timeout on a thread that is already gone.
            this.exitRunLoop = true;
        }
        log.debug("Beginning shutdown of DIS producer I/O thread, sending remaining records.");
    }

    /**
     * Runs a single polling round: sends whatever can be drained and, if nothing was sent, parks
     * the calling thread for the remainder of the backoff interval.
     *
     * @param now timestamp in milliseconds at which this round started
     */
    void run(long now) {
        if (sendProducerData(now)) {
            return;
        }
        if (this.retryBackoffMs <= 0) {
            LockSupport.parkNanos(DEFAULT_SENDER_POLLING_MS * NANOS_PER_MILLI);
            return;
        }
        long remainingMs = this.retryBackoffMs - (System.currentTimeMillis() - now);
        if (remainingMs > 0) {
            LockSupport.parkNanos(remainingMs * NANOS_PER_MILLI);
        }
    }

    /**
     * Drains batches from the accumulator and submits each of them.
     *
     * @param now timestamp in milliseconds handed to {@code accumulator.drain}
     * @return {@code true} if at least one batch was drained, {@code false} otherwise
     */
    private boolean sendProducerData(long now) {
        List<ProducerBatch> batches = this.accumulator.drain(now, this.onSendingStreamPartitions);
        if (batches.isEmpty()) {
            log.trace("no data to send.");
            return false;
        }
        for (ProducerBatch batch : batches) {
            sendBatch(batch);
        }
        return true;
    }

    /** Builds the request for one batch, registers it as in flight and submits it to the client. */
    private void sendBatch(ProducerBatch batch) {
        StreamPartition tp = batch.getTp();
        log.trace("begin to process batch {}, count {}, size {}B", tp, batch.getRelativeOffset(), batch.getTotolByteSize());

        PutRecordsRequest request = new PutRecordsRequest();
        request.setStreamName(tp.topic());
        request.setStreamId(tp.streamId());
        request.setRecords(batch.getBatchPutRecordsRequestEntrys());

        totalSendTimes.incrementAndGet();
        totalSendCount.addAndGet(batch.getRelativeOffset());
        this.inFlightRequestCount.incrementAndGet();
        this.onSendingStreamPartitions.add(tp);

        BatchSendHandler handler = new BatchSendHandler(batch, tp, request);
        try {
            this.client.putRecordsAsync(request, handler);
        } catch (RuntimeException e) {
            // The request could not even be submitted. Fail the batch through the normal error
            // path so its bookkeeping is released and the remaining batches are still sent.
            // The handler completes at most once, so this is safe even if the client already
            // invoked it before throwing.
            handler.onError(e);
        }
    }

    /** Unparks the sender thread if it is currently parked between two polling rounds. */
    public void wakeup() {
        LockSupport.unpark(this);
    }

    /**
     * Stops the run loop and then keeps sending from the calling thread until nothing is pending
     * any more or the timeout elapses. A warning is logged if work is still pending at the end.
     *
     * @param timeoutMs maximum time to spend in this method, in milliseconds. NOTE(review): a
     *     single polling round may park for the configured backoff, so the bound is approximate.
     */
    public void close(long timeoutMs) {
        this.running = false;
        // Kick the sender thread out of its park so it observes the cleared flag promptly.
        wakeup();

        long begin = System.currentTimeMillis();
        while (!this.exitRunLoop && System.currentTimeMillis() - begin < timeoutMs) {
            Utils.sleep(CLOSE_POLL_INTERVAL_MS);
        }

        int idleChecks = 0;
        while (idleChecks < IDLE_CHECKS_BEFORE_CLOSE && System.currentTimeMillis() - begin < timeoutMs) {
            if (hasPendingWork()) {
                idleChecks = 0;
                try {
                    run(System.currentTimeMillis());
                } catch (Exception e) {
                    log.error("Uncaught error in DIS producer I/O thread: ", e);
                }
            } else {
                idleChecks++;
                Utils.sleep(CLOSE_POLL_INTERVAL_MS);
            }
        }

        if (hasPendingWork()) {
            log.warn("The timeout period {}ms has been reached, but there are still records that have not been processed successfully.", timeoutMs);
        }
    }

    /** Returns whether the accumulator still holds undrained data or a request is in flight. */
    private boolean hasPendingWork() {
        return this.accumulator.hasUndrained() || this.inFlightRequestCount.get() > 0;
    }

    /** Wakes the sender thread so that it starts its next polling round immediately. */
    public void flush() {
        wakeup();
    }

    /**
     * Notifies the accumulator that the given batch has been completed.
     *
     * @param producerBatch the completed batch
     */
    public void batchIsDone(ProducerBatch producerBatch) {
        this.accumulator.batchIsDone(producerBatch);
    }

    /**
     * Completion handler for a single submitted batch. Whichever of {@link #onSuccess} and
     * {@link #onError} is invoked first wins; later invocations are ignored so that the batch is
     * completed and its bookkeeping released exactly once.
     */
    private final class BatchSendHandler implements AsyncHandler<PutRecordsResult> {
        private final ProducerBatch batch;
        private final StreamPartition tp;
        private final PutRecordsRequest request;
        private final long startMs = System.currentTimeMillis();
        private final AtomicBoolean completed = new AtomicBoolean(false);

        BatchSendHandler(ProducerBatch batch, StreamPartition tp, PutRecordsRequest request) {
            this.batch = batch;
            this.tp = tp;
            this.request = request;
        }

        @Override // com.huaweicloud.dis.core.handler.AsyncHandler
        public void onSuccess(PutRecordsResult putRecordsResult) {
            if (!this.completed.compareAndSet(false, true)) {
                return;
            }
            try {
                Sender.totalSendSuccessTimes.incrementAndGet();
                int failedCount = putRecordsResult.getFailedRecordCount().get();
                Sender.totalSendSuccessCount.addAndGet(putRecordsResult.getRecords().size() - failedCount);
                Sender.totalSendFailedCount.addAndGet(failedCount);
                if (failedCount > 0) {
                    Sender.log.error("Batch {} send partial successfully, cost {}ms, count {}, size {}B, failed count {}, failed info {}",
                            this.tp.toString(), elapsedMs(), this.batch.getRelativeOffset(), this.batch.getTotolByteSize(),
                            failedCount, firstFailureInfo(putRecordsResult));
                } else {
                    Sender.log.debug("Batch {} send successfully, cost {}ms, count {}, size {}B",
                            this.tp.toString(), elapsedMs(), this.batch.getRelativeOffset(), this.batch.getTotolByteSize());
                }
                this.batch.done(putRecordsResult, null);
            } finally {
                release();
            }
        }

        @Override // com.huaweicloud.dis.core.handler.AsyncHandler
        public void onError(Exception exc) {
            if (!this.completed.compareAndSet(false, true)) {
                return;
            }
            try {
                Sender.totalSendFailedTimes.incrementAndGet();
                Sender.totalSendFailedCount.addAndGet(this.request.getRecords().size());
                // The trailing exception argument has no placeholder; SLF4J logs it as the throwable.
                Sender.log.error("Batch {} send failed, cost {}ms, count {}, size {}B, error info {}",
                        this.tp.toString(), elapsedMs(), this.batch.getRelativeOffset(), this.batch.getTotolByteSize(),
                        exc.getMessage(), exc);
                DISClientException cause = exc instanceof DISClientException
                        ? (DISClientException) exc
                        : new DISClientException(exc);
                this.batch.done(null, cause);
            } finally {
                release();
            }
        }

        /** Milliseconds elapsed since this handler (and thus the request) was created. */
        private long elapsedMs() {
            return System.currentTimeMillis() - this.startMs;
        }

        /** Returns "code : message" of the first result entry carrying an error code, or null. */
        private String firstFailureInfo(PutRecordsResult putRecordsResult) {
            for (PutRecordsResultEntry entry : putRecordsResult.getRecords()) {
                if (!StringUtils.isNullOrEmpty(entry.getErrorCode())) {
                    return entry.getErrorCode() + " : " + entry.getErrorMessage();
                }
            }
            return null;
        }

        /**
         * Releases the per-batch bookkeeping. Runs in a finally chain so that the in-flight
         * counter and the sending-partition list are updated even when a callback step throws.
         */
        private void release() {
            try {
                Sender.this.batchIsDone(this.batch);
            } finally {
                Sender.this.inFlightRequestCount.decrementAndGet();
                Sender.this.onSendingStreamPartitions.remove(this.tp);
            }
        }
    }
}
