package org.apache.druid.server.coordinator.duty;

import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.druid.client.ImmutableDruidDataSource;
import org.apache.druid.client.ImmutableDruidServer;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
import org.apache.druid.server.coordinator.ServerHolder;
import org.apache.druid.server.coordinator.duty.MetadataAction;
import org.apache.druid.server.coordinator.loading.SegmentLoadQueueManager;
import org.apache.druid.server.coordinator.rules.BroadcastDistributionRule;
import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
import org.apache.druid.server.coordinator.stats.Stats;
import org.apache.druid.timeline.DataSegment;

/* loaded from: input_file:org/apache/druid/server/coordinator/duty/UnloadUnusedSegments.class */
public class UnloadUnusedSegments implements CoordinatorDuty {
    private static final Logger log = new Logger(UnloadUnusedSegments.class);
    private final SegmentLoadQueueManager loadQueueManager;
    private final MetadataAction.GetDatasourceRules ruleHandler;

    public UnloadUnusedSegments(SegmentLoadQueueManager segmentLoadQueueManager, MetadataAction.GetDatasourceRules getDatasourceRules) {
        this.ruleHandler = getDatasourceRules;
        this.loadQueueManager = segmentLoadQueueManager;
    }

    @Override // org.apache.druid.server.coordinator.duty.CoordinatorDuty
    public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams druidCoordinatorRuntimeParams) {
        HashMap hashMap = new HashMap();
        Iterator<String> it = druidCoordinatorRuntimeParams.getBroadcastDatasources().iterator();
        while (it.hasNext()) {
            hashMap.put(it.next(), true);
        }
        List<ServerHolder> allServers = druidCoordinatorRuntimeParams.getDruidCluster().getAllServers();
        int sum = allServers.stream().mapToInt(serverHolder -> {
            return cancelLoadOfUnusedSegments(serverHolder, hashMap, druidCoordinatorRuntimeParams);
        }).sum();
        CoordinatorRunStats coordinatorStats = druidCoordinatorRuntimeParams.getCoordinatorStats();
        int sum2 = allServers.stream().mapToInt(serverHolder2 -> {
            return dropUnusedSegments(serverHolder2, druidCoordinatorRuntimeParams, coordinatorStats, hashMap);
        }).sum();
        if (sum > 0 || sum2 > 0) {
            log.debug("Cancelled [%d] loads and started [%d] drops of unused segments.", new Object[]{Integer.valueOf(sum), Integer.valueOf(sum2)});
        }
        return druidCoordinatorRuntimeParams;
    }

    private int dropUnusedSegments(ServerHolder serverHolder, DruidCoordinatorRuntimeParams druidCoordinatorRuntimeParams, CoordinatorRunStats coordinatorRunStats, Map<String, Boolean> map) {
        AtomicInteger atomicInteger = new AtomicInteger(0);
        ImmutableDruidServer server = serverHolder.getServer();
        for (ImmutableDruidDataSource immutableDruidDataSource : server.getDataSources()) {
            if (!shouldSkipUnload(serverHolder, immutableDruidDataSource.getName(), map)) {
                int i = 0;
                for (DataSegment dataSegment : immutableDruidDataSource.getSegments()) {
                    if (!druidCoordinatorRuntimeParams.isUsedSegment(dataSegment) && this.loadQueueManager.dropSegment(dataSegment, serverHolder)) {
                        i++;
                        log.debug("Dropping uneeded segment[%s] from server[%s] in tier[%s].", new Object[]{dataSegment.getId(), server.getName(), server.getTier()});
                    }
                }
                if (i > 0) {
                    coordinatorRunStats.addToSegmentStat(Stats.Segments.UNNEEDED, server.getTier(), immutableDruidDataSource.getName(), i);
                    atomicInteger.addAndGet(i);
                }
            }
        }
        return atomicInteger.get();
    }

    private int cancelLoadOfUnusedSegments(ServerHolder serverHolder, Map<String, Boolean> map, DruidCoordinatorRuntimeParams druidCoordinatorRuntimeParams) {
        AtomicInteger atomicInteger = new AtomicInteger(0);
        serverHolder.getQueuedSegments().forEach((dataSegment, segmentAction) -> {
            if (!shouldSkipUnload(serverHolder, dataSegment.getDataSource(), map) && !druidCoordinatorRuntimeParams.isUsedSegment(dataSegment) && segmentAction.isLoad() && serverHolder.cancelOperation(segmentAction, dataSegment)) {
                atomicInteger.incrementAndGet();
            }
        });
        return atomicInteger.get();
    }

    private boolean shouldSkipUnload(ServerHolder serverHolder, String str, Map<String, Boolean> map) {
        return serverHolder.isRealtimeServer() && !map.computeIfAbsent(str, this::isBroadcastDatasource).booleanValue();
    }

    private boolean isBroadcastDatasource(String str) {
        return this.ruleHandler.getRulesWithDefault(str).stream().anyMatch(rule -> {
            return rule instanceof BroadcastDistributionRule;
        });
    }
}
