package io.smallrye.reactive.messaging.kafka.companion;

import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import java.io.Closeable;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.Deserializer;
import org.jboss.logging.Logger;

/* loaded from: input_file:io/smallrye/reactive/messaging/kafka/companion/ConsumerBuilder.class */
public class ConsumerBuilder<K, V> implements ConsumerRebalanceListener, Closeable {
    private static final Logger LOGGER = Logger.getLogger(ConsumerBuilder.class);
    private final Map<String, Object> props;
    private final Function<Map<String, Object>, KafkaConsumer<K, V>> consumerCreator;
    private KafkaConsumer<K, V> kafkaConsumer;
    private ScheduledExecutorService consumerExecutor;
    private Function<ConsumerRecord<K, V>, Map<TopicPartition, OffsetAndMetadata>> commitAsyncFunction;
    private Function<ConsumerRecord<K, V>, Map<TopicPartition, OffsetAndMetadata>> commitSyncFunction;
    private OffsetCommitCallback commitAsyncCallback;
    private Consumer<Collection<TopicPartition>> onPartitionsAssigned;
    private Consumer<Collection<TopicPartition>> onPartitionsRevoked;
    private final Duration kafkaApiTimeout;
    private BiConsumer<KafkaConsumer<K, V>, Throwable> onTermination = this::terminate;
    private Duration pollTimeout = Duration.ofMillis(10);
    private final AtomicBoolean polling = new AtomicBoolean(false);
    private final Set<TopicPartition> assignment = new HashSet();

    public ConsumerBuilder(Map<String, Object> map, Duration duration, Deserializer<K> deserializer, Deserializer<V> deserializer2) {
        this.props = map;
        this.kafkaApiTimeout = duration;
        this.consumerCreator = map2 -> {
            return new KafkaConsumer(map2, deserializer, deserializer2);
        };
    }

    public ConsumerBuilder(Map<String, Object> map, Duration duration, String str, String str2) {
        this.props = map;
        this.kafkaApiTimeout = duration;
        map.put("key.deserializer", str);
        map.put("value.deserializer", str2);
        this.consumerCreator = KafkaConsumer::new;
    }

    private synchronized KafkaConsumer<K, V> getOrCreateConsumer() {
        if (this.kafkaConsumer == null) {
            this.kafkaConsumer = this.consumerCreator.apply(this.props);
        }
        return this.kafkaConsumer;
    }

    private synchronized ScheduledExecutorService getOrCreateExecutor() {
        if (this.consumerExecutor == null) {
            this.consumerExecutor = Executors.newSingleThreadScheduledExecutor(runnable -> {
                return new Thread(runnable, "consumer-" + clientId());
            });
        }
        return this.consumerExecutor;
    }

    public KafkaConsumer<K, V> unwrap() {
        return this.kafkaConsumer;
    }

    public void onPartitionsRevoked(Collection<TopicPartition> collection) {
        LOGGER.infof("%s revoked partitions %s", clientId(), collection);
        this.assignment.removeAll(collection);
        if (this.onPartitionsRevoked != null) {
            this.onPartitionsRevoked.accept(collection);
        }
    }

