package io.fluxcapacitor.common.tracking;

import io.fluxcapacitor.common.ConsistentHashing;
import io.fluxcapacitor.common.Registration;
import io.fluxcapacitor.common.api.SerializedMessage;
import io.fluxcapacitor.common.api.tracking.MessageBatch;
import io.fluxcapacitor.common.api.tracking.Position;
import java.beans.ConstructorProperties;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Predicate;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/fluxcapacitor/common/tracking/DefaultTrackingStrategy.class */
public class DefaultTrackingStrategy implements TrackingStrategy {

    /** Logger shared by this class and its inner {@link WaitingTracker}. */
    @Generated
    private static final Logger log = LoggerFactory.getLogger(DefaultTrackingStrategy.class);
    /** Store that batches are fetched from; also monitored for updates and closed in {@link #close()}. */
    private final MessageStore source;
    /** Scheduler used for tracker deadline tasks and for the recurring purge of ceased trackers. */
    private final TaskScheduler scheduler;
    /** Segment count passed to every new {@link TrackerCluster} and used when normalising message segments. */
    private final int segments;
    /** Trackers currently parked in {@code waitForUpdate}, mapped to their wake-up handle (synchronized map). */
    private final Map<Tracker, WaitingTracker> waitingTrackers;
    /** Tracker clusters keyed by consumer name. */
    private final ConcurrentHashMap<String, TrackerCluster> clusters;
    /** Last index of the most recent non-empty update seen by {@code onUpdate}; starts at -1. */
    private volatile long lastSeenIndex;
    /** Handle returned by {@code source.registerMonitor}; cancelled in {@link #close()}. */
    private final Registration sourceRegistration;
    /** Set by {@link #close()}; makes {@code onUpdate} and {@code onClusterUpdate} return immediately. */
    private volatile boolean stopped;

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:io/fluxcapacitor/common/tracking/DefaultTrackingStrategy$WaitingTracker.class */
    public class WaitingTracker implements Runnable {
        private final Tracker tracker;
        private final Registration scheduleToken;
        private final Runnable followUp;

        @Override // java.lang.Runnable
        public void run() {
            try {
                this.scheduleToken.cancel();
                if (DefaultTrackingStrategy.this.waitingTrackers.remove(this.tracker, this)) {
                    this.followUp.run();
                }
            } catch (Throwable th) {
                DefaultTrackingStrategy.log.error("Failed to execute tracker fetch / follow up", th);
            }
        }

        @Generated
        @ConstructorProperties({"tracker", "scheduleToken", "followUp"})
        public WaitingTracker(Tracker tracker, Registration registration, Runnable runnable) {
            this.tracker = tracker;
            this.scheduleToken = registration;
            this.followUp = runnable;
        }
    }

    /**
     * Creates a strategy that reads from the given store and uses a new {@link InMemoryTaskScheduler}.
     *
     * @param messageStore store to fetch batches from and to monitor for updates
     */
    public DefaultTrackingStrategy(MessageStore messageStore) {
        this(messageStore, new InMemoryTaskScheduler());
    }

    /**
     * Creates a strategy that reads from the given store, using {@link Position#MAX_SEGMENT} segments.
     *
     * @param messageStore  store to fetch batches from and to monitor for updates
     * @param taskScheduler scheduler for deadline tasks and the recurring purge
     */
    public DefaultTrackingStrategy(MessageStore messageStore, TaskScheduler taskScheduler) {
        this(messageStore, taskScheduler, Position.MAX_SEGMENT);
    }

    /**
     * Creates a strategy with an explicit segment count, registers {@link #onUpdate} as a monitor on
     * the store and starts the recurring purge of ceased trackers with a 2 second interval.
     *
     * @param messageStore  store to fetch batches from and to monitor for updates
     * @param taskScheduler scheduler for deadline tasks and the recurring purge
     * @param segments      segment count used for tracker clusters and message segment normalisation
     */
    protected DefaultTrackingStrategy(MessageStore messageStore, TaskScheduler taskScheduler, int segments) {
        // Typed map (diamond) instead of a raw HashMap to avoid an unchecked assignment.
        this.waitingTrackers = Collections.synchronizedMap(new HashMap<>());
        this.clusters = new ConcurrentHashMap<>();
        this.lastSeenIndex = -1L;
        this.source = messageStore;
        this.scheduler = taskScheduler;
        this.segments = segments;
        // NOTE(review): both calls below expose this instance before construction of a subclass has
        // finished (monitor callback, overridable purgeCeasedTrackers). Kept as-is to preserve behaviour.
        this.sourceRegistration = messageStore.registerMonitor(this::onUpdate);
        purgeCeasedTrackers(Duration.ofSeconds(2L));
    }

