package org.apache.nifi.provenance.store;

import java.io.EOFException;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.attribute.FileAttribute;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.LongSummaryStatistics;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.RepositoryConfiguration;
import org.apache.nifi.provenance.StandardProvenanceEventRecord;
import org.apache.nifi.provenance.authorization.EventAuthorizer;
import org.apache.nifi.provenance.index.EventIndex;
import org.apache.nifi.provenance.index.lucene.LuceneEventIndex;
import org.apache.nifi.provenance.serialization.RecordReader;
import org.apache.nifi.provenance.serialization.RecordWriter;
import org.apache.nifi.provenance.serialization.StorageSummary;
import org.apache.nifi.provenance.store.iterator.EventIterator;
import org.apache.nifi.provenance.store.iterator.SelectiveRecordReaderEventIterator;
import org.apache.nifi.provenance.store.iterator.SequentialRecordReaderEventIterator;
import org.apache.nifi.provenance.toc.TocUtil;
import org.apache.nifi.provenance.util.DirectoryUtils;
import org.apache.nifi.provenance.util.NamedThreadFactory;
import org.apache.nifi.util.FormatUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/nifi/provenance/store/WriteAheadStorePartition.class */
public class WriteAheadStorePartition implements EventStorePartition {
    private static final Logger logger = LoggerFactory.getLogger(WriteAheadStorePartition.class);
    private final RepositoryConfiguration config;
    private final File partitionDirectory;
    private final String partitionName;
    private final RecordWriterFactory recordWriterFactory;
    private final RecordReaderFactory recordReaderFactory;
    private final BlockingQueue<File> filesToCompress;
    private final AtomicLong idGenerator;
    private final EventFileManager eventFileManager;
    private final AtomicLong maxEventId = new AtomicLong(-1);
    private volatile boolean closed = false;
    private final AtomicReference<RecordWriterLease> eventWriterLeaseRef = new AtomicReference<>();
    private final SortedMap<Long, File> minEventIdToPathMap = new TreeMap();

    public WriteAheadStorePartition(File file, String str, RepositoryConfiguration repositoryConfiguration, RecordWriterFactory recordWriterFactory, RecordReaderFactory recordReaderFactory, BlockingQueue<File> blockingQueue, AtomicLong atomicLong, EventReporter eventReporter, EventFileManager eventFileManager) {
        this.partitionName = str;
        this.config = repositoryConfiguration;
        this.idGenerator = atomicLong;
        this.partitionDirectory = file;
        this.recordWriterFactory = recordWriterFactory;
        this.recordReaderFactory = recordReaderFactory;
        this.filesToCompress = blockingQueue;
        this.eventFileManager = eventFileManager;
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        this.closed = true;
        RecordWriterLease recordWriterLease = this.eventWriterLeaseRef.get();
        if (recordWriterLease != null) {
            recordWriterLease.close();
        }
    }

    @Override // org.apache.nifi.provenance.store.EventStorePartition
    public synchronized void initialize() throws IOException {
        File[] listFiles;
        long maxEventId;
        if (!this.partitionDirectory.exists()) {
            Files.createDirectories(this.partitionDirectory.toPath(), new FileAttribute[0]);
        }
        File[] listFiles2 = this.partitionDirectory.listFiles(DirectoryUtils.EVENT_FILE_FILTER);
        if (listFiles2 == null) {
            throw new IOException("Could not access files in the " + String.valueOf(this.partitionDirectory) + " directory");
        }
        long j = -1;
        List<File> asList = Arrays.asList(listFiles2);
        asList.sort(DirectoryUtils.LARGEST_ID_FIRST);
        Iterator it = asList.iterator();
        while (true) {
            if (!it.hasNext()) {
                break;
            }
            File file = (File) it.next();
            try {
                maxEventId = this.recordReaderFactory.newRecordReader(file, Collections.emptyList(), Integer.MAX_VALUE).getMaxEventId();
            } catch (Exception e) {
                logger.warn("Could not read file {}; if this file contains Provenance Events, new events may be created with the same event identifiers", file, e);
            }
            if (maxEventId > -1) {
                j = maxEventId;
                break;
            }
        }
        synchronized (this.minEventIdToPathMap) {
            for (File file2 : asList) {
                this.minEventIdToPathMap.put(Long.valueOf(DirectoryUtils.getMinId(file2)), file2);
            }
        }
        this.maxEventId.set(j);
        if (this.config.isCompressOnRollover() && (listFiles = this.partitionDirectory.listFiles(file3 -> {
            return file3.getName().endsWith(".prov");
        })) != null) {
            for (File file4 : listFiles) {
                File file5 = new File(file4.getParentFile(), file4.getName() + ".gz");
                if (file5.exists()) {
                    file5.delete();
                }
            }
        }
        long j2 = j + 1;
        logger.info("After recovering {}, next Event ID to be generated will be {}", this.partitionDirectory, Long.valueOf(this.idGenerator.updateAndGet(j3 -> {
            return Math.max(j3, j2);
        })));
    }

