package org.apache.druid.metadata.segment.cache;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import org.apache.druid.error.DruidExceptionMatcher;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.ScheduledExecutorFactory;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.service.AlertEvent;
import org.apache.druid.java.util.metrics.StubServiceEmitter;
import org.apache.druid.metadata.IndexerSqlMetadataStorageCoordinatorTestBase;
import org.apache.druid.metadata.MetadataStorageTablesConfig;
import org.apache.druid.metadata.PendingSegmentRecord;
import org.apache.druid.metadata.SegmentsMetadataManagerConfig;
import org.apache.druid.metadata.TestDerbyConnector;
import org.apache.druid.metadata.segment.cache.SegmentMetadataCache;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import org.apache.druid.server.coordinator.CreateDataSegments;
import org.apache.druid.server.coordinator.simulate.BlockingExecutorService;
import org.apache.druid.server.coordinator.simulate.WrappingScheduledExecutorService;
import org.apache.druid.server.http.DataSegmentPlus;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.joda.time.DateTime;
import org.joda.time.Period;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;

/* loaded from: input_file:org/apache/druid/metadata/segment/cache/HeapMemorySegmentMetadataCacheTest.class */
/**
 * Unit tests for {@link HeapMemorySegmentMetadataCache}.
 * <p>
 * The cache is wired to a Derby-backed metadata store and to a
 * {@link BlockingExecutorService}, so that no poll of the metadata store runs
 * until a test explicitly asks for it via {@link #syncCache()}. Metrics and
 * alerts are captured by a {@link StubServiceEmitter} and verified by the tests.
 */
public class HeapMemorySegmentMetadataCacheTest {

    /** Datasource name used by every test in this class. */
    private static final String WIKI = "wiki";

    @Rule
    public final TestDerbyConnector.DerbyConnectorRule derbyConnectorRule = new TestDerbyConnector.DerbyConnectorRule();

    private BlockingExecutorService pollExecutor;
    private ScheduledExecutorFactory executorFactory;
    private TestDerbyConnector derbyConnector;
    private StubServiceEmitter serviceEmitter;

    /** Test target; stays {@code null} until {@link #setupTargetWithCaching} is called. */
    private HeapMemorySegmentMetadataCache cache;

    /**
     * Creates the executor, emitter and metadata tables shared by all tests.
     * The cache itself is created lazily by the individual tests.
     */
    @Before
    public void setup() {
        pollExecutor = new BlockingExecutorService("test-poll-exec");
        // Every executor requested through the factory delegates to the blocking
        // executor, so tasks only run when the test calls syncCache().
        executorFactory = (ignored, name) -> new WrappingScheduledExecutorService(name, pollExecutor, false);
        derbyConnector = derbyConnectorRule.getConnector();
        serviceEmitter = new StubServiceEmitter();

        derbyConnector.createSegmentTable();
        derbyConnector.createPendingSegmentsTable();

        EmittingLogger.registerEmitter(serviceEmitter);
    }

    /**
     * Stops the cache if a test created one. {@code stop()} is invoked even if
     * {@code stopBeingLeader()} throws, so that the cache is always shut down.
     */
    @After
    public void tearDown() {
        if (cache == null) {
            return;
        }
        try {
            cache.stopBeingLeader();
        } finally {
            cache.stop();
        }
    }

    /**
     * Creates the test target with the given cache usage mode.
     *
     * @param usageMode usage mode passed to the {@link SegmentsMetadataManagerConfig}
     * @throws ISE if the test target has already been created
     */
    private void setupTargetWithCaching(SegmentMetadataCache.UsageMode usageMode) {
        if (cache != null) {
            throw new ISE("Test target has already been initialized with caching[%s]", cache.isEnabled());
        }
        final SegmentsMetadataManagerConfig managerConfig = new SegmentsMetadataManagerConfig((Period) null, usageMode);
        cache = new HeapMemorySegmentMetadataCache(
                TestHelper.JSON_MAPPER,
                () -> managerConfig,
                derbyConnectorRule.metadataTablesConfigSupplier(),
                derbyConnector,
                executorFactory,
                serviceEmitter
        );
    }

    /**
     * Creates an always-enabled cache, starts it, makes it leader and runs the
     * syncs needed after gaining leadership.
     */
    private void setupAndSyncCache() {
        setupTargetWithCaching(SegmentMetadataCache.UsageMode.ALWAYS);
        cache.start();
        cache.becomeLeader();
        syncCacheAfterBecomingLeader();
    }

