package com.purbon.kafka.topology.api.ksql;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.purbon.kafka.topology.clients.ArtefactClient;
import com.purbon.kafka.topology.model.Artefact;
import com.purbon.kafka.topology.model.artefact.KsqlArtefact;
import com.purbon.kafka.topology.model.artefact.KsqlStreamArtefact;
import com.purbon.kafka.topology.model.artefact.KsqlTableArtefact;
import com.purbon.kafka.topology.utils.JSON;
import io.confluent.ksql.api.client.Client;
import io.confluent.ksql.api.client.ClientOptions;
import java.io.IOException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:com/purbon/kafka/topology/api/ksql/KsqlApiClient.class */
/**
 * {@link ArtefactClient} implementation backed by the ksqlDB Java {@link Client}.
 *
 * <p>The client is configured once in the constructor from a {@link KsqlClientConfig} and is then
 * used to execute statements, drop streams/tables, terminate push queries and list the streams and
 * tables known to the server.
 */
public class KsqlApiClient implements ArtefactClient {
    public static final String QUERY_TYPE = "query";
    public static final String STREAM_TYPE = "stream";
    public static final String TABLE_TYPE = "table";

    private static final Logger LOGGER = LogManager.getLogger(KsqlApiClient.class);

    /** Matches statements that begin with {@code CREATE STREAM} or {@code CREATE TABLE}. */
    private static final Pattern CREATE_WITHOUT_REPLACE =
            Pattern.compile("^\\s*create\\s+(stream|table)\\b", Pattern.CASE_INSENSITIVE);

    /** Maximum number of statement characters echoed in the idempotency warning. */
    private static final int STATEMENT_PREVIEW_LENGTH = 40;

    /** Stream name that is excluded from {@link #listStreams()}. */
    private static final String PROCESSING_LOG_STREAM = "KSQL_PROCESSING_LOG";

    private final URL server;
    private final Client client;

    /**
     * Builds the underlying ksqlDB client from the given configuration.
     *
     * <p>TLS is enabled when the server URL uses the {@code https} protocol; basic-auth credentials
     * are only set when the configuration asks for them.
     *
     * @param ksqlClientConfig connection settings (server URL, key/trust stores, ALPN, host
     *     verification and optional basic auth)
     */
    public KsqlApiClient(KsqlClientConfig ksqlClientConfig) {
        this.server = ksqlClientConfig.getServer();
        ClientOptions options =
                ClientOptions.create().setHost(this.server.getHost()).setPort(this.server.getPort());
        if ("https".equals(this.server.getProtocol())) {
            options.setUseTls(true);
        }
        options.setUseAlpn(ksqlClientConfig.useAlpn());
        options.setKeyStore(ksqlClientConfig.getKeyStore());
        options.setKeyStorePassword(ksqlClientConfig.getKeyStorePassword());
        options.setTrustStore(ksqlClientConfig.getTrustStore());
        options.setTrustStorePassword(ksqlClientConfig.getTrustStorePassword());
        options.setVerifyHost(ksqlClientConfig.isVerifyHost());
        if (ksqlClientConfig.useBasicAuth()) {
            options.setBasicAuthCredentials(
                    ksqlClientConfig.getBasicAuth().getUser(),
                    ksqlClientConfig.getBasicAuth().getPassword());
        }
        this.client = Client.create(options);
    }

    /**
     * Returns the configured server URL in string form.
     *
     * @return {@code server.toString()}
     */
    @Override // com.purbon.kafka.topology.clients.ArtefactClient
    public String getServer() {
        return this.server.toString();
    }

    /**
     * Defines every entry of the given map as a variable on the underlying client.
     *
     * @param map variable names mapped to their values
     */
    @Override // com.purbon.kafka.topology.clients.ArtefactClient
    public void addSessionVars(Map<String, String> map) {
        map.forEach(this.client::define);
    }

