package com.azure.messaging.eventhubs;

import com.azure.core.amqp.implementation.WindowedSubscriber;
import com.azure.core.util.Context;
import com.azure.core.util.IterableStream;
import com.azure.core.util.logging.ClientLogger;
import com.azure.messaging.eventhubs.implementation.ClientConstants;
import com.azure.messaging.eventhubs.implementation.instrumentation.EventHubsTracer;
import com.azure.messaging.eventhubs.models.EventPosition;
import com.azure.messaging.eventhubs.models.PartitionEvent;
import com.azure.messaging.eventhubs.models.ReceiveOptions;
import java.time.Duration;
import java.time.Instant;
import java.util.Collections;
import java.util.HashMap;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
import reactor.core.publisher.Flux;

/* loaded from: input_file:com/azure/messaging/eventhubs/SynchronousReceiver.class */
/**
 * Backs synchronous, windowed receive calls with a single {@link WindowedSubscriber} that is lazily
 * subscribed to {@link EventHubConsumerAsyncClient#receiveFromPartition} on the first call to
 * {@link #receive(String, EventPosition, ReceiveOptions, int, Duration)}.
 *
 * <p>The current subscriber is held in an {@link AtomicReference}: {@code null} until the first receive,
 * the live subscriber afterwards, and the shared {@link #DISPOSED} sentinel once {@link #dispose()} ran.</p>
 */
final class SynchronousReceiver {
    private static final String TERMINAL_MESSAGE = "The receiver client is terminated. Re-create the client to continue receive attempt.";
    /**
     * Sentinel installed by {@link #dispose()}. It is a subscriber that was subscribed to an
     * already-errored stream.
     * NOTE(review): presumably this makes every later enqueueRequest fail with TERMINAL_MESSAGE -
     * confirm against WindowedSubscriber.
     */
    private static final WindowedSubscriber<PartitionEvent> DISPOSED = createDisposedSubscriber();
    private static final String SYNC_RECEIVE_SPAN_NAME = "EventHubs.receiveFromPartition";
    private final ClientLogger logger;
    private final EventHubConsumerAsyncClient asyncClient;
    private final EventHubsTracer tracer;
    private final AtomicReference<WindowedSubscriber<PartitionEvent>> subscriber = new AtomicReference<>(null);

    /**
     * Creates a receiver on top of the given async client.
     *
     * @param clientLogger logger used to log-and-throw errors raised by this class.
     * @param eventHubConsumerAsyncClient async client that supplies the partition events and the tracer.
     * @throws NullPointerException if {@code clientLogger} or {@code eventHubConsumerAsyncClient} is null.
     */
    public SynchronousReceiver(ClientLogger clientLogger, EventHubConsumerAsyncClient eventHubConsumerAsyncClient) {
        this.logger = Objects.requireNonNull(clientLogger, "'logger' cannot be null.");
        this.asyncClient = Objects.requireNonNull(eventHubConsumerAsyncClient, "'asyncClient' cannot be null.");
        this.tracer = eventHubConsumerAsyncClient.getInstrumentation().getTracer();
    }

    /**
     * Enqueues a receive request on the shared subscriber, creating and subscribing it first if no
     * subscriber exists yet.
     *
     * <p>{@code str}, {@code eventPosition} and {@code receiveOptions} are only consulted when this call
     * ends up creating the subscription; once a subscriber is installed, later calls reuse it and these
     * three arguments are validated for null but otherwise ignored.</p>
     *
     * @param str the partition id (see the null-check message below).
     * @param eventPosition the starting position passed to {@code receiveFromPartition}.
     * @param receiveOptions the options passed to {@code receiveFromPartition}.
     * @param i forwarded as the first argument of {@code WindowedSubscriber.enqueueRequest}
     *     (presumably the maximum number of events for this request - confirm).
     * @param duration forwarded as the second argument of {@code WindowedSubscriber.enqueueRequest}
     *     (presumably the maximum wait time for this request - confirm).
     * @return the stream returned by {@code WindowedSubscriber.enqueueRequest}.
     * @throws NullPointerException if {@code str}, {@code eventPosition} or {@code receiveOptions} is null.
     * @throws UnsupportedOperationException if no subscriber exists yet and the async client is not in v2 mode.
     */
    public IterableStream<PartitionEvent> receive(String str, EventPosition eventPosition, ReceiveOptions receiveOptions, int i, Duration duration) {
        Objects.requireNonNull(str, "'partitionId' cannot be null.");
        Objects.requireNonNull(eventPosition, "'startingPosition' cannot be null.");
        Objects.requireNonNull(receiveOptions, "'receiveOptions' cannot be null.");

        WindowedSubscriber<PartitionEvent> current = subscriber.get();
        if (current == null) {
            // First receive (or a race with another first receive): install the subscriber exactly once.
            current = subscribeOnce(str, eventPosition, receiveOptions);
        }
        return current.enqueueRequest(i, duration);
    }