    /**
     * Claims a segment for the tracker and either sends it a batch of messages or parks it until
     * new messages arrive or its deadline passes.
     * <p>
     * Steps:
     * <ol>
     *   <li>If the claimed segment is empty ({@code segment[0] == segment[1]}) the tracker is parked
     *       with an empty batch.</li>
     *   <li>Otherwise batches are read from the store and filtered for this tracker. While a fetched
     *       batch is non-empty but nothing passes the filter, the position is advanced to the last
     *       fetched index and the read is repeated until the tracker misses its deadline. If that
     *       last index is older than the tracker's max timeout, an empty batch carrying the index is
     *       sent instead.</li>
     *   <li>A non-empty result is sent; an empty one parks the tracker. If the store reported newer
     *       messages while this method was running, the parked tracker is woken immediately.</li>
     * </ol>
     * Any failure is logged and the tracker is parked with an empty batch. In all cases, when the
     * cluster that existed before the claim now reports a different segment for the tracker, the
     * waiting trackers of that cluster are woken.
     *
     * @param tracker       tracker requesting a batch
     * @param positionStore store holding the consumer's position
     */
    @Override
    public void getBatch(Tracker tracker, PositionStore positionStore) {
        TrackerCluster clusterBeforeClaim = this.clusters.get(tracker.getConsumerName());
        int[] segment = claimSegment(tracker);
        try {
            if (segment[0] == segment[1]) {
                waitForMessages(tracker,
                                new MessageBatch(segment, Collections.emptyList(), null, Position.newPosition()),
                                positionStore);
                return;
            }
            int maxSize = adjustMaxSize(tracker, tracker.getMaxSize());
            long lastSeenAtStart = this.lastSeenIndex;
            Position position;
            List<SerializedMessage> fetched;
            List<SerializedMessage> accepted;
            do {
                position = position(tracker, positionStore, segment);
                fetched = getBatch(segment, position, maxSize);
                accepted = filter(fetched, segment, position, tracker);
                if (!fetched.isEmpty() && accepted.isEmpty()) {
                    long lastFetchedIndex = fetched.get(fetched.size() - 1).getIndex();
                    if (lastFetchedIndex < indexFromMillis(System.currentTimeMillis() - tracker.maxTimeout())) {
                        // Everything fetched is older than the max timeout: hand the index back to
                        // the tracker in an empty batch rather than storing it here.
                        tracker.send(new MessageBatch(segment, accepted, lastFetchedIndex, position));
                        return;
                    }
                    positionStore.storePosition(tracker.getConsumerName(), segment, lastFetchedIndex);
                    tracker = tracker.withLastTrackerIndex(lastFetchedIndex);
                }
            } while (!fetched.isEmpty() && accepted.isEmpty() && !tracker.hasMissedDeadline());

            MessageBatch result = new MessageBatch(segment, accepted, getLastIndex(fetched), position);
            if (!result.isEmpty()) {
                tracker.send(result);
                return;
            }
            waitForMessages(tracker, result, positionStore);
            // Messages may have arrived between reading the store and parking the tracker; if so,
            // wake the tracker right away instead of waiting for the next update.
            if (lastSeenAtStart < this.lastSeenIndex
                    && (result.getLastIndex() == null || result.getLastIndex() < this.lastSeenIndex)) {
                WaitingTracker parked = this.waitingTrackers.get(tracker);
                if (parked != null && parked.tracker == tracker) {
                    parked.run();
                }
            }
        } catch (Throwable e) {
            log.error("Failed to get a batch for tracker {}", tracker, e);
            waitForMessages(tracker,
                            new MessageBatch(segment, Collections.emptyList(), null, Position.newPosition()),
                            positionStore);
        } finally {
            // Single clean-up point: a failure in here must not be mistaken for a fetch failure and
            // must not re-park a tracker that has already been served.
            if (clusterBeforeClaim != null
                    && !Objects.deepEquals(clusterBeforeClaim.getSegment(tracker), segment)) {
                onClusterUpdate(clusterBeforeClaim);
            }
        }
    }

