package cern.nxcals.ds.importer.producer;

import cern.cmw.datax.ImmutableData;
import cern.nxcals.ds.importer.common.NxcalsPublisher;
import cern.nxcals.ds.importer.common.model.BatchDataX;
import cern.nxcals.ds.importer.common.model.BatchPublicationResult;
import com.google.common.annotations.VisibleForTesting;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.ListUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

/**
 * Splits an incoming {@link BatchDataX} into sub batches of at most {@code batchSize} records,
 * sends each sub batch through the {@link NxcalsPublisher} and exposes a single
 * {@link CompletableFuture} that is completed once every sub batch has been acknowledged, or as
 * soon as one of them reports a failure cause.
 *
 * <p>Book-keeping is held in two concurrent maps keyed by the id of the original batch: the
 * future handed back to the caller and the set of sub-batch ids still awaiting a result. Both
 * entries are removed when the batch is finished so the maps do not grow with every call.
 */
@Component
public class DataProcessor {
    private static final Logger log = LoggerFactory.getLogger(DataProcessor.class);

    /** Futures returned to callers, keyed by the id of the original batch. */
    private final Map<Serializable, CompletableFuture<BatchPublicationResult>> resultMap = new ConcurrentHashMap<>();

    /** Ids of the sub batches that have not reported a result yet, keyed by the id of the original batch. */
    private final Map<Serializable, Set<Serializable>> subBatchesByDataId = new ConcurrentHashMap<>();

    private final NxcalsPublisher<BatchDataX> publisher;

    /** Maximum number of records per sub batch; injected from {@code winccoa.producer.batch.size} (default 1000). */
    @Value("${winccoa.producer.batch.size:1000}")
    private int batchSize;

    public DataProcessor(NxcalsPublisher<BatchDataX> nxcalsPublisher) {
        this.publisher = nxcalsPublisher;
    }

    /**
     * Publishes the given batch in sub batches and reports the overall outcome.
     *
     * @param batchDataX the batch to publish; its data may be {@code null} or empty
     * @return a future completed with a result carrying the id of {@code batchDataX}; the cause is
     *         {@code null} when the data was empty or every sub batch reported no cause, otherwise
     *         it is the cause reported by a failed sub batch or the {@link InterruptedException}
     *         raised while sending
     */
    public CompletableFuture<BatchPublicationResult> process(BatchDataX batchDataX) {
        Serializable id = batchDataX.getId();
        Collection<ImmutableData> data = batchDataX.getData();
        // Check for null/empty BEFORE touching the collection: the size is only logged afterwards.
        if (CollectionUtils.isEmpty(data)) {
            return CompletableFuture.completedFuture(new BatchPublicationResult(id, null));
        }
        log.debug("Processing {} items for batch id: {}", data.size(), id);

        // Split first, so that a failure while partitioning cannot leave orphan map entries behind.
        List<BatchDataX> subBatches = createDataBatches(data);
        Set<Serializable> pendingIds = ConcurrentHashMap.newKeySet();
        for (BatchDataX subBatch : subBatches) {
            pendingIds.add(subBatch.getId());
        }

        CompletableFuture<BatchPublicationResult> completableFuture = new CompletableFuture<>();
        // Both entries must be registered before the first send: callbacks may fire immediately.
        this.resultMap.put(id, completableFuture);
        this.subBatchesByDataId.put(id, pendingIds);
        try {
            for (BatchDataX subBatch : subBatches) {
                this.publisher.send(subBatch).thenAccept(result -> dataBatchProcessed(id, result));
            }
        } catch (InterruptedException e) {
            log.error("Interrupted exception sending data [{}] sub batches", id, e);
            Thread.currentThread().interrupt();
            // A failed callback may already have finished this batch; completeBatch tolerates that.
            completeBatch(id, new BatchPublicationResult(id, e));
        }
        return completableFuture;
    }

    /**
     * Partitions the records into consecutive chunks of at most {@code batchSize} elements and
     * wraps each chunk in a {@link BatchDataX} with a freshly generated random UUID as id.
     */
    private List<BatchDataX> createDataBatches(Collection<ImmutableData> collection) {
        List<ImmutableData> records = new ArrayList<>(collection);
        List<BatchDataX> batches = new ArrayList<>();
        for (List<ImmutableData> chunk : ListUtils.partition(records, this.batchSize)) {
            batches.add(new BatchDataX(UUID.randomUUID(), chunk));
        }
        return batches;
    }

    /**
     * Callback invoked with the publication result of one sub batch.
     *
     * <p>A result carrying a cause finishes the whole batch immediately with that cause. A result
     * without a cause only finishes the batch when it was the last pending sub batch.
     *
     * @param serializable id of the original batch the sub batch belongs to
     * @param batchPublicationResult result reported for the sub batch
     */
    private void dataBatchProcessed(Serializable serializable, BatchPublicationResult batchPublicationResult) {
        log.debug("Finishing batch {} with exception [{}] for dataId: {}", batchPublicationResult.getBatchId(), batchPublicationResult.getCause(), serializable);
        Set<Serializable> pendingIds = this.subBatchesByDataId.get(serializable);
        if (pendingIds == null) {
            // Entries are dropped once a batch is finished, so this is a late result
            // arriving after a failure or an interruption already completed the batch.
            log.debug("No pending sub batches for data id: {} (batch already finished)", serializable);
            return;
        }
        if (batchPublicationResult.getCause() != null) {
            // Fail fast. The pending set is deliberately NOT cleared here: clearing it would let a
            // concurrent successful callback observe an empty set and report overall success.
            log.debug("Finishing all batches for data id [{}]", serializable);
            completeBatch(serializable, new BatchPublicationResult(serializable, batchPublicationResult.getCause()));
            return;
        }
        pendingIds.remove(batchPublicationResult.getBatchId());
        if (pendingIds.isEmpty()) {
            log.debug("Finishing all batches for data id [{}]", serializable);
            completeBatch(serializable, new BatchPublicationResult(serializable, batchPublicationResult.getCause()));
        }
    }

    /**
     * Drops the book-keeping of the given batch and completes its future with {@code outcome}.
     * Does nothing besides the cleanup when the future was already taken by another caller, so it
     * is safe to invoke more than once for the same id.
     */
    private void completeBatch(Serializable dataId, BatchPublicationResult outcome) {
        this.subBatchesByDataId.remove(dataId);
        CompletableFuture<BatchPublicationResult> future = this.resultMap.remove(dataId);
        if (future != null) {
            future.complete(outcome);
        }
    }

    @VisibleForTesting
    void setBatchSize(int i) {
        this.batchSize = i;
    }
}
