package cern.nxcals.ds.importer.retransmission.producer;

import cern.nxcals.common.concurrent.AutoCloseableLock;
import cern.nxcals.ds.importer.common.model.BatchData;
import cern.nxcals.ds.importer.producer.DataProcessor;
import cern.nxcals.ds.importer.producer.DataProducer;
import cern.nxcals.ds.importer.producer.DefaultBatchConverter;
import cern.nxcals.ds.importer.producer.dao.BatchDAO;
import cern.nxcals.ds.importer.producer.listener.DataProducerListener;
import cern.nxcals.ds.importer.producer.model.Metadata;
import cern.nxcals.ds.importer.producer.model.VariableData;
import cern.nxcals.ds.importer.producer.stats.ProducerStats;
import cern.nxcals.ds.importer.producer.status.PublicationStatusManager;
import cern.nxcals.ds.importer.retransmission.dao.RetransmissionBatchDAO;
import cern.nxcals.ds.importer.retransmission.dao.RetransmissionMetadataDAO;
import java.time.Clock;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Executor;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Primary;
import org.springframework.stereotype.Component;

/**
 * {@link DataProducer} variant used to re-transmit already stored data for a given RDB source.
 *
 * <p>Each run looks up the metadata of the requested source through the
 * {@link RetransmissionMetadataDAO}, hands it to the inherited {@code collectAndProcess(Collection)}
 * and then blocks until {@code processingMetadata} is empty. While the run is in progress the
 * {@link #getBatchFor(Metadata, Instant, Instant)} override restricts every batch to the requested
 * time range and variable selection.
 *
 * <p>The settings of a run (variable selection, exclusions, upper time bound, target source) are kept
 * in instance fields that are overwritten at the start of every run, so two runs must not be executed
 * concurrently on the same instance.
 */
@Component
@Primary
public class DataRetransmissionProducerImpl extends DataProducer implements IRetransmissionDataProducer {
    private static final Logger log = LoggerFactory.getLogger(DataRetransmissionProducerImpl.class);

    /** Pause, in milliseconds, between two checks of {@code processingMetadata} while waiting for a run. */
    private static final long COMPLETION_POLL_INTERVAL_MILLIS = 500L;

    /** Variables to retransmit; an empty list means "no explicit selection". Never {@code null}. */
    private List<Long> variableIds = new ArrayList<>();
    /** Ids of the variables dropped from every batch. Never {@code null}. */
    private HashSet<Long> excludeVariableIds = new HashSet<>();
    /** Names of the variables dropped from every batch. Never {@code null}. */
    private HashSet<String> excludeVariableNames = new HashSet<>();
    /** Upper bound of the retransmission range, or {@code null} when the run is unbounded. */
    private Instant until;
    /** Group written into every retransmitted variable, or {@code null}/empty to keep the original one. */
    private String toSource;
    /**
     * Last batch reported in error during the current run, {@code null} when none was seen.
     * NOTE(review): volatile because it is written in getBatchFor and read in run() after polling;
     * presumably these happen on different threads (producer executor vs. caller) - confirm.
     */
    private volatile BatchDAO.Batch batchInError;

    @Autowired
    public DataRetransmissionProducerImpl(@Qualifier("producerExecutor") Executor executor, @Qualifier("stateUpdater") Executor executor2, DataProcessor dataProcessor, RetransmissionMetadataDAO retransmissionMetadataDAO, BatchDAO batchDAO, DataProducerListener dataProducerListener, ProducerStats producerStats, PublicationStatusManager publicationStatusManager, DefaultBatchConverter defaultBatchConverter) {
        super(executor, executor2, dataProcessor, retransmissionMetadataDAO, batchDAO, dataProducerListener, Clock.systemDefaultZone(), producerStats, publicationStatusManager, defaultBatchConverter);
    }

    /**
     * Not supported by the retransmission producer.
     *
     * @param i group id (ignored)
     * @throws UnsupportedOperationException always
     */
    @Override
    public void collectAndProcess(int i) {
        throw new UnsupportedOperationException("Collect and process by groupId is not implemented!");
    }

