package io.trino.metadata;

import com.google.common.base.Preconditions;
import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.ImmutableSetMultimap;
import com.google.common.collect.SetMultimap;
import com.google.common.collect.Sets;
import com.google.common.collect.UnmodifiableIterator;
import com.google.errorprone.annotations.ThreadSafe;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import com.google.inject.Inject;
import io.airlift.concurrent.Threads;
import io.airlift.discovery.client.ServiceDescriptor;
import io.airlift.discovery.client.ServiceSelector;
import io.airlift.discovery.client.ServiceType;
import io.airlift.http.client.HttpClient;
import io.airlift.http.client.HttpUriBuilder;
import io.airlift.log.Logger;
import io.airlift.node.NodeInfo;
import io.trino.client.NodeVersion;
import io.trino.connector.CatalogManagerConfig;
import io.trino.connector.system.GlobalSystemConnector;
import io.trino.failuredetector.FailureDetector;
import io.trino.metadata.InternalNodeManager;
import io.trino.server.InternalCommunicationConfig;
import io.trino.spi.connector.CatalogHandle;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import java.net.URI;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.weakref.jmx.Managed;

@ThreadSafe
/* loaded from: input_file:io/trino/metadata/DiscoveryNodeManager.class */
public final class DiscoveryNodeManager implements InternalNodeManager {
    private static final Logger log = Logger.get(DiscoveryNodeManager.class);
    private static final Splitter CATALOG_HANDLE_ID_SPLITTER = Splitter.on(',').trimResults().omitEmptyStrings();
    private final ServiceSelector serviceSelector;
    private final FailureDetector failureDetector;
    private final NodeVersion expectedNodeVersion;
    private final HttpClient httpClient;
    private final boolean httpsRequired;
    private final InternalNode currentNode;
    private final boolean allCatalogsOnAllNodes;

    @GuardedBy("this")
    private AllNodes allNodes;

    @GuardedBy("this")
    private Set<InternalNode> coordinators;
    private final ConcurrentHashMap<String, RemoteNodeState> nodeStates = new ConcurrentHashMap<>();

    @GuardedBy("this")
    private Optional<SetMultimap<CatalogHandle, InternalNode>> activeNodesByCatalogHandle = Optional.empty();

    @GuardedBy("this")
    private final List<Consumer<AllNodes>> listeners = new ArrayList();
    private final ScheduledExecutorService nodeStateUpdateExecutor = Executors.newSingleThreadScheduledExecutor(Threads.daemonThreadsNamed("node-state-poller-%s"));
    private final ExecutorService nodeStateEventExecutor = Executors.newCachedThreadPool(Threads.daemonThreadsNamed("node-state-events-%s"));

    @Inject
    public DiscoveryNodeManager(@ServiceType("trino") ServiceSelector serviceSelector, NodeInfo nodeInfo, FailureDetector failureDetector, NodeVersion nodeVersion, @ForNodeManager HttpClient httpClient, InternalCommunicationConfig internalCommunicationConfig, CatalogManagerConfig catalogManagerConfig) {
        this.serviceSelector = (ServiceSelector) Objects.requireNonNull(serviceSelector, "serviceSelector is null");
        this.failureDetector = (FailureDetector) Objects.requireNonNull(failureDetector, "failureDetector is null");
        this.expectedNodeVersion = (NodeVersion) Objects.requireNonNull(nodeVersion, "expectedNodeVersion is null");
        this.httpClient = (HttpClient) Objects.requireNonNull(httpClient, "httpClient is null");
        this.httpsRequired = internalCommunicationConfig.isHttpsRequired();
        this.allCatalogsOnAllNodes = catalogManagerConfig.getCatalogMangerKind() != CatalogManagerConfig.CatalogMangerKind.STATIC;
        this.currentNode = findCurrentNode(serviceSelector.selectAllServices(), nodeInfo.getNodeId(), nodeVersion, this.httpsRequired);
        refreshNodesInternal();
    }

    private static InternalNode findCurrentNode(List<ServiceDescriptor> list, String str, NodeVersion nodeVersion, boolean z) {
        for (ServiceDescriptor serviceDescriptor : list) {
            URI httpUri = getHttpUri(serviceDescriptor, z);
            NodeVersion nodeVersion2 = getNodeVersion(serviceDescriptor);
            if (httpUri != null && nodeVersion2 != null) {
                InternalNode internalNode = new InternalNode(serviceDescriptor.getNodeId(), httpUri, nodeVersion2, isCoordinator(serviceDescriptor));
                if (internalNode.getNodeIdentifier().equals(str)) {
                    Preconditions.checkState(internalNode.getNodeVersion().equals(nodeVersion), "INVARIANT: current node version (%s) should be equal to %s", internalNode.getNodeVersion(), nodeVersion);
                    return internalNode;
                }
            }
        }
        throw new IllegalStateException("INVARIANT: current node not returned from service selector");
    }

