package cern.nxcals.api.ingestion;

import cern.nxcals.api.ingestion.BufferedPublisher;
import com.google.common.annotations.VisibleForTesting;
import java.time.Duration;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import lombok.NonNull;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:BOOT-INF/lib/nxcals-ingestion-api-0.5.5.jar:cern/nxcals/api/ingestion/BufferReader.class */
/**
 * Periodically drains a {@link Buffer} of {@link BufferedPublisher.DelayedMessage}s and forwards
 * them to a {@link DataSink}.
 *
 * <p>Each {@link #flush()} round works as follows:
 * <ol>
 *   <li>all currently buffered messages are drained and grouped by entity key;</li>
 *   <li>every per-entity group is submitted as one task to the {@code processor} executor;</li>
 *   <li>inside that task the group is sorted by message time and cut into runs of consecutive
 *       messages that share the same partition/schema key;</li>
 *   <li>the first message of each run is passed to the sink directly on the processor thread,
 *       the remaining ones are dispatched through each message's own executor.</li>
 * </ol>
 *
 * <p>Any failure along the way is reported by completing the affected messages exceptionally.
 */
class BufferReader {
    private static final Logger log = LoggerFactory.getLogger(BufferReader.class);

    /** Single scheduler thread that runs {@link #flush()} once {@link #start(Duration)} was called. */
    private final ScheduledExecutorService reader = Executors.newSingleThreadScheduledExecutor();

    /** Destination of the records built from the buffered messages. */
    @NonNull
    private final DataSink<RecordData, DefaultCallback> sink;

    /** Source of the messages; drained completely on every flush. */
    @NonNull
    private final Buffer<BufferedPublisher.DelayedMessage> buffer;

    /** Used as the initial capacity of the per-round "entity key -> messages" map. */
    private final int nbOfEntitiesPerRound;

    /** Executor on which each per-entity batch is sorted, partitioned and sent. */
    @NonNull
    private final Executor processor;

    /**
     * Creates a reader; nothing is scheduled until {@link #start(Duration)} is called.
     *
     * @param dataSink sink receiving the records, must not be {@code null}
     * @param buffer   buffer to drain, must not be {@code null}
     * @param i        expected number of distinct entities per flush round (initial map capacity)
     * @param executor executor processing the per-entity batches, must not be {@code null}
     * @throws NullPointerException if {@code dataSink}, {@code buffer} or {@code executor} is {@code null}
     */
    public BufferReader(@NonNull DataSink<RecordData, DefaultCallback> dataSink, @NonNull Buffer<BufferedPublisher.DelayedMessage> buffer, int i, @NonNull Executor executor) {
        if (dataSink == null) {
            throw new NullPointerException("sink is marked non-null but is null");
        }
        if (buffer == null) {
            throw new NullPointerException("buffer is marked non-null but is null");
        }
        if (executor == null) {
            throw new NullPointerException("processor is marked non-null but is null");
        }
        this.sink = dataSink;
        this.buffer = buffer;
        this.nbOfEntitiesPerRound = i;
        this.processor = executor;
    }