    @Override // org.apache.nifi.provenance.store.EventStorePartition
    public StorageResult addEvents(Iterable<ProvenanceEventRecord> iterable) throws IOException {
        RecordWriterLease lease;
        if (this.closed) {
            throw new IOException(String.valueOf(this) + " is closed");
        }
        while (true) {
            lease = getLease();
            if (lease.tryClaim()) {
                break;
            }
            RolloverState rolloverState = lease.getRolloverState();
            if (rolloverState.isRollover()) {
                logger.trace("Failed to obtain claim on Lease {}; Rollover State = {}", lease, rolloverState);
                if (tryRollover(lease)) {
                    logger.info("Successfully rolled over Event Writer for {} due to {}. Event File was {} and contained {} events.", new Object[]{this, rolloverState, FormatUtils.formatDataSize(r0.getBytesWritten()), Integer.valueOf(lease.getWriter().getRecordsWritten())});
                }
            }
        }
        logger.trace("Obtained claim on Lease {}", lease);
        RecordWriter writer = lease.getWriter();
        try {
            final Map<ProvenanceEventRecord, StorageSummary> addEvents = addEvents(iterable, writer);
            lease.relinquishClaim();
            logger.trace("Wrote {} events to Lease {}. Relinquished claim.", Integer.valueOf(addEvents.size()), lease);
            Integer num = null;
            RolloverState rolloverState2 = lease.getRolloverState();
            try {
                if (rolloverState2.isRollover()) {
                    logger.debug("Will attempt to roll over Lease {} because Rollover State is {}", lease, rolloverState2);
                    if (tryRollover(lease)) {
                        num = Integer.valueOf(writer.getRecordsWritten());
                        logger.info("Successfully rolled over Event Writer for {} after writing {} events due to {}", new Object[]{this, num, rolloverState2});
                    }
                }
            } catch (IOException e) {
                logger.error("Updated {} but failed to rollover to a new Event File", this, e);
            }
            final Integer num2 = num;
            return new StorageResult(this) { // from class: org.apache.nifi.provenance.store.WriteAheadStorePartition.1
                @Override // org.apache.nifi.provenance.store.StorageResult
                public Map<ProvenanceEventRecord, StorageSummary> getStorageLocations() {
                    return addEvents;
                }

                @Override // org.apache.nifi.provenance.store.StorageResult
                public boolean triggeredRollover() {
                    return num2 != null;
                }

                @Override // org.apache.nifi.provenance.store.StorageResult
                public Integer getEventsRolledOver() {
                    return num2;
                }

                public String toString() {
                    return getStorageLocations().toString();
                }
            };
        } catch (Throwable th) {
            lease.relinquishClaim();
            throw th;
        }
    }

    private RecordWriterLease getLease() throws IOException {
        do {
            RecordWriterLease recordWriterLease = this.eventWriterLeaseRef.get();
            if (recordWriterLease != null) {
                return recordWriterLease;
            }
        } while (!tryRollover(null));
        return this.eventWriterLeaseRef.get();
    }

