package cern.nxcals.client;

import cern.cmw.datax.ImmutableData;
import cern.nxcals.common.converters.TimeConverterImpl;
import cern.nxcals.common.domain.SystemData;
import cern.nxcals.common.utils.ConfigHolder;
import cern.nxcals.service.client.api.SystemService;
import cern.nxcals.service.client.api.internal.InternalEntityService;
import cern.nxcals.service.client.providers.InternalServiceClientFactory;
import com.google.common.annotations.VisibleForTesting;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigValue;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.avro.Schema;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:BOOT-INF/lib/nxcals-client-0.1.106.jar:cern/nxcals/client/PublisherFactory.class */
public final class PublisherFactory {
    private static final int DEFAULT_MAX_RECORD_SIZE = 10485760;
    private final Supplier<Producer<byte[], byte[]>> kafkaProducerSupplier;
    private InternalEntityService entityService;
    private SystemService systemService;
    private static final Logger log = LoggerFactory.getLogger(PublisherFactory.class);
    private static final Supplier<Producer<byte[], byte[]>> DEFAULT_KAFKA_PRODUCER_SUPPLIER = () -> {
        Map<String, Object> kafkaConfig = getKafkaConfig();
        log.debug("Creating KafkaProducer with config={}", kafkaConfig);
        return new KafkaProducer(kafkaConfig);
    };

    private PublisherFactory() {
        this(InternalServiceClientFactory.createEntityService(), InternalServiceClientFactory.createSystemService(), DEFAULT_KAFKA_PRODUCER_SUPPLIER);
    }

    @VisibleForTesting
    PublisherFactory(InternalEntityService internalEntityService, SystemService systemService, Supplier<Producer<byte[], byte[]>> supplier) {
        this.entityService = (InternalEntityService) Objects.requireNonNull(internalEntityService);
        this.systemService = (SystemService) Objects.requireNonNull(systemService);
        this.kafkaProducerSupplier = (Supplier) Objects.requireNonNull(supplier);
    }

    private static Map<String, Object> getKafkaConfig() {
        Config config = ConfigHolder.getConfig().getConfig("kafka.producer");
        if (config.isEmpty()) {
            throw new IllegalStateException("Cannot find kafka producer properties");
        }
        return (Map) config.entrySet().stream().collect(Collectors.toMap((v0) -> {
            return v0.getKey();
        }, entry -> {
            return ((ConfigValue) entry.getValue()).unwrapped();
        }));
    }

    public <V> Publisher<V> createPublisher(String str, Function<V, ImmutableData> function) {
        Objects.requireNonNull(str, "System name cannot be null");
        Objects.requireNonNull(function, "Converting function cannot be null");
        SystemData findByName = this.systemService.findByName(str);
        if (findByName == null) {
            throw new IllegalArgumentException("Unknown system: " + str);
        }
        DataServiceEncoderImpl createDataToAvroServiceEncoder = createDataToAvroServiceEncoder(findByName);
        return new PublisherImpl(findByName.getId(), function, this.entityService, createDataToAvroServiceEncoder, createInternalDataSink(findByName, createDataToAvroServiceEncoder));
    }

    private DataServiceEncoderImpl createDataToAvroServiceEncoder(SystemData systemData) {
        String recordVersionKeyDefinitions = systemData.getRecordVersionKeyDefinitions();
        Schema schema = null;
        if (recordVersionKeyDefinitions != null) {
            schema = new Schema.Parser().parse(recordVersionKeyDefinitions);
        }
        return new DataServiceEncoderImpl(new Schema.Parser().parse(systemData.getEntityKeyDefinitions()), new Schema.Parser().parse(systemData.getPartitionKeyDefinitions()), new Schema.Parser().parse(systemData.getTimeKeyDefinitions()), schema, new TimeConverterImpl());
    }

    private DataSink<RecordData, DefaultCallback> createInternalDataSink(SystemData systemData, Function<RecordData, byte[]> function) {
        return new KafkaDataSink((String) ConfigHolder.getProperty("publisher.topic_name", systemData.getName()), this.kafkaProducerSupplier.get(), function, ((Integer) ConfigHolder.getProperty("publisher.max_record_size", Integer.valueOf(DEFAULT_MAX_RECORD_SIZE))).intValue());
    }

    public static PublisherFactory newInstance() {
        return new PublisherFactory();
    }
}
