package cern.nxcals.client;

import cern.nxcals.common.domain.EntityData;
import com.google.common.base.Charsets;
import java.io.IOException;
import java.text.MessageFormat;
import java.util.Objects;
import java.util.function.BiConsumer;
import java.util.function.Function;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.PartitionInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:BOOT-INF/lib/nxcals-client-0.1.106.jar:cern/nxcals/client/KafkaDataSink.class */
class KafkaDataSink implements DataSink<RecordData, DefaultCallback> {
    private static final String RECORD_TOO_BIG_MSG_PATTERN = "Record for entity={0} with id={1,number,#} too big, size={2,number,#}";
    private final int maxRecordSize;
    private final Producer<byte[], byte[]> kafkaProducer;
    private final Function<RecordData, byte[]> serializer;
    private final String topicName;
    private final PartitionInfo[] partitions;
    private static final Logger log = LoggerFactory.getLogger(KafkaDataSink.class);
    private static final Result DUMMY_RESULT = new Result() { // from class: cern.nxcals.client.KafkaDataSink.1
    };

    /* JADX INFO: Access modifiers changed from: package-private */
    public KafkaDataSink(String str, Producer<byte[], byte[]> producer, Function<RecordData, byte[]> function, int i) {
        this.topicName = (String) Objects.requireNonNull(str);
        this.kafkaProducer = (Producer) Objects.requireNonNull(producer);
        this.serializer = (Function) Objects.requireNonNull(function);
        this.partitions = getPartitions(producer, str);
        this.maxRecordSize = i;
    }

    private static PartitionInfo[] getPartitions(Producer<byte[], byte[]> producer, String str) {
        return (PartitionInfo[]) producer.partitionsFor(str).toArray(new PartitionInfo[0]);
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        this.kafkaProducer.close();
    }

    @Override // cern.nxcals.client.DataSink
    public void send(RecordData recordData, DefaultCallback defaultCallback) {
        Objects.requireNonNull(defaultCallback);
        try {
            byte[] apply = this.serializer.apply(recordData);
            EntityData entityData = recordData.getEntityData();
            if (apply.length > this.maxRecordSize) {
                defaultCallback.accept(null, new RecordTooBigRuntimeException(MessageFormat.format(RECORD_TOO_BIG_MSG_PATTERN, entityData.getEntityKeyValues(), Long.valueOf(entityData.getId()), Integer.valueOf(apply.length))));
            } else {
                sendToKafka(Long.valueOf(recordData.getEntityData().getId()), apply, getPartitionNumber(recordData), (recordMetadata, exc) -> {
                    defaultCallback.accept(DUMMY_RESULT, exc);
                });
            }
        } catch (Exception e) {
            defaultCallback.accept(null, e);
        }
    }

    private Integer getPartitionNumber(RecordData recordData) {
        return Integer.valueOf(this.partitions[(int) ((recordData.getEntityData().getPartitionData().getId() + recordData.getEntityData().getSchemaData().getId()) % this.partitions.length)].partition());
    }

    private void sendToKafka(Long l, byte[] bArr, Integer num, BiConsumer<RecordMetadata, Exception> biConsumer) {
        log.trace("Sending to kafka entityId={}, size={}, kafkaPartition={}", l, Integer.valueOf(bArr.length), num);
        Producer<byte[], byte[]> producer = this.kafkaProducer;
        ProducerRecord<byte[], byte[]> producerRecord = new ProducerRecord<>(this.topicName, num, l.toString().getBytes(Charsets.UTF_8), bArr);
        biConsumer.getClass();
        producer.send(producerRecord, (v1, v2) -> {
            r2.accept(v1, v2);
        });
    }
}
