package cern.nxcals.ds.importer.producer;

import cern.nxcals.common.concurrent.AutoCloseableLock;
import cern.nxcals.ds.importer.common.model.BatchDataX;
import cern.nxcals.ds.importer.common.model.BatchPublicationResult;
import cern.nxcals.ds.importer.producer.dao.BatchDAO;
import cern.nxcals.ds.importer.producer.dao.MetadataDAO;
import cern.nxcals.ds.importer.producer.listener.DataProducerListener;
import cern.nxcals.ds.importer.producer.listener.ProcessInfo;
import cern.nxcals.ds.importer.producer.model.Metadata;
import cern.nxcals.ds.importer.producer.stats.ProducerStats;
import cern.nxcals.ds.importer.producer.status.PublicationStatusManager;
import cern.nxcals.ds.importer.producer.status.PublicationStatusManagerImpl;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.Striped;
import java.io.Serializable;
import java.time.Clock;
import java.time.Instant;
import java.util.Collection;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Objects;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.stream.Collectors;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;

/* loaded from: input_file:BOOT-INF/lib/producer-0.1.2.jar:cern/nxcals/ds/importer/producer/DataProducer.class */
public abstract class DataProducer implements IDataProducer {
    private static final Logger log = LoggerFactory.getLogger(DataProducer.class);

    /** Sending state per metadata currently registered for processing; entries are removed in {@link #tryToFinishProcessingOf(Metadata)}. */
    protected final Map<Metadata, AtomicReference<ProcessingState>> processingMetadata = new ConcurrentHashMap<>();

    /** Metadata owning each converted batch, keyed by batch id; the entry is removed when the publication result is handled. */
    protected final Map<Serializable, Metadata> metadataByBatchId = new ConcurrentHashMap<>();

    /** Publication times of the batches sent for a metadata that have not yet been cleared by a status update. */
    protected final Map<Metadata, Set<BatchDAO.PublicationTime>> metadataBatches = new ConcurrentHashMap<>();

    /** Last known stamp per metadata: seeded with the last-checked stamp and replaced after each publication status update. */
    protected final Map<Metadata, Instant> metadataStamps = new ConcurrentHashMap<>();

    /** Metadata for which a publication failed; the read loop stops as soon as its metadata appears here. */
    protected final Set<Metadata> lockedMetadata = ConcurrentHashMap.newKeySet();

    /** Striped locks (1000 stripes) handed out by {@link #getLockFor(Object)}. */
    protected final Striped<Lock> metadataLocks = Striped.lazyWeakLock(1000);

    /** Executor running the per-metadata read loop and its completion callback. */
    protected final Executor processingExecutor;

    /** Executor running the publication-result callbacks. */
    protected final Executor updateExecutor;

    protected final DataProcessor dataProcessor;
    protected final MetadataDAO metadataDAO;
    protected final BatchDAO batchDAO;
    protected final DataProducerListener listener;
    protected final Clock clock;

    /** Not null-checked by the constructor, unlike the other collaborators. */
    protected final ProducerStats producerStats;

    protected final PublicationStatusManager statusManager;
    private final DefaultBatchConverter batchConverter;

    /** Window length in seconds handed to the {@code MetaDataQueryToTimeAligner}. */
    @Value("${producer.batch.window.seconds:300}")
    protected int timeWindowSeconds;

    /** Identifier of this producer instance, reported in every {@code ProcessInfo}. */
    @Value("${producer.instance.id}")
    protected int processInstanceId;

    /** Seconds subtracted from "now"; a window is only read when its end lies before that boundary. */
    @Value("${producer.delay.before.now.to.read.data:120}")
    protected int delayBeforeNowToReadData;

