package io.awspring.cloud.sqs.listener;

import io.awspring.cloud.sqs.CompletableFutures;
import io.awspring.cloud.sqs.MessageExecutionThread;
import io.awspring.cloud.sqs.listener.acknowledgement.AcknowledgementResultCallback;
import io.awspring.cloud.sqs.listener.acknowledgement.AsyncAcknowledgementResultCallback;
import io.awspring.cloud.sqs.listener.errorhandler.AsyncErrorHandler;
import io.awspring.cloud.sqs.listener.errorhandler.ErrorHandler;
import io.awspring.cloud.sqs.listener.interceptor.AsyncMessageInterceptor;
import io.awspring.cloud.sqs.listener.interceptor.MessageInterceptor;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.task.TaskExecutor;
import org.springframework.messaging.Message;
import org.springframework.util.Assert;

/* loaded from: input_file:io/awspring/cloud/sqs/listener/AsyncComponentAdapters.class */
public class AsyncComponentAdapters {
    private static final Logger logger = LoggerFactory.getLogger(AsyncComponentAdapters.class);

    /* loaded from: input_file:io/awspring/cloud/sqs/listener/AsyncComponentAdapters$AbstractThreadingComponentAdapter.class */
    protected static class AbstractThreadingComponentAdapter implements TaskExecutorAware {
        private TaskExecutor taskExecutor;

        protected AbstractThreadingComponentAdapter() {
        }

        @Override // io.awspring.cloud.sqs.listener.TaskExecutorAware
        public void setTaskExecutor(TaskExecutor taskExecutor) {
            this.taskExecutor = taskExecutor;
        }

        protected <T> CompletableFuture<T> execute(Supplier<T> supplier) {
            if (Thread.currentThread() instanceof MessageExecutionThread) {
                AsyncComponentAdapters.logger.trace("Already in a {}, not switching", MessageExecutionThread.class.getSimpleName());
                return supplyInSameThread(supplier);
            }
            AsyncComponentAdapters.logger.trace("Not in a {}, submitting to executor", MessageExecutionThread.class.getSimpleName());
            Assert.notNull(this.taskExecutor, "Task executor not set");
            return supplyInNewThread(supplier);
        }

        protected CompletableFuture<Void> execute(Runnable runnable) {
            if (Thread.currentThread() instanceof MessageExecutionThread) {
                AsyncComponentAdapters.logger.trace("Already in a {}, not switching", MessageExecutionThread.class.getSimpleName());
                return runInSameThread(runnable);
            }
            AsyncComponentAdapters.logger.trace("Not in a {}, submitting to executor", MessageExecutionThread.class.getSimpleName());
            Assert.notNull(this.taskExecutor, "Task executor not set");
            return runInNewThread(runnable);
        }

        private CompletableFuture<Void> runInSameThread(Runnable runnable) {
            try {
                runnable.run();
                return CompletableFuture.completedFuture(null);
            } catch (Exception e) {
                return CompletableFutures.failedFuture(wrapWithBlockingException(e));
            }
        }

        private CompletableFuture<Void> runInNewThread(Runnable runnable) {
            try {
                return CompletableFutures.exceptionallyCompose(CompletableFuture.runAsync(runnable, this.taskExecutor), th -> {
                    return CompletableFutures.failedFuture(wrapWithBlockingException(th));
                });
            } catch (Exception e) {
                return CompletableFutures.failedFuture(wrapWithBlockingException(e));
            }
        }

        private <T> CompletableFuture<T> supplyInSameThread(Supplier<T> supplier) {
            try {
                return CompletableFuture.completedFuture(supplier.get());
            } catch (Exception e) {
                return CompletableFutures.failedFuture(wrapWithBlockingException(e));
            }
        }

        private <T> CompletableFuture<T> supplyInNewThread(Supplier<T> supplier) {
            try {
                return CompletableFutures.exceptionallyCompose(CompletableFuture.supplyAsync(supplier, this.taskExecutor), th -> {
                    return CompletableFutures.failedFuture(wrapWithBlockingException(th));
                });
            } catch (Exception e) {
                return CompletableFutures.failedFuture(wrapWithBlockingException(e));
            }
        }

        private AsyncAdapterBlockingExecutionFailedException wrapWithBlockingException(Throwable th) {
            return new AsyncAdapterBlockingExecutionFailedException("Error executing action in " + getClass().getSimpleName(), th instanceof CompletionException ? th.getCause() : th);
        }
    }

