package org.apache.druid.segment.metadata;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Supplier;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import com.google.inject.Inject;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nullable;
import org.apache.druid.catalog.model.table.DatasourceDefn;
import org.apache.druid.client.CoordinatorServerView;
import org.apache.druid.client.ImmutableDruidDataSource;
import org.apache.druid.client.InternalQueryConfig;
import org.apache.druid.client.ServerView;
import org.apache.druid.client.TimelineServerView;
import org.apache.druid.guice.ManageLifecycle;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Stopwatch;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.metadata.SegmentsMetadataManagerConfig;
import org.apache.druid.metadata.SqlSegmentsMetadataManager;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.metadata.metadata.SegmentAnalysis;
import org.apache.druid.segment.SchemaPayloadPlus;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.metadata.AbstractSegmentMetadataCache;
import org.apache.druid.segment.realtime.appenderator.SegmentSchemas;
import org.apache.druid.server.QueryLifecycleFactory;
import org.apache.druid.server.coordination.DruidServerMetadata;
import org.apache.druid.server.coordinator.loading.SegmentReplicaCount;
import org.apache.druid.server.coordinator.loading.SegmentReplicationStatus;
import org.apache.druid.server.security.Escalator;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;

@ManageLifecycle
/* loaded from: input_file:org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCache.class */
public class CoordinatorSegmentMetadataCache extends AbstractSegmentMetadataCache<DataSourceInformation> {
    private static final EmittingLogger log = new EmittingLogger(CoordinatorSegmentMetadataCache.class);
    private static final Long COLD_SCHEMA_PERIOD_MULTIPLIER = 3L;
    private static final Long COLD_SCHEMA_SLOWNESS_THRESHOLD_MILLIS = Long.valueOf(TimeUnit.SECONDS.toMillis(50));
    private static final String DEEP_STORAGE_ONLY_METRIC_PREFIX = "metadatacache/deepStorageOnly/";
    private final SegmentMetadataCacheConfig config;
    private final AbstractSegmentMetadataCache.ColumnTypeMergePolicy columnTypeMergePolicy;
    private final SegmentSchemaCache segmentSchemaCache;
    private final SegmentSchemaBackFillQueue segmentSchemaBackfillQueue;
    private final SqlSegmentsMetadataManager sqlSegmentsMetadataManager;
    private final Supplier<SegmentsMetadataManagerConfig> segmentsMetadataManagerConfigSupplier;
    private final ServiceEmitter emitter;
    private volatile SegmentReplicationStatus segmentReplicationStatus;
    private final ConcurrentHashMap<String, DataSourceInformation> coldSchemaTable;
    private final ScheduledExecutorService coldSchemaExec;

    @Nullable
    private Future<?> cacheExecFuture;

    @Nullable
    private Future<?> coldSchemaExecFuture;

    @Inject
    public CoordinatorSegmentMetadataCache(QueryLifecycleFactory queryLifecycleFactory, CoordinatorServerView coordinatorServerView, SegmentMetadataCacheConfig segmentMetadataCacheConfig, Escalator escalator, InternalQueryConfig internalQueryConfig, ServiceEmitter serviceEmitter, SegmentSchemaCache segmentSchemaCache, SegmentSchemaBackFillQueue segmentSchemaBackFillQueue, SqlSegmentsMetadataManager sqlSegmentsMetadataManager, Supplier<SegmentsMetadataManagerConfig> supplier) {
        super(queryLifecycleFactory, segmentMetadataCacheConfig, escalator, internalQueryConfig, serviceEmitter);
        this.segmentReplicationStatus = null;
        this.coldSchemaTable = new ConcurrentHashMap<>();
        this.cacheExecFuture = null;
        this.coldSchemaExecFuture = null;
        this.config = segmentMetadataCacheConfig;
        this.columnTypeMergePolicy = segmentMetadataCacheConfig.getMetadataColumnTypeMergePolicy();
        this.segmentSchemaCache = segmentSchemaCache;
        this.segmentSchemaBackfillQueue = segmentSchemaBackFillQueue;
        this.sqlSegmentsMetadataManager = sqlSegmentsMetadataManager;
        this.segmentsMetadataManagerConfigSupplier = supplier;
        this.emitter = serviceEmitter;
        this.coldSchemaExec = Execs.scheduledSingleThreaded("DruidColdSchema-ScheduledExecutor-%d");
        initServerViewTimelineCallback(coordinatorServerView);
    }

