package io.micronaut.configuration.kafka.processor;

import io.micronaut.configuration.kafka.annotation.ErrorStrategyValue;
import io.micronaut.core.annotation.Internal;
import io.micronaut.core.annotation.Nullable;
import io.micronaut.core.async.publisher.Publishers;
import io.micronaut.core.bind.DefaultExecutableBinder;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.RecordDeserializationException;
import reactor.core.publisher.Flux;

/* JADX INFO: Access modifiers changed from: package-private */
/**
 * {@link ConsumerState} variant that hands every polled {@link ConsumerRecords} batch to the
 * consumer method in a single invocation (the method is bound through the processor's batch
 * binder registry).
 *
 * <p>When the configured error strategy is a retry strategy, the consumer positions are captured
 * before polling so that a failed batch can be rewound and delivered again.</p>
 */
@Internal
public final class ConsumerStateBatch extends ConsumerState {

    /**
     * Creates the batch state; all arguments are forwarded unchanged to {@link ConsumerState}.
     *
     * @param kafkaConsumerProcessor the owning processor
     * @param consumerInfo           the consumer configuration
     * @param consumer               the underlying Kafka consumer
     * @param obj                    passed through to the superclass (presumably the consumer bean
     *                               instance; confirm against {@code ConsumerState})
     */
    public ConsumerStateBatch(KafkaConsumerProcessor kafkaConsumerProcessor, ConsumerInfo consumerInfo, Consumer<?, ?> consumer, Object obj) {
        super(kafkaConsumerProcessor, consumerInfo, consumer, obj);
    }

    /**
     * Captures the current position of every assigned partition.
     *
     * @return a map from assigned partition to its current position when the error strategy is a
     *         retry strategy, otherwise {@code null}
     */
    @Override
    @Nullable
    protected Map<TopicPartition, OffsetAndMetadata> getCurrentOffsets() {
        if (!this.info.errorStrategy.isRetry()) {
            // Positions are only needed to rewind a batch that is going to be retried.
            return null;
        }
        return this.kafkaConsumer.assignment().stream()
            .collect(Collectors.toMap(Function.identity(), this::getCurrentOffset));
    }

    /**
     * Polls the Kafka consumer once using the configured poll timeout.
     *
     * @param map the offsets captured by {@link #getCurrentOffsets()}, may be {@code null}
     * @return the polled records, or {@code null} if a record could not be deserialized
     */
    @Override
    protected ConsumerRecords<?, ?> pollRecords(@Nullable Map<TopicPartition, OffsetAndMetadata> map) {
        try {
            return this.kafkaConsumer.poll(this.info.pollTimeout);
        } catch (RecordDeserializationException e) {
            LOG.trace("Kafka consumer [{}] failed to deserialize value while polling", this.info.logMethod, e);
            // Step over the unreadable record first; a retry strategy may seek back again below.
            this.kafkaConsumer.seek(e.topicPartition(), e.offset() + 1);
            resolveWithErrorStrategy(null, map, e);
            return null;
        }
    }

    /**
     * Invokes the consumer method with the whole batch and records in {@code failed} whether the
     * invocation failed according to the error strategy.
     *
     * @param consumerRecords the batch to process
     * @param map             the offsets captured before polling, may be {@code null}
     */
    @Override
    protected void processRecords(ConsumerRecords<?, ?> consumerRecords, @Nullable Map<TopicPartition, OffsetAndMetadata> map) {
        try {
            if (this.info.ackArg != null) {
                // The acknowledgement handed to the method commits the offsets of this batch.
                final Map<TopicPartition, OffsetAndMetadata> ackOffsets = getAckOffsets(consumerRecords);
                // NOTE(review): the functional interface this lambda targets is determined by the
                // value type of boundArguments, which is declared outside this file - confirm.
                this.boundArguments.put(this.info.ackArg, () -> {
                    this.kafkaConsumer.commitSync(ackOffsets);
                });
            }
            final Object result = new DefaultExecutableBinder(this.boundArguments)
                .bind(this.info.method, this.kafkaConsumerProcessor.getBatchBinderRegistry(), consumerRecords)
                .invoke(this.consumerBean);
            handleResult(normalizeResult(result), consumerRecords);
            this.failed = false;
        } catch (Exception e) {
            this.failed = resolveWithErrorStrategy(consumerRecords, map, e);
        }
    }

    /**
     * Computes the offsets to commit for a batch: for each partition, the offset following the
     * last record of that partition encountered while iterating the batch.
     *
     * @param consumerRecords the batch being acknowledged
     * @return the offsets to commit, keyed by partition
     */
    private Map<TopicPartition, OffsetAndMetadata> getAckOffsets(ConsumerRecords<?, ?> consumerRecords) {
        final Map<TopicPartition, OffsetAndMetadata> ackOffsets = new HashMap<>();
        for (ConsumerRecord<?, ?> consumerRecord : consumerRecords) {
            // Later records of the same partition overwrite earlier entries.
            ackOffsets.put(
                new TopicPartition(consumerRecord.topic(), consumerRecord.partition()),
                new OffsetAndMetadata(consumerRecord.offset() + 1, null));
        }
        return ackOffsets;
    }

