package io.micronaut.configuration.kafka.processor;

import io.micronaut.configuration.kafka.KafkaMessage;
import io.micronaut.configuration.kafka.annotation.OffsetStrategy;
import io.micronaut.configuration.kafka.exceptions.KafkaListenerException;
import io.micronaut.core.annotation.Internal;
import io.micronaut.core.annotation.NonNull;
import io.micronaut.core.annotation.Nullable;
import io.micronaut.core.type.Argument;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.Instant;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Stream;
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;

/* JADX INFO: Access modifiers changed from: package-private */
@Internal
/* loaded from: input_file:io/micronaut/configuration/kafka/processor/ConsumerState.class */
public abstract class ConsumerState {
    protected static final Logger LOG = LoggerFactory.getLogger(KafkaConsumerProcessor.class);
    protected final KafkaConsumerProcessor kafkaConsumerProcessor;
    protected final Object consumerBean;

    @Nullable
    protected final Map<TopicPartition, PartitionRetryState> topicPartitionRetries;
    protected final Map<Argument<?>, Object> boundArguments;
    protected boolean failed;
    final ConsumerInfo info;
    final Consumer<?, ?> kafkaConsumer;
    final Set<String> subscriptions;
    Set<TopicPartition> assignments;
    private Set<TopicPartition> pausedTopicPartitions;
    private Set<TopicPartition> pauseRequests;
    private boolean autoPaused;
    private boolean pollingStarted;
    private volatile ConsumerCloseState closedState;

    /* JADX INFO: Access modifiers changed from: protected */
    public ConsumerState(KafkaConsumerProcessor kafkaConsumerProcessor, ConsumerInfo consumerInfo, Consumer<?, ?> consumer, Object obj) {
        this.kafkaConsumerProcessor = kafkaConsumerProcessor;
        this.info = consumerInfo;
        this.kafkaConsumer = consumer;
        this.consumerBean = obj;
        this.subscriptions = Collections.unmodifiableSet(this.kafkaConsumer.subscription());
        this.autoPaused = !consumerInfo.autoStartup;
        this.boundArguments = new HashMap(2);
        Optional.ofNullable(consumerInfo.consumerArg).ifPresent(argument -> {
            this.boundArguments.put(argument, this.kafkaConsumer);
        });
        this.closedState = ConsumerCloseState.NOT_STARTED;
        this.topicPartitionRetries = this.info.errorStrategy.isRetry() ? new HashMap() : null;
    }

    protected abstract ConsumerRecords<?, ?> pollRecords(@Nullable Map<TopicPartition, OffsetAndMetadata> map);

    protected abstract void processRecords(ConsumerRecords<?, ?> consumerRecords, Map<TopicPartition, OffsetAndMetadata> map);

    @Nullable
    protected abstract Map<TopicPartition, OffsetAndMetadata> getCurrentOffsets();