    /**
     * Runs two consecutive sync rounds.
     * NOTE(review): presumably the cache needs more than one poll round before
     * it becomes readable after gaining leadership - confirm against
     * HeapMemorySegmentMetadataCache.
     */
    private void syncCacheAfterBecomingLeader() {
        syncCache();
        syncCache();
    }

    /**
     * Discards previously emitted events and then runs the next two pending
     * tasks on the poll executor, so that the assertions following this call
     * only observe events emitted by these tasks.
     */
    private void syncCache() {
        serviceEmitter.flush();
        pollExecutor.finishNextPendingTasks(2);
    }

    @Test
    public void testStart_schedulesDbPoll_ifCacheIsEnabled() {
        setupTargetWithCaching(SegmentMetadataCache.UsageMode.ALWAYS);
        Assert.assertTrue(cache.isEnabled());

        cache.start();
        Assert.assertTrue(pollExecutor.hasPendingTasks());

        syncCache();
        serviceEmitter.verifyEmitted("segment/metadataCache/sync/time", 1);
        // Work is still queued on the executor after the sync has finished
        Assert.assertTrue(pollExecutor.hasPendingTasks());
    }

    @Test
    public void testStart_doesNotScheduleDbPoll_ifCacheIsDisabled() {
        setupTargetWithCaching(SegmentMetadataCache.UsageMode.NEVER);
        Assert.assertFalse(cache.isEnabled());

        cache.start();
        Assert.assertFalse(cache.isEnabled());
        Assert.assertFalse(pollExecutor.hasPendingTasks());
    }

    @Test
    public void testStop_stopsDbPoll_ifCacheIsEnabled() {
        setupTargetWithCaching(SegmentMetadataCache.UsageMode.ALWAYS);
        Assert.assertTrue(cache.isEnabled());

        cache.start();
        Assert.assertTrue(pollExecutor.hasPendingTasks());

        cache.stop();
        Assert.assertTrue(pollExecutor.isShutdown());
        Assert.assertFalse(pollExecutor.hasPendingTasks());
    }

    @Test
    public void testBecomeLeader_isNoop_ifCacheIsDisabled() {
        setupTargetWithCaching(SegmentMetadataCache.UsageMode.NEVER);

        cache.start();
        Assert.assertFalse(pollExecutor.hasPendingTasks());

        cache.becomeLeader();
        Assert.assertFalse(pollExecutor.hasPendingTasks());
    }

    @Test
    public void testBecomeLeader_throwsException_ifCacheIsStopped() {
        setupTargetWithCaching(SegmentMetadataCache.UsageMode.ALWAYS);

        // start() has deliberately not been called
        DruidExceptionMatcher.defensive()
                .expectMessageIs("Cache has not been started yet")
                .assertThrowsAndMatches(() -> cache.becomeLeader());
    }

    @Test
    public void testReadCacheForDataSource_throwsException_ifCacheIsDisabled() {
        setupTargetWithCaching(SegmentMetadataCache.UsageMode.NEVER);

        DruidExceptionMatcher.defensive()
                .expectMessageIs("Segment metadata cache is not enabled.")
                .assertThrowsAndMatches(() -> cache.readCacheForDataSource(WIKI, wikiCache -> 0));
    }

    @Test
    public void testReadCacheForDataSource_throwsException_ifCacheIsStoppedOrNotLeader() {
        setupTargetWithCaching(SegmentMetadataCache.UsageMode.ALWAYS);
        Assert.assertTrue(cache.isEnabled());

        // Enabled but not started
        DruidExceptionMatcher.internalServerError()
                .expectMessageIs("Segment metadata cache has not been started yet.")
                .assertThrowsAndMatches(() -> cache.readCacheForDataSource(WIKI, wikiCache -> 0));

        // Started but not leader
        cache.start();
        DruidExceptionMatcher.internalServerError()
                .expectMessageIs("Not leader yet. Segment metadata cache is not usable.")
                .assertThrowsAndMatches(() -> cache.readCacheForDataSource(WIKI, wikiCache -> 0));
    }

