package org.apache.druid.client;

import com.google.common.base.Predicate;
import com.google.common.collect.Ordering;
import com.google.inject.Inject;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.druid.client.QueryableDruidServer;
import org.apache.druid.client.ServerView;
import org.apache.druid.client.TimelineServerView;
import org.apache.druid.client.selector.ServerSelector;
import org.apache.druid.client.selector.TierSelectorStrategy;
import org.apache.druid.guice.ManageLifecycle;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.segment.realtime.appenderator.SegmentSchemas;
import org.apache.druid.server.coordination.DruidServerMetadata;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.VersionedIntervalTimeline;

@ManageLifecycle
/* loaded from: input_file:org/apache/druid/client/BrokerServerView.class */
public class BrokerServerView implements TimelineServerView {
    private static final Logger log = new Logger(BrokerServerView.class);
    private final QueryableDruidServer.Maker druidClientFactory;
    private final TierSelectorStrategy tierSelectorStrategy;
    private final ServiceEmitter emitter;
    private final BrokerSegmentWatcherConfig segmentWatcherConfig;
    private final Predicate<Pair<DruidServerMetadata, DataSegment>> segmentFilter;
    private final FilteredServerInventoryView baseView;
    private final Object lock = new Object();
    private final ConcurrentMap<String, QueryableDruidServer> clients = new ConcurrentHashMap();
    private final Map<SegmentId, ServerSelector> selectors = new HashMap();
    private final Map<String, VersionedIntervalTimeline<String, ServerSelector>> timelines = new HashMap();
    private final ConcurrentMap<TimelineServerView.TimelineCallback, Executor> timelineCallbacks = new ConcurrentHashMap();
    private final CountDownLatch initialized = new CountDownLatch(1);

    @Inject
    public BrokerServerView(QueryableDruidServer.Maker maker, FilteredServerInventoryView filteredServerInventoryView, TierSelectorStrategy tierSelectorStrategy, ServiceEmitter serviceEmitter, BrokerSegmentWatcherConfig brokerSegmentWatcherConfig) {
        this.druidClientFactory = maker;
        this.baseView = filteredServerInventoryView;
        this.tierSelectorStrategy = tierSelectorStrategy;
        this.emitter = serviceEmitter;
        validateSegmentWatcherConfig(brokerSegmentWatcherConfig);
        this.segmentWatcherConfig = brokerSegmentWatcherConfig;
        this.segmentFilter = pair -> {
            if (brokerSegmentWatcherConfig.getWatchedTiers() != null && !brokerSegmentWatcherConfig.getWatchedTiers().contains(((DruidServerMetadata) pair.lhs).getTier())) {
                return false;
            }
            if (brokerSegmentWatcherConfig.getIgnoredTiers() != null && brokerSegmentWatcherConfig.getIgnoredTiers().contains(((DruidServerMetadata) pair.lhs).getTier())) {
                return false;
            }
            if (brokerSegmentWatcherConfig.getWatchedDataSources() == null || brokerSegmentWatcherConfig.getWatchedDataSources().contains(((DataSegment) pair.rhs).getDataSource())) {
                return ((DruidServerMetadata) pair.lhs).getType() != ServerType.INDEXER_EXECUTOR || brokerSegmentWatcherConfig.isWatchRealtimeTasks();
            }
            return false;
        };
        ExecutorService singleThreaded = Execs.singleThreaded("BrokerServerView-%s");
        filteredServerInventoryView.registerSegmentCallback(singleThreaded, new ServerView.SegmentCallback() { // from class: org.apache.druid.client.BrokerServerView.1
            @Override // org.apache.druid.client.ServerView.SegmentCallback
            public ServerView.CallbackAction segmentAdded(DruidServerMetadata druidServerMetadata, DataSegment dataSegment) {
                BrokerServerView.this.serverAddedSegment(druidServerMetadata, dataSegment);
                return ServerView.CallbackAction.CONTINUE;
            }

            @Override // org.apache.druid.client.ServerView.SegmentCallback
            public ServerView.CallbackAction segmentRemoved(DruidServerMetadata druidServerMetadata, DataSegment dataSegment) {
                BrokerServerView.this.serverRemovedSegment(druidServerMetadata, dataSegment);
                return ServerView.CallbackAction.CONTINUE;
            }

            @Override // org.apache.druid.client.ServerView.SegmentCallback
            public ServerView.CallbackAction segmentViewInitialized() {
                BrokerServerView.this.initialized.countDown();
                BrokerServerView.this.runTimelineCallbacks((v0) -> {
                    return v0.timelineInitialized();
                });
                return ServerView.CallbackAction.CONTINUE;
            }

            @Override // org.apache.druid.client.ServerView.SegmentCallback
            public ServerView.CallbackAction segmentSchemasAnnounced(SegmentSchemas segmentSchemas) {
                return ServerView.CallbackAction.CONTINUE;
            }
        }, this.segmentFilter);
        filteredServerInventoryView.registerServerRemovedCallback(singleThreaded, druidServer -> {
            removeServer(druidServer);
            return ServerView.CallbackAction.CONTINUE;
        });
    }