    /**
     * Retransmits all variables of a source, optionally skipping some of them, and blocks until the
     * run is finished.
     *
     * @param str      RDB source whose metadata is looked up
     * @param instant  instant passed to the metadata lookup (start of the retransmission)
     * @param instant2 upper bound of the retransmission range, {@code null} for no bound
     * @param list     ids of the variables to exclude, may be {@code null}
     * @param list2    names of the variables to exclude, may be {@code null}
     */
    @Override
    public void collectAndProcessAll(String str, Instant instant, Instant instant2, List<Long> list, List<String> list2) {
        this.batchInError = null;
        this.variableIds = new ArrayList<>();
        this.toSource = null;
        this.excludeVariableIds = list == null ? new HashSet<>() : new HashSet<>(list);
        this.excludeVariableNames = list2 == null ? new HashSet<>() : new HashSet<>(list2);
        this.until = instant2;
        run(str, instant);
    }

    /**
     * Retransmits the selected variables of a source, optionally re-labelling them with another
     * group, and blocks until the run is finished.
     *
     * @param str      RDB source whose metadata is looked up
     * @param str2     group set on every retransmitted variable; {@code null} or empty keeps the original
     * @param instant  instant passed to the metadata lookup (start of the retransmission)
     * @param instant2 upper bound of the retransmission range, {@code null} for no bound
     * @param list     ids of the variables to retransmit; {@code null} or empty means no explicit selection
     */
    @Override
    public void collectAndProcess(String str, String str2, Instant instant, Instant instant2, List<Long> list) {
        this.batchInError = null;
        // Defensive copy: the list is consulted for the whole duration of the run, and a null
        // argument must not surface later as a NullPointerException inside getBatchFor.
        this.variableIds = list == null ? new ArrayList<>() : new ArrayList<>(list);
        this.toSource = str2;
        this.excludeVariableIds = new HashSet<>();
        this.excludeVariableNames = new HashSet<>();
        this.until = instant2;
        run(str, instant);
    }

    /**
     * Looks up the metadata of {@code source}, starts the processing and waits for its completion,
     * then logs the follow-up action expected from the operator.
     */
    private void run(String source, Instant from) {
        Collection<Metadata> metadata = ((RetransmissionMetadataDAO) this.metadataDAO).getMetadata(from, source);
        if (CollectionUtils.isEmpty(metadata)) {
            log.warn("No metadata found for rdb source: {}", source);
            return;
        }
        super.collectAndProcess(metadata);
        waitForCompletion();

        BatchDAO.Batch failedBatch = this.batchInError;
        if (failedBatch == null) {
            log.warn("Please send an e-mail to acc-logging-support@cern.ch specifying the data retransmission performed so they can remove data duplicates");
        } else {
            log.warn("Error getting batch from {}", failedBatch.getFrom());
            log.warn("Please resume the data re-transfer from {}", failedBatch.getFrom());
        }
    }

    /**
     * Polls {@code processingMetadata} until it is empty. Always sleeps once before the first check.
     * If the waiting thread is interrupted, the interrupt flag is restored and the method returns
     * immediately.
     */
    private void waitForCompletion() {
        do {
            try {
                Thread.sleep(COMPLETION_POLL_INTERVAL_MILLIS);
            } catch (InterruptedException e) {
                log.warn("Wait thread was interrupted, exiting loop", e);
                Thread.currentThread().interrupt();
                return;
            }
        } while (!this.processingMetadata.isEmpty());
    }

