package io.awspring.cloud.sqs.listener.source;

import io.awspring.cloud.sqs.ConfigUtils;
import io.awspring.cloud.sqs.QueueAttributesResolver;
import io.awspring.cloud.sqs.listener.ContainerOptions;
import io.awspring.cloud.sqs.listener.QueueAttributes;
import io.awspring.cloud.sqs.listener.QueueAttributesAware;
import io.awspring.cloud.sqs.listener.QueueNotFoundStrategy;
import io.awspring.cloud.sqs.listener.SqsAsyncClientAware;
import io.awspring.cloud.sqs.listener.SqsContainerOptions;
import io.awspring.cloud.sqs.listener.acknowledgement.AcknowledgementExecutor;
import io.awspring.cloud.sqs.listener.acknowledgement.ExecutingAcknowledgementProcessor;
import io.awspring.cloud.sqs.listener.acknowledgement.SqsAcknowledgementExecutor;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import software.amazon.awssdk.services.sqs.SqsAsyncClient;
import software.amazon.awssdk.services.sqs.model.Message;
import software.amazon.awssdk.services.sqs.model.QueueAttributeName;
import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest;

/* loaded from: input_file:io/awspring/cloud/sqs/listener/source/AbstractSqsMessageSource.class */
/**
 * Base class for polling message sources that read from an SQS queue through a
 * {@link SqsAsyncClient}.
 *
 * <p>On configuration it copies the SQS-specific settings out of {@link SqsContainerOptions};
 * on start it resolves the queue attributes (and URL) for the polling endpoint name and wires
 * the client and attributes into the conversion context and acknowledgement executor.
 * Polling issues one {@code ReceiveMessage} request per batch of up to
 * {@value #MAX_MESSAGES_PER_POLL} messages and merges the results.
 *
 * @param <T> the payload type handled by this source
 */
public abstract class AbstractSqsMessageSource<T> extends AbstractPollingMessageSource<T, Message> implements SqsAsyncClientAware {
    private static final Logger logger = LoggerFactory.getLogger(AbstractSqsMessageSource.class);

    /** Sentinel stored in {@link #messageVisibility} when no visibility timeout was configured. */
    private static final int MESSAGE_VISIBILITY_DISABLED = -1;

    /** Maximum number of messages SQS accepts in a single {@code ReceiveMessage} request. */
    private static final int MAX_MESSAGES_PER_POLL = 10;

    private SqsAsyncClient sqsAsyncClient;
    private String queueUrl;
    private QueueAttributes queueAttributes;
    private QueueNotFoundStrategy queueNotFoundStrategy;
    private Collection<QueueAttributeName> queueAttributeNames;
    private Collection<String> messageAttributeNames;
    private Collection<String> messageSystemAttributeNames;
    private int messageVisibility;
    private int pollTimeout;

    /**
     * Sets the client used for all SQS calls made by this source.
     *
     * @param sqsAsyncClient the client; must not be {@code null}
     */
    @Override
    public void setSqsAsyncClient(SqsAsyncClient sqsAsyncClient) {
        Assert.notNull(sqsAsyncClient, "sqsAsyncClient cannot be null.");
        this.sqsAsyncClient = sqsAsyncClient;
    }

    /**
     * Copies the SQS-specific settings from the container options into this source.
     *
     * @param containerOptions the container options; must be an instance of
     * {@link SqsContainerOptions}
     */
    @Override
    protected void doConfigure(ContainerOptions<?, ?> containerOptions) {
        Assert.isInstanceOf(SqsContainerOptions.class, containerOptions, "containerOptions must be an instance of SqsContainerOptions");
        SqsContainerOptions options = (SqsContainerOptions) containerOptions;
        this.pollTimeout = (int) options.getPollTimeout().getSeconds();
        this.queueAttributeNames = options.getQueueAttributeNames();
        this.messageAttributeNames = options.getMessageAttributeNames();
        this.messageSystemAttributeNames = options.getMessageSystemAttributeNames();
        this.queueNotFoundStrategy = options.getQueueNotFoundStrategy();
        // A missing visibility setting is recorded as a negative sentinel so that
        // createRequest can skip the visibilityTimeout parameter entirely.
        this.messageVisibility = options.getMessageVisibility() != null
                ? (int) options.getMessageVisibility().getSeconds()
                : MESSAGE_VISIBILITY_DISABLED;
    }

