package org.zodiac.monitor.logging.appender.log4j;

import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.logging.log4j.core.LoggerContext;
import org.apache.logging.log4j.core.appender.AbstractManager;
import org.apache.logging.log4j.core.util.Log4jThread;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.zodiac.monitor.logging.appender.DefaultKafkaProducerFactory;
import org.zodiac.monitor.logging.appender.KafkaProducerFactory;

/* loaded from: input_file:org/zodiac/monitor/logging/appender/log4j/AdvancedKafkaManager.class */
/**
 * Log4j 2 manager that owns a Kafka {@link Producer} and publishes raw log payloads through it.
 *
 * <p>The producer is created lazily by {@link #startup()} and closed by
 * {@link #releaseSub(long, TimeUnit)}. Records are handed to the producer from a single shared
 * background thread, so {@link #send(String, byte[])} never blocks the logging caller on Kafka.
 */
public class AdvancedKafkaManager extends AbstractManager {
    /** Close timeout, in milliseconds, used when the caller supplies a non-positive timeout. */
    public static final String DEFAULT_TIMEOUT_MILLIS = "30000";
    private final Properties config;
    private Producer<byte[], byte[]> producer;
    private static final Logger LOGGER = LoggerFactory.getLogger(AdvancedKafkaManager.class);
    private static KafkaProducerFactory producerFactory = new DefaultKafkaProducerFactory();

    /**
     * Single worker shared by all manager instances. The worker is a daemon thread: the executor
     * is never shut down, so a non-daemon worker would keep the JVM alive after the first send.
     */
    private static final ExecutorService SEND_MESSAGE_EXECUTOR = Executors.newSingleThreadExecutor(new ThreadFactory() {
        @Override
        public Thread newThread(Runnable task) {
            Thread worker = new Log4jThread(task, "AdvancedKafkaManager-SendThread");
            worker.setDaemon(true);
            return worker;
        }
    });

    /**
     * Builds the producer configuration; no connection is opened until {@link #startup()}.
     *
     * @param loggerContext    logger context passed through to {@link AbstractManager}
     * @param name             manager name passed through to {@link AbstractManager}
     * @param bootstrapServers value stored under the {@code bootstrap.servers} producer property
     */
    public AdvancedKafkaManager(LoggerContext loggerContext, String name, String bootstrapServers) {
        super(loggerContext, name);
        this.config = new Properties();
        this.config.put("bootstrap.servers", bootstrapServers);
        this.config.put("acks", "all");
        this.config.put("retries", 0);
        this.config.put("batch.size", 16384);
        this.config.put("linger.ms", 1);
        this.config.put("buffer.memory", 33554432);
        // NOTE(review): the producer is typed <byte[], byte[]> but the key serializer is
        // StringSerializer; send() never supplies a key, so this looks harmless - confirm.
        this.config.put("key.serializer", StringSerializer.class);
        this.config.put("value.serializer", ByteArraySerializer.class);
    }

    /**
     * Closes the producer on a helper thread and waits at most the given time for it to finish.
     * Does nothing when the producer was never started.
     *
     * @param timeout  maximum time to wait for the close to complete
     * @param timeUnit unit of {@code timeout}
     */
    private void closeProducer(long timeout, TimeUnit timeUnit) {
        final Producer<byte[], byte[]> toClose = this.producer;
        if (toClose == null) {
            return;
        }
        Thread closeThread = new Log4jThread(new Runnable() {
            @Override
            public void run() {
                toClose.close();
            }
        }, "AdvancedKafkaManager-CloseThread");
        closeThread.setDaemon(true);
        closeThread.start();
        // Thread.join(0) means "wait forever", so a sub-millisecond timeout must not round to 0.
        long waitMillis = Math.max(1L, timeUnit.toMillis(timeout));
        try {
            closeThread.join(waitMillis);
        } catch (InterruptedException e) {
            // Restore the flag so the caller can observe the interruption.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Releases the Kafka producer held by this manager.
     *
     * <p>A non-positive {@code timeout} falls back to {@link #DEFAULT_TIMEOUT_MILLIS} instead of
     * skipping the close, so the producer is not leaked when no explicit timeout is given.
     *
     * @param timeout  maximum time to wait for the producer to close; non-positive selects the default
     * @param timeUnit unit of {@code timeout}; ignored when the default is used
     * @return always {@code true}
     */
    public boolean releaseSub(long timeout, TimeUnit timeUnit) {
        if (timeout > 0) {
            closeProducer(timeout, timeUnit);
        } else {
            closeProducer(Long.parseLong(DEFAULT_TIMEOUT_MILLIS), TimeUnit.MILLISECONDS);
        }
        return true;
    }

    /**
     * Queues a record for asynchronous delivery and returns immediately.
     *
     * <p>The record is dropped silently when the producer has not been started. Failures raised
     * while handing the record to the producer are logged as warnings, not propagated.
     *
     * @param topic   first argument of the {@link ProducerRecord} (the destination topic)
     * @param payload record value
     * @throws ExecutionException   never thrown by this implementation; kept for signature compatibility
     * @throws InterruptedException never thrown by this implementation; kept for signature compatibility
     * @throws TimeoutException     never thrown by this implementation; kept for signature compatibility
     */
    public void send(final String topic, final byte[] payload) throws ExecutionException, InterruptedException, TimeoutException {
        SEND_MESSAGE_EXECUTOR.submit(new Runnable() {
            @Override
            public void run() {
                Producer<byte[], byte[]> current = AdvancedKafkaManager.this.producer;
                if (current == null) {
                    return;
                }
                try {
                    current.send(new ProducerRecord<byte[], byte[]>(topic, payload));
                } catch (Exception e) {
                    AdvancedKafkaManager.LOGGER.warn("Unable to send message to Kafka", e);
                }
            }
        });
    }

    /** Creates the Kafka producer from the configuration assembled in the constructor. */
    public void startup() {
        this.producer = producerFactory.newKafkaProducer(this.config);
    }
}
