package org.factcast.factus;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Iterables;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.Tags;
import java.lang.reflect.Constructor;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.TemporalAmount;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import lombok.Generated;
import lombok.NonNull;
import org.factcast.core.Fact;
import org.factcast.core.FactCast;
import org.factcast.core.FactStreamPosition;
import org.factcast.core.spec.FactSpec;
import org.factcast.core.store.FactStore;
import org.factcast.core.subscription.Subscription;
import org.factcast.core.subscription.SubscriptionRequest;
import org.factcast.factus.batch.DefaultPublishBatch;
import org.factcast.factus.batch.PublishBatch;
import org.factcast.factus.event.EventConverter;
import org.factcast.factus.event.EventObject;
import org.factcast.factus.lock.InLockedOperation;
import org.factcast.factus.lock.Locked;
import org.factcast.factus.lock.LockedOnSpecs;
import org.factcast.factus.metrics.FactusMetrics;
import org.factcast.factus.metrics.TagKeys;
import org.factcast.factus.metrics.TimedOperation;
import org.factcast.factus.projection.Aggregate;
import org.factcast.factus.projection.AggregateUtil;
import org.factcast.factus.projection.FactStreamPositionAware;
import org.factcast.factus.projection.ManagedProjection;
import org.factcast.factus.projection.Projection;
import org.factcast.factus.projection.SnapshotProjection;
import org.factcast.factus.projection.SubscribedProjection;
import org.factcast.factus.projection.WriterToken;
import org.factcast.factus.projector.Projector;
import org.factcast.factus.projector.ProjectorFactory;
import org.factcast.factus.snapshot.AggregateRepository;
import org.factcast.factus.snapshot.ProjectionAndState;
import org.factcast.factus.snapshot.SnapshotRepository;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/factcast/factus/FactusImpl.class */
public class FactusImpl implements Factus {

    // Class-wide SLF4J logger.
    @SuppressFBWarnings(justification = "generated code")
    @Generated
    private static final Logger log = LoggerFactory.getLogger(FactusImpl.class);
    // NOTE(review): the fact observers created in this class are constructed with the literal
    // 10000L, which presumably is this constant inlined by the compiler — confirm.
    public static final int PROGRESS_INTERVAL = 10000;
    // FactCast client used here for publishing, subscribing, serial lookup and store access.
    private final FactCast fc;
    // Creates the Projector for a given projection instance.
    private final ProjectorFactory ehFactory;
    // Converts EventObjects into Facts before they are published.
    private final EventConverter eventConverter;
    // Looked up (findLatest) and written (store) when aggregates are resolved via find().
    private final AggregateRepository aggregateSnapshotRepository;
    // Looked up (findLatest) and written (store) when snapshot projections are resolved via fetch().
    private final SnapshotRepository projectionSnapshotRepository;
    // Used to time operations; also handed to the fact observers and lock objects created here.
    private final FactusMetrics factusMetrics;
    // Flipped to true by close(); checked by assertNotClosed() and the token-acquisition loop.
    private final AtomicBoolean closed = new AtomicBoolean();
    // Closeables registered by subscribeAndBlock(); close() closes a copy of this set in insertion order.
    private final Set<AutoCloseable> managedObjects = Collections.synchronizedSet(new LinkedHashSet());

    /* loaded from: input_file:org/factcast/factus/FactusImpl$IntervalSnapshotter.class */
    /**
     * Callback that triggers {@link #createSnapshot} at most once per configured interval.
     *
     * <p>The first snapshot becomes due one interval after construction. Each call to
     * {@link #accept} compares the current time with the due time; once the due time has
     * passed, the next due time is moved one interval ahead of "now" and a snapshot is created.
     *
     * @param <P> type of the projection being snapshotted
     */
    static abstract class IntervalSnapshotter<P extends SnapshotProjection> implements BiConsumer<P, UUID> {
        private final Duration interval;
        private Instant nextSnapshotDue;

        /**
         * @param duration minimum time between two snapshots
         */
        IntervalSnapshotter(Duration duration) {
            this.interval = duration;
            this.nextSnapshotDue = Instant.now().plus(duration);
        }

