package com.purbon.kafka.topology;

import com.damnhandy.uri.template.UriTemplate;
import com.purbon.kafka.topology.actions.topics.CreateTopicAction;
import com.purbon.kafka.topology.actions.topics.DeleteTopics;
import com.purbon.kafka.topology.actions.topics.RegisterSchemaAction;
import com.purbon.kafka.topology.actions.topics.TopicConfigUpdatePlan;
import com.purbon.kafka.topology.actions.topics.UpdateTopicConfigAction;
import com.purbon.kafka.topology.actions.topics.builders.TopicConfigUpdatePlanBuilder;
import com.purbon.kafka.topology.api.adminclient.TopologyBuilderAdminClient;
import com.purbon.kafka.topology.exceptions.RemoteValidationException;
import com.purbon.kafka.topology.model.Topic;
import com.purbon.kafka.topology.model.Topology;
import com.purbon.kafka.topology.schemas.SchemaRegistryManager;
import java.io.IOException;
import java.io.PrintStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:com/purbon/kafka/topology/TopicManager.class */
public class TopicManager implements ExecutionPlanUpdater {
    private static final Logger LOGGER = LogManager.getLogger((Class<?>) TopicManager.class);
    public static final String NUM_PARTITIONS = "num.partitions";
    public static final String REPLICATION_FACTOR = "replication.factor";
    private final SchemaRegistryManager schemaRegistryManager;
    private final TopologyBuilderAdminClient adminClient;
    private final Configuration config;
    private List<String> internalTopicPrefixes;
    private final List<String> managedPrefixes;

    public TopicManager(TopologyBuilderAdminClient topologyBuilderAdminClient, SchemaRegistryManager schemaRegistryManager) {
        this(topologyBuilderAdminClient, schemaRegistryManager, new Configuration());
    }

    public TopicManager(TopologyBuilderAdminClient topologyBuilderAdminClient, SchemaRegistryManager schemaRegistryManager, Configuration configuration) {
        this.adminClient = topologyBuilderAdminClient;
        this.schemaRegistryManager = schemaRegistryManager;
        this.config = configuration;
        this.internalTopicPrefixes = new ArrayList();
        this.managedPrefixes = configuration.getTopicManagedPrefixes();
    }

    @Override // com.purbon.kafka.topology.ExecutionPlanUpdater
    public void updatePlan(ExecutionPlan executionPlan, Map<String, Topology> map) throws IOException {
        this.internalTopicPrefixes = this.config.getKafkaInternalTopicPrefixes(map.values());
        Set<String> loadActualClusterStateIfAvailable = loadActualClusterStateIfAvailable(executionPlan);
        HashMap hashMap = new HashMap();
        HashSet hashSet = new HashSet();
        HashSet hashSet2 = new HashSet();
        Iterator<Topology> it = map.values().iterator();
        while (it.hasNext()) {
            parseMapOfTopics(it.next()).forEach((str, topic) -> {
                if (loadActualClusterStateIfAvailable.contains(str)) {
                    TopicConfigUpdatePlan createTopicConfigUpdatePlan = new TopicConfigUpdatePlanBuilder(this.adminClient).createTopicConfigUpdatePlan(topic, str);
                    if (createTopicConfigUpdatePlan.hasConfigChanges()) {
                        hashSet2.add(new UpdateTopicConfigAction(this.adminClient, createTopicConfigUpdatePlan));
                    }
                } else {
                    hashSet.add(new CreateTopicAction(this.adminClient, topic, str));
                }
                hashMap.put(str, topic);
            });
        }
        Objects.requireNonNull(executionPlan);
        hashSet.forEach(executionPlan::add);
        Objects.requireNonNull(executionPlan);
        hashSet2.forEach(executionPlan::add);
        hashMap.forEach((str2, topic2) -> {
            executionPlan.add(new RegisterSchemaAction(this.schemaRegistryManager, topic2, str2));
        });
        if (this.config.isAllowDeleteTopics()) {
            List list = (List) loadActualClusterStateIfAvailable.stream().filter(str3 -> {
                return (hashMap.containsKey(str3) || isAnInternalTopics(str3)) ? false : true;
            }).collect(Collectors.toList());
            if (list.size() > 0) {
                LOGGER.debug("Topic to be deleted: " + StringUtils.join(list, UriTemplate.DEFAULT_SEPARATOR));
                executionPlan.add(new DeleteTopics(this.adminClient, list));
            }
        }
    }

