package com.huaweicloud.dis.producer;

import com.huaweicloud.dis.DISAsync;
import com.huaweicloud.dis.DISClientAsync;
import com.huaweicloud.dis.DISConfig;
import com.huaweicloud.dis.core.builder.DefaultExecutorFactory;
import com.huaweicloud.dis.core.handler.AsyncHandler;
import com.huaweicloud.dis.core.util.StringUtils;
import com.huaweicloud.dis.iface.data.request.PutRecordsRequest;
import com.huaweicloud.dis.iface.data.request.PutRecordsRequestEntry;
import com.huaweicloud.dis.iface.data.response.PutRecordsResult;
import com.huaweicloud.dis.iface.data.response.PutRecordsResultEntry;
import com.huaweicloud.dis.iface.stream.request.DescribeStreamRequest;
import com.huaweicloud.dis.iface.stream.response.DescribeStreamResult;
import com.huaweicloud.dis.producer.internals.RecordAccumulator;
import com.huaweicloud.dis.producer.internals.Sender;
import com.huaweicloud.dis.producer.internals.StreamPartition;
import com.huaweicloud.dis.util.ExponentialBackOff;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/huaweicloud/dis/producer/DISProducer.class */
public class DISProducer {
    /** Class-wide SLF4J logger. */
    private static final Logger log = LoggerFactory.getLogger(DISProducer.class);
    /** Partition identifier used for every batch when {@code orderByPartition} is off. */
    private static final String STABLE_PARTITION_ID = "nb";
    /** Value of {@code DISConfig.getMaxBlockMs()}; handed to the accumulator on every append. */
    private long maxBlockMs;
    /** Value of {@code DISConfig.getLingerMs()}; when it is 0 the sender is woken after every append. */
    private long lingerMs;
    /** Buffers appended requests per stream partition until the sender picks them up. */
    private RecordAccumulator accumulator;
    /** Sender started by the constructor; woken up, flushed and closed through this class. */
    private Sender sender;
    /** Asynchronous DIS client: either supplied by the caller or created by the constructor. */
    private DISAsync disAsync;
    /** Configuration returned by {@code DISConfig.buildConfig(...)} for the config given to the constructor. */
    private DISConfig disConfig;
    /** When true, every batch must map to a single partition and is keyed by that partition number. */
    private boolean orderByPartition;
    /** Age in milliseconds after which a cached {@link StreamInfo} triggers a background refresh. */
    private long metadataTimeoutMS;
    /** Cache of stream metadata, keyed by stream name. */
    private ConcurrentHashMap<String, StreamInfo> metadata;
    /** Names of streams whose background metadata refresh is currently in flight. */
    private CopyOnWriteArrayList<String> onSyncStreams;

    /* loaded from: input_file:com/huaweicloud/dis/producer/DISProducer$PutRecordsResultAsyncHandler.class */
    /**
     * Adapts a per-entry callback to the batch-level {@link PutRecordsResult} callback expected by
     * {@code putRecordsAsync}: errors are forwarded unchanged, and on success only the first entry
     * of the batch result is forwarded.
     */
    private static class PutRecordsResultAsyncHandler implements AsyncHandler<PutRecordsResult> {
        /**
         * Callback supplied by the caller. It may be {@code null}: {@code putRecordAsync} wraps whatever
         * it was given without checking, so every use below is guarded.
         */
        private final AsyncHandler<PutRecordsResultEntry> asyncHandler;

        /**
         * @param asyncHandler per-entry callback to delegate to, or {@code null} for "no callback"
         */
        public PutRecordsResultAsyncHandler(AsyncHandler<PutRecordsResultEntry> asyncHandler) {
            this.asyncHandler = asyncHandler;
        }

        /**
         * Forwards the failure to the wrapped callback, if there is one.
         *
         * @param exc the failure reported for the batch
         * @throws Exception whatever the wrapped callback throws
         */
        @Override
        public void onError(Exception exc) throws Exception {
            if (this.asyncHandler != null) {
                this.asyncHandler.onError(exc);
            }
        }

