package cern.nxcals.ds.importer.producer;

import cern.nxcals.common.concurrent.AutoCloseableLock;
import cern.nxcals.ds.importer.common.model.BatchDataX;
import cern.nxcals.ds.importer.common.model.BatchPublicationResult;
import cern.nxcals.ds.importer.producer.dao.BatchDAO;
import cern.nxcals.ds.importer.producer.dao.MetadataDAO;
import cern.nxcals.ds.importer.producer.listener.DataProducerListener;
import cern.nxcals.ds.importer.producer.listener.ProcessInfo;
import cern.nxcals.ds.importer.producer.model.Metadata;
import cern.nxcals.ds.importer.producer.status.PublicationStatusManager;
import cern.nxcals.ds.importer.producer.status.PublicationStatusManagerImpl;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.Striped;
import java.io.Serializable;
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.Collection;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Objects;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import org.apache.commons.collections4.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

@Component
/* loaded from: input_file:BOOT-INF/lib/producer-0.0.37.jar:cern/nxcals/ds/importer/producer/DataProducer.class */
public class DataProducer {
    private static final Logger log = LoggerFactory.getLogger(DataProducer.class);
    private final Map<Metadata, AtomicReference<ProcessingState>> processingMetadata;
    private final Map<Serializable, Metadata> metadataByBatchId;
    private final Map<Metadata, Set<BatchDAO.PublicationTime>> metadataBatches;
    private final Map<Metadata, Instant> metadataStamps;
    private final Set<Metadata> lockedMetadata;
    private final Striped<Lock> metadataLocks;
    private final Executor processingExecutor;
    private final Executor updateExecutor;
    private final DataProcessor dataProcessor;
    private final MetadataDAO metadataDAO;
    private final BatchDAO batchDAO;
    private final DataProducerListener listener;
    private final Clock clock;
    private final ProducerStatsCollector statsCollector;
    private final PublicationStatusManager statusManager;

    @Value("${winccoa.producer.batch.window.seconds:300}")
    private int timeWindowSeconds;

    @Value("${winccoa.producer.instance.id}")
    private int processInstanceId;

    @Value("${winccoa.producer.delay.before.now.to.read.data:120}")
    private int delayBeforeNowToReadData;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:BOOT-INF/lib/producer-0.0.37.jar:cern/nxcals/ds/importer/producer/DataProducer$ProcessingState.class */
    public enum ProcessingState {
        IDLE,
        SENDING_DATA,
        SENDING_DONE
    }

    @Autowired
    public DataProducer(@Qualifier("producerExecutor") Executor executor, @Qualifier("stateUpdater") Executor executor2, DataProcessor dataProcessor, MetadataDAO metadataDAO, BatchDAO batchDAO, DataProducerListener dataProducerListener, ProducerStatsCollector producerStatsCollector, PublicationStatusManager publicationStatusManager) {
        this(executor, executor2, dataProcessor, metadataDAO, batchDAO, dataProducerListener, Clock.systemDefaultZone(), producerStatsCollector, publicationStatusManager);
    }

    @VisibleForTesting
    DataProducer(Executor executor, Executor executor2, DataProcessor dataProcessor, MetadataDAO metadataDAO, BatchDAO batchDAO, DataProducerListener dataProducerListener, Clock clock, ProducerStatsCollector producerStatsCollector, PublicationStatusManager publicationStatusManager) {
        this.processingMetadata = new ConcurrentHashMap();
        this.metadataByBatchId = new ConcurrentHashMap();
        this.metadataBatches = new ConcurrentHashMap();
        this.metadataStamps = new ConcurrentHashMap();
        this.lockedMetadata = ConcurrentHashMap.newKeySet();
        this.metadataLocks = Striped.lazyWeakLock(1000);
        this.processingExecutor = (Executor) Objects.requireNonNull(executor);
        this.updateExecutor = (Executor) Objects.requireNonNull(executor2);
        this.dataProcessor = (DataProcessor) Objects.requireNonNull(dataProcessor);
        this.metadataDAO = (MetadataDAO) Objects.requireNonNull(metadataDAO);
        this.batchDAO = (BatchDAO) Objects.requireNonNull(batchDAO);
        this.listener = (DataProducerListener) Objects.requireNonNull(dataProducerListener);
        this.clock = (Clock) Objects.requireNonNull(clock);
        this.statsCollector = (ProducerStatsCollector) Objects.requireNonNull(producerStatsCollector);
        this.statusManager = (PublicationStatusManager) Objects.requireNonNull(publicationStatusManager);
    }