    /**
     * Replaces the current subscriber with the {@link #DISPOSED} sentinel and disposes the subscriber
     * that was installed before, if there was one.
     */
    void dispose() {
        final WindowedSubscriber<PartitionEvent> previous = subscriber.getAndSet(DISPOSED);
        if (previous != null) {
            previous.dispose();
        }
    }

    /**
     * Installs a new subscriber if none is set and subscribes it to the partition's event stream.
     *
     * @param partitionId the partition to receive from.
     * @param startingPosition the position to start receiving from.
     * @param receiveOptions the options for the underlying receive.
     * @return whatever subscriber the reference holds after the attempt: the one created here when the
     *     compare-and-set succeeded, otherwise the one installed concurrently (which may be {@link #DISPOSED}).
     * @throws UnsupportedOperationException if the async client is not in v2 mode.
     */
    private WindowedSubscriber<PartitionEvent> subscribeOnce(String partitionId, EventPosition startingPosition, ReceiveOptions receiveOptions) {
        if (!asyncClient.isV2()) {
            throw logger.logExceptionAsError(new UnsupportedOperationException("SynchronousReceiver requires v2 mode."));
        }
        final WindowedSubscriber<PartitionEvent> candidate = createSubscriber(partitionId);
        if (subscriber.compareAndSet(null, candidate)) {
            // Only the winner of the compare-and-set subscribes; a losing candidate was never
            // subscribed to anything and is simply dropped.
            asyncClient.receiveFromPartition(partitionId, startingPosition, receiveOptions).subscribeWith(candidate);
        }
        return subscriber.get();
    }

    /**
     * Builds an unsubscribed subscriber whose windows are decorated with a sync-receive tracing span.
     *
     * @param partitionId the partition id recorded under {@code ClientConstants.PARTITION_ID_KEY} in the
     *     map handed to the subscriber.
     * @return a new, not yet subscribed, subscriber.
     */
    private WindowedSubscriber<PartitionEvent> createSubscriber(String partitionId) {
        final WindowedSubscriber.WindowedSubscriberOptions<PartitionEvent> options
            = new WindowedSubscriber.WindowedSubscriberOptions<>();
        options.setWindowDecorator(window -> {
            // A start time is only captured when tracing is enabled; otherwise null is passed through.
            final Instant startTime = tracer.isEnabled() ? Instant.now() : null;
            return tracer.reportSyncReceiveSpan(SYNC_RECEIVE_SPAN_NAME, startTime, window, Context.NONE);
        });
        return new WindowedSubscriber<>(Collections.singletonMap(ClientConstants.PARTITION_ID_KEY, partitionId), TERMINAL_MESSAGE, options);
    }

    /**
     * Builds the {@link #DISPOSED} sentinel: a subscriber with an empty context map and default options,
     * subscribed to a stream that immediately signals a {@code RuntimeException("Disposed.")}.
     *
     * @return the sentinel subscriber.
     */
    private static WindowedSubscriber<PartitionEvent> createDisposedSubscriber() {
        final WindowedSubscriber<PartitionEvent> disposed = new WindowedSubscriber<PartitionEvent>(
            new HashMap<>(0), TERMINAL_MESSAGE, new WindowedSubscriber.WindowedSubscriberOptions<PartitionEvent>());
        return Flux.<PartitionEvent>error(new RuntimeException("Disposed.")).subscribeWith(disposed);
    }
}
