package org.apache.druid.metadata.segment.cache;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.errorprone.annotations.ThreadSafe;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import com.google.inject.Inject;
import java.sql.ResultSet;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.druid.error.DruidException;
import org.apache.druid.error.InternalServerError;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Stopwatch;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.ScheduledExecutorFactory;
import org.apache.druid.java.util.common.jackson.JacksonUtils;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.metadata.MetadataStorageTablesConfig;
import org.apache.druid.metadata.PendingSegmentRecord;
import org.apache.druid.metadata.SQLMetadataConnector;
import org.apache.druid.metadata.SegmentsMetadataManagerConfig;
import org.apache.druid.metadata.SqlSegmentsMetadataQuery;
import org.apache.druid.metadata.segment.cache.SegmentMetadataCache;
import org.apache.druid.segment.loading.LocalDataSegmentPuller;
import org.apache.druid.server.http.DataSegmentPlus;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.skife.jdbi.v2.ResultIterator;
import org.skife.jdbi.v2.TransactionCallback;

@ThreadSafe
/* loaded from: input_file:org/apache/druid/metadata/segment/cache/HeapMemorySegmentMetadataCache.class */
public class HeapMemorySegmentMetadataCache implements SegmentMetadataCache {
    private static final EmittingLogger log = new EmittingLogger(HeapMemorySegmentMetadataCache.class);
    private static final int SQL_MAX_RETRIES = 3;
    private static final int SQL_QUIET_RETRIES = 2;
    private static final int READY_TIMEOUT_MILLIS = 300000;
    private static final int MIN_SYNC_DELAY_MILLIS = 1000;
    private static final int MAX_IMMEDIATE_SYNC_RETRIES = 3;
    private final ObjectMapper jsonMapper;
    private final Duration pollDuration;
    private final SegmentMetadataCache.UsageMode cacheMode;
    private final MetadataStorageTablesConfig tablesConfig;
    private final SQLMetadataConnector connector;
    private final ListeningScheduledExecutorService pollExecutor;
    private final ServiceEmitter emitter;
    private final Object cacheStateLock = new Object();
    private final AtomicBoolean isCacheReady = new AtomicBoolean(false);

    @GuardedBy("cacheStateLock")
    private CacheState currentCacheState = CacheState.STOPPED;

    @GuardedBy("cacheStateLock")
    private ListenableFuture<Long> nextSyncFuture = null;

    @GuardedBy("cacheStateLock")
    private int consecutiveSyncFailures = 0;
    private final ConcurrentHashMap<String, HeapMemoryDatasourceSegmentCache> datasourceToSegmentCache = new ConcurrentHashMap<>();
    private final AtomicReference<DateTime> syncFinishTime = new AtomicReference<>();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.druid.metadata.segment.cache.HeapMemorySegmentMetadataCache$2, reason: invalid class name */
    /* loaded from: input_file:org/apache/druid/metadata/segment/cache/HeapMemorySegmentMetadataCache$2.class */
    public static /* synthetic */ class AnonymousClass2 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$druid$metadata$segment$cache$HeapMemorySegmentMetadataCache$CacheState = new int[CacheState.values().length];