    /**
     * Resolves the queue attributes and URL, then configures the conversion context and the
     * acknowledgement processor. Requires the client and the queue attribute names to be set.
     */
    @Override
    protected void doStart() {
        Assert.notNull(this.sqsAsyncClient, "sqsAsyncClient not set");
        Assert.notNull(this.queueAttributeNames, "queueAttributeNames not set");
        this.queueAttributes = resolveQueueAttributes();
        this.queueUrl = this.queueAttributes.getQueueUrl();
        configureConversionContextAndAcknowledgement();
    }

    /**
     * Hands the client and resolved queue attributes to the message conversion context and
     * installs a configured acknowledgement executor on the acknowledgement processor, each
     * only when the target implements the corresponding interface.
     */
    // ExecutingAcknowledgementProcessor is matched through its raw class literal, so the call
    // to setAcknowledgementExecutor is necessarily unchecked.
    @SuppressWarnings("unchecked")
    private void configureConversionContextAndAcknowledgement() {
        ConfigUtils.INSTANCE
                .acceptIfInstance(getMessageConversionContext(), SqsAsyncClientAware.class,
                        clientAware -> clientAware.setSqsAsyncClient(this.sqsAsyncClient))
                .acceptIfInstance(getMessageConversionContext(), QueueAttributesAware.class,
                        attributesAware -> attributesAware.setQueueAttributes(this.queueAttributes))
                .acceptIfInstance(getAcknowledgmentProcessor(), ExecutingAcknowledgementProcessor.class,
                        processor -> processor.setAcknowledgementExecutor(
                                createAndConfigureAcknowledgementExecutor(this.queueAttributes)));
    }

    /**
     * Resolves the attributes of the queue named by the polling endpoint.
     *
     * <p>Blocks the calling thread until resolution completes ({@code join()}).
     *
     * @return the resolved queue attributes
     */
    private QueueAttributes resolveQueueAttributes() {
        return QueueAttributesResolver.builder()
                .queueName(getPollingEndpointName())
                .sqsAsyncClient(this.sqsAsyncClient)
                .queueAttributeNames(this.queueAttributeNames)
                .queueNotFoundStrategy(this.queueNotFoundStrategy)
                .build()
                .resolveQueueAttributes()
                .join();
    }

    /**
     * Creates an acknowledgement executor and supplies it with the queue attributes and the
     * SQS client when it implements the corresponding aware interfaces.
     *
     * @param queueAttributes the attributes of the queue being polled
     * @return the configured executor
     */
    protected AcknowledgementExecutor<T> createAndConfigureAcknowledgementExecutor(QueueAttributes queueAttributes) {
        AcknowledgementExecutor<T> executor = createAcknowledgementExecutorInstance();
        ConfigUtils.INSTANCE
                .acceptIfInstance(executor, QueueAttributesAware.class,
                        attributesAware -> attributesAware.setQueueAttributes(queueAttributes))
                .acceptIfInstance(executor, SqsAsyncClientAware.class,
                        clientAware -> clientAware.setSqsAsyncClient(this.sqsAsyncClient));
        return executor;
    }

    /**
     * Creates the acknowledgement executor instance. Subclasses may override to supply a
     * different implementation.
     *
     * @return a new, not yet configured executor
     */
    protected AcknowledgementExecutor<T> createAcknowledgementExecutorInstance() {
        return new SqsAcknowledgementExecutor<>();
    }

    /**
     * Polls the queue for up to the requested number of messages, splitting the request into
     * several polls when it exceeds {@value #MAX_MESSAGES_PER_POLL}.
     *
     * @param maxNumberOfMessages the maximum number of messages to retrieve
     * @return a future completing with the received messages
     */
    @Override
    protected CompletableFuture<Collection<Message>> doPollForMessages(int maxNumberOfMessages) {
        logger.debug("Polling queue {} for {} messages.", this.queueUrl, maxNumberOfMessages);
        return maxNumberOfMessages <= MAX_MESSAGES_PER_POLL
                ? executePoll(maxNumberOfMessages)
                : executeMultiplePolls(maxNumberOfMessages);
    }