    /**
     * Wraps an object-array result in a list; any other value (including {@code null}) is
     * returned unchanged.
     *
     * @param obj the raw value returned by the consumer method
     * @return the normalized result
     */
    @Nullable
    private static Object normalizeResult(@Nullable Object obj) {
        if (obj != null && obj.getClass().isArray()) {
            return Arrays.asList((Object[]) obj);
        }
        return obj;
    }

    /**
     * Pairs each element of the method result with the record at the same position in the batch
     * and passes each pair to {@code handleResultFlux}. A {@code null} result is ignored.
     *
     * @param obj             the normalized method result, may be {@code null}
     * @param consumerRecords the batch that produced the result
     */
    private void handleResult(Object obj, ConsumerRecords<?, ?> consumerRecords) {
        if (obj == null) {
            return;
        }
        final boolean isPublisher = Publishers.isConvertibleToPublisher(obj);
        final boolean blocking = this.info.isBlocking || !isPublisher;

        final Flux<?> results;
        if (obj instanceof Iterable) {
            results = Flux.fromIterable((Iterable<?>) obj);
        } else if (isPublisher) {
            results = this.kafkaConsumerProcessor.convertPublisher(obj);
        } else {
            results = Flux.just(obj);
        }

        // T1 is the result element, T2 the consumer record at the same position.
        final Flux<?> pipeline = results
            .zipWithIterable(consumerRecords)
            .doOnNext(pair -> handleResultFlux(consumerRecords, pair.getT2(), Flux.just(pair.getT1()), blocking));

        if (blocking) {
            pipeline.blockLast();
        } else {
            pipeline.subscribe();
        }
    }

    /**
     * Applies the configured error strategy to a failed poll or a failed batch invocation.
     *
     * @param consumerRecords the failed batch, or {@code null} when the failure happened while polling
     * @param map             the offsets captured before polling; dereferenced only when the error
     *                        strategy is a retry strategy
     * @param th              the failure
     * @return {@code true} if the batch was rewound for a retry, or if it was not retried and the
     *         error strategy is {@link ErrorStrategyValue#NONE}; {@code false} otherwise
     */
    private boolean resolveWithErrorStrategy(@Nullable ConsumerRecords<?, ?> consumerRecords, Map<TopicPartition, OffsetAndMetadata> map, Throwable th) {
        if (this.info.errorStrategy.isRetry()) {
            final Set<TopicPartition> partitions = consumerRecords != null ? consumerRecords.partitions() : map.keySet();
            if (shouldRetryException(th) && this.info.retryCount > 0) {
                final int currentRetryCount = getCurrentRetryCount(partitions, map);
                if (this.info.retryCount >= currentRetryCount) {
                    if (this.info.shouldHandleAllExceptions) {
                        handleException(th, consumerRecords, null);
                    }
                    // Rewind every affected partition to where it was before the poll.
                    partitions.forEach(topicPartition ->
                        this.kafkaConsumer.seek(topicPartition, map.get(topicPartition).offset()));
                    delayRetry(currentRetryCount, partitions);
                    return true;
                }
            }
            // No (more) retries for this batch: discard the retry state of its partitions.
            partitions.forEach(this.topicPartitionRetries::remove);
        }
        handleException(th, consumerRecords, null);
        return this.info.errorStrategy == ErrorStrategyValue.NONE;
    }

    /**
     * Returns the highest retry count among the given partitions, falling back to the configured
     * retry count when the set is empty.
     *
     * @param set the partitions of the failed batch
     * @param map the offsets captured before polling; dereferenced for every partition in
     *            {@code set}, so it must contain an entry for each of them
     * @return the retry count to compare against the configured maximum
     */
    private int getCurrentRetryCount(Set<TopicPartition> set, @Nullable Map<TopicPartition, OffsetAndMetadata> map) {
        return set.stream()
            .map(topicPartition -> getPartitionRetryState(topicPartition, map.get(topicPartition).offset()))
            .mapToInt(retryState -> retryState.currentRetryCount)
            .max()
            .orElse(this.info.retryCount);
    }

    /**
     * Reads the consumer's current position for a partition.
     *
     * @param topicPartition the partition to inspect
     * @return the current position wrapped in an {@link OffsetAndMetadata} with no metadata
     */
    private OffsetAndMetadata getCurrentOffset(TopicPartition topicPartition) {
        return new OffsetAndMetadata(this.kafkaConsumer.position(topicPartition), null);
    }
}