    public void shutdown(int i) {
        if (this.listener != null) {
            this.listener.onShutdown(new ProcessInfo(this.processInstanceId, i));
        }
        if (this.statsCollector != null) {
            this.statsCollector.groupActive(i, false);
        }
    }

    public void initialDataProcessingScheduled(int i, Instant instant) {
        if (this.listener != null) {
            this.listener.onInitialProcessingScheduled(new ProcessInfo(this.processInstanceId, i), instant);
        }
        if (this.statsCollector != null) {
            this.statsCollector.groupActive(i, true);
        }
    }

    public void collectAndProcess(int i) {
        ProcessInfo processInfo = new ProcessInfo(this.processInstanceId, i);
        this.listener.onProcessingStart(processInfo);
        Collection<Metadata> metadata = this.metadataDAO.getMetadata(i);
        if (CollectionUtils.isEmpty(metadata)) {
            log.warn("No metadata found for group id: {}", Integer.valueOf(i));
            this.listener.onProcessingFinish(processInfo);
            return;
        }
        MetaDataQueryToTimeAligner metaDataQueryToTimeAligner = new MetaDataQueryToTimeAligner(metadata, this.timeWindowSeconds);
        for (Metadata metadata2 : metadata) {
            if (!this.processingMetadata.computeIfAbsent(metadata2, metadata3 -> {
                return new AtomicReference(ProcessingState.IDLE);
            }).compareAndSet(ProcessingState.IDLE, ProcessingState.SENDING_DATA)) {
                log.info("Processing of data of {} is currently ongoing", metadata2);
            } else if (metadata2.getLockedDue() != null) {
                log.warn("Cannot process data of {} as it seems to be locked due to ", metadata2, metadata2.getLockedDue());
                this.processingMetadata.get(metadata2).set(ProcessingState.IDLE);
            } else {
                this.lockedMetadata.remove(metadata2);
                log.info("Processing data of {} since {}", metadata2, metadata2.getLastCheckedUtcStamp());
                CompletableFuture.supplyAsync(() -> {
                    return collectAndProcess(metadata2, metaDataQueryToTimeAligner);
                }, this.processingExecutor).thenAcceptAsync(processingState -> {
                    this.processingMetadata.get(metadata2).set(processingState);
                    tryToFinishProcessingOf(metadata2);
                }, this.processingExecutor);
            }
        }
    }