        /**
         * Creates a snapshot if the interval has elapsed, otherwise does nothing.
         *
         * @param p    projection to snapshot
         * @param uuid id handed on to {@link #createSnapshot}
         */
        @Override
        public void accept(P p, UUID uuid) {
            final Instant now = Instant.now();
            if (!now.isAfter(nextSnapshotDue)) {
                return;
            }
            // Reschedule before snapshotting, so a failing snapshot is not retried on every call.
            nextSnapshotDue = now.plus(interval);
            createSnapshot(p, uuid);
        }

        /**
         * Persists the given projection state.
         *
         * @param p    projection to snapshot
         * @param uuid id passed through from {@link #accept}
         */
        abstract void createSnapshot(P p, UUID uuid);
    }

    /**
     * Creates a new publish batch backed by this instance's FactCast client and event converter.
     *
     * @return a fresh {@link DefaultPublishBatch}
     */
    @Override
    @NonNull
    public PublishBatch batch() {
        final PublishBatch publishBatch = new DefaultPublishBatch(fc, eventConverter);
        return publishBatch;
    }

    /**
     * Converts the event to a fact, publishes it and maps the published fact to a result.
     *
     * @param eventObject event to publish
     * @param function    maps the published fact to the value returned to the caller
     * @param <T>         result type
     * @return whatever {@code function} returns for the published fact
     * @throws NullPointerException  if either argument is null
     * @throws IllegalStateException if this instance has been closed
     */
    @Override
    public <T> T publish(@NonNull EventObject eventObject, @NonNull Function<Fact, T> function) {
        Objects.requireNonNull(eventObject, "e is marked non-null but is null");
        Objects.requireNonNull(function, "resultFn is marked non-null but is null");
        assertNotClosed();
        InLockedOperation.assertNotInLockedOperation();

        final Fact converted = eventConverter.toFact(eventObject);
        fc.publish(converted);
        return function.apply(converted);
    }

    /**
     * Guards operations that must not run after {@link #close()}.
     *
     * @throws IllegalStateException if this instance has already been closed
     */
    private void assertNotClosed() {
        Preconditions.checkState(!closed.get(), "Already closed.");
    }

    /**
     * Publishes the given events, discarding the published facts.
     *
     * @param list events to publish
     * @throws NullPointerException if {@code list} is null
     */
    @Override
    public void publish(@NonNull List<EventObject> list) {
        Objects.requireNonNull(list, "eventPojos is marked non-null but is null");
        // Delegate to the mapping overload with a mapper that ignores the published facts.
        publish(list, publishedFacts -> null);
    }

    /**
     * Publishes an already converted fact as-is.
     *
     * @param fact fact to publish
     * @throws NullPointerException  if {@code fact} is null
     * @throws IllegalStateException if this instance has been closed
     */
    @Override
    public void publish(@NonNull Fact fact) {
        Objects.requireNonNull(fact, "f is marked non-null but is null");
        assertNotClosed();
        InLockedOperation.assertNotInLockedOperation();
        fc.publish(fact);
    }

    /**
     * Converts all events to facts, publishes them in a single call and maps the published
     * facts to a result.
     *
     * @param list     events to publish
     * @param function maps the list of published facts to the value returned to the caller
     * @param <T>      result type
     * @return whatever {@code function} returns for the published facts
     * @throws NullPointerException  if either argument is null
     * @throws IllegalStateException if this instance has been closed
     */
    @Override
    public <T> T publish(@NonNull List<EventObject> list, @NonNull Function<List<Fact>, T> function) {
        Objects.requireNonNull(list, "e is marked non-null but is null");
        Objects.requireNonNull(function, "resultFn is marked non-null but is null");
        assertNotClosed();
        InLockedOperation.assertNotInLockedOperation();

        // The bound method reference null-checks the converter when it is created.
        final List<Fact> facts = list.stream()
                .map(this.eventConverter::toFact)
                .collect(Collectors.toList());
        fc.publish(facts);
        return function.apply(facts);
    }

