package com.azure.messaging.eventhubs.implementation;

import com.azure.core.amqp.AmqpConnection;
import com.azure.core.amqp.AmqpMessageConstant;
import com.azure.core.amqp.AmqpRetryOptions;
import com.azure.core.amqp.AmqpRetryPolicy;
import com.azure.core.amqp.ClaimsBasedSecurityNode;
import com.azure.core.amqp.implementation.AmqpConstants;
import com.azure.core.amqp.implementation.AmqpLinkProvider;
import com.azure.core.amqp.implementation.AmqpReceiveLink;
import com.azure.core.amqp.implementation.AmqpSendLink;
import com.azure.core.amqp.implementation.ConsumerFactory;
import com.azure.core.amqp.implementation.MessageSerializer;
import com.azure.core.amqp.implementation.ReactorHandlerProvider;
import com.azure.core.amqp.implementation.ReactorProvider;
import com.azure.core.amqp.implementation.ReactorSession;
import com.azure.core.amqp.implementation.TokenManagerProvider;
import com.azure.core.amqp.implementation.handler.SessionHandler;
import com.azure.core.util.logging.ClientLogger;
import com.azure.messaging.eventhubs.models.EventPosition;
import com.azure.messaging.eventhubs.models.ReceiveOptions;
import java.time.Duration;
import java.util.HashMap;
import java.util.Locale;
import java.util.Objects;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.UnknownDescribedType;
import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
import org.apache.qpid.proton.engine.Session;
import reactor.core.publisher.Mono;

/* loaded from: input_file:com/azure/messaging/eventhubs/implementation/EventHubReactorSession.class */
/**
 * A {@link ReactorSession} specialisation that implements {@link EventHubSession}.
 *
 * <p>It adds two convenience factories on top of the inherited link-creation methods: one that creates a send link
 * tagged with a client identifier, and one that creates a receive link whose source filter is derived from an
 * {@link EventPosition} and whose properties are derived from {@link ReceiveOptions}.</p>
 */
class EventHubReactorSession extends ReactorSession implements EventHubSession {
    /** Receiver link property key under which the owner level is sent. */
    private static final Symbol EPOCH = Symbol.valueOf("com.microsoft:epoch");
    /** Desired capability requested when last-enqueued-event tracking is enabled. */
    private static final Symbol ENABLE_RECEIVER_RUNTIME_METRIC_NAME = Symbol.valueOf("com.microsoft:enable-receiver-runtime-metric");
    private static final ClientLogger LOGGER = new ClientLogger(EventHubReactorSession.class);
    /**
     * Template for the receive filter expression. Arguments, in order: annotation name, inclusive flag
     * (either {@code "="} or the empty string, yielding {@code >=} or {@code >}), and the quoted value.
     */
    private static final String FILTER_EXPRESSION_FORMAT = "amqp.annotation.%s >%s '%s'";

    /**
     * Creates the session by forwarding every argument to the {@link ReactorSession} constructor.
     *
     * <p>Note that the last two arguments are forwarded in swapped order: the superclass receives
     * {@code messageSerializer} before {@code amqpRetryOptions}.</p>
     *
     * <p>NOTE(review): the decompiler reported that this constructor's access modifier was changed from
     * package-private; it is kept {@code public} here so visibility is not altered.</p>
     *
     * @param amqpConnection connection forwarded to the superclass.
     * @param session proton-j session forwarded to the superclass.
     * @param sessionHandler session handler forwarded to the superclass.
     * @param str forwarded as the fourth superclass argument; presumably the session name (TODO confirm against
     *     {@link ReactorSession}).
     * @param reactorProvider reactor provider forwarded to the superclass.
     * @param reactorHandlerProvider handler provider forwarded to the superclass.
     * @param amqpLinkProvider link provider forwarded to the superclass.
     * @param mono publisher of the {@link ClaimsBasedSecurityNode} forwarded to the superclass.
     * @param tokenManagerProvider token manager provider forwarded to the superclass.
     * @param amqpRetryOptions retry options forwarded to the superclass.
     * @param messageSerializer message serializer forwarded to the superclass.
     */
    public EventHubReactorSession(AmqpConnection amqpConnection, Session session, SessionHandler sessionHandler, String str, ReactorProvider reactorProvider, ReactorHandlerProvider reactorHandlerProvider, AmqpLinkProvider amqpLinkProvider, Mono<ClaimsBasedSecurityNode> mono, TokenManagerProvider tokenManagerProvider, AmqpRetryOptions amqpRetryOptions, MessageSerializer messageSerializer) {
        super(amqpConnection, session, sessionHandler, str, reactorProvider, reactorHandlerProvider, amqpLinkProvider, mono, tokenManagerProvider, messageSerializer, amqpRetryOptions);
    }