    long getColdSchemaExecPeriodMillis() {
        return ((SegmentsMetadataManagerConfig) this.segmentsMetadataManagerConfigSupplier.get()).getPollDuration().toStandardDuration().getMillis() * COLD_SCHEMA_PERIOD_MULTIPLIER.longValue();
    }

    private void initServerViewTimelineCallback(CoordinatorServerView coordinatorServerView) {
        coordinatorServerView.registerTimelineCallback(this.callbackExec, new TimelineServerView.TimelineCallback() { // from class: org.apache.druid.segment.metadata.CoordinatorSegmentMetadataCache.1
            @Override // org.apache.druid.client.TimelineServerView.TimelineCallback
            public ServerView.CallbackAction timelineInitialized() {
                synchronized (CoordinatorSegmentMetadataCache.this.lock) {
                    CoordinatorSegmentMetadataCache.this.isServerViewInitialized = true;
                    CoordinatorSegmentMetadataCache.this.lock.notifyAll();
                }
                return ServerView.CallbackAction.CONTINUE;
            }

            @Override // org.apache.druid.client.TimelineServerView.TimelineCallback
            public ServerView.CallbackAction segmentAdded(DruidServerMetadata druidServerMetadata, DataSegment dataSegment) {
                CoordinatorSegmentMetadataCache.this.addSegment(druidServerMetadata, dataSegment);
                return ServerView.CallbackAction.CONTINUE;
            }

            @Override // org.apache.druid.client.TimelineServerView.TimelineCallback
            public ServerView.CallbackAction segmentRemoved(DataSegment dataSegment) {
                CoordinatorSegmentMetadataCache.this.removeSegment(dataSegment);
                return ServerView.CallbackAction.CONTINUE;
            }

            @Override // org.apache.druid.client.TimelineServerView.TimelineCallback
            public ServerView.CallbackAction serverSegmentRemoved(DruidServerMetadata druidServerMetadata, DataSegment dataSegment) {
                CoordinatorSegmentMetadataCache.this.removeServerSegment(druidServerMetadata, dataSegment);
                return ServerView.CallbackAction.CONTINUE;
            }

            @Override // org.apache.druid.client.TimelineServerView.TimelineCallback
            public ServerView.CallbackAction segmentSchemasAnnounced(SegmentSchemas segmentSchemas) {
                CoordinatorSegmentMetadataCache.this.updateSchemaForRealtimeSegments(segmentSchemas);
                return ServerView.CallbackAction.CONTINUE;
            }
        });
    }

    @Override // org.apache.druid.segment.metadata.AbstractSegmentMetadataCache
    @LifecycleStart
    public void start() {
    }

    @Override // org.apache.druid.segment.metadata.AbstractSegmentMetadataCache
    @LifecycleStop
    public void stop() {
        this.callbackExec.shutdownNow();
        this.cacheExec.shutdownNow();
        this.coldSchemaExec.shutdownNow();
        this.segmentSchemaCache.onLeaderStop();
        this.segmentSchemaBackfillQueue.onLeaderStop();
        if (this.cacheExecFuture != null) {
            this.cacheExecFuture.cancel(true);
        }
        if (this.coldSchemaExecFuture != null) {
            this.coldSchemaExecFuture.cancel(true);
        }
    }