    @LifecycleStart
    public void start() throws InterruptedException {
        if (this.segmentWatcherConfig.isAwaitInitializationOnStart()) {
            long currentTimeMillis = System.currentTimeMillis();
            log.info("BrokerServerView waiting for initialization.", new Object[0]);
            awaitInitialization();
            long currentTimeMillis2 = System.currentTimeMillis();
            log.info("BrokerServerView initialized in [%,d] ms.", new Object[]{Long.valueOf(currentTimeMillis2 - currentTimeMillis)});
            this.emitter.emit(ServiceMetricEvent.builder().setMetric("serverview/init/time", Long.valueOf(currentTimeMillis2 - currentTimeMillis)));
        }
    }

    public boolean isInitialized() {
        return this.initialized.getCount() == 0;
    }

    public void awaitInitialization() throws InterruptedException {
        this.initialized.await();
    }

    public QueryableDruidServer.Maker getDruidClientFactory() {
        return this.druidClientFactory;
    }

    private void validateSegmentWatcherConfig(BrokerSegmentWatcherConfig brokerSegmentWatcherConfig) {
        if (brokerSegmentWatcherConfig.getWatchedTiers() != null && brokerSegmentWatcherConfig.getIgnoredTiers() != null) {
            throw new ISE("At most one of 'druid.broker.segment.watchedTiers' and 'druid.broker.segment.ignoredTiers' can be configured.", new Object[0]);
        }
        if (brokerSegmentWatcherConfig.getWatchedTiers() != null && brokerSegmentWatcherConfig.getWatchedTiers().isEmpty()) {
            throw new ISE("If configured, 'druid.broker.segment.watchedTiers' must be non-empty", new Object[0]);
        }
        if (brokerSegmentWatcherConfig.getIgnoredTiers() != null && brokerSegmentWatcherConfig.getIgnoredTiers().isEmpty()) {
            throw new ISE("If configured, 'druid.broker.segment.ignoredTiers' must be non-empty", new Object[0]);
        }
    }

    private QueryableDruidServer addServer(DruidServer druidServer) {
        QueryableDruidServer make = this.druidClientFactory.make(druidServer);
        if (this.clients.put(druidServer.getName(), make) != null) {
            log.warn("QueryRunner for server[%s] already exists!? Well it's getting replaced", new Object[]{druidServer});
        }
        return make;
    }

    private QueryableDruidServer removeServer(DruidServer druidServer) {
        Iterator<DataSegment> it = druidServer.iterateAllSegments().iterator();
        while (it.hasNext()) {
            serverRemovedSegment(druidServer.getMetadata(), it.next());
        }
        return this.clients.remove(druidServer.getName());
    }

    private void serverAddedSegment(DruidServerMetadata druidServerMetadata, DataSegment dataSegment) {
        SegmentId id = dataSegment.getId();
        synchronized (this.lock) {
            if (!druidServerMetadata.getType().equals(ServerType.BROKER)) {
                log.debug("Adding segment[%s] for server[%s]", new Object[]{dataSegment, druidServerMetadata});
                ServerSelector serverSelector = this.selectors.get(id);
                if (serverSelector == null) {
                    serverSelector = new ServerSelector(dataSegment, this.tierSelectorStrategy);
                    VersionedIntervalTimeline<String, ServerSelector> versionedIntervalTimeline = this.timelines.get(dataSegment.getDataSource());
                    if (versionedIntervalTimeline == null) {
                        versionedIntervalTimeline = new VersionedIntervalTimeline<>(Ordering.natural(), true);
                        this.timelines.put(dataSegment.getDataSource(), versionedIntervalTimeline);
                    }
                    versionedIntervalTimeline.add(dataSegment.getInterval(), dataSegment.getVersion(), dataSegment.getShardSpec().createChunk(serverSelector));
                    this.selectors.put(id, serverSelector);
                }
                QueryableDruidServer queryableDruidServer = this.clients.get(druidServerMetadata.getName());
                if (queryableDruidServer == null) {
                    DruidServer inventoryValue = this.baseView.getInventoryValue(druidServerMetadata.getName());
                    if (inventoryValue == null) {
                        log.warn("Could not find server[%s] in inventory. Skipping addition of segment[%s].", new Object[]{druidServerMetadata.getName(), id});
                        return;
                    }
                    queryableDruidServer = addServer(inventoryValue);
                }
                serverSelector.addServerAndUpdateSegment(queryableDruidServer, dataSegment);
            }
            runTimelineCallbacks(timelineCallback -> {
                return timelineCallback.segmentAdded(druidServerMetadata, dataSegment);
            });
        }
    }