    /** Maximum age, in hours, of the start stamp before it is reset; {@code -1} disables the reset. */
    @Value("${producer.max.delay.to.insert.local.data.hours:12}")
    protected int maxDelay2InsertLocalData;

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:BOOT-INF/lib/producer-0.1.2.jar:cern/nxcals/ds/importer/producer/DataProducer$ProcessingState.class */
    /** Sending phase of a single metadata, as tracked in {@code processingMetadata}. */
    public enum ProcessingState {
        /** Initial state, set when the metadata is registered for processing. */
        SENDING_DATA,
        /** Returned by the read loop when it stops; required before processing of the metadata can be finished. */
        SENDING_DONE
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Stores the collaborators of this producer.
     *
     * <p>Every argument except {@code producerStats} is rejected with a
     * {@link NullPointerException} when {@code null}; {@code producerStats} is stored as given.
     *
     * @param executor                 executor for the per-metadata read loop
     * @param executor2                executor for publication-result callbacks
     * @param dataProcessor            receives the converted batches
     * @param metadataDAO              source of metadata per group
     * @param batchDAO                 persists batch and last-logged information
     * @param dataProducerListener     notified about start, finish and shutdown
     * @param clock                    time source
     * @param producerStats            statistics sink (not null-checked here)
     * @param publicationStatusManager tracks published batches per metadata
     * @param defaultBatchConverter    converts read batches before they are sent
     */
    public DataProducer(Executor executor, Executor executor2, DataProcessor dataProcessor, MetadataDAO metadataDAO, BatchDAO batchDAO, DataProducerListener dataProducerListener, Clock clock, ProducerStats producerStats, PublicationStatusManager publicationStatusManager, DefaultBatchConverter defaultBatchConverter) {
        this.processingExecutor = Objects.requireNonNull(executor);
        this.updateExecutor = Objects.requireNonNull(executor2);
        this.dataProcessor = Objects.requireNonNull(dataProcessor);
        this.metadataDAO = Objects.requireNonNull(metadataDAO);
        this.batchDAO = Objects.requireNonNull(batchDAO);
        this.listener = Objects.requireNonNull(dataProducerListener);
        this.clock = Objects.requireNonNull(clock);
        this.producerStats = producerStats;
        this.statusManager = Objects.requireNonNull(publicationStatusManager);
        this.batchConverter = Objects.requireNonNull(defaultBatchConverter);
    }

    /**
     * Notifies the listener that the given group is shutting down.
     *
     * @param i group id reported in the {@code ProcessInfo}
     */
    @Override // cern.nxcals.ds.importer.producer.IDataProducer
    public void shutdown(int i) {
        if (this.listener == null) {
            return;
        }
        ProcessInfo processInfo = new ProcessInfo(this.processInstanceId, i);
        this.listener.onShutdown(processInfo);
    }

    /**
     * Notifies the listener that the initial processing of a group has been scheduled.
     *
     * @param i       group id reported in the {@code ProcessInfo}
     * @param instant instant passed through to the listener unchanged
     */
    @Override // cern.nxcals.ds.importer.producer.IDataProducer
    public void initialDataProcessingScheduled(int i, Instant instant) {
        if (this.listener == null) {
            return;
        }
        ProcessInfo processInfo = new ProcessInfo(this.processInstanceId, i);
        this.listener.onInitialProcessingScheduled(processInfo, instant);
    }

