package org.apache.nifi.controller.status.history.questdb;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.apache.nifi.controller.status.ConnectionStatus;
import org.apache.nifi.controller.status.NodeStatus;
import org.apache.nifi.controller.status.ProcessGroupStatus;
import org.apache.nifi.controller.status.ProcessorStatus;
import org.apache.nifi.controller.status.RemoteProcessGroupStatus;
import org.apache.nifi.controller.status.history.GarbageCollectionStatus;
import org.apache.nifi.controller.status.history.StatusSnapshot;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/nifi/controller/status/history/questdb/BufferedStatusHistoryStorage.class */
public final class BufferedStatusHistoryStorage implements StatusHistoryStorage {
    private static final Logger LOGGER = LoggerFactory.getLogger(BufferedStatusHistoryStorage.class);
    private final StatusHistoryStorage storage;
    private final long persistFrequencyInMs;
    private final int persistBatchSize;
    private final String id = UUID.randomUUID().toString();
    private final List<ScheduledFuture<?>> scheduledFutures = new ArrayList();
    private final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1, new BasicThreadFactory.Builder().namingPattern("BufferedStatusHistoryStorage-" + this.id + "-%d").build());
    private final BlockingQueue<CapturedStatus<NodeStatus>> nodeStatusQueue = new LinkedBlockingQueue();
    private final BlockingQueue<CapturedStatus<GarbageCollectionStatus>> garbageCollectionStatusQueue = new LinkedBlockingQueue();
    private final BlockingQueue<CapturedStatus<ProcessGroupStatus>> processGroupStatusQueue = new LinkedBlockingQueue();
    private final BlockingQueue<CapturedStatus<ConnectionStatus>> connectionStatusQueue = new LinkedBlockingQueue();
    private final BlockingQueue<CapturedStatus<RemoteProcessGroupStatus>> remoteProcessGroupStatusQueue = new LinkedBlockingQueue();
    private final BlockingQueue<CapturedStatus<ProcessorStatus>> processorStatusQueue = new LinkedBlockingQueue();

    /* loaded from: input_file:org/apache/nifi/controller/status/history/questdb/BufferedStatusHistoryStorage$BufferedStatusHistoryStorageWorker.class */
    private class BufferedStatusHistoryStorageWorker implements Runnable {
        private BufferedStatusHistoryStorageWorker() {
        }

        @Override // java.lang.Runnable
        public void run() {
            BufferedStatusHistoryStorage.LOGGER.debug("Start flushing");
            BlockingQueue<CapturedStatus<NodeStatus>> blockingQueue = BufferedStatusHistoryStorage.this.nodeStatusQueue;
            StatusHistoryStorage statusHistoryStorage = BufferedStatusHistoryStorage.this.storage;
            Objects.requireNonNull(statusHistoryStorage);
            flush(blockingQueue, statusHistoryStorage::storeNodeStatuses);
            BlockingQueue<CapturedStatus<GarbageCollectionStatus>> blockingQueue2 = BufferedStatusHistoryStorage.this.garbageCollectionStatusQueue;
            StatusHistoryStorage statusHistoryStorage2 = BufferedStatusHistoryStorage.this.storage;
            Objects.requireNonNull(statusHistoryStorage2);
            flush(blockingQueue2, statusHistoryStorage2::storeGarbageCollectionStatuses);
            BlockingQueue<CapturedStatus<ProcessGroupStatus>> blockingQueue3 = BufferedStatusHistoryStorage.this.processGroupStatusQueue;
            StatusHistoryStorage statusHistoryStorage3 = BufferedStatusHistoryStorage.this.storage;
            Objects.requireNonNull(statusHistoryStorage3);
            flush(blockingQueue3, statusHistoryStorage3::storeProcessGroupStatuses);
            BlockingQueue<CapturedStatus<ConnectionStatus>> blockingQueue4 = BufferedStatusHistoryStorage.this.connectionStatusQueue;
            StatusHistoryStorage statusHistoryStorage4 = BufferedStatusHistoryStorage.this.storage;
            Objects.requireNonNull(statusHistoryStorage4);
            flush(blockingQueue4, statusHistoryStorage4::storeConnectionStatuses);
            BlockingQueue<CapturedStatus<RemoteProcessGroupStatus>> blockingQueue5 = BufferedStatusHistoryStorage.this.remoteProcessGroupStatusQueue;
            StatusHistoryStorage statusHistoryStorage5 = BufferedStatusHistoryStorage.this.storage;
            Objects.requireNonNull(statusHistoryStorage5);
            flush(blockingQueue5, statusHistoryStorage5::storeRemoteProcessorGroupStatuses);
            BlockingQueue<CapturedStatus<ProcessorStatus>> blockingQueue6 = BufferedStatusHistoryStorage.this.processorStatusQueue;
            StatusHistoryStorage statusHistoryStorage6 = BufferedStatusHistoryStorage.this.storage;
            Objects.requireNonNull(statusHistoryStorage6);
            flush(blockingQueue6, statusHistoryStorage6::storeProcessorStatuses);
            BufferedStatusHistoryStorage.LOGGER.debug("Finish flushing");
        }

        private <T> void flush(BlockingQueue<T> blockingQueue, Consumer<Collection<T>> consumer) {
            ArrayList arrayList = new ArrayList(BufferedStatusHistoryStorage.this.persistBatchSize);
            blockingQueue.drainTo(arrayList, BufferedStatusHistoryStorage.this.persistBatchSize);
            if (arrayList.isEmpty()) {
                return;
            }
            try {
                consumer.accept(arrayList);
            } catch (Exception e) {
                BufferedStatusHistoryStorage.LOGGER.error("Error during flushing buffered status history information.", e);
            }
        }
    }

    public BufferedStatusHistoryStorage(StatusHistoryStorage statusHistoryStorage, long j, int i) {
        this.storage = statusHistoryStorage;
        this.persistFrequencyInMs = j;
        this.persistBatchSize = i;
    }

    @Override // org.apache.nifi.controller.status.history.questdb.StatusHistoryStorage
    public void init() {
        this.storage.init();
        this.scheduledFutures.add(this.scheduledExecutorService.scheduleWithFixedDelay(new BufferedStatusHistoryStorageWorker(), this.persistFrequencyInMs, this.persistFrequencyInMs, TimeUnit.MILLISECONDS));
        LOGGER.info("Flushing is initiated");
    }

    @Override // org.apache.nifi.controller.status.history.questdb.StatusHistoryStorage
    public void close() {
        this.storage.close();
        LOGGER.debug("Flushing shutdown started");
        int i = 0;
        int i2 = 0;
        Iterator<ScheduledFuture<?>> it = this.scheduledFutures.iterator();
        while (it.hasNext()) {
            if (it.next().cancel(true)) {
                i++;
            } else {
                i2++;
            }
        }
        LOGGER.debug("Flushing shutdown task cancellation status: completed [{}] failed [{}]", Integer.valueOf(i), Integer.valueOf(i2));
        LOGGER.debug("Scheduled Task Service shutdown remaining tasks [{}]", Integer.valueOf(this.scheduledExecutorService.shutdownNow().size()));
    }

    @Override // org.apache.nifi.controller.status.history.questdb.StatusHistoryStorage
    public List<StatusSnapshot> getConnectionSnapshots(String str, Date date, Date date2) {
        return this.storage.getConnectionSnapshots(str, date, date2);
    }

    @Override // org.apache.nifi.controller.status.history.questdb.StatusHistoryStorage
    public List<StatusSnapshot> getProcessGroupSnapshots(String str, Date date, Date date2) {
        return this.storage.getProcessGroupSnapshots(str, date, date2);
    }

    @Override // org.apache.nifi.controller.status.history.questdb.StatusHistoryStorage
    public List<StatusSnapshot> getRemoteProcessGroupSnapshots(String str, Date date, Date date2) {
        return this.storage.getRemoteProcessGroupSnapshots(str, date, date2);
    }

    @Override // org.apache.nifi.controller.status.history.questdb.StatusHistoryStorage
    public List<StatusSnapshot> getProcessorSnapshots(String str, Date date, Date date2) {
        return this.storage.getProcessorSnapshots(str, date, date2);
    }

    @Override // org.apache.nifi.controller.status.history.questdb.StatusHistoryStorage
    public List<StatusSnapshot> getProcessorSnapshotsWithCounters(String str, Date date, Date date2) {
        return this.storage.getProcessorSnapshotsWithCounters(str, date, date2);
    }

    @Override // org.apache.nifi.controller.status.history.questdb.StatusHistoryStorage
    public List<GarbageCollectionStatus> getGarbageCollectionSnapshots(Date date, Date date2) {
        return this.storage.getGarbageCollectionSnapshots(date, date2);
    }

    @Override // org.apache.nifi.controller.status.history.questdb.StatusHistoryStorage
    public List<StatusSnapshot> getNodeStatusSnapshots(Date date, Date date2) {
        return this.storage.getNodeStatusSnapshots(date, date2);
    }

    @Override // org.apache.nifi.controller.status.history.questdb.StatusHistoryStorage
    public void storeNodeStatuses(Collection<CapturedStatus<NodeStatus>> collection) {
        this.nodeStatusQueue.addAll(collection);
    }

    @Override // org.apache.nifi.controller.status.history.questdb.StatusHistoryStorage
    public void storeGarbageCollectionStatuses(Collection<CapturedStatus<GarbageCollectionStatus>> collection) {
        this.garbageCollectionStatusQueue.addAll(collection);
    }

    @Override // org.apache.nifi.controller.status.history.questdb.StatusHistoryStorage
    public void storeProcessGroupStatuses(Collection<CapturedStatus<ProcessGroupStatus>> collection) {
        this.processGroupStatusQueue.addAll(collection);
    }

    @Override // org.apache.nifi.controller.status.history.questdb.StatusHistoryStorage
    public void storeConnectionStatuses(Collection<CapturedStatus<ConnectionStatus>> collection) {
        this.connectionStatusQueue.addAll(collection);
    }

    @Override // org.apache.nifi.controller.status.history.questdb.StatusHistoryStorage
    public void storeRemoteProcessorGroupStatuses(Collection<CapturedStatus<RemoteProcessGroupStatus>> collection) {
        this.remoteProcessGroupStatusQueue.addAll(collection);
    }

    @Override // org.apache.nifi.controller.status.history.questdb.StatusHistoryStorage
    public void storeProcessorStatuses(Collection<CapturedStatus<ProcessorStatus>> collection) {
        this.processorStatusQueue.addAll(collection);
    }
}