    private synchronized boolean tryRollover(RecordWriterLease recordWriterLease) throws IOException {
        if (!Objects.equals(recordWriterLease, this.eventWriterLeaseRef.get())) {
            logger.trace("Will not rollover Lease {} because it's not the current event writer lease", recordWriterLease);
            return false;
        }
        long j = this.idGenerator.get();
        File file = new File(this.partitionDirectory, j + ".prov");
        RecordWriter createWriter = this.recordWriterFactory.createWriter(file, this.idGenerator, false, true);
        createWriter.writeHeader(j);
        RecordWriterLease recordWriterLease2 = new RecordWriterLease(createWriter, this.config.getMaxEventFileCapacity(), this.config.getMaxEventFileCount(), this.config.getMaxEventFileLife(TimeUnit.MILLISECONDS));
        if (!this.eventWriterLeaseRef.compareAndSet(recordWriterLease, recordWriterLease2)) {
            logger.trace("Did not update Event Writer Lease. Will remain {}. Not rolling over Lease.", recordWriterLease);
            try {
                createWriter.close();
            } catch (Exception e) {
                logger.warn("Failed to close Record Writer {}; some resources may not be cleaned up properly.", createWriter, e);
            }
            file.delete();
            return false;
        }
        logger.trace("Updated lease from {} to {}", recordWriterLease, recordWriterLease2);
        if (recordWriterLease != null) {
            recordWriterLease.close();
        }
        synchronized (this.minEventIdToPathMap) {
            this.minEventIdToPathMap.put(Long.valueOf(j), file);
        }
        if (this.config.isCompressOnRollover() && recordWriterLease != null && recordWriterLease.getWriter() != null) {
            boolean z = false;
            while (!z && !this.closed) {
                try {
                    z = this.filesToCompress.offer(recordWriterLease.getWriter().getFile(), 1L, TimeUnit.SECONDS);
                } catch (InterruptedException e2) {
                    Thread.currentThread().interrupt();
                    throw new IOException("Interrupted while waiting to enqueue " + String.valueOf(recordWriterLease.getWriter().getFile()) + " for compression");
                }
            }
            logger.debug("Queued {} ({}) for compression", recordWriterLease, recordWriterLease.getWriter().getFile());
        }
        logger.debug("Successfully rolled over Lease from {} to {}", recordWriterLease, recordWriterLease2);
        return true;
    }

    private Map<ProvenanceEventRecord, StorageSummary> addEvents(Iterable<ProvenanceEventRecord> iterable, RecordWriter recordWriter) throws IOException {
        HashMap hashMap = new HashMap();
        try {
            long j = -1;
            int i = 0;
            for (Map.Entry<ProvenanceEventRecord, StorageSummary> entry : recordWriter.writeRecords(iterable).entrySet()) {
                ProvenanceEventRecord key = entry.getKey();
                StorageSummary value = entry.getValue();
                StorageSummary storageSummary = new StorageSummary(value.getEventId(), value.getStorageLocation(), this.partitionName, value.getBlockIndex(), value.getSerializedLength(), value.getBytesWritten());
                hashMap.put(key, storageSummary);
                j = Math.max(j, storageSummary.getEventId());
                i++;
            }
            if (i == 0) {
                return hashMap;
            }
            recordWriter.flush();
            long j2 = j;
            this.maxEventId.getAndUpdate(j3 -> {
                return Math.max(j2, j3);
            });
            if (this.config.isAlwaysSync()) {
                recordWriter.sync();
            }
            return hashMap;
        } catch (Exception e) {
            recordWriter.markDirty();
            throw e;
        }
    }

    @Override // org.apache.nifi.provenance.store.EventStorePartition
    public long getSize() {
        return ((LongSummaryStatistics) getEventFilesFromDisk().collect(Collectors.summarizingLong((v0) -> {
            return v0.length();
        }))).getSum();
    }

    private Stream<File> getEventFilesFromDisk() {
        File[] listFiles = this.partitionDirectory.listFiles(DirectoryUtils.EVENT_FILE_FILTER);
        return listFiles == null ? Stream.empty() : Arrays.stream(listFiles);
    }

    @Override // org.apache.nifi.provenance.store.EventStorePartition
    public long getMaxEventId() {
        return this.maxEventId.get();
    }