    /**
     * Brings a managed projection up to date while holding the projection's own lock.
     *
     * <p>The catch-up starts from the projection's current fact stream position and runs
     * inside a timed metric tagged with the projection's class name.
     *
     * @param p        projection to update
     * @param duration validated for non-null, but otherwise not used by this implementation
     * @param <P>      projection type
     * @throws NullPointerException  if either argument is null
     * @throws IllegalStateException if this instance has been closed
     */
    @Override
    public <P extends ManagedProjection> void update(@NonNull P p, @NonNull Duration duration) {
        Objects.requireNonNull(p, "managedProjection is marked non-null but is null");
        Objects.requireNonNull(duration, "maxWaitTime is marked non-null but is null");
        assertNotClosed();
        log.trace("updating managed projection {}", p.getClass());
        final Tags tags = Tags.of(Tag.of(TagKeys.CLASS, p.getClass().getName()));
        this.factusMetrics.timed(TimedOperation.MANAGED_PROJECTION_UPDATE_DURATION, tags, () -> {
            p.withLock(() -> {
                // Pass the projection itself; it must not be cast to FactusImpl.
                catchupProjection(p, p.factStreamPosition(), null);
            });
        });
    }

    /**
     * Subscribes the projection, retrying writer-token acquisition with a wait time of five
     * minutes per attempt.
     *
     * @param p   projection to subscribe
     * @param <P> projection type
     * @return the subscription feeding the projection
     * @throws NullPointerException if {@code p} is null
     */
    @Override
    public <P extends SubscribedProjection> Subscription subscribeAndBlock(@NonNull P p) {
        Objects.requireNonNull(p, "subscribedProjection is marked non-null but is null");
        final Duration defaultRetryWaitTime = Duration.ofMinutes(5L);
        return subscribeAndBlock(p, defaultRetryWaitTime);
    }

    /**
     * Blocks until a writer token for the projection is acquired, then subscribes it.
     *
     * <p>Token acquisition is retried until it succeeds or this instance is closed. On
     * success the token is released when the subscription closes, and the subscription is
     * registered so that {@link #close()} closes it. If creating the subscription fails, the
     * token that was just acquired is released before the failure is propagated.
     *
     * @param p        projection to subscribe
     * @param duration wait time passed to each token-acquisition attempt
     * @param <P>      projection type
     * @return the subscription feeding the projection
     * @throws NullPointerException  if either argument is null
     * @throws IllegalStateException if this instance is already closed on entry
     * @throws FactusClosedException if this instance is closed while waiting for a token
     */
    @Override
    public <P extends SubscribedProjection> Subscription subscribeAndBlock(@NonNull P p, @NonNull Duration duration) {
        Objects.requireNonNull(p, "subscribedProjection is marked non-null but is null");
        Objects.requireNonNull(duration, "retryWaitTime is marked non-null but is null");
        assertNotClosed();
        InLockedOperation.assertNotInLockedOperation();
        while (!this.closed.get()) {
            final WriterToken token = p.acquireWriteToken(duration);
            if (token == null) {
                log.trace("failed to acquire writer token for {}. Will keep trying.", p.getClass());
                continue;
            }
            log.info("Acquired writer token for {}", p.getClass());
            final Subscription subscription;
            try {
                subscription = doSubscribe(p, token);
            } catch (RuntimeException | Error e) {
                // Without a subscription nothing would ever release the token.
                tryClose(token);
                throw e;
            }
            subscription.onClose(() -> {
                tryClose(token);
            });
            this.managedObjects.add(() -> {
                tryClose(subscription);
            });
            return subscription;
        }
        throw new FactusClosedException("Factus is closed. Halting attempts to acquire a writer token.");
    }