    @Test(timeout = 60_000)
    public void testReadCacheForDataSource_waitsForOneSync_afterBecomingLeader() throws InterruptedException {
        setupTargetWithCaching(SegmentMetadataCache.UsageMode.ALWAYS);
        cache.start();
        cache.becomeLeader();

        final List<String> observedEvents = new ArrayList<>();

        // Thread 1: read from the cache before any sync has been performed
        final Thread readerThread = new Thread(() -> {
            cache.readCacheForDataSource(WIKI, wikiCache -> 0);
            observedEvents.add("getDatasource completed");
        });
        readerThread.start();

        // Give the reader some time to reach the cache before the sync starts
        Thread.sleep(100L);

        // Thread 2: record an event and only then perform the sync
        final Thread syncThread = new Thread(() -> {
            observedEvents.add("before first sync");
            syncCacheAfterBecomingLeader();
        });
        syncThread.start();

        readerThread.join();
        syncThread.join();

        // The read must have completed only after the sync was triggered
        Assert.assertEquals(List.of("before first sync", "getDatasource completed"), observedEvents);

        // A read issued after the sync completes without any further sync
        final Thread secondReaderThread = new Thread(() -> {
            cache.readCacheForDataSource(WIKI, wikiCache -> 0);
            observedEvents.add("getDatasource 2 completed");
        });
        secondReaderThread.start();
        secondReaderThread.join();

        Assert.assertEquals(
                List.of("before first sync", "getDatasource completed", "getDatasource 2 completed"),
                observedEvents
        );
    }

    @Test
    public void testAddSegmentsToCache() {
        setupAndSyncCache();

        final DataSegmentPlus segmentPlus = CreateDataSegments.ofDatasource(WIKI).markUsed().asPlus();
        final SegmentId segmentId = segmentPlus.getDataSegment().getId();

        Assert.assertNull(cache.readCacheForDataSource(WIKI, wikiCache -> wikiCache.findUsedSegment(segmentId)));

        Assert.assertEquals(
                1L,
                cache.writeCacheForDataSource(WIKI, wikiCache -> wikiCache.insertSegments(Set.of(segmentPlus))).intValue()
        );

        Assert.assertEquals(
                segmentPlus.getDataSegment(),
                cache.readCacheForDataSource(WIKI, wikiCache -> wikiCache.findUsedSegment(segmentId))
        );
    }

    @Test
    public void testSync_addsUsedSegment_ifNotPresentInCache() {
        setupAndSyncCache();

        // Segment exists only in the metadata store
        final DataSegmentPlus segmentPlus = CreateDataSegments.ofDatasource(WIKI).updatedNow().markUsed().asPlus();
        insertSegmentsInMetadataStore(Set.of(segmentPlus));

        Assert.assertTrue(
                cache.readCacheForDataSource(WIKI, wikiCache -> wikiCache.findUsedSegmentsPlusOverlappingAnyOf(List.of()))
                        .isEmpty()
        );

        syncCache();
        serviceEmitter.verifyValue("segment/metadataCache/used/stale", 1L);
        serviceEmitter.verifyValue("segment/metadataCache/used/updated", 1L);
        serviceEmitter.verifyValue("segment/used/count", 1L);

        Assert.assertEquals(
                segmentPlus.getDataSegment(),
                cache.readCacheForDataSource(
                        WIKI,
                        wikiCache -> wikiCache.findUsedSegment(segmentPlus.getDataSegment().getId())
                )
        );
    }

    @Test
    public void testSync_emitsAlert_ifErrorOccurs() {
        setupAndSyncCache();
        serviceEmitter.verifyEmitted("segment/metadataCache/sync/time", 1);

        // Tear down the connector so that the next sync fails
        derbyConnector.tearDown();
        syncCache();

        final List<AlertEvent> alerts = serviceEmitter.getAlerts();
        Assert.assertEquals(1L, alerts.size());
        Assert.assertEquals(
                "Could not sync segment metadata cache with metadata store",
                alerts.get(0).getDescription()
        );
        serviceEmitter.verifyNotEmitted("segment/metadataCache/sync/time");
    }