    @Override // org.apache.nifi.provenance.store.EventStorePartition
    public Optional<ProvenanceEventRecord> getEvent(long j) throws IOException {
        Optional<File> pathForEventId = getPathForEventId(j);
        if (!pathForEventId.isPresent()) {
            return Optional.empty();
        }
        RecordReader newRecordReader = this.recordReaderFactory.newRecordReader(pathForEventId.get(), Collections.emptyList(), this.config.getMaxAttributeChars());
        try {
            Optional<ProvenanceEventRecord> skipToEvent = newRecordReader.skipToEvent(j);
            if (!skipToEvent.isPresent()) {
                if (newRecordReader != null) {
                    newRecordReader.close();
                }
                return skipToEvent;
            }
            if (skipToEvent.get().getEventId() == j) {
                if (newRecordReader != null) {
                    newRecordReader.close();
                }
                return skipToEvent;
            }
            Optional<ProvenanceEventRecord> empty = Optional.empty();
            if (newRecordReader != null) {
                newRecordReader.close();
            }
            return empty;
        } catch (Throwable th) {
            if (newRecordReader != null) {
                try {
                    newRecordReader.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Override // org.apache.nifi.provenance.store.EventStorePartition
    public List<ProvenanceEventRecord> getEvents(long j, int i, EventAuthorizer eventAuthorizer) throws IOException {
        ArrayList arrayList = new ArrayList(Math.min(i, LuceneEventIndex.MAX_LINEAGE_NODES));
        EventIterator createEventIterator = createEventIterator(j);
        while (true) {
            try {
                Optional<ProvenanceEventRecord> nextEvent = createEventIterator.nextEvent();
                if (!nextEvent.isPresent() || arrayList.size() >= i) {
                    break;
                }
                ProvenanceEventRecord provenanceEventRecord = nextEvent.get();
                if (eventAuthorizer.isAuthorized(provenanceEventRecord)) {
                    arrayList.add(provenanceEventRecord);
                }
            } catch (Throwable th) {
                if (createEventIterator != null) {
                    try {
                        createEventIterator.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
                throw th;
            }
        }
        if (createEventIterator != null) {
            createEventIterator.close();
        }
        return arrayList;
    }

    @Override // org.apache.nifi.provenance.store.EventStorePartition
    public EventIterator createEventIterator(long j) {
        ArrayList arrayList = new ArrayList();
        synchronized (this.minEventIdToPathMap) {
            File file = null;
            for (Map.Entry<Long, File> entry : this.minEventIdToPathMap.entrySet()) {
                if (entry.getKey().longValue() > j) {
                    if (arrayList.isEmpty() && file != null) {
                        arrayList.add(file);
                    }
                    arrayList.add(entry.getValue());
                }
                file = entry.getValue();
            }
            if (file != null && !arrayList.contains(file)) {
                arrayList.add(file);
            }
        }
        return arrayList.isEmpty() ? EventIterator.EMPTY : new SequentialRecordReaderEventIterator(arrayList, this.recordReaderFactory, j, this.config.getMaxAttributeChars());
    }

    @Override // org.apache.nifi.provenance.store.EventStorePartition
    public EventIterator createEventIterator(List<Long> list) {
        ArrayList arrayList;
        synchronized (this.minEventIdToPathMap) {
            arrayList = new ArrayList(this.minEventIdToPathMap.values());
        }
        return arrayList.isEmpty() ? EventIterator.EMPTY : new SelectiveRecordReaderEventIterator(arrayList, this.recordReaderFactory, list, this.config.getMaxAttributeChars());
    }

    private Optional<File> getPathForEventId(long j) {
        File file = null;
        synchronized (this.minEventIdToPathMap) {
            for (Map.Entry<Long, File> entry : this.minEventIdToPathMap.entrySet()) {
                if (entry.getKey().longValue() > j) {
                    break;
                }
                file = entry.getValue();
            }
        }
        return Optional.ofNullable(file);
    }

    @Override // org.apache.nifi.provenance.store.EventStorePartition
    public void purgeOldEvents(long j, TimeUnit timeUnit) {
        long currentTimeMillis = System.currentTimeMillis() - timeUnit.toMillis(j);
        List list = (List) getEventFilesFromDisk().filter(file -> {
            return file.lastModified() < currentTimeMillis;
        }).sorted(DirectoryUtils.SMALLEST_ID_FIRST).filter(this::delete).collect(Collectors.toList());
        if (list.isEmpty()) {
            logger.debug("No Provenance Event files that exceed time-based threshold of {} {}", Long.valueOf(j), timeUnit);
        } else {
            logger.info("Purged {} Provenance Event files from Provenance Repository because the events were older than {} {}: {}", new Object[]{Integer.valueOf(list.size()), Long.valueOf(j), timeUnit, list});
        }
    }

    private File getActiveEventFile() {
        RecordWriterLease recordWriterLease = this.eventWriterLeaseRef.get();
        if (recordWriterLease == null) {
            return null;
        }
        return recordWriterLease.getWriter().getFile();
    }

    @Override // org.apache.nifi.provenance.store.EventStorePartition
    public long purgeOldestEvents() {
        List<File> list = (List) getEventFilesFromDisk().sorted(DirectoryUtils.SMALLEST_ID_FIRST).collect(Collectors.toList());
        if (list.size() < 2) {
            return 0L;
        }
        File activeEventFile = getActiveEventFile();
        if (activeEventFile == null) {
            logger.debug("There is currently no Active Event File for {}. Will not purge oldest events until the Active Event File has been established.", this);
            return 0L;
        }
        for (File file : list) {
            if (file.equals(activeEventFile)) {
                return 0L;
            }
            long length = file.length();
            if (delete(file)) {
                logger.info("{} Deleted {} event file ({}) due to storage limits", new Object[]{this, file, FormatUtils.formatDataSize(length)});
                return length;
            }
            logger.warn("{} Failed to delete oldest event file {}. This file should be cleaned up manually.", this, file);
        }
        return 0L;
    }

    private boolean delete(File file) {
        if (file.equals(getActiveEventFile())) {
            logger.debug("Attempting to age off Active Event File {}. Will return without deleting the file.", file);
            return false;
        }
        long minId = DirectoryUtils.getMinId(file);
        synchronized (this.minEventIdToPathMap) {
            this.minEventIdToPathMap.remove(Long.valueOf(minId));
        }
        this.eventFileManager.obtainWriteLock(file);
        try {
            if (file.exists() && !file.delete()) {
                logger.warn("Failed to remove Provenance Event file {}; this file should be cleaned up manually", file);
                this.eventFileManager.releaseWriteLock(file);
                return false;
            }
            File tocFile = TocUtil.getTocFile(file);
            if (tocFile.exists() && !tocFile.delete()) {
                logger.warn("Failed to remove Provenance Table-of-Contents file {}; this file should be cleaned up manually", tocFile);
            }
            return true;
        } finally {
            this.eventFileManager.releaseWriteLock(file);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void reindexLatestEvents(EventIndex eventIndex) {
        List list = (List) getEventFilesFromDisk().sorted(DirectoryUtils.SMALLEST_ID_FIRST).collect(Collectors.toList());
        if (list.isEmpty()) {
            return;
        }
        long minimumEventIdToReindex = eventIndex.getMinimumEventIdToReindex(this.partitionName);
        long maxEventId = getMaxEventId();
        logger.info("The last Provenance Event indexed for partition {} is {}, but the last event written to partition has ID {}. Re-indexing up to the last {} events for {} to ensure that the Event Index is accurate and up-to-date", new Object[]{this.partitionName, Long.valueOf(minimumEventIdToReindex), Long.valueOf(maxEventId), Long.valueOf(maxEventId - minimumEventIdToReindex), this.partitionDirectory});
        int i = 0;
        int size = list.size() - 1;
        while (true) {
            if (size < 0) {
                break;
            }
            if (DirectoryUtils.getMinId((File) list.get(size)) <= minimumEventIdToReindex) {
                i = size;
                break;
            }
            size--;
        }
        List<File> subList = list.subList(i, list.size());
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(Math.min(4, subList.size()), new NamedThreadFactory("Re-Index Provenance Events", true));
        ArrayList arrayList = new ArrayList(subList.size());
        AtomicLong atomicLong = new AtomicLong(0L);
        long nanoTime = System.nanoTime();
        int i2 = 0;
        for (File file : subList) {
            int i3 = i2;
            i2++;
            boolean z = i3 == 0;
            arrayList.add(newFixedThreadPool.submit(() -> {
                HashMap hashMap = new HashMap(LuceneEventIndex.MAX_LINEAGE_NODES);
                try {
                    try {
                        RecordReader newRecordReader = this.recordReaderFactory.newRecordReader(file, Collections.emptyList(), Integer.MAX_VALUE);
                        if (z) {
                            try {
                                if (!newRecordReader.skipToEvent(minimumEventIdToReindex).isPresent()) {
                                    if (newRecordReader != null) {
                                        newRecordReader.close();
                                        return;
                                    }
                                    return;
                                }
                            } catch (Throwable th) {
                                if (newRecordReader != null) {
                                    try {
                                        newRecordReader.close();
                                    } catch (Throwable th2) {
                                        th.addSuppressed(th2);
                                    }
                                }
                                throw th;
                            }
                        }
                        while (true) {
                            long bytesConsumed = newRecordReader.getBytesConsumed();
                            StandardProvenanceEventRecord nextRecord = newRecordReader.nextRecord();
                            if (nextRecord == null) {
                                break;
                            }
                            hashMap.put(nextRecord, new StorageSummary(nextRecord.getEventId(), file.getName(), this.partitionName, Integer.valueOf(newRecordReader.getBlockIndex()), newRecordReader.getBytesConsumed() - bytesConsumed, 0L));
                            if (hashMap.size() == 1000) {
                                eventIndex.reindexEvents(hashMap);
                                atomicLong.addAndGet(hashMap.size());
                                hashMap.clear();
                            }
                        }
                        eventIndex.reindexEvents(hashMap);
                        atomicLong.addAndGet(hashMap.size());
                        hashMap.clear();
                        if (newRecordReader != null) {
                            newRecordReader.close();
                        }
                    } catch (Exception e) {
                        logger.error("Failed to index Provenance Events found in {}", file, e);
                    }
                } catch (EOFException | FileNotFoundException e2) {
                    logger.warn("Failed to find event with ID {} in Event File {}", new Object[]{Long.valueOf(minimumEventIdToReindex), file, e2});
                }
            }));
        }
        Iterator it = arrayList.iterator();
        while (it.hasNext()) {
            try {
                ((Future) it.next()).get();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                logger.error("Interrupted while waiting for Provenance events to be re-indexed", e);
            } catch (ExecutionException e2) {
                logger.error("Failed to re-index some Provenance events. These events may not be query-able via the Provenance interface", e2.getCause());
            }
        }
        try {
            eventIndex.commitChanges(this.partitionName);
        } catch (IOException e3) {
            logger.error("Failed to re-index Provenance Events for partition {}", this.partitionName, e3);
        }
        newFixedThreadPool.shutdown();
        long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - nanoTime);
        logger.info("Finished re-indexing {} events across {} files for {} in {}.{} seconds", new Object[]{Long.valueOf(atomicLong.get()), Integer.valueOf(subList.size()), this.partitionDirectory, Long.valueOf(millis / 1000), Long.valueOf(millis % 1000)});
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public EventIterator getEventsByTimestamp(long j, long j2) throws IOException {
        List<File> list = getEventFilesFromDisk().sorted(DirectoryUtils.LARGEST_ID_FIRST).toList();
        if (list.isEmpty()) {
            return EventIterator.EMPTY;
        }
        ArrayList arrayList = new ArrayList();
        for (File file : list) {
            ProvenanceEventRecord firstEvent = getFirstEvent(file);
            if (firstEvent == null) {
                return EventIterator.EMPTY;
            }
            long eventTime = firstEvent.getEventTime();
            if (eventTime <= j2) {
                arrayList.add(file);
                if (eventTime < j) {
                    break;
                }
            }
        }
        return new SequentialRecordReaderEventIterator(arrayList, this.recordReaderFactory, 0L, Integer.MAX_VALUE).filter(provenanceEventRecord -> {
            return provenanceEventRecord.getEventTime() >= j && provenanceEventRecord.getEventTime() <= j2;
        });
    }

    private ProvenanceEventRecord getFirstEvent(File file) throws IOException {
        RecordReader newRecordReader = this.recordReaderFactory.newRecordReader(file, Collections.emptyList(), Integer.MAX_VALUE);
        try {
            StandardProvenanceEventRecord nextRecord = newRecordReader.nextRecord();
            if (newRecordReader != null) {
                newRecordReader.close();
            }
            return nextRecord;
        } catch (Throwable th) {
            if (newRecordReader != null) {
                try {
                    newRecordReader.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    public String toString() {
        return "Provenance Event Store Partition[directory=" + String.valueOf(this.partitionDirectory) + "]";
    }
}
