package io.smallrye.reactive.messaging.kafka.companion;

import io.smallrye.common.annotation.Experimental;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.groups.MultiOnTerminate;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.BooleanSupplier;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.RecordsToDelete;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

/**
 * Bundles an admin client, consumer builders and producer builders that all target the same
 * Kafka bootstrap servers and share one API timeout.
 *
 * <p>Every builder and the admin client created through this class registers a close hook;
 * {@link #close()} runs those hooks on a best-effort basis.
 */
@Experimental("Experimental API")
public class KafkaCompanion implements AutoCloseable {
    /** Serdes registered per type; consulted first by {@link #getSerdeForType(Class)}. */
    private final Map<Class<?>, Serde<?>> serdeMap = new HashMap<>();
    /** Client configuration merged into the admin, consumer and producer properties. */
    private final Map<String, Object> commonClientConfig = new HashMap<>();
    private final String bootstrapServers;
    /** Timeout handed to the sub-companions and builders, and used for blocking admin calls. */
    private final Duration kafkaApiTimeout;
    /** Lazily created by {@link #getOrCreateAdminClient()}; {@code null} until then and after close. */
    private AdminClient adminClient;
    /** Hooks run by {@link #close()}, each already wrapped by {@link #registerOnClose(Runnable)}. */
    private final List<Runnable> onClose = new CopyOnWriteArrayList<>();

    /**
     * Creates a companion with a Kafka API timeout of 10 seconds.
     *
     * @param bootstrapServers value used for the {@code bootstrap.servers} client property
     */
    public KafkaCompanion(String bootstrapServers) {
        this(bootstrapServers, Duration.ofSeconds(10L));
    }

    /**
     * Creates a companion.
     *
     * @param bootstrapServers value used for the {@code bootstrap.servers} client property
     * @param kafkaApiTimeout timeout passed to the builders and sub-companions created here
     */
    public KafkaCompanion(String bootstrapServers, Duration kafkaApiTimeout) {
        this.bootstrapServers = bootstrapServers;
        this.kafkaApiTimeout = kafkaApiTimeout;
    }

    /**
     * @return the timeout this companion was created with
     */
    public Duration getKafkaApiTimeout() {
        return this.kafkaApiTimeout;
    }

    /**
     * @return the live (not copied) common client configuration map; changes made to it are
     *         picked up by clients created afterwards
     */
    public Map<String, Object> getCommonClientConfig() {
        return this.commonClientConfig;
    }

    /**
     * Merges the given entries into the common client configuration. Existing entries with
     * other keys are kept; despite the name, the configuration is not replaced.
     *
     * @param commonClientConfig entries to add or overwrite
     */
    public void setCommonClientConfig(Map<String, Object> commonClientConfig) {
        this.commonClientConfig.putAll(commonClientConfig);
    }

    /**
     * @return the bootstrap servers this companion was created with
     */
    public String getBootstrapServers() {
        return this.bootstrapServers;
    }

    /**
     * Returns the shared admin client, creating it on first use from the common client
     * configuration plus {@code bootstrap.servers}, {@code metadata.max.age.ms} and
     * {@code client.id}.
     *
     * @return the admin client owned by this companion
     */
    public synchronized AdminClient getOrCreateAdminClient() {
        if (this.adminClient == null) {
            Map<String, Object> config = new HashMap<>(getCommonClientConfig());
            config.put("bootstrap.servers", getBootstrapServers());
            config.put("metadata.max.age.ms", 1000);
            config.put("client.id", "companion-admin-for-" + getBootstrapServers());
            AdminClient created = AdminClient.create(config);
            this.adminClient = created;
            // Capture the instance rather than the field: the field is reset by close().
            registerOnClose(() -> created.close(this.kafkaApiTimeout));
        }
        return this.adminClient;
    }

    /**
     * Runs every registered close hook once, then forgets the hooks that were run and the
     * cached admin client, so a repeated call does not close the same resources again and a
     * later {@link #getOrCreateAdminClient()} does not hand out a closed client.
     */
    @Override
    public synchronized void close() {
        List<Runnable> hooks = new ArrayList<>(this.onClose);
        for (Runnable hook : hooks) {
            hook.run();
        }
        // Remove only what was executed; hooks registered in the meantime stay pending.
        this.onClose.removeAll(hooks);
        this.adminClient = null;
    }