    /**
     * Executes a ksql statement and returns the response as a map.
     *
     * <p>A warning is logged when the statement starts with {@code CREATE STREAM} or
     * {@code CREATE TABLE} (i.e. without {@code OR REPLACE}); the statement is executed regardless.
     *
     * @param str the ksql statement to execute
     * @return the execution result converted through {@code QueryResponse#asMap()}
     * @throws IOException if waiting for the result is interrupted or the execution fails
     */
    @Override // com.purbon.kafka.topology.clients.ArtefactClient
    public Map<String, Object> add(String str) throws IOException {
        if (isCreateWithoutReplace(str)) {
            LOGGER.warn(
                    "Are you trying to achieve idempotency? If yes, please make sure that your statement "
                            + "starts with CREATE OR REPLACE. Currently your ksql statement does not - {}",
                    preview(str));
        }
        try {
            return new QueryResponse(this.client.executeStatement(str).get()).asMap();
        } catch (InterruptedException | ExecutionException e) {
            throw asIOException(e);
        }
    }

    /** Tells whether the statement begins with CREATE STREAM/TABLE, ignoring case and leading whitespace. */
    private boolean isCreateWithoutReplace(String str) {
        return CREATE_WITHOUT_REPLACE.matcher(str).find();
    }

    /** Returns at most the first {@link #STATEMENT_PREVIEW_LENGTH} characters of the statement. */
    private static String preview(String statement) {
        // Clamp to the statement length so short statements do not raise StringIndexOutOfBoundsException.
        return statement.substring(0, Math.min(statement.length(), STATEMENT_PREVIEW_LENGTH));
    }

    /**
     * Executes the statement given as second argument; the first argument is not used.
     *
     * @param str ignored by this implementation
     * @param str2 the ksql statement to execute
     * @return the result of {@link #add(String)} for {@code str2}
     * @throws IOException see {@link #add(String)}
     */
    @Override // com.purbon.kafka.topology.clients.ArtefactClient
    public Map<String, Object> add(String str, String str2) throws IOException {
        return add(str2);
    }

    /**
     * Drops the named artefact, first as a stream and then as a table.
     *
     * <p>Both statements use {@code IF EXISTS}. NOTE(review): how the server reacts when the name
     * exists with the other source type is not visible here - confirm against ksqlDB.
     *
     * @param str name of the stream/table to drop
     * @throws IOException if either drop fails
     */
    @Override // com.purbon.kafka.topology.clients.ArtefactClient
    public void delete(String str) throws IOException {
        delete(str, "STREAM");
        delete(str, "TABLE");
    }

    /**
     * Deletes an artefact according to its type.
     *
     * <p>For {@code stream} or {@code table} (case-insensitive) a {@code DROP ... IF EXISTS}
     * statement is executed; for any other type the identifier is treated as a push query id and
     * that query is terminated.
     *
     * @param str stream/table name, or push query id
     * @param str2 artefact type
     * @throws IOException if waiting for the result is interrupted or the operation fails
     */
    @Override // com.purbon.kafka.topology.clients.ArtefactClient
    public void delete(String str, String str2) throws IOException {
        boolean isSource = STREAM_TYPE.equalsIgnoreCase(str2) || TABLE_TYPE.equalsIgnoreCase(str2);
        try {
            if (isSource) {
                String statement =
                        String.format("DROP %s IF EXISTS %s;", str2.toUpperCase(Locale.ROOT), str);
                this.client.executeStatement(statement).get();
            } else {
                this.client.terminatePushQuery(str).get();
            }
        } catch (InterruptedException | ExecutionException e) {
            LOGGER.error("Failed to delete ksql artefact {} of type {}", str, str2, e);
            throw asIOException(e);
        }
    }

    /**
     * Lists all streams followed by all tables, each serialised as a JSON string.
     *
     * @return the concatenation of {@link #listStreams()} and {@link #listTables()}
     * @throws IOException if either listing fails
     */
    @Override // com.purbon.kafka.topology.clients.ArtefactClient
    public List<String> list() throws IOException {
        List<String> artefacts = new ArrayList<>(listStreams());
        artefacts.addAll(listTables());
        return artefacts;
    }

