package org.apache.druid.sql.calcite.schema;

import com.google.common.base.Stopwatch;
import com.google.common.collect.Sets;
import com.google.inject.Inject;
import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.druid.client.InternalQueryConfig;
import org.apache.druid.client.ServerView;
import org.apache.druid.client.TimelineServerView;
import org.apache.druid.client.coordinator.CoordinatorClient;
import org.apache.druid.common.guava.FutureUtils;
import org.apache.druid.guice.ManageLifecycle;
import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.metadata.AbstractSegmentMetadataCache;
import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
import org.apache.druid.segment.metadata.DataSourceInformation;
import org.apache.druid.segment.realtime.appenderator.SegmentSchemas;
import org.apache.druid.server.QueryLifecycleFactory;
import org.apache.druid.server.coordination.DruidServerMetadata;
import org.apache.druid.server.security.Escalator;
import org.apache.druid.sql.calcite.table.DatasourceTable;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;

/**
 * Segment metadata cache that keeps one {@link DatasourceTable.PhysicalDatasourceMetadata}
 * per datasource in the inherited {@code tables} map.
 *
 * <p>On every {@link #refresh(Set, Set)} the cache first asks the Coordinator for the schema of
 * all known datasources. Only for datasources the Coordinator did not return does it fall back to
 * refreshing segments itself (unless segment metadata queries are disabled in the config) and
 * rebuilding the datasource row signature locally.
 */
@ManageLifecycle
public class BrokerSegmentMetadataCache extends AbstractSegmentMetadataCache<DatasourceTable.PhysicalDatasourceMetadata> {
    private static final EmittingLogger log = new EmittingLogger(BrokerSegmentMetadataCache.class);

    private final PhysicalDatasourceMetadataFactory dataSourceMetadataFactory;
    private final CoordinatorClient coordinatorClient;
    private final BrokerSegmentMetadataCacheConfig config;
    private final CentralizedDatasourceSchemaConfig centralizedDatasourceSchemaConfig;

    /**
     * Creates the cache and registers a timeline callback on the given server view.
     *
     * @param queryLifecycleFactory             forwarded to the superclass
     * @param timelineServerView                server view on which the timeline callback is registered
     * @param brokerSegmentMetadataCacheConfig  cache configuration; also forwarded to the superclass
     * @param escalator                         forwarded to the superclass
     * @param internalQueryConfig               forwarded to the superclass
     * @param serviceEmitter                    forwarded to the superclass
     * @param physicalDatasourceMetadataFactory factory used to build table metadata from a row signature
     * @param coordinatorClient                 client used to poll datasources and their schemas
     * @param centralizedDatasourceSchemaConfig consulted by {@link #shouldRefresh()}
     */
    @Inject
    public BrokerSegmentMetadataCache(QueryLifecycleFactory queryLifecycleFactory, TimelineServerView timelineServerView, BrokerSegmentMetadataCacheConfig brokerSegmentMetadataCacheConfig, Escalator escalator, InternalQueryConfig internalQueryConfig, ServiceEmitter serviceEmitter, PhysicalDatasourceMetadataFactory physicalDatasourceMetadataFactory, CoordinatorClient coordinatorClient, CentralizedDatasourceSchemaConfig centralizedDatasourceSchemaConfig) {
        super(queryLifecycleFactory, brokerSegmentMetadataCacheConfig, escalator, internalQueryConfig, serviceEmitter);
        this.dataSourceMetadataFactory = physicalDatasourceMetadataFactory;
        this.coordinatorClient = coordinatorClient;
        this.config = brokerSegmentMetadataCacheConfig;
        this.centralizedDatasourceSchemaConfig = centralizedDatasourceSchemaConfig;
        initServerViewTimelineCallback(timelineServerView);
    }