    /* loaded from: input_file:io/awspring/cloud/sqs/listener/AsyncComponentAdapters$BlockingAcknowledgementResultCallbackAdapter.class */
    /**
     * Exposes a blocking {@link AcknowledgementResultCallback} as an
     * {@link AsyncAcknowledgementResultCallback}. Each callback is forwarded to the wrapped
     * instance through the inherited {@code execute(Runnable)}.
     *
     * @param <T> the message payload type
     */
    private static class BlockingAcknowledgementResultCallbackAdapter<T> extends AbstractThreadingComponentAdapter implements AsyncAcknowledgementResultCallback<T> {
        // The blocking callback every call is forwarded to.
        private final AcknowledgementResultCallback<T> delegate;

        public BlockingAcknowledgementResultCallbackAdapter(AcknowledgementResultCallback<T> acknowledgementResultCallback) {
            this.delegate = acknowledgementResultCallback;
        }

        /** Forwards the success notification to the blocking callback. */
        @Override // io.awspring.cloud.sqs.listener.acknowledgement.AsyncAcknowledgementResultCallback
        public CompletableFuture<Void> onSuccess(Collection<Message<T>> collection) {
            return execute(() -> delegate.onSuccess(collection));
        }

        /** Forwards the failure notification, with its cause, to the blocking callback. */
        @Override // io.awspring.cloud.sqs.listener.acknowledgement.AsyncAcknowledgementResultCallback
        public CompletableFuture<Void> onFailure(Collection<Message<T>> collection, Throwable th) {
            return execute(() -> delegate.onFailure(collection, th));
        }
    }

    /* loaded from: input_file:io/awspring/cloud/sqs/listener/AsyncComponentAdapters$BlockingErrorHandlerAdapter.class */
    /**
     * Exposes a blocking {@link ErrorHandler} as an {@link AsyncErrorHandler}. Both the
     * single-message and the batch variants are forwarded to the wrapped handler through
     * the inherited {@code execute(Runnable)}.
     *
     * @param <T> the message payload type
     */
    private static class BlockingErrorHandlerAdapter<T> extends AbstractThreadingComponentAdapter implements AsyncErrorHandler<T> {
        // The blocking handler every call is forwarded to.
        private final ErrorHandler<T> delegate;

        public BlockingErrorHandlerAdapter(ErrorHandler<T> errorHandler) {
            this.delegate = errorHandler;
        }

        /** Forwards a single-message error to the blocking handler. */
        @Override // io.awspring.cloud.sqs.listener.errorhandler.AsyncErrorHandler
        public CompletableFuture<Void> handle(Message<T> message, Throwable th) {
            return execute(() -> delegate.handle(message, th));
        }

        /** Forwards a batch error to the blocking handler. */
        @Override // io.awspring.cloud.sqs.listener.errorhandler.AsyncErrorHandler
        public CompletableFuture<Void> handle(Collection<Message<T>> collection, Throwable th) {
            return execute(() -> delegate.handle(collection, th));
        }
    }

    /* loaded from: input_file:io/awspring/cloud/sqs/listener/AsyncComponentAdapters$BlockingMessageInterceptorAdapter.class */
    /**
     * Exposes a blocking {@link MessageInterceptor} as an {@link AsyncMessageInterceptor}.
     * The {@code intercept} variants go through the inherited {@code execute(Supplier)} and
     * the {@code afterProcessing} variants through {@code execute(Runnable)}.
     *
     * @param <T> the message payload type
     */
    private static class BlockingMessageInterceptorAdapter<T> extends AbstractThreadingComponentAdapter implements AsyncMessageInterceptor<T> {
        // The blocking interceptor every call is forwarded to.
        private final MessageInterceptor<T> delegate;

        public BlockingMessageInterceptorAdapter(MessageInterceptor<T> messageInterceptor) {
            this.delegate = messageInterceptor;
        }

        /** Intercepts a single message; the future carries the interceptor's return value. */
        @Override // io.awspring.cloud.sqs.listener.interceptor.AsyncMessageInterceptor
        public CompletableFuture<Message<T>> intercept(Message<T> message) {
            return execute(() -> delegate.intercept(message));
        }

        /** Intercepts a batch of messages; the future carries the interceptor's return value. */
        @Override // io.awspring.cloud.sqs.listener.interceptor.AsyncMessageInterceptor
        public CompletableFuture<Collection<Message<T>>> intercept(Collection<Message<T>> collection) {
            return execute(() -> delegate.intercept(collection));
        }

        /** Forwards the after-processing callback for a single message. */
        @Override // io.awspring.cloud.sqs.listener.interceptor.AsyncMessageInterceptor
        public CompletableFuture<Void> afterProcessing(Message<T> message, Throwable th) {
            return execute(() -> delegate.afterProcessing(message, th));
        }