    /**
     * Lists the tables known to the server as JSON-serialised {@link KsqlTableArtefact}s.
     *
     * <p>Tables whose serialisation fails are left out of the result.
     *
     * @return one JSON string per table
     * @throws IOException if waiting for the result is interrupted or the request fails
     */
    public List<String> listTables() throws IOException {
        try {
            return this.client.listTables().get().stream()
                    .map(table -> new KsqlTableArtefact("", this.server.getHost(), table.getName()))
                    .map(this::artefactToString)
                    .filter(json -> !json.isEmpty())
                    .collect(Collectors.toList());
        } catch (InterruptedException | ExecutionException e) {
            LOGGER.error("Failed to list ksql tables on {}", this.server, e);
            throw asIOException(e);
        }
    }

    /**
     * Serialises an artefact to JSON.
     *
     * @return the JSON text, or an empty string when serialisation fails (callers filter those out)
     */
    private String artefactToString(KsqlArtefact ksqlArtefact) {
        try {
            return JSON.asString(ksqlArtefact);
        } catch (JsonProcessingException e) {
            LOGGER.error("Failed to serialise ksql artefact to JSON", e);
            return "";
        }
    }

    /**
     * Lists the streams known to the server as JSON-serialised {@link KsqlStreamArtefact}s.
     *
     * <p>The {@code KSQL_PROCESSING_LOG} stream is skipped, as are streams whose serialisation fails.
     *
     * @return one JSON string per stream
     * @throws IOException if waiting for the result is interrupted or the request fails
     */
    public List<String> listStreams() throws IOException {
        try {
            return this.client.listStreams().get().stream()
                    .filter(stream -> !PROCESSING_LOG_STREAM.equalsIgnoreCase(stream.getName()))
                    .map(stream -> new KsqlStreamArtefact("", this.server.getHost(), stream.getName()))
                    .map(this::artefactToString)
                    .filter(json -> !json.isEmpty())
                    .collect(Collectors.toList());
        } catch (InterruptedException | ExecutionException e) {
            LOGGER.error("Failed to list ksql streams on {}", this.server, e);
            throw asIOException(e);
        }
    }

    /**
     * Returns the streams and tables currently on the server as artefact objects.
     *
     * <p>Entries are obtained by parsing the JSON produced by {@link #listStreams()} and
     * {@link #listTables()}; entries that cannot be parsed are logged and skipped.
     *
     * @return stream artefacts followed by table artefacts
     * @throws IOException if either listing fails
     */
    @Override // com.purbon.kafka.topology.clients.ArtefactClient
    public Collection<? extends Artefact> getClusterState() throws IOException {
        List<Artefact> artefacts = new ArrayList<>();
        listStreams().stream()
                .map(json -> parseArtefact(json, KsqlStreamArtefact.class))
                .filter(Objects::nonNull)
                .forEach(artefacts::add);
        listTables().stream()
                .map(json -> parseArtefact(json, KsqlTableArtefact.class))
                .filter(Objects::nonNull)
                .forEach(artefacts::add);
        return artefacts;
    }

    /**
     * Parses a JSON string into the requested artefact type.
     *
     * @return the parsed artefact, or {@code null} when the JSON cannot be processed
     */
    private static <T> T parseArtefact(String json, Class<T> type) {
        try {
            return type.cast(JSON.toObject(json, type));
        } catch (JsonProcessingException e) {
            LOGGER.error("Failed to parse {} from JSON", type.getSimpleName(), e);
            return null;
        }
    }

    /**
     * Wraps a failure from waiting on a client future into an {@link IOException}, keeping the cause.
     * When the failure is an {@link InterruptedException} the thread's interrupt flag is restored first.
     */
    private static IOException asIOException(Exception e) {
        if (e instanceof InterruptedException) {
            Thread.currentThread().interrupt();
        }
        return new IOException(e);
    }
}