    /**
     * Opens a follow subscription that feeds the given projection.
     *
     * <p>The subscription starts after the projection's current fact id, if it has one.
     * Every batch is applied through the projection's projector, but only while the writer
     * token is still valid.
     *
     * @param p           projection to feed
     * @param writerToken token that must stay valid for facts to be applied
     * @param <P>         projection type
     * @return the subscription returned by the FactCast client
     * @throws NullPointerException if either argument is null
     */
    private <P extends SubscribedProjection> Subscription doSubscribe(@NonNull final P p, @NonNull final WriterToken writerToken) {
        Objects.requireNonNull(p, "subscribedProjection is marked non-null but is null");
        Objects.requireNonNull(writerToken, "token is marked non-null but is null");
        final Projector projector = this.ehFactory.create(p);
        return this.fc.subscribe(
                SubscriptionRequest.follow(projector.createFactSpecs())
                        .fromNullable(Optional.ofNullable(p.factStreamPosition()).map(FactStreamPosition::factId).orElse(null)),
                new AbstractFactObserver(p, PROGRESS_INTERVAL, this.factusMetrics) {
                    // Position of the last fact applied through this observer; null until a batch arrives.
                    private FactStreamPosition lastApplied;

                    @Override
                    public void onNextFacts(@NonNull List<Fact> list) {
                        Objects.requireNonNull(list, "elements is marked non-null but is null");
                        // Refuse to apply anything once the writer token is no longer valid.
                        if (!writerToken.isValid()) {
                            throw new IllegalStateException("WriterToken is no longer valid.");
                        }
                        projector.apply(list);
                        this.lastApplied = FactStreamPosition.from((Fact) Iterables.getLast(list));
                    }

                    @Override
                    public void onCatchupSignal() {
                        projector.onCatchup(this.lastApplied);
                        p.onCatchup();
                    }

                    public void onComplete() {
                        p.onComplete();
                    }

                    public void onError(@NonNull Throwable th) {
                        Objects.requireNonNull(th, "exception is marked non-null but is null");
                        p.onError(th);
                    }

                    public void onFastForward(@NonNull FactStreamPosition factStreamPosition) {
                        Objects.requireNonNull(factStreamPosition, "factIdToFfwdTo is marked non-null but is null");
                        // Only move the projection forward, never back behind what was already applied.
                        if (!factStreamPosition.isAfter(this.lastApplied)) {
                            return;
                        }
                        p.factStreamPosition(factStreamPosition);
                    }
                });
    }

    /**
     * Fetches an up-to-date snapshot projection of the given class, timing the operation
     * under a metric tagged with the class name.
     *
     * @param cls projection class
     * @param <P> projection type
     * @return the projection after catching up
     */
    @Override
    @NonNull
    public <P extends SnapshotProjection> P fetch(Class<P> cls) {
        final Tags tags = Tags.of(Tag.of(TagKeys.CLASS, cls.getName()));
        return (P) this.factusMetrics.timed(TimedOperation.FETCH_DURATION, tags, () -> {
            return dofetch(cls);
        });
    }

    /**
     * Loads the latest snapshot of the projection (or creates a fresh instance), catches it
     * up and stores a new snapshot if any facts were applied.
     *
     * <p>While catching up, intermediate snapshots are stored at most every 30 seconds.
     *
     * @param cls projection class; must not be an {@link Aggregate} type
     * @param <P> projection type
     * @return the projection after catching up
     * @throws IllegalStateException    if this instance has been closed
     * @throws IllegalArgumentException if {@code cls} is an aggregate class
     */
    private <P extends SnapshotProjection> P dofetch(Class<P> cls) {
        assertNotClosed();
        if (Aggregate.class.isAssignableFrom(cls)) {
            throw new IllegalArgumentException("Method confusion: UUID aggregateId is missing as a second parameter for aggregates");
        }
        final ProjectionAndState<P> latest = this.projectionSnapshotRepository.findLatest(cls).orElseGet(() -> {
            return ProjectionAndState.of(instantiate(cls), null);
        });
        final P projection = latest.projectionInstance();
        final UUID lastFactId = catchupProjection(projection, latest.lastFactIdApplied(), new IntervalSnapshotter<P>(Duration.ofSeconds(30L)) {
            @Override
            void createSnapshot(P snapshotProjection, UUID uuid) {
                FactusImpl.this.projectionSnapshotRepository.store(snapshotProjection, uuid);
            }
        });
        // A null result means the catch-up recorded no new position, so there is nothing to persist.
        if (lastFactId != null) {
            this.projectionSnapshotRepository.store(projection, lastFactId);
        }
        return projection;
    }

