package org.apache.druid.server.coordinator.duty;

import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.druid.client.indexing.IndexingTotalWorkerCapacityInfo;
import org.apache.druid.client.indexing.IndexingWorker;
import org.apache.druid.client.indexing.IndexingWorkerInfo;
import org.apache.druid.indexer.RunnerTaskState;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.java.util.common.CloseableIterators;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.JodaUtils;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.metadata.MetadataStorageTablesConfig;
import org.apache.druid.metadata.SQLMetadataConnector;
import org.apache.druid.metadata.SegmentsMetadataManagerConfig;
import org.apache.druid.metadata.SqlSegmentsMetadataManager;
import org.apache.druid.metadata.SqlSegmentsMetadataManagerTestBase;
import org.apache.druid.metadata.TestDerbyConnector;
import org.apache.druid.metadata.segment.cache.SegmentMetadataCache;
import org.apache.druid.rpc.indexing.NoopOverlordClient;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
import org.apache.druid.segment.metadata.SegmentSchemaCache;
import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
import org.apache.druid.server.coordinator.config.KillUnusedSegmentsConfig;
import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
import org.apache.druid.server.coordinator.stats.Dimension;
import org.apache.druid.server.coordinator.stats.RowKey;
import org.apache.druid.server.coordinator.stats.Stats;
import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.NoneShardSpec;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.joda.time.Interval;
import org.joda.time.Period;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;

/* loaded from: input_file:org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.class */
public class KillUnusedSegmentsTest {
    private static final String VERSION = "v1";
    private CoordinatorDynamicConfig.Builder dynamicConfigBuilder;
    private TestOverlordClient overlordClient;
    private KillUnusedSegmentsConfig.Builder configBuilder;
    private DruidCoordinatorRuntimeParams.Builder paramsBuilder;
    private KillUnusedSegments killDuty;

    @Rule
    public final TestDerbyConnector.DerbyConnectorRule derbyConnectorRule = new TestDerbyConnector.DerbyConnectorRule();
    private SqlSegmentsMetadataManager sqlSegmentsMetadataManager;
    private SQLMetadataConnector connector;
    private MetadataStorageTablesConfig config;
    private static final DateTime NOW = DateTimes.nowUtc();
    private static final Interval YEAR_OLD = new Interval(Period.days(1), NOW.minusDays(365));
    private static final Interval MONTH_OLD = new Interval(Period.days(1), NOW.minusDays(30));
    private static final Interval FIFTEEN_DAY_OLD = new Interval(Period.days(1), NOW.minusDays(15));
    private static final Interval DAY_OLD = new Interval(Period.days(1), NOW.minusDays(1));
    private static final Interval HOUR_OLD = new Interval(Period.days(1), NOW.minusHours(1));
    private static final Interval NEXT_DAY = new Interval(Period.days(1), NOW.plusDays(1));
    private static final Interval NEXT_MONTH = new Interval(Period.days(1), NOW.plusDays(30));
    private static final String DS1 = "DS1";
    private static final RowKey DS1_STAT_KEY = RowKey.of(Dimension.DATASOURCE, DS1);
    private static final String DS2 = "DS2";
    private static final RowKey DS2_STAT_KEY = RowKey.of(Dimension.DATASOURCE, DS2);
    private static final String DS3 = "DS3";
    private static final RowKey DS3_STAT_KEY = RowKey.of(Dimension.DATASOURCE, DS3);

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest$TestOverlordClient.class */
    public static class TestOverlordClient extends NoopOverlordClient {
        private final List<TaskStatusPlus> taskStatuses;
        private final Map<String, Interval> observedDatasourceToLastKillInterval;
        private final Map<String, String> observedDatasourceToLastKillTaskId;
        private final IndexingTotalWorkerCapacityInfo capcityInfo;
        private int taskIdSuffix;

        TestOverlordClient() {
            this.taskStatuses = new ArrayList();
            this.observedDatasourceToLastKillInterval = new HashMap();
            this.observedDatasourceToLastKillTaskId = new HashMap();
            this.taskIdSuffix = 0;
            this.capcityInfo = new IndexingTotalWorkerCapacityInfo(5, 10);
        }

        TestOverlordClient(int i, int i2) {
            this.taskStatuses = new ArrayList();
            this.observedDatasourceToLastKillInterval = new HashMap();
            this.observedDatasourceToLastKillTaskId = new HashMap();
            this.taskIdSuffix = 0;
            this.capcityInfo = new IndexingTotalWorkerCapacityInfo(i, i2);
        }

        static String getTaskId(String str, String str2, Interval interval) {
            return str + "-" + str2 + "-" + String.valueOf(interval);
        }

        void addTask(String str) {
            List<TaskStatusPlus> list = this.taskStatuses;
            int i = this.taskIdSuffix;
            this.taskIdSuffix = i + 1;
            list.add(new TaskStatusPlus("coordinator-issued__" + str + "__" + i, (String) null, "kill", DateTimes.EPOCH, DateTimes.EPOCH, TaskState.RUNNING, RunnerTaskState.RUNNING, 100L, TaskLocation.unknown(), str, (String) null));
        }

        public ListenableFuture<CloseableIterator<TaskStatusPlus>> taskStatuses(@Nullable String str, @Nullable String str2, @Nullable Integer num) {
            return Futures.immediateFuture(CloseableIterators.wrap(this.taskStatuses.iterator(), (Closeable) null));
        }

        public ListenableFuture<String> runKillTask(String str, String str2, Interval interval, @Nullable List<String> list, @Nullable Integer num, @Nullable DateTime dateTime) {
            String taskId = getTaskId(str, str2, interval);
            this.observedDatasourceToLastKillInterval.put(str2, interval);
            this.observedDatasourceToLastKillTaskId.put(str2, taskId);
            return Futures.immediateFuture(taskId);
        }

        public ListenableFuture<IndexingTotalWorkerCapacityInfo> getTotalWorkerCapacity() {
            return Futures.immediateFuture(this.capcityInfo);
        }

        public ListenableFuture<List<IndexingWorkerInfo>> getWorkers() {
            return Futures.immediateFuture(ImmutableList.of(new IndexingWorkerInfo(new IndexingWorker("http", "localhost", "1.2.3.4", 3, "2"), 0, Collections.emptySet(), Collections.emptyList(), DateTimes.of("2000"), (DateTime) null)));
        }

        Interval getLastKillInterval(String str) {
            return this.observedDatasourceToLastKillInterval.get(str);
        }

        void deleteLastKillInterval(String str) {
            this.observedDatasourceToLastKillInterval.remove(str);
        }

        String getLastKillTaskId(String str) {
            String str2 = this.observedDatasourceToLastKillTaskId.get(str);
            this.observedDatasourceToLastKillTaskId.remove(str);
            return str2;
        }

        void deleteLastKillTaskId(String str) {
            this.observedDatasourceToLastKillTaskId.remove(str);
        }
    }

    @Before
    public void setup() {
        this.connector = this.derbyConnectorRule.getConnector();
        this.sqlSegmentsMetadataManager = new SqlSegmentsMetadataManager(TestHelper.makeJsonMapper(), Suppliers.ofInstance(new SegmentsMetadataManagerConfig(Period.millis(1), (SegmentMetadataCache.UsageMode) null)), this.derbyConnectorRule.metadataTablesConfigSupplier(), this.connector, (SegmentSchemaCache) null, CentralizedDatasourceSchemaConfig.create(), NoopServiceEmitter.instance());
        this.sqlSegmentsMetadataManager.start();
        this.config = (MetadataStorageTablesConfig) this.derbyConnectorRule.metadataTablesConfigSupplier().get();
        this.connector.createSegmentTable();
        this.overlordClient = new TestOverlordClient();
        this.configBuilder = KillUnusedSegmentsConfig.builder().withCleanupPeriod(Duration.standardSeconds(0L)).withDurationToRetain(Duration.standardHours(36L)).withMaxSegmentsToKill(10).withMaxIntervalToKill(Period.ZERO).withBufferPeriod(Duration.standardSeconds(1L));
        this.dynamicConfigBuilder = CoordinatorDynamicConfig.builder().withKillTaskSlotRatio(Double.valueOf(1.0d));
        this.paramsBuilder = DruidCoordinatorRuntimeParams.builder().withUsedSegments(Collections.emptySet());
    }