    /**
     * Claims a segment for the tracker without fetching messages.
     * <p>
     * When the claimed segment is non-empty, an empty batch carrying the segment and the tracker's
     * position is sent immediately. When it is empty ({@code segment[0] == segment[1]}), the tracker
     * is parked and the claim is retried once it is woken.
     *
     * @param tracker       tracker requesting a segment
     * @param positionStore store holding the consumer's position
     */
    @Override
    public void claimSegment(Tracker tracker, PositionStore positionStore) {
        int[] claimed = claimSegment(tracker);
        boolean hasSegment = claimed[0] != claimed[1];
        if (hasSegment) {
            Position current = position(tracker, positionStore, claimed);
            tracker.send(new MessageBatch(claimed, Collections.emptyList(), null, current));
            return;
        }
        MessageBatch emptyBatch = new MessageBatch(claimed, Collections.emptyList(), null, Position.newPosition());
        waitForUpdate(tracker, emptyBatch, () -> claimSegment(tracker, positionStore));
    }

    /**
     * Reads a batch from the message store, starting from the lowest index the position reports
     * for the given segment range.
     *
     * @param iArr     segment range to read for
     * @param position position used to determine the start index
     * @param i        maximum batch size passed to the store
     * @return the messages returned by the store
     */
    protected List<SerializedMessage> getBatch(int[] iArr, Position position, int i) {
        // null when the position has no index for this segment range
        Long startIndex = position.lowestIndexForSegment(iArr).orElse(null);
        return this.source.getBatch(startIndex, i);
    }

    /**
     * Parks the tracker and fetches a batch for it again once it is woken.
     *
     * @param tracker       tracker to park
     * @param messageBatch  batch sent to the tracker if its deadline passes while waiting
     * @param positionStore store handed to the follow-up {@code getBatch} call
     */
    protected void waitForMessages(Tracker tracker, MessageBatch messageBatch, PositionStore positionStore) {
        waitForUpdate(tracker, messageBatch, () -> getBatch(tracker, positionStore));
    }

    /**
     * Parks the tracker until it is woken or its deadline passes.
     * <p>
     * A tracker that has already missed its deadline receives {@code messageBatch} immediately.
     * Otherwise the tracker is marked as waiting in its cluster, a deadline task is scheduled and
     * the tracker is registered in the waiting map together with the follow-up action.
     *
     * @param tracker      tracker to park
     * @param messageBatch batch sent when the deadline passes (or has already passed)
     * @param runnable     action to run when the tracker is woken before its deadline
     */
    protected void waitForUpdate(Tracker tracker, MessageBatch messageBatch, Runnable runnable) {
        if (!tracker.hasMissedDeadline()) {
            this.clusters.compute(tracker.getConsumerName(), (consumer, existing) -> {
                TrackerCluster cluster = existing != null ? existing : new TrackerCluster(this.segments);
                return cluster.withWaitingTracker(tracker);
            });
            Registration deadlineToken = this.scheduler.schedule(tracker.getDeadline(), () -> {
                // Identity comparison: only act if this very tracker instance is still parked.
                boolean wasParked = this.waitingTrackers.keySet().removeIf(parked -> parked == tracker);
                if (!wasParked) {
                    return;
                }
                this.clusters.compute(tracker.getConsumerName(), (consumer, existing) -> {
                    if (existing == null || !existing.contains(tracker)) {
                        return existing;
                    }
                    return existing.withActiveTracker(tracker);
                });
                tracker.send(messageBatch);
            });
            // Remove before put so that the map key becomes this tracker instance; a plain put
            // would keep the previous (equal) key object and defeat the identity check above.
            WaitingTracker replaced = this.waitingTrackers.remove(tracker);
            this.waitingTrackers.put(tracker, new WaitingTracker(tracker, deadlineToken, runnable));
            if (replaced != null) {
                log.warn("Tracker replaced another waiting tracker. This should normally not happen. New tracker: {}", tracker);
                replaced.tracker.send(messageBatch);
            }
            return;
        }
        tracker.send(messageBatch);
    }

    /**
     * Determines the position from which the tracker should read for the given segment range.
     * <p>
     * Unless the tracker controls its own index, the stored position of the consumer is used as
     * long as it is not new for the segment range. In every other case a fresh position is built
     * from the tracker's last index, falling back to the index for one second before now when the
     * tracker has none.
     *
     * @param tracker       tracker to resolve the position for
     * @param positionStore store holding the consumer's position
     * @param iArr          claimed segment range
     * @return the position to read from
     */
    protected Position position(Tracker tracker, PositionStore positionStore, int[] iArr) {
        if (!tracker.clientControlledIndex()) {
            Position stored = positionStore.position(tracker.getConsumerName());
            if (!stored.isNew(iArr)) {
                return stored;
            }
        }
        Long lastTrackerIndex = tracker.getLastTrackerIndex();
        long startIndex = lastTrackerIndex != null
                ? lastTrackerIndex
                : indexFromMillis(System.currentTimeMillis() - 1000);
        return new Position(iArr, startIndex);
    }