    /**
     * Looks up an aggregate by class and id, timing the operation under a metric tagged
     * with the class name.
     *
     * @param cls  aggregate class
     * @param uuid aggregate id
     * @param <A>  aggregate type
     * @return the aggregate, or empty if neither a snapshot nor any fact exists for it
     * @throws NullPointerException if either argument is null
     */
    @Override
    @NonNull
    public <A extends Aggregate> Optional<A> find(@NonNull Class<A> cls, @NonNull UUID uuid) {
        Objects.requireNonNull(cls, "aggregateClass is marked non-null but is null");
        Objects.requireNonNull(uuid, "aggregateId is marked non-null but is null");
        final Tags tags = Tags.of(Tag.of(TagKeys.CLASS, cls.getName()));
        return (Optional<A>) this.factusMetrics.timed(TimedOperation.FIND_DURATION, tags, () -> {
            return doFind(cls, uuid);
        });
    }

    /**
     * Loads the latest snapshot of the aggregate (or creates an initial instance), catches
     * it up and stores a new snapshot if any facts were applied.
     *
     * <p>While catching up, intermediate snapshots are stored at most every 30 seconds.
     *
     * @param cls  aggregate class
     * @param uuid aggregate id
     * @param <A>  aggregate type
     * @return the aggregate, or empty if there was no snapshot and the catch-up recorded no
     *         position
     * @throws IllegalStateException if this instance has been closed
     */
    private <A extends Aggregate> Optional<A> doFind(Class<A> cls, UUID uuid) {
        assertNotClosed();
        // orElseGet: only instantiate a fresh aggregate when no snapshot exists.
        final ProjectionAndState<A> latest = this.aggregateSnapshotRepository.findLatest(cls, uuid).orElseGet(() -> {
            return ProjectionAndState.of(initial(cls, uuid), null);
        });
        final A aggregate = latest.projectionInstance();
        final UUID lastFactId = catchupProjection(aggregate, latest.lastFactIdApplied(), new IntervalSnapshotter<A>(Duration.ofSeconds(30L)) {
            @Override
            void createSnapshot(A projection, UUID state) {
                FactusImpl.this.aggregateSnapshotRepository.store(projection, state);
            }
        });
        if (lastFactId != null) {
            this.aggregateSnapshotRepository.store(aggregate, lastFactId);
            return Optional.of(aggregate);
        }
        if (latest.lastFactIdApplied() == null) {
            // No snapshot and nothing applied: the aggregate does not exist.
            return Optional.empty();
        }
        return Optional.of(aggregate);
    }

    /**
     * Convenience overload that starts the catch-up from a fact stream position and
     * discards the resulting fact id.
     *
     * @param p                  projection to catch up
     * @param factStreamPosition position to start after; null starts without a fact id
     * @param biConsumer         optional callback invoked after each applied batch
     * @param <P>                projection type
     * @throws NullPointerException if {@code p} is null
     */
    private <P extends Projection> void catchupProjection(@NonNull P p, FactStreamPosition factStreamPosition, @Nullable BiConsumer<P, UUID> biConsumer) {
        Objects.requireNonNull(p, "projection is marked non-null but is null");
        final UUID lastFactId = factStreamPosition == null ? null : factStreamPosition.factId();
        catchupProjection(p, lastFactId, biConsumer);
    }