    public void onLeaderStart() {
        log.info("Initializing cache on leader node.", new Object[0]);
        try {
            this.segmentSchemaBackfillQueue.onLeaderStart();
            this.cacheExecFuture = this.cacheExec.submit(this::cacheExecLoop);
            this.coldSchemaExecFuture = this.coldSchemaExec.scheduleWithFixedDelay(this::coldDatasourceSchemaExec, getColdSchemaExecPeriodMillis(), getColdSchemaExecPeriodMillis(), TimeUnit.MILLISECONDS);
            if (this.config.isAwaitInitializationOnStart()) {
                awaitInitialization();
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public void onLeaderStop() {
        log.info("No longer leader, stopping cache.", new Object[0]);
        if (this.cacheExecFuture != null) {
            this.cacheExecFuture.cancel(true);
        }
        if (this.coldSchemaExecFuture != null) {
            this.coldSchemaExecFuture.cancel(true);
        }
        this.segmentSchemaCache.onLeaderStop();
        this.segmentSchemaBackfillQueue.onLeaderStop();
    }

    @Override // org.apache.druid.segment.metadata.AbstractSegmentMetadataCache
    public synchronized void refreshWaitCondition() throws InterruptedException {
        this.segmentSchemaCache.awaitInitialization();
    }

    public void updateSegmentReplicationStatus(SegmentReplicationStatus segmentReplicationStatus) {
        this.segmentReplicationStatus = segmentReplicationStatus;
    }

    @Override // org.apache.druid.segment.metadata.AbstractSegmentMetadataCache
    protected void unmarkSegmentAsMutable(SegmentId segmentId) {
        synchronized (this.lock) {
            log.debug("SegmentId [%s] is marked as finalized.", new Object[]{segmentId});
            this.mutableSegments.remove(segmentId);
            this.segmentSchemaCache.realtimeSegmentRemoved(segmentId);
        }
    }

    @Override // org.apache.druid.segment.metadata.AbstractSegmentMetadataCache
    protected void removeSegmentAction(SegmentId segmentId) {
        log.debug("SegmentId [%s] is removed.", new Object[]{segmentId});
        this.segmentSchemaCache.segmentRemoved(segmentId);
    }

    @Override // org.apache.druid.segment.metadata.AbstractSegmentMetadataCache
    protected boolean fetchAggregatorsInSegmentMetadataQuery() {
        return true;
    }

    @Override // org.apache.druid.segment.metadata.AbstractSegmentMetadataCache
    protected boolean segmentMetadataQueryResultHandler(String str, SegmentId segmentId, RowSignature rowSignature, SegmentAnalysis segmentAnalysis) {
        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        this.segmentMetadataInfo.compute(str, (str2, concurrentSkipListMap) -> {
            if (concurrentSkipListMap == null) {
                log.warn("No segment map found with datasource [%s], skipping refresh of segment [%s]", new Object[]{str2, segmentId});
                return null;
            }
            concurrentSkipListMap.compute(segmentId, (segmentId2, availableSegmentMetadata) -> {
                if (availableSegmentMetadata == null) {
                    log.warn("No segment [%s] found, skipping refresh", new Object[]{segmentId});
                    return null;
                }
                long numRows = segmentAnalysis.getNumRows();
                log.debug("Publishing segment schema. SegmentId [%s], RowSignature [%s], numRows [%d]", new Object[]{segmentId, rowSignature, Long.valueOf(numRows)});
                Map<String, AggregatorFactory> aggregators = segmentAnalysis.getAggregators();
                this.segmentSchemaCache.addTemporaryMetadataQueryResult(segmentId, rowSignature, aggregators, numRows);
                this.segmentSchemaBackfillQueue.add(segmentId, rowSignature, aggregators, numRows);
                atomicBoolean.set(true);
                return availableSegmentMetadata;
            });
            if (concurrentSkipListMap.isEmpty()) {
                return null;
            }
            return concurrentSkipListMap;
        });
        return atomicBoolean.get();
    }

    @Override // org.apache.druid.segment.metadata.AbstractSegmentMetadataCache
    public Iterator<AvailableSegmentMetadata> iterateSegmentMetadata() {
        return FluentIterable.from(this.segmentMetadataInfo.values()).transformAndConcat((v0) -> {
            return v0.values();
        }).transform(availableSegmentMetadata -> {
            Optional<SchemaPayloadPlus> schemaForSegment = this.segmentSchemaCache.getSchemaForSegment(availableSegmentMetadata.getSegment().getId());
            if (schemaForSegment.isPresent()) {
                return AvailableSegmentMetadata.from(availableSegmentMetadata).withRowSignature(schemaForSegment.get().getSchemaPayload().getRowSignature()).withNumRows(schemaForSegment.get().getNumRows().longValue()).build();
            }
            markSegmentForRefreshIfNeeded(availableSegmentMetadata.getSegment());
            return availableSegmentMetadata;
        }).iterator();
    }

    @Override // org.apache.druid.segment.metadata.AbstractSegmentMetadataCache
    @Nullable
    public AvailableSegmentMetadata getAvailableSegmentMetadata(String str, SegmentId segmentId) {
        ConcurrentSkipListMap<SegmentId, AvailableSegmentMetadata> concurrentSkipListMap = this.segmentMetadataInfo.get(str);
        AvailableSegmentMetadata availableSegmentMetadata = null;
        if (concurrentSkipListMap != null) {
            availableSegmentMetadata = concurrentSkipListMap.get(segmentId);
        }
        if (availableSegmentMetadata == null) {
            return null;
        }
        Optional<SchemaPayloadPlus> schemaForSegment = this.segmentSchemaCache.getSchemaForSegment(segmentId);
        if (schemaForSegment.isPresent()) {
            availableSegmentMetadata = AvailableSegmentMetadata.from(availableSegmentMetadata).withRowSignature(schemaForSegment.get().getSchemaPayload().getRowSignature()).withNumRows(schemaForSegment.get().getNumRows().longValue()).build();
        } else {
            markSegmentForRefreshIfNeeded(availableSegmentMetadata.getSegment());
        }
        return availableSegmentMetadata;
    }

    @Override // org.apache.druid.segment.metadata.AbstractSegmentMetadataCache
    public DataSourceInformation getDatasource(String str) {
        return getMergedDatasourceInformation((DataSourceInformation) this.tables.get(str), this.coldSchemaTable.get(str)).orElse(null);
    }

    @Override // org.apache.druid.segment.metadata.AbstractSegmentMetadataCache
    public Map<String, DataSourceInformation> getDataSourceInformationMap() {
        HashMap hashMap = new HashMap(this.tables);
        HashMap hashMap2 = new HashMap(this.coldSchemaTable);
        HashSet<String> hashSet = new HashSet(hashMap.keySet());
        hashSet.addAll(hashMap2.keySet());
        ImmutableMap.Builder builder = ImmutableMap.builder();
        for (String str : hashSet) {
            getMergedDatasourceInformation((DataSourceInformation) hashMap.get(str), (DataSourceInformation) hashMap2.get(str)).ifPresent(dataSourceInformation -> {
                builder.put(str, dataSourceInformation);
            });
        }
        return builder.build();
    }

    private Optional<DataSourceInformation> getMergedDatasourceInformation(DataSourceInformation dataSourceInformation, DataSourceInformation dataSourceInformation2) {
        if (dataSourceInformation == null && dataSourceInformation2 == null) {
            return Optional.empty();
        }
        if (dataSourceInformation != null && dataSourceInformation2 == null) {
            return Optional.of(dataSourceInformation);
        }
        if (dataSourceInformation == null && dataSourceInformation2 != null) {
            return Optional.of(dataSourceInformation2);
        }
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        ArrayList arrayList = new ArrayList();
        arrayList.add(dataSourceInformation.getRowSignature());
        arrayList.add(dataSourceInformation2.getRowSignature());
        Iterator it = arrayList.iterator();
        while (it.hasNext()) {
            mergeRowSignature(linkedHashMap, (RowSignature) it.next());
        }
        RowSignature.Builder builder = RowSignature.builder();
        Objects.requireNonNull(builder);
        linkedHashMap.forEach(builder::add);
        return Optional.of(new DataSourceInformation(dataSourceInformation.getDataSource(), builder.build()));
    }

    @Override // org.apache.druid.segment.metadata.AbstractSegmentMetadataCache
    public void refresh(Set<SegmentId> set, Set<String> set2) throws IOException {
        log.debug("Segments to refresh [%s], dataSourcesToRebuild [%s]", new Object[]{set, set2});
        filterRealtimeSegments(set);
        log.debug("SegmentsToRefreshMinusRealtimeSegments [%s]", new Object[]{set});
        Set<SegmentId> filterSegmentWithCachedSchema = filterSegmentWithCachedSchema(set);
        log.debug("SegmentsToRefreshMinusCachedSegments [%s], cachedSegments [%s]", new Object[]{set, filterSegmentWithCachedSchema});
        Set<SegmentId> emptySet = Collections.emptySet();
        if (!this.config.isDisableSegmentMetadataQueries()) {
            emptySet = refreshSegments(set);
            log.debug("Refreshed segments are [%s]", new Object[]{emptySet});
        }
        synchronized (this.lock) {
            this.segmentsNeedingRefresh.addAll(Sets.difference(set, emptySet));
            set2.addAll(this.dataSourcesNeedingRebuild);
            emptySet.forEach(segmentId -> {
                set2.add(segmentId.getDataSource());
            });
            filterSegmentWithCachedSchema.forEach(segmentId2 -> {
                set2.add(segmentId2.getDataSource());
            });
            this.dataSourcesNeedingRebuild.clear();
        }
        log.debug("Datasources to rebuild are [%s]", new Object[]{set2});
        for (String str : set2) {
            RowSignature buildDataSourceRowSignature = buildDataSourceRowSignature(str);
            if (buildDataSourceRowSignature == null) {
                log.info("RowSignature null for dataSource [%s], implying that it no longer exists. All metadata removed.", new Object[]{str});
                this.tables.remove(str);
            } else {
                DataSourceInformation dataSourceInformation = new DataSourceInformation(str, buildDataSourceRowSignature);
                DataSourceInformation dataSourceInformation2 = (DataSourceInformation) this.tables.put(str, dataSourceInformation);
                if (dataSourceInformation2 == null || !dataSourceInformation2.getRowSignature().equals(dataSourceInformation.getRowSignature())) {
                    log.info("[%s] has new signature: %s.", new Object[]{str, dataSourceInformation.getRowSignature()});
                } else {
                    log.debug("[%s] signature is unchanged.", new Object[]{str});
                }
            }
        }
    }

    @Override // org.apache.druid.segment.metadata.AbstractSegmentMetadataCache
    void logSegmentsToRefresh(String str, Set<SegmentId> set) {
        log.info("Logging a sample of 5 segments [%s] to be refreshed for datasource [%s]", new Object[]{Iterables.limit(set, 5), str});
    }

    private void filterRealtimeSegments(Set<SegmentId> set) {
        synchronized (this.lock) {
            set.removeAll(this.mutableSegments);
        }
    }

    private Set<SegmentId> filterSegmentWithCachedSchema(Set<SegmentId> set) {
        HashSet hashSet = new HashSet();
        for (SegmentId segmentId : set) {
            if (this.segmentSchemaCache.isSchemaCached(segmentId)) {
                hashSet.add(segmentId);
            }
        }
        set.removeAll(hashSet);
        return hashSet;
    }

    @Nullable
    private Integer getReplicationFactor(SegmentId segmentId) {
        SegmentReplicaCount replicaCountsInCluster;
        if (this.segmentReplicationStatus == null || (replicaCountsInCluster = this.segmentReplicationStatus.getReplicaCountsInCluster(segmentId)) == null) {
            return null;
        }
        return Integer.valueOf(replicaCountsInCluster.required());
    }

    @VisibleForTesting
    protected void coldDatasourceSchemaExec() {
        Stopwatch createStarted = Stopwatch.createStarted();
        HashSet hashSet = new HashSet();
        int i = 0;
        int i2 = 0;
        int i3 = 0;
        for (ImmutableDruidDataSource immutableDruidDataSource : this.sqlSegmentsMetadataManager.getImmutableDataSourcesWithAllUsedSegments()) {
            i++;
            Collection<DataSegment> segments = immutableDruidDataSource.getSegments();
            LinkedHashMap linkedHashMap = new LinkedHashMap();
            int i4 = 0;
            int i5 = 0;
            for (DataSegment dataSegment : segments) {
                Integer replicationFactor = getReplicationFactor(dataSegment.getId());
                if (replicationFactor == null || replicationFactor.intValue() == 0) {
                    Optional<SchemaPayloadPlus> schemaForSegment = this.segmentSchemaCache.getSchemaForSegment(dataSegment.getId());
                    if (schemaForSegment.isPresent()) {
                        mergeRowSignature(linkedHashMap, schemaForSegment.get().getSchemaPayload().getRowSignature());
                        i5++;
                    }
                    i4++;
                }
            }
            if (i4 != 0) {
                i3 += i4;
                String name = immutableDruidDataSource.getName();
                ServiceMetricEvent.Builder dimension = new ServiceMetricEvent.Builder().setDimension("dataSource", name);
                this.emitter.emit(dimension.setMetric("metadatacache/deepStorageOnly/segment/count", Integer.valueOf(i4)));
                if (!linkedHashMap.isEmpty()) {
                    RowSignature.Builder builder = RowSignature.builder();
                    Objects.requireNonNull(builder);
                    linkedHashMap.forEach(builder::add);
                    RowSignature build = builder.build();
                    hashSet.add(name);
                    i2++;
                    DataSourceInformation dataSourceInformation = new DataSourceInformation(name, build);
                    DataSourceInformation put = this.coldSchemaTable.put(name, dataSourceInformation);
                    if (put == null || !put.getRowSignature().equals(dataSourceInformation.getRowSignature())) {
                        log.info("[%s] has new cold signature: %s.", new Object[]{immutableDruidDataSource, dataSourceInformation.getRowSignature()});
                    } else {
                        log.debug("[%s] signature is unchanged.", new Object[]{immutableDruidDataSource});
                    }
                    this.emitter.emit(dimension.setMetric("metadatacache/deepStorageOnly/refresh/count", Integer.valueOf(i5)));
                    log.debug("[%s] signature from cold segments is [%s]", new Object[]{name, build});
                }
            }
        }
        this.coldSchemaTable.keySet().retainAll(hashSet);
        this.emitter.emit(new ServiceMetricEvent.Builder().setMetric("metadatacache/deepStorageOnly/process/time", Long.valueOf(createStarted.millisElapsed())));
        String format = StringUtils.format("Cold schema processing took [%d] millis. Processed total [%d] datasources, [%d] segments. Found [%d] datasources with cold segment schema.", new Object[]{Long.valueOf(createStarted.millisElapsed()), Integer.valueOf(i), Integer.valueOf(i3), Integer.valueOf(i2)});
        if (createStarted.millisElapsed() > COLD_SCHEMA_SLOWNESS_THRESHOLD_MILLIS.longValue()) {
            log.info(format, new Object[0]);
        } else {
            log.debug(format, new Object[0]);
        }
    }

    private void mergeRowSignature(Map<String, ColumnType> map, RowSignature rowSignature) {
        for (String str : rowSignature.getColumnNames()) {
            ColumnType columnType = (ColumnType) rowSignature.getColumnType(str).orElseThrow(() -> {
                return new ISE("Encountered null type for column [%s]", new Object[]{str});
            });
            map.compute(str, (str2, columnType2) -> {
                return this.columnTypeMergePolicy.merge(columnType2, columnType);
            });
        }
    }

    @Override // org.apache.druid.segment.metadata.AbstractSegmentMetadataCache
    @VisibleForTesting
    @Nullable
    public RowSignature buildDataSourceRowSignature(String str) {
        ConcurrentSkipListMap<SegmentId, AvailableSegmentMetadata> concurrentSkipListMap = this.segmentMetadataInfo.get(str);
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        if (concurrentSkipListMap == null || concurrentSkipListMap.isEmpty()) {
            return null;
        }
        for (Map.Entry<SegmentId, AvailableSegmentMetadata> entry : concurrentSkipListMap.entrySet()) {
            Optional<SchemaPayloadPlus> schemaForSegment = this.segmentSchemaCache.getSchemaForSegment(entry.getKey());
            if (schemaForSegment.isPresent()) {
                mergeRowSignature(linkedHashMap, schemaForSegment.get().getSchemaPayload().getRowSignature());
            } else {
                markSegmentForRefreshIfNeeded(entry.getValue().getSegment());
            }
        }
        RowSignature.Builder builder = RowSignature.builder();
        Objects.requireNonNull(builder);
        linkedHashMap.forEach(builder::add);
        return builder.build();
    }

    @VisibleForTesting
    void updateSchemaForRealtimeSegments(SegmentSchemas segmentSchemas) {
        log.debug("SchemaUpdate for realtime segments [%s].", new Object[]{segmentSchemas});
        for (SegmentSchemas.SegmentSchema segmentSchema : segmentSchemas.getSegmentSchemaList()) {
            String dataSource = segmentSchema.getDataSource();
            SegmentId tryParse = SegmentId.tryParse(dataSource, segmentSchema.getSegmentId());
            if (tryParse == null) {
                log.error("Could not apply schema update. Failed parsing segmentId [%s]", new Object[]{segmentSchema.getSegmentId()});
            } else {
                log.debug("Applying schema update for segmentId [%s] datasource [%s]", new Object[]{tryParse, dataSource});
                this.segmentMetadataInfo.compute(dataSource, (str, concurrentSkipListMap) -> {
                    if (concurrentSkipListMap == null) {
                        log.warn("No segment map found with datasource [%s], skipping refresh of segment [%s]", new Object[]{str, tryParse});
                        return null;
                    }
                    concurrentSkipListMap.compute(tryParse, (segmentId, availableSegmentMetadata) -> {
                        if (availableSegmentMetadata == null) {
                            log.makeAlert("Schema update [%s] for unknown segment [%s]", new Object[]{segmentSchema, tryParse}).emit();
                        } else {
                            Optional<RowSignature> mergeOrCreateRowSignature = mergeOrCreateRowSignature(tryParse, (RowSignature) this.segmentSchemaCache.getSchemaForSegment(tryParse).map(schemaPayloadPlus -> {
                                return schemaPayloadPlus.getSchemaPayload().getRowSignature();
                            }).orElse(null), segmentSchema);
                            if (mergeOrCreateRowSignature.isPresent()) {
                                log.debug("Segment [%s] signature [%s] after applying schema update.", new Object[]{tryParse, mergeOrCreateRowSignature.get()});
                                this.segmentSchemaCache.addRealtimeSegmentSchema(tryParse, mergeOrCreateRowSignature.get(), segmentSchema.getNumRows().intValue());
                                markDataSourceAsNeedRebuild(dataSource);
                            }
                        }
                        return availableSegmentMetadata;
                    });
                    return concurrentSkipListMap;
                });
            }
        }
    }

    @VisibleForTesting
    Optional<RowSignature> mergeOrCreateRowSignature(SegmentId segmentId, @Nullable RowSignature rowSignature, SegmentSchemas.SegmentSchema segmentSchema) {
        if (!segmentSchema.isDelta()) {
            RowSignature.Builder builder = RowSignature.builder();
            Map<String, ColumnType> columnTypeMap = segmentSchema.getColumnTypeMap();
            for (String str : segmentSchema.getNewColumns()) {
                builder.add(str, columnTypeMap.get(str));
            }
            return Optional.of((RowSignature) ROW_SIGNATURE_INTERNER.intern(builder.build()));
        }
        if (rowSignature == null) {
            log.makeAlert("Received delta schema update [%s] for a segment [%s] with no previous schema. ", new Object[]{segmentSchema, segmentId}).emit();
            return Optional.empty();
        }
        RowSignature.Builder builder2 = RowSignature.builder();
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        for (String str2 : rowSignature.getColumnNames()) {
            linkedHashMap.put(str2, (ColumnType) rowSignature.getColumnType(str2).orElseThrow(() -> {
                return new ISE("Encountered null type for column [%s]", new Object[]{str2});
            }));
        }
        Map<String, ColumnType> columnTypeMap2 = segmentSchema.getColumnTypeMap();
        HashSet hashSet = new HashSet();
        for (String str3 : segmentSchema.getUpdatedColumns()) {
            if (linkedHashMap.containsKey(str3)) {
                linkedHashMap.compute(str3, (str4, columnType) -> {
                    return this.columnTypeMergePolicy.merge(columnType, (ColumnType) columnTypeMap2.get(str3));
                });
            } else {
                hashSet.add(str3);
                linkedHashMap.put(str3, columnTypeMap2.get(str3));
            }
        }
        for (String str5 : segmentSchema.getNewColumns()) {
            if (linkedHashMap.containsKey(str5)) {
                linkedHashMap.compute(str5, (str6, columnType2) -> {
                    return this.columnTypeMergePolicy.merge(columnType2, (ColumnType) columnTypeMap2.get(str5));
                });
            } else {
                linkedHashMap.put(str5, columnTypeMap2.get(str5));
            }
        }
        if (!hashSet.isEmpty()) {
            log.makeAlert("Datasource schema mismatch detected. The delta realtime segment schema contains columns that are not defined in the datasource schema. This indicates a potential issue with schema updates on the Coordinator. Please review relevant Coordinator metrics and logs for task communication to identify any issues.", new Object[0]).addData(DatasourceDefn.TABLE_TYPE, segmentId.getDataSource()).addData("existingSignature", rowSignature).addData("deltaSchema", segmentSchema).addData("missingUpdateColumns", hashSet).emit();
        }
        Objects.requireNonNull(builder2);
        linkedHashMap.forEach(builder2::add);
        return Optional.of((RowSignature) ROW_SIGNATURE_INTERNER.intern(builder2.build()));
    }

    private void markSegmentForRefreshIfNeeded(DataSegment dataSegment) {
        SegmentId id = dataSegment.getId();
        log.debug("SchemaMetadata for segmentId [%s] is absent.", new Object[]{id});
        if (dataSegment.isTombstone()) {
            log.debug("Skipping refresh for tombstone segment [%s].", new Object[]{id});
            return;
        }
        ImmutableDruidDataSource immutableDataSourceWithUsedSegments = this.sqlSegmentsMetadataManager.getImmutableDataSourceWithUsedSegments(dataSegment.getDataSource());
        if (immutableDataSourceWithUsedSegments == null || immutableDataSourceWithUsedSegments.getSegment(id) == null) {
            log.debug("Skipping refresh for unused segment [%s].", new Object[]{id});
        } else {
            markSegmentAsNeedRefresh(id);
        }
    }
}