        /**
         * Forwards the first entry of the batch result to the wrapped callback, if there is one.
         * A missing result or an empty record list is forwarded as {@code null}, which is the same
         * value {@code PutRecordsResultEntryFuture.get()} yields in that situation.
         *
         * @param putRecordsResult the batch result, possibly {@code null}
         * @throws Exception whatever the wrapped callback throws
         */
        @Override
        public void onSuccess(PutRecordsResult putRecordsResult) throws Exception {
            if (this.asyncHandler == null) {
                return;
            }
            PutRecordsResultEntry firstEntry = null;
            if (putRecordsResult != null
                    && putRecordsResult.getRecords() != null
                    && !putRecordsResult.getRecords().isEmpty()) {
                firstEntry = putRecordsResult.getRecords().get(0);
            }
            this.asyncHandler.onSuccess(firstEntry);
        }
    }

    /* loaded from: input_file:com/huaweicloud/dis/producer/DISProducer$PutRecordsResultEntryFuture.class */
    /**
     * View over a {@code Future<PutRecordsResult>} that exposes only the first entry of the batch
     * result. State queries and cancellation are passed straight through to the wrapped future.
     */
    private static class PutRecordsResultEntryFuture implements Future<PutRecordsResultEntry> {
        /** The batch-level future every call is delegated to. */
        private final Future<PutRecordsResult> future;

        public PutRecordsResultEntryFuture(Future<PutRecordsResult> future) {
            this.future = future;
        }

        /**
         * Picks the first entry out of a batch result.
         *
         * @return the first entry, or {@code null} when the result, its record list, or the list's
         *         content is missing
         */
        private static PutRecordsResultEntry firstEntryOf(PutRecordsResult batchResult) {
            boolean hasEntry = batchResult != null
                    && batchResult.getRecords() != null
                    && !batchResult.getRecords().isEmpty();
            return hasEntry ? batchResult.getRecords().get(0) : null;
        }

        @Override
        public boolean cancel(boolean z) {
            return future.cancel(z);
        }

        @Override
        public boolean isCancelled() {
            return future.isCancelled();
        }

        @Override
        public boolean isDone() {
            return future.isDone();
        }

        /** Waits for the batch result and returns its first entry (or {@code null} if there is none). */
        @Override
        public PutRecordsResultEntry get() throws InterruptedException, ExecutionException {
            return firstEntryOf(future.get());
        }