    /**
     * Registers a callback, executed on {@code callbackExec}, that forwards segment add/remove
     * events from the server view to this cache and records when the view is initialized.
     */
    private void initServerViewTimelineCallback(TimelineServerView timelineServerView) {
        timelineServerView.registerTimelineCallback(this.callbackExec, new TimelineServerView.TimelineCallback() {
            @Override
            public ServerView.CallbackAction timelineInitialized() {
                // Flip the flag under the lock and wake up any thread waiting on it.
                synchronized (BrokerSegmentMetadataCache.this.lock) {
                    BrokerSegmentMetadataCache.this.isServerViewInitialized = true;
                    BrokerSegmentMetadataCache.this.lock.notifyAll();
                }
                return ServerView.CallbackAction.CONTINUE;
            }

            @Override
            public ServerView.CallbackAction segmentAdded(DruidServerMetadata druidServerMetadata, DataSegment dataSegment) {
                BrokerSegmentMetadataCache.this.addSegment(druidServerMetadata, dataSegment);
                return ServerView.CallbackAction.CONTINUE;
            }

            @Override
            public ServerView.CallbackAction segmentRemoved(DataSegment dataSegment) {
                BrokerSegmentMetadataCache.this.removeSegment(dataSegment);
                return ServerView.CallbackAction.CONTINUE;
            }

            @Override
            public ServerView.CallbackAction serverSegmentRemoved(DruidServerMetadata druidServerMetadata, DataSegment dataSegment) {
                BrokerSegmentMetadataCache.this.removeServerSegment(druidServerMetadata, dataSegment);
                return ServerView.CallbackAction.CONTINUE;
            }

            @Override
            public ServerView.CallbackAction segmentSchemasAnnounced(SegmentSchemas segmentSchemas) {
                // Announced segment schemas are ignored by this cache.
                return ServerView.CallbackAction.CONTINUE;
            }
        });
    }

    /**
     * Submits the cache loop to {@code cacheExec} and, when the config asks for it, blocks until
     * {@code awaitInitialization()} returns.
     *
     * @throws InterruptedException propagated from {@code awaitInitialization()}
     */
    @LifecycleStart
    public void start() throws InterruptedException {
        log.info("Initializing cache.");
        this.cacheExec.submit(this::cacheExecLoop);
        if (this.config.isAwaitInitializationOnStart()) {
            awaitInitialization();
        }
    }

    /**
     * Shuts down both executors immediately.
     */
    @LifecycleStop
    public void stop() {
        this.cacheExec.shutdownNow();
        this.callbackExec.shutdownNow();
    }

    /**
     * Always refreshes when centralized datasource schema is enabled; otherwise defers to the
     * superclass decision.
     */
    @Override
    protected boolean shouldRefresh() {
        return this.centralizedDatasourceSchemaConfig.isEnabled() || super.shouldRefresh();
    }

    /**
     * Refreshes datasource metadata, preferring schemas served by the Coordinator.
     *
     * @param set  segments to refresh; modified in place: segments of datasources whose schema was
     *             returned by the Coordinator are removed before the remaining ones are refreshed
     * @param set2 datasources to rebuild; modified in place: datasources pending rebuild and those of
     *             refreshed segments are added, datasources returned by the Coordinator are removed
     * @throws IOException declared by the overridden signature; propagated from {@code refreshSegments}
     */
    @Override
    public void refresh(Set<SegmentId> set, Set<String> set2) throws IOException {
        // Ask the Coordinator about every datasource we track plus every datasource it reports.
        final Set<String> dataSourcesToQuery = new HashSet<>(this.segmentMetadataInfo.keySet());
        dataSourcesToQuery.addAll(queryDataSources());

        log.debug("Querying schema for [%s] datasources from Coordinator.", dataSourcesToQuery);
        final Map<String, DatasourceTable.PhysicalDatasourceMetadata> polledMetadata =
            queryDataSourceInformation(dataSourcesToQuery);
        log.debug("Fetched schema for [%s] datasources from Coordinator.", polledMetadata.keySet());

        polledMetadata.forEach(this::updateDSMetadata);

        // Segments of datasources the Coordinator answered for need no local refresh.
        set.removeIf(segmentId -> polledMetadata.containsKey(segmentId.getDataSource()));

        Set<SegmentId> refreshed = new HashSet<>();
        if (!this.config.isDisableSegmentMetadataQueries()) {
            refreshed = refreshSegments(set);
        }

        synchronized (this.lock) {
            // Segments that were requested but not refreshed are queued again.
            this.segmentsNeedingRefresh.addAll(Sets.difference(set, refreshed));

            set2.addAll(this.dataSourcesNeedingRebuild);
            for (SegmentId refreshedSegment : refreshed) {
                set2.add(refreshedSegment.getDataSource());
            }

            // Tables for these datasources were already updated from the Coordinator response.
            set2.removeAll(polledMetadata.keySet());
            this.dataSourcesNeedingRebuild.clear();
        }

        for (String dataSource : set2) {
            final RowSignature rowSignature = buildDataSourceRowSignature(dataSource);
            if (rowSignature == null) {
                log.info("datasource [%s] no longer exists, all metadata removed.", dataSource);
                this.tables.remove(dataSource);
            } else if (rowSignature.getColumnNames().isEmpty()) {
                log.info("datasource [%s] schema has not been initialized yet, check coordinator logs if this message is persistent.", dataSource);
                this.tables.remove(dataSource);
            } else {
                updateDSMetadata(dataSource, this.dataSourceMetadataFactory.build(dataSource, rowSignature));
            }
        }
    }

