package io.micronaut.configuration.kafka.health;

import io.micronaut.configuration.kafka.config.KafkaDefaultConfiguration;
import io.micronaut.configuration.kafka.config.KafkaHealthConfiguration;
import io.micronaut.configuration.kafka.config.KafkaHealthConfigurationProperties;
import io.micronaut.configuration.kafka.reactor.KafkaReactorUtil;
import io.micronaut.context.BeanContext;
import io.micronaut.context.annotation.Requirements;
import io.micronaut.context.annotation.Requires;
import io.micronaut.core.annotation.Nullable;
import io.micronaut.core.util.SupplierUtil;
import io.micronaut.health.HealthStatus;
import io.micronaut.management.health.indicator.HealthIndicator;
import io.micronaut.management.health.indicator.HealthResult;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Supplier;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.clients.NetworkClientUtils;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.DescribeClusterOptions;
import org.apache.kafka.clients.admin.DescribeClusterResult;
import org.apache.kafka.clients.admin.DescribeConfigsResult;
import org.apache.kafka.common.ClusterResource;
import org.apache.kafka.common.ClusterResourceListener;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.utils.Time;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@Singleton
@Requirements({@Requires(bean = KafkaDefaultConfiguration.class), @Requires(property = "kafka.health.enabled", value = "true", defaultValue = "true")})
/* loaded from: input_file:io/micronaut/configuration/kafka/health/KafkaHealthIndicator.class */
public class KafkaHealthIndicator implements HealthIndicator, ClusterResourceListener {
    private static final String ID = "kafka";
    private static final String MIN_INSYNC_REPLICAS_PROPERTY = "min.insync.replicas";
    private static final String REPLICATION_PROPERTY = "offsets.topic.replication.factor";
    private static final String DEFAULT_REPLICATION_PROPERTY = "default.replication.factor";
    private static final String DETAILS_BROKER_ID = "brokerId";
    private static final String DETAILS_CLUSTER_ID = "clusterId";
    private static final String DETAILS_NODES = "nodes";
    private final Supplier<AdminClient> adminClientSupplier;
    private final KafkaDefaultConfiguration defaultConfiguration;
    private final Supplier<NetworkClient> networkClientSupplier;
    private final KafkaHealthConfiguration kafkaHealthConfiguration;
    private String clusterId;

    @Inject
    public KafkaHealthIndicator(BeanContext beanContext, KafkaDefaultConfiguration kafkaDefaultConfiguration, NetworkClientCreator networkClientCreator, KafkaHealthConfiguration kafkaHealthConfiguration) {
        this.adminClientSupplier = SupplierUtil.memoized(() -> {
            return (AdminClient) beanContext.getBean(AdminClient.class);
        });
        this.defaultConfiguration = kafkaDefaultConfiguration;
        this.networkClientSupplier = SupplierUtil.memoized(() -> {
            return networkClientCreator.create(this);
        });
        this.kafkaHealthConfiguration = kafkaHealthConfiguration;
    }

    @Deprecated(forRemoval = true)
    public KafkaHealthIndicator(AdminClient adminClient, KafkaDefaultConfiguration kafkaDefaultConfiguration) {
        this.adminClientSupplier = () -> {
            return adminClient;
        };
        this.defaultConfiguration = kafkaDefaultConfiguration;
        this.networkClientSupplier = SupplierUtil.memoized(() -> {
            return new DefaultNetworkClientCreator(kafkaDefaultConfiguration).create(this);
        });
        this.kafkaHealthConfiguration = new KafkaHealthConfigurationProperties();
    }

    public void onUpdate(ClusterResource clusterResource) {
        this.clusterId = (String) Optional.ofNullable(clusterResource).map((v0) -> {
            return v0.clusterId();
        }).orElse(null);
    }

    public static int getClusterReplicationFactor(Config config) {
        ConfigEntry configEntry = (ConfigEntry) Optional.ofNullable(config.get(REPLICATION_PROPERTY)).orElseGet(() -> {
            return config.get(DEFAULT_REPLICATION_PROPERTY);
        });
        if (configEntry != null) {
            return Integer.parseInt(configEntry.value());
        }
        return Integer.MAX_VALUE;
    }

