package net.pincette.rs.streams;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.Flow;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;
import net.pincette.rs.Chain;
import net.pincette.rs.Fanout;
import net.pincette.rs.PassThrough;
import net.pincette.rs.Util;
import net.pincette.util.Pair;

/* loaded from: input_file:net/pincette/rs/streams/Streams.class */
/**
 * Fluent builder that wires reactive publishers and subscribers to named topics.
 *
 * <p>Consumers are registered per topic with the {@code from} and {@code consume} methods,
 * producers with the {@code to} methods. Nothing is connected to an actual topic source or sink
 * until {@link #start()} is called: at that point the factories given at construction time are
 * invoked with the set of registered topic names and the registered processors and publishers
 * are subscribed to what the factories return.
 *
 * <p>Instances are mutable and carry a "current publisher" cursor between chained calls
 * ({@code from(...).process(...).to(...)}). The class contains no synchronisation.
 *
 * @param <K> the message key type.
 * @param <V> the message value type.
 * @param <T> the element type emitted by the topic source before it is converted to messages.
 * @param <U> the element type the topic sink produces from messages.
 */
public class Streams<K, V, T, U> {
    private static final Duration DEFAULT_GRACE_PERIOD = Duration.ofSeconds(3);
    private static final String FROM_ERROR = "No preceding call to from.";
    private static final String TOPIC_SINK_ERROR = "No topic sink.";
    private static final String TOPIC_SOURCE_ERROR = "No topic source.";
    private final Function<Set<String>, TopicSink<K, V, U>> topicSink;
    private final Function<Set<String>, TopicSource<K, V, T>> topicSource;
    private final List<Pair<String, Flow.Processor<Message<K, V>, Message<K, V>>>> topicConsumers =
            new ArrayList<>();
    private final List<Pair<String, Flow.Publisher<Message<K, V>>>> topicProducers = new ArrayList<>();
    private Duration gracePeriod = DEFAULT_GRACE_PERIOD;
    private Flow.Publisher<Message<K, V>> fromPublisher;
    private Consumer<Throwable> onError;
    private TopicSink<K, V, U> sink;
    private TopicSource<K, V, T> source;

    /**
     * Creates a builder.
     *
     * @param topicSource factory that receives the consumed topic names and returns the source.
     *     May be {@code null}, in which case the {@code from} methods throw.
     * @param topicSink factory that receives the produced topic names and returns the sink. May
     *     be {@code null}, in which case the {@code to} methods throw.
     */
    protected Streams(
            Function<Set<String>, TopicSource<K, V, T>> topicSource,
            Function<Set<String>, TopicSink<K, V, U>> topicSink) {
        this.topicSource = topicSource;
        this.topicSink = topicSink;
    }

    /**
     * Static factory for {@link Streams}.
     *
     * @param topicSource factory for the topic source, may be {@code null}.
     * @param topicSink factory for the topic sink, may be {@code null}.
     * @param <K> the message key type.
     * @param <V> the message value type.
     * @param <T> the raw element type of the topic source.
     * @param <U> the raw element type of the topic sink.
     * @return a new, empty builder.
     */
    public static <K, V, T, U> Streams<K, V, T, U> streams(
            Function<Set<String>, TopicSource<K, V, T>> topicSource,
            Function<Set<String>, TopicSink<K, V, U>> topicSink) {
        return new Streams<>(topicSource, topicSink);
    }

    /** Collects the distinct topic names (the first pair component) of the registrations. */
    private static <T> Set<String> topics(List<Pair<String, T>> registrations) {
        return registrations.stream().map(pair -> pair.first).collect(Collectors.toSet());
    }

    /**
     * Subscribes the processor the source provides for the topic to the raw publisher and
     * returns that processor as a publisher of messages.
     */
    private static <K, V, T> Flow.Publisher<Message<K, V>> connect(
            Flow.Publisher<T> publisher, String topic, TopicSource<K, V, T> topicSource) {
        Flow.Processor<T, Message<K, V>> processor = topicSource.connect(topic);
        publisher.subscribe(processor);
        return processor;
    }

    /**
     * Subscribes the processor the sink provides for the topic to the message publisher and
     * returns that processor as a publisher of sink elements.
     *
     * <p>NOTE(review): the decompiler reports that this method was originally private; it is kept
     * public here so that the visible interface of this file does not change.
     *
     * @param publisher the messages destined for the topic.
     * @param topic the topic name.
     * @param topicSink the sink that provides the converting processor.
     * @param <K> the message key type.
     * @param <V> the message value type.
     * @param <T> the element type produced for the sink.
     * @return the processor returned by {@code topicSink.connect(topic)}, already subscribed.
     */
    public static <K, V, T> Flow.Publisher<T> connect(
            Flow.Publisher<Message<K, V>> publisher, String topic, TopicSink<K, V, T> topicSink) {
        Flow.Processor<Message<K, V>, T> processor = topicSink.connect(topic);
        publisher.subscribe(processor);
        return processor;
    }