    /**
     * Schedules {@link #flush()} at a fixed rate on the internal scheduler thread.
     *
     * @param duration used both as the initial delay and as the period between flushes
     */
    public void start(Duration duration) {
        long periodMillis = duration.toMillis();
        this.reader.scheduleAtFixedRate(this::flush, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
    }

    /**
     * Shuts down the internal scheduler so that no further flush rounds are started.
     * This method does not itself drain the buffer or wait for running work.
     */
    public void close() {
        this.reader.shutdown();
    }

    /**
     * Drains the buffer, groups the drained messages by entity key and submits every group for
     * sending. If anything throws, all drained messages that are not yet done are completed
     * exceptionally, and the exception is not propagated to the caller.
     */
    @VisibleForTesting
    void flush() {
        LinkedList<BufferedPublisher.DelayedMessage> drained = new LinkedList<>();
        try {
            this.buffer.drainTo(drained);
            if (drained.isEmpty()) {
                log.debug("No messages to send");
                return;
            }
            log.debug("Flushing {} messages to NXCALS", drained.size());
            HashMap<String, List<BufferedPublisher.DelayedMessage>> messagesByEntity = new HashMap<>(this.nbOfEntitiesPerRound);
            for (BufferedPublisher.DelayedMessage message : drained) {
                messagesByEntity
                        .computeIfAbsent(message.getEntityData().getEntityKey(), entityKey -> new LinkedList<>())
                        .add(message);
            }
            messagesByEntity.values().forEach(this::submitEntityBatch);
        } catch (Exception e) {
            log.error("This must not happen. If it does it's either a developer error or jvm problem", e);
            completeExceptionally(drained, e);
        }
    }

    /**
     * Hands all messages of one entity to the processor executor, where they are partitioned and
     * sent. A rejected submission or a failure inside the task fails the not-yet-done messages
     * of the batch.
     */
    private void submitEntityBatch(List<BufferedPublisher.DelayedMessage> entityMessages) {
        try {
            this.processor.execute(() -> {
                try {
                    partitionsOf(entityMessages).forEach(this::sendPartition);
                } catch (Exception e) {
                    completeExceptionally(entityMessages, e);
                }
            });
        } catch (RejectedExecutionException e) {
            completeExceptionally(entityMessages, e);
        }
    }

    /** Completes exceptionally every message whose result is not done yet. */
    private void completeExceptionally(Collection<BufferedPublisher.DelayedMessage> messages, Throwable cause) {
        for (BufferedPublisher.DelayedMessage message : messages) {
            if (!message.getResult().isDone()) {
                message.completeExceptionally(cause);
            }
        }
    }

    /**
     * Sorts the given messages by time (in place) and splits them into runs of consecutive
     * messages having the same partition key. A new run starts whenever the key differs from the
     * key of the previous message, so the same key may appear in several runs.
     *
     * @return the runs in time order; every run contains at least one message
     */
    private Collection<LinkedList<BufferedPublisher.DelayedMessage>> partitionsOf(List<BufferedPublisher.DelayedMessage> messages) {
        messages.sort(Comparator.comparing(this::msgTime));
        LinkedList<LinkedList<BufferedPublisher.DelayedMessage>> partitions = new LinkedList<>();
        String previousKey = null;
        for (BufferedPublisher.DelayedMessage message : messages) {
            String key = getPartitionKeyOf(message.getEntityData());
            if (!key.equals(previousKey)) {
                partitions.add(new LinkedList<>());
                previousKey = key;
            }
            partitions.getLast().add(message);
        }
        return partitions;
    }

    /** Returns the time of the message's entity data; used as the sort key. */
    private Long msgTime(BufferedPublisher.DelayedMessage message) {
        return message.getEntityData().getTime();
    }

    /** Builds the key used to detect partition changes from the partition key and the schema key. */
    private String getPartitionKeyOf(BufferedPublisher.EntityData entityData) {
        return StringUtils.join(entityData.getPartitionKey(), entityData.getSchemaKey());
    }

    /**
     * Sends one run of messages: the first one directly on the current thread, all following ones
     * through their own executors. The first message is removed from the given list.
     */
    private void sendPartition(LinkedList<BufferedPublisher.DelayedMessage> partition) {
        sendSync(partition.removeFirst());
        partition.forEach(this::sendAsync);
    }

    /** Submits the send of a message to the message's own executor; a rejection fails the message. */
    private void sendAsync(BufferedPublisher.DelayedMessage message) {
        try {
            message.getExecutor().execute(() -> sendSync(message));
        } catch (RejectedExecutionException e) {
            message.completeExceptionally(e);
        }
    }

    /**
     * Passes the message's record to the sink on the current thread. The sink callback completes
     * the message with the result or with the reported exception; an exception thrown by the call
     * itself also fails the message.
     */
    private void sendSync(BufferedPublisher.DelayedMessage message) {
        try {
            this.sink.send(getRecordFrom(message), (result, exc) -> {
                if (exc != null) {
                    message.completeExceptionally(exc);
                } else {
                    message.complete(result);
                }
            });
        } catch (Exception e) {
            message.completeExceptionally(e);
        }
    }

    /** Builds the record to send from the message's entity (obtained from its maker), data and time. */
    private RecordData getRecordFrom(BufferedPublisher.DelayedMessage message) {
        BufferedPublisher.EntityData entityData = message.getEntityData();
        return new RecordData(entityData.getEntityMaker().get(), message.getData(), entityData.getTime().longValue());
    }
}