    /**
     * Runs a catch-up subscription for the projection and waits for it to complete.
     *
     * <p>Each batch is applied through the projection's projector, and the position of its
     * last fact is remembered. The subscription runs while holding the monitor of {@code p}.
     *
     * @param p          projection to catch up
     * @param uuid       fact id to start after, or null
     * @param biConsumer optional callback invoked after each applied batch with the
     *                   projection and the id of the batch's last fact
     * @param <P>        projection type
     * @return the fact id of the last recorded position, or null if none was recorded
     * @throws NullPointerException if {@code p} is null
     */
    @VisibleForTesting
    protected <P extends Projection> UUID catchupProjection(@NonNull final P p, @Nullable final UUID uuid, @Nullable final BiConsumer<P, UUID> biConsumer) {
        Objects.requireNonNull(p, "projection is marked non-null but is null");
        final Projector projector = this.ehFactory.create(p);
        // Last recorded position; written from the observer callbacks and read after completion.
        final AtomicReference<FactStreamPosition> lastPosition = new AtomicReference<>();
        AbstractFactObserver observer = new AbstractFactObserver(p, PROGRESS_INTERVAL, this.factusMetrics) {
            @Override
            public void onNextFacts(@NonNull List<Fact> list) {
                Objects.requireNonNull(list, "elements is marked non-null but is null");
                projector.apply(list);
                FactStreamPosition applied = FactStreamPosition.from((Fact) Iterables.getLast(list));
                lastPosition.set(applied);
                if (biConsumer != null) {
                    biConsumer.accept(p, applied.factId());
                }
            }

            public void onComplete() {
                flush();
                p.onComplete();
            }

            @Override
            public void onCatchupSignal() {
                flush();
                projector.onCatchup(lastPosition.get());
                p.onCatchup();
            }

            public void onError(@NonNull Throwable th) {
                Objects.requireNonNull(th, "exception is marked non-null but is null");
                flush();
                p.onError(th);
            }

            public void onFastForward(@NonNull FactStreamPosition factStreamPosition) {
                Objects.requireNonNull(factStreamPosition, "factIdToFfwdTo is marked non-null but is null");
                flush();
                FactStreamPosition current = lastPosition.get();
                if (!factStreamPosition.isAfter(current)) {
                    return;
                }
                if (p instanceof FactStreamPositionAware) {
                    ((FactStreamPositionAware) p).factStreamPosition(factStreamPosition);
                }
                // Record the fast-forward target only if there was a starting id or an applied
                // fact; otherwise the method keeps returning null.
                if (uuid != null || current != null) {
                    lastPosition.set(factStreamPosition);
                }
            }
        };
        List<FactSpec> factSpecs = projector.createFactSpecs();
        synchronized (p) {
            this.fc.subscribe(SubscriptionRequest.catchup(factSpecs).fromNullable(uuid), observer).awaitComplete();
        }
        final FactStreamPosition finalPosition = lastPosition.get();
        return finalPosition == null ? null : finalPosition.factId();
    }

    /**
     * Creates a fresh aggregate instance and assigns it the given aggregate id.
     *
     * @param cls  aggregate class to instantiate
     * @param uuid id to set on the new aggregate
     * @param <A>  aggregate type
     * @return the new aggregate carrying {@code uuid}
     */
    @VisibleForTesting
    protected <A extends Aggregate> A initial(Class<A> cls, UUID uuid) {
        log.trace("Creating initial aggregate version for {} with id {}", cls.getSimpleName(), uuid);
        final A fresh = instantiate(cls);
        AggregateUtil.aggregateId(fresh, uuid);
        return fresh;
    }

    /**
     * Creates a projection instance through its no-argument constructor, which may be
     * non-public.
     *
     * <p>Reflection failures (missing constructor, inaccessible or abstract class, exception
     * thrown by the constructor) are rethrown unchanged, without wrapping and without being
     * declared.
     *
     * @param cls projection class to instantiate
     * @param <P> projection type
     * @return a new instance of {@code cls}
     */
    @NonNull
    private <P extends SnapshotProjection> P instantiate(Class<P> cls) {
        log.trace("Creating initial projection version for {}", cls);
        try {
            Constructor<P> noArgConstructor = cls.getDeclaredConstructor();
            noArgConstructor.setAccessible(true);
            return noArgConstructor.newInstance();
        } catch (ReflectiveOperationException e) {
            // Keep the original exception type visible to callers instead of wrapping it.
            throw FactusImpl.<RuntimeException>sneakyThrow(e);
        }
    }

