package com.purbon.kafka.topology;

import com.damnhandy.uri.template.UriTemplate;
import com.purbon.kafka.topology.actions.CreateArtefactAction;
import com.purbon.kafka.topology.actions.DeleteArtefactAction;
import com.purbon.kafka.topology.actions.SyncArtefactAction;
import com.purbon.kafka.topology.clients.ArtefactClient;
import com.purbon.kafka.topology.exceptions.RemoteValidationException;
import com.purbon.kafka.topology.model.Artefact;
import com.purbon.kafka.topology.model.Topology;
import com.purbon.kafka.topology.model.artefact.KsqlVarsArtefact;
import com.purbon.kafka.topology.model.artefact.TypeArtefact;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:com/purbon/kafka/topology/ArtefactManager.class */
public abstract class ArtefactManager implements ExecutionPlanUpdater {
    private static final Logger LOGGER = LogManager.getLogger((Class<?>) ArtefactManager.class);
    protected Map<String, ArtefactClient> clients;
    protected Configuration config;
    protected String topologyFileOrDir;

    public ArtefactManager(ArtefactClient artefactClient, Configuration configuration, String str) {
        this((Map<String, ? extends ArtefactClient>) Collections.singletonMap("default", artefactClient), configuration, str);
    }

    public ArtefactManager(Map<String, ? extends ArtefactClient> map, Configuration configuration, String str) {
        this.clients = Collections.unmodifiableMap(map);
        this.config = configuration;
        this.topologyFileOrDir = str;
    }

    private boolean findKsqlVarsArtefact(Artefact artefact) {
        return ((Boolean) Optional.ofNullable((TypeArtefact) artefact.getClass().getAnnotation(TypeArtefact.class)).map(typeArtefact -> {
            return Boolean.valueOf(typeArtefact.name().equals("VARS"));
        }).orElse(false)).booleanValue();
    }

    @Override // com.purbon.kafka.topology.ExecutionPlanUpdater
    public void updatePlan(ExecutionPlan executionPlan, Map<String, Topology> map) throws IOException {
        Collection<? extends Artefact> loadActualClusterStateIfAvailable = loadActualClusterStateIfAvailable(executionPlan);
        HashSet hashSet = new HashSet();
        Iterator<Topology> it = map.values().iterator();
        while (it.hasNext()) {
            Set<? extends Artefact> parseNewArtefacts = parseNewArtefacts(it.next());
            KsqlVarsArtefact ksqlVarsArtefact = (KsqlVarsArtefact) parseNewArtefacts.stream().filter(this::findKsqlVarsArtefact).findFirst().orElseGet(() -> {
                return new KsqlVarsArtefact(Collections.emptyMap());
            });
            parseNewArtefacts.removeIf(this::findKsqlVarsArtefact);
            for (Artefact artefact : parseNewArtefacts) {
                Optional<? extends Artefact> findAny = loadActualClusterStateIfAvailable.stream().filter(artefact2 -> {
                    return artefact2.equals(artefact);
                }).findAny();
                if (findAny.isEmpty()) {
                    ArtefactClient selectClient = selectClient(artefact);
                    if (selectClient == null) {
                        throw new IOException("The Artefact " + artefact.getName() + " require a non configured client, please check our configuration");
                    }
                    selectClient.addSessionVars(ksqlVarsArtefact.getSessionVars());
                    executionPlan.add(new CreateArtefactAction(selectClient, rootPath(), loadActualClusterStateIfAvailable, artefact));
                } else if (Objects.equals(findAny.get().getHash(), artefact.getHash())) {
                    continue;
                } else {
                    ArtefactClient selectClient2 = selectClient(artefact);
                    if (selectClient2 == null) {
                        throw new IOException("The Artefact " + artefact.getName() + " require a non configured client, please check our configuration");
                    }
                    executionPlan.add(new SyncArtefactAction(selectClient2, rootPath(), artefact));
                }
                hashSet.add(artefact);
            }
        }
        if (isAllowDelete()) {
            List<? extends Artefact> findArtefactsToBeDeleted = findArtefactsToBeDeleted(loadActualClusterStateIfAvailable, hashSet);
            if (findArtefactsToBeDeleted.size() > 0) {
                LOGGER.debug("Artefacts to be deleted: " + StringUtils.join(findArtefactsToBeDeleted, UriTemplate.DEFAULT_SEPARATOR));
                for (Artefact artefact3 : findArtefactsToBeDeleted) {
                    ArtefactClient selectClient3 = selectClient(artefact3);
                    if (selectClient3 == null) {
                        throw new IOException("The Artefact " + artefact3.getName() + " require a non configured client, please check our configuration");
                    }
                    executionPlan.add(new DeleteArtefactAction(selectClient3, artefact3));
                }
            }
        }
    }

    protected List<? extends Artefact> findArtefactsToBeDeleted(Collection<? extends Artefact> collection, Set<Artefact> set) {
        return (List) collection.stream().filter(artefact -> {
            return !set.contains(artefact);
        }).collect(Collectors.toList());
    }

    protected ArtefactClient selectClient(Artefact artefact) {
        return this.clients.getOrDefault(artefact.getServerLabel(), this.clients.containsKey("default") ? this.clients.get("default") : null);
    }

    protected Collection<? extends Artefact> loadActualClusterStateIfAvailable(ExecutionPlan executionPlan) throws IOException {
        Collection<? extends Artefact> clustersState = this.config.fetchStateFromTheCluster() ? getClustersState() : getLocalState(executionPlan);
        if (!this.config.shouldVerifyRemoteState().booleanValue()) {
            LOGGER.warn("Remote state verification disabled, this is not a good practice, be awarein future versions, this check is going to become mandatory.");
        }
        if (this.config.shouldVerifyRemoteState().booleanValue() && !this.config.fetchStateFromTheCluster()) {
            detectDivergencesInTheRemoteCluster(executionPlan);
        }
        return clustersState;
    }

    private void detectDivergencesInTheRemoteCluster(ExecutionPlan executionPlan) throws IOException {
        Collection<? extends Artefact> clustersState = getClustersState();
        List list = (List) getLocalState(executionPlan).stream().filter(artefact -> {
            return !clustersState.contains(artefact);
        }).collect(Collectors.toList());
        if (list.size() > 0) {
            String str = "Your remote state has changed since the last execution, these Artefact(s): " + StringUtils.join(list, UriTemplate.DEFAULT_SEPARATOR) + " are in your local state, but not in the cluster, please investigate!";
            LOGGER.error(str);
            throw new RemoteValidationException(str);
        }
    }

    protected abstract Collection<? extends Artefact> getLocalState(ExecutionPlan executionPlan);

    protected abstract Collection<? extends Artefact> getClustersState() throws IOException;

    abstract Set<? extends Artefact> parseNewArtefacts(Topology topology);

    abstract boolean isAllowDelete();

    abstract String rootPath();
}