    @PostConstruct
    public void startPollingNodeStates() {
        this.nodeStateUpdateExecutor.scheduleWithFixedDelay(() -> {
            try {
                pollWorkers();
            } catch (Exception e) {
                log.error(e, "Error polling state of nodes");
            }
        }, 5L, 5L, TimeUnit.SECONDS);
        pollWorkers();
    }

    @PreDestroy
    public void destroy() {
        this.nodeStateUpdateExecutor.shutdown();
        this.nodeStateEventExecutor.shutdown();
    }

    private void pollWorkers() {
        AllNodes allNodes = getAllNodes();
        ImmutableSet<InternalNode> build = ImmutableSet.builder().addAll(allNodes.getActiveNodes()).addAll(allNodes.getDrainingNodes()).addAll(allNodes.getDrainedNodes()).addAll(allNodes.getShuttingDownNodes()).build();
        this.nodeStates.keySet().removeAll(Sets.difference(this.nodeStates.keySet(), (Set) build.stream().map((v0) -> {
            return v0.getNodeIdentifier();
        }).collect(ImmutableSet.toImmutableSet())).immutableCopy());
        for (InternalNode internalNode : build) {
            this.nodeStates.putIfAbsent(internalNode.getNodeIdentifier(), new RemoteNodeState(this.httpClient, HttpUriBuilder.uriBuilderFrom(internalNode.getInternalUri()).appendPath("/v1/info/state").build()));
        }
        this.nodeStates.values().forEach((v0) -> {
            v0.asyncRefresh();
        });
        refreshNodesInternal();
    }

    @PreDestroy
    public void stop() {
        this.nodeStateUpdateExecutor.shutdownNow();
    }

    @Override // io.trino.metadata.InternalNodeManager
    public void refreshNodes() {
        refreshNodesInternal();
    }

    private synchronized void refreshNodesInternal() {
        Set<ServiceDescriptor> failed = this.failureDetector.getFailed();
        Set<ServiceDescriptor> set = (Set) this.serviceSelector.selectAllServices().stream().filter(serviceDescriptor -> {
            return !failed.contains(serviceDescriptor);
        }).collect(ImmutableSet.toImmutableSet());
        ImmutableSet.Builder builder = ImmutableSet.builder();
        ImmutableSet.Builder builder2 = ImmutableSet.builder();
        ImmutableSet.Builder builder3 = ImmutableSet.builder();
        ImmutableSet.Builder builder4 = ImmutableSet.builder();
        ImmutableSet.Builder builder5 = ImmutableSet.builder();
        ImmutableSet.Builder builder6 = ImmutableSet.builder();
        ImmutableSetMultimap.Builder builder7 = ImmutableSetMultimap.builder();
        for (ServiceDescriptor serviceDescriptor2 : set) {
            URI httpUri = getHttpUri(serviceDescriptor2, this.httpsRequired);
            NodeVersion nodeVersion = getNodeVersion(serviceDescriptor2);
            boolean isCoordinator = isCoordinator(serviceDescriptor2);
            if (httpUri != null && nodeVersion != null) {
                InternalNode internalNode = new InternalNode(serviceDescriptor2.getNodeId(), httpUri, nodeVersion, isCoordinator);
                NodeState nodeState = getNodeState(internalNode);
                switch (nodeState) {
                    case ACTIVE:
                        builder.add(internalNode);
                        if (isCoordinator) {
                            builder6.add(internalNode);
                        }
                        String str = (String) serviceDescriptor2.getProperties().get("catalogHandleIds");
                        if (str != null) {
                            Iterator it = CATALOG_HANDLE_ID_SPLITTER.split(str.toLowerCase(Locale.ENGLISH)).iterator();
                            while (it.hasNext()) {
                                builder7.put(CatalogHandle.fromId((String) it.next()), internalNode);
                            }
                        }
                        builder7.put(GlobalSystemConnector.CATALOG_HANDLE, internalNode);
                        break;
                    case INACTIVE:
                        builder2.add(internalNode);
                        break;
                    case DRAINING:
                        builder3.add(internalNode);
                        break;
                    case DRAINED:
                        builder4.add(internalNode);
                        break;
                    case SHUTTING_DOWN:
                        builder5.add(internalNode);
                        break;
                    default:
                        log.error("Unknown state %s for node %s", new Object[]{nodeState, internalNode});
                        break;
                }
            }
        }
        ImmutableSet build = builder.build();
        ImmutableSet build2 = builder3.build();
        ImmutableSet build3 = builder4.build();
        ImmutableSet build4 = builder2.build();
        ImmutableSet build5 = builder6.build();
        ImmutableSet build6 = builder5.build();
        if (this.allNodes != null) {
            UnmodifiableIterator it2 = Sets.difference(this.allNodes.getActiveNodes(), ImmutableSet.builder().addAll(build).addAll(build2).addAll(build3).addAll(build6).build()).iterator();
            while (it2.hasNext()) {
                InternalNode internalNode2 = (InternalNode) it2.next();
                log.info("Previously active node is missing: %s (last seen at %s)", new Object[]{internalNode2.getNodeIdentifier(), internalNode2.getHost()});
            }
        }
        if (!this.allCatalogsOnAllNodes) {
            this.activeNodesByCatalogHandle = Optional.of(builder7.build());
        }
        AllNodes allNodes = new AllNodes(build, build4, build2, build3, build6, build5);
        if (allNodes.equals(this.allNodes)) {
            return;
        }
        this.allNodes = allNodes;
        this.coordinators = build5;
        ImmutableList copyOf = ImmutableList.copyOf(this.listeners);
        this.nodeStateEventExecutor.submit(() -> {
            copyOf.forEach(consumer -> {
                consumer.accept(allNodes);
            });
        });
    }