    /**
     * Starts a processing cycle for one metadata group.
     *
     * <p>The listener is told that processing started and the initial metrics are updated
     * first. The cycle is then skipped when metadata of the group is still registered as
     * being processed. When the DAO returns no metadata the listener is told that processing
     * finished; otherwise the metadata is handed to {@link #collectAndProcess(Collection)}.
     *
     * @param i id of the group to process
     */
    @Override // cern.nxcals.ds.importer.producer.IDataProducer
    public void collectAndProcess(int i) {
        ProcessInfo processInfo = new ProcessInfo(this.processInstanceId, i);
        this.listener.onProcessingStart(processInfo);
        this.producerStats.updateInitialProcessorMetrics(i);
        if (isProcessingOngoing(i)) {
            return;
        }

        Collection<Metadata> groupMetadata = this.metadataDAO.getMetadata(i);
        if (!CollectionUtils.isEmpty(groupMetadata)) {
            collectAndProcess(groupMetadata);
            this.producerStats.updateProcessingMetadata(i, this.processingMetadata.keySet());
            return;
        }

        log.warn("No metadata found for group id: {}", i);
        this.listener.onProcessingFinish(processInfo);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Registers every metadata as being processed and schedules its read loop on the
     * processing executor.
     *
     * <p>Metadata with a non-null {@code lockedDue} is registered but not scheduled. For all
     * other metadata the state returned by the read loop is stored and
     * {@link #tryToFinishProcessingOf(Metadata)} is attempted. A failure of the read loop is
     * logged and treated as {@link ProcessingState#SENDING_DONE}, the same outcome the loop
     * itself produces for a batch in error, so the metadata does not stay registered as
     * {@code SENDING_DATA} with the exception silently dropped.
     *
     * @param collection metadata to process; nothing happens when it is empty
     */
    public void collectAndProcess(Collection<Metadata> collection) {
        if (collection.isEmpty()) {
            return;
        }
        Collection<Metadata> preprocessMetadata = preprocessMetadata(collection);
        MetaDataQueryToTimeAligner metaDataQueryToTimeAligner = new MetaDataQueryToTimeAligner(preprocessMetadata, this.timeWindowSeconds);
        for (Metadata metadata : preprocessMetadata) {
            this.processingMetadata.put(metadata, new AtomicReference<>(ProcessingState.SENDING_DATA));
            if (metadata.getLockedDue() != null) {
                // NOTE(review): a locked metadata stays registered as SENDING_DATA and nothing in
                // this class removes it again, so isProcessingOngoing keeps reporting its group as
                // busy - confirm this is intended.
                log.warn("Cannot process data of {} as it seems to be locked due to ", metadata, metadata.getLockedDue());
                continue;
            }
            this.lockedMetadata.remove(metadata);
            log.info("Processing data of {} since {}", metadata, metadata.getLastCheckedUtcStamp());
            CompletableFuture
                    .supplyAsync(() -> collectAndProcess(metadata, metaDataQueryToTimeAligner), this.processingExecutor)
                    .exceptionally(failure -> {
                        // Without this stage the exception would vanish inside the future and the
                        // completion callback below would never run.
                        log.error("Unexpected error while collecting data of {}", metadata, failure);
                        return ProcessingState.SENDING_DONE;
                    })
                    .thenAcceptAsync(processingState -> {
                        this.processingMetadata.get(metadata).set(processingState);
                        tryToFinishProcessingOf(metadata);
                    }, this.processingExecutor)
                    .exceptionally(failure -> {
                        log.error("Unexpected error while finishing processing of {}", metadata, failure);
                        return null;
                    });
        }
    }

    /**
     * Hook applied to the metadata before it is processed; this implementation returns
     * the given collection unchanged.
     *
     * @param collection metadata about to be processed
     * @return the metadata that will actually be processed
     */
    protected Collection<Metadata> preprocessMetadata(Collection<Metadata> collection) {
        return collection;
    }

    /**
     * Tells whether any metadata of the given group is still registered in
     * {@code processingMetadata}, logging one line per such metadata.
     *
     * @param i group id to look for
     * @return {@code true} when at least one registered metadata belongs to the group
     */
    private boolean isProcessingOngoing(int i) {
        boolean ongoing = false;
        for (Map.Entry<Metadata, AtomicReference<ProcessingState>> entry : this.processingMetadata.entrySet()) {
            Metadata metadata = entry.getKey();
            if (metadata.getGroupId() != i) {
                continue;
            }
            // May be null when no batch has been sent for this metadata yet.
            Set<BatchDAO.PublicationTime> pendingBatches = this.metadataBatches.get(metadata);
            Instant lastStamp = this.metadataStamps.get(metadata);
            // The message has five placeholders; the pending batches themselves fill the last one.
            log.info("--> Still processing metadata {} with status {}, last stamp {} and {} not confirmed batches {}",
                    metadata, entry.getValue(), lastStamp, pendingBatches != null ? pendingBatches.size() : 0, pendingBatches);
            ongoing = true;
        }
        return ongoing;
    }

    /**
     * Read loop for a single metadata: reads one time window after another and hands every
     * non-empty batch to the data processor.
     *
     * <p>The loop runs while the end of the current window lies before "now" minus
     * {@code delayBeforeNowToReadData} seconds and the metadata has not been added to
     * {@code lockedMetadata}. It stops early when a batch reports an error. Publication
     * results are handled asynchronously on the update executor.
     *
     * @param metadata                   metadata to read data for
     * @param metaDataQueryToTimeAligner supplies the window boundaries
     * @return always {@link ProcessingState#SENDING_DONE}
     */
    private ProcessingState collectAndProcess(Metadata metadata, MetaDataQueryToTimeAligner metaDataQueryToTimeAligner) {
        Pair<Instant, Instant> firstWindow = getInitialFromAndTo(metadata, metaDataQueryToTimeAligner);
        Instant from = firstWindow.getLeft();
        Instant to = firstWindow.getRight();
        if (!isNotToTimeInFuture(metadata, from, to)) {
            this.producerStats.resetReadTime(metadata.getGroupId());
            return ProcessingState.SENDING_DONE;
        }

        this.metadataStamps.put(metadata, metadata.getLastCheckedUtcStamp());
        long sentRecords = 0;
        while (to.isBefore(this.clock.instant().minusSeconds(this.delayBeforeNowToReadData)) && !this.lockedMetadata.contains(metadata)) {
            log.info("Getting batch for {} from {} to {}", metadata, from, to);
            BatchDAO.Batch batch = getBatchFor(metadata, from, to);
            this.producerStats.collectStatsFor(batch, metadata.getGroupId());
            if (batch.inError()) {
                log.info("Batch in error for {} from {} to {}", metadata, from, to);
                this.producerStats.resetReadTime(metadata.getGroupId());
                return ProcessingState.SENDING_DONE;
            }

            if (batch.getBatchData().getData().isEmpty()) {
                log.info("Data batch empty, skip processing for {} from {} to {}", metadata, from, to);
            } else {
                BatchDataX converted = this.batchConverter.apply(batch.getBatchData());
                sentRecords += converted.getData().size();
                // Both registrations happen before the batch is handed over, so the result
                // handler finds them.
                this.metadataByBatchId.put(converted.getId(), metadata);
                this.metadataBatches
                        .computeIfAbsent(metadata, key -> ConcurrentHashMap.newKeySet())
                        .add(batch.getPublicationTime());
                log.debug("Sending {} records to processor for {} from {} to {}", converted.getData().size(), metadata, from, to);
                batch.setLogStartTime(this.clock.instant());
                this.dataProcessor.process(converted)
                        .thenAcceptAsync(result -> handlePublicationResult(result, batch), this.updateExecutor);
                log.debug("Sent {} records to processor for {} from {} to {}", converted.getData().size(), metadata, from, to);
            }

            // Advance to the next window whether or not data was sent.
            from = to;
            to = getNextToTime(metaDataQueryToTimeAligner, metadata, from);
        }

        log.info("Sent all [{}] records for {} between {} and {}", sentRecords, metadata, metadata.getLastCheckedUtcStamp(), from);
        this.producerStats.resetReadTime(metadata.getGroupId());
        return ProcessingState.SENDING_DONE;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Returns the end of the window that starts at {@code instant}; this implementation
     * delegates to the aligner and does not use {@code metadata}.
     *
     * @param metaDataQueryToTimeAligner aligner asked for the next end time
     * @param metadata                   metadata being processed (unused here)
     * @param instant                    start of the next window
     * @return end of the next window
     */
    public Instant getNextToTime(MetaDataQueryToTimeAligner metaDataQueryToTimeAligner, Metadata metadata, Instant instant) {
        return metaDataQueryToTimeAligner.getNextToTime(instant);
    }

    /**
     * Determines the first time window (from, to) to read for a metadata.
     *
     * <p>Normally the window starts at the metadata's last-checked stamp and ends at the
     * aligner's initial "to" time. When the metadata does not report
     * {@code isNullableLastCheckedUtcStamp()}, the limit is not {@code -1}, and the
     * last-checked stamp is older than {@code maxDelay2InsertLocalData} hours, the window
     * instead starts at the aligned age limit.
     *
     * <p>"Now" is taken from the injected {@link Clock}, like everywhere else in this class,
     * so the age limit follows the same time source as the read loop.
     *
     * @param metadata                   metadata to read data for
     * @param metaDataQueryToTimeAligner supplies the window end
     * @return pair of window start (left) and window end (right)
     */
    private Pair<Instant, Instant> getInitialFromAndTo(Metadata metadata, MetaDataQueryToTimeAligner metaDataQueryToTimeAligner) {
        Instant lastCheckedUtcStamp = metadata.getLastCheckedUtcStamp();
        if (!metadata.isNullableLastCheckedUtcStamp() && this.maxDelay2InsertLocalData != -1) {
            Instant oldestAllowed = this.clock.instant().minusSeconds(TimeUnit.HOURS.toSeconds(this.maxDelay2InsertLocalData));
            if (lastCheckedUtcStamp.isBefore(oldestAllowed)) {
                log.info("Detected from {} which is older than {}, resetting it to {} ", lastCheckedUtcStamp, oldestAllowed, oldestAllowed);
                Instant alignedFrom = MetaDataQueryToTimeAligner.alignTime(oldestAllowed, true);
                return Pair.of(alignedFrom, metaDataQueryToTimeAligner.getNextToTime(alignedFrom));
            }
        }
        return Pair.of(lastCheckedUtcStamp, metaDataQueryToTimeAligner.getInitialToTime(metadata));
    }

    /**
     * Checks that the window end lies before "now" minus {@code delayBeforeNowToReadData}
     * seconds. When it does not, the listener is told that processing of the metadata's
     * group has finished.
     *
     * @param metadata metadata being processed
     * @param instant  window start, used for logging only
     * @param instant2 window end to check
     * @return {@code true} when the window may be read
     */
    private boolean isNotToTimeInFuture(Metadata metadata, Instant instant, Instant instant2) {
        Instant readBoundary = this.clock.instant().minusSeconds(this.delayBeforeNowToReadData);
        boolean readable = instant2.isBefore(readBoundary);
        if (!readable) {
            log.info("Finished processing of {} until: {} as the next checked time {} is after to stamp boundaries {}", metadata, instant, instant2, readBoundary);
            this.listener.onProcessingFinish(new ProcessInfo(this.processInstanceId, metadata.getGroupId()));
        }
        return readable;
    }

    /**
     * Reads the batch of a metadata for one time window. The read loop checks the result
     * with {@code inError()} and only then inspects its batch data.
     *
     * @param metadata metadata to read data for
     * @param instant  window start
     * @param instant2 window end
     * @return the batch for the window
     */
    protected abstract BatchDAO.Batch getBatchFor(Metadata metadata, Instant instant, Instant instant2);

    /**
     * Handles the outcome of publishing one batch.
     *
     * <p>On failure (the result carries a cause) the metadata is added to
     * {@code lockedMetadata}, marked with the cause and locked through the DAO with its last
     * known stamp. On success the publication status is updated while holding the lock
     * obtained from {@link #getLockFor(Object)}. The lock is managed by try-with-resources,
     * so it is closed even when the status update throws.
     *
     * <p>Any exception is logged and not propagated.
     *
     * @param batchPublicationResult result reported by the data processor
     * @param batch                  the batch that was sent
     */
    private void handlePublicationResult(BatchPublicationResult batchPublicationResult, BatchDAO.Batch batch) {
        Metadata metadata = null;
        try {
            metadata = this.metadataByBatchId.remove(batchPublicationResult.getBatchId());
            batch.setLogEndTime(this.clock.instant());
            log.debug("Handling publication result for metadata: {} until {}", metadata, batch.getPublicationTime());
            if (batchPublicationResult.getCause() != null) {
                log.error("Got exception while publishing data for {}", metadata, batchPublicationResult.getCause());
                this.producerStats.incrementPublishError(metadata.getGroupId());
                this.lockedMetadata.add(metadata);
                metadata.setLockedDue(batchPublicationResult.getCause());
                this.metadataDAO.lockMetadata(metadata, this.metadataStamps.get(metadata));
                return;
            }
            try (AutoCloseableLock ignored = getLockFor(metadata)) {
                tryToUpdatePublicationStatus(batch, metadata);
            }
        } catch (Exception e) {
            log.error("Error while handling publication for {}", metadata, e);
        }
    }

    /**
     * Records a successfully published batch and, when all earlier batches are done too,
     * persists the publication status.
     *
     * <p>The pending publication times are copied into a sorted set; the copy holds the same
     * element instances, so marking one as done is visible through {@code metadataBatches}.
     *
     * @param batch    the batch that was published
     * @param metadata metadata the batch belongs to
     */
    private void tryToUpdatePublicationStatus(BatchDAO.Batch batch, Metadata metadata) {
        this.statusManager.addPublishedBatch(metadata, batch);
        NavigableSet<BatchDAO.PublicationTime> pending = new TreeSet<>(this.metadataBatches.get(metadata));
        NavigableSet<BatchDAO.PublicationTime> upToBatch = pending.headSet(batch.getPublicationTime(), true);
        // The greatest element not after this batch's publication time is marked as done.
        upToBatch.last().done();
        BatchDAO.PublicationTime finishedPublicationTimeOf = getFinishedPublicationTimeOf(pending);
        if (finishedPublicationTimeOf != null) {
            updatePublicationStatus(metadata, pending, finishedPublicationTimeOf);
            return;
        }
        // Build the list of unfinished batches only when it will actually be logged.
        if (log.isDebugEnabled()) {
            log.debug("Publication of batches for metadata {} is not completed until {}, cannot update metadata stamps as the following batches are not done yet {}", metadata, batch.getPublicationTime(), upToBatch.stream().filter(publicationTime -> !publicationTime.isDone()).collect(Collectors.toList()));
        }
    }

    /**
     * Persists the publication status of a metadata up to the given publication time and
     * clears the batches covered by it.
     *
     * <p>In order: the status is obtained from the status manager, written through the DAOs,
     * the published batches are removed from the status manager, the metadata's stamp is
     * replaced by the status' last-checked stamp, the publication times up to and including
     * {@code publicationTime} are removed from {@code metadataBatches}, and finishing the
     * metadata is attempted.
     *
     * @param metadata        metadata whose status is updated
     * @param navigableSet    sorted pending publication times of the metadata
     * @param publicationTime latest publication time up to which all batches are done
     */
    protected void updatePublicationStatus(Metadata metadata, NavigableSet<BatchDAO.PublicationTime> navigableSet, BatchDAO.PublicationTime publicationTime) {
        log.debug("Publication of batches for metadata {} is completed until {}, updating metadata stamps", metadata, publicationTime.getUntil());

        PublicationStatusManagerImpl.PublicationStatus status = this.statusManager.getPublicationStatus(metadata, publicationTime.getUntil());
        this.batchDAO.updateLastLoggedValues(status);
        this.metadataDAO.lockVariablesForChangesIfNeeded(metadata, status);
        this.batchDAO.updateBatchInfo(status);
        this.statusManager.removePublishedBatches(metadata, status.getPublishedBatches());

        Instant lastChecked = status.getLastCheckedStamp().toInstant();
        // Unconditionally replaces any previous stamp.
        this.metadataStamps.put(metadata, lastChecked);
        log.debug("Metadata stamps updated for {} until {} and distinct last updates {}", metadata, lastChecked, status.getLastCheckedStamps());

        NavigableSet<BatchDAO.PublicationTime> confirmed = navigableSet.headSet(publicationTime, true);
        this.metadataBatches.get(metadata).removeAll(confirmed);
        tryToFinishProcessingOf(metadata);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Finishes processing of a metadata once its read loop is done and no sent batch is
     * still awaiting confirmation.
     *
     * <p>Finishing removes the metadata from all tracking maps and from the status manager.
     * When the stats report no remaining metadata for the group, the read/write errors are
     * updated and the listener is told that processing finished.
     *
     * <p>This method is reached from both the processing and the update executor, so the
     * metadata may already have been finished by the other caller; that case returns quietly
     * instead of failing with a {@link NullPointerException}.
     *
     * @param metadata metadata to finish
     */
    public void tryToFinishProcessingOf(Metadata metadata) {
        AtomicReference<ProcessingState> stateRef = this.processingMetadata.get(metadata);
        if (stateRef == null) {
            log.debug("Processing of {} has already been finished", metadata);
            return;
        }
        ProcessingState processingState = stateRef.get();
        if (processingState != ProcessingState.SENDING_DONE) {
            log.debug("Cannot finish processing of {} as its status is still {}", metadata, processingState);
            return;
        }
        Set<BatchDAO.PublicationTime> pendingBatches = this.metadataBatches.get(metadata);
        if (!CollectionUtils.isEmpty(pendingBatches)) {
            log.debug("Still {} batches to be confirmed for {}: {}", pendingBatches.size(), metadata, pendingBatches);
            return;
        }
        // Read the stamp before it is removed so it can still be logged below.
        Instant lastStamp = this.metadataStamps.get(metadata);
        this.metadataBatches.remove(metadata);
        this.metadataStamps.remove(metadata);
        this.statusManager.remove(metadata);
        this.processingMetadata.remove(metadata);
        if (this.producerStats.updateProcessingMetadata(metadata.getGroupId(), this.processingMetadata.keySet()) == 0) {
            this.producerStats.updateReadWriteErrors(metadata.getGroupId());
            this.listener.onProcessingFinish(new ProcessInfo(this.processInstanceId, metadata.getGroupId()));
        }
        log.info("Finished processing metadata of {} until: {} ", metadata, lastStamp);
    }

    /**
     * Walks the publication times in order and returns the last one of the leading run of
     * done entries.
     *
     * @param navigableSet sorted publication times
     * @return the last done entry before the first entry that is not done, or {@code null}
     *         when the first entry is not done or the set is empty
     */
    private BatchDAO.PublicationTime getFinishedPublicationTimeOf(NavigableSet<BatchDAO.PublicationTime> navigableSet) {
        BatchDAO.PublicationTime lastDone = null;
        for (BatchDAO.PublicationTime candidate : navigableSet) {
            log.trace("Publication to {}, Done: {}", candidate.getUntil(), candidate.isDone());
            if (candidate.isDone()) {
                lastDone = candidate;
            } else {
                // The first entry that is not done ends the run.
                return lastDone;
            }
        }
        return lastDone;
    }

    /**
     * Overrides the configured batch window length.
     *
     * @param i window length in seconds
     */
    @VisibleForTesting
    void setTimeWindowSeconds(int i) {
        this.timeWindowSeconds = i;
    }

    /**
     * Overrides the configured maximum age of the start stamp.
     * NOTE(review): presumably test-only like {@code setTimeWindowSeconds}, but it carries no
     * {@code @VisibleForTesting} annotation - confirm.
     *
     * @param i maximum age in hours; {@code -1} disables the reset of old start stamps
     */
    void setMaxDelay2InsertLocalData(int i) {
        this.maxDelay2InsertLocalData = i;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Returns the closeable lock wrapper for the stripe that {@code obj} maps to.
     * NOTE(review): callers close the returned object right after their critical section,
     * so {@code AutoCloseableLock.getFor} presumably acquires the lock - confirm.
     *
     * @param obj key selecting the stripe; must not be {@code null}
     * @return closeable wrapper around the stripe's lock
     * @throws NullPointerException when {@code obj} is {@code null}
     */
    public final AutoCloseableLock getLockFor(Object obj) {
        Object key = Objects.requireNonNull(obj);
        Lock stripe = this.metadataLocks.get(key);
        return AutoCloseableLock.getFor(stripe);
    }
}