    /**
     * Rethrows any throwable without the compiler requiring it to be declared.
     *
     * <p>Always throws; the return type only exists so call sites can write
     * {@code throw sneakyThrow(e)} and satisfy flow analysis.
     */
    @SuppressWarnings("unchecked")
    private static <E extends Throwable> RuntimeException sneakyThrow(Throwable t) throws E {
        throw (E) t;
    }

    /**
     * Marks this instance as closed and closes every registered managed object.
     *
     * <p>Only the first call does any work; later calls log a warning and return. A failure
     * to close one object is logged and does not stop the remaining ones from being closed.
     */
    @Override
    public void close() {
        log.debug("factus is being closed");
        final boolean alreadyClosed = this.closed.getAndSet(true);
        if (alreadyClosed) {
            log.warn("close is being called more than once!?");
            return;
        }
        // Work on a copy so that registrations during shutdown cannot disturb the iteration.
        final List<AutoCloseable> toClose = new ArrayList<>(this.managedObjects);
        for (AutoCloseable managed : toClose) {
            if (managed == null) {
                continue;
            }
            try {
                managed.close();
            } catch (Exception e) {
                log.warn("While closing {} of type {}:", managed, managed.getClass().getName(), e);
            }
        }
    }

    /**
     * Converts an event object to a fact without publishing it.
     *
     * @param eventObject event to convert
     * @return the fact produced by the event converter
     * @throws NullPointerException if {@code eventObject} is null
     */
    @Override
    public Fact toFact(@NonNull EventObject eventObject) {
        Objects.requireNonNull(eventObject, "e is marked non-null but is null");
        final Fact converted = eventConverter.toFact(eventObject);
        return converted;
    }

    /**
     * Creates a lock object for the given managed projection, based on the fact specs of
     * its projector.
     *
     * @param m   managed projection to lock on
     * @param <M> projection type
     * @return a new {@link Locked} for {@code m}
     * @throws NullPointerException if {@code m} is null
     */
    @Override
    public <M extends ManagedProjection> Locked<M> withLockOn(@NonNull M m) {
        Objects.requireNonNull(m, "managedProjection is marked non-null but is null");
        final List<FactSpec> specs = this.ehFactory.create(m).createFactSpecs();
        return new Locked<>(this.fc, this, m, specs, this.factusMetrics);
    }

    /**
     * Creates a lock object for the aggregate with the given class and id.
     *
     * <p>The aggregate is looked up via {@link #find}. If it does not exist yet, an initial
     * instance carrying the requested id is used, so the lock is built for an aggregate that
     * knows its own id.
     *
     * @param cls  aggregate class
     * @param uuid aggregate id; {@link #find} rejects null
     * @param <A>  aggregate type
     * @return a new {@link Locked} for the aggregate
     * @throws NullPointerException if {@code cls} or {@code uuid} is null
     */
    @Override
    public <A extends Aggregate> Locked<A> withLockOn(@NonNull Class<A> cls, UUID uuid) {
        Objects.requireNonNull(cls, "aggregateClass is marked non-null but is null");
        // initial() rather than instantiate(): a bare instance would lack the aggregate id.
        final A aggregate = find(cls, uuid).orElseGet(() -> {
            return initial(cls, uuid);
        });
        final List<FactSpec> specs = this.ehFactory.create(aggregate).createFactSpecs();
        return new Locked<>(this.fc, this, aggregate, specs, this.factusMetrics);
    }

    /**
     * Creates a lock object for the snapshot projection of the given class.
     *
     * <p>The projection is fetched (and thereby caught up) first; the lock is based on the
     * fact specs of its projector.
     *
     * @param cls projection class
     * @param <P> projection type
     * @return a new {@link Locked} for the fetched projection
     * @throws NullPointerException if {@code cls} is null
     */
    @Override
    public <P extends SnapshotProjection> Locked<P> withLockOn(@NonNull Class<P> cls) {
        Objects.requireNonNull(cls, "projectionClass is marked non-null but is null");
        // Keep the precise type P so that Locked<P> can be inferred.
        final P projection = fetch(cls);
        final List<FactSpec> specs = this.ehFactory.create(projection).createFactSpecs();
        return new Locked<>(this.fc, this, projection, specs, this.factusMetrics);
    }

