package com.purbon.kafka.topology.backend;

import com.purbon.kafka.topology.Configuration;
import com.purbon.kafka.topology.backend.kafka.KafkaBackendConsumer;
import com.purbon.kafka.topology.backend.kafka.KafkaBackendProducer;
import com.purbon.kafka.topology.backend.kafka.RecordReceivedCallback;
import io.vertx.core.eventbus.DeliveryOptions;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:com/purbon/kafka/topology/backend/KafkaBackend.class */
public class KafkaBackend implements Backend, RecordReceivedCallback {
    private static final Logger LOGGER = LogManager.getLogger((Class<?>) KafkaBackend.class);
    private KafkaBackendConsumer consumer;
    private KafkaBackendProducer producer;
    private AtomicReference<BackendState> latest;
    private String instanceId;
    private Thread thread;
    private boolean isCompleted = false;
    private AtomicBoolean shouldWaitForLoad = new AtomicBoolean(true);

    /* loaded from: input_file:com/purbon/kafka/topology/backend/KafkaBackend$JulieKafkaConsumerThread.class */
    private static class JulieKafkaConsumerThread implements Runnable {
        private KafkaBackend callback;
        private KafkaBackendConsumer consumer;

        public JulieKafkaConsumerThread(KafkaBackend kafkaBackend, KafkaBackendConsumer kafkaBackendConsumer) {
            this.callback = kafkaBackend;
            this.consumer = kafkaBackendConsumer;
        }

        @Override // java.lang.Runnable
        public void run() {
            this.consumer.start();
            try {
                this.consumer.retrieve(this.callback);
            } catch (WakeupException e) {
                KafkaBackend.LOGGER.trace(e);
            }
        }
    }

    @Override // com.purbon.kafka.topology.backend.Backend
    public void configure(Configuration configuration) {
        this.instanceId = configuration.getJulieInstanceId();
        this.latest = new AtomicReference<>(new BackendState());
        this.shouldWaitForLoad.set(true);
        this.consumer = new KafkaBackendConsumer(configuration);
        this.consumer.configure();
        if (!this.consumer.listTopics().containsKey(configuration.getJulieKafkaConfigTopic())) {
            throw new IOException("The internal julie kafka configuration topic topic " + configuration.getJulieKafkaConfigTopic() + " should exist in the cluster");
        }
        this.producer = new KafkaBackendProducer(configuration);
        this.producer.configure();
        this.thread = new Thread(new JulieKafkaConsumerThread(this, this.consumer), "kafkaJulieConsumer");
        this.thread.start();
        waitForCompletion();
    }

    public synchronized void waitForCompletion() throws InterruptedException {
        while (!this.isCompleted) {
            wait(DeliveryOptions.DEFAULT_TIMEOUT);
        }
    }

    public synchronized void complete() {
        this.isCompleted = true;
        notify();
    }

    @Override // com.purbon.kafka.topology.backend.Backend
    public void save(BackendState backendState) throws IOException {
        this.producer.save(backendState);
    }

    @Override // com.purbon.kafka.topology.backend.Backend
    public BackendState load() throws IOException {
        do {
        } while (this.shouldWaitForLoad.get());
        return this.latest == null ? new BackendState() : this.latest.get();
    }

    public void initialLoadFinish() {
        this.shouldWaitForLoad.set(false);
    }

    @Override // com.purbon.kafka.topology.backend.Backend
    public void close() {
        this.consumer.stop();
        this.producer.stop();
        try {
            this.thread.join();
        } catch (InterruptedException e) {
            LOGGER.error(e);
        }
        this.latest = null;
        this.thread = null;
    }

    @Override // com.purbon.kafka.topology.backend.kafka.RecordReceivedCallback
    public void apply(ConsumerRecord<String, BackendState> consumerRecord) {
        if (!this.instanceId.equals(consumerRecord.key()) || this.latest == null) {
            return;
        }
        this.latest.set(consumerRecord.value());
    }
}
