package com.azure.messaging.servicebus;

import com.azure.core.util.logging.ClientLogger;
import com.azure.core.util.logging.LoggingEventBuilder;
import com.azure.messaging.servicebus.implementation.ServiceBusConstants;
import java.util.HashMap;
import java.util.Objects;
import java.util.concurrent.Semaphore;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxOperator;
import reactor.util.context.Context;

@Deprecated
/* loaded from: input_file:com/azure/messaging/servicebus/AutoDispositionLockRenew.class */
final class AutoDispositionLockRenew extends FluxOperator<ServiceBusReceivedMessage, ServiceBusReceivedMessage> {
    private final ClientLogger logger;
    private final ServiceBusReceiverAsyncClient client;
    private final boolean enableAutoDisposition;
    private final boolean enableAutoLockRenew;
    private final Semaphore dispositionLock;

    /* loaded from: input_file:com/azure/messaging/servicebus/AutoDispositionLockRenew$Subscriber.class */
    private static final class Subscriber extends BaseSubscriber<ServiceBusReceivedMessage> {
        private final ClientLogger logger;
        private final ServiceBusReceiverAsyncClient client;
        private final boolean enableAutoDisposition;
        private final boolean enableAutoLockRenew;
        private final Semaphore dispositionLock;
        private final CoreSubscriber<? super ServiceBusReceivedMessage> downstream;

        Subscriber(ClientLogger clientLogger, ServiceBusReceiverAsyncClient serviceBusReceiverAsyncClient, boolean z, boolean z2, Semaphore semaphore, CoreSubscriber<? super ServiceBusReceivedMessage> coreSubscriber) {
            this.logger = clientLogger;
            this.client = serviceBusReceiverAsyncClient;
            this.enableAutoDisposition = z;
            this.enableAutoLockRenew = z2;
            this.dispositionLock = semaphore;
            this.downstream = coreSubscriber;
        }

        protected void hookOnSubscribe(Subscription subscription) {
            this.logger.atInfo().log("Subscription received. Subscribing downstream. {}", new Object[]{subscription});
            this.downstream.onSubscribe(this);
        }

        /* JADX INFO: Access modifiers changed from: protected */
        public void hookOnNext(ServiceBusReceivedMessage serviceBusReceivedMessage) {
            Disposable beginLockRenewal = this.enableAutoLockRenew ? this.client.beginLockRenewal(serviceBusReceivedMessage) : Disposables.disposed();
            String valueOf = serviceBusReceivedMessage != null ? String.valueOf(serviceBusReceivedMessage.getSequenceNumber()) : "n/a";
            this.logger.atVerbose().addKeyValue(ServiceBusConstants.SEQUENCE_NUMBER_KEY, valueOf).log("onNext: Passing message downstream.");
            if (this.enableAutoDisposition) {
                try {
                    this.dispositionLock.acquire();
                } catch (InterruptedException e) {
                    this.logger.atInfo().addKeyValue(ServiceBusConstants.SEQUENCE_NUMBER_KEY, valueOf).log("Unable to acquire dispositionLock.", new Object[]{e});
                }
            }
            try {
                try {
                    this.downstream.onNext(serviceBusReceivedMessage);
                    disposition(serviceBusReceivedMessage, valueOf, true);
                    if (this.enableAutoDisposition) {
                        this.dispositionLock.release();
                        beginLockRenewal.dispose();
                    }
                    this.logger.atVerbose().addKeyValue(ServiceBusConstants.SEQUENCE_NUMBER_KEY, valueOf).log("onNext: Finished.");
                } catch (Exception e2) {
                    this.logger.atError().addKeyValue(ServiceBusConstants.SEQUENCE_NUMBER_KEY, valueOf).log("Error occurred when downstream processing message.", new Object[]{e2});
                    disposition(serviceBusReceivedMessage, valueOf, false);
                    if (this.enableAutoDisposition) {
                        this.dispositionLock.release();
                        beginLockRenewal.dispose();
                    }
                    this.logger.atVerbose().addKeyValue(ServiceBusConstants.SEQUENCE_NUMBER_KEY, valueOf).log("onNext: Finished.");
                }
            } catch (Throwable th) {
                if (this.enableAutoDisposition) {
                    this.dispositionLock.release();
                    beginLockRenewal.dispose();
                }
                this.logger.atVerbose().addKeyValue(ServiceBusConstants.SEQUENCE_NUMBER_KEY, valueOf).log("onNext: Finished.");
                throw th;
            }
        }

        protected void hookOnError(Throwable th) {
            this.logger.atInfo().log("Propagating upstream error signal to downstream.", new Object[]{th});
            this.downstream.onError(th);
        }

        protected void hookOnComplete() {
            this.logger.atInfo().log("Propagating upstream completion signal to downstream.");
            this.downstream.onComplete();
        }

        public Context currentContext() {
            return this.downstream.currentContext();
        }

        private void disposition(ServiceBusReceivedMessage serviceBusReceivedMessage, String str, boolean z) {
            if (this.enableAutoDisposition) {
                if (serviceBusReceivedMessage == null || !serviceBusReceivedMessage.isSettled()) {
                    try {
                        if (z) {
                            this.client.complete(serviceBusReceivedMessage).block();
                        } else {
                            this.client.abandon(serviceBusReceivedMessage).block();
                        }
                    } catch (Exception e) {
                        LoggingEventBuilder addKeyValue = this.logger.atWarning().addKeyValue(ServiceBusConstants.SEQUENCE_NUMBER_KEY, str);
                        Object[] objArr = new Object[2];
                        objArr[0] = z ? "Complete" : "Abandon";
                        objArr[1] = e;
                        addKeyValue.log("Unable to '{}' message, cancelling the message streaming.", objArr);
                        upstream().cancel();
                        onError(e);
                    }
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public AutoDispositionLockRenew(Flux<? extends ServiceBusReceivedMessage> flux, ServiceBusReceiverAsyncClient serviceBusReceiverAsyncClient, boolean z, boolean z2, Semaphore semaphore) {
        super(flux);
        HashMap hashMap = new HashMap(2);
        hashMap.put("namespace", serviceBusReceiverAsyncClient.getFullyQualifiedNamespace());
        hashMap.put("entityPath", serviceBusReceiverAsyncClient.getEntityPath());
        this.logger = new ClientLogger(AutoDispositionLockRenew.class, hashMap);
        this.client = serviceBusReceiverAsyncClient;
        this.enableAutoDisposition = z;
        this.enableAutoLockRenew = z2;
        this.dispositionLock = semaphore;
    }

    public void subscribe(CoreSubscriber<? super ServiceBusReceivedMessage> coreSubscriber) {
        Objects.requireNonNull(coreSubscriber, "'actual' cannot be null.");
        this.source.subscribe(new Subscriber(this.logger, this.client, this.enableAutoDisposition, this.enableAutoLockRenew, this.dispositionLock, coreSubscriber));
    }
}