    @Test
    public void testSync_doesNotFail_ifSegmentRecordIsBad() {
        final DataSegmentPlus validSegment = CreateDataSegments.ofDatasource(WIKI).updatedNow().markUsed().asPlus();
        final DataSegmentPlus invalidSegment = CreateDataSegments.ofDatasource(WIKI).updatedNow().markUsed().asPlus();
        insertSegmentsInMetadataStore(Set.of(validSegment, invalidSegment));

        // Overwrite the id and payload of the second record with garbage
        derbyConnectorRule.segments().update(
                "UPDATE %1$s SET id = 'invalid', payload = ? WHERE id = ?",
                "invalid".getBytes(StandardCharsets.UTF_8),
                invalidSegment.getDataSegment().getId().toString()
        );

        setupAndSyncCache();
        serviceEmitter.verifyEmitted("segment/metadataCache/sync/time", 1);
        serviceEmitter.verifyValue("segment/metadataCache/used/count", 1L);
        serviceEmitter.verifyValue("segment/metadataCache/skipped", 1L);

        Assert.assertNull(
                cache.readCacheForDataSource(
                        WIKI,
                        wikiCache -> wikiCache.findUsedSegment(invalidSegment.getDataSegment().getId())
                )
        );
        Assert.assertEquals(
                validSegment.getDataSegment(),
                cache.readCacheForDataSource(
                        WIKI,
                        wikiCache -> wikiCache.findUsedSegment(validSegment.getDataSegment().getId())
                )
        );
    }

    @Test
    public void testSync_doesNotFail_ifPendingSegmentRecordIsBad() {
        // Insert a pending segment record with placeholder start/end values and an empty payload
        final String insertSql = StringUtils.format(
                "INSERT INTO %1$s (id, dataSource, created_date, start, %2$send%2$s, sequence_name,"
                        + " sequence_prev_id, sequence_name_prev_id_sha1, payload)"
                        + " VALUES (:id, :dataSource, :created_date, :start, :end, :sequence_name,"
                        + " :sequence_prev_id, :sequence_name_prev_id_sha1, :payload)",
                derbyConnectorRule.metadataTablesConfigSupplier().get().getPendingSegmentsTable(),
                derbyConnector.getQuoteString()
        );
        derbyConnector.retryWithHandle(
                handle -> handle.createStatement(insertSql)
                        .bind("id", "1")
                        .bind("dataSource", WIKI)
                        .bind("created_date", "1")
                        .bind("start", "-start-")
                        .bind("end", "-end-")
                        .bind("sequence_name", "s1")
                        .bind("sequence_prev_id", "")
                        .bind("sequence_name_prev_id_sha1", "abcdef")
                        .bind("payload", new byte[0])
                        .execute()
        );

        final DataSegmentPlus usedSegment = CreateDataSegments.ofDatasource(WIKI).updatedNow().markUsed().asPlus();
        insertSegmentsInMetadataStore(Set.of(usedSegment));

        setupAndSyncCache();
        serviceEmitter.verifyEmitted("segment/metadataCache/sync/time", 1);
        serviceEmitter.verifyValue("segment/metadataCache/pending/skipped", 1L);
        serviceEmitter.verifyValue("segment/metadataCache/used/count", 1L);
        serviceEmitter.verifyValue("segment/used/count", 1L);
        serviceEmitter.verifyValue("segment/pending/count", 0L);

        Assert.assertEquals(
                usedSegment.getDataSegment(),
                cache.readCacheForDataSource(
                        WIKI,
                        wikiCache -> wikiCache.findUsedSegment(usedSegment.getDataSegment().getId())
                )
        );
        Assert.assertTrue(
                cache.readCacheForDataSource(WIKI, wikiCache -> wikiCache.findPendingSegmentsOverlapping(Intervals.ETERNITY))
                        .isEmpty()
        );
    }

    @Test
    public void testSync_updatesUsedSegment_ifCacheHasOlderEntry() {
        setupAndSyncCache();

        // Add the same segment to both the metadata store and the cache
        final DateTime now = DateTimes.nowUtc();
        final DataSegmentPlus segmentPlus = CreateDataSegments.ofDatasource(WIKI).markUsed().lastUpdatedOn(now).asPlus();
        insertSegmentsInMetadataStore(Set.of(segmentPlus));
        cache.writeCacheForDataSource(WIKI, wikiCache -> wikiCache.insertSegments(Set.of(segmentPlus)));

        Assert.assertEquals(
                Set.of(segmentPlus),
                cache.readCacheForDataSource(WIKI, wikiCache -> wikiCache.findUsedSegmentsPlusOverlappingAnyOf(List.of()))
        );

        // Bump the last updated time of the segment in the metadata store only
        final DataSegmentPlus updatedSegment = new DataSegmentPlus(
                segmentPlus.getDataSegment(),
                segmentPlus.getCreatedDate(),
                now.plus(1L),
                true,
                (String) null,
                (Long) null,
                (String) null
        );
        updateSegmentInMetadataStore(updatedSegment);

        syncCache();
        serviceEmitter.verifyValue("segment/used/count", 1L);
        serviceEmitter.verifyValue("segment/metadataCache/used/stale", 1L);
        serviceEmitter.verifyValue("segment/metadataCache/used/updated", 1L);

        Assert.assertEquals(
                Set.of(updatedSegment),
                cache.readCacheForDataSource(WIKI, wikiCache -> wikiCache.findUsedSegmentsPlusOverlappingAnyOf(List.of()))
        );
    }

