package io.micronaut.configuration.kafka.processor;

import io.micronaut.configuration.kafka.annotation.ErrorStrategyValue;
import io.micronaut.configuration.kafka.annotation.OffsetStrategy;
import io.micronaut.configuration.kafka.seek.KafkaSeekOperations;
import io.micronaut.configuration.kafka.seek.KafkaSeeker;
import io.micronaut.core.annotation.Internal;
import io.micronaut.core.annotation.Nullable;
import io.micronaut.core.async.publisher.Publishers;
import io.micronaut.core.bind.DefaultExecutableBinder;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.RecordDeserializationException;
import reactor.core.publisher.Flux;

/* JADX INFO: Access modifiers changed from: package-private */
@Internal
/* loaded from: input_file:io/micronaut/configuration/kafka/processor/ConsumerStateSingle.class */
public final class ConsumerStateSingle extends ConsumerState {
    /* JADX INFO: Access modifiers changed from: package-private */
    public ConsumerStateSingle(KafkaConsumerProcessor kafkaConsumerProcessor, ConsumerInfo consumerInfo, Consumer<?, ?> consumer, Object obj) {
        super(kafkaConsumerProcessor, consumerInfo, consumer, obj);
    }

    @Override // io.micronaut.configuration.kafka.processor.ConsumerState
    @Nullable
    protected Map<TopicPartition, OffsetAndMetadata> getCurrentOffsets() {
        if (this.info.trackPartitions) {
            return new HashMap();
        }
        return null;
    }

    @Override // io.micronaut.configuration.kafka.processor.ConsumerState
    protected ConsumerRecords<?, ?> pollRecords(@Nullable Map<TopicPartition, OffsetAndMetadata> map) {
        try {
            return this.kafkaConsumer.poll(this.info.pollTimeout);
        } catch (RecordDeserializationException e) {
            LOG.trace("Kafka consumer [{}] failed to deserialize value while polling", this.info.logMethod, e);
            this.kafkaConsumer.seek(e.topicPartition(), e.offset() + 1);
            resolveWithErrorStrategy(null, makeConsumerRecord(e), e);
            return null;
        }
    }

    @Override // io.micronaut.configuration.kafka.processor.ConsumerState
    protected void processRecords(ConsumerRecords<?, ?> consumerRecords, Map<TopicPartition, OffsetAndMetadata> map) {
        Iterator<? extends ConsumerRecord<?, ?>> it = consumerRecords.iterator();
        while (it.hasNext()) {
            ConsumerRecord<?, ?> next = it.next();
            LOG.trace("Kafka consumer [{}] received record: {}", this.info.logMethod, next);
            if (this.info.trackPartitions) {
                map.put(getTopicPartition(next), new OffsetAndMetadata(next.offset() + 1, (String) null));
            }
            KafkaSeekOperations kafkaSeekOperations = (KafkaSeekOperations) Optional.ofNullable(this.info.seekArg).map(argument -> {
                return KafkaSeekOperations.newInstance();
            }).orElse(null);
            Optional.ofNullable(this.info.seekArg).ifPresent(argument2 -> {
                this.boundArguments.put(argument2, kafkaSeekOperations);
            });
            Optional.ofNullable(this.info.ackArg).ifPresent(argument3 -> {
                this.boundArguments.put(argument3, () -> {
                    this.kafkaConsumer.commitSync(map);
                });
            });
            try {
                process(next, consumerRecords);
            } catch (Exception e) {
                if (resolveWithErrorStrategy(consumerRecords, next, e)) {
                    resetTheFollowingPartitions(next, it);
                    this.failed = true;
                    return;
                }
            }
            if (this.info.offsetStrategy == OffsetStrategy.SYNC_PER_RECORD) {
                commitSync(consumerRecords, next, map);
            } else if (this.info.offsetStrategy == OffsetStrategy.ASYNC_PER_RECORD) {
                this.kafkaConsumer.commitAsync(map, this::resolveCommitCallback);
            }
            if (kafkaSeekOperations != null) {
                KafkaSeeker newInstance = KafkaSeeker.newInstance(this.kafkaConsumer);
                Objects.requireNonNull(newInstance);
                kafkaSeekOperations.forEach(newInstance::perform);
            }
        }
        this.failed = false;
    }