    /**
     * Fetches the batch for {@code metadata}, restricted to the settings of the current run.
     *
     * <ul>
     *   <li>When an upper bound is set and {@code instant} is not before it, an empty batch is returned.</li>
     *   <li>When the requested window crosses the upper bound, the window end is clipped to one
     *       millisecond before the bound.</li>
     *   <li>A batch in error is remembered for the final report and returned untouched.</li>
     *   <li>Otherwise excluded variables are removed and the target group is applied.</li>
     * </ul>
     *
     * @param metadata metadata the batch is requested for
     * @param instant  start of the requested window
     * @param instant2 end of the requested window
     * @return the batch to process
     */
    @Override
    protected BatchDAO.Batch getBatchFor(Metadata metadata, Instant instant, Instant instant2) {
        // The lock only guards the stamp lookup. It must be released exactly once and before the
        // DAO is queried; try-with-resources also copes with a null lock.
        final Metadata checkedAt;
        try (AutoCloseableLock ignored = getLockFor(metadata)) {
            checkedAt = metadata.checkedAt(this.metadataStamps.get(metadata));
        }

        Instant effectiveTo = instant2;
        final Instant upperBound = this.until;
        if (upperBound != null) {
            // The bound is treated as exclusive (see the clipping below), so a window starting
            // exactly on it lies entirely outside of the requested range.
            // NOTE(review): exclusivity is inferred from the 1 ms clipping - confirm.
            if (!instant.isBefore(upperBound)) {
                log.info("Returning empty batch since from timestamp {} is not before selected to in retransmission range {}", instant, upperBound);
                return ((RetransmissionBatchDAO) this.batchDAO).getEmptyBatchFor(checkedAt, instant, instant2);
            }
            if (instant2.isAfter(upperBound)) {
                effectiveTo = upperBound.minusMillis(1L);
            }
        }

        final List<Long> selectedIds = this.variableIds;
        BatchDAO.Batch batch = CollectionUtils.isEmpty(selectedIds)
                ? this.batchDAO.getBatchFor(checkedAt, instant, effectiveTo)
                : ((RetransmissionBatchDAO) this.batchDAO).getBatchFor(checkedAt, selectedIds, instant, effectiveTo);

        if (batch.inError()) {
            log.info("Got batch in error: {}", batch);
            this.batchInError = batch;
            return batch;
        }
        return replaceToRdbSource(removeExcludedVariables(batch));
    }

    /**
     * Sets the configured target group on every variable of the batch. The batch is modified in
     * place and returned; it is left untouched when no target group is configured.
     */
    private BatchDAO.Batch replaceToRdbSource(BatchDAO.Batch batch) {
        final String targetGroup = this.toSource;
        if (StringUtils.isEmpty(targetGroup)) {
            return batch;
        }
        for (VariableData variableData : batch.getBatchData().getData()) {
            variableData.setGroup(targetGroup);
        }
        return batch;
    }

    /**
     * Returns the batch without the variables excluded by id or by name. When nothing is excluded
     * the given batch is returned as is; otherwise a new batch with the same metadata, origin,
     * batch id and timings is built around the remaining variables.
     */
    private BatchDAO.Batch removeExcludedVariables(BatchDAO.Batch batch) {
        if (this.excludeVariableIds.isEmpty() && this.excludeVariableNames.isEmpty()) {
            return batch;
        }
        List<VariableData> kept = new ArrayList<>();
        for (VariableData variableData : batch.getBatchData().getData()) {
            boolean excluded = isVariableIdExcluded(Long.valueOf(variableData.getVariableId()))
                    || isVariableNameExcluded(variableData.getName());
            if (!excluded) {
                kept.add(variableData);
            }
        }
        return BatchDAO.Batch.builder()
                .metadata(batch.getMetadata())
                .from(batch.getFrom())
                .batchData(new BatchData<>(batch.getBatchData().getId(), kept))
                .publicationTime(batch.getPublicationTime())
                .queryTime(batch.getQueryTime())
                .build();
    }

    /** Tells whether the variable with the given id is excluded from the current run. */
    private boolean isVariableIdExcluded(Long variableId) {
        return this.excludeVariableIds.contains(variableId);
    }

    /** Tells whether the variable with the given name is excluded from the current run. */
    private boolean isVariableNameExcluded(String variableName) {
        return this.excludeVariableNames.contains(variableName);
    }
}
