package org.factcast.store.internal.listen;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.eventbus.EventBus;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Predicate;
import java.util.stream.Stream;
import lombok.Generated;
import lombok.NonNull;
import org.factcast.store.StoreConfigurationProperties;
import org.factcast.store.internal.PgConstants;
import org.factcast.store.internal.PgMetrics;
import org.factcast.store.internal.StoreMetrics;
import org.factcast.store.internal.notification.BlacklistChangeNotification;
import org.factcast.store.internal.notification.FactInsertionNotification;
import org.factcast.store.internal.notification.SchemaStoreChangeNotification;
import org.factcast.store.internal.notification.StoreNotification;
import org.postgresql.PGNotification;
import org.postgresql.jdbc.PgConnection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;

/* loaded from: input_file:org/factcast/store/internal/listen/PgListener.class */
public class PgListener implements InitializingBean, DisposableBean {

    // Class-wide logger.
    @SuppressFBWarnings(justification = "generated code")
    @Generated
    private static final Logger log = LoggerFactory.getLogger(PgListener.class);

    // Provides the unpooled connection on which the receiver loop listens.
    @NonNull
    private final PgConnectionSupplier pgConnectionSupplier;

    // Target of every notification handed to post(...).
    @NonNull
    private final EventBus eventBus;

    // Source of the wait times used by the receiver loop: reconnect pause, blocking wait
    // and maximum roundtrip latency.
    @NonNull
    private final StoreConfigurationProperties props;

    // Records the missed-roundtrip counter and the roundtrip timer of the health check.
    @NonNull
    private final PgMetrics pgMetrics;
    // Thread running the NotificationReceiverLoop; assigned in listen(), interrupted in destroy().
    // Stays null until listen() has been called.
    private Thread listenerThread;
    // True from construction on; destroy() sets it to false, which ends the receiver loops
    // and makes post(...) drop notifications.
    private final AtomicBoolean running = new AtomicBoolean(true);
    // Counted down by connectionSetup(...) once the LISTEN statements have been executed;
    // listen() waits on it for at most 15 seconds.
    private final CountDownLatch countDownLatch = new CountDownLatch(1);

    /**
     * Body of the listener thread.
     *
     * <p>As long as the enclosing listener is running, it obtains an unpooled connection, sets it
     * up via {@link PgListener#connectionSetup(PgConnection)} and then keeps receiving and
     * processing notifications on it. Any exception ends the use of the current connection; if the
     * listener is still running, the exception is logged and a new connection is obtained after
     * a configurable pause.
     */
    @VisibleForTesting
    public class NotificationReceiverLoop implements Runnable {
        protected NotificationReceiverLoop() {
        }

        /**
         * Runs the connect / receive / reconnect cycle until the listener is stopped or the
         * thread is interrupted while pausing before a reconnect.
         */
        @Override
        public void run() {
            while (PgListener.this.running.get()) {
                // try-with-resources closes the connection on every exit path, including the
                // regular one where the inner loop ends because the listener was stopped.
                try (PgConnection connection =
                        PgListener.this.pgConnectionSupplier.getUnpooledConnection("notification-receiver-loop")) {
                    PgListener.this.connectionSetup(connection);
                    while (PgListener.this.running.get()) {
                        PgListener.this.processNotifications(PgListener.this.receiveNotifications(connection));
                    }
                } catch (Exception e) {
                    // Failures during shutdown are expected (destroy() interrupts this thread)
                    // and therefore neither logged nor followed by a pause.
                    if (PgListener.this.running.get()) {
                        PgListener.log.warn("While waiting for Notifications", e);
                        sleep();
                        if (Thread.currentThread().isInterrupted()) {
                            // Interrupted while pausing: stop instead of spinning, because every
                            // further sleep would return immediately with the flag still set.
                            return;
                        }
                    }
                }
            }
        }

