package org.apache.kafka.streams.state.internals;

import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;
import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder;

/* loaded from: input_file:org/apache/kafka/streams/state/internals/LogicalKeyValueSegments.class */
public class LogicalKeyValueSegments extends AbstractSegments<LogicalKeyValueSegment> {
    private final RocksDBMetricsRecorder metricsRecorder;
    private final RocksDBStore physicalStore;
    private final Map<Long, LogicalKeyValueSegment> reservedSegments;

    /* JADX INFO: Access modifiers changed from: package-private */
    public LogicalKeyValueSegments(String str, String str2, long j, long j2, RocksDBMetricsRecorder rocksDBMetricsRecorder) {
        super(str, j, j2);
        this.reservedSegments = new HashMap();
        this.metricsRecorder = rocksDBMetricsRecorder;
        this.physicalStore = new RocksDBStore(str, str2, rocksDBMetricsRecorder, false);
    }

    @Override // org.apache.kafka.streams.state.internals.Segments
    public LogicalKeyValueSegment getOrCreateSegment(long j, ProcessorContext processorContext) {
        if (this.segments.containsKey(Long.valueOf(j))) {
            return (LogicalKeyValueSegment) this.segments.get(Long.valueOf(j));
        }
        if (j < 0) {
            throw new IllegalArgumentException("Negative segment IDs are reserved for reserved segments, and should be created through createReservedSegment() instead");
        }
        LogicalKeyValueSegment logicalKeyValueSegment = new LogicalKeyValueSegment(j, segmentName(j), this.physicalStore);
        if (this.segments.put(Long.valueOf(j), logicalKeyValueSegment) != null) {
            throw new IllegalStateException("LogicalKeyValueSegment already exists. Possible concurrent access.");
        }
        return logicalKeyValueSegment;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public LogicalKeyValueSegment createReservedSegment(long j, String str) {
        if (j >= 0) {
            throw new IllegalArgumentException("segmentId for a reserved segment must be negative");
        }
        LogicalKeyValueSegment logicalKeyValueSegment = new LogicalKeyValueSegment(j, str, this.physicalStore);
        if (this.reservedSegments.put(Long.valueOf(j), logicalKeyValueSegment) != null) {
            throw new IllegalStateException("LogicalKeyValueSegment already exists.");
        }
        return logicalKeyValueSegment;
    }

    LogicalKeyValueSegment getReservedSegment(long j) {
        return this.reservedSegments.get(Long.valueOf(j));
    }

    @Override // org.apache.kafka.streams.state.internals.AbstractSegments, org.apache.kafka.streams.state.internals.Segments
    public void openExisting(ProcessorContext processorContext, long j) {
        this.metricsRecorder.init(ProcessorContextUtils.getMetricsImpl(processorContext), processorContext.taskId());
        this.physicalStore.openDB(processorContext.appConfigs(), processorContext.stateDir());
    }

    @Override // org.apache.kafka.streams.state.internals.AbstractSegments
    public void cleanupExpiredSegments(long j) {
        super.cleanupExpiredSegments(j);
    }

    @Override // org.apache.kafka.streams.state.internals.AbstractSegments, org.apache.kafka.streams.state.internals.Segments
    public void flush() {
        this.physicalStore.flush();
    }

    @Override // org.apache.kafka.streams.state.internals.AbstractSegments, org.apache.kafka.streams.state.internals.Segments
    public void close() {
        super.close();
        Iterator<LogicalKeyValueSegment> it = this.reservedSegments.values().iterator();
        while (it.hasNext()) {
            it.next().close();
        }
        this.reservedSegments.clear();
        this.physicalStore.close();
    }

    @Override // org.apache.kafka.streams.state.internals.AbstractSegments, org.apache.kafka.streams.state.internals.Segments
    public String segmentName(long j) {
        if (j < 0) {
            throw new IllegalArgumentException("Negative segment IDs are reserved for reserved segments, which have custom names that should not be accessed from this method");
        }
        return super.segmentName(j);
    }

    @Override // org.apache.kafka.streams.state.internals.AbstractSegments, org.apache.kafka.streams.state.internals.Segments
    public /* bridge */ /* synthetic */ List allSegments(boolean z) {
        return super.allSegments(z);
    }

    @Override // org.apache.kafka.streams.state.internals.AbstractSegments, org.apache.kafka.streams.state.internals.Segments
    public /* bridge */ /* synthetic */ List segments(long j, long j2, boolean z) {
        return super.segments(j, j2, z);
    }

    @Override // org.apache.kafka.streams.state.internals.AbstractSegments, org.apache.kafka.streams.state.internals.Segments
    public /* bridge */ /* synthetic */ long segmentId(long j) {
        return super.segmentId(j);
    }
}