    /**
     * Creates a send link whose link properties carry the given client identifier.
     *
     * <p>The work is delegated to the inherited {@code createProducer} overload that accepts a property map; the
     * resulting link is cast to {@link AmqpSendLink}.</p>
     *
     * @param linkName name of the link; must not be null.
     * @param entityPath entity path the link targets; must not be null.
     * @param timeout timeout passed to the inherited overload; must not be null.
     * @param retry retry policy passed to the inherited overload (not null-checked by this method).
     * @param clientIdentifier value stored under {@link AmqpConstants#CLIENT_IDENTIFIER}; must not be null.
     * @return a {@link Mono} emitting the created {@link AmqpSendLink}.
     * @throws NullPointerException if {@code linkName}, {@code entityPath}, {@code timeout} or
     *     {@code clientIdentifier} is null.
     */
    @Override // com.azure.messaging.eventhubs.implementation.EventHubSession
    public Mono<AmqpSendLink> createProducer(String linkName, String entityPath, Duration timeout, AmqpRetryPolicy retry, String clientIdentifier) {
        Objects.requireNonNull(linkName, "'linkName' cannot be null.");
        Objects.requireNonNull(entityPath, "'entityPath' cannot be null.");
        Objects.requireNonNull(timeout, "'timeout' cannot be null.");
        Objects.requireNonNull(clientIdentifier, "'clientIdentifier' cannot be null.");

        final HashMap<Symbol, Object> linkProperties = new HashMap<>();
        linkProperties.put(AmqpConstants.CLIENT_IDENTIFIER, clientIdentifier);

        return createProducer(linkName, entityPath, timeout, retry, linkProperties).cast(AmqpSendLink.class);
    }

    /**
     * Creates a receive link that starts at the given event position.
     *
     * <p>The link is configured as follows before delegating to the inherited {@code createConsumer} overload:</p>
     * <ul>
     *   <li>source filter: a single {@link AmqpConstants#STRING_FILTER} entry wrapping the expression built from
     *       {@code eventPosition};</li>
     *   <li>receiver properties: the owner level under {@code com.microsoft:epoch} when
     *       {@link ReceiveOptions#getOwnerLevel()} is non-null, plus the client identifier under
     *       {@link AmqpConstants#CLIENT_RECEIVER_IDENTIFIER};</li>
     *   <li>desired capabilities: {@code com.microsoft:enable-receiver-runtime-metric} when
     *       {@link ReceiveOptions#getTrackLastEnqueuedEventProperties()} is true, otherwise {@code null};</li>
     *   <li>settle modes: {@link SenderSettleMode#UNSETTLED} and {@link ReceiverSettleMode#SECOND};</li>
     *   <li>a fresh {@link ConsumerFactory}.</li>
     * </ul>
     *
     * @param linkName name of the link; must not be null.
     * @param entityPath entity path the link reads from; must not be null.
     * @param timeout timeout passed to the inherited overload; must not be null.
     * @param retry retry policy passed to the inherited overload; must not be null.
     * @param eventPosition starting position used to build the filter expression; must not be null.
     * @param options receive options supplying owner level and tracking flag; must not be null.
     * @param clientIdentifier identifier stored in the receiver properties; must not be null.
     * @return a {@link Mono} emitting the created {@link AmqpReceiveLink}.
     * @throws NullPointerException if any argument is null.
     * @throws IllegalArgumentException if {@code eventPosition} has no offset, sequence number or enqueued time,
     *     or if its enqueued time cannot be converted to epoch milliseconds.
     */
    @Override // com.azure.messaging.eventhubs.implementation.EventHubSession
    public Mono<AmqpReceiveLink> createConsumer(String linkName, String entityPath, Duration timeout, AmqpRetryPolicy retry, EventPosition eventPosition, ReceiveOptions options, String clientIdentifier) {
        Objects.requireNonNull(linkName, "'linkName' cannot be null.");
        Objects.requireNonNull(entityPath, "'entityPath' cannot be null.");
        Objects.requireNonNull(timeout, "'timeout' cannot be null.");
        Objects.requireNonNull(retry, "'retry' cannot be null.");
        Objects.requireNonNull(eventPosition, "'eventPosition' cannot be null.");
        Objects.requireNonNull(options, "'options' cannot be null.");
        Objects.requireNonNull(clientIdentifier, "'clientIdentifier' cannot be null.");

        final String filterExpression = getExpression(eventPosition);
        final HashMap<Symbol, Object> sourceFilter = new HashMap<>();
        sourceFilter.put(AmqpConstants.STRING_FILTER,
            new UnknownDescribedType(AmqpConstants.STRING_FILTER, filterExpression));

        final HashMap<Symbol, Object> receiverProperties = new HashMap<>();
        // The owner level is optional; the epoch property is only sent when one was configured.
        if (options.getOwnerLevel() != null) {
            receiverProperties.put(EPOCH, options.getOwnerLevel());
        }
        receiverProperties.put(AmqpConstants.CLIENT_RECEIVER_IDENTIFIER, clientIdentifier);

        // A null capability array (rather than an empty one) is passed when tracking is disabled.
        final Symbol[] desiredCapabilities = options.getTrackLastEnqueuedEventProperties()
            ? new Symbol[]{ENABLE_RECEIVER_RUNTIME_METRIC_NAME}
            : null;

        return createConsumer(linkName, entityPath, timeout, retry, sourceFilter, receiverProperties,
            desiredCapabilities, SenderSettleMode.UNSETTLED, ReceiverSettleMode.SECOND, new ConsumerFactory());
    }

