package io.confluent.ksql.metrics;

import com.google.common.collect.ImmutableMap;
import io.confluent.common.utils.Time;
import io.confluent.ksql.metrics.TopicSensors;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.StreamSupport;
import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.MeasurableStat;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.CumulativeSum;
import org.apache.kafka.common.metrics.stats.Rate;

/* loaded from: input_file:io/confluent/ksql/metrics/ConsumerCollector.class */
/**
 * Kafka consumer interceptor that records per-topic consumption metrics.
 *
 * <p>Every record passed to {@link #onConsume(ConsumerRecords)} is counted against the
 * (lower-cased) topic it came from. For each topic three metrics are registered lazily on first
 * use: {@link #CONSUMER_MESSAGES_PER_SEC}, {@link #CONSUMER_TOTAL_MESSAGES} and
 * {@link #CONSUMER_TOTAL_BYTES}.
 *
 * <p>The per-topic sensor map is a plain {@link HashMap}; this class adds no synchronization of
 * its own around it.
 */
public class ConsumerCollector implements MetricCollector, ConsumerInterceptor<Object, Object> {
    public static final String CONSUMER_MESSAGES_PER_SEC = "consumer-messages-per-sec";
    public static final String CONSUMER_TOTAL_MESSAGES = "consumer-total-messages";
    public static final String CONSUMER_TOTAL_BYTES = "consumer-total-bytes";

    /** Sensors per topic, keyed by the value returned from {@link #getCounterKey(String)}. */
    private final Map<String, TopicSensors<ConsumerRecord<Object, Object>>> topicSensors = new HashMap<>();
    private Metrics metrics;
    private String id;
    private String groupId;
    private Time time;

    /**
     * Configures the collector from the consumer configuration.
     *
     * <p>The collector id is taken from {@code group.id}; when that is absent, {@code client.id}
     * is used instead. The group id is only remembered when {@code group.id} is present.
     *
     * @param map the consumer configuration
     * @throws NullPointerException if neither {@code group.id} nor {@code client.id} is set
     */
    @Override // org.apache.kafka.common.Configurable
    public void configure(Map<String, ?> map) {
        String collectorId = (String) map.get("group.id");
        if (collectorId != null) {
            this.groupId = collectorId;
        } else {
            collectorId = (String) map.get("client.id");
        }
        if (collectorId == null) {
            // Same exception type as before, but with a message that names the actual problem.
            throw new NullPointerException(
                    "Cannot configure ConsumerCollector: neither 'group.id' nor 'client.id' is set");
        }
        configure(MetricCollectors.getMetrics(), MetricCollectors.addCollector(collectorId, this), MetricCollectors.getTime());
    }

    /**
     * Sets the collaborators directly.
     *
     * @param metrics the metrics registry sensors are created in
     * @param str the id of this collector; becomes part of every sensor name and metric tag
     * @param time the time source handed to each sensor metric
     * @return this collector
     */
    ConsumerCollector configure(Metrics metrics, String str, Time time) {
        this.id = str;
        this.metrics = metrics;
        this.time = time;
        return this;
    }

    /** No-op: commits are not tracked by this collector. */
    @Override // org.apache.kafka.clients.consumer.ConsumerInterceptor
    public void onCommit(Map<TopicPartition, OffsetAndMetadata> map) {
    }

    /**
     * @return the {@code group.id} seen by {@link #configure(Map)}, or {@code null} if none was set
     */
    @Override // io.confluent.ksql.metrics.MetricCollector
    public String getGroupId() {
        return this.groupId;
    }

    /**
     * Records metrics for every record in the batch and returns the batch unmodified.
     *
     * @param consumerRecords the records about to be handed to the application
     * @return the same {@code consumerRecords} instance
     */
    @Override // org.apache.kafka.clients.consumer.ConsumerInterceptor
    public ConsumerRecords<Object, Object> onConsume(ConsumerRecords<Object, Object> consumerRecords) {
        collect(consumerRecords);
        return consumerRecords;
    }

    /** Records each record of the batch against its lower-cased topic name. */
    private void collect(ConsumerRecords<Object, Object> consumerRecords) {
        for (ConsumerRecord<Object, Object> consumed : consumerRecords) {
            record(consumed.topic().toLowerCase(), false, consumed);
        }
    }

    /**
     * Updates the sensors of {@code topic}, creating them on first use.
     *
     * @param topic the topic name the sensors belong to
     * @param sensorFlag passed through unchanged to {@code TopicSensors.increment}
     *     (NOTE(review): presumably marks error records - confirm against TopicSensors)
     * @param consumed the record being counted
     */
    private void record(String topic, boolean sensorFlag, ConsumerRecord<Object, Object> consumed) {
        this.topicSensors
                .computeIfAbsent(getCounterKey(topic), key -> new TopicSensors<>(topic, buildSensors(key)))
                .increment(consumed, sensorFlag);
    }

    /** Returns the map key used for {@code str}; currently the topic name itself. */
    private String getCounterKey(String str) {
        return str;
    }