        /**
         * Pauses for the configured wait time before a new connection is attempted. If the thread
         * is interrupted meanwhile, the interrupt flag is restored so the caller can react to it.
         */
        private void sleep() {
            try {
                TimeUnit.MILLISECONDS.sleep(PgListener.this.props.getFactNotificationNewConnectionWaitTimeInMillis());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Starts the daemon thread that runs the {@link NotificationReceiverLoop} and then waits up to
     * 15 seconds for the first connection to be set up, logging whether that happened in time.
     *
     * <p>If the calling thread is interrupted while waiting, the wait is abandoned and the
     * interrupt status is restored so that callers further up the stack can still observe it.
     */
    @VisibleForTesting
    protected void listen() {
        log.trace("Starting instance Listener");
        this.listenerThread = new Thread(new NotificationReceiverLoop(), "PG Instance Listener");
        this.listenerThread.setDaemon(true);
        this.listenerThread.setUncaughtExceptionHandler((thread, th) -> {
            log.error("thread " + thread + " encountered an unhandled exception", th);
        });
        this.listenerThread.start();
        try {
            log.info("Waiting to establish postgres listener (max 15sec.)");
            boolean established = this.countDownLatch.await(15L, TimeUnit.SECONDS);
            log.info("postgres listener " + (established ? "" : "not ") + "established");
        } catch (InterruptedException e) {
            // Do not swallow the interruption: re-assert the flag for the caller.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Prepares a freshly opened connection for use by the receiver loop.
     *
     * <p>The order matters: the LISTEN statements are executed first, then the latch that
     * {@link #listen()} waits on is released, and only afterwards are the internal notifications
     * posted.
     *
     * @param pgConnection the connection to set up
     * @throws SQLException if executing the LISTEN statements fails
     */
    private void connectionSetup(final PgConnection pgConnection) throws SQLException {
        // 1. subscribe to the channels
        setupPostgresListeners(pgConnection);

        // 2. signal that the listener is established
        countDownLatch.countDown();

        // 3. post the internal notifications
        informSubscribersAboutFreshConnection();
    }

    /**
     * Executes the LISTEN statements for all channels on the given connection.
     *
     * <p>All seven statements are prepared first and then executed in declaration order.
     * try-with-resources closes them in reverse order on both the success and the failure path.
     *
     * @param pgConnection the connection to register the listeners on
     * @throws SQLException if preparing, executing or closing any of the statements fails
     */
    @VisibleForTesting
    protected void setupPostgresListeners(PgConnection pgConnection) throws SQLException {
        try (PreparedStatement listenInsert =
                        pgConnection.prepareStatement(PgConstants.LISTEN_INSERT_CHANNEL_SQL);
                PreparedStatement listenRoundtrip =
                        pgConnection.prepareStatement(PgConstants.LISTEN_ROUNDTRIP_CHANNEL_SQL);
                PreparedStatement listenBlacklistChange =
                        pgConnection.prepareStatement(PgConstants.LISTEN_BLACKLIST_CHANGE_CHANNEL_SQL);
                PreparedStatement listenSchemaStoreChange =
                        pgConnection.prepareStatement(PgConstants.LISTEN_SCHEMASTORE_CHANGE_CHANNEL_SQL);
                PreparedStatement listenTransformationStoreChange =
                        pgConnection.prepareStatement(PgConstants.LISTEN_TRANSFORMATIONSTORE_CHANGE_CHANNEL_SQL);
                PreparedStatement listenTruncation =
                        pgConnection.prepareStatement(PgConstants.LISTEN_TRUNCATION_CHANNEL_SQL);
                PreparedStatement listenUpdate =
                        pgConnection.prepareStatement(PgConstants.LISTEN_UPDATE_CHANNEL_SQL)) {
            listenInsert.execute();
            listenRoundtrip.execute();
            listenBlacklistChange.execute();
            listenSchemaStoreChange.execute();
            listenTransformationStoreChange.execute();
            listenTruncation.execute();
            listenUpdate.execute();
        }
    }

    /**
     * Posts one internal notification each for fact insertions, blacklist changes and schema
     * store changes, in that order. Called right after a connection has been set up.
     */
    @VisibleForTesting
    protected void informSubscribersAboutFreshConnection() {
        StoreNotification factInsertion = FactInsertionNotification.internal();
        post(factInsertion);

        StoreNotification blacklistChange = BlacklistChangeNotification.internal();
        post(blacklistChange);

        StoreNotification schemaStoreChange = SchemaStoreChangeNotification.internal();
        post(schemaStoreChange);
    }

    /**
     * Converts a batch of raw Postgres notifications into store notifications and posts them.
     *
     * <p>Notifications from all channels except the fact-insert channel are handled first: blacklist
     * changes are reduced to the last one of the batch, each notification is converted with
     * {@link StoreNotification#createFrom}, and non-null results are posted. Afterwards the
     * fact-insert notifications are converted, de-duplicated via {@link #compact(Stream)} and
     * posted.
     *
     * @param pGNotificationArr the received notifications; must not be {@code null} nor contain
     *     {@code null} elements, as {@link List#of(Object[])} rejects both
     */
    @VisibleForTesting
    protected void processNotifications(PGNotification[] pGNotificationArr) {
        List<PGNotification> all = List.of(pGNotificationArr);
        Predicate<PGNotification> isFactInsert =
                notification -> PgConstants.CHANNEL_FACT_INSERT.equals(notification.getName());

        List<PGNotification> otherChannels = all.stream().filter(Predicate.not(isFactInsert)).toList();
        streamWithCompactedBlacklistChanges(otherChannels)
                .map(StoreNotification::createFrom)
                .filter(Objects::nonNull)
                .forEach(this::post);

        compact(all.stream()
                        .filter(isFactInsert)
                        .map(FactInsertionNotification::from)
                        .filter(Objects::nonNull))
                .forEach(this::post);
    }

    /**
     * Drops every fact-insertion notification whose {@code nsAndType()} value has already been
     * seen earlier in the stream, keeping the first occurrence.
     *
     * <p>The returned stream is lazy; the set of seen values is filled while it is consumed. The
     * filter is stateful and backed by a plain {@link HashSet}, so the stream is meant to be
     * consumed sequentially.
     *
     * @param stream the notifications to de-duplicate
     * @return a stream containing only the first notification per {@code nsAndType()} value
     */
    @VisibleForTesting
    Stream<FactInsertionNotification> compact(Stream<FactInsertionNotification> stream) {
        // Keyed by whatever nsAndType() returns; Object keeps this independent of that type.
        HashSet<Object> seen = new HashSet<>();
        return stream.filter(notification -> seen.add(notification.nsAndType()));
    }

    /**
     * Streams the given notifications, keeping at most one blacklist-change notification.
     *
     * <p>If the list contains notifications from the blacklist-change channel, only the last of
     * them (compared by identity) is retained; notifications from all other channels pass through
     * untouched and in their original order.
     *
     * @param list the notifications to stream
     * @return a stream over {@code list} without the superseded blacklist-change notifications
     */
    @VisibleForTesting
    Stream<PGNotification> streamWithCompactedBlacklistChanges(List<PGNotification> list) {
        // Find the last blacklist-change notification of the batch, if any.
        PGNotification lastBlacklistChange = null;
        for (PGNotification candidate : list) {
            if (PgConstants.CHANNEL_BLACKLIST_CHANGE.equals(candidate.getName())) {
                lastBlacklistChange = candidate;
            }
        }

        if (lastBlacklistChange == null) {
            // Nothing to compact.
            return list.stream();
        }

        final PGNotification survivor = lastBlacklistChange;
        return list.stream().filter(notification ->
                !(PgConstants.CHANNEL_BLACKLIST_CHANGE.equals(notification.getName()) && notification != survivor));
    }

    /**
     * Publishes a notification on the event bus, unless the listener has been stopped, in which
     * case the notification is silently dropped.
     *
     * @param storeNotification the notification to publish; must not be {@code null}
     * @throws NullPointerException if {@code storeNotification} is {@code null}
     */
    @VisibleForTesting
    void post(@NonNull StoreNotification storeNotification) {
        Objects.requireNonNull(storeNotification, "n is marked non-null but is null");
        if (!running.get()) {
            return;
        }
        log.trace("posting to eventBus: {}", storeNotification);
        eventBus.post(storeNotification);
    }

    /**
     * Fetches notifications from the connection, passing the configured blocking wait time to
     * {@code getNotifications}. If nothing was received, the result of
     * {@link #checkDatabaseConnectionHealthy(PgConnection)} is returned instead.
     *
     * @param pgConnection the connection to read notifications from
     * @return the received notifications, or those returned by the health check
     * @throws SQLException if reading notifications or the health check fails
     */
    @VisibleForTesting
    protected PGNotification[] receiveNotifications(PgConnection pgConnection) throws SQLException {
        PGNotification[] received =
                pgConnection.getNotifications(props.getFactNotificationBlockingWaitTimeInMillis());
        boolean nothingReceived = received == null || received.length == 0;
        return nothingReceived ? checkDatabaseConnectionHealthy(pgConnection) : received;
    }

    /**
     * Verifies that the connection still delivers notifications by executing the roundtrip
     * NOTIFY statement and waiting for a notification to come back.
     *
     * <p>On success the elapsed time is recorded in the roundtrip timer. If nothing arrives
     * within the configured maximum roundtrip latency, the missed-roundtrip counter is incremented
     * and an exception is thrown.
     *
     * @param pgConnection the connection to check
     * @return the notifications received in response; never {@code null} or empty
     * @throws SQLException if the statement fails or no notification was received
     */
    @VisibleForTesting
    protected PGNotification[] checkDatabaseConnectionHealthy(PgConnection pgConnection) throws SQLException {
        long startNanos = System.nanoTime();
        // Close the statement right after use; this method runs repeatedly on a long-lived
        // connection, so an unclosed statement per call would accumulate.
        try (PreparedStatement roundtrip = pgConnection.prepareCall(PgConstants.NOTIFY_ROUNDTRIP_SQL)) {
            roundtrip.execute();
        }
        PGNotification[] notifications =
                pgConnection.getNotifications(this.props.getFactNotificationMaxRoundTripLatencyInMillis());
        if (notifications == null || notifications.length == 0) {
            this.pgMetrics.counter(StoreMetrics.EVENT.MISSED_ROUNDTRIP).increment();
            throw new SQLException("Missed roundtrip notification from channel '" + PgConstants.CHANNEL_ROUNDTRIP + "'");
        }
        this.pgMetrics.timer(StoreMetrics.OP.NOTIFY_ROUNDTRIP).record(System.nanoTime() - startNanos, TimeUnit.NANOSECONDS);
        return notifications;
    }

    /**
     * {@link InitializingBean} callback: starts the listener via {@link #listen()}.
     */
    @Override
    public void afterPropertiesSet() {
        listen();
    }

    /**
     * {@link DisposableBean} callback: marks the listener as stopped and interrupts the listener
     * thread, if one has been started.
     *
     * <p>The running flag is cleared before the interrupt so that the receiver loop sees the
     * stopped state when it handles the resulting exception.
     */
    @Override
    public void destroy() {
        this.running.set(false);
        Thread thread = this.listenerThread;
        if (thread != null) {
            thread.interrupt();
        }
    }

    /**
     * Creates a listener; nothing is started until {@link #afterPropertiesSet()} or
     * {@link #listen()} is invoked.
     *
     * @param pgConnectionSupplier supplies the connection used by the receiver loop
     * @param eventBus bus that notifications are posted to
     * @param storeConfigurationProperties configuration providing the wait times
     * @param pgMetrics metrics used by the connection health check
     * @throws NullPointerException if any argument is {@code null}
     */
    @SuppressFBWarnings(justification = "generated code")
    @Generated
    public PgListener(@NonNull PgConnectionSupplier pgConnectionSupplier, @NonNull EventBus eventBus, @NonNull StoreConfigurationProperties storeConfigurationProperties, @NonNull PgMetrics pgMetrics) {
        this.pgConnectionSupplier =
                Objects.requireNonNull(pgConnectionSupplier, "pgConnectionSupplier is marked non-null but is null");
        this.eventBus = Objects.requireNonNull(eventBus, "eventBus is marked non-null but is null");
        this.props = Objects.requireNonNull(storeConfigurationProperties, "props is marked non-null but is null");
        this.pgMetrics = Objects.requireNonNull(pgMetrics, "pgMetrics is marked non-null but is null");
    }
}
