package org.springframework.integration.handler;

import io.micrometer.observation.ObservationRegistry;
import org.reactivestreams.Subscription;
import org.springframework.integration.history.MessageHistory;
import org.springframework.integration.support.management.metrics.MetricsCaptor;
import org.springframework.integration.support.management.metrics.SampleFacade;
import org.springframework.integration.support.management.observation.DefaultMessageReceiverObservationConvention;
import org.springframework.integration.support.management.observation.IntegrationObservation;
import org.springframework.integration.support.management.observation.MessageReceiverContext;
import org.springframework.integration.support.management.observation.MessageReceiverObservationConvention;
import org.springframework.integration.support.utils.IntegrationUtils;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.util.Assert;
import reactor.core.CoreSubscriber;

/* loaded from: input_file:BOOT-INF/lib/spring-integration-core-6.3.6.jar:org/springframework/integration/handler/AbstractMessageHandler.class */
public abstract class AbstractMessageHandler extends MessageHandlerSupport implements MessageHandler, CoreSubscriber<Message<?>> {

    @Nullable
    private MessageReceiverObservationConvention observationConvention;

    public void setObservationConvention(@Nullable MessageReceiverObservationConvention messageReceiverObservationConvention) {
        this.observationConvention = messageReceiverObservationConvention;
    }

    @Override // org.springframework.messaging.MessageHandler
    public void handleMessage(Message<?> message) {
        Assert.notNull(message, "Message must not be null");
        if (isLoggingEnabled()) {
            this.logger.debug(() -> {
                return this + " received message: " + message;
            });
        }
        if (isObserved()) {
            handleWithObservation(message, getObservationRegistry());
            return;
        }
        MetricsCaptor metricsCaptor = getMetricsCaptor();
        if (metricsCaptor != null) {
            handleWithMetrics(message, metricsCaptor);
        } else {
            doHandleMessage(message);
        }
    }

    private void handleWithObservation(Message<?> message, ObservationRegistry observationRegistry) {
        IntegrationObservation.HANDLER.observation(this.observationConvention, DefaultMessageReceiverObservationConvention.INSTANCE, () -> {
            return new MessageReceiverContext(message, getComponentName());
        }, observationRegistry).observe(() -> {
            doHandleMessage(message);
        });
    }

    private void handleWithMetrics(Message<?> message, MetricsCaptor metricsCaptor) {
        SampleFacade start = metricsCaptor.start();
        try {
            doHandleMessage(message);
            start.stop(sendTimer());
        } catch (Exception e) {
            start.stop(buildSendTimer(false, e.getClass().getSimpleName()));
            throw e;
        }
    }

    private void doHandleMessage(Message<?> message) {
        Message<?> message2 = message;
        try {
            if (shouldTrack()) {
                message2 = MessageHistory.write(message2, this, getMessageBuilderFactory());
            }
            handleMessageInternal(message2);
        } catch (Exception e) {
            throw IntegrationUtils.wrapInHandlingExceptionIfNecessary(message2, () -> {
                return "error occurred in message handler [" + this + "]";
            }, e);
        }
    }

    @Override // reactor.core.CoreSubscriber, org.reactivestreams.Subscriber
    public void onSubscribe(Subscription subscription) {
        Assert.notNull(subscription, "'subscription' must not be null");
        subscription.request(Long.MAX_VALUE);
    }

    @Override // org.reactivestreams.Subscriber
    public void onError(Throwable th) {
    }

    @Override // org.reactivestreams.Subscriber
    public void onComplete() {
    }

    @Override // org.reactivestreams.Subscriber
    public void onNext(Message<?> message) {
        handleMessage(message);
    }

    protected abstract void handleMessageInternal(Message<?> message);
}
