package com.purbon.kafka.topology;

import com.purbon.kafka.topology.api.ksql.KsqlApiClient;
import com.purbon.kafka.topology.clients.ArtefactClient;
import com.purbon.kafka.topology.model.Artefact;
import com.purbon.kafka.topology.model.Topology;
import com.purbon.kafka.topology.model.artefact.KsqlArtefact;
import com.purbon.kafka.topology.model.artefact.KsqlArtefacts;
import com.purbon.kafka.topology.model.artefact.KsqlStreamArtefact;
import com.purbon.kafka.topology.model.artefact.KsqlTableArtefact;
import com.purbon.kafka.topology.utils.Either;
import java.io.File;
import java.io.IOException;
import java.io.PrintStream;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:com/purbon/kafka/topology/KSqlArtefactManager.class */
public class KSqlArtefactManager extends ArtefactManager {
    private static final Logger LOGGER = LogManager.getLogger((Class<?>) KSqlArtefactManager.class);

    public KSqlArtefactManager(ArtefactClient artefactClient, Configuration configuration, String str) {
        super(artefactClient, configuration, str);
    }

    public KSqlArtefactManager(Map<String, ? extends ArtefactClient> map, Configuration configuration, String str) {
        super(map, configuration, str);
    }

    @Override // com.purbon.kafka.topology.ArtefactManager
    protected Collection<? extends Artefact> getLocalState(ExecutionPlan executionPlan) {
        return executionPlan.getKSqlArtefacts();
    }

    @Override // com.purbon.kafka.topology.ArtefactManager
    protected List<? extends Artefact> findArtefactsToBeDeleted(Collection<? extends Artefact> collection, Set<Artefact> set) {
        LinkedList linkedList = (LinkedList) collection.stream().filter(artefact -> {
            return !set.contains(artefact);
        }).sorted((artefact2, artefact3) -> {
            return (-1) * ((KsqlArtefact) artefact2).compareTo((KsqlArtefact) artefact3);
        }).collect(Collectors.toCollection(LinkedList::new));
        HashMap hashMap = new HashMap();
        hashMap.put(KsqlApiClient.TABLE_TYPE, new LinkedList());
        hashMap.put(KsqlApiClient.STREAM_TYPE, new LinkedList());
        linkedList.forEach(artefact4 -> {
            if (artefact4 instanceof KsqlTableArtefact) {
                ((LinkedList) hashMap.get(KsqlApiClient.TABLE_TYPE)).add((KsqlArtefact) artefact4);
            } else {
                ((LinkedList) hashMap.get(KsqlApiClient.STREAM_TYPE)).add((KsqlArtefact) artefact4);
            }
        });
        LinkedList linkedList2 = new LinkedList();
        Iterator it = Arrays.asList(KsqlApiClient.TABLE_TYPE, KsqlApiClient.STREAM_TYPE).iterator();
        while (it.hasNext()) {
            Iterator descendingIterator = ((LinkedList) hashMap.get((String) it.next())).descendingIterator();
            Objects.requireNonNull(linkedList2);
            descendingIterator.forEachRemaining((v1) -> {
                r1.add(v1);
            });
        }
        return linkedList2;
    }

    @Override // com.purbon.kafka.topology.ArtefactManager
    protected Collection<? extends Artefact> getClustersState() throws IOException {
        List list = (List) this.clients.values().stream().map(artefactClient -> {
            try {
                Collection<? extends Artefact> clusterState = artefactClient.getClusterState();
                return clusterState.isEmpty() ? Either.Right(null) : Either.Right(clusterState);
            } catch (IOException e) {
                return Either.Left(e);
            }
        }).collect(Collectors.toList());
        List list2 = (List) list.stream().filter((v0) -> {
            return v0.isLeft();
        }).map(either -> {
            return (IOException) either.getLeft().get();
        }).collect(Collectors.toList());
        if (list2.size() > 0) {
            throw new IOException((Throwable) list2.get(0));
        }
        return (Collection) list.stream().filter((v0) -> {
            return v0.isRight();
        }).flatMap(either2 -> {
            return ((Collection) either2.getRight().get()).stream();
        }).map(artefact -> {
            if (artefact instanceof KsqlStreamArtefact) {
                return new KsqlStreamArtefact(artefact.getPath(), null, artefact.getName());
            }
            if (artefact instanceof KsqlTableArtefact) {
                return new KsqlTableArtefact(artefact.getPath(), null, artefact.getName());
            }
            LOGGER.error("KSQL Artefact of wrong type " + artefact.getClass());
            return null;
        }).filter((v0) -> {
            return Objects.nonNull(v0);
        }).collect(Collectors.toSet());
    }

    @Override // com.purbon.kafka.topology.ArtefactManager
    Set<Artefact> parseNewArtefacts(Topology topology) {
        return (Set) topology.getProjects().stream().flatMap(project -> {
            KsqlArtefacts ksqlArtefacts = project.getKsqlArtefacts();
            return Stream.concat(Stream.concat(ksqlArtefacts.getStreams().stream(), ksqlArtefacts.getTables().stream()), Collections.singletonList(ksqlArtefacts.getVars()).stream());
        }).sorted().collect(Collectors.toCollection(LinkedHashSet::new));
    }

    @Override // com.purbon.kafka.topology.ArtefactManager
    boolean isAllowDelete() {
        return this.config.isAllowDeleteKsqlArtefacts();
    }

    @Override // com.purbon.kafka.topology.ArtefactManager
    String rootPath() {
        return Files.isDirectory(Paths.get(this.topologyFileOrDir, new String[0]), new LinkOption[0]) ? this.topologyFileOrDir : new File(this.topologyFileOrDir).getParent();
    }

    @Override // com.purbon.kafka.topology.ExecutionPlanUpdater
    public void printCurrentState(PrintStream printStream) throws IOException {
        printStream.println("List of KSQL Artifacts:");
        Collection<? extends Artefact> clustersState = getClustersState();
        Objects.requireNonNull(printStream);
        clustersState.forEach((v1) -> {
            r1.println(v1);
        });
    }
}