    public void onPartitionsAssigned(Collection<TopicPartition> collection) {
        LOGGER.infof("%s assigned partitions %s", clientId(), collection);
        this.assignment.addAll(collection);
        if (this.onPartitionsAssigned != null) {
            this.onPartitionsAssigned.accept(collection);
        }
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public synchronized void close() {
        if (this.kafkaConsumer != null) {
            this.kafkaConsumer.wakeup();
            Uni.createFrom().voidItem().invoke(() -> {
                LOGGER.infof("Closing consumer %s", clientId());
                if (this.kafkaConsumer != null) {
                    this.kafkaConsumer.close(this.kafkaApiTimeout);
                    this.kafkaConsumer = null;
                }
                if (this.consumerExecutor != null) {
                    this.consumerExecutor.shutdown();
                    this.consumerExecutor = null;
                }
                this.polling.compareAndSet(true, false);
            }).runSubscriptionOn(getOrCreateExecutor()).subscribeAsCompletionStage();
        }
    }

    public void terminate(KafkaConsumer<K, V> kafkaConsumer, Throwable th) {
        close();
    }

    public String clientId() {
        return (String) this.props.get("client.id");
    }

    public String groupId() {
        return (String) this.props.get("group.id");
    }

    public ConsumerBuilder<K, V> withProp(String str, Object obj) {
        this.props.put(str, obj);
        return this;
    }

    public ConsumerBuilder<K, V> withProps(Map<String, Object> map) {
        this.props.putAll(map);
        return this;
    }

    public ConsumerBuilder<K, V> withClientId(String str) {
        return withProp("client.id", str);
    }

    public ConsumerBuilder<K, V> withGroupId(String str) {
        return withProp("group.id", str);
    }

    public ConsumerBuilder<K, V> withOffsetReset(OffsetResetStrategy offsetResetStrategy) {
        return withProp("auto.offset.reset", offsetResetStrategy.name().toLowerCase());
    }

    public ConsumerBuilder<K, V> withIsolationLevel(IsolationLevel isolationLevel) {
        return withProp("isolation.level", isolationLevel.name().toLowerCase());
    }

    public ConsumerBuilder<K, V> withPollTimeout(Duration duration) {
        this.pollTimeout = duration;
        return this;
    }

    public ConsumerBuilder<K, V> withAutoCommit() {
        withProp("enable.auto.commit", "true");
        return this;
    }

    public ConsumerBuilder<K, V> withCommitAsyncWhen(Predicate<ConsumerRecord<K, V>> predicate) {
        return withCommitAsync(consumerRecord -> {
            return predicate.test(consumerRecord) ? getOffsetMapFromConsumerRecord(consumerRecord) : Collections.emptyMap();
        }, (map, exc) -> {
        });
    }

    public ConsumerBuilder<K, V> withCommitAsync(Function<ConsumerRecord<K, V>, Map<TopicPartition, OffsetAndMetadata>> function, OffsetCommitCallback offsetCommitCallback) {
        this.commitAsyncFunction = function;
        this.commitAsyncCallback = offsetCommitCallback;
        return this;
    }

    public ConsumerBuilder<K, V> withCommitSyncWhen(Predicate<ConsumerRecord<K, V>> predicate) {
        this.commitSyncFunction = consumerRecord -> {
            return predicate.test(consumerRecord) ? getOffsetMapFromConsumerRecord(consumerRecord) : Collections.emptyMap();
        };
        return this;
    }

    public ConsumerBuilder<K, V> withCommitSync(Function<ConsumerRecord<K, V>, Map<TopicPartition, OffsetAndMetadata>> function) {
        this.commitSyncFunction = function;
        return this;
    }

    public ConsumerBuilder<K, V> withOnPartitionsAssigned(Consumer<Collection<TopicPartition>> consumer) {
        this.onPartitionsAssigned = consumer;
        return this;
    }

    public ConsumerBuilder<K, V> withOnPartitionsRevoked(Consumer<Collection<TopicPartition>> consumer) {
        this.onPartitionsRevoked = consumer;
        return this;
    }

    public ConsumerBuilder<K, V> withOnTermination(BiConsumer<KafkaConsumer<K, V>, Throwable> biConsumer) {
        this.onTermination = biConsumer;
        return this;
    }

    private Map<TopicPartition, OffsetAndMetadata> getOffsetMapFromConsumerRecord(ConsumerRecord<?, ?> consumerRecord) {
        HashMap hashMap = new HashMap();
        hashMap.put(new TopicPartition(consumerRecord.topic(), consumerRecord.partition()), new OffsetAndMetadata(consumerRecord.offset() + 1));
        return hashMap;
    }

    public ConsumerGroupMetadata groupMetadata() {
        return getOrCreateConsumer().groupMetadata();
    }

    public Set<TopicPartition> currentAssignment() {
        return Collections.unmodifiableSet(this.assignment);
    }

    private Uni<Set<TopicPartition>> assignmentUni() {
        return Uni.createFrom().item(() -> {
            return getOrCreateConsumer().assignment();
        }).runSubscriptionOn(getOrCreateExecutor());
    }

    public Set<TopicPartition> assignment() {
        return (Set) Uni.createFrom().item(() -> {
            return getOrCreateConsumer().assignment();
        }).runSubscriptionOn(getOrCreateExecutor()).await().atMost(this.kafkaApiTimeout);
    }

    public Uni<Set<TopicPartition>> waitForAssignment() {
        return KafkaCompanion.waitFor(assignmentUni(), set -> {
            return (set == null || set.isEmpty()) ? false : true;
        }, this.pollTimeout);
    }

    public long position(TopicPartition topicPartition) {
        return ((Long) Uni.createFrom().item(() -> {
            return Long.valueOf(getOrCreateConsumer().position(topicPartition));
        }).runSubscriptionOn(getOrCreateExecutor()).await().atMost(this.kafkaApiTimeout)).longValue();
    }

    public Map<TopicPartition, OffsetAndMetadata> position() {
        return (Map) assignmentUni().onItem().transform(set -> {
            KafkaConsumer<K, V> orCreateConsumer = getOrCreateConsumer();
            return (Map) set.stream().collect(Collectors.toMap(Function.identity(), topicPartition -> {
                return new OffsetAndMetadata(orCreateConsumer.position(topicPartition));
            }));
        }).await().atMost(this.kafkaApiTimeout);
    }

    public void resetToLastCommittedPositions() {
        Map<TopicPartition, OffsetAndMetadata> committed = committed();
        for (TopicPartition topicPartition : assignment()) {
            OffsetAndMetadata offsetAndMetadata = committed.get(topicPartition);
            if (offsetAndMetadata != null) {
                getOrCreateConsumer().seek(topicPartition, offsetAndMetadata.offset());
            } else {
                getOrCreateConsumer().seekToBeginning(Collections.singleton(topicPartition));
            }
        }
    }

    public OffsetAndMetadata committed(TopicPartition topicPartition) {
        return (OffsetAndMetadata) Uni.createFrom().item(() -> {
            return getOrCreateConsumer().committed(Collections.singleton(topicPartition));
        }).onItem().transform(map -> {
            return (OffsetAndMetadata) map.get(topicPartition);
        }).runSubscriptionOn(getOrCreateExecutor()).await().atMost(this.kafkaApiTimeout);
    }

    public Map<TopicPartition, OffsetAndMetadata> committed(TopicPartition... topicPartitionArr) {
        return (Map) Uni.createFrom().item(() -> {
            return getOrCreateConsumer().committed(new HashSet(Arrays.asList(topicPartitionArr)));
        }).runSubscriptionOn(getOrCreateExecutor()).await().atMost(this.kafkaApiTimeout);
    }

    public Map<TopicPartition, OffsetAndMetadata> committed() {
        return (Map) assignmentUni().onItem().transform(set -> {
            return getOrCreateConsumer().committed(set);
        }).runSubscriptionOn(getOrCreateExecutor()).await().atMost(this.kafkaApiTimeout);
    }

    public void pause() {
        assignmentUni().onItem().invoke(set -> {
            getOrCreateConsumer().pause(set);
        }).replaceWithVoid().await().indefinitely();
    }

    public void resume() {
        assignmentUni().onItem().invoke(set -> {
            getOrCreateConsumer().resume(set);
        }).replaceWithVoid().await().atMost(this.kafkaApiTimeout);
    }

    private Uni<ConsumerRecords<K, V>> poll() {
        return Uni.createFrom().item(() -> {
            try {
                return getOrCreateConsumer().poll(this.pollTimeout);
            } catch (WakeupException e) {
                return ConsumerRecords.EMPTY;
            }
        }).onItem().transformToUni(consumerRecords -> {
            return consumerRecords.isEmpty() ? Uni.createFrom().item(consumerRecords).onItem().delayIt().onExecutor(getOrCreateExecutor()).by(Duration.ofMillis(2L)) : Uni.createFrom().item(consumerRecords);
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public <T> Multi<T> process(Set<String> set, Function<Multi<ConsumerRecord<K, V>>, Multi<T>> function) {
        return Multi.createFrom().deferred(() -> {
            if (!this.polling.compareAndSet(false, true)) {
                return Multi.createFrom().failure(new IllegalStateException("Consumer already in use"));
            }
            getOrCreateConsumer().subscribe(set, this);
            return getConsumeMulti().plug(function);
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public <T> Multi<T> processBatch(Set<String> set, Function<ConsumerRecords<K, V>, Multi<T>> function) {
        return Multi.createFrom().deferred(() -> {
            if (!this.polling.compareAndSet(false, true)) {
                return Multi.createFrom().failure(new IllegalStateException("Consumer already in use"));
            }
            getOrCreateConsumer().subscribe(set, this);
            return getProcessingMulti(function);
        });
    }

    private <T> Multi<T> getProcessingMulti(Function<ConsumerRecords<K, V>, Multi<T>> function) {
        return poll().repeat().indefinitely().onItem().transformToMulti(function).concatenate().runSubscriptionOn(getOrCreateExecutor()).onTermination().invoke((th, bool) -> {
            this.onTermination.accept(this.kafkaConsumer, th);
        });
    }

    private Multi<ConsumerRecord<K, V>> getConsumeMulti() {
        return poll().repeat().indefinitely().onItem().transformToMulti(consumerRecords -> {
            return Multi.createFrom().items(StreamSupport.stream(consumerRecords.spliterator(), false));
        }).concatenate().runSubscriptionOn(getOrCreateExecutor()).plug(multi -> {
            Multi multi = multi;
            if (this.commitAsyncFunction != null) {
                multi = multi.invoke(consumerRecord -> {
                    Map<TopicPartition, OffsetAndMetadata> apply = this.commitAsyncFunction.apply(consumerRecord);
                    if (apply.isEmpty()) {
                        return;
                    }
                    getOrCreateConsumer().commitAsync(apply, this.commitAsyncCallback);
                });
            }
            if (this.commitSyncFunction != null) {
                multi = multi.onItem().transformToUniAndConcatenate(consumerRecord2 -> {
                    Map<TopicPartition, OffsetAndMetadata> apply = this.commitSyncFunction.apply(consumerRecord2);
                    return !apply.isEmpty() ? Uni.createFrom().item(consumerRecord2).invoke(() -> {
                        getOrCreateConsumer().commitSync(apply);
                    }) : Uni.createFrom().item(consumerRecord2);
                });
            }
            return multi.onTermination().invoke((th, bool) -> {
                this.onTermination.accept(this.kafkaConsumer, th);
            });
        });
    }

    public ConsumerTask<K, V> fromOffsets(Map<TopicPartition, Long> map, Function<Multi<ConsumerRecord<K, V>>, Multi<ConsumerRecord<K, V>>> function) {
        return new ConsumerTask<>(Multi.createFrom().deferred(() -> {
            if (!this.polling.compareAndSet(false, true)) {
                return Multi.createFrom().failure(new IllegalStateException("Consumer already in use"));
            }
            getOrCreateConsumer().unsubscribe();
            getOrCreateConsumer().assign(map.keySet());
            for (Map.Entry<K, V> entry : map.entrySet()) {
                TopicPartition topicPartition = (TopicPartition) entry.getKey();
                long longValue = ((Long) entry.getValue()).longValue();
                if (longValue > -1) {
                    getOrCreateConsumer().seek(topicPartition, longValue);
                } else {
                    getOrCreateConsumer().seekToEnd(Collections.singleton(topicPartition));
                    getOrCreateConsumer().seek(topicPartition, getOrCreateConsumer().position(topicPartition) + longValue);
                }
            }
            return getConsumeMulti().plug(function);
        }));
    }

    public ConsumerTask<K, V> fromOffsets(Map<TopicPartition, Long> map) {
        return fromOffsets(map, Function.identity());
    }

    public ConsumerTask<K, V> fromOffsets(Map<TopicPartition, Long> map, Duration duration) {
        return fromOffsets(map, RecordQualifiers.until(duration));
    }

    public ConsumerTask<K, V> fromOffsets(Map<TopicPartition, Long> map, long j) {
        return fromOffsets(map, RecordQualifiers.until(Long.valueOf(j)));
    }

    public ConsumerTask<K, V> fromTopics(Set<String> set, Function<Multi<ConsumerRecord<K, V>>, Multi<ConsumerRecord<K, V>>> function) {
        return new ConsumerTask<>(Multi.createFrom().deferred(() -> {
            if (!this.polling.compareAndSet(false, true)) {
                return Multi.createFrom().failure(new IllegalStateException("Consumer already in use"));
            }
            getOrCreateConsumer().subscribe(set, this);
            return getConsumeMulti().plug(function);
        }));
    }

    public ConsumerTask<K, V> fromTopics(Pattern pattern, Function<Multi<ConsumerRecord<K, V>>, Multi<ConsumerRecord<K, V>>> function) {
        return new ConsumerTask<>(Multi.createFrom().deferred(() -> {
            if (!this.polling.compareAndSet(false, true)) {
                return Multi.createFrom().failure(new IllegalStateException("Consumer already in use"));
            }
            getOrCreateConsumer().subscribe(pattern, this);
            return getConsumeMulti().plug(function);
        }));
    }

    public ConsumerTask<K, V> fromTopics(String str, Function<Multi<ConsumerRecord<K, V>>, Multi<ConsumerRecord<K, V>>> function) {
        return fromTopics(Collections.singleton(str), function);
    }

    public ConsumerTask<K, V> fromTopics(String... strArr) {
        return fromTopics(new HashSet(Arrays.asList(strArr)));
    }

    public ConsumerTask<K, V> fromTopics(Pattern pattern) {
        return fromTopics(pattern, Function.identity());
    }

    public ConsumerTask<K, V> fromTopics(Set<String> set) {
        return fromTopics(set, Function.identity());
    }

    public ConsumerTask<K, V> fromTopics(Set<String> set, long j) {
        return fromTopics(set, RecordQualifiers.until(Long.valueOf(j)));
    }

    public ConsumerTask<K, V> fromTopics(Pattern pattern, long j) {
        return fromTopics(pattern, RecordQualifiers.until(Long.valueOf(j)));
    }

    public ConsumerTask<K, V> fromTopics(Set<String> set, Duration duration) {
        return fromTopics(set, RecordQualifiers.until(duration));
    }

    public ConsumerTask<K, V> fromTopics(Pattern pattern, Duration duration) {
        return fromTopics(pattern, RecordQualifiers.until(duration));
    }

    public ConsumerTask<K, V> fromTopics(String str, long j) {
        return fromTopics(Collections.singleton(str), RecordQualifiers.until(Long.valueOf(j)));
    }

    public ConsumerTask<K, V> fromTopics(String str, Duration duration) {
        return fromTopics(Collections.singleton(str), RecordQualifiers.until(duration));
    }

    public ConsumerTask<K, V> fromTopics(String str, long j, Duration duration) {
        return fromTopics(Collections.singleton(str), RecordQualifiers.until(Long.valueOf(j), duration, null));
    }

    public ConsumerTask<K, V> fromPrevious(KafkaTask<?, ?> kafkaTask, Function<Multi<ConsumerRecord<K, V>>, Multi<ConsumerRecord<K, V>>> function) {
        return fromOffsets((Map<TopicPartition, Long>) kafkaTask.latestOffsets().entrySet().stream().collect(Collectors.toMap((v0) -> {
            return v0.getKey();
        }, entry -> {
            return Long.valueOf(((Long) entry.getValue()).longValue() + 1);
        })), function);
    }

    public ConsumerTask<K, V> fromPrevious(KafkaTask<?, ?> kafkaTask) {
        return fromPrevious(kafkaTask, Function.identity());
    }

    public ConsumerTask<K, V> fromPrevious(KafkaTask<?, ?> kafkaTask, long j) {
        return fromPrevious(kafkaTask, RecordQualifiers.until(Long.valueOf(j)));
    }

    public ConsumerTask<K, V> fromPrevious(KafkaTask<?, ?> kafkaTask, Duration duration) {
        return fromPrevious(kafkaTask, RecordQualifiers.until(duration));
    }

    public ConsumerTask<K, V> fromPrevious(KafkaTask<?, ?> kafkaTask, long j, Duration duration) {
        return fromPrevious(kafkaTask, RecordQualifiers.until(Long.valueOf(j), duration, null));
    }

    public void commitAndClose(Map<TopicPartition, OffsetAndMetadata> map) {
        Uni.createFrom().voidItem().onItem().invoke(() -> {
            getOrCreateConsumer().commitSync(map);
        }).onItem().invoke(this::close).runSubscriptionOn(getOrCreateExecutor()).await().atMost(this.kafkaApiTimeout);
    }

    public void commitAndClose() {
        Uni.createFrom().voidItem().onItem().invoke(() -> {
            getOrCreateConsumer().commitSync();
        }).onItem().invoke(this::close).runSubscriptionOn(getOrCreateExecutor()).await().atMost(this.kafkaApiTimeout);
    }
}