    private void process(ConsumerRecord<?, ?> consumerRecord, ConsumerRecords<?, ?> consumerRecords) {
        Object invoke = new DefaultExecutableBinder(this.boundArguments).bind(this.info.method, this.kafkaConsumerProcessor.getBinderRegistry(), consumerRecord).invoke(this.consumerBean);
        if (invoke != null) {
            boolean isConvertibleToPublisher = Publishers.isConvertibleToPublisher(invoke);
            handleResultFlux(consumerRecords, consumerRecord, isConvertibleToPublisher ? this.kafkaConsumerProcessor.convertPublisher(invoke) : Flux.just(invoke), isConvertibleToPublisher || this.info.isBlocking);
        }
    }

    private boolean resolveWithErrorStrategy(@Nullable ConsumerRecords<?, ?> consumerRecords, ConsumerRecord<?, ?> consumerRecord, Throwable th) {
        int currentRetryCount;
        if (this.info.errorStrategy.isRetry()) {
            TopicPartition topicPartition = getTopicPartition(consumerRecord);
            if (shouldRetryException(th, consumerRecords, consumerRecord) && this.info.retryCount > 0 && this.info.retryCount >= (currentRetryCount = getCurrentRetryCount(consumerRecord))) {
                if (this.info.shouldHandleAllExceptions) {
                    handleException(th, consumerRecords, consumerRecord);
                }
                this.kafkaConsumer.seek(topicPartition, consumerRecord.offset());
                delayRetry(currentRetryCount, Collections.singleton(topicPartition));
                return true;
            }
            this.topicPartitionRetries.remove(topicPartition);
        }
        handleException(th, consumerRecords, consumerRecord);
        return this.info.errorStrategy == ErrorStrategyValue.NONE;
    }

    private void commitSync(ConsumerRecords<?, ?> consumerRecords, ConsumerRecord<?, ?> consumerRecord, Map<TopicPartition, OffsetAndMetadata> map) {
        try {
            this.kafkaConsumer.commitSync(map);
        } catch (CommitFailedException e) {
            handleException(e, consumerRecords, consumerRecord);
        }
    }

    private void resolveCommitCallback(Map<TopicPartition, OffsetAndMetadata> map, Exception exc) {
        Object obj = this.consumerBean;
        if (obj instanceof OffsetCommitCallback) {
            ((OffsetCommitCallback) obj).onComplete(map, exc);
        } else if (exc != null) {
            LOG.error("Error asynchronously committing Kafka offsets [{}]: {}", new Object[]{map, exc.getMessage(), exc});
        }
    }

    private void resetTheFollowingPartitions(ConsumerRecord<?, ?> consumerRecord, Iterator<? extends ConsumerRecord<?, ?>> it) {
        HashSet hashSet = new HashSet();
        hashSet.add(Integer.valueOf(consumerRecord.partition()));
        while (it.hasNext()) {
            ConsumerRecord<?, ?> next = it.next();
            if (hashSet.add(Integer.valueOf(next.partition()))) {
                this.kafkaConsumer.seek(getTopicPartition(next), next.offset());
            }
        }
    }

    private int getCurrentRetryCount(ConsumerRecord<?, ?> consumerRecord) {
        return getPartitionRetryState(getTopicPartition(consumerRecord), consumerRecord.offset()).currentRetryCount;
    }

    private static TopicPartition getTopicPartition(ConsumerRecord<?, ?> consumerRecord) {
        return new TopicPartition(consumerRecord.topic(), consumerRecord.partition());
    }

    private static ConsumerRecord<?, ?> makeConsumerRecord(RecordDeserializationException recordDeserializationException) {
        TopicPartition topicPartition = recordDeserializationException.topicPartition();
        return new ConsumerRecord<>(topicPartition.topic(), topicPartition.partition(), recordDeserializationException.offset(), (Object) null, (Object) null);
    }
}
