package cern.nxcals.ds.importer.consumer;

import cern.cmw.datax.EntryType;
import cern.cmw.datax.ImmutableData;
import cern.nxcals.api.ingestion.IllegalRecordRuntimeException;
import cern.nxcals.api.ingestion.Publisher;
import cern.nxcals.ds.importer.common.NxcalsPublisher;
import cern.nxcals.ds.importer.common.Utils;
import cern.nxcals.ds.importer.common.model.BatchDataX;
import cern.nxcals.ds.importer.common.model.BatchPublicationResult;
import cern.nxcals.ds.importer.common.model.VariablePublicationException;
import com.google.common.annotations.VisibleForTesting;
import java.io.Serializable;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;

@Component
/* loaded from: input_file:BOOT-INF/lib/consumer-0.1.2.jar:cern/nxcals/ds/importer/consumer/DefaultNxcalsPublisher.class */
public class DefaultNxcalsPublisher implements NxcalsPublisher<BatchDataX> {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) DefaultNxcalsPublisher.class);
    private final Map<Serializable, CompletableFuture<BatchPublicationResult>> batchTasks = new ConcurrentHashMap();
    private final Map<Serializable, AtomicInteger> batchRecordsToProcess = new ConcurrentHashMap();
    private final ScheduledExecutorService channelReader = Executors.newScheduledThreadPool(1, Utils.createThreadFactory("channelReader-%d"));
    private final Publisher<ImmutableData> nxcalsPublisher;
    private final Channel<ImmutableData, BatchDataX, BatchRecordDataX> channel;
    private final Supplier<Integer> drainSizeSupplier;
    private final Executor nxcalsExecutor;
    private final Executor ingestionHandler;
    private final PublisherStatsCollector statsCollector;

    public DefaultNxcalsPublisher(Channel<ImmutableData, BatchDataX, BatchRecordDataX> channel, Publisher<ImmutableData> publisher, @Qualifier("nxcalsExecutorQueueRemainingSizeSupplier") Supplier<Integer> supplier, @Qualifier("nxcalsExecutor") Executor executor, @Qualifier("nxcalsHandler") Executor executor2, PublisherStatsCollector publisherStatsCollector) {
        this.nxcalsPublisher = (Publisher) Objects.requireNonNull(publisher);
        this.channel = (Channel) Objects.requireNonNull(channel);
        this.drainSizeSupplier = supplier;
        this.nxcalsExecutor = executor;
        this.ingestionHandler = executor2;
        this.statsCollector = publisherStatsCollector;
    }

    @Override // cern.nxcals.ds.importer.common.NxcalsPublisher
    public CompletableFuture<BatchPublicationResult> send(BatchDataX batchDataX) throws InterruptedException {
        Objects.requireNonNull(batchDataX);
        CompletableFuture<BatchPublicationResult> completableFuture = new CompletableFuture<>();
        Serializable id = batchDataX.getId();
        if (CollectionUtils.isEmpty(batchDataX.getData())) {
            log.info("No data found to be sent inside the batch: {}", id);
            completableFuture.complete(new BatchPublicationResult(id, null));
            return completableFuture;
        }
        if (this.batchTasks.putIfAbsent(id, completableFuture) != null) {
            String str = "Cannot process batch as there's already an unprocessed batch with the same id: " + id;
            log.warn(str);
            completableFuture.complete(new BatchPublicationResult(id, new IllegalStateException(str)));
            return completableFuture;
        }
        this.statsCollector.batch();
        this.batchRecordsToProcess.put(id, new AtomicInteger(batchDataX.getData().size()));
        this.channel.put(batchDataX);
        return completableFuture;
    }

    @PostConstruct
    public void start() {
        this.channelReader.scheduleAtFixedRate(() -> {
            try {
                readAndPublish();
            } catch (Exception e) {
                log.error("This must not happen! Exception while reading from the channel", (Throwable) e);
            }
        }, 0L, 1L, TimeUnit.SECONDS);
    }

    @PreDestroy
    public void shutdown() {
        this.channelReader.shutdown();
    }

    @VisibleForTesting
    void readAndPublish() {
        LinkedList linkedList = new LinkedList();
        log.debug("Publishing {} records to NXCALS", Integer.valueOf(this.channel.drainTo(linkedList, this.drainSizeSupplier.get().intValue())));
        Iterator it = linkedList.iterator();
        while (it.hasNext()) {
            publishToNxcals((BatchRecordDataX) it.next());
        }
    }

    private void publishToNxcals(BatchRecordDataX batchRecordDataX) {
        this.statsCollector.sendRecord();
        try {
            this.nxcalsPublisher.publishAsync(batchRecordDataX.getData(), this.nxcalsExecutor).whenCompleteAsync((result, th) -> {
                if (th != null) {
                    log.warn("Got exception while publishing to NXCALS", th);
                    handleException(batchRecordDataX, th);
                } else {
                    log.trace("Record {} sucessfully sent to NXCALS", batchRecordDataX);
                    markRecordAsSent(batchRecordDataX);
                }
            }, this.ingestionHandler);
        } catch (RejectedExecutionException e) {
            log.error("Request has been rejected, retrying (this must not happen as long as the drain size supplier is properly implemented!)", (Throwable) e);
            this.ingestionHandler.execute(() -> {
                republish(batchRecordDataX);
            });
        }
    }

    private void republish(BatchRecordDataX batchRecordDataX) {
        this.statsCollector.retryRecord();
        try {
            this.channel.put(new BatchDataX(batchRecordDataX.getBatchId(), Collections.singletonList(batchRecordDataX.getData())));
        } catch (InterruptedException e) {
            log.error("Interrupted while republishing to NXCALS", (Throwable) e);
            Thread.currentThread().interrupt();
            throw new IllegalThreadStateException(e.getMessage());
        }
    }

    private <T> void markRecordAsSent(BatchRecord<T> batchRecord) {
        this.statsCollector.sentRecord();
        Serializable batchId = batchRecord.getBatchId();
        if (this.batchRecordsToProcess.get(batchId).decrementAndGet() == 0) {
            log.debug("Batch {} fully processed", batchId);
            this.batchRecordsToProcess.remove(batchId);
            this.batchTasks.remove(batchId).complete(new BatchPublicationResult(batchId, null));
        }
    }

    private void handleException(BatchRecordDataX batchRecordDataX, Throwable th) {
        Serializable batchId = batchRecordDataX.getBatchId();
        if (canRePublishFor(th)) {
            log.debug("Republishing data to NXCALS for batch {} due to {}", batchId, th);
            republish(batchRecordDataX);
            return;
        }
        log.error("Cannot republish data to NXCALS for batch {} due to ", batchId, th);
        this.batchRecordsToProcess.remove(batchId);
        CompletableFuture<BatchPublicationResult> remove = this.batchTasks.remove(batchId);
        ImmutableData data = batchRecordDataX.getData();
        remove.complete(new BatchPublicationResult(batchId, new VariablePublicationException((String) data.getEntry("variable_name").getAs(EntryType.STRING), String.format("Cannot send data for %s due to %s", data.getEntry("variable_name").get(), th.getMessage()), th)));
    }

    private boolean canRePublishFor(Throwable th) {
        Throwable th2 = th;
        while (true) {
            Throwable th3 = th2;
            if (th3 == null) {
                return true;
            }
            if ((th3 instanceof IllegalRecordRuntimeException) || (th3 instanceof IllegalStateException) || (th3 instanceof IllegalArgumentException)) {
                return false;
            }
            th2 = th3.getCause();
        }
    }
}
