package com.purbon.kafka.topology.api.connect;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.collect.ImmutableList;
import com.purbon.kafka.topology.Configuration;
import com.purbon.kafka.topology.clients.ArtefactClient;
import com.purbon.kafka.topology.clients.JulieHttpClient;
import com.purbon.kafka.topology.model.Artefact;
import com.purbon.kafka.topology.model.artefact.KafkaConnectArtefact;
import com.purbon.kafka.topology.utils.BasicAuth;
import com.purbon.kafka.topology.utils.JSON;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.logging.log4j.core.LoggerContext;
import org.apache.logging.log4j.core.jackson.JsonConstants;
import redis.clients.jedis.Protocol;

/* loaded from: input_file:com/purbon/kafka/topology/api/connect/KConnectApiClient.class */
public class KConnectApiClient extends JulieHttpClient implements ArtefactClient {
    private String label;

    public KConnectApiClient(String str, Configuration configuration) throws IOException {
        this(str, "", configuration);
    }

    public KConnectApiClient(String str, String str2, Configuration configuration) throws IOException {
        super(str, Optional.of(configuration));
        this.label = str2;
        Map<String, String> serversBasicAuthMap = configuration.getServersBasicAuthMap();
        if (serversBasicAuthMap.containsKey(str2)) {
            String[] split = serversBasicAuthMap.get(str2).split(":");
            setBasicAuth(new BasicAuth(split[0], split[1]));
        }
    }

    @Override // com.purbon.kafka.topology.clients.ArtefactClient
    public String getServer() {
        return this.server;
    }

    @Override // com.purbon.kafka.topology.clients.ArtefactClient
    public Map<String, Object> add(String str) throws IOException {
        throw new IOException("Not implemented in this context");
    }

    @Override // com.purbon.kafka.topology.clients.ArtefactClient
    public List<String> list() throws IOException {
        return ImmutableList.copyOf(doList().fieldNames());
    }

    protected JsonNode doList() throws IOException {
        return JSON.toNode(doGet("/connectors?expand=info").getResponseAsString());
    }

    @Override // com.purbon.kafka.topology.clients.ArtefactClient
    public Collection<? extends Artefact> getClusterState() throws IOException {
        JsonNode doList = doList();
        Objects.requireNonNull(doList);
        Iterable iterable = doList::fields;
        return (Collection) StreamSupport.stream(iterable.spliterator(), false).map(entry -> {
            JsonNode jsonNode = (JsonNode) Optional.ofNullable(((JsonNode) entry.getValue()).get(Protocol.CLUSTER_INFO)).map(jsonNode2 -> {
                return jsonNode2.get(LoggerContext.PROPERTY_CONFIG);
            }).orElse(null);
            String str = null;
            if (jsonNode instanceof ObjectNode) {
                ObjectNode objectNode = (ObjectNode) jsonNode;
                objectNode.remove("name");
                str = Integer.toHexString(objectNode.hashCode());
            }
            return new KafkaConnectArtefact("", this.server, (String) entry.getKey(), str);
        }).collect(Collectors.toList());
    }

    @Override // com.purbon.kafka.topology.clients.ArtefactClient
    public Map<String, Object> add(String str, String str2) throws IOException {
        String format = String.format("/connectors/%s/config", str);
        Map<String, Object> map = JSON.toMap(str2);
        if (mayBeAConfigRecord(map)) {
            Object obj = map.get(LoggerContext.PROPERTY_CONFIG);
            if (!str.equalsIgnoreCase(map.get("name").toString())) {
                throw new IOException("Trying to add a connector with a different name as in the topology");
            }
            str2 = JSON.asString(obj);
        }
        return JSON.toMap(doPut(format, str2));
    }

    private boolean mayBeAConfigRecord(Map<String, Object> map) {
        Set<String> keySet = map.keySet();
        return keySet.contains(LoggerContext.PROPERTY_CONFIG) && keySet.contains("name") && keySet.size() == 2;
    }

    @Override // com.purbon.kafka.topology.clients.ArtefactClient
    public void delete(String str) throws IOException {
        doDelete("/connectors/" + str + "/", "");
    }

    public String status(String str) throws IOException {
        Map<String, Object> map = JSON.toMap(doGet("/connectors/" + str + "/status").getResponseAsString());
        if (map.containsKey("error_code")) {
            throw new IOException(map.get(JsonConstants.ELT_MESSAGE).toString());
        }
        return (String) ((Map) map.get("connector")).get("state");
    }

    public void pause(String str) throws IOException {
        doPut("/connectors/" + str + "/pause");
    }

    @Override // com.purbon.kafka.topology.clients.ArtefactClient
    public Map<String, Object> update(String str, String str2) throws IOException {
        return add(str, str2);
    }

    public String toString() {
        return "KConnectApiClient{" + this.server + " - " + this.label + "}";
    }
}
