package com.github.davidmoten.fsm.runtime.rx;

import com.github.davidmoten.fsm.runtime.Action3;
import com.github.davidmoten.fsm.runtime.CancelTimedSignal;
import com.github.davidmoten.fsm.runtime.Clock;
import com.github.davidmoten.fsm.runtime.EntityBehaviour;
import com.github.davidmoten.fsm.runtime.EntityState;
import com.github.davidmoten.fsm.runtime.EntityStateMachine;
import com.github.davidmoten.fsm.runtime.Event;
import com.github.davidmoten.fsm.runtime.ObjectState;
import com.github.davidmoten.fsm.runtime.Search;
import com.github.davidmoten.fsm.runtime.Signal;
import com.github.davidmoten.guavamini.Preconditions;
import io.reactivex.BackpressureStrategy;
import io.reactivex.Emitter;
import io.reactivex.Flowable;
import io.reactivex.FlowableTransformer;
import io.reactivex.Scheduler;
import io.reactivex.disposables.Disposable;
import io.reactivex.flowables.GroupedFlowable;
import io.reactivex.functions.BiConsumer;
import io.reactivex.functions.Consumer;
import io.reactivex.functions.Function;
import io.reactivex.internal.functions.Functions;
import io.reactivex.schedulers.Schedulers;
import io.reactivex.subjects.PublishSubject;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import org.reactivestreams.Subscriber;

/* loaded from: input_file:com/github/davidmoten/fsm/runtime/rx/Processor.class */
public final class Processor<Id> {
    private final Function<Class<?>, EntityBehaviour<?, Id>> behaviourFactory;
    private final PublishSubject<Signal<?, Id>> subject;
    private final Scheduler signalScheduler;
    private final Scheduler processingScheduler;
    private final Map<ClassId<?, Id>, EntityStateMachine<?, Id>> stateMachines;
    private final Map<ClassIdPair<Id>, Disposable> subscriptions;
    private final Flowable<Signal<?, Id>> signals;
    private final Function<GroupedFlowable<ClassId<?, Id>, EntityStateMachine<?, Id>>, Flowable<EntityStateMachine<?, Id>>> entityTransform;
    private final FlowableTransformer<Signal<?, Id>, Signal<?, Id>> preGroupBy;
    private final Function<Consumer<ClassId<?, Id>>, Map<ClassId<?, Id>, Object>> mapFactory;
    private final Clock signallerClock;
    private final Action3<? super EntityStateMachine<?, Id>, ? super Event<?>, ? super EntityState<?>> preTransitionAction;
    private final Consumer<? super EntityStateMachine<?, Id>> postTransitionAction;
    private final Search<Id> search;

    /* loaded from: input_file:com/github/davidmoten/fsm/runtime/rx/Processor$Builder.class */
    public static class Builder<Id> {
        private Function<Class<?>, EntityBehaviour<?, Id>> behaviourFactory;
        private Scheduler signalScheduler;
        private Scheduler processingScheduler;
        private Flowable<Signal<?, Id>> signals;
        private Function<GroupedFlowable<ClassId<?, Id>, EntityStateMachine<?, Id>>, Flowable<EntityStateMachine<?, Id>>> entityTransform;
        private FlowableTransformer<Signal<?, Id>, Signal<?, Id>> preGroupBy;
        private Function<Consumer<ClassId<?, Id>>, Map<ClassId<?, Id>, Object>> mapFactory;
        private Action3<? super EntityStateMachine<?, Id>, ? super Event<?>, ? super EntityState<?>> preTransitionAction;
        private Consumer<? super EntityStateMachine<?, Id>> postTransitionAction;
        private final Map<Class<?>, EntityBehaviour<?, Id>> behaviours;

        private Builder() {
            this.signalScheduler = Schedulers.computation();
            this.processingScheduler = Schedulers.trampoline();
            this.signals = Flowable.empty();
            this.entityTransform = groupedFlowable -> {
                return groupedFlowable;
            };
            this.preGroupBy = flowable -> {
                return flowable;
            };
            this.preTransitionAction = (entityStateMachine, event, entityState) -> {
            };
            this.postTransitionAction = entityStateMachine2 -> {
            };
            this.behaviours = new HashMap();
        }

        public <T> Builder<Id> behaviour(Class<T> cls, EntityBehaviour<T, Id> entityBehaviour) {
            this.behaviours.put(cls, entityBehaviour);
            return this;
        }

