package io.debezium.kafka;

import io.debezium.annotation.ThreadSafe;
import io.debezium.util.IoUtil;
import java.io.File;
import java.io.IOException;
import java.util.Properties;
import java.util.function.Consumer;
import java.util.function.Supplier;
import kafka.admin.RackAwareMode;
import kafka.server.KafkaConfig;
import kafka.zk.AdminZkClient;
import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;
import scala.collection.JavaConverters;

@ThreadSafe
/* loaded from: input_file:io/debezium/kafka/KafkaServer.class */
public class KafkaServer {
    public static final int DEFAULT_BROKER_ID = 1;
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaServer.class);
    private final Supplier<String> zkConnection;
    private final int brokerId;
    private volatile File logsDir;
    private final Properties config;
    private volatile int desiredPort;
    private volatile int port;
    private volatile kafka.server.KafkaServer server;
    private volatile AdminZkClient adminZkClient;

    public KafkaServer(Supplier<String> supplier) {
        this(supplier, 1);
    }

    public KafkaServer(Supplier<String> supplier, int i) {
        this(supplier, i, -1);
    }

    public KafkaServer(Supplier<String> supplier, int i, int i2) {
        this.desiredPort = -1;
        this.port = -1;
        if (supplier == null) {
            throw new IllegalArgumentException("The Zookeeper connection string supplier may not be null");
        }
        this.zkConnection = supplier;
        this.brokerId = i;
        this.config = new Properties();
        setPort(i2);
        populateDefaultConfiguration(this.config);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public int brokerId() {
        return this.brokerId;
    }

    protected String zookeeperConnection() {
        return this.zkConnection.get();
    }

    protected void populateDefaultConfiguration(Properties properties) {
        this.config.setProperty(KafkaConfig.NumPartitionsProp(), String.valueOf(1));
        this.config.setProperty(KafkaConfig.LogFlushIntervalMessagesProp(), String.valueOf(Long.MAX_VALUE));
    }

    public KafkaServer setProperty(String str, String str2) {
        if (this.server != null) {
            throw new IllegalStateException("Unable to change the properties when already running");
        }
        if (!KafkaConfig.ZkConnectProp().equalsIgnoreCase(str) && !KafkaConfig.BrokerIdProp().equalsIgnoreCase(str)) {
            this.config.setProperty(str, str2);
        }
        return this;
    }

    public KafkaServer setProperties(Properties properties) {
        if (this.server != null) {
            throw new IllegalStateException("Unable to change the properties when already running");
        }
        properties.stringPropertyNames().forEach(str -> {
            setProperty(str, properties.getProperty(str));
        });
        return this;
    }

    public KafkaServer setPort(int i) {
        this.desiredPort = i > 0 ? i : -1;
        this.port = this.desiredPort;
        return this;
    }

    public Properties config() {
        Properties properties = new Properties();
        properties.putAll(this.config);
        properties.setProperty(KafkaConfig.ZkConnectProp(), zookeeperConnection());
        properties.setProperty(KafkaConfig.BrokerIdProp(), Integer.toString(this.brokerId));
        properties.setProperty(KafkaConfig.AutoCreateTopicsEnableProp(), String.valueOf(this.config.getOrDefault(KafkaConfig.AutoCreateTopicsEnableProp(), Boolean.TRUE)));
        properties.setProperty(KafkaConfig.OffsetsTopicPartitionsProp(), Integer.toString(1));
        properties.setProperty(KafkaConfig.GroupInitialRebalanceDelayMsProp(), Integer.toString(0));
        return properties;
    }

    public String getConnection() {
        return "localhost:" + this.port;
    }

    public synchronized KafkaServer startup() {
        if (this.server != null) {
            throw new IllegalStateException(String.valueOf(this) + " is already running");
        }
        Properties config = config();
        if (this.logsDir == null) {
            try {
                File createTempFile = File.createTempFile("kafka", "suffix");
                this.logsDir = createTempFile.getParentFile();
                createTempFile.delete();
            } catch (IOException e) {
                throw new RuntimeException("Unable to create temporary directory", e);
            }
        }
        config.setProperty(KafkaConfig.LogDirProp(), this.logsDir.getAbsolutePath());
        config.setProperty(KafkaConfig.OffsetsTopicReplicationFactorProp(), String.valueOf(1));
        this.port = this.desiredPort > 0 ? this.desiredPort : IoUtil.getAvailablePort();
        config.setProperty(KafkaConfig.ListenersProp(), "PLAINTEXT://localhost:" + this.port);
        try {
            LOGGER.debug("Starting Kafka broker {} at {} with storage in {}", new Object[]{Integer.valueOf(this.brokerId), getConnection(), this.logsDir.getAbsolutePath()});
            KafkaConfig kafkaConfig = new KafkaConfig(config);
            this.server = new kafka.server.KafkaServer(kafkaConfig, Time.SYSTEM, Option.apply((Object) null), false);
            this.server.startup();
            LOGGER.info("Started Kafka server {} at {} with storage in {}", new Object[]{Integer.valueOf(this.brokerId), getConnection(), this.logsDir.getAbsolutePath()});
            this.adminZkClient = new AdminZkClient(this.server.zkClient(), Option.apply(kafkaConfig));
            return this;
        } catch (RuntimeException e2) {
            this.server = null;
            throw e2;
        }
    }

    public synchronized void shutdown(boolean z) {
        if (this.server != null) {
            try {
                this.server.shutdown();
                if (z) {
                    ((Iterable) JavaConverters.asJavaIterableConverter(this.server.logManager().allLogs()).asJava()).forEach(unifiedLog -> {
                        unifiedLog.delete();
                    });
                }
                LOGGER.info("Stopped Kafka server {} at {}", Integer.valueOf(this.brokerId), getConnection());
            } finally {
                this.server = null;
                this.adminZkClient = null;
                this.port = this.desiredPort;
            }
        }
    }

    public synchronized void deleteData() {
        if (this.server == null) {
            try {
                IoUtil.delete(this.logsDir);
            } catch (IOException e) {
                LOGGER.error("Unable to delete directory '{}'", this.logsDir, e);
            }
        }
    }

    public AdminZkClient getAdminZkClient() {
        return this.adminZkClient;
    }

    public void createTopics(String... strArr) {
        createTopics(1, 1, strArr);
    }

    public void createTopics(int i, int i2, String... strArr) {
        for (String str : strArr) {
            if (str != null) {
                createTopic(str, i, i2);
            }
        }
    }

    public void createTopic(String str, int i, int i2) {
        getAdminZkClient().createTopic(str, i, i2, new Properties(), (RackAwareMode) null, false);
    }

    void onEachDirectory(Consumer<File> consumer) {
        consumer.accept(getStateDirectory());
    }

    public File getStateDirectory() {
        return this.logsDir;
    }

    public void setStateDirectory(File file) {
        if (file != null && file.exists() && !file.isDirectory() && !file.canWrite() && !file.canRead()) {
            throw new IllegalArgumentException("The directory must be readable and writable");
        }
        this.logsDir = file;
    }

    public String toString() {
        return "KafkaServer{" + getConnection() + "}";
    }
}