    @Test
    public void testSync_removesUsedSegment_ifNotPresentInMetadataStore() {
        setupAndSyncCache();

        // Segment is added only to the cache, never to the metadata store
        final DataSegmentPlus segmentPlus = CreateDataSegments.ofDatasource(WIKI).markUsed().asPlus();
        cache.writeCacheForDataSource(WIKI, wikiCache -> wikiCache.insertSegments(Set.of(segmentPlus)));

        final DataSegment segment = segmentPlus.getDataSegment();
        Assert.assertEquals(
                segment,
                cache.readCacheForDataSource(WIKI, wikiCache -> wikiCache.findUsedSegment(segment.getId()))
        );

        syncCache();
        serviceEmitter.verifyValue("segment/metadataCache/deleted", 1L);
        serviceEmitter.verifyNotEmitted("segment/used/count");

        Assert.assertNull(
                cache.readCacheForDataSource(WIKI, wikiCache -> wikiCache.findUsedSegment(segment.getId()))
        );
    }

    @Test
    public void testSync_removesUnusedSegment_ifCacheHasOlderEntry() {
        setupAndSyncCache();

        // Segment is added only to the cache and then marked unused with an
        // update time that lies one minute in the past
        final DateTime now = DateTimes.nowUtc();
        final DataSegmentPlus segmentPlus = CreateDataSegments.ofDatasource(WIKI).markUsed().asPlus();
        cache.writeCacheForDataSource(WIKI, wikiCache -> wikiCache.insertSegments(Set.of(segmentPlus)));
        cache.writeCacheForDataSource(
                WIKI,
                wikiCache -> wikiCache.markSegmentAsUnused(segmentPlus.getDataSegment().getId(), now.minusMinutes(1))
        );

        syncCache();
        serviceEmitter.verifyValue("segment/metadataCache/deleted", 1L);
        serviceEmitter.verifyNotEmitted("segment/used/count");
        serviceEmitter.verifyNotEmitted("segment/metadataCache/used/count");
        serviceEmitter.verifyNotEmitted("segment/metadataCache/unused/count");
    }

    @Test
    public void testSync_doesNotRemoveIntervalWithOnlyUnusedSegments() {
        setupAndSyncCache();

        // Segment is added only to the cache and then marked unused with an
        // update time that lies one minute in the future
        final DataSegmentPlus segmentPlus = CreateDataSegments.ofDatasource(WIKI).updatedNow().markUsed().asPlus();
        final DateTime now = DateTimes.nowUtc();
        cache.writeCacheForDataSource(WIKI, wikiCache -> wikiCache.insertSegments(Set.of(segmentPlus)));
        cache.writeCacheForDataSource(
                WIKI,
                wikiCache -> wikiCache.markSegmentAsUnused(segmentPlus.getDataSegment().getId(), now.plusMinutes(1))
        );

        syncCache();
        serviceEmitter.verifyValue("segment/metadataCache/unused/count", 1L);
        serviceEmitter.verifyValue("segment/metadataCache/interval/count", 1L);

        // A second sync reports the same counts (syncCache() flushes the emitter first)
        syncCache();
        serviceEmitter.verifyValue("segment/metadataCache/unused/count", 1L);
        serviceEmitter.verifyValue("segment/metadataCache/interval/count", 1L);
    }

    @Test
    public void testSync_addsPendingSegment_ifNotPresentInCache() {
        setupAndSyncCache();

        // Pending segment exists only in the metadata store
        final PendingSegmentRecord pendingSegment = createPendingSegment(DateTimes.nowUtc());
        derbyConnectorRule.pendingSegments().insert(List.of(pendingSegment), false, TestHelper.JSON_MAPPER);

        final SegmentIdWithShardSpec pendingId = pendingSegment.getId();
        Assert.assertTrue(
                cache.readCacheForDataSource(
                        WIKI,
                        wikiCache -> wikiCache.findPendingSegmentIdsWithExactInterval(
                                pendingSegment.getSequenceName(),
                                pendingId.getInterval()
                        )
                ).isEmpty()
        );

        syncCache();
        serviceEmitter.verifyValue("segment/pending/count", 1L);
        serviceEmitter.verifyValue("segment/metadataCache/pending/updated", 1L);

        Assert.assertEquals(
                List.of(pendingId),
                cache.readCacheForDataSource(
                        WIKI,
                        wikiCache -> wikiCache.findPendingSegmentIdsWithExactInterval(
                                pendingSegment.getSequenceName(),
                                pendingId.getInterval()
                        )
                )
        );
    }