        /** Timed variant of {@link #get()}; the timeout is applied by the wrapped future. */
        @Override
        public PutRecordsResultEntry get(long j, TimeUnit timeUnit) throws InterruptedException, ExecutionException, TimeoutException {
            return firstEntryOf(future.get(j, timeUnit));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/huaweicloud/dis/producer/DISProducer$StreamInfo.class */
    /**
     * Mutable holder for the cached metadata of one stream: its name, a partition count and the
     * time the values were stored. Instances can only be created from within this source file.
     */
    public static class StreamInfo {
        /** Name of the stream this entry describes. */
        private String streamName;
        /** Partition count recorded for the stream. */
        private int partitions;
        /** Time the entry was filled in; code in this file stores {@code System.currentTimeMillis()}. */
        private long syncTimestamp;

        private StreamInfo() {
        }

        /** @return the stream name, or {@code null} if it was never set */
        public String getStreamName() {
            return streamName;
        }

        /** @param name the stream name to store */
        public void setStreamName(String name) {
            streamName = name;
        }

        /** @return the stored partition count ({@code 0} if it was never set) */
        public int getPartitions() {
            return partitions;
        }

        /** @param partitionCount the partition count to store */
        public void setPartitions(int partitionCount) {
            partitions = partitionCount;
        }

        /** @return the stored synchronisation timestamp ({@code 0} if it was never set) */
        public long getSyncTimestamp() {
            return syncTimestamp;
        }

        /** @param timestamp the synchronisation timestamp to store */
        public void setSyncTimestamp(long timestamp) {
            syncTimestamp = timestamp;
        }
    }

    /**
     * Creates a producer that builds its own asynchronous client. The client runs on an executor
     * obtained from a {@code DefaultExecutorFactory} constructed with
     * {@code dISConfig.getMaxInFlightRequestsPerConnection()}.
     *
     * @param dISConfig producer configuration; it is dereferenced immediately, so it must not be {@code null}
     */
    public DISProducer(DISConfig dISConfig) {
        this(dISConfig, new DefaultExecutorFactory(dISConfig.getMaxInFlightRequestsPerConnection()).newExecutor());
    }

    /**
     * Creates a producer that sends through a caller-supplied asynchronous client.
     *
     * @param dISConfig producer configuration
     * @param dISAsync  client to send with; if it is {@code null} the producer creates a
     *                  {@code DISClientAsync} itself, passing a {@code null} executor
     */
    public DISProducer(DISConfig dISConfig, DISAsync dISAsync) {
        this(dISConfig, dISAsync, null);
    }

    /**
     * Creates a producer that builds its own {@code DISClientAsync} on the given executor.
     *
     * @param dISConfig       producer configuration
     * @param executorService executor handed to the {@code DISClientAsync} constructor
     */
    public DISProducer(DISConfig dISConfig, ExecutorService executorService) {
        this(dISConfig, null, executorService);
    }

    /**
     * Shared constructor behind all public ones. It reads the batching settings from the
     * configuration returned by {@code DISConfig.buildConfig}, selects the client, then creates the
     * accumulator and starts the sender.
     *
     * @param dISConfig       configuration to build the effective configuration from
     * @param dISAsync        client to use, or {@code null} to create a {@code DISClientAsync}
     * @param executorService executor for the created client; ignored when {@code dISAsync} is given
     */
    private DISProducer(DISConfig dISConfig, DISAsync dISAsync, ExecutorService executorService) {
        this.metadata = new ConcurrentHashMap<>();
        this.onSyncStreams = new CopyOnWriteArrayList<>();

        DISConfig effectiveConfig = DISConfig.buildConfig(dISConfig);
        this.disConfig = effectiveConfig;

        // Timing settings are kept on the producer because they are needed on every append.
        this.lingerMs = effectiveConfig.getLingerMs();
        this.maxBlockMs = effectiveConfig.getMaxBlockMs();

        // Sizing settings are only needed to construct the accumulator below.
        long maxBatchBytes = effectiveConfig.getBatchSize();
        int maxBatchRecords = effectiveConfig.getBatchCount();
        long maxBufferBytes = effectiveConfig.getBufferMemory();
        int maxBufferRecords = effectiveConfig.getBufferCount();

        boolean ordered = effectiveConfig.isOrderByPartition();
        this.orderByPartition = ordered;
        this.metadataTimeoutMS = effectiveConfig.getMetadataTimeoutMs();

        this.disAsync = dISAsync != null ? dISAsync : new DISClientAsync(effectiveConfig, executorService);

        this.accumulator = new RecordAccumulator(maxBatchBytes, maxBatchRecords, maxBufferBytes, maxBufferRecords, this.lingerMs, ordered);
        this.sender = new Sender(this.disAsync, this.accumulator, this.lingerMs);
        this.sender.start();
    }

    /**
     * Sends a single record by wrapping it in a one-element {@link PutRecordsRequest} and
     * delegating to {@link #putRecordsAsync}.
     *
     * @param str                    name of the target stream
     * @param putRecordsRequestEntry the record to send
     * @param asyncHandler           callback that receives the result entry of this record
     * @return a future yielding the first entry of the batch result
     * @throws InterruptedException propagated from {@link #putRecordsAsync}
     */
    public Future<PutRecordsResultEntry> putRecordAsync(String str, PutRecordsRequestEntry putRecordsRequestEntry, AsyncHandler<PutRecordsResultEntry> asyncHandler) throws InterruptedException {
        ArrayList<PutRecordsRequestEntry> singleRecord = new ArrayList<>();
        singleRecord.add(putRecordsRequestEntry);

        PutRecordsRequest request = new PutRecordsRequest();
        request.setStreamName(str);
        request.setRecords(singleRecord);

        AsyncHandler<PutRecordsResult> batchHandler = new PutRecordsResultAsyncHandler(asyncHandler);
        Future<PutRecordsResult> batchFuture = putRecordsAsync(request, batchHandler);
        return new PutRecordsResultEntryFuture(batchFuture);
    }

    /**
     * Returns the cached metadata of a stream, loading or refreshing it as needed.
     *
     * <ul>
     *   <li>No cache entry: the stream is described synchronously, cached and returned.</li>
     *   <li>Entry older than {@code metadataTimeoutMS}: the stale entry is returned immediately and
     *       a background refresh is started, unless one is already in flight for this stream.</li>
     * </ul>
     *
     * @param str name of the stream
     * @return the metadata currently known for the stream (possibly stale while a refresh runs)
     */
    private StreamInfo fetchMetadata(final String str) {
        StreamInfo streamInfo = this.metadata.get(str);
        if (streamInfo == null) {
            DescribeStreamRequest describeStreamRequest = new DescribeStreamRequest();
            describeStreamRequest.setStreamName(str);
            DescribeStreamResult describeStream = this.disAsync.describeStream(describeStreamRequest);
            streamInfo = new StreamInfo();
            streamInfo.setStreamName(str);
            streamInfo.setSyncTimestamp(System.currentTimeMillis());
            streamInfo.setPartitions(describeStream.getWritablePartitionCount());
            this.metadata.put(str, streamInfo);
            return streamInfo;
        }

        boolean expired = System.currentTimeMillis() - streamInfo.getSyncTimestamp() > this.metadataTimeoutMS;
        // addIfAbsent checks and marks in one atomic step, so two threads that see the same expired
        // entry cannot both start a refresh.
        if (expired && this.onSyncStreams.addIfAbsent(str)) {
            DescribeStreamRequest refreshRequest = new DescribeStreamRequest();
            refreshRequest.setStreamName(str);
            boolean submitted = false;
            try {
                this.disAsync.describeStreamAsync(refreshRequest, new AsyncHandler<DescribeStreamResult>() {
                    @Override
                    public void onError(Exception exc) {
                        // Pass the exception itself so the stack trace is not lost.
                        DISProducer.log.error("Failed to refresh metadata of stream {}.", str, exc);
                        DISProducer.this.onSyncStreams.remove(str);
                    }

                    @Override
                    public void onSuccess(DescribeStreamResult describeStreamResult) {
                        try {
                            StreamInfo refreshed = new StreamInfo();
                            refreshed.setStreamName(str);
                            refreshed.setSyncTimestamp(System.currentTimeMillis());
                            refreshed.setPartitions(describeStreamResult.getWritablePartitionCount());
                            DISProducer.this.metadata.put(str, refreshed);
                        } finally {
                            // Always clear the in-flight marker, otherwise a failure here would
                            // block every later refresh of this stream.
                            DISProducer.this.onSyncStreams.remove(str);
                        }
                    }
                });
                submitted = true;
            } finally {
                if (!submitted) {
                    // The request was never handed over, so no callback will clear the marker.
                    this.onSyncStreams.remove(str);
                }
            }
        }
        return streamInfo;
    }

    /**
     * Works out the partition number a record belongs to.
     *
     * @param streamInfo             metadata supplying the partition count for key hashing
     * @param putRecordsRequestEntry the record to place
     * @return the number parsed from the record's explicit partition id when one is set; otherwise
     *         the index computed from the record's partition key / explicit hash key
     */
    private int calPartitionId(StreamInfo streamInfo, PutRecordsRequestEntry putRecordsRequestEntry) {
        boolean hasExplicitPartition = !StringUtils.isNullOrEmpty(putRecordsRequestEntry.getPartitionId());
        if (hasExplicitPartition) {
            return PartitionKeyUtils.getPartitionNumberFromShardId(putRecordsRequestEntry.getPartitionId());
        }

        int partitionCount = streamInfo.getPartitions();
        long hashKey = PartitionKeyUtils.getHashKey(
                putRecordsRequestEntry.getPartitionKey(),
                putRecordsRequestEntry.getExplicitHashKey()).longValue();
        return PartitionKeyUtils.calPartitionIndex(partitionCount, hashKey);
    }

    /**
     * Appends a batch of records to the accumulator for asynchronous sending.
     *
     * <p>With {@code orderByPartition} off, the batch is keyed by the fixed partition id
     * {@code STABLE_PARTITION_ID}. With it on, every record in the batch must resolve to the same
     * partition number, and the batch is keyed by that number.
     *
     * @param putRecordsRequest the batch to send
     * @param asyncHandler      callback handed to the accumulator together with the batch
     * @return the future produced by the accumulator for this batch
     * @throws InterruptedException propagated from the accumulator's append
     * @throws RuntimeException     if {@code orderByPartition} is on and the records map to more
     *                              than one partition
     */
    public Future<PutRecordsResult> putRecordsAsync(PutRecordsRequest putRecordsRequest, AsyncHandler<PutRecordsResult> asyncHandler) throws InterruptedException {
        String streamName = putRecordsRequest.getStreamName();
        String streamId = putRecordsRequest.getStreamId();
        String partitionId = STABLE_PARTITION_ID;
        if (this.orderByPartition) {
            StreamInfo streamInfo = fetchMetadata(streamName);
            // -1 means "no record seen yet".
            // NOTE(review): with an empty record list the id stays -1 and the batch is keyed by
            // "-1" — confirm this is intended.
            int batchPartition = -1;
            for (PutRecordsRequestEntry entry : putRecordsRequest.getRecords()) {
                int entryPartition = calPartitionId(streamInfo, entry);
                if (batchPartition != -1 && batchPartition != entryPartition) {
                    throw new RuntimeException("one batch should in one partition when orderByPartition on.");
                }
                batchPartition = entryPartition;
            }
            partitionId = Integer.toString(batchPartition);
        }
        StreamPartition streamPartition = new StreamPartition(streamName, streamId, partitionId);
        long currentTimeMillis = System.currentTimeMillis();
        // Three arguments need three placeholders; the stream partition was previously dropped.
        log.trace("Sending records {} with callback {} to streampartition {}", new Object[]{putRecordsRequest, asyncHandler, streamPartition});
        RecordAccumulator.RecordAppendResult append = this.accumulator.append(streamPartition, currentTimeMillis, putRecordsRequest, asyncHandler, this.maxBlockMs);
        if (append.batchIsFull || this.lingerMs == 0) {
            log.trace("Waking up the sender since topic partition {} is either full or getting a new batch", streamPartition);
            this.sender.wakeup();
        }
        return append.future;
    }

    /** Delegates to {@code Sender.flush()} on this producer's sender. */
    public void flush() {
        this.sender.flush();
    }

    /**
     * Closes the producer with a timeout of {@code ExponentialBackOff.DEFAULT_MAX_ELAPSED_TIME}
     * milliseconds; see {@link #close(long, TimeUnit)}.
     */
    public void close() {
        close(ExponentialBackOff.DEFAULT_MAX_ELAPSED_TIME, TimeUnit.MILLISECONDS);
    }

    /**
     * Closes the accumulator, then the sender (giving it the timeout in milliseconds), and finally
     * the asynchronous client. The client is closed even when one of the earlier steps throws, so
     * a failed shutdown does not leave it open.
     *
     * @param j        maximum time handed to the sender's close; must not be negative
     * @param timeUnit unit of {@code j}
     * @throws IllegalArgumentException if {@code j} is negative
     */
    public void close(long j, TimeUnit timeUnit) {
        if (j < 0) {
            throw new IllegalArgumentException("The timeout cannot be negative.");
        }
        long timeoutMillis = timeUnit.toMillis(j);
        log.debug("Closing the DIS producer with timeoutMillis = {} ms.", Long.valueOf(timeoutMillis));
        try {
            this.accumulator.close();
            this.sender.close(timeoutMillis);
        } finally {
            // Runs on the failure path too; any exception from above still propagates.
            this.disAsync.close();
        }
        log.debug("The DIS producer has closed.");
    }
}