    /**
     * Issues a single {@code ReceiveMessage} request and logs the outcome.
     *
     * @param maxNumberOfMessages the maximum number of messages for this request
     * @return a future completing with the messages of the response
     */
    private CompletableFuture<Collection<Message>> executePoll(int maxNumberOfMessages) {
        return this.sqsAsyncClient.receiveMessage(createRequest(maxNumberOfMessages))
                // The type witness widens List<Message> to Collection<Message> in one stage.
                .<Collection<Message>>thenApply(response -> response.messages())
                .whenComplete(this::logMessagesReceived);
    }

    /**
     * Builds the receive request from the configured options.
     *
     * <p>{@link #customizeRequest} runs before the visibility timeout is applied, so a
     * configured message visibility takes precedence over a value set by the customizer.
     *
     * @param maxNumberOfMessages the maximum number of messages for this request
     * @return the request to send
     */
    private ReceiveMessageRequest createRequest(int maxNumberOfMessages) {
        ReceiveMessageRequest.Builder builder = ReceiveMessageRequest.builder()
                .queueUrl(this.queueUrl)
                .maxNumberOfMessages(maxNumberOfMessages)
                .attributeNamesWithStrings(this.messageSystemAttributeNames)
                .messageAttributeNames(this.messageAttributeNames)
                .waitTimeSeconds(this.pollTimeout);
        customizeRequest(builder);
        if (this.messageVisibility >= 0) {
            builder.visibilityTimeout(this.messageVisibility);
        }
        return builder.build();
    }

    /**
     * Hook for subclasses to adjust the receive request before it is built. The default
     * implementation does nothing.
     *
     * @param builder the request builder, already populated from the container options
     */
    protected void customizeRequest(ReceiveMessageRequest.Builder builder) {
    }

    /**
     * Retrieves more than {@value #MAX_MESSAGES_PER_POLL} messages by combining as many
     * full-size polls as fit plus, when needed, one poll for the remainder.
     *
     * @param maxNumberOfMessages the total maximum number of messages to retrieve
     * @return a future completing with the messages of all polls
     */
    private CompletableFuture<Collection<Message>> executeMultiplePolls(int maxNumberOfMessages) {
        int remainder = maxNumberOfMessages % MAX_MESSAGES_PER_POLL;
        CompletableFuture<Collection<Message>> fullBatches = combinePolls(maxNumberOfMessages);
        return remainder == 0
                ? fullBatches
                : fullBatches.thenCombine(executePoll(remainder), this::combineBatches);
    }

    /**
     * Issues one full-size poll for every complete batch contained in the requested amount
     * and merges their results.
     *
     * @param maxNumberOfMessages the total maximum number of messages requested
     * @return a future completing with the merged messages of the full-size polls
     */
    private CompletableFuture<Collection<Message>> combinePolls(int maxNumberOfMessages) {
        return IntStream.range(0, maxNumberOfMessages / MAX_MESSAGES_PER_POLL)
                .mapToObj(batchIndex -> executePoll(MAX_MESSAGES_PER_POLL))
                .reduce(CompletableFuture.completedFuture(Collections.emptyList()),
                        (combined, next) -> combined.thenCombine(next, this::combineBatches));
    }

    /**
     * Concatenates two message batches into a new collection, leaving the inputs untouched.
     *
     * @param first the batch whose messages come first
     * @param second the batch appended after {@code first}
     * @return a new collection holding the messages of both batches
     */
    private Collection<Message> combineBatches(Collection<Message> first, Collection<Message> second) {
        Collection<Message> combined = new ArrayList<>(first);
        combined.addAll(second);
        return combined;
    }

    /**
     * Logs the result of a poll: message count and ids at trace level, otherwise only the
     * count at debug level. Nothing is logged here when the poll produced no result.
     *
     * @param messages the received messages, or {@code null} if the poll failed
     * @param throwable the failure, if any; not used by this method
     */
    private void logMessagesReceived(@Nullable Collection<Message> messages, @Nullable Throwable throwable) {
        if (messages == null) {
            return;
        }
        if (logger.isTraceEnabled()) {
            logger.trace("Received {} messages {} from queue {}", messages.size(),
                    messages.stream().map(message -> message.messageId()).collect(Collectors.toList()),
                    this.queueUrl);
        } else {
            logger.debug("Received {} messages from queue {}", messages.size(), this.queueUrl);
        }
    }
}