    /**
     * Returns the single subscriber registered for the topic, or a fan-out over all of them when
     * there are several. Assumes at least one consumer is registered for the topic; an unknown
     * topic makes {@code get(0)} throw {@link IndexOutOfBoundsException}.
     */
    private Flow.Subscriber<Message<K, V>> connectTopicConsumers(String topic) {
        List<Flow.Subscriber<Message<K, V>>> subscribers = topicConsumers(topic);
        return subscribers.size() > 1 ? Fanout.of(subscribers) : subscribers.get(0);
    }

    /**
     * Registers a consumer for the topic and hands its publisher to the given callback.
     *
     * @param topic the topic to consume.
     * @param consumer receives the publisher of messages for the topic.
     * @return this builder.
     * @throws IllegalArgumentException when no topic source factory was provided.
     */
    public Streams<K, V, T, U> consume(String topic, Consumer<Flow.Publisher<Message<K, V>>> consumer) {
        consumer.accept(from(topic));
        return this;
    }

    /**
     * Creates the sink for all produced topics and connects every registered producer to it.
     *
     * @return the sink, or {@code null} when there is no sink factory or no producer.
     */
    private TopicSink<K, V, U> createTopicSink() {
        if (this.topicSink == null) {
            return null;
        }
        Set<String> topicNames = topics(this.topicProducers);
        if (topicNames.isEmpty()) {
            return null;
        }
        TopicSink<K, V, U> created = this.topicSink.apply(topicNames);
        this.topicProducers.stream()
                .map(pair -> connect(pair.second, pair.first, created))
                .forEach(publisher -> publisher.subscribe(sinkSubscriber(created)));
        return created;
    }

    /**
     * Creates the source for all consumed topics and connects each of its publishers to the
     * consumers registered for the corresponding topic.
     *
     * @return the source, or {@code null} when there is no source factory or no consumer.
     */
    private TopicSource<K, V, T> createTopicSource() {
        if (this.topicSource == null) {
            return null;
        }
        Set<String> topicNames = topics(this.topicConsumers);
        if (topicNames.isEmpty()) {
            return null;
        }
        TopicSource<K, V, T> created = this.topicSource.apply(topicNames);
        created.publishers()
                .forEach((topic, publisher) ->
                        connect(publisher, topic, created).subscribe(connectTopicConsumers(topic)));
        return created;
    }

    /**
     * Registers a new consumer for the topic.
     *
     * @param topic the topic to consume.
     * @return a publisher that will emit the messages of the topic once the streams are started.
     * @throws IllegalArgumentException when no topic source factory was provided.
     */
    public Flow.Publisher<Message<K, V>> from(String topic) {
        requireTopicSource();
        Flow.Processor<Message<K, V>, Message<K, V>> passThrough = PassThrough.passThrough();
        this.topicConsumers.add(Pair.pair(topic, passThrough));
        return passThrough;
    }

    /**
     * Registers a consumer for the topic and makes the transformed publisher the current one for
     * subsequent {@code process}, {@code subscribe} and {@code to} calls.
     *
     * @param topic the topic to consume.
     * @param unaryOperator transforms the topic publisher.
     * @return this builder.
     * @throws IllegalArgumentException when no topic source factory was provided.
     */
    public Streams<K, V, T, U> from(
            String topic, UnaryOperator<Flow.Publisher<Message<K, V>>> unaryOperator) {
        // from(topic) performs the topic-source check before the operator is applied.
        this.fromPublisher = unaryOperator.apply(from(topic));
        return this;
    }

    /**
     * Registers a consumer for the topic, chains the processor behind it and makes the result the
     * current publisher.
     *
     * @param topic the topic to consume.
     * @param processor the processor to put behind the topic publisher.
     * @return this builder.
     * @throws IllegalArgumentException when no topic source factory was provided.
     */
    public Streams<K, V, T, U> from(
            String topic, Flow.Processor<Message<K, V>, Message<K, V>> processor) {
        this.fromPublisher = Chain.with(from(topic)).map(processor).get();
        return this;
    }

    /**
     * Sets the handler that is placed in front of the sink subscriber to observe errors.
     *
     * @param consumer the error handler, or {@code null} for none.
     * @return this builder.
     */
    public Streams<K, V, T, U> onError(Consumer<Throwable> consumer) {
        this.onError = consumer;
        return this;
    }

    /**
     * Replaces the current publisher with the result of applying the operator to it.
     *
     * @param unaryOperator transforms the current publisher.
     * @return this builder.
     * @throws IllegalArgumentException when there is no current publisher.
     */
    public Streams<K, V, T, U> process(UnaryOperator<Flow.Publisher<Message<K, V>>> unaryOperator) {
        this.fromPublisher = unaryOperator.apply(requireFromPublisher());
        return this;
    }

