package org.apache.druid.segment.metadata;

import com.google.common.collect.Sets;
import com.google.inject.Inject;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.function.Function;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;
import org.apache.druid.client.CachingClusteredClient;
import org.apache.druid.client.CoordinatorServerView;
import org.apache.druid.client.SegmentLoadInfo;
import org.apache.druid.guice.http.DruidHttpClientConfig;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.query.FluentQueryRunner;
import org.apache.druid.query.Queries;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.query.QuerySegmentWalker;
import org.apache.druid.query.QueryToolChest;
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.server.SetAndVerifyContextQueryRunner;
import org.apache.druid.server.coordination.DruidServerMetadata;
import org.apache.druid.server.initialization.ServerConfig;
import org.apache.druid.timeline.TimelineLookup;
import org.apache.druid.timeline.TimelineObjectHolder;
import org.apache.druid.timeline.VersionedIntervalTimeline;
import org.apache.druid.timeline.partition.PartitionChunk;
import org.joda.time.Interval;

/* loaded from: input_file:org/apache/druid/segment/metadata/SegmentMetadataQuerySegmentWalker.class */
public class SegmentMetadataQuerySegmentWalker implements QuerySegmentWalker {
    private static final EmittingLogger log = new EmittingLogger(SegmentMetadataQuerySegmentWalker.class);
    private final CoordinatorServerView serverView;
    private final DruidHttpClientConfig httpClientConfig;
    private final ServiceEmitter emitter;
    protected final QueryRunnerFactoryConglomerate conglomerate;
    protected final ServerConfig serverConfig;

    @Inject
    public SegmentMetadataQuerySegmentWalker(CoordinatorServerView coordinatorServerView, DruidHttpClientConfig druidHttpClientConfig, QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate, ServerConfig serverConfig, ServiceEmitter serviceEmitter) {
        this.serverView = coordinatorServerView;
        this.httpClientConfig = druidHttpClientConfig;
        this.conglomerate = queryRunnerFactoryConglomerate;
        this.emitter = serviceEmitter;
        this.serverConfig = serverConfig;
    }

    public <T> QueryRunner<T> getQueryRunnerForIntervals(Query<T> query, Iterable<Interval> iterable) {
        throw new UnsupportedOperationException();
    }

    public <T> QueryRunner<T> getQueryRunnerForSegments(Query<T> query, final Iterable<SegmentDescriptor> iterable) {
        return decorateRunner(query, new QueryRunner<T>() { // from class: org.apache.druid.segment.metadata.SegmentMetadataQuerySegmentWalker.1
            public Sequence<T> run(QueryPlus<T> queryPlus, ResponseContext responseContext) {
                return SegmentMetadataQuerySegmentWalker.this.run(queryPlus, responseContext, new CachingClusteredClient.TimelineConverter(iterable));
            }
        });
    }

    private <T> QueryRunner<T> decorateRunner(Query<T> query, QueryRunner<T> queryRunner) {
        return FluentQueryRunner.create(new SetAndVerifyContextQueryRunner(this.serverConfig, queryRunner), this.conglomerate.getToolChest(query)).emitCPUTimeMetric(this.emitter);
    }

    private <T> Sequence<T> run(QueryPlus<T> queryPlus, ResponseContext responseContext, UnaryOperator<TimelineLookup<String, SegmentLoadInfo>> unaryOperator) {
        Query<T> query = queryPlus.getQuery();
        VersionedIntervalTimeline<String, SegmentLoadInfo> timeline = this.serverView.getTimeline(query.getDataSource());
        if (timeline == null) {
            return Sequences.empty();
        }
        TimelineLookup<String, SegmentLoadInfo> timelineLookup = (TimelineLookup) unaryOperator.apply(timeline);
        QueryToolChest<T, Query<T>> toolChest = this.conglomerate.getToolChest(query);
        Set<Pair<SegmentDescriptor, SegmentLoadInfo>> computeSegmentsToQuery = computeSegmentsToQuery(timelineLookup, query, toolChest);
        QueryPlus withQueryMetrics = queryPlus.withQueryMetrics(toolChest);
        withQueryMetrics.getQueryMetrics().reportQueriedSegmentCount(computeSegmentsToQuery.size()).emit(this.emitter);
        SortedMap<String, List<SegmentDescriptor>> groupSegmentsByServer = groupSegmentsByServer(computeSegmentsToQuery, query);
        ArrayList arrayList = new ArrayList(groupSegmentsByServer.size());
        groupSegmentsByServer.forEach((str, list) -> {
            QueryRunner queryRunner = this.serverView.getQueryRunner(str);
            if (queryRunner == null) {
                log.error("Server [%s] doesn't have a query runner", new Object[]{str});
            } else {
                arrayList.add(getServerResults(queryRunner, withQueryMetrics, responseContext, this.httpClientConfig.getMaxQueuedBytes() / groupSegmentsByServer.size(), list));
            }
        });
        return merge(withQueryMetrics.getQuery(), arrayList);
    }

    <T> Sequence<T> getServerResults(QueryRunner queryRunner, QueryPlus<T> queryPlus, ResponseContext responseContext, long j, List<SegmentDescriptor> list) {
        return queryRunner.run(queryPlus.withQuery(Queries.withSpecificSegments(queryPlus.getQuery(), list)).withMaxQueuedBytes(j), responseContext);
    }

    private <T> Set<Pair<SegmentDescriptor, SegmentLoadInfo>> computeSegmentsToQuery(TimelineLookup<String, SegmentLoadInfo> timelineLookup, Query<T> query, QueryToolChest<T, Query<T>> queryToolChest) {
        Objects.requireNonNull(timelineLookup);
        Function function = timelineLookup::lookupWithIncompletePartitions;
        List<TimelineObjectHolder> filterSegments = queryToolChest.filterSegments(query, (List) query.getIntervals().stream().flatMap(interval -> {
            return ((List) function.apply(interval)).stream();
        }).collect(Collectors.toList()));
        HashSet hashSet = new HashSet();
        for (TimelineObjectHolder timelineObjectHolder : filterSegments) {
            for (PartitionChunk partitionChunk : Sets.newHashSet(timelineObjectHolder.getObject())) {
                hashSet.add(new Pair(new SegmentDescriptor(timelineObjectHolder.getInterval(), (String) timelineObjectHolder.getVersion(), partitionChunk.getChunkNumber()), (SegmentLoadInfo) partitionChunk.getObject()));
            }
        }
        return hashSet;
    }

    private <T> SortedMap<String, List<SegmentDescriptor>> groupSegmentsByServer(Set<Pair<SegmentDescriptor, SegmentLoadInfo>> set, Query<T> query) {
        TreeMap treeMap = new TreeMap();
        for (Pair<SegmentDescriptor, SegmentLoadInfo> pair : set) {
            DruidServerMetadata pickOne = ((SegmentLoadInfo) pair.rhs).pickOne();
            if (pickOne == null) {
                log.makeAlert("No servers found for SegmentDescriptor[%s] for DataSource[%s]?! How can this be?!", new Object[]{pair.lhs, query.getDataSource()}).emit();
            } else {
                ((List) treeMap.computeIfAbsent(pickOne.getName(), str -> {
                    return new ArrayList();
                })).add((SegmentDescriptor) pair.lhs);
            }
        }
        return treeMap;
    }

    private <T> Sequence<T> merge(Query<T> query, List<Sequence<T>> list) {
        return Sequences.simple(list).flatMerge(sequence -> {
            return sequence;
        }, query.getResultOrdering());
    }
}
