package com.yahoo.athenz.common.messaging.pulsar;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.yahoo.athenz.common.messaging.ChangeSubscriber;
import com.yahoo.athenz.common.messaging.pulsar.client.AthenzPulsarClient;
import java.lang.invoke.MethodHandles;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/yahoo/athenz/common/messaging/pulsar/PulsarChangeSubscriber.class */
public class PulsarChangeSubscriber<T> implements ChangeSubscriber<T> {
    public static final String PROP_MESSAGING_CLI_CONSUMER_TO_SEC = "athenz.messaging_cli.consumer.timeout_sec";
    private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
    private final PulsarClientImpl pulsarClient;
    private final Consumer<byte[]> consumer;
    protected java.util.function.Consumer<T> processor;
    protected Class<T> valueType;
    private boolean closed = false;
    private Thread subscriberThread;
    private final int rcvMsgTimeout;

    public PulsarChangeSubscriber(String str, String str2, String str3, SubscriptionType subscriptionType, AthenzPulsarClient.TlsConfig tlsConfig) {
        ConsumerConfigurationData<byte[]> defaultConsumerConfig = AthenzPulsarClient.defaultConsumerConfig(Collections.singleton(str2), str3, subscriptionType);
        this.pulsarClient = AthenzPulsarClient.createPulsarClient(str, tlsConfig);
        this.consumer = AthenzPulsarClient.createConsumer(this.pulsarClient, defaultConsumerConfig);
        this.rcvMsgTimeout = Integer.parseInt(System.getProperty(PROP_MESSAGING_CLI_CONSUMER_TO_SEC, "1"));
        LOG.debug("created publisher: {}, pulsarConsumer: {}", getClass(), this.consumer);
    }

    @Override // com.yahoo.athenz.common.messaging.ChangeSubscriber
    public void init(java.util.function.Consumer<T> consumer, Class<T> cls) {
        this.processor = consumer;
        this.valueType = cls;
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Override // java.lang.Runnable
    public void run() {
        this.subscriberThread = Thread.currentThread();
        while (!this.closed) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("looping over the consumer receive method");
            }
            try {
                Message receive = this.consumer.receive(this.rcvMsgTimeout, TimeUnit.SECONDS);
                if (receive != null) {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("received message: {}", new String(receive.getData()));
                    }
                    this.processor.accept(OBJECT_MAPPER.readValue(receive.getData(), this.valueType));
                    this.consumer.acknowledge(receive);
                }
            } catch (Exception e) {
                LOG.error("exception in receiving the message: {}", e.getMessage(), e);
            }
        }
    }

    @Override // com.yahoo.athenz.common.messaging.ChangeSubscriber
    public void close() {
        this.closed = true;
        this.subscriberThread.interrupt();
        try {
            this.consumer.close();
            this.pulsarClient.shutdown();
        } catch (PulsarClientException e) {
            LOG.error("Got exception while closing pulsar consumer: {}", e.getMessage(), e);
        }
    }
}