        public Builder<Id> behaviourFactory(Function<Class<?>, EntityBehaviour<?, Id>> function) {
            this.behaviourFactory = function;
            return this;
        }

        public Builder<Id> signalScheduler(Scheduler scheduler) {
            this.signalScheduler = scheduler;
            return this;
        }

        public Builder<Id> processingScheduler(Scheduler scheduler) {
            this.processingScheduler = scheduler;
            return this;
        }

        public Builder<Id> signals(Flowable<Signal<?, Id>> flowable) {
            this.signals = flowable;
            return this;
        }

        public Builder<Id> entityTransform(Function<GroupedFlowable<ClassId<?, Id>, EntityStateMachine<?, Id>>, Flowable<EntityStateMachine<?, Id>>> function) {
            this.entityTransform = function;
            return this;
        }

        public Builder<Id> preGroupBy(FlowableTransformer<Signal<?, Id>, Signal<?, Id>> flowableTransformer) {
            this.preGroupBy = flowableTransformer;
            return this;
        }

        public Builder<Id> mapFactory(Function<Consumer<ClassId<?, Id>>, Map<ClassId<?, Id>, Object>> function) {
            this.mapFactory = function;
            return this;
        }

        public Builder<Id> preTransition(Action3<? super EntityStateMachine<?, Id>, ? super Event<?>, ? super EntityState<?>> action3) {
            this.preTransitionAction = action3;
            return this;
        }

        public Builder<Id> postTransition(Consumer<? super EntityStateMachine<?, Id>> consumer) {
            this.postTransitionAction = consumer;
            return this;
        }