    /**
     * Keeps the messages that the tracker can handle for the segment range and that are either
     * new according to the position or exempt because the tracker ignores segments.
     * <p>
     * Every message that reaches the first check has its segment normalised via
     * {@link #ensureMessageSegment} as a side effect.
     *
     * @param list     messages to filter
     * @param iArr     claimed segment range
     * @param position position used to decide whether a message is new
     * @param tracker  tracker the messages are intended for
     * @return the accepted messages, in their original order
     */
    protected List<SerializedMessage> filter(List<SerializedMessage> list, int[] iArr, Position position, Tracker tracker) {
        return list.stream()
                .filter(message -> tracker.canHandle(ensureMessageSegment(message), iArr))
                .filter(message -> tracker.ignoreSegment() || position.isNewMessage(message))
                .toList();
    }

    /**
     * Makes sure the message carries a segment for this strategy's segment count.
     * <p>
     * A message without a segment gets one computed from its message id; an existing segment is
     * reduced modulo the segment count. The message is modified in place.
     *
     * @param serializedMessage message to normalise
     * @return the same message instance
     */
    protected SerializedMessage ensureMessageSegment(SerializedMessage serializedMessage) {
        Integer existing = serializedMessage.getSegment();
        int normalised;
        if (existing == null) {
            normalised = ConsistentHashing.computeSegment(serializedMessage.getMessageId(), this.segments);
        } else {
            // NOTE(review): '%' keeps the sign of the dividend, so a negative segment stays negative.
            normalised = existing % this.segments;
        }
        serializedMessage.setSegment(normalised);
        return serializedMessage;
    }

    /**
     * Scales the requested batch size by the number of trackers in the tracker's cluster.
     *
     * @param tracker tracker whose consumer name selects the cluster
     * @param i       requested maximum batch size
     * @return {@code i} multiplied by the cluster's tracker count, or {@code i} when no cluster exists
     */
    protected int adjustMaxSize(Tracker tracker, int i) {
        TrackerCluster cluster = this.clusters.get(tracker.getConsumerName());
        if (cluster == null) {
            return i;
        }
        return cluster.getTrackers().size() * i;
    }

    /**
     * Registers the tracker as active in its consumer's cluster, creating the cluster when needed,
     * and returns the segment range the cluster assigns to it.
     *
     * @param tracker tracker to activate
     * @return the segment range reported by the updated cluster for this tracker
     */
    protected int[] claimSegment(Tracker tracker) {
        TrackerCluster updated = this.clusters.compute(tracker.getConsumerName(), (consumer, existing) -> {
            TrackerCluster cluster = existing == null ? new TrackerCluster(this.segments) : existing;
            return cluster.withActiveTracker(tracker);
        });
        return updated.getSegment(tracker);
    }

    /**
     * Monitor callback for the message store: records the last index of the update (if it has one)
     * and wakes every waiting tracker. Does nothing once the strategy has been stopped.
     *
     * @param list messages reported by the store
     */
    protected void onUpdate(List<SerializedMessage> list) {
        if (!this.stopped) {
            synchronized (this.waitingTrackers) {
                Long newestIndex = getLastIndex(list);
                if (newestIndex != null) {
                    this.lastSeenIndex = newestIndex;
                }
                // Iterate over a copy: running a waiting tracker removes it from the map.
                List<WaitingTracker> snapshot = new ArrayList<>(this.waitingTrackers.values());
                for (WaitingTracker waiting : snapshot) {
                    waiting.run();
                }
            }
        }
    }

    /**
     * Wakes all waiting trackers that are contained in the given cluster. Does nothing once the
     * strategy has been stopped.
     *
     * @param trackerCluster cluster whose waiting trackers should be woken
     */
    protected void onClusterUpdate(TrackerCluster trackerCluster) {
        if (!this.stopped) {
            synchronized (this.waitingTrackers) {
                // Collect first, run afterwards: running a waiting tracker removes it from the map.
                List<WaitingTracker> affected = new ArrayList<>();
                for (Map.Entry<Tracker, WaitingTracker> entry : this.waitingTrackers.entrySet()) {
                    if (trackerCluster.contains(entry.getKey())) {
                        affected.add(entry.getValue());
                    }
                }
                for (WaitingTracker waiting : affected) {
                    waiting.run();
                }
            }
        }
    }