    private ProcessingState collectAndProcess(Metadata metadata, MetaDataQueryToTimeAligner metaDataQueryToTimeAligner) {
        Instant lastCheckedUtcStamp = metadata.getLastCheckedUtcStamp();
        Instant initialToTime = metaDataQueryToTimeAligner.getInitialToTime(metadata);
        Instant instant = this.clock.instant();
        if (initialToTime.isAfter(instant)) {
            log.info("Finished processing of {} until: {} as the next checked time {} is after current stamp {}", metadata, lastCheckedUtcStamp, initialToTime, instant);
            this.listener.onProcessingFinish(new ProcessInfo(this.processInstanceId, metadata.getGroupId()));
            return ProcessingState.IDLE;
        }
        this.metadataStamps.merge(metadata, metadata.getLastCheckedUtcStamp(), (instant2, instant3) -> {
            return instant3;
        });
        long j = 0;
        while (initialToTime.isBefore(this.clock.instant().minusSeconds(this.delayBeforeNowToReadData)) && !this.lockedMetadata.contains(metadata)) {
            BatchDAO.Batch batchFor = getBatchFor(metadata.checkedAt(this.metadataStamps.get(metadata)), lastCheckedUtcStamp, initialToTime);
            collectStatsFor(batchFor, metadata.getGroupId());
            if (batchFor.getBatchData().getData().isEmpty()) {
                log.debug("Data batch empty, skip processing for {} from {} to {}", metadata, lastCheckedUtcStamp, initialToTime);
                lastCheckedUtcStamp = initialToTime;
                initialToTime = metaDataQueryToTimeAligner.getNextToTime(lastCheckedUtcStamp);
            } else {
                if (batchFor.inError()) {
                    return ProcessingState.IDLE;
                }
                BatchDataX apply = DefaultBatchConverter.INSTANCE.apply(batchFor.getBatchData());
                j += apply.getData().size();
                this.metadataByBatchId.put(apply.getId(), metadata);
                this.metadataBatches.computeIfAbsent(metadata, metadata2 -> {
                    return ConcurrentHashMap.newKeySet();
                }).add(batchFor.getPublicationTime());
                log.debug("Sending {} records to processor for {} from {} to {}", Integer.valueOf(apply.getData().size()), metadata, lastCheckedUtcStamp, initialToTime);
                batchFor.setLogStartTime(this.clock.instant());
                this.dataProcessor.process(apply).thenAcceptAsync(batchPublicationResult -> {
                    handlePublicationResult(batchPublicationResult, batchFor);
                }, this.updateExecutor);
                lastCheckedUtcStamp = initialToTime;
                initialToTime = metaDataQueryToTimeAligner.getNextToTime(lastCheckedUtcStamp);
            }
        }
        log.info("Sent all [{}] records for {} between {} and {}", Long.valueOf(j), metadata, metadata.getLastCheckedUtcStamp(), lastCheckedUtcStamp);
        return ProcessingState.SENDING_DONE;
    }