    /**
     * Blocks the calling thread for the given duration. If the thread is interrupted the
     * method returns early and the interrupt flag is set again.
     *
     * @param duration how long to sleep
     */
    public static void sleep(Duration duration) {
        try {
            Thread.sleep(duration.toMillis());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Decodes the value of the last header with the given key as UTF-8.
     *
     * @param headers headers to look into
     * @param key header key
     * @return the decoded value, or {@code null} when there is no such header or its value is
     *         {@code null}
     */
    public static String getHeader(Headers headers, String key) {
        return Optional.ofNullable(headers.lastHeader(key))
                .map(header -> header.value())
                .map(value -> new String(value, StandardCharsets.UTF_8))
                .orElse(null);
    }

    /**
     * Shorthand for {@code new TopicPartition(topic, partition)}.
     *
     * @param topic topic name
     * @param partition partition number
     * @return the topic partition
     */
    public static TopicPartition tp(String topic, int partition) {
        return new TopicPartition(topic, partition);
    }

    /**
     * Creates a producer record with a value only.
     *
     * @param topic target topic
     * @param value record value
     * @return the record
     */
    public static <K, V> ProducerRecord<K, V> record(String topic, V value) {
        return new ProducerRecord<>(topic, value);
    }

    /**
     * Creates a producer record with a key and a value.
     *
     * @param topic target topic
     * @param key record key
     * @param value record value
     * @return the record
     */
    public static <K, V> ProducerRecord<K, V> record(String topic, K key, V value) {
        return new ProducerRecord<>(topic, key, value);
    }

    /**
     * Creates a producer record for an explicit partition.
     *
     * @param topic target topic
     * @param partition target partition
     * @param key record key
     * @param value record value
     * @return the record
     */
    public static <K, V> ProducerRecord<K, V> record(String topic, Integer partition, K key, V value) {
        return new ProducerRecord<>(topic, partition, key, value);
    }

    /**
     * Repeats {@code source}, with {@code pollDelay} between repetitions, for as long as the
     * latest item does not satisfy {@code predicate}, and yields the last item of that
     * repetition.
     *
     * @param source the Uni to repeat
     * @param predicate condition that ends the repetition once it holds
     * @param pollDelay delay between repetitions
     * @return a Uni yielding the last item emitted by the repetition
     */
    public static <T> Uni<T> waitFor(Uni<T> source, Predicate<T> predicate, Duration pollDelay) {
        return source.repeat().withDelay(pollDelay)
                .whilst(item -> !predicate.test(item))
                .select().last()
                .toUni();
    }

    /**
     * Repeats an empty item, with {@code pollDelay} between repetitions, for as long as
     * {@code condition} returns {@code false}.
     *
     * @param condition condition that ends the repetition once it returns {@code true}
     * @param pollDelay delay between repetitions
     * @return a Uni yielding the last item of the repetition
     */
    public static Uni<Void> waitFor(BooleanSupplier condition, Duration pollDelay) {
        return Uni.createFrom().voidItem()
                .repeat().withDelay(pollDelay)
                .whilst(ignored -> !condition.getAsBoolean())
                .select().last()
                .toUni();
    }

    /**
     * Adapts an already started Kafka future to a Uni.
     *
     * @param kafkaFuture the future to adapt
     * @return a Uni backed by the future's completion stage
     * @deprecated marked for removal; see {@link #toUni(Supplier)} for the supplier-based variant
     */
    @Deprecated(forRemoval = true)
    protected static <T> Uni<T> toUni(KafkaFuture<T> kafkaFuture) {
        return Uni.createFrom().completionStage(kafkaFuture.toCompletionStage());
    }

    /**
     * Adapts a Kafka future to a Uni; the supplier is handed to the Uni rather than invoked
     * here.
     *
     * @param futureSupplier supplies the future to adapt
     * @return a Uni backed by the supplied future's completion stage
     */
    public static <T> Uni<T> toUni(Supplier<KafkaFuture<T>> futureSupplier) {
        return Uni.createFrom().completionStage(() -> futureSupplier.get().toCompletionStage());
    }

    /**
     * Registers the serde to use for the given type, replacing any previous registration.
     *
     * @param type type handled by the serde
     * @param serde serde to return from {@link #getSerdeForType(Class)}
     */
    public <T> void registerSerde(Class<T> type, Serde<T> serde) {
        this.serdeMap.put(type, serde);
    }

    /**
     * Registers a serde built from the given serializer and deserializer.
     *
     * @param type type handled by the pair
     * @param serializer serializer for the type
     * @param deserializer deserializer for the type
     */
    public <T> void registerSerde(Class<T> type, Serializer<T> serializer, Deserializer<T> deserializer) {
        registerSerde(type, Serdes.serdeFrom(serializer, deserializer));
    }

    /**
     * Looks up the serde for a type: a registered serde wins, otherwise the lookup is
     * delegated to {@link Serdes#serdeFrom(Class)}.
     *
     * @param type type to find a serde for
     * @return the registered serde, or the one provided by {@code Serdes.serdeFrom(type)}
     */
    public <T> Serde<T> getSerdeForType(Class<T> type) {
        Serde<?> registered = this.serdeMap.get(type);
        if (registered == null) {
            return Serdes.serdeFrom(type);
        }
        // Safe: registerSerde only ever stores a Serde<T> under its Class<T> key.
        @SuppressWarnings("unchecked")
        Serde<T> typed = (Serde<T>) registered;
        return typed;
    }

    /**
     * @return a new topics companion using the shared admin client and timeout
     */
    public TopicsCompanion topics() {
        return new TopicsCompanion(getOrCreateAdminClient(), this.kafkaApiTimeout);
    }

    /**
     * @return a new offsets companion using the shared admin client and timeout
     */
    public OffsetsCompanion offsets() {
        return new OffsetsCompanion(getOrCreateAdminClient(), this.kafkaApiTimeout);
    }

    /**
     * @return a new consumer groups companion using the shared admin client and timeout
     */
    public ConsumerGroupsCompanion consumerGroups() {
        return new ConsumerGroupsCompanion(getOrCreateAdminClient(), this.kafkaApiTimeout);
    }

    /**
     * @return a new cluster companion using the shared admin client and timeout
     */
    public ClusterCompanion cluster() {
        return new ClusterCompanion(getOrCreateAdminClient(), this.kafkaApiTimeout);
    }

    /**
     * Deletes records through the admin client and blocks until the request completes or the
     * Kafka API timeout elapses.
     *
     * @param recordsToDelete deletion request per topic partition
     */
    public void deleteRecords(Map<TopicPartition, RecordsToDelete> recordsToDelete) {
        toUni(() -> getOrCreateAdminClient().deleteRecords(recordsToDelete).all())
                .await().atMost(this.kafkaApiTimeout);
    }

    /**
     * Deletes the records of one partition that precede the given offset.
     *
     * @param topicPartition partition to delete from
     * @param offset offset passed to {@link RecordsToDelete#beforeOffset(long)}; must not be {@code null}
     */
    public void deleteRecords(TopicPartition topicPartition, Long offset) {
        Map<TopicPartition, RecordsToDelete> request = new HashMap<>();
        request.put(topicPartition, RecordsToDelete.beforeOffset(offset.longValue()));
        deleteRecords(request);
    }

    /**
     * Builds a fresh property map for a consumer: bootstrap servers, random
     * {@code group.id} and {@code client.id}, auto-commit disabled and offset reset set to
     * earliest. Entries of the common client configuration are applied last and therefore
     * override these defaults.
     *
     * @return a new, mutable property map
     */
    public Map<String, Object> getConsumerProperties() {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", this.bootstrapServers);
        props.put("group.id", "companion-" + UUID.randomUUID());
        props.put("client.id", "companion-" + UUID.randomUUID());
        props.put("enable.auto.commit", Boolean.FALSE.toString());
        // Locale.ROOT keeps the value "earliest" even under locales with special casing for 'I'.
        props.put("auto.offset.reset", OffsetResetStrategy.EARLIEST.toString().toLowerCase(Locale.ROOT));
        props.putAll(getCommonClientConfig());
        return props;
    }

    /**
     * Creates a consumer builder with a {@link StringDeserializer} for keys and the given
     * deserializer class for values.
     *
     * @param valueDeserializerType value deserializer class
     * @return a consumer builder whose close is registered with this companion
     */
    public <K, V> ConsumerBuilder<K, V> consumeWithDeserializers(Class<? extends Deserializer<?>> valueDeserializerType) {
        return consumeWithDeserializers(valueDeserializerType.getName());
    }

    /**
     * Creates a consumer builder from key and value deserializer classes.
     *
     * @param keyDeserializerType key deserializer class
     * @param valueDeserializerType value deserializer class
     * @return a consumer builder whose close is registered with this companion
     */
    public <K, V> ConsumerBuilder<K, V> consumeWithDeserializers(Class<? extends Deserializer<?>> keyDeserializerType, Class<? extends Deserializer<?>> valueDeserializerType) {
        return consumeWithDeserializers(keyDeserializerType.getName(), valueDeserializerType.getName());
    }

    /**
     * Creates a consumer builder with a {@link StringDeserializer} for keys and the named
     * deserializer class for values.
     *
     * @param valueDeserializerClassName value deserializer class name
     * @return a consumer builder whose close is registered with this companion
     */
    public <K, V> ConsumerBuilder<K, V> consumeWithDeserializers(String valueDeserializerClassName) {
        return consumeWithDeserializers(StringDeserializer.class.getName(), valueDeserializerClassName);
    }

    /**
     * Creates a consumer builder from key and value deserializer class names, using
     * {@link #getConsumerProperties()} and the Kafka API timeout.
     *
     * @param keyDeserializerClassName key deserializer class name
     * @param valueDeserializerClassName value deserializer class name
     * @return a consumer builder whose close is registered with this companion
     */
    public <K, V> ConsumerBuilder<K, V> consumeWithDeserializers(String keyDeserializerClassName, String valueDeserializerClassName) {
        ConsumerBuilder<K, V> builder = new ConsumerBuilder<>(getConsumerProperties(), this.kafkaApiTimeout,
                keyDeserializerClassName, valueDeserializerClassName);
        registerOnClose(builder::close);
        return builder;
    }

    /**
     * Creates a consumer builder from deserializer instances, using
     * {@link #getConsumerProperties()} and the Kafka API timeout.
     *
     * @param keyDeserializer key deserializer
     * @param valueDeserializer value deserializer
     * @return a consumer builder whose close is registered with this companion
     */
    public <K, V> ConsumerBuilder<K, V> consumeWithDeserializers(Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) {
        ConsumerBuilder<K, V> builder = new ConsumerBuilder<>(getConsumerProperties(), this.kafkaApiTimeout,
                keyDeserializer, valueDeserializer);
        registerOnClose(builder::close);
        return builder;
    }

    /**
     * Creates a consumer builder using the deserializers of the given serdes.
     *
     * @param keySerde serde providing the key deserializer
     * @param valueSerde serde providing the value deserializer
     * @return a consumer builder whose close is registered with this companion
     */
    public <K, V> ConsumerBuilder<K, V> consume(Serde<K> keySerde, Serde<V> valueSerde) {
        return consumeWithDeserializers(keySerde.deserializer(), valueSerde.deserializer());
    }

    /**
     * Creates a consumer builder using the serdes resolved by {@link #getSerdeForType(Class)}.
     *
     * @param keyType key type
     * @param valueType value type
     * @return a consumer builder whose close is registered with this companion
     */
    public <K, V> ConsumerBuilder<K, V> consume(Class<K> keyType, Class<V> valueType) {
        return consume(getSerdeForType(keyType), getSerdeForType(valueType));
    }

    /**
     * Creates a consumer builder with String keys and values of the given type.
     *
     * @param valueType value type, resolved by {@link #getSerdeForType(Class)}
     * @return a consumer builder whose close is registered with this companion
     */
    public <V> ConsumerBuilder<String, V> consume(Class<V> valueType) {
        return consume(Serdes.String(), getSerdeForType(valueType));
    }

    /**
     * @return a consumer builder with String keys and String values
     */
    public ConsumerBuilder<String, String> consumeStrings() {
        return consume(Serdes.String(), Serdes.String());
    }

    /**
     * @return a consumer builder with String keys and Integer values
     */
    public ConsumerBuilder<String, Integer> consumeIntegers() {
        return consume(Serdes.String(), Serdes.Integer());
    }

    /**
     * @return a consumer builder with String keys and Double values
     */
    public ConsumerBuilder<String, Double> consumeDoubles() {
        return consume(Serdes.String(), Serdes.Double());
    }

    /**
     * Builds a fresh property map for a producer: bootstrap servers and a random
     * {@code client.id}. Entries of the common client configuration are applied last and
     * therefore override these defaults.
     *
     * @return a new, mutable property map
     */
    public Map<String, Object> getProducerProperties() {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", this.bootstrapServers);
        props.put("client.id", "companion-" + UUID.randomUUID());
        props.putAll(getCommonClientConfig());
        return props;
    }

    /**
     * Creates a producer builder from key and value serializer classes.
     *
     * @param keySerializerType key serializer class
     * @param valueSerializerType value serializer class
     * @return a producer builder whose close is registered with this companion
     */
    public <K, V> ProducerBuilder<K, V> produceWithSerializers(Class<? extends Serializer<?>> keySerializerType, Class<? extends Serializer<?>> valueSerializerType) {
        return produceWithSerializers(keySerializerType.getName(), valueSerializerType.getName());
    }

    /**
     * Creates a producer builder with a {@link StringSerializer} for keys and the given
     * serializer class for values.
     *
     * @param valueSerializerType value serializer class
     * @return a producer builder whose close is registered with this companion
     */
    public <K, V> ProducerBuilder<K, V> produceWithSerializers(Class<? extends Serializer<?>> valueSerializerType) {
        return produceWithSerializers(valueSerializerType.getName());
    }

    /**
     * Creates a producer builder with a {@link StringSerializer} for keys and the named
     * serializer class for values.
     *
     * @param valueSerializerClassName value serializer class name
     * @return a producer builder whose close is registered with this companion
     */
    public <K, V> ProducerBuilder<K, V> produceWithSerializers(String valueSerializerClassName) {
        return produceWithSerializers(StringSerializer.class.getName(), valueSerializerClassName);
    }

    /**
     * Creates a producer builder from key and value serializer class names, using
     * {@link #getProducerProperties()} and the Kafka API timeout.
     *
     * @param keySerializerClassName key serializer class name
     * @param valueSerializerClassName value serializer class name
     * @return a producer builder whose close is registered with this companion
     */
    public <K, V> ProducerBuilder<K, V> produceWithSerializers(String keySerializerClassName, String valueSerializerClassName) {
        ProducerBuilder<K, V> builder = new ProducerBuilder<>(getProducerProperties(), this.kafkaApiTimeout,
                keySerializerClassName, valueSerializerClassName);
        registerOnClose(builder::close);
        return builder;
    }

    /**
     * Creates a producer builder from serializer instances, using
     * {@link #getProducerProperties()} and the Kafka API timeout.
     *
     * @param keySerializer key serializer
     * @param valueSerializer value serializer
     * @return a producer builder whose close is registered with this companion
     */
    public <K, V> ProducerBuilder<K, V> produceWithSerializers(Serializer<K> keySerializer, Serializer<V> valueSerializer) {
        ProducerBuilder<K, V> builder = new ProducerBuilder<>(getProducerProperties(), this.kafkaApiTimeout,
                keySerializer, valueSerializer);
        registerOnClose(builder::close);
        return builder;
    }

    /**
     * Creates a producer builder from serdes, using {@link #getProducerProperties()} and the
     * Kafka API timeout.
     *
     * @param keySerde key serde
     * @param valueSerde value serde
     * @return a producer builder whose close is registered with this companion
     */
    public <K, V> ProducerBuilder<K, V> produce(Serde<K> keySerde, Serde<V> valueSerde) {
        ProducerBuilder<K, V> builder = new ProducerBuilder<>(getProducerProperties(), this.kafkaApiTimeout,
                keySerde, valueSerde);
        registerOnClose(builder::close);
        return builder;
    }

    /**
     * Creates a producer builder using the serdes resolved by {@link #getSerdeForType(Class)}.
     *
     * @param keyType key type
     * @param valueType value type
     * @return a producer builder whose close is registered with this companion
     */
    public <K, V> ProducerBuilder<K, V> produce(Class<K> keyType, Class<V> valueType) {
        return produce(getSerdeForType(keyType), getSerdeForType(valueType));
    }

    /**
     * Creates a producer builder with String keys and values of the given type.
     *
     * @param valueType value type, resolved by {@link #getSerdeForType(Class)}
     * @return a producer builder whose close is registered with this companion
     */
    public <V> ProducerBuilder<String, V> produce(Class<V> valueType) {
        return produce(Serdes.String(), getSerdeForType(valueType));
    }

    /**
     * @return a producer builder with String keys and String values
     */
    public ProducerBuilder<String, String> produceStrings() {
        return produce(Serdes.String(), Serdes.String());
    }

    /**
     * @return a producer builder with String keys and Integer values
     */
    public ProducerBuilder<String, Integer> produceIntegers() {
        return produce(Serdes.String(), Serdes.Integer());
    }

    /**
     * @return a producer builder with String keys and Double values
     */
    public ProducerBuilder<String, Double> produceDoubles() {
        return produce(Serdes.String(), Serdes.Double());
    }

    /**
     * Pipes records consumed from the given topics through {@code transformer} and hands the
     * resulting stream to the producer.
     *
     * @param topics topics to consume from
     * @param consumer builder used to consume
     * @param producer builder used to produce the transformed records
     * @param transformer maps each consumed record to the record to produce
     * @return the task returned by the producer builder
     */
    public <K, C, P> ProducerTask process(Set<String> topics, ConsumerBuilder<K, C> consumer, ProducerBuilder<K, P> producer, Function<ConsumerRecord<K, C>, ProducerRecord<K, P>> transformer) {
        return producer.fromMulti(
                consumer.process(topics, records -> records.onItem().transform(transformer)));
    }

    /**
     * Same pipeline as {@link #process}, but per consumed batch and with the consumer
     * positions sent to the producer's transaction before it is committed.
     *
     * <p>When a batch terminates with a failure, or sending offsets / committing throws, the
     * transaction is aborted and the consumer is reset to its last committed positions.
     *
     * @param topics topics to consume from
     * @param consumer builder used to consume batches
     * @param producer builder used to produce; must be transactional
     * @param transformer maps each consumed record to the record to produce
     * @return a task wrapping the batch-processing stream
     * @throws IllegalStateException if {@code producer} is not transactional
     */
    public <K, C, P> ProducerTask processTransactional(Set<String> topics, ConsumerBuilder<K, C> consumer, ProducerBuilder<K, P> producer, Function<ConsumerRecord<K, C>, ProducerRecord<K, P>> transformer) {
        if (!producer.isTransactional()) {
            throw new IllegalStateException("producer must be transactional");
        }
        ProducerBuilder<K, P> transactional = producer.withOnTermination((kafkaProducer, failure) -> {
            if (failure != null) {
                kafkaProducer.abortTransaction();
                consumer.resetToLastCommittedPositions();
                return;
            }
            try {
                kafkaProducer.sendOffsetsToTransaction(consumer.position(), consumer.unwrap().groupMetadata());
                kafkaProducer.commitTransaction();
            } catch (Throwable commitFailure) {
                // A failed commit is handled like a failed batch: roll back and rewind.
                kafkaProducer.abortTransaction();
                consumer.resetToLastCommittedPositions();
            }
        });
        return new ProducerTask(consumer.processBatch(topics, batch -> {
            if (batch.isEmpty()) {
                return Multi.createFrom().empty();
            }
            return transactional.getProduceMulti(
                    Multi.createFrom().iterable(batch).onItem().transform(transformer));
        }).onTermination().invoke(transactional::close));
    }

    /**
     * Adds a close hook that runs {@code action} and swallows any {@link Exception} it throws.
     *
     * @param action the action to run from {@link #close()}
     */
    private void registerOnClose(Runnable action) {
        this.onClose.add(() -> {
            try {
                action.run();
            } catch (Exception ignored) {
                // Closing is best effort: one failing resource must not keep the others open.
            }
        });
    }
}
