package io.helidon.messaging.connectors.mock;

import io.helidon.common.reactive.Single;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;

/* loaded from: input_file:io/helidon/messaging/connectors/mock/MockOutgoing.class */
/**
 * Test-side handle over a {@link MockSubscriber}: lets a test request items from the
 * subscriber's upstream, block until data or completion arrives, and assert on the
 * messages collected so far.
 *
 * @param <P> payload type the test expects the collected messages to carry
 */
public class MockOutgoing<P> {
    private final MockSubscriber mockSubscriber;

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates a handle bound to the given subscriber.
     *
     * @param mockSubscriber subscriber whose upstream, data and callbacks this handle uses
     */
    public MockOutgoing(MockSubscriber mockSubscriber) {
        this.mockSubscriber = mockSubscriber;
    }

    /**
     * Requests {@code j} items from the subscriber's upstream.
     *
     * @param j number of items to request, must be positive
     * @return this instance, for chaining
     * @throws IllegalArgumentException if {@code j} is zero or negative
     * @throws IllegalStateException if the subscriber has no upstream yet
     */
    public MockOutgoing<P> request(long j) {
        if (j <= 0) {
            throw new IllegalArgumentException("Rule §3.9 violated: non-positive requests are forbidden!");
        }
        if (this.mockSubscriber.upstream() == null) {
            throw new IllegalStateException("Not subscribed yet.");
        }
        this.mockSubscriber.upstream().request(j);
        return this;
    }

    /**
     * Requests {@link Long#MAX_VALUE} items from upstream.
     *
     * @return this instance, for chaining
     * @throws IllegalStateException if the subscriber has no upstream yet
     */
    public MockOutgoing<P> requestMax() {
        return request(Long.MAX_VALUE);
    }

    /**
     * Exposes the subscriber's completion signal.
     *
     * @return the {@link Single} returned by {@code MockSubscriber.completed()}
     */
    public Single<Void> whenComplete() {
        return this.mockSubscriber.completed();
    }

    /**
     * Blocks on {@link #whenComplete()} for at most {@code duration}.
     *
     * @param duration maximum time to wait
     * @return this instance, for chaining
     */
    public MockOutgoing<P> awaitComplete(Duration duration) {
        whenComplete().await(duration);
        return this;
    }

    /**
     * Blocks until the subscriber reports a count of at least {@code i}, or {@code duration}
     * elapses. Does not request anything from upstream by itself.
     *
     * @param duration maximum time to wait
     * @param i minimal count to wait for
     * @return this instance, for chaining
     */
    public MockOutgoing<P> awaitCount(Duration duration, int i) {
        CompletableFuture<Void> reached = new CompletableFuture<>();
        Consumer<Integer> counter = count -> {
            if (count >= i) {
                reached.complete(null);
            }
        };
        // NOTE(review): the counter is never removed from counters(); presumably harmless
        // once the future is complete - confirm against MockSubscriber.
        this.mockSubscriber.counters().add(counter);
        // Evaluate the current size right away so items that arrived before
        // registration are taken into account.
        counter.accept(this.mockSubscriber.data().size());
        Single.create(reached, true).await(duration);
        return this;
    }

    /**
     * Requests items one at a time until {@code function} accepts a message, or
     * {@code duration} elapses. Messages already collected are checked as well.
     *
     * @param duration maximum time to wait
     * @param function predicate applied to each message; {@code true} ends the wait
     * @return this instance, for chaining
     * @throws IllegalStateException if the subscriber has no upstream yet
     */
    public MockOutgoing<P> awaitMessage(Duration duration, Function<Message<P>, Boolean> function) {
        request(1L);
        CompletableFuture<Void> matched = new CompletableFuture<>();
        Consumer<Message<?>> checker = message -> {
            if (function.apply(typed(message))) {
                matched.complete(null);
            } else {
                // No match: ask upstream for one more item.
                request(1L);
            }
        };
        // NOTE(review): the checker is never removed from checkers(); presumably it keeps
        // requesting on later non-matching messages - confirm against MockSubscriber.
        this.mockSubscriber.checkers().add(checker);
        // Replay what has been collected so far through the same checker.
        this.mockSubscriber.data().forEach(checker);
        Single.create(matched, true).await(duration);
        return this;
    }