        static {
            try {
                $SwitchMap$org$apache$druid$metadata$segment$cache$HeapMemorySegmentMetadataCache$CacheState[CacheState.STOPPED.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$druid$metadata$segment$cache$HeapMemorySegmentMetadataCache$CacheState[CacheState.FOLLOWER.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$druid$metadata$segment$cache$HeapMemorySegmentMetadataCache$CacheState[CacheState.LEADER_FIRST_SYNC_PENDING.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$org$apache$druid$metadata$segment$cache$HeapMemorySegmentMetadataCache$CacheState[CacheState.LEADER_FIRST_SYNC_STARTED.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
            try {
                $SwitchMap$org$apache$druid$metadata$segment$cache$HeapMemorySegmentMetadataCache$CacheState[CacheState.LEADER_READY.ordinal()] = 5;
            } catch (NoSuchFieldError e5) {
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/druid/metadata/segment/cache/HeapMemorySegmentMetadataCache$CacheState.class */
    public enum CacheState {
        STOPPED,
        FOLLOWER,
        LEADER_FIRST_SYNC_PENDING,
        LEADER_FIRST_SYNC_STARTED,
        LEADER_READY
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/druid/metadata/segment/cache/HeapMemorySegmentMetadataCache$DatasourceSegmentSummary.class */
    public static class DatasourceSegmentSummary {
        final List<SegmentRecord> persistedSegments = new ArrayList();
        final List<PendingSegmentRecord> persistedPendingSegments = new ArrayList();
        final Set<SegmentId> usedSegmentIdsToRefresh = new HashSet();
        final Set<DataSegmentPlus> usedSegments = new HashSet();

        private DatasourceSegmentSummary() {
        }

        private void addSegmentRecord(SegmentRecord segmentRecord) {
            this.persistedSegments.add(segmentRecord);
        }

        private void addPendingSegmentRecord(PendingSegmentRecord pendingSegmentRecord) {
            this.persistedPendingSegments.add(pendingSegmentRecord);
        }
    }

    @Inject
    public HeapMemorySegmentMetadataCache(ObjectMapper objectMapper, Supplier<SegmentsMetadataManagerConfig> supplier, Supplier<MetadataStorageTablesConfig> supplier2, SQLMetadataConnector sQLMetadataConnector, ScheduledExecutorFactory scheduledExecutorFactory, ServiceEmitter serviceEmitter) {
        this.jsonMapper = objectMapper;
        this.cacheMode = ((SegmentsMetadataManagerConfig) supplier.get()).getCacheMode();
        this.pollDuration = ((SegmentsMetadataManagerConfig) supplier.get()).getPollDuration().toStandardDuration();
        this.tablesConfig = (MetadataStorageTablesConfig) supplier2.get();
        this.connector = sQLMetadataConnector;
        this.pollExecutor = isEnabled() ? MoreExecutors.listeningDecorator(scheduledExecutorFactory.create(1, "SegmentMetadataCache-%s")) : null;
        this.emitter = serviceEmitter;
    }

    @Override // org.apache.druid.metadata.segment.cache.SegmentMetadataCache
    public void start() {
        if (!isEnabled()) {
            log.info("Segment metadata cache is not enabled.", new Object[0]);
            return;
        }
        synchronized (this.cacheStateLock) {
            if (this.currentCacheState == CacheState.STOPPED) {
                updateCacheState(CacheState.FOLLOWER, "Scheduling sync with metadata store");
            }
            if (this.cacheMode == SegmentMetadataCache.UsageMode.ALWAYS) {
                performFirstSync();
            }
            scheduleSyncWithMetadataStore(this.pollDuration.getMillis());
        }
    }

    private void performFirstSync() {
        try {
            log.info("Cache is in usage mode[%s]. Starting first sync with metadata store.", new Object[]{this.cacheMode});
            long syncWithMetadataStore = syncWithMetadataStore();
            emitMetric(Metric.SYNC_DURATION_MILLIS, syncWithMetadataStore);
            log.info("Finished first sync of cache with metadata store in [%d] millis.", new Object[]{Long.valueOf(syncWithMetadataStore)});
        } catch (Throwable th) {
            throw InternalServerError.exception(th, "Could not sync segment metadata cache with metadata store", new Object[0]);
        }
    }

    @Override // org.apache.druid.metadata.segment.cache.SegmentMetadataCache
    public void stop() {
        synchronized (this.cacheStateLock) {
            if (isEnabled()) {
                this.pollExecutor.shutdownNow();
                this.datasourceToSegmentCache.forEach((str, heapMemoryDatasourceSegmentCache) -> {
                    heapMemoryDatasourceSegmentCache.stop();
                });
                this.datasourceToSegmentCache.clear();
                this.syncFinishTime.set(null);
                updateCacheState(CacheState.STOPPED, "Stopped sync with metadata store");
            }
        }
    }

    @Override // org.apache.druid.metadata.segment.cache.SegmentMetadataCache
    public void becomeLeader() {
        synchronized (this.cacheStateLock) {
            if (isEnabled()) {
                if (this.currentCacheState == CacheState.STOPPED) {
                    throw DruidException.defensive("Cache has not been started yet", new Object[0]);
                }
                updateCacheState(CacheState.LEADER_FIRST_SYNC_PENDING, "We are now leader");
                if (this.nextSyncFuture != null && !this.nextSyncFuture.isDone()) {
                    this.nextSyncFuture.cancel(true);
                }
            }
        }
    }

    @Override // org.apache.druid.metadata.segment.cache.SegmentMetadataCache
    public void stopBeingLeader() {
        synchronized (this.cacheStateLock) {
            if (isEnabled()) {
                updateCacheState(CacheState.FOLLOWER, "Not leader anymore");
            }
        }
    }

    @Override // org.apache.druid.metadata.segment.cache.SegmentMetadataCache
    public boolean isEnabled() {
        return this.cacheMode != SegmentMetadataCache.UsageMode.NEVER;
    }

    @Override // org.apache.druid.metadata.segment.cache.SegmentMetadataCache
    public boolean isSyncedForRead() {
        return isEnabled() && this.isCacheReady.get();
    }

    @Override // org.apache.druid.metadata.segment.cache.SegmentMetadataCache
    public <T> T readCacheForDataSource(String str, SegmentMetadataCache.Action<T> action) {
        verifyCacheIsUsableAndAwaitSync();
        HeapMemoryDatasourceSegmentCache cacheWithReference = getCacheWithReference(str);
        try {
            T t = (T) cacheWithReference.withReadLock(() -> {
                try {
                    return action.perform(cacheWithReference);
                } catch (Exception e) {
                    Throwables.throwIfUnchecked(e);
                    throw new RuntimeException(e);
                }
            });
            if (cacheWithReference != null) {
                cacheWithReference.close();
            }
            return t;
        } catch (Throwable th) {
            if (cacheWithReference != null) {
                try {
                    cacheWithReference.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Override // org.apache.druid.metadata.segment.cache.SegmentMetadataCache
    public <T> T writeCacheForDataSource(String str, SegmentMetadataCache.Action<T> action) {
        verifyCacheIsUsableAndAwaitSync();
        HeapMemoryDatasourceSegmentCache cacheWithReference = getCacheWithReference(str);
        try {
            T t = (T) cacheWithReference.withWriteLock(() -> {
                try {
                    return action.perform(cacheWithReference);
                } catch (Exception e) {
                    Throwables.throwIfUnchecked(e);
                    throw new RuntimeException(e);
                }
            });
            if (cacheWithReference != null) {
                cacheWithReference.close();
            }
            return t;
        } catch (Throwable th) {
            if (cacheWithReference != null) {
                try {
                    cacheWithReference.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    private HeapMemoryDatasourceSegmentCache getCacheWithReference(String str) {
        return this.datasourceToSegmentCache.compute(str, (str2, heapMemoryDatasourceSegmentCache) -> {
            HeapMemoryDatasourceSegmentCache heapMemoryDatasourceSegmentCache = heapMemoryDatasourceSegmentCache == null ? new HeapMemoryDatasourceSegmentCache(str2) : heapMemoryDatasourceSegmentCache;
            heapMemoryDatasourceSegmentCache.acquireReference();
            return heapMemoryDatasourceSegmentCache;
        });
    }

    private HeapMemoryDatasourceSegmentCache getCacheForDatasource(String str) {
        return this.datasourceToSegmentCache.computeIfAbsent(str, HeapMemoryDatasourceSegmentCache::new);
    }

    private void verifyCacheIsUsableAndAwaitSync() {
        if (!isEnabled()) {
            throw DruidException.defensive("Segment metadata cache is not enabled.", new Object[0]);
        }
        synchronized (this.cacheStateLock) {
            switch (AnonymousClass2.$SwitchMap$org$apache$druid$metadata$segment$cache$HeapMemorySegmentMetadataCache$CacheState[this.currentCacheState.ordinal()]) {
                case 1:
                    throw InternalServerError.exception("Segment metadata cache has not been started yet.", new Object[0]);
                case 2:
                    throw InternalServerError.exception("Not leader yet. Segment metadata cache is not usable.", new Object[0]);
                case LocalDataSegmentPuller.DEFAULT_RETRY_COUNT /* 3 */:
                case 4:
                    if (this.cacheMode == SegmentMetadataCache.UsageMode.ALWAYS) {
                        waitForCacheToFinishSync();
                        verifyCacheIsUsableAndAwaitSync();
                        break;
                    }
                    break;
            }
        }
    }

    private void waitForCacheToFinishSync() {
        synchronized (this.cacheStateLock) {
            log.info("Waiting for cache to finish sync with metadata store.", new Object[0]);
            while (true) {
                if (this.currentCacheState == CacheState.LEADER_FIRST_SYNC_PENDING || this.currentCacheState == CacheState.LEADER_FIRST_SYNC_STARTED) {
                    try {
                        this.cacheStateLock.wait(300000L);
                    } catch (InterruptedException e) {
                        log.noStackTrace().info(e, "Interrupted while waiting for cache to be ready", new Object[0]);
                    } catch (Exception e2) {
                        log.error(e2, "Error while waiting for cache to be ready", new Object[0]);
                        throw DruidException.defensive(e2, "Error while waiting for cache to be ready", new Object[0]);
                    }
                } else {
                    log.info("Wait complete. Cache is now in state[%s].", new Object[]{this.currentCacheState});
                }
            }
        }
    }

    private void updateCacheState(CacheState cacheState, String str) {
        synchronized (this.cacheStateLock) {
            this.currentCacheState = cacheState;
            log.info("%s. Cache is now in state[%s].", new Object[]{str, this.currentCacheState});
            this.isCacheReady.set(this.currentCacheState == CacheState.LEADER_READY);
            this.cacheStateLock.notifyAll();
        }
    }

    private void scheduleSyncWithMetadataStore(long j) {
        synchronized (this.cacheStateLock) {
            this.nextSyncFuture = this.pollExecutor.schedule(this::syncWithMetadataStore, j, TimeUnit.MILLISECONDS);
            Futures.addCallback(this.nextSyncFuture, new FutureCallback<Long>() { // from class: org.apache.druid.metadata.segment.cache.HeapMemorySegmentMetadataCache.1
                public void onSuccess(Long l) {
                    long max;
                    synchronized (HeapMemorySegmentMetadataCache.this.cacheStateLock) {
                        if (HeapMemorySegmentMetadataCache.this.currentCacheState == CacheState.LEADER_FIRST_SYNC_STARTED) {
                            HeapMemorySegmentMetadataCache.this.updateCacheState(CacheState.LEADER_READY, StringUtils.format("Finished sync with metadata store in [%d] millis", new Object[]{l}));
                        }
                    }
                    HeapMemorySegmentMetadataCache.this.emitMetric(Metric.SYNC_DURATION_MILLIS, l.longValue());
                    synchronized (HeapMemorySegmentMetadataCache.this.cacheStateLock) {
                        HeapMemorySegmentMetadataCache.this.consecutiveSyncFailures = 0;
                        max = HeapMemorySegmentMetadataCache.this.currentCacheState == CacheState.LEADER_FIRST_SYNC_PENDING ? 0L : Math.max(HeapMemorySegmentMetadataCache.this.pollDuration.getMillis() - l.longValue(), 0L);
                    }
                    HeapMemorySegmentMetadataCache.this.scheduleSyncWithMetadataStore(max);
                }

                public void onFailure(Throwable th) {
                    long millis;
                    if (th instanceof CancellationException) {
                        HeapMemorySegmentMetadataCache.log.noStackTrace().info(th, "Sync with metadata store was cancelled", new Object[0]);
                    } else {
                        HeapMemorySegmentMetadataCache.log.noStackTrace().makeAlert(th, "Could not sync segment metadata cache with metadata store", new Object[0]).emit();
                    }
                    synchronized (HeapMemorySegmentMetadataCache.this.cacheStateLock) {
                        HeapMemorySegmentMetadataCache heapMemorySegmentMetadataCache = HeapMemorySegmentMetadataCache.this;
                        int i = heapMemorySegmentMetadataCache.consecutiveSyncFailures + 1;
                        heapMemorySegmentMetadataCache.consecutiveSyncFailures = i;
                        millis = (i > 3 || HeapMemorySegmentMetadataCache.this.currentCacheState != CacheState.LEADER_FIRST_SYNC_PENDING) ? HeapMemorySegmentMetadataCache.this.pollDuration.getMillis() : 1000L;
                    }
                    HeapMemorySegmentMetadataCache.this.scheduleSyncWithMetadataStore(millis);
                }
            }, this.pollExecutor);
        }
    }

    private long syncWithMetadataStore() {
        DateTime nowUtc = DateTimes.nowUtc();
        Stopwatch createStarted = Stopwatch.createStarted();
        synchronized (this.cacheStateLock) {
            if (this.currentCacheState == CacheState.LEADER_FIRST_SYNC_PENDING) {
                updateCacheState(CacheState.LEADER_FIRST_SYNC_STARTED, "Started sync of latest updates from metadata store");
            }
        }
        HashMap hashMap = new HashMap();
        if (this.syncFinishTime.get() == null) {
            retrieveAllUsedSegments(hashMap);
        } else {
            retrieveAllSegmentIds(hashMap);
            updateSegmentIdsInCache(hashMap, nowUtc);
            retrieveUsedSegmentPayloads(hashMap);
        }
        updateUsedSegmentPayloadsInCache(hashMap);
        retrieveAllPendingSegments(hashMap);
        updatePendingSegmentsInCache(hashMap, nowUtc);
        markCacheSynced();
        this.syncFinishTime.set(DateTimes.nowUtc());
        return createStarted.millisElapsed();
    }

    private void markCacheSynced() {
        for (String str : Set.copyOf(this.datasourceToSegmentCache.keySet())) {
            HeapMemoryDatasourceSegmentCache orDefault = this.datasourceToSegmentCache.getOrDefault(str, new HeapMemoryDatasourceSegmentCache(str));
            CacheStats markCacheSynced = orDefault.markCacheSynced();
            if (orDefault.isEmpty()) {
                this.datasourceToSegmentCache.compute(str, (str2, heapMemoryDatasourceSegmentCache) -> {
                    if (heapMemoryDatasourceSegmentCache == null || !heapMemoryDatasourceSegmentCache.isEmpty() || heapMemoryDatasourceSegmentCache.isBeingUsedByTransaction()) {
                        return heapMemoryDatasourceSegmentCache;
                    }
                    emitMetric(str, Metric.DELETED_DATASOURCES, 1L);
                    return null;
                });
            } else {
                emitMetric(str, Metric.CACHED_INTERVALS, markCacheSynced.getNumIntervals());
                emitMetric(str, Metric.CACHED_USED_SEGMENTS, markCacheSynced.getNumUsedSegments());
                emitMetric(str, Metric.CACHED_UNUSED_SEGMENTS, markCacheSynced.getNumUnusedSegments());
                emitMetric(str, Metric.CACHED_PENDING_SEGMENTS, markCacheSynced.getNumPendingSegments());
            }
        }
    }

    private void retrieveAllSegmentIds(Map<String, DatasourceSegmentSummary> map) {
        Stopwatch createStarted = Stopwatch.createStarted();
        String format = StringUtils.format("SELECT id, dataSource, used_status_last_updated FROM %s WHERE used = true", new Object[]{this.tablesConfig.getSegmentsTable()});
        int intValue = ((Integer) inReadOnlyTransaction((handle, transactionStatus) -> {
            ResultIterator it = handle.createQuery(format).setFetchSize(this.connector.getStreamingFetchSize()).map((i, resultSet, statementContext) -> {
                return SegmentRecord.fromResultSet(resultSet);
            }).iterator();
            int i2 = 0;
            while (it.hasNext()) {
                try {
                    SegmentRecord segmentRecord = (SegmentRecord) it.next();
                    if (segmentRecord == null) {
                        i2++;
                    } else {
                        ((DatasourceSegmentSummary) map.computeIfAbsent(segmentRecord.getSegmentId().getDataSource(), str -> {
                            return new DatasourceSegmentSummary();
                        })).addSegmentRecord(segmentRecord);
                    }
                } catch (Throwable th) {
                    if (it != null) {
                        try {
                            it.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    }
                    throw th;
                }
            }
            Integer valueOf = Integer.valueOf(i2);
            if (it != null) {
                it.close();
            }
            return valueOf;
        })).intValue();
        if (intValue > 0) {
            emitMetric(Metric.SKIPPED_SEGMENTS, intValue);
        }
        map.forEach((str, datasourceSegmentSummary) -> {
            emitMetric(str, Metric.PERSISTED_USED_SEGMENTS, datasourceSegmentSummary.persistedSegments.size());
        });
        emitMetric(Metric.RETRIEVE_SEGMENT_IDS_DURATION_MILLIS, createStarted.millisElapsed());
    }

    private <T> T inReadOnlyTransaction(TransactionCallback<T> transactionCallback) {
        return (T) this.connector.retryReadOnlyTransaction(transactionCallback, 2, 3);
    }

    private void retrieveRequiredUsedSegments(String str, DatasourceSegmentSummary datasourceSegmentSummary) {
        Set<SegmentId> set = datasourceSegmentSummary.usedSegmentIdsToRefresh;
        if (set.isEmpty()) {
            return;
        }
        inReadOnlyTransaction((handle, transactionStatus) -> {
            CloseableIterator<DataSegmentPlus> retrieveSegmentsByIdIterator = SqlSegmentsMetadataQuery.forHandle(handle, this.connector, this.tablesConfig, this.jsonMapper).retrieveSegmentsByIdIterator(str, set, false);
            try {
                Set<DataSegmentPlus> set2 = datasourceSegmentSummary.usedSegments;
                Objects.requireNonNull(set2);
                retrieveSegmentsByIdIterator.forEachRemaining((v1) -> {
                    r1.add(v1);
                });
                if (retrieveSegmentsByIdIterator != null) {
                    retrieveSegmentsByIdIterator.close();
                }
                return 0;
            } catch (Throwable th) {
                if (retrieveSegmentsByIdIterator != null) {
                    try {
                        retrieveSegmentsByIdIterator.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
                throw th;
            }
        });
    }

    private void updateSegmentIdsInCache(Map<String, DatasourceSegmentSummary> map, DateTime dateTime) {
        Stopwatch createStarted = Stopwatch.createStarted();
        map.forEach((str, datasourceSegmentSummary) -> {
            SegmentSyncResult syncSegmentIds = getCacheForDatasource(str).syncSegmentIds(datasourceSegmentSummary.persistedSegments, dateTime);
            emitNonZeroMetric(str, Metric.STALE_USED_SEGMENTS, syncSegmentIds.getExpiredIds().size());
            emitNonZeroMetric(str, Metric.DELETED_SEGMENTS, syncSegmentIds.getDeleted());
            datasourceSegmentSummary.usedSegmentIdsToRefresh.addAll(syncSegmentIds.getExpiredIds());
        });
        this.datasourceToSegmentCache.forEach((str2, heapMemoryDatasourceSegmentCache) -> {
            if (map.containsKey(str2)) {
                return;
            }
            emitNonZeroMetric(str2, Metric.DELETED_SEGMENTS, heapMemoryDatasourceSegmentCache.syncSegmentIds(List.of(), dateTime).getDeleted());
        });
        emitMetric(Metric.UPDATE_IDS_DURATION_MILLIS, createStarted.millisElapsed());
    }

    private void retrieveUsedSegmentPayloads(Map<String, DatasourceSegmentSummary> map) {
        Stopwatch createStarted = Stopwatch.createStarted();
        map.forEach(this::retrieveRequiredUsedSegments);
        emitMetric(Metric.RETRIEVE_SEGMENT_PAYLOADS_DURATION_MILLIS, createStarted.millisElapsed());
    }

    private void retrieveAllUsedSegments(Map<String, DatasourceSegmentSummary> map) {
        Stopwatch createStarted = Stopwatch.createStarted();
        String format = StringUtils.format("SELECT id, payload, created_date, used_status_last_updated FROM %s WHERE used = true", new Object[]{this.tablesConfig.getSegmentsTable()});
        int intValue = ((Integer) inReadOnlyTransaction((handle, transactionStatus) -> {
            ResultIterator it = handle.createQuery(format).setFetchSize(this.connector.getStreamingFetchSize()).map((i, resultSet, statementContext) -> {
                return mapToSegmentPlus(resultSet);
            }).iterator();
            int i2 = 0;
            while (it.hasNext()) {
                try {
                    DataSegmentPlus dataSegmentPlus = (DataSegmentPlus) it.next();
                    if (dataSegmentPlus == null) {
                        i2++;
                    } else {
                        ((DatasourceSegmentSummary) map.computeIfAbsent(dataSegmentPlus.getDataSegment().getDataSource(), str -> {
                            return new DatasourceSegmentSummary();
                        })).usedSegments.add(dataSegmentPlus);
                    }
                } catch (Throwable th) {
                    if (it != null) {
                        try {
                            it.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    }
                    throw th;
                }
            }
            Integer valueOf = Integer.valueOf(i2);
            if (it != null) {
                it.close();
            }
            return valueOf;
        })).intValue();
        if (intValue > 0) {
            emitMetric(Metric.SKIPPED_SEGMENTS, intValue);
        }
        map.forEach((str, datasourceSegmentSummary) -> {
            emitMetric(str, Metric.PERSISTED_USED_SEGMENTS, datasourceSegmentSummary.usedSegments.size());
        });
        emitMetric(Metric.RETRIEVE_SEGMENT_PAYLOADS_DURATION_MILLIS, createStarted.millisElapsed());
    }

    private void updateUsedSegmentPayloadsInCache(Map<String, DatasourceSegmentSummary> map) {
        map.forEach((str, datasourceSegmentSummary) -> {
            emitNonZeroMetric(str, Metric.UPDATED_USED_SEGMENTS, getCacheForDatasource(str).insertSegments(datasourceSegmentSummary.usedSegments));
        });
    }

    private void updatePendingSegmentsInCache(Map<String, DatasourceSegmentSummary> map, DateTime dateTime) {
        map.forEach((str, datasourceSegmentSummary) -> {
            SegmentSyncResult syncPendingSegments = getCacheForDatasource(str).syncPendingSegments(datasourceSegmentSummary.persistedPendingSegments, dateTime);
            emitMetric(str, Metric.PERSISTED_PENDING_SEGMENTS, datasourceSegmentSummary.persistedPendingSegments.size());
            emitNonZeroMetric(str, Metric.UPDATED_PENDING_SEGMENTS, syncPendingSegments.getUpdated());
            emitNonZeroMetric(str, Metric.DELETED_PENDING_SEGMENTS, syncPendingSegments.getDeleted());
        });
        this.datasourceToSegmentCache.forEach((str2, heapMemoryDatasourceSegmentCache) -> {
            if (map.containsKey(str2)) {
                return;
            }
            emitNonZeroMetric(str2, Metric.DELETED_PENDING_SEGMENTS, heapMemoryDatasourceSegmentCache.syncPendingSegments(List.of(), dateTime).getDeleted());
        });
    }

    private void retrieveAllPendingSegments(Map<String, DatasourceSegmentSummary> map) {
        Stopwatch createStarted = Stopwatch.createStarted();
        String format = StringUtils.format("SELECT id, dataSource, payload, sequence_name, sequence_prev_id, upgraded_from_segment_id, task_allocator_id, created_date FROM %1$s", new Object[]{this.tablesConfig.getPendingSegmentsTable()});
        AtomicInteger atomicInteger = new AtomicInteger();
        inReadOnlyTransaction((handle, transactionStatus) -> {
            return handle.createQuery(format).setFetchSize(this.connector.getStreamingFetchSize()).map((i, resultSet, statementContext) -> {
                String str = null;
                String str2 = null;
                try {
                    str = resultSet.getString("id");
                    str2 = resultSet.getString("dataSource");
                    ((DatasourceSegmentSummary) map.computeIfAbsent(str2, str3 -> {
                        return new DatasourceSegmentSummary();
                    })).addPendingSegmentRecord(PendingSegmentRecord.fromResultSet(resultSet, this.jsonMapper));
                } catch (Exception e) {
                    log.error(e, "Error occurred while reading Pending Segment ID[%s] of datasource[%s].", new Object[]{str, str2});
                    atomicInteger.incrementAndGet();
                }
                return 0;
            }).list();
        });
        emitMetric(Metric.RETRIEVE_PENDING_SEGMENTS_DURATION_MILLIS, createStarted.millisElapsed());
        if (atomicInteger.get() > 0) {
            emitMetric(Metric.SKIPPED_PENDING_SEGMENTS, atomicInteger.get());
        }
    }

    private DataSegmentPlus mapToSegmentPlus(ResultSet resultSet) {
        String str = null;
        try {
            str = resultSet.getString(1);
            return new DataSegmentPlus((DataSegment) JacksonUtils.readValue(this.jsonMapper, resultSet.getBytes(2), DataSegment.class), DateTimes.of(resultSet.getString(3)), DateTimes.of(resultSet.getString(4)), true, null, null, null);
        } catch (Throwable th) {
            log.error(th, "Could not read segment with ID[%s]", new Object[]{str});
            return null;
        }
    }

    private void emitMetric(String str, long j) {
        this.emitter.emit(ServiceMetricEvent.builder().setMetric(str, Long.valueOf(j)));
    }

    private void emitNonZeroMetric(String str, String str2, long j) {
        if (j == 0) {
            return;
        }
        emitMetric(str, str2, j);
    }

    private void emitMetric(String str, String str2, long j) {
        this.emitter.emit(ServiceMetricEvent.builder().setDimension("dataSource", str).setMetric(str2, Long.valueOf(j)));
    }
}