    /**
     * Chains the processor behind the current publisher and makes the result the current one.
     *
     * @param processor the processor to append.
     * @return this builder.
     * @throws IllegalArgumentException when there is no current publisher.
     */
    public Streams<K, V, T, U> process(Flow.Processor<Message<K, V>, Message<K, V>> processor) {
        this.fromPublisher = Chain.with(requireFromPublisher()).map(processor).get();
        return this;
    }

    /**
     * Returns the subscriber of the sink, preceded by an error-observing processor when an error
     * handler was configured with {@link #onError(Consumer)}.
     */
    private Flow.Subscriber<U> sinkSubscriber(TopicSink<K, V, U> target) {
        final Consumer<Throwable> errorHandler = this.onError;
        if (errorHandler == null) {
            return target.subscriber();
        }
        // The decompiler lost the receiver of this method reference ("r0"); it is the handler.
        return Util.subscribe(Util.onErrorProcessor(errorHandler::accept), target.subscriber());
    }

    /**
     * Creates and connects the sink and the source, then starts the source. When a source exists,
     * the sink is stopped with the current grace period as soon as {@code source.start()} returns.
     * NOTE(review): this suggests {@code start()} on the source runs until the source is stopped;
     * confirm against the {@code TopicSource} contract.
     */
    public void start() {
        this.sink = createTopicSink();
        this.source = createTopicSource();
        if (this.source != null) {
            this.source.start();
            stopSink(this.gracePeriod);
        }
    }

    /** Stops the streams with the default grace period of three seconds. */
    public void stop() {
        stop(DEFAULT_GRACE_PERIOD);
    }

    /**
     * Stops the streams. When a source exists only the source is stopped here; the sink is then
     * stopped by {@link #start()} with the recorded grace period. Without a source the sink is
     * stopped directly.
     *
     * @param duration the grace period for the sink. A {@code null} or negative value is replaced
     *     by the default grace period.
     */
    public void stop(Duration duration) {
        this.gracePeriod =
                (duration == null || duration.isNegative()) ? DEFAULT_GRACE_PERIOD : duration;
        if (this.source != null) {
            this.source.stop();
        } else {
            // Use the normalised value so the sink never receives null or a negative duration.
            stopSink(this.gracePeriod);
        }
    }

    /** Stops the sink, if one was created, with the given grace period. */
    private void stopSink(Duration duration) {
        if (this.sink != null) {
            this.sink.stop(duration);
        }
    }

    /**
     * Subscribes the subscriber to the current publisher and clears the current publisher.
     *
     * @param subscriber the terminal subscriber.
     * @return this builder.
     * @throws IllegalArgumentException when there is no current publisher.
     */
    public Streams<K, V, T, U> subscribe(Flow.Subscriber<? super Message<K, V>> subscriber) {
        requireFromPublisher().subscribe(subscriber);
        this.fromPublisher = null;
        return this;
    }

    /**
     * Registers the publisher as a producer for the topic.
     *
     * @param topic the topic to produce to.
     * @param publisher the messages to send.
     * @return this builder.
     * @throws IllegalArgumentException when no topic sink factory was provided.
     */
    public Streams<K, V, T, U> to(String topic, Flow.Publisher<Message<K, V>> publisher) {
        requireTopicSink();
        this.topicProducers.add(Pair.pair(topic, publisher));
        return this;
    }

    /**
     * Registers the current publisher as a producer for the topic and clears the current
     * publisher.
     *
     * @param topic the topic to produce to.
     * @return this builder.
     * @throws IllegalArgumentException when no topic sink factory was provided or when there is
     *     no current publisher.
     */
    public Streams<K, V, T, U> to(String topic) {
        // The sink check comes first so the reported error matches the original ordering.
        requireTopicSink();
        to(topic, requireFromPublisher());
        this.fromPublisher = null;
        return this;
    }

    /** Returns, in registration order, the consumers registered for exactly this topic. */
    private List<Flow.Subscriber<Message<K, V>>> topicConsumers(String topic) {
        return this.topicConsumers.stream()
                .filter(pair -> pair.first.equals(topic))
                .map(pair -> (Flow.Subscriber<Message<K, V>>) pair.second)
                .collect(Collectors.toList());
    }

    /** Returns the current publisher or throws when no {@code from} call preceded. */
    private Flow.Publisher<Message<K, V>> requireFromPublisher() {
        if (this.fromPublisher == null) {
            throw new IllegalArgumentException(FROM_ERROR);
        }
        return this.fromPublisher;
    }

    /** Throws when no topic sink factory was provided. */
    private void requireTopicSink() {
        if (this.topicSink == null) {
            throw new IllegalArgumentException(TOPIC_SINK_ERROR);
        }
    }

    /** Throws when no topic source factory was provided. */
    private void requireTopicSource() {
        if (this.topicSource == null) {
            throw new IllegalArgumentException(TOPIC_SOURCE_ERROR);
        }
    }
}
