package com.yahoo.athenz.common.messaging.pulsar.client;

import java.lang.invoke.MethodHandles;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.ConsumerInterceptors;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.auth.AuthenticationTls;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/yahoo/athenz/common/messaging/pulsar/client/AthenzPulsarClient.class */
public class AthenzPulsarClient {
    private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
    public static final String PROP_PULSAR_MAX_PENDING_MSGS = "athenz.pulsar.max_pending_msgs";
    public static final String PROP_ATHENZ_PULSAR_CLIENT_CLASS = "athenz.pulsar.pulsar_client_class";
    public static final String PROP_ATHENZ_PULSAR_CLIENT_CLASS_DEFAULT = "com.yahoo.athenz.common.messaging.pulsar.client.AthenzPulsarClient";

    /* loaded from: input_file:com/yahoo/athenz/common/messaging/pulsar/client/AthenzPulsarClient$TlsConfig.class */
    public static class TlsConfig {
        String tlsCertFilePath;
        String tlsKeyFilePath;
        String tlsTrustCertsFilePath;

        public TlsConfig(String str, String str2, String str3) {
            this.tlsCertFilePath = str;
            this.tlsKeyFilePath = str2;
            this.tlsTrustCertsFilePath = str3;
        }
    }

    public static PulsarClientImpl createPulsarClient(String str, TlsConfig tlsConfig) {
        if (tlsConfig == null || tlsConfig.tlsCertFilePath == null || tlsConfig.tlsKeyFilePath == null || tlsConfig.tlsTrustCertsFilePath == null) {
            throw new IllegalArgumentException("invalid tls configured");
        }
        if (str == null || str.isEmpty()) {
            throw new IllegalArgumentException("invalid service configured");
        }
        try {
            return createAthenzPulsarClientInstance().getPulsarClient(str, getClientConfiguration(tlsConfig));
        } catch (PulsarClientException e) {
            LOG.error("Failed to create pulsar client: {}", e.getMessage(), e);
            throw new IllegalStateException("failed to create pulsar client");
        }
    }

    public static ProducerConfigurationData defaultProducerConfig(String str) {
        int parseInt = Integer.parseInt(System.getProperty(PROP_PULSAR_MAX_PENDING_MSGS, "10000"));
        ProducerConfigurationData producerConfigurationData = new ProducerConfigurationData();
        producerConfigurationData.setBlockIfQueueFull(true);
        producerConfigurationData.setMaxPendingMessages(parseInt);
        producerConfigurationData.setTopicName(str);
        return producerConfigurationData;
    }

    public static Producer<byte[]> createProducer(String str, String str2, TlsConfig tlsConfig) {
        return createProducer(createPulsarClient(str, tlsConfig), defaultProducerConfig(str2));
    }

    public static Producer<byte[]> createProducer(PulsarClientImpl pulsarClientImpl, ProducerConfigurationData producerConfigurationData) {
        return createProducer(pulsarClientImpl, producerConfigurationData, Schema.BYTES);
    }

    public static <T> Producer<T> createProducer(PulsarClientImpl pulsarClientImpl, ProducerConfigurationData producerConfigurationData, Schema<T> schema) {
        if (producerConfigurationData.getTopicName() == null || producerConfigurationData.getTopicName().isEmpty()) {
            throw new IllegalArgumentException("invalid topic configured");
        }
        try {
            return (Producer) pulsarClientImpl.createProducerAsync(producerConfigurationData, schema).get();
        } catch (InterruptedException e) {
            LOG.error("Failed to create pulsar producer, thread was interrupt: {}", e.getMessage(), e);
            Thread.currentThread().interrupt();
            throw new IllegalStateException("failed to create pulsar producer");
        } catch (ExecutionException e2) {
            LOG.error("Failed to create pulsar producer: {}", e2.getMessage(), e2);
            throw new IllegalStateException("failed to create pulsar producer");
        }
    }