    /**
     * Creates a lock on one or more fact specs given as varargs.
     *
     * @param factSpec     first spec to lock on
     * @param factSpecArr  further specs to lock on; may be empty
     * @return a new {@link LockedOnSpecs} covering all given specs, in argument order
     * @throws NullPointerException if {@code factSpec} or the varargs array is null
     */
    @Override
    public LockedOnSpecs withLockOn(@NonNull FactSpec factSpec, @NonNull FactSpec... factSpecArr) {
        Objects.requireNonNull(factSpec, "spec is marked non-null but is null");
        Objects.requireNonNull(factSpecArr, "additional is marked non-null but is null");
        final List<FactSpec> specs = new ArrayList<>(factSpecArr.length + 1);
        specs.add(factSpec);
        Collections.addAll(specs, factSpecArr);
        return withLockOn(specs);
    }

    /**
     * Creates a lock on the given, non-empty list of fact specs.
     *
     * @param list specs to lock on
     * @return a new {@link LockedOnSpecs} for {@code list}
     * @throws NullPointerException     if {@code list} is null
     * @throws IllegalArgumentException if {@code list} is empty
     */
    @Override
    public LockedOnSpecs withLockOn(@NonNull List<FactSpec> list) {
        Objects.requireNonNull(list, "specs is marked non-null but is null");
        if (list.isEmpty()) {
            throw new IllegalArgumentException("Argument specs must not be empty");
        }
        return new LockedOnSpecs(this.fc, this, list, this.factusMetrics);
    }

    /**
     * Looks up the serial of a fact by delegating to the FactCast client.
     *
     * @param uuid id of the fact
     * @return the result of the client's {@code serialOf} call
     * @throws NullPointerException if {@code uuid} is null
     */
    @Override
    public OptionalLong serialOf(@NonNull UUID uuid) {
        Objects.requireNonNull(uuid, "factId is marked non-null but is null");
        final OptionalLong serial = fc.serialOf(uuid);
        return serial;
    }

    /**
     * Exposes the fact store of the underlying FactCast client.
     *
     * @return the store returned by the client
     */
    @Override
    @NonNull
    public FactStore store() {
        final FactStore factStore = fc.store();
        return factStore;
    }

    /**
     * Closes the given resource, logging instead of propagating any exception thrown while
     * closing.
     *
     * @param autoCloseable resource to close
     */
    private void tryClose(AutoCloseable autoCloseable) {
        final Class<?> type = autoCloseable.getClass();
        try {
            log.trace("Closing AutoCloseable for class {}", type);
            autoCloseable.close();
        } catch (Exception e) {
            log.warn("Error while closing AutoCloseable for {}", type, e);
        }
    }

    /**
     * Wires up a Factus instance from its collaborators. No argument is validated here.
     *
     * @param factCast            client used for publishing and subscribing
     * @param projectorFactory    creates projectors for projections
     * @param eventConverter      converts event objects to facts
     * @param aggregateRepository snapshot repository for aggregates
     * @param snapshotRepository  snapshot repository for snapshot projections
     * @param factusMetrics       metrics facade used to time operations
     */
    @SuppressFBWarnings(justification = "generated code")
    @Generated
    public FactusImpl(FactCast factCast, ProjectorFactory projectorFactory, EventConverter eventConverter, AggregateRepository aggregateRepository, SnapshotRepository snapshotRepository, FactusMetrics factusMetrics) {
        // Core collaborators.
        fc = factCast;
        ehFactory = projectorFactory;
        this.eventConverter = eventConverter;

        // Snapshot repositories.
        aggregateSnapshotRepository = aggregateRepository;
        projectionSnapshotRepository = snapshotRepository;

        // Metrics.
        this.factusMetrics = factusMetrics;
    }
}