    /* JADX INFO: Access modifiers changed from: package-private */
    public void pause() {
        pause(this.assignments);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public synchronized void pause(@NonNull Collection<TopicPartition> collection) {
        if (this.pauseRequests == null) {
            this.pauseRequests = new HashSet();
        }
        this.pauseRequests.addAll(collection);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public synchronized void resume() {
        this.autoPaused = false;
        this.pauseRequests = null;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public synchronized void resume(@NonNull Collection<TopicPartition> collection) {
        this.autoPaused = false;
        if (this.pauseRequests != null) {
            this.pauseRequests.removeAll(collection);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public synchronized boolean isPaused(@NonNull Collection<TopicPartition> collection) {
        return this.pauseRequests != null && this.pausedTopicPartitions != null && this.pauseRequests.containsAll(collection) && this.pausedTopicPartitions.containsAll(collection);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void wakeUp() {
        this.kafkaConsumer.wakeup();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void close() {
        if (this.closedState == ConsumerCloseState.POLLING) {
            Instant now = Instant.now();
            Instant instant = now;
            do {
                if (LOG.isTraceEnabled()) {
                    Instant now2 = Instant.now();
                    if (now2.isAfter(instant)) {
                        LOG.trace("Consumer {} is not closed yet (waiting {})", this.info.clientId, Duration.between(now, now2));
                        instant = now2.plusSeconds(5L);
                    }
                }
            } while (this.closedState == ConsumerCloseState.POLLING);
        }
        LOG.debug("Consumer {} is closed", this.info.clientId);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void threadPollLoop() {
        try {
            Consumer<?, ?> consumer = this.kafkaConsumer;
            while (true) {
                try {
                    refreshAssignmentsPollAndProcessRecords();
                } finally {
                }
            }
        } catch (WakeupException e) {
            this.closedState = ConsumerCloseState.CLOSED;
        }
    }

    private void refreshAssignmentsPollAndProcessRecords() {
        refreshAssignments();
        try {
            pollAndProcessRecords();
        } catch (Exception e) {
            handleException(e, null, null);
        } catch (WakeupException e2) {
            try {
                if (!this.failed && this.info.offsetStrategy != OffsetStrategy.DISABLED) {
                    this.kafkaConsumer.commitSync();
                }
            } catch (Exception e3) {
                LOG.warn("Error committing Kafka offsets on shutdown: {}", e3.getMessage(), e3);
            }
            throw e2;
        }
    }

    private void refreshAssignments() {
        Set assignment = this.kafkaConsumer.assignment();
        if (!assignment.equals(this.assignments)) {
            LOG.info("Consumer [{}] assignments changed: {} -> {}", new Object[]{this.info.clientId, this.assignments, assignment});
            this.assignments = Collections.unmodifiableSet(assignment);
        }
        synchronized (this) {
            if (this.autoPaused) {
                pause(this.assignments);
                this.kafkaConsumer.pause(this.assignments);
            }
        }
    }

    private void pollAndProcessRecords() {
        this.failed = true;
        Map<TopicPartition, OffsetAndMetadata> currentOffsets = getCurrentOffsets();
        pauseTopicPartitions();
        ConsumerRecords<?, ?> pollRecords = pollRecords(currentOffsets);
        this.closedState = ConsumerCloseState.POLLING;
        if (!this.pollingStarted) {
            this.pollingStarted = true;
            this.kafkaConsumerProcessor.publishStartedPollingEvent(this.kafkaConsumer);
        }
        resumeTopicPartitions();
        if (pollRecords == null || pollRecords.isEmpty()) {
            return;
        }
        if (this.info.method.isSuspend()) {
            this.boundArguments.put(this.info.method.getArguments()[this.info.method.getArguments().length - 1], null);
        }
        processRecords(pollRecords, currentOffsets);
        if (this.failed) {
            return;
        }
        if (this.info.offsetStrategy != OffsetStrategy.SYNC) {
            if (this.info.offsetStrategy == OffsetStrategy.ASYNC) {
                this.kafkaConsumer.commitAsync(resolveCommitCallback());
            }
        } else {
            try {
                this.kafkaConsumer.commitSync();
            } catch (CommitFailedException e) {
                handleException(e, pollRecords, null);
            }
        }
    }

    private synchronized void pauseTopicPartitions() {
        if (this.pauseRequests == null || this.pauseRequests.isEmpty()) {
            return;
        }
        HashSet hashSet = new HashSet(this.pauseRequests);
        hashSet.retainAll(this.assignments);
        if (hashSet.isEmpty()) {
            return;
        }
        LOG.trace("Pausing Kafka consumption for Consumer [{}] from topic partition: {}", this.info.clientId, hashSet);
        this.kafkaConsumer.pause(hashSet);
        LOG.debug("Paused Kafka consumption for Consumer [{}] from topic partition: {}", this.info.clientId, this.kafkaConsumer.paused());
        if (this.pausedTopicPartitions == null) {
            this.pausedTopicPartitions = new HashSet();
        }
        this.pausedTopicPartitions.addAll(hashSet);
    }

    private synchronized void resumeTopicPartitions() {
        Set paused = this.kafkaConsumer.paused();
        if (paused.isEmpty()) {
            return;
        }
        List list = paused.stream().filter(topicPartition -> {
            return this.pauseRequests == null || !this.pauseRequests.contains(topicPartition);
        }).toList();
        if (!list.isEmpty()) {
            LOG.debug("Resuming Kafka consumption for Consumer [{}] from topic partition: {}", this.info.clientId, list);
            this.kafkaConsumer.resume(list);
        }
        if (this.pausedTopicPartitions != null) {
            Set<TopicPartition> set = this.pausedTopicPartitions;
            Objects.requireNonNull(set);
            list.forEach((v1) -> {
                r1.remove(v1);
            });
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void handleResultFlux(ConsumerRecords<?, ?> consumerRecords, ConsumerRecord<?, ?> consumerRecord, Flux<?> flux, boolean z) {
        Flux flatMap = flux.flatMap(obj -> {
            return sendToDestination(obj, consumerRecord, consumerRecords);
        });
        if (!z) {
            flatMap.subscribe(recordMetadata -> {
                LOG.trace("Method [{}] produced record metadata: {}", this.info.logMethod, recordMetadata);
            });
        } else {
            LOG.trace("Method [{}] produced record metadata: {}", this.info.method, (List) flatMap.collectList().block());
        }
    }

    private Publisher<RecordMetadata> sendToDestination(Object obj, ConsumerRecord<?, ?> consumerRecord, ConsumerRecords<?, ?> consumerRecords) {
        Producer producer;
        if (obj == null || this.info.sendToTopics.isEmpty()) {
            return Flux.empty();
        }
        Object key = consumerRecord.key();
        if (this.info.shouldSendOffsetsToTransaction) {
            producer = this.kafkaConsumerProcessor.getTransactionalProducer(this.info.producerClientId, this.info.producerTransactionalId, byte[].class, Object.class);
        } else {
            producer = this.kafkaConsumerProcessor.getProducer((String) Optional.ofNullable(this.info.producerClientId).orElse(this.info.groupId), key != null ? key.getClass() : byte[].class, obj.getClass());
        }
        Producer producer2 = producer;
        return Flux.create(fluxSink -> {
            sendToDestination(fluxSink, producer2, key, obj, consumerRecord, consumerRecords);
        }).onErrorResume(th -> {
            return handleSendToError(th, consumerRecords, consumerRecord);
        });
    }

    private void sendToDestination(FluxSink<RecordMetadata> fluxSink, Producer<?, ?> producer, Object obj, Object obj2, ConsumerRecord<?, ?> consumerRecord, ConsumerRecords<?, ?> consumerRecords) {
        try {
            if (this.info.shouldSendOffsetsToTransaction) {
                beginTransaction(producer);
            }
            sendToDestination(producer, new FluxCallback(fluxSink), obj, obj2, consumerRecord);
            if (this.info.shouldSendOffsetsToTransaction) {
                endTransaction(producer, consumerRecords);
            }
            fluxSink.complete();
        } catch (Exception e) {
            if (this.info.shouldSendOffsetsToTransaction) {
                abortTransaction(producer, e);
            }
            fluxSink.error(e);
        }
    }

    private void sendToDestination(Producer<?, ?> producer, Callback callback, Object obj, Object obj2, ConsumerRecord<?, ?> consumerRecord) {
        for (String str : this.info.sendToTopics) {
            if (this.info.returnsManyKafkaMessages) {
                Iterator it = ((Iterable) obj2).iterator();
                while (it.hasNext()) {
                    producer.send(createFromMessage(str, (KafkaMessage) it.next()), callback);
                }
            } else {
                ProducerRecord createFromMessage = this.info.returnsOneKafkaMessage ? createFromMessage(str, (KafkaMessage) obj2) : new ProducerRecord(str, (Integer) null, obj, obj2, consumerRecord.headers());
                LOG.trace("Sending record: {} for producer: {} {}", new Object[]{createFromMessage, producer, this.info.producerTransactionalId});
                producer.send(createFromMessage, callback);
            }
        }
    }

    private void beginTransaction(Producer<?, ?> producer) {
        try {
            LOG.trace("Beginning transaction for producer: {}", this.info.producerTransactionalId);
            producer.beginTransaction();
        } catch (ProducerFencedException e) {
            this.kafkaConsumerProcessor.handleProducerFencedException(producer, e);
        }
    }

    private void endTransaction(Producer<?, ?> producer, ConsumerRecords<?, ?> consumerRecords) {
        HashMap hashMap = new HashMap();
        for (TopicPartition topicPartition : consumerRecords.partitions()) {
            List records = consumerRecords.records(topicPartition);
            hashMap.put(topicPartition, new OffsetAndMetadata(((ConsumerRecord) records.get(records.size() - 1)).offset() + 1));
        }
        sendOffsetsToTransaction(producer, hashMap);
    }

    private void abortTransaction(Producer<?, ?> producer, Exception exc) {
        try {
            LOG.trace("Aborting transaction for producer: {} because of error: {}", this.info.producerTransactionalId, exc.getMessage());
            producer.abortTransaction();
        } catch (ProducerFencedException e) {
            this.kafkaConsumerProcessor.handleProducerFencedException(producer, e);
        }
    }

    private void sendOffsetsToTransaction(Producer<?, ?> producer, Map<TopicPartition, OffsetAndMetadata> map) {
        try {
            LOG.trace("Sending offsets: {} to transaction for producer: {} and customer group id: {}", new Object[]{map, this.info.producerTransactionalId, this.info.groupId});
            producer.sendOffsetsToTransaction(map, new ConsumerGroupMetadata(this.info.groupId));
            LOG.trace("Committing transaction for producer: {}", this.info.producerTransactionalId);
            producer.commitTransaction();
            LOG.trace("Committed transaction for producer: {}", this.info.producerTransactionalId);
        } catch (ProducerFencedException e) {
            this.kafkaConsumerProcessor.handleProducerFencedException(producer, e);
        }
    }

    private Publisher<RecordMetadata> handleSendToError(Throwable th, ConsumerRecords<?, ?> consumerRecords, ConsumerRecord<?, ?> consumerRecord) {
        handleException("Error occurred processing record [" + consumerRecord + "] with Kafka reactive consumer [" + this.info.method + "]: " + th.getMessage(), th, consumerRecords, consumerRecord);
        return !this.info.shouldRedeliver ? Flux.empty() : redeliver(consumerRecord).doOnError(th2 -> {
            handleException("Redelivery failed for record [" + consumerRecord + "] with Kafka reactive consumer [" + this.info.method + "]: " + th.getMessage(), th2, consumerRecords, consumerRecord);
        });
    }

    private Mono<RecordMetadata> redeliver(ConsumerRecord<?, ?> consumerRecord) {
        Object key = consumerRecord.key();
        Object value = consumerRecord.value();
        if (key == null || value == null) {
            return Mono.empty();
        }
        LOG.debug("Attempting redelivery of record [{}] following error", consumerRecord);
        Producer producer = this.kafkaConsumerProcessor.getProducer((String) Optional.ofNullable(this.info.producerClientId).orElse(this.info.groupId), key.getClass(), value.getClass());
        ProducerRecord producerRecord = new ProducerRecord(consumerRecord.topic(), Integer.valueOf(consumerRecord.partition()), key, value, consumerRecord.headers());
        LOG.trace("Sending record: {} for producer: {} {}", new Object[]{producerRecord, producer, this.info.producerTransactionalId});
        return Mono.create(monoSink -> {
            producer.send(producerRecord, new MonoCallback(monoSink));
        });
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void delayRetry(int i, Set<TopicPartition> set) {
        Duration computeRetryDelay = this.info.errorStrategy.computeRetryDelay(this.info.retryDelay, i);
        if (computeRetryDelay != null) {
            pause(set);
            this.kafkaConsumerProcessor.scheduleTask(computeRetryDelay, () -> {
                resume(set);
            });
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public boolean shouldRetryException(Throwable th, ConsumerRecords<?, ?> consumerRecords, ConsumerRecord<?, ?> consumerRecord) {
        if (this.info.errorStrategy.isConditionalRetry()) {
            if (!this.kafkaConsumerProcessor.shouldRetryMessage(this.consumerBean, wrapExceptionInKafkaListenerException(th.getMessage(), th, consumerRecords, consumerRecord))) {
                Stream<Class<? extends Throwable>> stream = this.info.exceptionTypes.stream();
                Class<?> cls = th.getClass();
                Objects.requireNonNull(cls);
                if (!stream.anyMatch((v1) -> {
                    return r1.equals(v1);
                })) {
                    return false;
                }
            }
            return true;
        }
        if (!this.info.exceptionTypes.isEmpty()) {
            Stream<Class<? extends Throwable>> stream2 = this.info.exceptionTypes.stream();
            Class<?> cls2 = th.getClass();
            Objects.requireNonNull(cls2);
            if (!stream2.anyMatch((v1) -> {
                return r1.equals(v1);
            })) {
                return false;
            }
        }
        return true;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public PartitionRetryState getPartitionRetryState(TopicPartition topicPartition, long j) {
        PartitionRetryState computeIfAbsent = this.topicPartitionRetries.computeIfAbsent(topicPartition, topicPartition2 -> {
            return new PartitionRetryState();
        });
        if (computeIfAbsent.currentRetryOffset != j) {
            computeIfAbsent.currentRetryOffset = j;
            computeIfAbsent.currentRetryCount = 1;
        } else {
            computeIfAbsent.currentRetryCount++;
        }
        return computeIfAbsent;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void handleException(Throwable th, @Nullable ConsumerRecords<?, ?> consumerRecords, @Nullable ConsumerRecord<?, ?> consumerRecord) {
        handleException(th.getMessage(), th, consumerRecords, consumerRecord);
    }

    private void handleException(String str, Throwable th, @Nullable ConsumerRecords<?, ?> consumerRecords, @Nullable ConsumerRecord<?, ?> consumerRecord) {
        this.kafkaConsumerProcessor.handleException(this.consumerBean, wrapExceptionInKafkaListenerException(str, th, consumerRecords, consumerRecord));
    }

    private KafkaListenerException wrapExceptionInKafkaListenerException(String str, Throwable th, @Nullable ConsumerRecords<?, ?> consumerRecords, @Nullable ConsumerRecord<?, ?> consumerRecord) {
        return new KafkaListenerException(str, th, this.consumerBean, this.kafkaConsumer, consumerRecords, consumerRecord);
    }

    private OffsetCommitCallback resolveCommitCallback() {
        return (map, exc) -> {
            Object obj = this.consumerBean;
            if (obj instanceof OffsetCommitCallback) {
                ((OffsetCommitCallback) obj).onComplete(map, exc);
            } else if (exc != null) {
                LOG.error("Error asynchronously committing Kafka offsets [{}]: {}", new Object[]{map, exc.getMessage(), exc});
            }
        };
    }

    private static ProducerRecord createFromMessage(String str, KafkaMessage<?, ?> kafkaMessage) {
        return new ProducerRecord((String) Optional.ofNullable(kafkaMessage.getTopic()).orElse(str), kafkaMessage.getPartition(), kafkaMessage.getTimestamp(), kafkaMessage.getKey(), kafkaMessage.getBody(), (Iterable) Optional.ofNullable(kafkaMessage.getHeaders()).map(ConsumerState::convertHeaders).orElse(null));
    }

    private static List<RecordHeader> convertHeaders(Map<String, Object> map) {
        return map.entrySet().stream().map(entry -> {
            return new RecordHeader((String) entry.getKey(), entry.getValue().toString().getBytes(StandardCharsets.UTF_8));
        }).toList();
    }
}