    private Map<String, Topic> parseMapOfTopics(Topology topology) {
        return (Map) Stream.concat(topology.getProjects().stream().flatMap(project -> {
            return project.getTopics().stream();
        }).filter(this::matchesPrefixList), topology.getSpecialTopics().stream().filter(this::matchesPrefixList)).collect(Collectors.toMap((v0) -> {
            return v0.toString();
        }, topic -> {
            return topic;
        }));
    }

    private boolean isAnInternalTopics(String str) {
        Stream<String> stream = this.internalTopicPrefixes.stream();
        Objects.requireNonNull(str);
        return stream.anyMatch(str::startsWith);
    }

    private Set<String> loadActualClusterStateIfAvailable(ExecutionPlan executionPlan) throws IOException {
        Set<String> set = (Set) (this.config.fetchTopicStateFromTheCluster() ? this.adminClient.listApplicationTopics() : executionPlan.getTopics()).stream().filter(this::matchesPrefixList).collect(Collectors.toSet());
        if (set.size() > 0) {
            LOGGER.debug("Full list of managed topics in the cluster: " + StringUtils.join(new ArrayList(set), UriTemplate.DEFAULT_SEPARATOR));
        }
        if (!this.config.shouldVerifyRemoteState().booleanValue()) {
            LOGGER.warn("Remote state verification disabled, this is not a good practice, be awarein future versions, this check is going to become mandatory.");
        }
        if (this.config.shouldVerifyRemoteState().booleanValue() && !this.config.fetchStateFromTheCluster()) {
            detectDivergencesInTheRemoteCluster(executionPlan);
        }
        return set;
    }

    private void detectDivergencesInTheRemoteCluster(ExecutionPlan executionPlan) throws IOException {
        Set<String> listApplicationTopics = this.adminClient.listApplicationTopics();
        List list = (List) executionPlan.getTopics().stream().filter(str -> {
            return !listApplicationTopics.contains(str);
        }).collect(Collectors.toList());
        if (list.size() > 0) {
            String str2 = "Your remote state has changed since the last execution, this topics: " + StringUtils.join(list, UriTemplate.DEFAULT_SEPARATOR) + " are in your local state, but not in the cluster, please investigate!";
            LOGGER.error(str2);
            throw new RemoteValidationException(str2);
        }
    }

    private boolean matchesPrefixList(Topic topic) {
        return matchesPrefixList(topic.toString());
    }

    private boolean matchesPrefixList(String str) {
        boolean z;
        if (this.managedPrefixes.size() != 0) {
            Stream<String> stream = this.managedPrefixes.stream();
            Objects.requireNonNull(str);
            if (!stream.anyMatch(str::startsWith)) {
                z = false;
                boolean z2 = z;
                LOGGER.debug(String.format("Topic %s matches %s with $s", str, Boolean.valueOf(z2), this.managedPrefixes));
                return z2;
            }
        }
        z = true;
        boolean z22 = z;
        LOGGER.debug(String.format("Topic %s matches %s with $s", str, Boolean.valueOf(z22), this.managedPrefixes));
        return z22;
    }

    @Override // com.purbon.kafka.topology.ExecutionPlanUpdater
    public void printCurrentState(PrintStream printStream) throws IOException {
        printStream.println("List of Topics:");
        Set<String> listTopics = this.adminClient.listTopics();
        Objects.requireNonNull(printStream);
        listTopics.forEach(printStream::println);
    }

    public void close() {
        this.adminClient.close();
    }
}