        /** Forwards the after-processing callback for a batch of messages. */
        @Override // io.awspring.cloud.sqs.listener.interceptor.AsyncMessageInterceptor
        public CompletableFuture<Void> afterProcessing(Collection<Message<T>> collection, Throwable th) {
            return execute(() -> delegate.afterProcessing(collection, th));
        }
    }

    /* loaded from: input_file:io/awspring/cloud/sqs/listener/AsyncComponentAdapters$BlockingMessageListenerAdapter.class */
    /**
     * Exposes a blocking {@link MessageListener} as an {@link AsyncMessageListener}. Both the
     * single-message and the batch variants are forwarded to the wrapped listener through
     * the inherited {@code execute(Runnable)}.
     *
     * @param <T> the message payload type
     */
    private static class BlockingMessageListenerAdapter<T> extends AbstractThreadingComponentAdapter implements AsyncMessageListener<T> {
        // The blocking listener every call is forwarded to.
        private final MessageListener<T> delegate;

        public BlockingMessageListenerAdapter(MessageListener<T> messageListener) {
            this.delegate = messageListener;
        }

        /** Delivers a single message to the blocking listener. */
        @Override // io.awspring.cloud.sqs.listener.AsyncMessageListener
        public CompletableFuture<Void> onMessage(Message<T> message) {
            return execute(() -> delegate.onMessage(message));
        }

        /** Delivers a batch of messages to the blocking listener. */
        @Override // io.awspring.cloud.sqs.listener.AsyncMessageListener
        public CompletableFuture<Void> onMessage(Collection<Message<T>> collection) {
            return execute(() -> delegate.onMessage(collection));
        }
    }

    // Private constructor: instances cannot be created from outside this class,
    // which is used through its static adapt(...) factory methods.
    private AsyncComponentAdapters() {
    }

    /**
     * Wraps a blocking {@link ErrorHandler} so it can be used as an {@link AsyncErrorHandler}.
     *
     * <p>The returned adapter is {@link TaskExecutorAware}: it needs a task executor to be
     * set before it is invoked from a thread that is not a {@link MessageExecutionThread}.
     *
     * @param errorHandler the blocking error handler to wrap
     * @param <T> the message payload type
     * @return a new adapter delegating to {@code errorHandler}
     */
    public static <T> AsyncErrorHandler<T> adapt(ErrorHandler<T> errorHandler) {
        // Diamond instead of a raw type, so the construction is checked by the compiler.
        return new BlockingErrorHandlerAdapter<>(errorHandler);
    }

    /**
     * Wraps a blocking {@link MessageInterceptor} so it can be used as an
     * {@link AsyncMessageInterceptor}.
     *
     * <p>The returned adapter is {@link TaskExecutorAware}: it needs a task executor to be
     * set before it is invoked from a thread that is not a {@link MessageExecutionThread}.
     *
     * @param messageInterceptor the blocking interceptor to wrap
     * @param <T> the message payload type
     * @return a new adapter delegating to {@code messageInterceptor}
     */
    public static <T> AsyncMessageInterceptor<T> adapt(MessageInterceptor<T> messageInterceptor) {
        // Diamond instead of a raw type, so the construction is checked by the compiler.
        return new BlockingMessageInterceptorAdapter<>(messageInterceptor);
    }

    /**
     * Wraps a blocking {@link MessageListener} so it can be used as an
     * {@link AsyncMessageListener}.
     *
     * <p>The returned adapter is {@link TaskExecutorAware}: it needs a task executor to be
     * set before it is invoked from a thread that is not a {@link MessageExecutionThread}.
     *
     * @param messageListener the blocking listener to wrap
     * @param <T> the message payload type
     * @return a new adapter delegating to {@code messageListener}
     */
    public static <T> AsyncMessageListener<T> adapt(MessageListener<T> messageListener) {
        // Diamond instead of a raw type, so the construction is checked by the compiler.
        return new BlockingMessageListenerAdapter<>(messageListener);
    }

    /**
     * Wraps a blocking {@link AcknowledgementResultCallback} so it can be used as an
     * {@link AsyncAcknowledgementResultCallback}.
     *
     * <p>The returned adapter is {@link TaskExecutorAware}: it needs a task executor to be
     * set before it is invoked from a thread that is not a {@link MessageExecutionThread}.
     *
     * @param acknowledgementResultCallback the blocking callback to wrap
     * @param <T> the message payload type
     * @return a new adapter delegating to {@code acknowledgementResultCallback}
     */
    public static <T> AsyncAcknowledgementResultCallback<T> adapt(AcknowledgementResultCallback<T> acknowledgementResultCallback) {
        // Diamond instead of a raw type, so the construction is checked by the compiler.
        return new BlockingAcknowledgementResultCallbackAdapter<>(acknowledgementResultCallback);
    }
}