    /**
     * Builds the filter expression for a starting position.
     *
     * <p>Exactly one criterion is used, chosen in this order of preference: offset, sequence number, enqueued
     * time (as epoch milliseconds). The comparison is {@code >=} when the position is inclusive and {@code >}
     * otherwise.</p>
     *
     * @param eventPosition position to translate.
     * @return the filter expression.
     * @throws IllegalArgumentException if none of the three criteria is set, or the enqueued time overflows when
     *     converted to epoch milliseconds.
     */
    private String getExpression(EventPosition eventPosition) {
        final String inclusiveFlag = eventPosition.isInclusive() ? "=" : "";

        if (eventPosition.getOffset() != null) {
            return formatFilter(AmqpMessageConstant.OFFSET_ANNOTATION_NAME.getValue(), inclusiveFlag,
                eventPosition.getOffset());
        }

        if (eventPosition.getSequenceNumber() != null) {
            return formatFilter(AmqpMessageConstant.SEQUENCE_NUMBER_ANNOTATION_NAME.getValue(), inclusiveFlag,
                eventPosition.getSequenceNumber());
        }

        if (eventPosition.getEnqueuedDateTime() == null) {
            throw LOGGER.logExceptionAsError(new IllegalArgumentException("No starting position was set."));
        }

        final String epochMillis;
        try {
            // toEpochMilli() is the only step here that is expected to raise ArithmeticException.
            epochMillis = Long.toString(eventPosition.getEnqueuedDateTime().toEpochMilli());
        } catch (ArithmeticException e) {
            throw LOGGER.logExceptionAsError(new IllegalArgumentException(String.format(Locale.ROOT, "Event position for enqueued DateTime could not be parsed. Value: '%s'", eventPosition.getEnqueuedDateTime()), e));
        }
        return formatFilter(AmqpMessageConstant.ENQUEUED_TIME_UTC_ANNOTATION_NAME.getValue(), inclusiveFlag,
            epochMillis);
    }

    /**
     * Renders {@link #FILTER_EXPRESSION_FORMAT} with a fixed locale so the expression does not depend on the
     * JVM's default locale.
     *
     * @param annotationName name of the message annotation to compare against.
     * @param inclusiveFlag {@code "="} for an inclusive comparison, empty string for an exclusive one.
     * @param value value to compare with; rendered via {@code %s}.
     * @return the formatted filter expression.
     */
    private static String formatFilter(String annotationName, String inclusiveFlag, Object value) {
        return String.format(Locale.ROOT, FILTER_EXPRESSION_FORMAT, annotationName, inclusiveFlag, value);
    }
}
