package io.confluent.ksql.reactive;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.confluent.ksql.util.VertxUtils;
import io.vertx.core.Context;
import java.util.Objects;
import kotlin.jvm.internal.LongCompanionObject;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/ksql/reactive/BasePublisher.class */
public abstract class BasePublisher<T> implements Publisher<T> {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) BasePublisher.class);
    protected final Context ctx;
    private volatile Subscriber<? super T> subscriber;
    private long demand;
    private boolean cancelled;
    private boolean sentComplete;
    private volatile Throwable failure;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/confluent/ksql/reactive/BasePublisher$Sub.class */
    public class Sub implements Subscription {
        private Sub() {
        }

        @Override // org.reactivestreams.Subscription
        public void request(long j) {
            BasePublisher.this.ctx.runOnContext(r7 -> {
                BasePublisher.this.doRequest(j);
            });
        }

        @Override // org.reactivestreams.Subscription
        public void cancel() {
            BasePublisher.this.ctx.runOnContext(r3 -> {
                BasePublisher.this.doCancel();
            });
        }
    }

    @SuppressFBWarnings(value = {"EI_EXPOSE_REP2"}, justification = "ctx should be mutable")
    public BasePublisher(Context context) {
        this.ctx = (Context) Objects.requireNonNull(context);
    }

    @Override // org.reactivestreams.Publisher
    public void subscribe(Subscriber<? super T> subscriber) {
        if (isFailed()) {
            throw new IllegalStateException("Cannot subscribe to failed publisher. Failure cause: " + this.failure);
        }
        Objects.requireNonNull(subscriber);
        if (VertxUtils.isEventLoopAndSameContext(this.ctx)) {
            doSubscribe(subscriber);
        } else {
            this.ctx.runOnContext(r5 -> {
                doSubscribe(subscriber);
            });
        }
    }

    public void close() {
        this.ctx.runOnContext(r3 -> {
            doClose();
        });
    }

    @SuppressFBWarnings(value = {"EI_EXPOSE_REP"}, justification = "ctx should be mutable")
    public Context getContext() {
        return this.ctx;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void checkContext() {
        VertxUtils.checkContext(this.ctx);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public final void sendError(Throwable th) {
        checkContext();
        try {
            if (this.subscriber != null) {
                this.subscriber.onError(th);
            } else {
                log.error("Failure in publisher", th);
            }
            this.failure = th;
        } catch (Exception e) {
            logError("Exception encountered in onError", e);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void sendComplete() {
        try {
            this.sentComplete = true;
            this.subscriber.onComplete();
        } catch (Exception e) {
            logError("Exception encountered in onComplete", e);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void doOnNext(T t) {
        if (beforeOnNext()) {
            try {
                this.subscriber.onNext(t);
            } catch (Exception e) {
                logError("Exception encountered in onNext", e);
            }
            if (this.demand != LongCompanionObject.MAX_VALUE) {
                this.demand--;
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public long getDemand() {
        return this.demand;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Subscriber<? super T> getSubscriber() {
        return this.subscriber;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public boolean hasSentComplete() {
        return this.sentComplete;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public boolean isCancelled() {
        return this.cancelled;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public boolean isFailed() {
        return this.failure != null;
    }

    protected abstract void maybeSend();

    protected boolean beforeOnNext() {
        return true;
    }

    protected void afterSubscribe() {
    }

    private void doSubscribe(Subscriber<? super T> subscriber) {
        this.subscriber = subscriber;
        try {
            subscriber.onSubscribe(new Sub());
        } catch (Throwable th) {
            sendError(new IllegalStateException("Exception encountered in onSubscribe", th));
        }
        afterSubscribe();
    }

    private void doClose() {
        if (this.subscriber != null) {
            sendComplete();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void doRequest(long j) {
        if (j <= 0) {
            sendError(new IllegalArgumentException("Amount requested must be > 0"));
        } else if (this.demand + j < 1) {
            this.demand = LongCompanionObject.MAX_VALUE;
            maybeSend();
        } else {
            this.demand += j;
            maybeSend();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void doCancel() {
        this.cancelled = true;
        this.subscriber = null;
    }

    private void logError(String str, Exception exc) {
        log.error(str, (Throwable) exc);
        this.failure = exc;
    }
}