    @Test
    public void testKillWithDefaultCoordinatorConfig() {
        this.configBuilder = KillUnusedSegmentsConfig.builder();
        this.dynamicConfigBuilder = CoordinatorDynamicConfig.builder();
        DateTime minusDays = NOW.minusDays(60);
        createAndAddUnusedSegment(DS1, YEAR_OLD, VERSION, minusDays);
        createAndAddUnusedSegment(DS1, MONTH_OLD, VERSION, minusDays);
        createAndAddUnusedSegment(DS1, DAY_OLD, VERSION, minusDays);
        createAndAddUnusedSegment(DS1, HOUR_OLD, VERSION, minusDays);
        createAndAddUnusedSegment(DS1, NEXT_DAY, VERSION, minusDays);
        createAndAddUnusedSegment(DS1, NEXT_MONTH, VERSION, minusDays);
        createAndAddUnusedSegment(DS1, Intervals.ETERNITY, VERSION, minusDays);
        initDuty();
        CoordinatorRunStats runDutyAndGetStats = runDutyAndGetStats();
        Assert.assertEquals(1L, runDutyAndGetStats.get(Stats.Kill.AVAILABLE_SLOTS));
        Assert.assertEquals(1L, runDutyAndGetStats.get(Stats.Kill.SUBMITTED_TASKS));
        Assert.assertEquals(1L, runDutyAndGetStats.get(Stats.Kill.MAX_SLOTS));
        Assert.assertEquals(1L, runDutyAndGetStats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS1_STAT_KEY));
        validateLastKillStateAndReset(DS1, Intervals.ETERNITY);
    }

    @Test
    public void testKillWithDefaultCoordinatorConfigPlusZeroMaxIntervalToKill() {
        this.configBuilder = KillUnusedSegmentsConfig.builder().withMaxIntervalToKill(Period.ZERO);
        this.dynamicConfigBuilder = CoordinatorDynamicConfig.builder();
        DateTime minusDays = NOW.minusDays(60);
        createAndAddUnusedSegment(DS1, YEAR_OLD, VERSION, minusDays);
        createAndAddUnusedSegment(DS1, MONTH_OLD, VERSION, minusDays);
        createAndAddUnusedSegment(DS1, DAY_OLD, VERSION, minusDays);
        createAndAddUnusedSegment(DS1, HOUR_OLD, VERSION, minusDays);
        createAndAddUnusedSegment(DS1, NEXT_DAY, VERSION, minusDays);
        createAndAddUnusedSegment(DS1, NEXT_MONTH, VERSION, minusDays);
        createAndAddUnusedSegment(DS1, Intervals.ETERNITY, VERSION, minusDays);
        initDuty();
        CoordinatorRunStats runDutyAndGetStats = runDutyAndGetStats();
        Assert.assertEquals(1L, runDutyAndGetStats.get(Stats.Kill.AVAILABLE_SLOTS));
        Assert.assertEquals(1L, runDutyAndGetStats.get(Stats.Kill.SUBMITTED_TASKS));
        Assert.assertEquals(1L, runDutyAndGetStats.get(Stats.Kill.MAX_SLOTS));
        Assert.assertEquals(2L, runDutyAndGetStats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS1_STAT_KEY));
        validateLastKillStateAndReset(DS1, Intervals.ETERNITY);
    }

    @Test
    public void testKillWithNoDatasources() {
        initDuty();
        CoordinatorRunStats runDutyAndGetStats = runDutyAndGetStats();
        Assert.assertEquals(10L, runDutyAndGetStats.get(Stats.Kill.AVAILABLE_SLOTS));
        Assert.assertEquals(0L, runDutyAndGetStats.get(Stats.Kill.SUBMITTED_TASKS));
        Assert.assertEquals(10L, runDutyAndGetStats.get(Stats.Kill.MAX_SLOTS));
    }

    @Test
    public void testKillWithMultipleDatasources() {
        this.configBuilder.withIgnoreDurationToRetain(true).withMaxSegmentsToKill(2);
        createAndAddUnusedSegment(DS1, YEAR_OLD, VERSION, NOW.minusDays(1));
        createAndAddUnusedSegment(DS1, MONTH_OLD, VERSION, NOW.minusDays(1));
        createAndAddUnusedSegment(DS1, DAY_OLD, VERSION, NOW.minusDays(1));
        createAndAddUnusedSegment(DS1, NEXT_DAY, VERSION, NOW.minusDays(1));
        createAndAddUnusedSegment(DS1, NEXT_MONTH, VERSION, NOW.minusDays(1));
        createAndAddUnusedSegment(DS2, YEAR_OLD, VERSION, NOW.minusDays(1));
        createAndAddUnusedSegment(DS2, DAY_OLD, VERSION, NOW.minusDays(1));
        createAndAddUnusedSegment(DS2, NEXT_DAY, VERSION, NOW.minusDays(1));
        initDuty();
        CoordinatorRunStats runDutyAndGetStats = runDutyAndGetStats();
        Assert.assertEquals(10L, runDutyAndGetStats.get(Stats.Kill.AVAILABLE_SLOTS));
        Assert.assertEquals(2L, runDutyAndGetStats.get(Stats.Kill.SUBMITTED_TASKS));
        Assert.assertEquals(10L, runDutyAndGetStats.get(Stats.Kill.MAX_SLOTS));
        Assert.assertEquals(2L, runDutyAndGetStats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS1_STAT_KEY));
        Assert.assertEquals(2L, runDutyAndGetStats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS2_STAT_KEY));
        validateLastKillStateAndReset(DS1, new Interval(YEAR_OLD.getStart(), MONTH_OLD.getEnd()));
        validateLastKillStateAndReset(DS2, new Interval(YEAR_OLD.getStart(), DAY_OLD.getEnd()));
        CoordinatorRunStats runDutyAndGetStats2 = runDutyAndGetStats();
        Assert.assertEquals(20L, runDutyAndGetStats2.get(Stats.Kill.AVAILABLE_SLOTS));
        Assert.assertEquals(4L, runDutyAndGetStats2.get(Stats.Kill.SUBMITTED_TASKS));
        Assert.assertEquals(20L, runDutyAndGetStats2.get(Stats.Kill.MAX_SLOTS));
        Assert.assertEquals(4L, runDutyAndGetStats2.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS1_STAT_KEY));
        Assert.assertEquals(3L, runDutyAndGetStats2.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS2_STAT_KEY));
        validateLastKillStateAndReset(DS1, new Interval(DAY_OLD.getStart(), NEXT_DAY.getEnd()));
        validateLastKillStateAndReset(DS2, NEXT_DAY);
        CoordinatorRunStats runDutyAndGetStats3 = runDutyAndGetStats();
        Assert.assertEquals(30L, runDutyAndGetStats3.get(Stats.Kill.AVAILABLE_SLOTS));
        Assert.assertEquals(5L, runDutyAndGetStats3.get(Stats.Kill.SUBMITTED_TASKS));
        Assert.assertEquals(30L, runDutyAndGetStats3.get(Stats.Kill.MAX_SLOTS));
        Assert.assertEquals(5L, runDutyAndGetStats3.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS1_STAT_KEY));
        Assert.assertEquals(3L, runDutyAndGetStats3.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS2_STAT_KEY));
        validateLastKillStateAndReset(DS1, NEXT_MONTH);
        validateLastKillStateAndReset(DS2, null);
    }

    @Test
    public void testRoundRobinKillMultipleDatasources() {
        this.configBuilder.withIgnoreDurationToRetain(true).withMaxSegmentsToKill(2);
        this.dynamicConfigBuilder.withMaxKillTaskSlots(2);
        createAndAddUnusedSegment(DS1, YEAR_OLD, VERSION, NOW.minusDays(1));
        createAndAddUnusedSegment(DS1, MONTH_OLD, VERSION, NOW.minusDays(1));
        createAndAddUnusedSegment(DS1, DAY_OLD, VERSION, NOW.minusDays(1));
        createAndAddUnusedSegment(DS1, NEXT_DAY, VERSION, NOW.minusDays(1));
        createAndAddUnusedSegment(DS1, NEXT_MONTH, VERSION, NOW.minusDays(1));
        createAndAddUnusedSegment(DS2, YEAR_OLD, VERSION, NOW.minusDays(1));
        createAndAddUnusedSegment(DS2, DAY_OLD, VERSION, NOW.minusDays(1));
        createAndAddUnusedSegment(DS2, NEXT_DAY, VERSION, NOW.minusDays(1));
        createAndAddUnusedSegment(DS3, YEAR_OLD, VERSION, NOW.minusDays(1));
        createAndAddUnusedSegment(DS3, DAY_OLD, VERSION, NOW.minusDays(1));
        createAndAddUnusedSegment(DS3, NEXT_DAY, VERSION, NOW.minusDays(1));
        initDuty();
        CoordinatorRunStats runDutyAndGetStats = runDutyAndGetStats();
        Assert.assertEquals(2L, runDutyAndGetStats.get(Stats.Kill.AVAILABLE_SLOTS));
        Assert.assertEquals(2L, runDutyAndGetStats.get(Stats.Kill.SUBMITTED_TASKS));
        Assert.assertEquals(2L, runDutyAndGetStats.get(Stats.Kill.MAX_SLOTS));
        Assert.assertEquals(2L, runDutyAndGetStats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS1_STAT_KEY));
        Assert.assertEquals(2L, runDutyAndGetStats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS2_STAT_KEY));
        CoordinatorRunStats runDutyAndGetStats2 = runDutyAndGetStats();
        Assert.assertEquals(4L, runDutyAndGetStats2.get(Stats.Kill.AVAILABLE_SLOTS));
        Assert.assertEquals(4L, runDutyAndGetStats2.get(Stats.Kill.SUBMITTED_TASKS));
        Assert.assertEquals(4L, runDutyAndGetStats2.get(Stats.Kill.MAX_SLOTS));
        Assert.assertEquals(2L, runDutyAndGetStats2.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS3_STAT_KEY));
        Assert.assertEquals(4L, runDutyAndGetStats2.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS1_STAT_KEY));
        CoordinatorRunStats runDutyAndGetStats3 = runDutyAndGetStats();
        Assert.assertEquals(6L, runDutyAndGetStats3.get(Stats.Kill.AVAILABLE_SLOTS));
        Assert.assertEquals(6L, runDutyAndGetStats3.get(Stats.Kill.SUBMITTED_TASKS));
        Assert.assertEquals(6L, runDutyAndGetStats3.get(Stats.Kill.MAX_SLOTS));
        Assert.assertEquals(3L, runDutyAndGetStats3.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS2_STAT_KEY));
        Assert.assertEquals(3L, runDutyAndGetStats3.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS3_STAT_KEY));
        CoordinatorRunStats runDutyAndGetStats4 = runDutyAndGetStats();
        Assert.assertEquals(8L, runDutyAndGetStats4.get(Stats.Kill.AVAILABLE_SLOTS));
        Assert.assertEquals(7L, runDutyAndGetStats4.get(Stats.Kill.SUBMITTED_TASKS));
        Assert.assertEquals(8L, runDutyAndGetStats4.get(Stats.Kill.MAX_SLOTS));
        Assert.assertEquals(5L, runDutyAndGetStats4.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS1_STAT_KEY));
        Assert.assertEquals(3L, runDutyAndGetStats4.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS2_STAT_KEY));
    }

    @Test
    public void testRoundRobinKillWhenDatasourcesChange() {
        this.configBuilder.withIgnoreDurationToRetain(true).withMaxSegmentsToKill(2);
        this.dynamicConfigBuilder.withMaxKillTaskSlots(1);
        createAndAddUnusedSegment(DS1, YEAR_OLD, VERSION, NOW.minusDays(1));
        createAndAddUnusedSegment(DS1, MONTH_OLD, VERSION, NOW.minusDays(1));
        createAndAddUnusedSegment(DS1, DAY_OLD, VERSION, NOW.minusDays(1));
        initDuty();
        CoordinatorRunStats runDutyAndGetStats = runDutyAndGetStats();
        Assert.assertEquals(1L, runDutyAndGetStats.get(Stats.Kill.AVAILABLE_SLOTS));
        Assert.assertEquals(1L, runDutyAndGetStats.get(Stats.Kill.SUBMITTED_TASKS));
        Assert.assertEquals(1L, runDutyAndGetStats.get(Stats.Kill.MAX_SLOTS));
        Assert.assertEquals(2L, runDutyAndGetStats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS1_STAT_KEY));
        validateLastKillStateAndReset(DS1, new Interval(YEAR_OLD.getStart(), MONTH_OLD.getEnd()));
        createAndAddUnusedSegment(DS2, YEAR_OLD, VERSION, NOW.minusDays(1));
        createAndAddUnusedSegment(DS2, DAY_OLD, VERSION, NOW.minusDays(1));
        createAndAddUnusedSegment(DS2, NEXT_DAY, VERSION, NOW.minusDays(1));
        CoordinatorRunStats runDutyAndGetStats2 = runDutyAndGetStats();
        Assert.assertEquals(2L, runDutyAndGetStats2.get(Stats.Kill.AVAILABLE_SLOTS));
        Assert.assertEquals(2L, runDutyAndGetStats2.get(Stats.Kill.SUBMITTED_TASKS));
        Assert.assertEquals(2L, runDutyAndGetStats2.get(Stats.Kill.MAX_SLOTS));
        Assert.assertEquals(2L, runDutyAndGetStats2.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS2_STAT_KEY));
        CoordinatorRunStats runDutyAndGetStats3 = runDutyAndGetStats();
        Assert.assertEquals(3L, runDutyAndGetStats3.get(Stats.Kill.AVAILABLE_SLOTS));
        Assert.assertEquals(3L, runDutyAndGetStats3.get(Stats.Kill.SUBMITTED_TASKS));
        Assert.assertEquals(3L, runDutyAndGetStats3.get(Stats.Kill.MAX_SLOTS));
        Assert.assertEquals(3L, runDutyAndGetStats3.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS1_STAT_KEY));
        CoordinatorRunStats runDutyAndGetStats4 = runDutyAndGetStats();
        Assert.assertEquals(4L, runDutyAndGetStats4.get(Stats.Kill.AVAILABLE_SLOTS));
        Assert.assertEquals(4L, runDutyAndGetStats4.get(Stats.Kill.SUBMITTED_TASKS));
        Assert.assertEquals(4L, runDutyAndGetStats4.get(Stats.Kill.MAX_SLOTS));
        Assert.assertEquals(3L, runDutyAndGetStats4.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS2_STAT_KEY));
    }

    @Test
    public void testKillSingleDatasourceMultipleRuns() {
        this.configBuilder.withIgnoreDurationToRetain(true).withMaxSegmentsToKill(2);
        this.dynamicConfigBuilder.withMaxKillTaskSlots(2);
        createAndAddUnusedSegment(DS1, YEAR_OLD, VERSION, NOW.minusDays(1));
        createAndAddUnusedSegment(DS1, MONTH_OLD, VERSION, NOW.minusDays(1));
        createAndAddUnusedSegment(DS1, DAY_OLD, VERSION, NOW.minusDays(1));
        initDuty();
        CoordinatorRunStats runDutyAndGetStats = runDutyAndGetStats();
        Assert.assertEquals(2L, runDutyAndGetStats.get(Stats.Kill.AVAILABLE_SLOTS));
        Assert.assertEquals(1L, runDutyAndGetStats.get(Stats.Kill.SUBMITTED_TASKS));
        Assert.assertEquals(2L, runDutyAndGetStats.get(Stats.Kill.MAX_SLOTS));
        Assert.assertEquals(2L, runDutyAndGetStats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS1_STAT_KEY));
        CoordinatorRunStats runDutyAndGetStats2 = runDutyAndGetStats();
        Assert.assertEquals(4L, runDutyAndGetStats2.get(Stats.Kill.AVAILABLE_SLOTS));
        Assert.assertEquals(2L, runDutyAndGetStats2.get(Stats.Kill.SUBMITTED_TASKS));
        Assert.assertEquals(4L, runDutyAndGetStats2.get(Stats.Kill.MAX_SLOTS));
        Assert.assertEquals(3L, runDutyAndGetStats2.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS1_STAT_KEY));
        CoordinatorRunStats runDutyAndGetStats3 = runDutyAndGetStats();
        Assert.assertEquals(6L, runDutyAndGetStats3.get(Stats.Kill.AVAILABLE_SLOTS));
        Assert.assertEquals(2L, runDutyAndGetStats3.get(Stats.Kill.SUBMITTED_TASKS));
        Assert.assertEquals(6L, runDutyAndGetStats3.get(Stats.Kill.MAX_SLOTS));
        Assert.assertEquals(3L, runDutyAndGetStats3.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS1_STAT_KEY));
    }

    @Test
    public void testKillWithDifferentLastUpdatedTimesInWideInterval() {
        this.configBuilder.withIgnoreDurationToRetain(true).withBufferPeriod(Duration.standardDays(3L));
        createAndAddUnusedSegment(DS1, YEAR_OLD, VERSION, NOW.minusDays(10));
        createAndAddUnusedSegment(DS1, MONTH_OLD, VERSION, NOW.minusDays(10));
        createAndAddUnusedSegment(DS1, DAY_OLD, VERSION, NOW.minusDays(2));
        createAndAddUnusedSegment(DS1, HOUR_OLD, VERSION, NOW.minusDays(2));
        createAndAddUnusedSegment(DS1, NEXT_DAY, VERSION, NOW.minusDays(10));
        createAndAddUnusedSegment(DS1, NEXT_MONTH, VERSION, NOW.minusDays(10));
        initDuty();
        CoordinatorRunStats runDutyAndGetStats = runDutyAndGetStats();
        Assert.assertEquals(10L, runDutyAndGetStats.get(Stats.Kill.AVAILABLE_SLOTS));
        Assert.assertEquals(1L, runDutyAndGetStats.get(Stats.Kill.SUBMITTED_TASKS));
        Assert.assertEquals(10L, runDutyAndGetStats.get(Stats.Kill.MAX_SLOTS));
        Assert.assertEquals(4L, runDutyAndGetStats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS1_STAT_KEY));
        validateLastKillStateAndReset(DS1, new Interval(YEAR_OLD.getStart(), NEXT_MONTH.getEnd()));
    }

    @Test
    public void testAddOlderSegmentsAfterInitialRun() {
        this.configBuilder.withIgnoreDurationToRetain(true).withMaxSegmentsToKill(2);
        createAndAddUnusedSegment(DS1, DAY_OLD, VERSION, NOW.minusDays(1));
        createAndAddUnusedSegment(DS1, NEXT_DAY, VERSION, NOW.minusDays(1));
        createAndAddUnusedSegment(DS1, NEXT_MONTH, VERSION, NOW.minusDays(1));
        initDuty();
        CoordinatorRunStats runDutyAndGetStats = runDutyAndGetStats();
        Assert.assertEquals(10L, runDutyAndGetStats.get(Stats.Kill.AVAILABLE_SLOTS));
        Assert.assertEquals(1L, runDutyAndGetStats.get(Stats.Kill.SUBMITTED_TASKS));
        Assert.assertEquals(10L, runDutyAndGetStats.get(Stats.Kill.MAX_SLOTS));
        Assert.assertEquals(2L, runDutyAndGetStats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS1_STAT_KEY));
        validateLastKillStateAndReset(DS1, new Interval(DAY_OLD.getStart(), NEXT_DAY.getEnd()));
        createAndAddUnusedSegment(DS1, YEAR_OLD, VERSION, NOW.minusDays(1));
        createAndAddUnusedSegment(DS1, MONTH_OLD, VERSION, NOW.minusDays(1));
        CoordinatorRunStats runDutyAndGetStats2 = runDutyAndGetStats();
        Assert.assertEquals(20L, runDutyAndGetStats2.get(Stats.Kill.AVAILABLE_SLOTS));
        Assert.assertEquals(2L, runDutyAndGetStats2.get(Stats.Kill.SUBMITTED_TASKS));
        Assert.assertEquals(20L, runDutyAndGetStats2.get(Stats.Kill.MAX_SLOTS));
        Assert.assertEquals(3L, runDutyAndGetStats2.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS1_STAT_KEY));
        validateLastKillStateAndReset(DS1, NEXT_MONTH);
        CoordinatorRunStats runDutyAndGetStats3 = runDutyAndGetStats();
        Assert.assertEquals(30L, runDutyAndGetStats3.get(Stats.Kill.AVAILABLE_SLOTS));
        Assert.assertEquals(2L, runDutyAndGetStats3.get(Stats.Kill.SUBMITTED_TASKS));
        Assert.assertEquals(30L, runDutyAndGetStats3.get(Stats.Kill.MAX_SLOTS));
        Assert.assertEquals(3L, runDutyAndGetStats3.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS1_STAT_KEY));
        validateLastKillStateAndReset(DS1, null);
        CoordinatorRunStats runDutyAndGetStats4 = runDutyAndGetStats();
        Assert.assertEquals(40L, runDutyAndGetStats4.get(Stats.Kill.AVAILABLE_SLOTS));
        Assert.assertEquals(3L, runDutyAndGetStats4.get(Stats.Kill.SUBMITTED_TASKS));
        Assert.assertEquals(40L, runDutyAndGetStats4.get(Stats.Kill.MAX_SLOTS));
        Assert.assertEquals(5L, runDutyAndGetStats4.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS1_STAT_KEY));
        validateLastKillStateAndReset(DS1, new Interval(YEAR_OLD.getStart(), MONTH_OLD.getEnd()));
    }

    @Test
    public void testDatasoucesAllowList() {
        this.dynamicConfigBuilder.withSpecificDataSourcesToKillUnusedSegmentsIn(ImmutableSet.of(DS2, DS3));
        this.paramsBuilder.withDynamicConfigs(this.dynamicConfigBuilder.build());
        createAndAddUnusedSegment(DS1, YEAR_OLD, VERSION, NOW.minusDays(1));
        createAndAddUnusedSegment(DS2, YEAR_OLD, VERSION, NOW.minusDays(1));
        createAndAddUnusedSegment(DS3, MONTH_OLD, VERSION, NOW.minusDays(1));
        initDuty();
        CoordinatorRunStats runDutyAndGetStats = runDutyAndGetStats();
        Assert.assertEquals(10L, runDutyAndGetStats.get(Stats.Kill.AVAILABLE_SLOTS));
        Assert.assertEquals(2L, runDutyAndGetStats.get(Stats.Kill.SUBMITTED_TASKS));
        Assert.assertEquals(10L, runDutyAndGetStats.get(Stats.Kill.MAX_SLOTS));
        Assert.assertEquals(0L, runDutyAndGetStats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS1_STAT_KEY));
        Assert.assertEquals(1L, runDutyAndGetStats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS2_STAT_KEY));
        Assert.assertEquals(1L, runDutyAndGetStats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS3_STAT_KEY));
        validateLastKillStateAndReset(DS1, null);
        validateLastKillStateAndReset(DS2, YEAR_OLD);
        validateLastKillStateAndReset(DS3, MONTH_OLD);
    }

    @Test
    public void testNegativeDurationToRetain() {
        this.configBuilder.withDurationToRetain(Duration.standardHours(36L).negated());
        createAndAddUnusedSegment(DS1, YEAR_OLD, VERSION, NOW.minusDays(10));
        createAndAddUnusedSegment(DS1, MONTH_OLD, VERSION, NOW.minusDays(10));
        createAndAddUnusedSegment(DS1, DAY_OLD, VERSION, NOW.minusDays(2));
        createAndAddUnusedSegment(DS1, HOUR_OLD, VERSION, NOW.minusDays(2));
        createAndAddUnusedSegment(DS1, NEXT_DAY, VERSION, NOW.minusDays(10));
        createAndAddUnusedSegment(DS1, NEXT_MONTH, VERSION, NOW.minusDays(10));
        initDuty();
        CoordinatorRunStats runDutyAndGetStats = runDutyAndGetStats();
        Assert.assertEquals(10L, runDutyAndGetStats.get(Stats.Kill.AVAILABLE_SLOTS));
        Assert.assertEquals(1L, runDutyAndGetStats.get(Stats.Kill.SUBMITTED_TASKS));
        Assert.assertEquals(10L, runDutyAndGetStats.get(Stats.Kill.MAX_SLOTS));
        Assert.assertEquals(5L, runDutyAndGetStats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS1_STAT_KEY));
        validateLastKillStateAndReset(DS1, new Interval(YEAR_OLD.getStart(), NEXT_DAY.getEnd()));
    }

    @Test
    public void testIgnoreDurationToRetain() {
        this.configBuilder.withIgnoreDurationToRetain(true);
        createAndAddUnusedSegment(DS1, YEAR_OLD, VERSION, NOW.minusDays(10));
        createAndAddUnusedSegment(DS1, MONTH_OLD, VERSION, NOW.minusDays(10));
        createAndAddUnusedSegment(DS1, DAY_OLD, VERSION, NOW.minusDays(2));
        createAndAddUnusedSegment(DS1, HOUR_OLD, VERSION, NOW.minusDays(2));
        createAndAddUnusedSegment(DS1, NEXT_DAY, VERSION, NOW.minusDays(10));
        createAndAddUnusedSegment(DS1, NEXT_MONTH, VERSION, NOW.minusDays(10));
        initDuty();
        CoordinatorRunStats runDutyAndGetStats = runDutyAndGetStats();
        Assert.assertEquals(10L, runDutyAndGetStats.get(Stats.Kill.AVAILABLE_SLOTS));
        Assert.assertEquals(1L, runDutyAndGetStats.get(Stats.Kill.SUBMITTED_TASKS));
        Assert.assertEquals(10L, runDutyAndGetStats.get(Stats.Kill.MAX_SLOTS));
        Assert.assertEquals(6L, runDutyAndGetStats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS1_STAT_KEY));
        validateLastKillStateAndReset(DS1, new Interval(YEAR_OLD.getStart(), NEXT_MONTH.getEnd()));
    }

    @Test
    public void testLowerMaxSegmentsToKill() {
        this.configBuilder.withMaxSegmentsToKill(1);
        createAndAddUnusedSegment(DS1, YEAR_OLD, VERSION, NOW.minusDays(10));
        createAndAddUnusedSegment(DS1, MONTH_OLD, VERSION, NOW.minusDays(10));
        createAndAddUnusedSegment(DS1, DAY_OLD, VERSION, NOW.minusDays(2));
        initDuty();
        CoordinatorRunStats runDutyAndGetStats = runDutyAndGetStats();
        Assert.assertEquals(10L, runDutyAndGetStats.get(Stats.Kill.AVAILABLE_SLOTS));
        Assert.assertEquals(1L, runDutyAndGetStats.get(Stats.Kill.SUBMITTED_TASKS));
        Assert.assertEquals(10L, runDutyAndGetStats.get(Stats.Kill.MAX_SLOTS));
        Assert.assertEquals(1L, runDutyAndGetStats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS1_STAT_KEY));
        validateLastKillStateAndReset(DS1, YEAR_OLD);
    }

    @Test
    public void testLowerMaxIntervalToKill() {
        this.configBuilder.withMaxIntervalToKill(Period.hours(1));
        createAndAddUnusedSegment(DS1, YEAR_OLD, VERSION, NOW.minusDays(10));
        createAndAddUnusedSegment(DS1, MONTH_OLD, VERSION, NOW.minusDays(10));
        createAndAddUnusedSegment(DS1, DAY_OLD, VERSION, NOW.minusDays(2));
        initDuty();
        CoordinatorRunStats runDutyAndGetStats = runDutyAndGetStats();
        Assert.assertEquals(10L, runDutyAndGetStats.get(Stats.Kill.AVAILABLE_SLOTS));
        Assert.assertEquals(1L, runDutyAndGetStats.get(Stats.Kill.SUBMITTED_TASKS));
        Assert.assertEquals(10L, runDutyAndGetStats.get(Stats.Kill.MAX_SLOTS));
        Assert.assertEquals(1L, runDutyAndGetStats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS1_STAT_KEY));
        validateLastKillStateAndReset(DS1, YEAR_OLD);
    }

    @Test
    public void testMaxIntervalToKillOverridesDurationToRetain() {
        this.configBuilder.withDurationToRetain(Period.hours(6).toStandardDuration()).withMaxIntervalToKill(Period.days(20));
        initDuty();
        createAndAddUnusedSegment(DS1, MONTH_OLD, VERSION, NOW.minusDays(29));
        Assert.assertEquals(1L, runDutyAndGetStats().get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS1_STAT_KEY));
        validateLastKillStateAndReset(DS1, MONTH_OLD);
        createAndAddUnusedSegment(DS1, FIFTEEN_DAY_OLD, VERSION, NOW.minusDays(14));
        createAndAddUnusedSegment(DS1, DAY_OLD, VERSION, NOW.minusHours(2));
        Assert.assertEquals(2L, runDutyAndGetStats().get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS1_STAT_KEY));
        validateLastKillStateAndReset(DS1, FIFTEEN_DAY_OLD);
    }

    @Test
    public void testDurationToRetainOverridesMaxIntervalToKill() {
        this.configBuilder.withDurationToRetain(Period.days(20).toStandardDuration()).withMaxIntervalToKill(Period.days(350));
        initDuty();
        createAndAddUnusedSegment(DS1, YEAR_OLD, VERSION, NOW.minusDays(29));
        Assert.assertEquals(1L, runDutyAndGetStats().get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS1_STAT_KEY));
        validateLastKillStateAndReset(DS1, YEAR_OLD);
        createAndAddUnusedSegment(DS1, MONTH_OLD, VERSION, NOW.minusDays(29));
        createAndAddUnusedSegment(DS1, FIFTEEN_DAY_OLD, VERSION, NOW.minusDays(14));
        Assert.assertEquals(2L, runDutyAndGetStats().get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS1_STAT_KEY));
        validateLastKillStateAndReset(DS1, MONTH_OLD);
    }

    @Test
    public void testHigherMaxIntervalToKill() {
        this.configBuilder.withMaxIntervalToKill(Period.days(360));
        createAndAddUnusedSegment(DS1, YEAR_OLD, VERSION, NOW.minusDays(10));
        createAndAddUnusedSegment(DS1, MONTH_OLD, VERSION, NOW.minusDays(10));
        createAndAddUnusedSegment(DS1, DAY_OLD, VERSION, NOW.minusDays(2));
        initDuty();
        CoordinatorRunStats runDutyAndGetStats = runDutyAndGetStats();
        Assert.assertEquals(10L, runDutyAndGetStats.get(Stats.Kill.AVAILABLE_SLOTS));
        Assert.assertEquals(1L, runDutyAndGetStats.get(Stats.Kill.SUBMITTED_TASKS));
        Assert.assertEquals(10L, runDutyAndGetStats.get(Stats.Kill.MAX_SLOTS));
        Assert.assertEquals(2L, runDutyAndGetStats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS1_STAT_KEY));
        validateLastKillStateAndReset(DS1, JodaUtils.umbrellaInterval(Arrays.asList(YEAR_OLD, MONTH_OLD)));
    }

    @Test
    public void testKillDatasourceWithNoUnusedSegmentsInInitialRun() {
        this.configBuilder.withMaxSegmentsToKill(1);
        createAndAddUsedSegment(DS1, YEAR_OLD, VERSION);
        initDuty();
        CoordinatorRunStats runDutyAndGetStats = runDutyAndGetStats();
        Assert.assertEquals(10L, runDutyAndGetStats.get(Stats.Kill.AVAILABLE_SLOTS));
        Assert.assertEquals(0L, runDutyAndGetStats.get(Stats.Kill.SUBMITTED_TASKS));
        Assert.assertEquals(10L, runDutyAndGetStats.get(Stats.Kill.MAX_SLOTS));
        Assert.assertEquals(0L, runDutyAndGetStats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS1_STAT_KEY));
        createAndAddUnusedSegment(DS1, YEAR_OLD, VERSION, NOW.minusDays(10));
        createAndAddUnusedSegment(DS1, MONTH_OLD, VERSION, NOW.minusDays(10));
        createAndAddUnusedSegment(DS1, DAY_OLD, VERSION, NOW.minusDays(2));
        CoordinatorRunStats runDutyAndGetStats2 = runDutyAndGetStats();
        Assert.assertEquals(20L, runDutyAndGetStats2.get(Stats.Kill.AVAILABLE_SLOTS));
        Assert.assertEquals(1L, runDutyAndGetStats2.get(Stats.Kill.SUBMITTED_TASKS));
        Assert.assertEquals(20L, runDutyAndGetStats2.get(Stats.Kill.MAX_SLOTS));
        Assert.assertEquals(1L, runDutyAndGetStats2.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS1_STAT_KEY));
        validateLastKillStateAndReset(DS1, YEAR_OLD);
    }

    @Test
    public void testLargeKillPeriod() {
        this.configBuilder.withCleanupPeriod(Duration.standardHours(1L));
        createAndAddUnusedSegment(DS1, YEAR_OLD, VERSION, NOW.minusDays(10));
        createAndAddUnusedSegment(DS1, MONTH_OLD, VERSION, NOW.minusDays(10));
        createAndAddUnusedSegment(DS1, DAY_OLD, VERSION, NOW.minusDays(2));
        createAndAddUnusedSegment(DS1, HOUR_OLD, VERSION, NOW.minusDays(2));
        createAndAddUnusedSegment(DS1, NEXT_DAY, VERSION, NOW.minusDays(10));
        createAndAddUnusedSegment(DS1, NEXT_MONTH, VERSION, NOW.minusDays(10));
        initDuty();
        CoordinatorRunStats runDutyAndGetStats = runDutyAndGetStats();
        Assert.assertEquals(10L, runDutyAndGetStats.get(Stats.Kill.AVAILABLE_SLOTS));
        Assert.assertEquals(1L, runDutyAndGetStats.get(Stats.Kill.SUBMITTED_TASKS));
        Assert.assertEquals(10L, runDutyAndGetStats.get(Stats.Kill.MAX_SLOTS));
        Assert.assertEquals(2L, runDutyAndGetStats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS1_STAT_KEY));
        validateLastKillStateAndReset(DS1, new Interval(YEAR_OLD.getStart(), MONTH_OLD.getEnd()));
        CoordinatorRunStats runDutyAndGetStats2 = runDutyAndGetStats();
        Assert.assertEquals(10L, runDutyAndGetStats2.get(Stats.Kill.AVAILABLE_SLOTS));
        Assert.assertEquals(1L, runDutyAndGetStats2.get(Stats.Kill.SUBMITTED_TASKS));
        Assert.assertEquals(10L, runDutyAndGetStats2.get(Stats.Kill.MAX_SLOTS));
        Assert.assertEquals(2L, runDutyAndGetStats2.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS1_STAT_KEY));
        validateLastKillStateAndReset(DS1, null);
    }

    @Test
    public void testKillTaskSlotAtCapacity() {
        this.dynamicConfigBuilder.withKillTaskSlotRatio(Double.valueOf(0.3d));
        this.dynamicConfigBuilder.withMaxKillTaskSlots(2);
        this.paramsBuilder.withDynamicConfigs(this.dynamicConfigBuilder.build());
        createAndAddUnusedSegment(DS1, YEAR_OLD, VERSION, NOW.minusDays(1));
        createAndAddUnusedSegment(DS1, MONTH_OLD, VERSION, NOW.minusDays(1));
        createAndAddUnusedSegment(DS1, DAY_OLD, VERSION, NOW.minusDays(1));
        createAndAddUnusedSegment(DS2, YEAR_OLD, VERSION, NOW.minusDays(1));
        createAndAddUnusedSegment(DS2, DAY_OLD, VERSION, NOW.minusDays(1));
        createAndAddUnusedSegment(DS2, NEXT_MONTH, VERSION, NOW.minusDays(1));
        createAndAddUnusedSegment(DS3, YEAR_OLD, VERSION, NOW.minusDays(1));
        initDuty();
        CoordinatorRunStats runDutyAndGetStats = runDutyAndGetStats();
        Assert.assertEquals(2L, runDutyAndGetStats.get(Stats.Kill.AVAILABLE_SLOTS));
        Assert.assertEquals(2L, runDutyAndGetStats.get(Stats.Kill.SUBMITTED_TASKS));
        Assert.assertEquals(2L, runDutyAndGetStats.get(Stats.Kill.MAX_SLOTS));
        Assert.assertEquals(2L, runDutyAndGetStats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS1_STAT_KEY));
        Assert.assertEquals(1L, runDutyAndGetStats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS2_STAT_KEY));
        validateLastKillStateAndReset(DS1, new Interval(YEAR_OLD.getStart(), MONTH_OLD.getEnd()));
        validateLastKillStateAndReset(DS2, YEAR_OLD);
        validateLastKillStateAndReset(DS3, null);
    }

    @Test
    public void testKillWithOverlordTaskSlotsFull() {
        this.dynamicConfigBuilder.withKillTaskSlotRatio(Double.valueOf(0.1d));
        this.dynamicConfigBuilder.withMaxKillTaskSlots(10);
        this.paramsBuilder.withDynamicConfigs(this.dynamicConfigBuilder.build());
        this.overlordClient = new TestOverlordClient(1, 5);
        this.overlordClient.addTask(DS1);
        initDuty();
        CoordinatorRunStats runDutyAndGetStats = runDutyAndGetStats();
        Assert.assertEquals(0L, runDutyAndGetStats.get(Stats.Kill.AVAILABLE_SLOTS));
        Assert.assertEquals(0L, runDutyAndGetStats.get(Stats.Kill.SUBMITTED_TASKS));
        Assert.assertEquals(0L, runDutyAndGetStats.get(Stats.Kill.MAX_SLOTS));
    }

    @Test
    public void testKillWithOverlordTaskSlotAvailable() {
        this.dynamicConfigBuilder.withKillTaskSlotRatio(Double.valueOf(1.0d));
        this.dynamicConfigBuilder.withMaxKillTaskSlots(3);
        this.paramsBuilder.withDynamicConfigs(this.dynamicConfigBuilder.build());
        this.overlordClient = new TestOverlordClient(3, 10);
        this.overlordClient.addTask(DS1);
        this.overlordClient.addTask(DS1);
        initDuty();
        CoordinatorRunStats runDutyAndGetStats = runDutyAndGetStats();
        Assert.assertEquals(1L, runDutyAndGetStats.get(Stats.Kill.AVAILABLE_SLOTS));
        Assert.assertEquals(0L, runDutyAndGetStats.get(Stats.Kill.SUBMITTED_TASKS));
        Assert.assertEquals(3L, runDutyAndGetStats.get(Stats.Kill.MAX_SLOTS));
    }

    @Test
    public void testDefaultKillTaskSlotStats() {
        this.dynamicConfigBuilder = CoordinatorDynamicConfig.builder();
        this.paramsBuilder.withDynamicConfigs(this.dynamicConfigBuilder.build());
        initDuty();
        CoordinatorRunStats runDutyAndGetStats = runDutyAndGetStats();
        Assert.assertEquals(1L, runDutyAndGetStats.get(Stats.Kill.AVAILABLE_SLOTS));
        Assert.assertEquals(0L, runDutyAndGetStats.get(Stats.Kill.SUBMITTED_TASKS));
        Assert.assertEquals(1L, runDutyAndGetStats.get(Stats.Kill.MAX_SLOTS));
    }

    @Test
    public void testKillTaskSlotStats1() {
        this.dynamicConfigBuilder.withKillTaskSlotRatio(Double.valueOf(1.0d));
        this.dynamicConfigBuilder.withMaxKillTaskSlots(Integer.MAX_VALUE);
        this.paramsBuilder.withDynamicConfigs(this.dynamicConfigBuilder.build());
        initDuty();
        CoordinatorRunStats runDutyAndGetStats = runDutyAndGetStats();
        Assert.assertEquals(10L, runDutyAndGetStats.get(Stats.Kill.AVAILABLE_SLOTS));
        Assert.assertEquals(0L, runDutyAndGetStats.get(Stats.Kill.SUBMITTED_TASKS));
        Assert.assertEquals(10L, runDutyAndGetStats.get(Stats.Kill.MAX_SLOTS));
    }

    @Test
    public void testKillTaskSlotStats2() {
        this.dynamicConfigBuilder.withKillTaskSlotRatio(Double.valueOf(0.0d));
        this.dynamicConfigBuilder.withMaxKillTaskSlots(Integer.MAX_VALUE);
        this.paramsBuilder.withDynamicConfigs(this.dynamicConfigBuilder.build());
        initDuty();
        CoordinatorRunStats runDutyAndGetStats = runDutyAndGetStats();
        Assert.assertEquals(0L, runDutyAndGetStats.get(Stats.Kill.AVAILABLE_SLOTS));
        Assert.assertEquals(0L, runDutyAndGetStats.get(Stats.Kill.SUBMITTED_TASKS));
        Assert.assertEquals(0L, runDutyAndGetStats.get(Stats.Kill.MAX_SLOTS));
    }

    @Test
    public void testKillTaskSlotStats3() {
        this.dynamicConfigBuilder.withKillTaskSlotRatio(Double.valueOf(1.0d));
        this.dynamicConfigBuilder.withMaxKillTaskSlots(0);
        this.paramsBuilder.withDynamicConfigs(this.dynamicConfigBuilder.build());
        initDuty();
        CoordinatorRunStats runDutyAndGetStats = runDutyAndGetStats();
        Assert.assertEquals(0L, runDutyAndGetStats.get(Stats.Kill.AVAILABLE_SLOTS));
        Assert.assertEquals(0L, runDutyAndGetStats.get(Stats.Kill.SUBMITTED_TASKS));
        Assert.assertEquals(0L, runDutyAndGetStats.get(Stats.Kill.MAX_SLOTS));
    }

    @Test
    public void testKillTaskSlotStats4() {
        this.dynamicConfigBuilder.withKillTaskSlotRatio(Double.valueOf(0.1d));
        this.dynamicConfigBuilder.withMaxKillTaskSlots(3);
        this.paramsBuilder.withDynamicConfigs(this.dynamicConfigBuilder.build());
        initDuty();
        CoordinatorRunStats runDutyAndGetStats = runDutyAndGetStats();
        Assert.assertEquals(1L, runDutyAndGetStats.get(Stats.Kill.AVAILABLE_SLOTS));
        Assert.assertEquals(0L, runDutyAndGetStats.get(Stats.Kill.SUBMITTED_TASKS));
        Assert.assertEquals(1L, runDutyAndGetStats.get(Stats.Kill.MAX_SLOTS));
    }

    @Test
    public void testKillTaskSlotStats5() {
        this.dynamicConfigBuilder.withKillTaskSlotRatio(Double.valueOf(0.3d));
        this.dynamicConfigBuilder.withMaxKillTaskSlots(2);
        this.paramsBuilder.withDynamicConfigs(this.dynamicConfigBuilder.build());
        initDuty();
        CoordinatorRunStats runDutyAndGetStats = runDutyAndGetStats();
        Assert.assertEquals(2L, runDutyAndGetStats.get(Stats.Kill.AVAILABLE_SLOTS));
        Assert.assertEquals(0L, runDutyAndGetStats.get(Stats.Kill.SUBMITTED_TASKS));
        Assert.assertEquals(2L, runDutyAndGetStats.get(Stats.Kill.MAX_SLOTS));
    }

    @Test
    public void testKillFirstHalfEternitySegment() {
        this.configBuilder.withIgnoreDurationToRetain(true);
        Interval interval = new Interval(DateTimes.MIN, DateTimes.of("2024"));
        createAndAddUnusedSegment(DS1, interval, VERSION, NOW.minusDays(60));
        initDuty();
        CoordinatorRunStats runDutyAndGetStats = runDutyAndGetStats();
        Assert.assertEquals(10L, runDutyAndGetStats.get(Stats.Kill.AVAILABLE_SLOTS));
        Assert.assertEquals(1L, runDutyAndGetStats.get(Stats.Kill.SUBMITTED_TASKS));
        Assert.assertEquals(10L, runDutyAndGetStats.get(Stats.Kill.MAX_SLOTS));
        Assert.assertEquals(1L, runDutyAndGetStats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS1_STAT_KEY));
        validateLastKillStateAndReset(DS1, interval);
    }

    @Test
    public void testKillEternitySegment() {
        this.configBuilder.withIgnoreDurationToRetain(true);
        createAndAddUnusedSegment(DS1, Intervals.ETERNITY, VERSION, NOW.minusDays(60));
        initDuty();
        CoordinatorRunStats runDutyAndGetStats = runDutyAndGetStats();
        Assert.assertEquals(10L, runDutyAndGetStats.get(Stats.Kill.AVAILABLE_SLOTS));
        Assert.assertEquals(1L, runDutyAndGetStats.get(Stats.Kill.SUBMITTED_TASKS));
        Assert.assertEquals(10L, runDutyAndGetStats.get(Stats.Kill.MAX_SLOTS));
        Assert.assertEquals(1L, runDutyAndGetStats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS1_STAT_KEY));
        validateLastKillStateAndReset(DS1, Intervals.ETERNITY);
    }

    @Test
    public void testKillSecondHalfEternitySegment() {
        this.configBuilder.withIgnoreDurationToRetain(true);
        Interval interval = new Interval(DateTimes.of("1970"), DateTimes.MAX);
        createAndAddUnusedSegment(DS1, interval, VERSION, NOW.minusDays(60));
        initDuty();
        CoordinatorRunStats runDutyAndGetStats = runDutyAndGetStats();
        Assert.assertEquals(10L, runDutyAndGetStats.get(Stats.Kill.AVAILABLE_SLOTS));
        Assert.assertEquals(1L, runDutyAndGetStats.get(Stats.Kill.SUBMITTED_TASKS));
        Assert.assertEquals(10L, runDutyAndGetStats.get(Stats.Kill.MAX_SLOTS));
        Assert.assertEquals(1L, runDutyAndGetStats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS1_STAT_KEY));
        validateLastKillStateAndReset(DS1, interval);
    }

    @Test
    public void testKillLargeIntervalSegments() {
        Interval of = Intervals.of("1990-01-01T00Z/19940-01-01T00Z");
        Interval of2 = Intervals.of("-19940-01-01T00Z/1970-01-01T00Z");
        createAndAddUnusedSegment(DS1, of, VERSION, NOW.minusDays(60));
        createAndAddUnusedSegment(DS1, of2, VERSION, NOW.minusDays(60));
        initDuty();
        CoordinatorRunStats runDutyAndGetStats = runDutyAndGetStats();
        Assert.assertEquals(10L, runDutyAndGetStats.get(Stats.Kill.AVAILABLE_SLOTS));
        Assert.assertEquals(1L, runDutyAndGetStats.get(Stats.Kill.SUBMITTED_TASKS));
        Assert.assertEquals(10L, runDutyAndGetStats.get(Stats.Kill.MAX_SLOTS));
        Assert.assertEquals(2L, runDutyAndGetStats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS1_STAT_KEY));
        validateLastKillStateAndReset(DS1, new Interval(of2.getStart(), of.getEnd()));
    }

    @Test
    public void testKillMultipleSegmentsInSameInterval() {
        this.configBuilder.withIgnoreDurationToRetain(true);
        createAndAddUnusedSegment(DS1, YEAR_OLD, VERSION, NOW.minusDays(10));
        createAndAddUnusedSegment(DS1, YEAR_OLD, "v2", NOW.minusDays(10));
        createAndAddUnusedSegment(DS1, YEAR_OLD, "v3", NOW.minusDays(10));
        initDuty();
        CoordinatorRunStats runDutyAndGetStats = runDutyAndGetStats();
        Assert.assertEquals(10L, runDutyAndGetStats.get(Stats.Kill.AVAILABLE_SLOTS));
        Assert.assertEquals(1L, runDutyAndGetStats.get(Stats.Kill.SUBMITTED_TASKS));
        Assert.assertEquals(10L, runDutyAndGetStats.get(Stats.Kill.MAX_SLOTS));
        Assert.assertEquals(3L, runDutyAndGetStats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS1_STAT_KEY));
        validateLastKillStateAndReset(DS1, YEAR_OLD);
    }

    @Test
    public void testLimitToPeriod_empty() {
        Assert.assertEquals(Collections.emptyList(), KillUnusedSegments.limitToPeriod(Collections.emptyList(), Period.ZERO));
    }

    @Test
    public void testLimitToPeriod_zeroPeriod() {
        Assert.assertEquals(ImmutableList.of(DAY_OLD, YEAR_OLD, MONTH_OLD), KillUnusedSegments.limitToPeriod(ImmutableList.of(DAY_OLD, YEAR_OLD, MONTH_OLD), Period.ZERO));
    }

    @Test
    public void testLimitToPeriod_oneSecondPeriod() {
        Assert.assertEquals(ImmutableList.of(YEAR_OLD), KillUnusedSegments.limitToPeriod(ImmutableList.of(DAY_OLD, YEAR_OLD, MONTH_OLD), Period.seconds(1)));
    }

    @Test
    public void testLimitToPeriod_360DayPeriod() {
        Assert.assertEquals(ImmutableList.of(YEAR_OLD, MONTH_OLD), KillUnusedSegments.limitToPeriod(ImmutableList.of(DAY_OLD, YEAR_OLD, MONTH_OLD), Period.days(360)));
    }

    @Test
    public void testLimitToPeriod_1YearPeriod() {
        Assert.assertEquals(ImmutableList.of(DAY_OLD, YEAR_OLD, MONTH_OLD), KillUnusedSegments.limitToPeriod(ImmutableList.of(DAY_OLD, YEAR_OLD, MONTH_OLD), Period.years(1)));
    }

    private void validateLastKillStateAndReset(String str, @Nullable Interval interval) {
        Interval lastKillInterval = this.overlordClient.getLastKillInterval(str);
        String lastKillTaskId = this.overlordClient.getLastKillTaskId(str);
        Assert.assertEquals(interval, lastKillInterval);
        String str2 = null;
        if (interval != null) {
            str2 = TestOverlordClient.getTaskId("coordinator-issued", str, interval);
        }
        Assert.assertEquals(str2, lastKillTaskId);
        this.overlordClient.deleteLastKillTaskId(str);
        this.overlordClient.deleteLastKillInterval(str);
    }

    private DataSegment createAndAddUsedSegment(String str, Interval interval, String str2) {
        DataSegment createSegment = createSegment(str, interval, str2);
        try {
            SqlSegmentsMetadataManagerTestBase.publishSegment(this.connector, this.config, TestHelper.makeJsonMapper(), createSegment);
            return createSegment;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    private void createAndAddUnusedSegment(String str, Interval interval, String str2, DateTime dateTime) {
        DataSegment createAndAddUsedSegment = createAndAddUsedSegment(str, interval, str2);
        this.sqlSegmentsMetadataManager.markSegmentsAsUnused(ImmutableSet.of(createAndAddUsedSegment.getId()));
        this.derbyConnectorRule.segments().updateUsedStatusLastUpdated(createAndAddUsedSegment.getId().toString(), dateTime);
    }

    private DataSegment createSegment(String str, Interval interval, String str2) {
        return new DataSegment(str, interval, str2, new HashMap(), new ArrayList(), new ArrayList(), NoneShardSpec.instance(), 1, 0L);
    }

    private void initDuty() {
        this.killDuty = new KillUnusedSegments(this.sqlSegmentsMetadataManager, this.overlordClient, this.configBuilder.build());
    }

    private CoordinatorRunStats runDutyAndGetStats() {
        this.paramsBuilder.withDynamicConfigs(this.dynamicConfigBuilder.build());
        return this.killDuty.run(this.paramsBuilder.build()).getCoordinatorStats();
    }
}