    public static int getMinNodeCount(Config config) {
        return ((Integer) Optional.ofNullable(config.get(MIN_INSYNC_REPLICAS_PROPERTY)).map((v0) -> {
            return v0.value();
        }).map(Integer::parseInt).orElseGet(() -> {
            return Integer.valueOf(getClusterReplicationFactor(config));
        })).intValue();
    }

    /* renamed from: getResult, reason: merged with bridge method [inline-methods] */
    public Flux<HealthResult> m41getResult() {
        if (this.kafkaHealthConfiguration.isRestricted()) {
            try {
                NetworkClient networkClient = this.networkClientSupplier.get();
                return Flux.just(hasReadyNodes(networkClient).orElseGet(() -> {
                    return waitForLeastLoadedNode(networkClient);
                }));
            } catch (Exception e) {
                return Flux.just(failure(e, Collections.emptyMap()));
            }
        }
        AdminClient adminClient = this.adminClientSupplier.get();
        DescribeClusterResult describeCluster = adminClient.describeCluster(new DescribeClusterOptions().timeoutMs(Integer.valueOf((int) this.defaultConfiguration.getHealthTimeout().toMillis())));
        Objects.requireNonNull(describeCluster);
        Mono fromKafkaFuture = KafkaReactorUtil.fromKafkaFuture(describeCluster::clusterId);
        Objects.requireNonNull(describeCluster);
        Mono fromKafkaFuture2 = KafkaReactorUtil.fromKafkaFuture(describeCluster::nodes);
        Objects.requireNonNull(describeCluster);
        return KafkaReactorUtil.fromKafkaFuture(describeCluster::controller).flux().switchMap(node -> {
            String idString = node.idString();
            ConfigResource configResource = new ConfigResource(ConfigResource.Type.BROKER, idString);
            DescribeConfigsResult describeConfigs = adminClient.describeConfigs(Collections.singletonList(configResource));
            Objects.requireNonNull(describeConfigs);
            return KafkaReactorUtil.fromKafkaFuture(describeConfigs::all).flux().switchMap(map -> {
                int minNodeCount = getMinNodeCount((Config) map.get(configResource));
                return fromKafkaFuture2.flux().switchMap(collection -> {
                    return fromKafkaFuture.map(str -> {
                        int size = collection.size();
                        return getHealthResult(size >= minNodeCount, str, Integer.valueOf(size), idString);
                    });
                });
            });
        }).onErrorResume(th -> {
            return Mono.just(HealthResult.builder("kafka", HealthStatus.DOWN).exception(th).build());
        });
    }

    private static HealthResult getHealthResult(boolean z, @Nullable String str, @Nullable Integer num, @Nullable String str2) {
        HashMap hashMap = new HashMap();
        if (str != null) {
            hashMap.put(DETAILS_CLUSTER_ID, str);
        }
        if (str2 != null) {
            hashMap.put(DETAILS_BROKER_ID, str2);
        }
        if (num != null) {
            hashMap.put(DETAILS_NODES, num);
        }
        return result(z, hashMap).build();
    }

    private static HealthResult getHealthResult(boolean z, @Nullable String str) {
        return getHealthResult(z, str, null, null);
    }

    private static HealthResult.Builder result(boolean z, Map<String, Object> map) {
        return HealthResult.builder("kafka", z ? HealthStatus.UP : HealthStatus.DOWN).details(map);
    }

    private static HealthResult success(Map<String, Object> map) {
        return result(true, map).build();
    }

    private static HealthResult failure(Throwable th, Map<String, Object> map) {
        return result(false, map).exception(th).build();
    }

    private Optional<HealthResult> hasReadyNodes(NetworkClient networkClient) {
        return networkClient.hasReadyNodes(Time.SYSTEM.milliseconds()) ? Optional.of(getHealthResult(true, this.clusterId)) : Optional.empty();
    }

    private HealthResult waitForLeastLoadedNode(NetworkClient networkClient) {
        try {
            return result(NetworkClientUtils.awaitReady(networkClient, networkClient.leastLoadedNode(Time.SYSTEM.milliseconds()).node(), Time.SYSTEM, this.defaultConfiguration.getHealthTimeout().toMillis()), null).build();
        } catch (IOException e) {
            return failure(e, Collections.emptyMap());
        }
    }
}
