package cern.nxcals.api.ingestion;

import cern.nxcals.api.domain.RecordKey;
import java.io.IOException;
import java.text.MessageFormat;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.PartitionInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:BOOT-INF/lib/nxcals-ingestion-api-0.4.45.jar:cern/nxcals/api/ingestion/KafkaDataSink.class */
class KafkaDataSink implements DataSink<RecordData, DefaultCallback> {
    private static final int BUCKET_SIZE = 5;
    private static final String RECORD_TOO_BIG_MSG_PATTERN = "Record for entity={0} with id={1,number,#} too big, size={2,number,#}";
    private final int maxRecordSize;
    private final Producer<byte[], byte[]> kafkaProducer;
    private final Function<RecordData, byte[]> serializer;
    private final String topicName;
    private final PartitionInfo[] sortedPartitions;
    private static final Logger log = LoggerFactory.getLogger((Class<?>) KafkaDataSink.class);
    private static final Result DUMMY_RESULT = new Result() { // from class: cern.nxcals.api.ingestion.KafkaDataSink.1
    };

    /* JADX INFO: Access modifiers changed from: package-private */
    public KafkaDataSink(String str, Producer<byte[], byte[]> producer, Function<RecordData, byte[]> function, int i) {
        this.topicName = (String) Objects.requireNonNull(str);
        this.kafkaProducer = (Producer) Objects.requireNonNull(producer);
        this.serializer = (Function) Objects.requireNonNull(function);
        this.sortedPartitions = getSortedPartitions(producer, str);
        this.maxRecordSize = i;
    }

    private static PartitionInfo[] getSortedPartitions(Producer<byte[], byte[]> producer, String str) {
        return (PartitionInfo[]) ((List) producer.partitionsFor(str).stream().sorted(Comparator.comparingInt((v0) -> {
            return v0.partition();
        })).collect(Collectors.toList())).toArray(new PartitionInfo[0]);
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        this.kafkaProducer.close();
    }

    @Override // cern.nxcals.api.ingestion.DataSink
    public void send(RecordData recordData, DefaultCallback defaultCallback) {
        Objects.requireNonNull(defaultCallback);
        try {
            byte[] keyBytes = toKeyBytes(recordData);
            byte[] apply = this.serializer.apply(recordData);
            if (apply.length > this.maxRecordSize) {
                defaultCallback.accept(null, new RecordTooBigRuntimeException(MessageFormat.format(RECORD_TOO_BIG_MSG_PATTERN, recordData.getEntityKeyValues(), Long.valueOf(recordData.getEntityId()), Integer.valueOf(apply.length))));
            } else {
                sendToKafka(recordData.getEntityId(), keyBytes, apply, getPartitionNumber(recordData), (recordMetadata, exc) -> {
                    defaultCallback.accept(DUMMY_RESULT, exc);
                });
            }
        } catch (Exception e) {
            defaultCallback.accept(null, e);
        }
    }

    private byte[] toKeyBytes(RecordData recordData) {
        return RecordKey.serialize(recordData.getSystemId(), recordData.getPartitionId(), recordData.getSchemaId(), recordData.getEntityId(), recordData.getTimestamp());
    }

    private Integer getPartitionNumber(RecordData recordData) {
        return Integer.valueOf(this.sortedPartitions[(int) (((recordData.getPartitionId() % Math.max(1, this.sortedPartitions.length / 5)) * 5) + (recordData.getEntityId() % 5))].partition());
    }

    private void sendToKafka(long j, byte[] bArr, byte[] bArr2, Integer num, BiConsumer<RecordMetadata, Exception> biConsumer) {
        log.trace("Sending to kafka entityId={}, size={}, kafkaPartition={}", Long.valueOf(j), Integer.valueOf(bArr2.length), num);
        Producer<byte[], byte[]> producer = this.kafkaProducer;
        ProducerRecord<byte[], byte[]> producerRecord = new ProducerRecord<>(this.topicName, num, bArr, bArr2);
        biConsumer.getClass();
        producer.send(producerRecord, (v1, v2) -> {
            r2.accept(v1, v2);
        });
    }
}