    private NodeState getNodeState(InternalNode internalNode) {
        if (!this.expectedNodeVersion.equals(internalNode.getNodeVersion())) {
            return NodeState.INACTIVE;
        }
        return (NodeState) Optional.ofNullable(this.nodeStates.get(internalNode.getNodeIdentifier())).flatMap((v0) -> {
            return v0.getNodeState();
        }).orElse(NodeState.ACTIVE);
    }

    @Override // io.trino.metadata.InternalNodeManager
    public synchronized AllNodes getAllNodes() {
        return this.allNodes;
    }

    @Managed
    public int getActiveNodeCount() {
        return getAllNodes().getActiveNodes().size();
    }

    @Managed
    public int getInactiveNodeCount() {
        return getAllNodes().getInactiveNodes().size();
    }

    @Managed
    public int getDrainingNodeCount() {
        return getAllNodes().getDrainingNodes().size();
    }

    @Managed
    public int getDrainedNodeCount() {
        return getAllNodes().getDrainedNodes().size();
    }

    @Managed
    public int getShuttingDownNodeCount() {
        return getAllNodes().getShuttingDownNodes().size();
    }

    @Override // io.trino.metadata.InternalNodeManager
    public Set<InternalNode> getNodes(NodeState nodeState) {
        switch (nodeState) {
            case ACTIVE:
                return getAllNodes().getActiveNodes();
            case INACTIVE:
                return getAllNodes().getInactiveNodes();
            case DRAINING:
                return getAllNodes().getDrainingNodes();
            case DRAINED:
                return getAllNodes().getDrainedNodes();
            case SHUTTING_DOWN:
                return getAllNodes().getShuttingDownNodes();
            default:
                throw new MatchException((String) null, (Throwable) null);
        }
    }

    @Override // io.trino.metadata.InternalNodeManager
    public synchronized Set<InternalNode> getActiveCatalogNodes(CatalogHandle catalogHandle) {
        return (Set) this.activeNodesByCatalogHandle.map(setMultimap -> {
            return setMultimap.get(catalogHandle);
        }).orElseGet(() -> {
            return this.allNodes.getActiveNodes();
        });
    }

    @Override // io.trino.metadata.InternalNodeManager
    public synchronized InternalNodeManager.NodesSnapshot getActiveNodesSnapshot() {
        return new InternalNodeManager.NodesSnapshot(this.allNodes.getActiveNodes(), this.activeNodesByCatalogHandle);
    }

    @Override // io.trino.metadata.InternalNodeManager
    public InternalNode getCurrentNode() {
        return this.currentNode;
    }

    @Override // io.trino.metadata.InternalNodeManager
    public synchronized Set<InternalNode> getCoordinators() {
        return this.coordinators;
    }

    @Override // io.trino.metadata.InternalNodeManager
    public synchronized void addNodeChangeListener(Consumer<AllNodes> consumer) {
        this.listeners.add((Consumer) Objects.requireNonNull(consumer, "listener is null"));
        AllNodes allNodes = this.allNodes;
        this.nodeStateEventExecutor.submit(() -> {
            consumer.accept(allNodes);
        });
    }

    @Override // io.trino.metadata.InternalNodeManager
    public synchronized void removeNodeChangeListener(Consumer<AllNodes> consumer) {
        this.listeners.remove(Objects.requireNonNull(consumer, "listener is null"));
    }

    private static URI getHttpUri(ServiceDescriptor serviceDescriptor, boolean z) {
        String str = (String) serviceDescriptor.getProperties().get(z ? "https" : "http");
        if (str != null) {
            return URI.create(str);
        }
        return null;
    }

    private static NodeVersion getNodeVersion(ServiceDescriptor serviceDescriptor) {
        String str = (String) serviceDescriptor.getProperties().get("node_version");
        if (str == null) {
            return null;
        }
        return new NodeVersion(str);
    }

    private static boolean isCoordinator(ServiceDescriptor serviceDescriptor) {
        return Boolean.parseBoolean((String) serviceDescriptor.getProperties().get("coordinator"));
    }
}