    private void serverRemovedSegment(DruidServerMetadata druidServerMetadata, DataSegment dataSegment) {
        SegmentId id = dataSegment.getId();
        synchronized (this.lock) {
            log.debug("Removing segment[%s] from server[%s].", new Object[]{id, druidServerMetadata});
            if (druidServerMetadata.getType().equals(ServerType.BROKER)) {
                runTimelineCallbacks(timelineCallback -> {
                    return timelineCallback.serverSegmentRemoved(druidServerMetadata, dataSegment);
                });
                return;
            }
            ServerSelector serverSelector = this.selectors.get(id);
            if (serverSelector == null) {
                log.warn("Told to remove non-existant segment[%s]", new Object[]{id});
                return;
            }
            QueryableDruidServer queryableDruidServer = this.clients.get(druidServerMetadata.getName());
            if (queryableDruidServer == null) {
                log.warn("Could not find server[%s] in inventory. Skipping removal of segment[%s].", new Object[]{druidServerMetadata.getName(), id});
            } else if (serverSelector.removeServer(queryableDruidServer)) {
                runTimelineCallbacks(timelineCallback2 -> {
                    return timelineCallback2.serverSegmentRemoved(druidServerMetadata, dataSegment);
                });
            } else {
                log.warn("Asked to disassociate non-existant association between server[%s] and segment[%s]", new Object[]{druidServerMetadata, id});
            }
            if (serverSelector.isEmpty()) {
                VersionedIntervalTimeline<String, ServerSelector> versionedIntervalTimeline = this.timelines.get(dataSegment.getDataSource());
                this.selectors.remove(id);
                if (versionedIntervalTimeline.remove(dataSegment.getInterval(), dataSegment.getVersion(), dataSegment.getShardSpec().createChunk(serverSelector)) == null) {
                    log.warn("Asked to remove timeline entry[interval: %s, version: %s] that doesn't exist", new Object[]{dataSegment.getInterval(), dataSegment.getVersion()});
                } else {
                    runTimelineCallbacks(timelineCallback3 -> {
                        return timelineCallback3.segmentRemoved(dataSegment);
                    });
                }
            }
        }
    }

    @Override // org.apache.druid.client.TimelineServerView
    public Optional<VersionedIntervalTimeline<String, ServerSelector>> getTimeline(TableDataSource tableDataSource) {
        Optional<VersionedIntervalTimeline<String, ServerSelector>> ofNullable;
        synchronized (this.lock) {
            ofNullable = Optional.ofNullable(this.timelines.get(tableDataSource.getName()));
        }
        return ofNullable;
    }

    @Override // org.apache.druid.client.TimelineServerView
    public void registerTimelineCallback(Executor executor, TimelineServerView.TimelineCallback timelineCallback) {
        this.timelineCallbacks.put(timelineCallback, executor);
    }

    @Override // org.apache.druid.client.TimelineServerView
    public <T> QueryRunner<T> getQueryRunner(DruidServer druidServer) {
        synchronized (this.lock) {
            QueryableDruidServer queryableDruidServer = this.clients.get(druidServer.getName());
            if (queryableDruidServer == null) {
                log.error("No QueryRunner found for server name[%s].", new Object[]{druidServer.getName()});
                return null;
            }
            return (QueryRunner<T>) queryableDruidServer.getQueryRunner();
        }
    }

    @Override // org.apache.druid.client.ServerView, org.apache.druid.client.FilteredServerInventoryView
    public void registerServerRemovedCallback(Executor executor, ServerView.ServerRemovedCallback serverRemovedCallback) {
        this.baseView.registerServerRemovedCallback(executor, serverRemovedCallback);
    }

    @Override // org.apache.druid.client.ServerView
    public void registerSegmentCallback(Executor executor, ServerView.SegmentCallback segmentCallback) {
        this.baseView.registerSegmentCallback(executor, segmentCallback, this.segmentFilter);
    }

    private void runTimelineCallbacks(Function<TimelineServerView.TimelineCallback, ServerView.CallbackAction> function) {
        for (Map.Entry<TimelineServerView.TimelineCallback, Executor> entry : this.timelineCallbacks.entrySet()) {
            entry.getValue().execute(() -> {
                if (ServerView.CallbackAction.UNREGISTER == function.apply((TimelineServerView.TimelineCallback) entry.getKey())) {
                    this.timelineCallbacks.remove(entry.getKey());
                }
            });
        }
    }

    @Override // org.apache.druid.client.TimelineServerView
    public List<DruidServerMetadata> getDruidServerMetadatas() {
        ArrayList arrayList = new ArrayList(this.clients.size());
        Iterator<QueryableDruidServer> it = this.clients.values().iterator();
        while (it.hasNext()) {
            arrayList.add(it.next().getServer().getMetadata());
        }
        return arrayList;
    }

    @Override // org.apache.druid.client.TimelineServerView
    public List<ImmutableDruidServer> getDruidServers() {
        return (List) this.clients.values().stream().map(queryableDruidServer -> {
            return queryableDruidServer.getServer().toImmutableDruidServer();
        }).collect(Collectors.toList());
    }
}
