package cern.nxcals.api.ingestion;

import cern.cmw.datax.ImmutableData;
import cern.nxcals.api.domain.SystemSpec;
import cern.nxcals.api.extraction.metadata.InternalServiceClientFactory;
import cern.nxcals.api.extraction.metadata.SystemSpecService;
import cern.nxcals.api.ingestion.BufferedPublisher;
import cern.nxcals.common.converters.TimeConverterImpl;
import cern.nxcals.common.utils.ConfigHolder;
import cern.nxcals.internal.extraction.metadata.InternalEntityService;
import com.google.common.annotations.VisibleForTesting;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigValue;
import java.io.Closeable;
import java.io.IOException;
import java.time.Clock;
import java.time.Duration;
import java.util.LinkedList;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import lombok.NonNull;
import org.apache.avro.Schema;
import org.apache.commons.configuration.tree.DefaultExpressionEngine;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:BOOT-INF/lib/nxcals-ingestion-api-0.4.45.jar:cern/nxcals/api/ingestion/PublisherFactory.class */
public final class PublisherFactory {
    /** Logger used by the factory, including the default producer supplier below. */
    private static final Logger log = LoggerFactory.getLogger(PublisherFactory.class);

    /** Fallback for the "publisher.max_record_size" property when it is not configured (10 * 1024 * 1024). */
    private static final int DEFAULT_MAX_RECORD_SIZE = 10 * 1024 * 1024;

    /**
     * Supplier used by the no-argument constructor: every call reads the "kafka.producer"
     * configuration and builds a new {@link KafkaProducer} from it.
     */
    private static final Supplier<Producer<byte[], byte[]>> DEFAULT_KAFKA_PRODUCER_SUPPLIER = () -> {
        Map<String, Object> producerSettings = getKafkaConfig();
        log.debug("Creating KafkaProducer with config={}", producerSettings);
        return new KafkaProducer<>(producerSettings);
    };

    /** Source of the Kafka producer handed to each data sink this factory creates. */
    private final Supplier<Producer<byte[], byte[]>> kafkaProducerSupplier;

    /** Entity service passed on to every publisher helper. */
    private InternalEntityService entityService;

