package io.awspring.cloud.sqs.listener.pipeline;

import io.awspring.cloud.sqs.MessageHeaderUtils;
import io.awspring.cloud.sqs.listener.MessageProcessingContext;
import io.awspring.cloud.sqs.listener.interceptor.AsyncMessageInterceptor;
import java.util.Collection;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.messaging.Message;
import org.springframework.util.Assert;

/* loaded from: input_file:io/awspring/cloud/sqs/listener/pipeline/AbstractBeforeProcessingInterceptorExecutionStage.class */
/**
 * Pipeline stage that passes a message, or a batch of messages, through every
 * {@link AsyncMessageInterceptor} supplied by {@link #getInterceptors(MessageProcessingContext)}.
 *
 * <p>Interceptors are chained asynchronously in the iteration order of the returned collection:
 * each interceptor receives the result produced by the previous one, and that result is validated
 * before it is handed to the next interceptor.
 *
 * @param <T> the message payload type
 */
public abstract class AbstractBeforeProcessingInterceptorExecutionStage<T> implements MessageProcessingPipeline<T> {
    private static final Logger logger = LoggerFactory.getLogger(AbstractBeforeProcessingInterceptorExecutionStage.class);

    /**
     * Runs the single-message {@code intercept} callback of every interceptor, in order.
     *
     * @param message the message to hand to the first interceptor
     * @param messageProcessingContext the context used to look up the interceptors
     * @return a future holding the message returned by the last interceptor, or {@code message}
     *         itself when there are no interceptors; the future completes exceptionally when an
     *         interceptor fails or yields a {@code null} message
     * @throws NullPointerException if the interceptor collection contains a {@code null} element
     */
    @Override // io.awspring.cloud.sqs.listener.pipeline.MessageProcessingPipeline
    public CompletableFuture<Message<T>> process(Message<T> message, MessageProcessingContext<T> messageProcessingContext) {
        logger.trace("Processing message {}", MessageHeaderUtils.getId(message));
        CompletableFuture<Message<T>> chain = CompletableFuture.completedFuture(message);
        // A plain loop makes the strictly sequential chaining explicit and keeps full generic typing.
        for (AsyncMessageInterceptor<T> interceptor : getInterceptors(messageProcessingContext)) {
            // Fail fast, on the calling thread, rather than inside the asynchronous chain.
            Objects.requireNonNull(interceptor, "interceptor must not be null");
            chain = chain.thenCompose(interceptor::intercept).thenApply(validateMessageNotNull());
        }
        return chain;
    }

    /**
     * Runs the batch {@code intercept} callback of every interceptor, in order.
     *
     * @param collection the messages to hand to the first interceptor
     * @param messageProcessingContext the context used to look up the interceptors
     * @return a future holding the messages returned by the last interceptor, or {@code collection}
     *         itself when there are no interceptors; the future completes exceptionally when an
     *         interceptor fails or yields a {@code null} or empty collection
     * @throws NullPointerException if the interceptor collection contains a {@code null} element
     */
    @Override // io.awspring.cloud.sqs.listener.pipeline.MessageProcessingPipeline
    public CompletableFuture<Collection<Message<T>>> process(Collection<Message<T>> collection, MessageProcessingContext<T> messageProcessingContext) {
        logger.trace("Processing messages {}", MessageHeaderUtils.getId(collection));
        CompletableFuture<Collection<Message<T>>> chain = CompletableFuture.completedFuture(collection);
        for (AsyncMessageInterceptor<T> interceptor : getInterceptors(messageProcessingContext)) {
            // Fail fast, on the calling thread, rather than inside the asynchronous chain.
            Objects.requireNonNull(interceptor, "interceptor must not be null");
            chain = chain.thenCompose(interceptor::intercept).thenApply(validateMessagesNotEmpty());
        }
        return chain;
    }

    /**
     * Supplies the interceptors this stage should execute for the given context.
     *
     * @param messageProcessingContext the context of the current processing run
     * @return the interceptors to apply, in the order they should run
     */
    protected abstract Collection<AsyncMessageInterceptor<T>> getInterceptors(MessageProcessingContext<T> messageProcessingContext);

    /**
     * Creates the post-interceptor check for single messages.
     *
     * @return an identity function that rejects a {@code null} message via {@link Assert#notNull}
     */
    private Function<Message<T>, Message<T>> validateMessageNotNull() {
        return intercepted -> {
            Assert.notNull(intercepted, "Interceptor must not return null messages");
            return intercepted;
        };
    }

    /**
     * Creates the post-interceptor check for message batches.
     *
     * @return an identity function that rejects a {@code null} or empty collection via
     *         {@link Assert#notEmpty}
     */
    private Function<Collection<Message<T>>, Collection<Message<T>>> validateMessagesNotEmpty() {
        return intercepted -> {
            Assert.notEmpty(intercepted, "Interceptor must not return null or empty collection");
            return intercepted;
        };
    }
}