    /**
     * Disconnects every tracker matching the predicate.
     * <p>
     * Matching trackers are removed from the waiting map and purged from all clusters; clusters
     * that become empty are dropped. Waiting trackers of clusters that changed but still have
     * members are woken afterwards. When requested, each tracker that was removed from the waiting
     * map receives one final empty batch, even if an earlier step failed.
     *
     * @param predicate           selects the trackers to disconnect
     * @param sendFinalEmptyBatch whether removed waiting trackers should receive a final empty batch
     */
    @Override
    public void disconnectTrackers(Predicate<Tracker> predicate, boolean sendFinalEmptyBatch) {
        HashSet<Tracker> removedTrackers = new HashSet<>();
        HashSet<TrackerCluster> changedClusters = new HashSet<>();
        try {
            synchronized (this.waitingTrackers) {
                this.waitingTrackers.keySet().removeIf(tracker -> {
                    boolean matches = predicate.test(tracker);
                    if (matches) {
                        removedTrackers.add(tracker);
                    }
                    return matches;
                });
                this.clusters.replaceAll((consumer, cluster) -> {
                    TrackerCluster purged = cluster.purgeTrackers(predicate);
                    if (!Objects.equals(purged, cluster) && !purged.isEmpty()) {
                        changedClusters.add(purged);
                    }
                    return purged;
                });
                this.clusters.values().removeIf(TrackerCluster::isEmpty);
            }
            changedClusters.forEach(this::onClusterUpdate);
        } finally {
            // Exactly one pass, so a tracker can never receive the final batch twice.
            if (sendFinalEmptyBatch) {
                for (Tracker removed : removedTrackers) {
                    try {
                        removed.send(new MessageBatch(new int[]{0, 0}, Collections.emptyList(), null,
                                                      Position.newPosition()));
                    } catch (Exception e) {
                        // Log the tracker that failed, not the predicate used to select it.
                        log.error("Failed to send final empty batch to disconnecting tracker: {}", removed, e);
                    }
                }
            }
        }
    }

    /**
     * Schedules a recurring task that removes trackers which have been processing for longer than
     * their purge delay.
     * <p>
     * Each run goes through all clusters, replaces a cluster when at least one tracker was purged
     * from it, logs the purged trackers and then schedules the next run after the same delay. A
     * failing run is logged and does not prevent the next one. Once the strategy has been stopped
     * the task neither purges nor reschedules itself.
     *
     * @param duration delay before the first run and between subsequent runs
     */
    protected void purgeCeasedTrackers(Duration duration) {
        this.scheduler.schedule(System.currentTimeMillis() + duration.toMillis(), () -> {
            if (this.stopped) {
                return;
            }
            try {
                this.clusters.replaceAll((consumer, cluster) -> {
                    TrackerCluster purged = cluster.purgeTrackers(
                            tracker -> tracker.getPurgeDelay() != null
                                    && cluster.getProcessingDuration(tracker)
                                            .filter(elapsed -> elapsed.toMillis() > tracker.getPurgeDelay())
                                            .isPresent());
                    if (purged == cluster) {
                        return cluster;
                    }
                    var ceased = new HashSet<>(cluster.getTrackers());
                    ceased.removeAll(purged.getTrackers());
                    if (ceased.isEmpty()) {
                        return cluster;
                    }
                    log.warn("Purged trackers from consumer {} because they have ceased processing: {}", consumer, ceased);
                    return purged;
                });
            } catch (Exception e) {
                log.error("Failed to purge ceased trackers", e);
            } finally {
                // Reschedule from finally so that one failing run does not end the recurring purge.
                if (!this.stopped) {
                    purgeCeasedTrackers(duration);
                }
            }
        });
    }

    /**
     * Returns the index of the last message in the list.
     *
     * @param list messages to inspect; must not be {@code null}
     * @return the last message's index, or {@code null} when the list is empty
     */
    private Long getLastIndex(List<SerializedMessage> list) {
        return list.isEmpty() ? null : list.get(list.size() - 1).getIndex();
    }

    /**
     * Converts a millisecond timestamp into a message index by shifting it 16 bits to the left,
     * which leaves the lowest 16 bits of the result at zero.
     *
     * @param j timestamp in milliseconds
     * @return the timestamp shifted left by 16 bits
     */
    private static long indexFromMillis(long j) {
        final int timestampShift = 16;
        return j << timestampShift;
    }

    /**
     * Stops this strategy: marks it as stopped, shuts down the scheduler, cancels the monitor
     * registration on the message store and closes the store.
     */
    @Override // io.fluxcapacitor.common.tracking.TrackingStrategy, java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        // Set first so that onUpdate / onClusterUpdate callbacks arriving during shutdown return early.
        this.stopped = true;
        this.scheduler.shutdown();
        this.sourceRegistration.cancel();
        // NOTE(review): if shutdown() or cancel() throws, the store is not closed — confirm this is acceptable.
        this.source.close();
    }
}