    /** Service used to resolve a system name to its {@link SystemSpec}. */
    private SystemSpecService systemSpecService;

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Collects executors and closeables so they can be released together.
     *
     * <p>{@link #shutdown()} drains the registered executors and {@link #close()} drains the
     * registered closeables; each resource is removed from its list before it is released, so a
     * second call finds nothing left to do. No synchronization is performed by this class.
     */
    @VisibleForTesting
    public static class Closer {
        private static final Logger log = LoggerFactory.getLogger(Closer.class);

        /** Executors waiting to be shut down, in registration order. */
        private final LinkedList<ExecutorService> executors = new LinkedList<>();

        /** Closeables waiting to be closed, in registration order. */
        private final LinkedList<Closeable> closeables = new LinkedList<>();

        Closer() {
        }

        /**
         * Remembers an executor for a later {@link #shutdown()}.
         *
         * @param executorService executor to track; {@code null} is silently ignored
         */
        void register(ExecutorService executorService) {
            if (executorService == null) {
                return;
            }
            this.executors.add(executorService);
        }

        /**
         * Remembers a closeable for a later {@link #close()}.
         *
         * @param closeable resource to track; {@code null} is silently ignored
         */
        void register(Closeable closeable) {
            if (closeable == null) {
                return;
            }
            this.closeables.add(closeable);
        }

        /** Calls {@link ExecutorService#shutdown()} on every registered executor, oldest first. */
        void shutdown() {
            // register() never stores null, so a null poll result can only mean "list is empty".
            for (ExecutorService pending = this.executors.pollFirst(); pending != null; pending = this.executors.pollFirst()) {
                log.debug("Shutting down {}", pending);
                pending.shutdown();
            }
        }

        /**
         * Closes every registered closeable, oldest first. An {@link IOException} from one
         * resource is logged and does not prevent the remaining ones from being closed.
         */
        void close() {
            // register() never stores null, so a null poll result can only mean "list is empty".
            for (Closeable pending = this.closeables.pollFirst(); pending != null; pending = this.closeables.pollFirst()) {
                try {
                    log.debug("Closing {}", pending);
                    pending.close();
                } catch (IOException e) {
                    log.error("Error while closing {}", pending, e);
                }
            }
        }
    }

    /**
     * Publisher decorator that forwards publishing to a delegate and, on {@link #close()},
     * also releases the resources tracked by a {@link Closer}.
     *
     * @param <V> type of the values accepted by the publisher
     */
    @VisibleForTesting
    static class WrappedPublisher<V> implements Publisher<V> {

        @NonNull
        private final Publisher<V> delegate;

        @NonNull
        private final Closer closer;

        /**
         * @param publisher publisher that performs the actual publishing
         * @param closer    holder of the resources to release when this publisher is closed
         * @throws NullPointerException if either argument is {@code null}
         */
        public WrappedPublisher(@NonNull Publisher<V> publisher, @NonNull Closer closer) {
            this.delegate = Objects.requireNonNull(publisher, "delegate is marked @NonNull but is null");
            this.closer = Objects.requireNonNull(closer, "closer is marked @NonNull but is null");
        }

        /** Forwards the call unchanged to the delegate. */
        @Override // cern.nxcals.api.ingestion.Publisher
        public CompletableFuture<Result> publishAsync(V v, Executor executor) {
            return this.delegate.publishAsync(v, executor);
        }

        /**
         * Closes the delegate, then shuts down the registered executors and closes the
         * registered closeables.
         *
         * <p>The cleanup steps are chained with {@code finally} so that a failure while closing
         * the delegate (or while shutting down executors) no longer leaves the remaining
         * resources open.
         *
         * @throws IOException if closing the delegate fails
         */
        @Override // java.io.Closeable, java.lang.AutoCloseable
        public void close() throws IOException {
            try {
                this.delegate.close();
            } finally {
                try {
                    this.closer.shutdown();
                } finally {
                    this.closer.close();
                }
            }
        }

        @NonNull
        public Publisher<V> getDelegate() {
            return this.delegate;
        }

        @NonNull
        public Closer getCloser() {
            return this.closer;
        }

        /** Two wrappers are equal when both their delegates and their closers are equal. */
        @Override
        public boolean equals(Object obj) {
            if (obj == this) {
                return true;
            }
            if (!(obj instanceof WrappedPublisher)) {
                return false;
            }
            WrappedPublisher<?> other = (WrappedPublisher<?>) obj;
            // canEqual lets a subclass veto equality with instances of this base class.
            if (!other.canEqual(this)) {
                return false;
            }
            return Objects.equals(getDelegate(), other.getDelegate())
                    && Objects.equals(getCloser(), other.getCloser());
        }

        /** Hook used by {@link #equals(Object)}; true for any {@code WrappedPublisher}. */
        protected boolean canEqual(Object obj) {
            return obj instanceof WrappedPublisher;
        }

        @Override
        public int hashCode() {
            Publisher<V> currentDelegate = getDelegate();
            Closer currentCloser = getCloser();
            int result = 59 + (currentDelegate == null ? 43 : currentDelegate.hashCode());
            return (result * 59) + (currentCloser == null ? 43 : currentCloser.hashCode());
        }

        @Override
        public String toString() {
            // Plain ")" replaces an unrelated library constant with the same value.
            return "PublisherFactory.WrappedPublisher(delegate=" + getDelegate() + ", closer=" + getCloser() + ")";
        }
    }

    /**
     * Builds a factory from the services returned by {@link InternalServiceClientFactory} and
     * the default Kafka producer supplier.
     */
    private PublisherFactory() {
        this(
                InternalServiceClientFactory.createEntityService(),
                InternalServiceClientFactory.createSystemSpecService(),
                DEFAULT_KAFKA_PRODUCER_SUPPLIER);
    }

    /**
     * Builds a factory from explicitly supplied collaborators.
     *
     * @param internalEntityService entity service passed on to every publisher helper
     * @param systemSpecService     service used to look systems up by name
     * @param supplier              source of the Kafka producer used by each data sink
     * @throws NullPointerException if any argument is {@code null}
     */
    @VisibleForTesting
    PublisherFactory(InternalEntityService internalEntityService, SystemSpecService systemSpecService, Supplier<Producer<byte[], byte[]>> supplier) {
        this.entityService = Objects.requireNonNull(internalEntityService);
        this.systemSpecService = Objects.requireNonNull(systemSpecService);
        this.kafkaProducerSupplier = Objects.requireNonNull(supplier);
    }

    /**
     * Reads the "kafka.producer" section of the configuration and flattens it into a map of
     * entry key to unwrapped value.
     *
     * @return the producer settings keyed by configuration path
     * @throws IllegalStateException if the "kafka.producer" section is empty
     */
    private static Map<String, Object> getKafkaConfig() {
        Config producerSection = ConfigHolder.getConfig().getConfig("kafka.producer");
        if (!producerSection.isEmpty()) {
            return producerSection.entrySet().stream()
                    .collect(Collectors.toMap(Map.Entry::getKey, setting -> setting.getValue().unwrapped()));
        }
        throw new IllegalStateException("Cannot find kafka producer properties");
    }

    /**
     * Creates an unbuffered publisher for the given system.
     *
     * <p>The returned publisher also closes the helper's data sink when it is closed.
     *
     * @param str      name of the system to publish to
     * @param function converts a value of type {@code V} into {@link ImmutableData}
     * @param <V>      type of the published values
     * @return a publisher wrapping a {@code PublisherImpl}
     * @throws NullPointerException     if {@code str} or {@code function} is {@code null}
     * @throws IllegalArgumentException if no system with that name exists
     */
    public <V> Publisher<V> createPublisher(String str, Function<V, ImmutableData> function) {
        PublisherHelper<V> helper = createHelper(str, function);
        // No executor is involved here, hence the null second argument.
        Closer sinkCloser = createCloser(helper.getDataSink(), null);
        return new WrappedPublisher(new PublisherImpl(helper), sinkCloser);
    }

    /**
     * Creates a buffered publisher for the given system.
     *
     * <p>Arguments are validated and the system is resolved before the processing thread pool
     * is allocated. If any later construction step fails, the resources created so far (data
     * sink and thread pool) are released before the failure is rethrown, so a failed call does
     * not leak them.
     *
     * <p>The pool size comes from the "publisher.buffer.processor.threads" property (default 2).
     *
     * @param str       name of the system to publish to
     * @param function  converts a value of type {@code V} into {@link ImmutableData}
     * @param duration  handed to the {@code BufferWriter} (presumably the buffering time; confirm)
     * @param duration2 handed to the {@code BufferedPublisher} (presumably its flush interval; confirm)
     * @param <V>       type of the published values
     * @return a publisher that, when closed, also shuts down the pool and closes the data sink
     * @throws NullPointerException     if {@code str} or {@code function} is {@code null}
     * @throws IllegalArgumentException if no system with that name exists
     */
    public <V> Publisher<V> createBufferedPublisher(String str, Function<V, ImmutableData> function, Duration duration, Duration duration2) {
        PublisherHelper<V> helper = createHelper(str, function);
        // Track the sink right away so every failure path below can release it.
        Closer closer = createCloser(helper.getDataSink(), null);
        try {
            int processorThreads = (Integer) ConfigHolder.getProperty("publisher.buffer.processor.threads", 2);
            ExecutorService processorPool = Executors.newFixedThreadPool(processorThreads);
            closer.register(processorPool);
            DelayedQueueBuffer delayedQueueBuffer = new DelayedQueueBuffer();
            BufferWriter<V> writer = createWriter(helper, delayedQueueBuffer, duration);
            BufferReader reader = createReader(helper.getDataSink(), delayedQueueBuffer, processorPool);
            return new WrappedPublisher(new BufferedPublisher(writer, reader, duration2), closer);
        } catch (RuntimeException | Error failure) {
            // Construction failed part-way: release what was allocated, then propagate.
            closer.shutdown();
            closer.close();
            throw failure;
        }
    }

    /**
     * Builds a {@link Closer} tracking the given sink and executor.
     *
     * @param dataSink        sink to close later; ignored by the closer when {@code null}
     * @param executorService executor to shut down later; ignored by the closer when {@code null}
     * @return a new closer holding the non-null arguments
     */
    private Closer createCloser(DataSink<RecordData, DefaultCallback> dataSink, ExecutorService executorService) {
        Closer resources = new Closer();
        resources.register(dataSink);
        resources.register(executorService);
        return resources;
    }

    /**
     * Creates the reader that moves buffered messages to the data sink.
     *
     * <p>The third constructor argument comes from the "publisher.buffer.entities.per.round"
     * property (default 500).
     *
     * @param dataSink        destination of the messages
     * @param buffer          buffer the reader takes messages from
     * @param executorService executor handed to the reader
     * @return a new {@code BufferReader}
     */
    private BufferReader createReader(DataSink<RecordData, DefaultCallback> dataSink, Buffer<BufferedPublisher.DelayedMessage> buffer, ExecutorService executorService) {
        int entitiesPerRound = (Integer) ConfigHolder.getProperty("publisher.buffer.entities.per.round", 500);
        return new BufferReader(dataSink, buffer, entitiesPerRound, executorService);
    }

    /**
     * Creates the writer that places incoming values into the buffer, using the system
     * default-zone clock.
     *
     * @param publisherHelper helper handed to the writer
     * @param buffer          buffer the writer fills
     * @param duration        duration handed to the writer
     * @param <V>             type of the published values
     * @return a new {@code BufferWriter}
     */
    private <V> BufferWriter<V> createWriter(PublisherHelper<V> publisherHelper, Buffer<BufferedPublisher.DelayedMessage> buffer, Duration duration) {
        Clock systemClock = Clock.systemDefaultZone();
        return new BufferWriter<>(buffer, publisherHelper, duration, systemClock);
    }

    /**
     * Resolves the system by name and assembles the helper shared by both publisher flavours.
     *
     * @param str      name of the system to publish to
     * @param function converts a value of type {@code V} into {@link ImmutableData}
     * @param <V>      type of the published values
     * @return a helper wired with converter, data sink, encoder, entity service and system id
     * @throws NullPointerException     if {@code str} or {@code function} is {@code null}
     * @throws IllegalArgumentException if no system with that name exists
     */
    private <V> PublisherHelper<V> createHelper(String str, Function<V, ImmutableData> function) {
        Objects.requireNonNull(str, "System name cannot be null");
        Objects.requireNonNull(function, "Converting function cannot be null");
        SystemSpec system = this.systemSpecService
                .findByName(str)
                .orElseThrow(() -> new IllegalArgumentException("No such system name " + str));
        // The same encoder instance serves as the sink's serializer and as the helper's encoder.
        DataServiceEncoderImpl encoder = createDataToAvroServiceEncoder(system);
        return PublisherHelper.builder()
                .converter(function)
                .dataSink(createInternalDataSink(system, encoder))
                .encoder(encoder)
                .entityService(this.entityService)
                .systemId(system.getId())
                .build();
    }

    /**
     * Builds the encoder for a system by parsing its key definitions into Avro schemas.
     *
     * <p>The record-version definition is optional: when the system has none, {@code null} is
     * passed in its place.
     *
     * @param systemSpec system whose key definitions are parsed
     * @return a new encoder built from the parsed schemas and a fresh {@code TimeConverterImpl}
     */
    private DataServiceEncoderImpl createDataToAvroServiceEncoder(SystemSpec systemSpec) {
        String recordVersionDefinition = systemSpec.getRecordVersionKeyDefinitions();
        Schema recordVersionSchema = recordVersionDefinition == null
                ? null
                : new Schema.Parser().parse(recordVersionDefinition);
        // A fresh parser is used for every definition, as in each parse call below.
        Schema entityKeySchema = new Schema.Parser().parse(systemSpec.getEntityKeyDefinitions());
        Schema partitionKeySchema = new Schema.Parser().parse(systemSpec.getPartitionKeyDefinitions());
        Schema timeKeySchema = new Schema.Parser().parse(systemSpec.getTimeKeyDefinitions());
        return new DataServiceEncoderImpl(entityKeySchema, partitionKeySchema, timeKeySchema, recordVersionSchema, new TimeConverterImpl());
    }

    /**
     * Creates the Kafka-backed sink for a system.
     *
     * <p>The topic comes from the "publisher.topic_name" property, defaulting to the system
     * name. The maximum record size comes from "publisher.max_record_size", defaulting to
     * {@code DEFAULT_MAX_RECORD_SIZE}. A producer is obtained from the configured supplier on
     * every call.
     *
     * @param systemSpec system the sink publishes for
     * @param function   serializer turning a record into bytes
     * @return a new {@code KafkaDataSink}
     */
    private DataSink<RecordData, DefaultCallback> createInternalDataSink(SystemSpec systemSpec, Function<RecordData, byte[]> function) {
        String topicName = (String) ConfigHolder.getProperty("publisher.topic_name", systemSpec.getName());
        Producer<byte[], byte[]> producer = this.kafkaProducerSupplier.get();
        int maxRecordSize = (Integer) ConfigHolder.getProperty("publisher.max_record_size", Integer.valueOf(DEFAULT_MAX_RECORD_SIZE));
        return new KafkaDataSink(topicName, producer, function, maxRecordSize);
    }

    /**
     * Creates a factory through the private no-argument constructor.
     *
     * @return a new, independent factory instance
     */
    public static PublisherFactory newInstance() {
        PublisherFactory factory = new PublisherFactory();
        return factory;
    }
}