    private BatchDAO.Batch getBatchFor(Metadata metadata, Instant instant, Instant instant2) {
        AutoCloseableLock lockFor = getLockFor(metadata);
        Throwable th = null;
        try {
            try {
                BatchDAO.Batch batchFor = this.batchDAO.getBatchFor(metadata, instant, instant2);
                if (lockFor != null) {
                    if (0 != 0) {
                        try {
                            lockFor.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        lockFor.close();
                    }
                }
                return batchFor;
            } finally {
            }
        } catch (Throwable th3) {
            if (lockFor != null) {
                if (th != null) {
                    try {
                        lockFor.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    lockFor.close();
                }
            }
            throw th3;
        }
    }

    private void collectStatsFor(BatchDAO.Batch batch, int i) {
        if (batch.inError()) {
            this.statsCollector.readError(i);
            return;
        }
        this.statsCollector.readRecords(i, batch.getBatchData().getData().size());
        Duration queryTime = batch.getQueryTime();
        if (queryTime != null) {
            this.statsCollector.readTime(i, queryTime.toMillis());
        }
    }

    private void handlePublicationResult(BatchPublicationResult batchPublicationResult, BatchDAO.Batch batch) {
        batch.setLogEndTime(this.clock.instant());
        Metadata remove = this.metadataByBatchId.remove(batchPublicationResult.getBatchId());
        log.debug("Handling publication result for metadata: {} until {}", remove, batch.getPublicationTime());
        if (batchPublicationResult.getCause() == null) {
            updatePublicationStatus(batch, remove);
            return;
        }
        log.error("Got exception while publishing data for {}", remove, batchPublicationResult.getCause());
        this.statsCollector.publishError(remove.getGroupId());
        this.lockedMetadata.add(remove);
        remove.setLockedDue(batchPublicationResult.getCause());
        this.metadataDAO.lockMetadata(remove, this.metadataStamps.get(remove));
    }

    private void updatePublicationStatus(BatchDAO.Batch batch, Metadata metadata) {
        this.statusManager.addPublishedBatch(metadata, batch);
        TreeSet treeSet = new TreeSet(this.metadataBatches.get(metadata));
        treeSet.headSet(batch.getPublicationTime(), true).last().done();
        BatchDAO.PublicationTime finishedPublicationTimeOf = getFinishedPublicationTimeOf(treeSet);
        if (finishedPublicationTimeOf == null) {
            log.debug("Publication of batches for metadata {} is not completed until {}, cannot update metadata stamps", metadata, metadata.getLastCheckedUtcStamp());
            return;
        }
        log.debug("Publication of batches for metadata {} is completed until {}, updating metadata stamps", metadata, finishedPublicationTimeOf.getUntil());
        Instant until = finishedPublicationTimeOf.getUntil();
        AutoCloseableLock lockFor = getLockFor(metadata);
        Throwable th = null;
        try {
            try {
                PublicationStatusManagerImpl.PublicationStatus publicationStatus = this.statusManager.getPublicationStatus(metadata, until);
                this.metadataDAO.lockVariablesForChangesIfNeeded(metadata, until);
                this.batchDAO.updateBatchInfo(publicationStatus);
                this.statusManager.removePublishedBatches(metadata, publicationStatus.getPublishedBatches());
                this.metadataStamps.merge(metadata, until, (instant, instant2) -> {
                    return instant2;
                });
                if (lockFor != null) {
                    if (0 != 0) {
                        try {
                            lockFor.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        lockFor.close();
                    }
                }
                this.metadataBatches.get(metadata).removeAll(treeSet.headSet(finishedPublicationTimeOf, true));
                tryToFinishProcessingOf(metadata);
            } finally {
            }
        } catch (Throwable th3) {
            if (lockFor != null) {
                if (th != null) {
                    try {
                        lockFor.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    lockFor.close();
                }
            }
            throw th3;
        }
    }

    private void tryToFinishProcessingOf(Metadata metadata) {
        if (this.processingMetadata.get(metadata).get() == ProcessingState.SENDING_DONE && CollectionUtils.isEmpty(this.metadataBatches.get(metadata))) {
            Instant instant = this.metadataStamps.get(metadata);
            this.metadataBatches.remove(metadata);
            this.metadataStamps.remove(metadata);
            this.statusManager.remove(metadata);
            this.processingMetadata.get(metadata).set(ProcessingState.IDLE);
            if (processingMetadataFinishedForGroupId(metadata.getGroupId())) {
                this.listener.onProcessingFinish(new ProcessInfo(this.processInstanceId, metadata.getGroupId()));
            }
            log.info("Finished processing metadata of {} until: {}", metadata, instant);
        }
    }

    private boolean processingMetadataFinishedForGroupId(int i) {
        for (Map.Entry<Metadata, AtomicReference<ProcessingState>> entry : this.processingMetadata.entrySet()) {
            if (entry.getKey().getGroupId() == i && entry.getValue().get() != ProcessingState.IDLE) {
                return false;
            }
        }
        return true;
    }

    private BatchDAO.PublicationTime getFinishedPublicationTimeOf(NavigableSet<BatchDAO.PublicationTime> navigableSet) {
        BatchDAO.PublicationTime publicationTime = null;
        for (BatchDAO.PublicationTime publicationTime2 : navigableSet) {
            log.trace("Publication to {}, Done: {}", publicationTime2.getUntil(), Boolean.valueOf(publicationTime2.isDone()));
            if (!publicationTime2.isDone()) {
                break;
            }
            publicationTime = publicationTime2;
        }
        return publicationTime;
    }

    @VisibleForTesting
    void setTimeWindowSeconds(int i) {
        this.timeWindowSeconds = i;
    }

    private AutoCloseableLock getLockFor(Object obj) {
        Objects.requireNonNull(obj);
        return AutoCloseableLock.getFor(this.metadataLocks.get(obj));
    }
}
