package com.purbon.kafka.topology.backend.kafka;

import com.purbon.kafka.topology.Configuration;
import com.purbon.kafka.topology.backend.BackendState;
import com.purbon.kafka.topology.backend.KafkaBackend;
import java.time.Duration;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.Serdes;

/* loaded from: input_file:com/purbon/kafka/topology/backend/kafka/KafkaBackendConsumer.class */
public class KafkaBackendConsumer {
    private Configuration config;
    private KafkaConsumer<String, BackendState> consumer;
    private AtomicBoolean running = new AtomicBoolean(false);

    public KafkaBackendConsumer(Configuration configuration) {
        this.config = configuration;
    }

    public void configure() {
        Properties asProperties = this.config.asProperties();
        asProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, Serdes.String().deserializer().getClass());
        asProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, new JsonDeserializer(BackendState.class).getClass());
        asProperties.put("group.id", this.config.getKafkaBackendConsumerGroupId());
        this.consumer = new KafkaConsumer<>(asProperties);
        List singletonList = Collections.singletonList(new TopicPartition(this.config.getJulieKafkaConfigTopic(), 0));
        this.consumer.assign(singletonList);
        this.consumer.seekToBeginning(singletonList);
    }

    public void retrieve(KafkaBackend kafkaBackend) {
        int i = 0;
        while (this.running.get()) {
            ConsumerRecords<String, BackendState> poll = this.consumer.poll(Duration.ofSeconds(10L));
            kafkaBackend.complete();
            Iterator<ConsumerRecord<String, BackendState>> it = poll.iterator();
            while (it.hasNext()) {
                kafkaBackend.apply(it.next());
            }
            if (poll.count() > 0 || i >= this.config.getKafkaBackendConsumerRetries().intValue()) {
                kafkaBackend.initialLoadFinish();
            }
            i++;
        }
    }

    public void stop() {
        this.running.set(false);
        this.consumer.wakeup();
    }

    public void start() {
        this.running.set(true);
    }

    public Map<String, List<PartitionInfo>> listTopics() {
        return this.consumer.listTopics();
    }
}