    /**
     * No additional action is taken when a segment is removed.
     */
    @Override
    protected void removeSegmentAction(SegmentId segmentId) {
    }

    /**
     * Fetches the datasources that have used segments from the Coordinator.
     *
     * @return a new mutable set of datasource names; empty when the call fails or returns null
     */
    private Set<String> queryDataSources() {
        final Set<String> dataSources = new HashSet<>();
        try {
            final Set<String> polled =
                FutureUtils.getUnchecked(this.coordinatorClient.fetchDataSourcesWithUsedSegments(), true);
            if (polled != null) {
                dataSources.addAll(polled);
            }
        } catch (Exception e) {
            // Best effort: a failed poll just means no extra datasources are added this round.
            log.debug(e, "Failed to query datasources from the Coordinator.");
        }
        return dataSources;
    }

    /**
     * Fetches schema information for the given datasources from the Coordinator and converts each
     * entry into table metadata. Emits poll count, failure and elapsed-time metrics.
     *
     * @param dataSourcesToQuery datasource names to request
     * @return a new map of datasource name to metadata; empty when the call fails or returns null
     */
    private Map<String, DatasourceTable.PhysicalDatasourceMetadata> queryDataSourceInformation(Set<String> dataSourcesToQuery) {
        final Stopwatch stopwatch = Stopwatch.createStarted();
        List<DataSourceInformation> fetched = null;

        this.emitter.emit(ServiceMetricEvent.builder().setMetric("metadatacache/schemaPoll/count", 1));
        try {
            fetched = FutureUtils.getUnchecked(this.coordinatorClient.fetchDataSourceInformation(dataSourcesToQuery), true);
        } catch (Exception e) {
            log.debug(e, "Failed to query datasource information from the Coordinator.");
            this.emitter.emit(ServiceMetricEvent.builder().setMetric("metadatacache/schemaPoll/failed", 1));
        }
        // The poll time is emitted for failed polls as well.
        this.emitter.emit(ServiceMetricEvent.builder().setMetric("metadatacache/schemaPoll/time", stopwatch.elapsed(TimeUnit.MILLISECONDS)));

        final Map<String, DatasourceTable.PhysicalDatasourceMetadata> metadataByDataSource = new HashMap<>();
        if (fetched != null) {
            for (DataSourceInformation info : fetched) {
                metadataByDataSource.put(
                    info.getDataSource(),
                    this.dataSourceMetadataFactory.build(info.getDataSource(), info.getRowSignature())
                );
            }
        }
        return metadataByDataSource;
    }

    /**
     * Stores the metadata for a datasource in {@code tables} and logs whether its row signature
     * differs from the previously stored one.
     */
    private void updateDSMetadata(String dataSource, DatasourceTable.PhysicalDatasourceMetadata metadata) {
        final DatasourceTable.PhysicalDatasourceMetadata previous = this.tables.put(dataSource, metadata);
        if (previous == null || !previous.getRowSignature().equals(metadata.getRowSignature())) {
            log.info("[%s] has new signature: %s.", dataSource, metadata.getRowSignature());
        } else {
            log.debug("[%s] signature is unchanged.", dataSource);
        }
    }
}