        public Processor<Id> build() {
            Preconditions.checkArgument((this.behaviourFactory == null && this.behaviours.isEmpty()) ? false : true, "one of behaviourFactory or multiple calls to behaviour must be made (behaviour must be specified)");
            Preconditions.checkArgument(this.behaviourFactory == null || this.behaviours.isEmpty(), "cannot specify both behaviourFactory and behaviour");
            if (!this.behaviours.isEmpty()) {
                this.behaviourFactory = cls -> {
                    return this.behaviours.get(cls);
                };
            }
            return new Processor<>(this.behaviourFactory, this.processingScheduler, this.signalScheduler, this.signals, this.entityTransform, this.preGroupBy, this.mapFactory, this.preTransitionAction, this.postTransitionAction);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/github/davidmoten/fsm/runtime/rx/Processor$Signals.class */
    public static final class Signals<Id> {
        final Deque<Event<?>> signalsToSelf;
        final Deque<Signal<?, Id>> signalsToOther;

        private Signals() {
            this.signalsToSelf = new ArrayDeque();
            this.signalsToOther = new ArrayDeque();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/github/davidmoten/fsm/runtime/rx/Processor$TransitionHandler.class */
    public final class TransitionHandler implements Callable<Signals<Id>>, BiConsumer<Signals<Id>, Emitter<EntityStateMachine<?, Id>>> {
        private final Event<?> event;
        private final ClassId<?, Id> classId;
        private final Scheduler.Worker worker;
        EntityStateMachine<?, Id> machine;

        TransitionHandler(ClassId<?, Id> classId, Event<?> event, Scheduler.Worker worker, EntityStateMachine<?, Id> entityStateMachine) {
            this.classId = classId;
            this.event = event;
            this.worker = worker;
            this.machine = entityStateMachine;
        }

        @Override // java.util.concurrent.Callable
        public Signals<Id> call() throws Exception {
            Signals<Id> signals = new Signals<>();
            signals.signalsToSelf.offerFirst(this.event);
            return signals;
        }

        public void accept(Signals<Id> signals, Emitter<EntityStateMachine<?, Id>> emitter) throws Exception {
            Event<?> pollLast = signals.signalsToSelf.pollLast();
            if (pollLast != null) {
                applySignalToSelf(signals, emitter, pollLast);
            } else {
                applySignalsToOthers(this.classId, this.worker, signals);
                emitter.onComplete();
            }
        }

        private <T> void applySignalToSelf(Signals<Id> signals, Emitter<? super EntityStateMachine<?, Id>> emitter, Event<T> event) throws Exception {
            this.machine = this.machine.signal(event);
            Processor.this.postTransitionAction.accept(this.machine);
            emitter.onNext(this.machine);
            List<Event<? super Object>> signalsToSelf = this.machine.signalsToSelf();
            for (int size = signalsToSelf.size() - 1; size >= 0; size--) {
                signals.signalsToSelf.offerLast(signalsToSelf.get(size));
            }
            Iterator<Signal<?, ?>> it = this.machine.signalsToOther().iterator();
            while (it.hasNext()) {
                signals.signalsToOther.offerFirst(it.next());
            }
        }

        private void applySignalsToOthers(ClassId<?, Id> classId, Scheduler.Worker worker, Signals<Id> signals) {
            while (true) {
                Signal<?, Id> pollLast = signals.signalsToOther.pollLast();
                if (pollLast == null) {
                    return;
                }
                if (pollLast.isImmediate()) {
                    Processor.this.subject.onNext(pollLast);
                } else if (pollLast.event() instanceof CancelTimedSignal) {
                    cancel(pollLast);
                } else {
                    long longValue = pollLast.time().get().longValue() - worker.now(TimeUnit.MILLISECONDS);
                    if (longValue <= 0) {
                        Processor.this.subject.onNext(pollLast);
                    } else {
                        scheduleSignal(classId, worker, pollLast, pollLast, longValue);
                    }
                }
            }
        }

        private void cancel(Signal<?, Id> signal) {
            CancelTimedSignal cancelTimedSignal = (CancelTimedSignal) signal.event();
            Disposable disposable = (Disposable) Processor.this.subscriptions.remove(new ClassIdPair(new ClassId(cancelTimedSignal.fromClass(), cancelTimedSignal.fromId()), new ClassId(signal.cls(), signal.id())));
            if (disposable != null) {
                disposable.dispose();
            }
        }

        private void scheduleSignal(ClassId<?, Id> classId, Scheduler.Worker worker, Signal<?, Id> signal, Signal<?, Id> signal2, long j) {
            ClassIdPair classIdPair = new ClassIdPair(classId, new ClassId(signal.cls(), signal.id()));
            long now = Processor.this.signalScheduler.now(TimeUnit.MILLISECONDS);
            Disposable schedule = worker.schedule(() -> {
                Processor.this.subject.onNext(signal2.now());
            }, j, TimeUnit.MILLISECONDS);
            worker.schedule(() -> {
                Processor.this.subscriptions.remove(classIdPair);
            }, j - (Processor.this.signalScheduler.now(TimeUnit.MILLISECONDS) - now), TimeUnit.MILLISECONDS);
            Disposable disposable = (Disposable) Processor.this.subscriptions.put(classIdPair, schedule);
            if (disposable != null) {
                disposable.dispose();
            }
        }
    }

    private Processor(Function<Class<?>, EntityBehaviour<?, Id>> function, Scheduler scheduler, Scheduler scheduler2, Flowable<Signal<?, Id>> flowable, Function<GroupedFlowable<ClassId<?, Id>, EntityStateMachine<?, Id>>, Flowable<EntityStateMachine<?, Id>>> function2, FlowableTransformer<Signal<?, Id>, Signal<?, Id>> flowableTransformer, Function<Consumer<ClassId<?, Id>>, Map<ClassId<?, Id>, Object>> function3, Action3<? super EntityStateMachine<?, Id>, ? super Event<?>, ? super EntityState<?>> action3, Consumer<? super EntityStateMachine<?, Id>> consumer) {
        this.stateMachines = new ConcurrentHashMap();
        this.subscriptions = new ConcurrentHashMap();
        this.search = new Search<Id>() { // from class: com.github.davidmoten.fsm.runtime.rx.Processor.1
            @Override // com.github.davidmoten.fsm.runtime.Search
            public <T> Optional<T> search(Class<T> cls, Id id) {
                return Processor.this.getStateMachine(cls, id).get();
            }
        };
        Preconditions.checkNotNull(function);
        Preconditions.checkNotNull(scheduler2);
        Preconditions.checkNotNull(flowable);
        Preconditions.checkNotNull(function2);
        Preconditions.checkNotNull(flowableTransformer);
        Preconditions.checkNotNull(action3);
        Preconditions.checkNotNull(consumer);
        this.behaviourFactory = function;
        this.signalScheduler = scheduler2;
        this.processingScheduler = scheduler;
        this.subject = PublishSubject.create();
        this.signals = flowable;
        this.entityTransform = function2;
        this.preGroupBy = flowableTransformer;
        this.mapFactory = function3;
        this.signallerClock = Clock.from(scheduler2);
        this.preTransitionAction = action3;
        this.postTransitionAction = consumer;
    }

    public static <Id> Builder<Id> behaviourFactory(Function<Class<?>, EntityBehaviour<?, Id>> function) {
        return new Builder().behaviourFactory(function);
    }

    public static <T, Id> Builder<Id> behaviour(Class<T> cls, EntityBehaviour<T, Id> entityBehaviour) {
        return new Builder().behaviour(cls, entityBehaviour);
    }

    public static <Id> Builder<Id> signalScheduler(Scheduler scheduler) {
        return new Builder().signalScheduler(scheduler);
    }

    public static <Id> Builder<Id> processingScheduler(Scheduler scheduler) {
        return new Builder().processingScheduler(scheduler);
    }

    public Flowable<EntityStateMachine<?, Id>> flowable() {
        return Flowable.defer(() -> {
            Scheduler.Worker createWorker = this.signalScheduler.createWorker();
            Flowable compose = this.subject.toSerialized().toFlowable(BackpressureStrategy.BUFFER).mergeWith(this.signals).doOnCancel(() -> {
                createWorker.dispose();
            }).compose(this.preGroupBy);
            if (this.mapFactory != null) {
                throw new UnsupportedOperationException("cannot use mapFactory in RxJava2, author will need to get API supplemented in RxJava2");
            }
            return compose.groupBy(signal -> {
                return new ClassId(signal.cls(), signal.id());
            }, Functions.identity()).flatMap(groupedFlowable -> {
                return (Flowable) this.entityTransform.apply(grouped(groupedFlowable.getKey(), groupedFlowable.flatMap(processSignalsToSelfAndSendSignalsToOthers(createWorker, (ClassId) groupedFlowable.getKey())).doOnNext(entityStateMachine -> {
                }).subscribeOn(this.processingScheduler)));
            });
        });
    }

    private static <K, T> GroupedFlowable<K, T> grouped(K k, final Flowable<T> flowable) {
        return new GroupedFlowable<K, T>(k) { // from class: com.github.davidmoten.fsm.runtime.rx.Processor.2
            protected void subscribeActual(Subscriber<? super T> subscriber) {
                flowable.subscribe(subscriber);
            }
        };
    }

    private Function<? super Signal<?, Id>, Flowable<EntityStateMachine<?, Id>>> processSignalsToSelfAndSendSignalsToOthers(Scheduler.Worker worker, ClassId<?, Id> classId) {
        return signal -> {
            return process(classId, signal.event(), worker).toList().toFlowable().flatMapIterable(Functions.identity());
        };
    }

    private Flowable<EntityStateMachine<?, Id>> process(ClassId<?, Id> classId, Event<?> event, Scheduler.Worker worker) {
        TransitionHandler transitionHandler = new TransitionHandler(classId, event, worker, getStateMachine(classId.cls(), classId.id()));
        return Flowable.generate(transitionHandler, transitionHandler);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public <T> EntityStateMachine<T, Id> getStateMachine(Class<T> cls, Id id) {
        return this.stateMachines.computeIfAbsent(new ClassId<>(cls, id), classId -> {
            try {
                return ((EntityBehaviour) this.behaviourFactory.apply(cls)).create(id).withSearch(this.search).withClock(this.signallerClock).withPreTransition(this.preTransitionAction);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
    }

    public <T> Optional<T> getObject(Class<T> cls, Id id) {
        try {
            return getStateMachine(cls, id).get();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public void signal(Signal<?, Id> signal) {
        this.subject.onNext(signal);
    }

    public <T> void signal(Class<T> cls, Id id, Event<? super T> event) {
        this.subject.onNext(Signal.create(cls, id, event));
    }

    public <T> void signal(ClassId<T, Id> classId, Event<? super T> event) {
        signal(classId.cls(), classId.id(), event);
    }

    public <T> ObjectState<T> get(Class<T> cls, Id id) {
        return this.stateMachines.get(new ClassId(cls, id));
    }

    public void onCompleted() {
        this.subject.onComplete();
    }

    public void cancelSignal(Class<?> cls, Id id, Class<?> cls2, Id id2) {
        Disposable remove = this.subscriptions.remove(new ClassIdPair(new ClassId(cls, id), new ClassId(cls2, id2)));
        if (remove != null) {
            remove.dispose();
        }
    }

    public void cancelSignalToSelf(Class<?> cls, Id id) {
        cancelSignal(cls, id, cls, id);
    }

    public void cancelSignalToSelf(ClassId<?, Id> classId) {
        cancelSignalToSelf(classId.cls(), classId.id());
    }
}