    /**
     * Creates the three per-topic sensor metrics (message rate, message count, byte count).
     *
     * @param key the counter key the sensors are registered under
     * @return the newly built sensor metrics, in registration order
     */
    private List<TopicSensors.SensorMetric<ConsumerRecord<Object, Object>>> buildSensors(String key) {
        List<TopicSensors.SensorMetric<ConsumerRecord<Object, Object>>> sensors = new ArrayList<>();
        // Registration is done while holding the monitor of the metrics registry instance.
        synchronized (this.metrics) {
            addSensor(key, CONSUMER_MESSAGES_PER_SEC, new Rate(), sensors, false);
            addSensor(key, CONSUMER_TOTAL_MESSAGES, new CumulativeSum(), sensors, false);
            addSensor(key, CONSUMER_TOTAL_BYTES, new CumulativeSum(), sensors, false, ConsumerCollector::serializedBytes);
        }
        return sensors;
    }

    /**
     * Returns the number of serialized bytes (key plus value) of a record.
     *
     * <p>Kafka reports a size of -1 for a null key or null value; such sizes are counted as zero
     * so that null keys and tombstones do not reduce the byte total.
     *
     * @param consumed the record to measure, may be {@code null}
     * @return the byte count, or {@code 0.0} for a {@code null} record
     */
    private static double serializedBytes(ConsumerRecord<Object, Object> consumed) {
        if (consumed == null) {
            return 0.0d;
        }
        return (double) Math.max(0, consumed.serializedValueSize())
                + (double) Math.max(0, consumed.serializedKeySize());
    }

    /** Registers a sensor metric that records {@code 1.0} for every record (a plain counter). */
    private void addSensor(String key, String metricNameString, MeasurableStat stat, List<TopicSensors.SensorMetric<ConsumerRecord<Object, Object>>> sensors, boolean sensorFlag) {
        addSensor(key, metricNameString, stat, sensors, sensorFlag, consumed -> 1.0d);
    }

    /**
     * Registers one sensor metric and appends it to {@code sensors}.
     *
     * @param key the counter key; part of the sensor name and the {@code key} metric tag
     * @param metricNameString the metric name, e.g. {@link #CONSUMER_TOTAL_BYTES}
     * @param stat the stat attached to the sensor when the metric is not registered yet
     * @param sensors the list the new sensor metric is appended to
     * @param sensorFlag passed through unchanged to the {@code SensorMetric} constructor
     *     (NOTE(review): presumably marks error metrics - confirm against TopicSensors)
     * @param valueExtractor computes the value recorded on the sensor for each record
     */
    private void addSensor(String key, String metricNameString, MeasurableStat stat, List<TopicSensors.SensorMetric<ConsumerRecord<Object, Object>>> sensors, boolean sensorFlag, final Function<ConsumerRecord<Object, Object>, Double> valueExtractor) {
        String sensorName = "cons-" + key + "-" + metricNameString + "-" + this.id;
        MetricName metricName = new MetricName(metricNameString, "consumer-metrics", "consumer-" + sensorName, ImmutableMap.of("key", key, "id", this.id));
        // Look the sensor up BEFORE calling sensor(), which creates it when missing; the lookup
        // result tells us whether the sensor already existed.
        Sensor existingSensor = this.metrics.getSensor(sensorName);
        final Sensor sensor = this.metrics.sensor(sensorName);
        if (existingSensor == null || this.metrics.metrics().get(metricName) == null) {
            sensor.add(metricName, stat);
        }
        sensors.add(new TopicSensors.SensorMetric<ConsumerRecord<Object, Object>>(sensor, this.metrics.metrics().get(metricName), this.time, sensorFlag) {
            // Feeds the extracted value to the Kafka sensor, then lets the base class do its own
            // bookkeeping for the same record.
            @Override // io.confluent.ksql.metrics.TopicSensors.SensorMetric
            public void record(ConsumerRecord<Object, Object> consumed) {
                sensor.record(valueExtractor.apply(consumed));
                super.record(consumed);
            }
        });
    }

    /** Unregisters this collector and closes the sensors of every topic seen so far. */
    @Override // org.apache.kafka.clients.consumer.ConsumerInterceptor, java.lang.AutoCloseable
    public void close() {
        MetricCollectors.remove(this.id);
        this.topicSensors.values().forEach(sensorsOfTopic -> sensorsOfTopic.close(this.metrics));
    }

    /**
     * Returns the stats computed by {@code MetricUtils.stats} over all per-topic sensors.
     *
     * @param str forwarded to {@code MetricUtils.stats} (NOTE(review): presumably a topic name - confirm)
     * @param z forwarded to {@code MetricUtils.stats} (NOTE(review): presumably an error flag - confirm)
     */
    @Override // io.confluent.ksql.metrics.MetricCollector
    public Collection<TopicSensors.Stat> stats(String str, boolean z) {
        return MetricUtils.stats(str, z, this.topicSensors.values());
    }

    /**
     * Returns the value computed by {@code MetricUtils.aggregateStat} over all per-topic sensors.
     *
     * @param str forwarded to {@code MetricUtils.aggregateStat} (NOTE(review): presumably a stat name - confirm)
     * @param z forwarded to {@code MetricUtils.aggregateStat} (NOTE(review): meaning defined there - confirm)
     */
    @Override // io.confluent.ksql.metrics.MetricCollector
    public double aggregateStat(String str, boolean z) {
        return MetricUtils.aggregateStat(str, z, this.topicSensors.values());
    }

    /** Returns the simple class name, the collector id and the set of tracked counter keys. */
    @Override
    public String toString() {
        return getClass().getSimpleName() + " id:" + this.id + " " + this.topicSensors.keySet();
    }
}