    /**
     * Requests as many items as there are expected values, waits for them, and asserts that
     * the collected messages, mapped by {@code function}, equal {@code pArr} in order.
     *
     * <p>If the wait fails with a {@link CompletionException} caused by a
     * {@link TimeoutException}, the same assertion is made with a "not delivered in time"
     * reason; any other {@link CompletionException} is rethrown.
     *
     * @param duration maximum time to wait for the items
     * @param function maps each collected message to the value being compared
     * @param pArr expected values, in order; may be empty, in which case nothing is requested
     * @return this instance, for chaining
     * @throws IllegalStateException if items are expected and the subscriber has no upstream yet
     */
    public MockOutgoing<P> awaitData(Duration duration, Function<Message<P>, P> function, P... pArr) {
        // request(0) would be rejected as a non-positive request, so with no expected
        // items there is simply nothing to ask for.
        if (pArr.length > 0) {
            request(pArr.length);
        }
        try {
            awaitCount(duration, pArr.length);
        } catch (CompletionException e) {
            if (!(e.getCause() instanceof TimeoutException)) {
                throw e;
            }
            MatcherAssert.assertThat("Not all " + pArr.length + " items delivered in time.",
                    mappedData(function), Matchers.contains(pArr));
        }
        MatcherAssert.assertThat(mappedData(function), Matchers.contains(pArr));
        return this;
    }

    /**
     * Same as {@link #awaitData(Duration, Function, Object[])} comparing message payloads.
     *
     * @param duration maximum time to wait for the items
     * @param pArr expected payloads, in order
     * @return this instance, for chaining
     */
    @SafeVarargs
    public final MockOutgoing<P> awaitPayloads(Duration duration, P... pArr) {
        return awaitData(duration, Message::getPayload, pArr);
    }

    /**
     * Asserts that the list of collected messages, mapped by {@code function}, satisfies
     * {@code matcher}. Does not request or wait.
     *
     * @param function maps each collected message to the value being matched
     * @param matcher matcher applied to the whole mapped list
     * @param <T> type the matcher accepts; the mapped list is passed to it unchecked
     * @return this instance, for chaining
     */
    public <T> MockOutgoing<P> assertData(Function<Message<P>, P> function, Matcher<? super T> matcher) {
        // The matcher's subject type cannot be verified here; the caller picks T.
        @SuppressWarnings("unchecked")
        T actual = (T) mappedData(function);
        MatcherAssert.assertThat(actual, matcher);
        return this;
    }

    /**
     * Asserts that the list of collected payloads satisfies {@code matcher}.
     *
     * @param matcher matcher applied to the whole payload list
     * @param <T> type the matcher accepts
     * @return this instance, for chaining
     */
    public <T> MockOutgoing<P> assertPayloads(Matcher<? super T> matcher) {
        return assertData(Message::getPayload, matcher);
    }

    /**
     * Asserts that the collected payloads equal {@code pArr}, in order.
     *
     * @param pArr expected payloads
     * @return this instance, for chaining
     */
    @SafeVarargs
    public final MockOutgoing<P> assertPayloads(P... pArr) {
        return assertPayloads(Matchers.contains(pArr));
    }

    /**
     * Asserts that the collected messages, mapped by {@code function}, equal {@code pArr}
     * in order.
     *
     * @param function maps each collected message to the value being compared
     * @param pArr expected values
     * @return this instance, for chaining
     */
    @SafeVarargs
    public final MockOutgoing<P> assertData(Function<Message<P>, P> function, P... pArr) {
        // Use the caller's mapper; previously it was ignored in favour of getPayload.
        return assertData(function, Matchers.contains(pArr));
    }

    /**
     * Returns the messages collected so far, viewed as {@code Message<P>}.
     *
     * @return a new list of the collected messages
     */
    public List<Message<P>> data() {
        return this.mockSubscriber.data().stream()
                .map(this::typed)
                .toList();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Returns the wrapped subscriber.
     *
     * @return the subscriber passed to the constructor
     */
    public MockSubscriber subscriber() {
        return this.mockSubscriber;
    }

    /** Maps every collected message through {@code function} into a new list. */
    private List<P> mappedData(Function<Message<P>, P> function) {
        return this.mockSubscriber.data().stream()
                .map(message -> function.apply(typed(message)))
                .toList();
    }

    /** Views a wildcard message as {@code Message<P>}; the payload type is not checked. */
    @SuppressWarnings("unchecked")
    private Message<P> typed(Message<?> message) {
        return (Message<P>) message;
    }
}