    @Test
    public void testSync_removesPendingSegment_ifNotPresentInMetadataStore() {
        setupAndSyncCache();

        // Pending segment, created an hour ago, is added only to the cache
        final PendingSegmentRecord pendingSegment = createPendingSegment(DateTimes.nowUtc().minusHours(1));
        cache.writeCacheForDataSource(WIKI, wikiCache -> wikiCache.insertPendingSegment(pendingSegment, false));

        final SegmentIdWithShardSpec pendingId = pendingSegment.getId();
        Assert.assertEquals(
                List.of(pendingId),
                cache.readCacheForDataSource(
                        WIKI,
                        wikiCache -> wikiCache.findPendingSegmentIdsWithExactInterval(
                                pendingSegment.getSequenceName(),
                                pendingId.getInterval()
                        )
                )
        );

        syncCache();
        serviceEmitter.verifyNotEmitted("segment/pending/count");
        serviceEmitter.verifyValue("segment/metadataCache/pending/deleted", 1L);

        Assert.assertTrue(
                cache.readCacheForDataSource(
                        WIKI,
                        wikiCache -> wikiCache.findPendingSegmentIdsWithExactInterval(
                                pendingSegment.getSequenceName(),
                                pendingId.getInterval()
                        )
                ).isEmpty()
        );
    }

    @Test
    public void testSync_cleansUpDataSourceCache_ifEmptyAndNotInUse() {
        setupAndSyncCache();

        // Segment, last updated an hour ago, is added only to the cache
        final DataSegmentPlus segmentPlus = CreateDataSegments.ofDatasource(WIKI)
                .markUsed()
                .lastUpdatedOn(DateTimes.nowUtc().minusHours(1))
                .asPlus();
        Assert.assertEquals(
                1L,
                cache.writeCacheForDataSource(WIKI, wikiCache -> wikiCache.insertSegments(Set.of(segmentPlus))).intValue()
        );

        syncCache();
        serviceEmitter.verifyValue("segment/metadataCache/deleted", 1L);
        serviceEmitter.verifyValue("segment/metadataCache/dataSource/deleted", 1L);
    }

    /**
     * Inserts the given segments into the segments table of the metadata store,
     * without touching the cache.
     */
    private void insertSegmentsInMetadataStore(Set<DataSegmentPlus> set) {
        IndexerSqlMetadataStorageCoordinatorTestBase.insertSegments(
                set,
                derbyConnector,
                derbyConnectorRule.metadataTablesConfigSupplier().get().getSegmentsTable(),
                TestHelper.JSON_MAPPER
        );
    }

    /**
     * Updates the {@code used} flag and {@code used_status_last_updated} of the
     * given segment in the metadata store and asserts that exactly one row was updated.
     */
    private void updateSegmentInMetadataStore(DataSegmentPlus dataSegmentPlus) {
        final int numUpdatedRows = derbyConnectorRule.segments().update(
                "UPDATE %1$s SET used = ?, used_status_last_updated = ? WHERE id = ?",
                // A null "used" value is written as false
                Boolean.TRUE.equals(dataSegmentPlus.getUsed()),
                dataSegmentPlus.getUsedStatusLastUpdatedDate().toString(),
                dataSegmentPlus.getDataSegment().getId().toString()
        );
        Assert.assertEquals(1L, numUpdatedRows);
    }

    /**
     * Creates a pending segment record for a fixed one-day interval of the test
     * datasource, using the given time as the last constructor argument.
     * NOTE(review): that argument is presumably the created date of the record - confirm.
     */
    private static PendingSegmentRecord createPendingSegment(DateTime dateTime) {
        final SegmentIdWithShardSpec pendingId = new SegmentIdWithShardSpec(
                WIKI,
                Intervals.of("2021-01-01/P1D"),
                "v1",
                new NumberedShardSpec(0, 1)
        );
        return new PendingSegmentRecord(pendingId, "sequence1", (String) null, (String) null, "allocator1", dateTime);
    }
}