    private static ClientConfigurationData getClientConfiguration(TlsConfig tlsConfig) {
        ClientConfigurationData clientConfigurationData = new ClientConfigurationData();
        clientConfigurationData.setAuthentication(new AuthenticationTls(tlsConfig.tlsCertFilePath, tlsConfig.tlsKeyFilePath));
        clientConfigurationData.setTlsAllowInsecureConnection(false);
        clientConfigurationData.setTlsHostnameVerificationEnable(true);
        clientConfigurationData.setTlsTrustCertsFilePath(tlsConfig.tlsTrustCertsFilePath);
        clientConfigurationData.setUseTls(true);
        return clientConfigurationData;
    }

    private static AthenzPulsarClient createAthenzPulsarClientInstance() {
        try {
            return (AthenzPulsarClient) Class.forName(System.getProperty(PROP_ATHENZ_PULSAR_CLIENT_CLASS, PROP_ATHENZ_PULSAR_CLIENT_CLASS_DEFAULT)).getDeclaredConstructor(new Class[0]).newInstance(new Object[0]);
        } catch (Exception e) {
            throw new ExceptionInInitializerError(e);
        }
    }

    protected PulsarClientImpl getPulsarClient(String str, ClientConfigurationData clientConfigurationData) throws PulsarClientException {
        clientConfigurationData.setServiceUrl(str);
        return new PulsarClientImpl(clientConfigurationData);
    }

    public static ConsumerConfigurationData<byte[]> defaultConsumerConfig(Set<String> set, String str, SubscriptionType subscriptionType) {
        if (subscriptionType == null) {
            throw new IllegalArgumentException("invalid subscription type configured");
        }
        ConsumerConfigurationData<byte[]> consumerConfigurationData = new ConsumerConfigurationData<>();
        consumerConfigurationData.setSubscriptionType(subscriptionType);
        consumerConfigurationData.setSubscriptionName(str);
        consumerConfigurationData.setTopicNames(set);
        consumerConfigurationData.setPoolMessages(true);
        return consumerConfigurationData;
    }

    public static Consumer<byte[]> createConsumer(String str, Set<String> set, String str2, SubscriptionType subscriptionType, TlsConfig tlsConfig) {
        return createConsumer(createPulsarClient(str, tlsConfig), defaultConsumerConfig(set, str2, subscriptionType));
    }

    public static Consumer<byte[]> createConsumer(PulsarClientImpl pulsarClientImpl, ConsumerConfigurationData<byte[]> consumerConfigurationData) {
        return createConsumer(pulsarClientImpl, consumerConfigurationData, Schema.BYTES);
    }

    public static <T> Consumer<T> createConsumer(PulsarClientImpl pulsarClientImpl, ConsumerConfigurationData<T> consumerConfigurationData, Schema<T> schema) {
        validateConsumerConfiguration(consumerConfigurationData);
        try {
            return (Consumer) pulsarClientImpl.subscribeAsync(consumerConfigurationData, schema, (ConsumerInterceptors) null).get();
        } catch (InterruptedException e) {
            LOG.error("Failed to create pulsar consumer, thread was interrupt: {}", e.getMessage(), e);
            Thread.currentThread().interrupt();
            throw new IllegalStateException("failed to create pulsar consumer");
        } catch (ExecutionException e2) {
            LOG.error("Failed to create pulsar consumer: {}", e2.getMessage(), e2);
            throw new IllegalStateException("failed to create pulsar consumer");
        }
    }

    private static <T> void validateConsumerConfiguration(ConsumerConfigurationData<T> consumerConfigurationData) {
        if (consumerConfigurationData.getSubscriptionName() == null) {
            throw new IllegalArgumentException("invalid subscription name configured");
        }
        if (consumerConfigurationData.getTopicNames() == null || consumerConfigurationData.getTopicNames().isEmpty()) {
            throw new IllegalArgumentException("invalid topic configured");
        }
        Iterator it = consumerConfigurationData.getTopicNames().iterator();
        while (it.hasNext()) {
            if (((String) it.next()) == null) {
                throw new IllegalArgumentException("invalid topic configured");
            }
        }
    }
}
