package cern.nxcals.client;

import cern.cmw.datax.EntryType;
import cern.cmw.datax.ImmutableData;
import cern.cmw.datax.ImmutableEntry;
import cern.nxcals.common.SystemFields;
import cern.nxcals.common.domain.EntityData;
import cern.nxcals.common.utils.Tuple2;
import cern.nxcals.service.client.api.internal.InternalEntityService;
import cern.nxcals.service.client.domain.KeyValues;
import java.io.IOException;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ForkJoinPool;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:BOOT-INF/lib/nxcals-client-0.1.106.jar:cern/nxcals/client/PublisherImpl.class */
/**
 * {@link Publisher} implementation that converts a user value into {@link ImmutableData},
 * validates it, resolves the corresponding {@link EntityData} through the
 * {@link InternalEntityService} and hands the resulting {@link RecordData} to a {@link DataSink}.
 *
 * @param <V> type of the values accepted by this publisher
 */
class PublisherImpl<V> implements Publisher<V> {
    private static final Logger log = LoggerFactory.getLogger(PublisherImpl.class);
    /** Executor that runs the task synchronously on the calling thread. */
    private static final Executor CURRENT_THREAD_EXECUTOR = Runnable::run;
    /** Maximum number of entries a single record may carry before it is rejected. */
    private static final int MAX_FIELDS = 1000;
    private final Function<V, ImmutableData> converter;
    private final InternalEntityService entityService;
    private final DataSink<RecordData, DefaultCallback> sink;
    private final long systemId;
    private final DataServiceEncoder<KeyValues, KeyValues, String, Long> encoder;

    /**
     * Creates a publisher bound to one system.
     *
     * @param systemId      identifier of the system passed to the entity service on every lookup
     * @param converter     converts a published value into {@link ImmutableData}; must not be null
     * @param entityService service used to find or create the entity of a record; must not be null
     * @param encoder       encoder used to extract entity/partition keys, timestamp and field
     *                      definitions from a record; must not be null
     * @param sink          destination of the produced {@link RecordData}; must not be null
     * @throws NullPointerException if any of the reference arguments is null
     */
    /* JADX INFO: Access modifiers changed from: package-private */
    public PublisherImpl(long systemId, Function<V, ImmutableData> converter, InternalEntityService entityService, DataServiceEncoder<KeyValues, KeyValues, String, Long> encoder, DataSink<RecordData, DefaultCallback> sink) {
        this.systemId = systemId;
        this.entityService = Objects.requireNonNull(entityService);
        this.converter = Objects.requireNonNull(converter);
        this.sink = Objects.requireNonNull(sink);
        this.encoder = Objects.requireNonNull(encoder);
    }

    /**
     * Closes the underlying sink.
     *
     * @throws IOException if the sink fails to close
     */
    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        this.sink.close();
    }

    /**
     * Publishes the value on the calling thread and waits for the sink callback.
     *
     * @param value value to publish; must not be null
     * @return the result reported by the sink
     * @throws java.util.concurrent.CompletionException if conversion, validation or sending fails
     *                                                  (raised by {@link CompletableFuture#join()})
     */
    @Override // cern.nxcals.client.Publisher
    public Result publish(V value) {
        return publishAsync(value, CURRENT_THREAD_EXECUTOR).join();
    }

    /**
     * Publishes the value using {@link ForkJoinPool#commonPool()}.
     *
     * @param value value to publish; must not be null
     * @return future completed with the sink result, or exceptionally on failure
     */
    @Override // cern.nxcals.client.Publisher
    public CompletableFuture<Result> publishAsync(V value) {
        return publishAsync(value, ForkJoinPool.commonPool());
    }

    /**
     * Converts the value to a record on the given executor and sends it to the sink.
     *
     * @param value    value to publish; must not be null
     * @param executor executor on which conversion and the {@code send} call are performed; must not be null
     * @return future completed with the sink result, or exceptionally when conversion, validation
     *         or sending fails
     * @throws NullPointerException if {@code executor} or {@code value} is null
     */
    @Override // cern.nxcals.client.Publisher
    public CompletableFuture<Result> publishAsync(V value, Executor executor) {
        Objects.requireNonNull(executor);
        Objects.requireNonNull(value, "Input value must not be null");
        CompletableFuture<Result> future = new CompletableFuture<>();
        executor.execute(() -> {
            try {
                Tuple2<RecordData, Exception> conversion = convertToRecord(value);
                if (conversion.getRight() != null) {
                    future.completeExceptionally(conversion.getRight());
                } else {
                    this.sink.send(conversion.getLeft(), (result, exc) -> {
                        if (exc != null) {
                            future.completeExceptionally(exc);
                        } else {
                            future.complete(result);
                        }
                    });
                }
            } catch (Exception e) {
                future.completeExceptionally(e);
            } catch (Error e) {
                // Without this the future would stay incomplete forever when the task runs on a
                // pool thread, leaving callers blocked. Complete it, then rethrow so the Error
                // still propagates exactly as it did before (e.g. out of the synchronous publish).
                future.completeExceptionally(e);
                throw e;
            }
        });
        return future;
    }

    /** Applies the converter to the value and builds the record from the converted data. */
    private Tuple2<RecordData, Exception> convertToRecord(V value) {
        return createRecordData(this.converter.apply(value));
    }

    /**
     * Validates the data and builds the {@link RecordData}.
     *
     * @return a tuple holding either the record (left) or the validation failure (right);
     *         exactly one side is non-null
     */
    private Tuple2<RecordData, Exception> createRecordData(ImmutableData data) {
        if (data.getEntryCount() > MAX_FIELDS) {
            return new Tuple2<>(null, new IllegalRecordRuntimeException("Record has " + data.getEntryCount() + " fields with limit of " + MAX_FIELDS + ". This large number of fields cannot be processes with Spark at extraction. Please re-model your data."));
        }
        KeyValues entityKeyValues = this.encoder.encodeEntityKeyValues(data);
        Long timestamp = this.encoder.encodeTimeKeyValues(data);
        // A missing or non-positive timestamp is reported as a failure, not thrown.
        if (timestamp == null || timestamp.longValue() <= 0) {
            return new Tuple2<>(null, new IllegalRecordRuntimeException("Illegal record timestamp for entityKey=" + entityKeyValues + " system=" + this.systemId + " timestamp=" + timestamp));
        }
        return new Tuple2<>(new RecordData(findEntityData(data, entityKeyValues, timestamp), data), null);
    }

    /**
     * Resolves the entity for a record.
     * <p>
     * When the record carries no entity id field, the entity is looked up by its key values.
     * When it carries an entity id, a partition id field is required as well and the lookup is
     * done by ids.
     *
     * @throws IllegalRecordRuntimeException if an entity id is present but the partition id is missing
     */
    private EntityData findEntityData(ImmutableData data, KeyValues entityKeyValues, Long timestamp) {
        Long entityId = getId(data, SystemFields.NXC_ENTITY_ID.getValue());
        if (entityId == null) {
            // The partition id field is intentionally not read on this path.
            return this.entityService.findOrCreateEntityFor(this.systemId, entityKeyValues, this.encoder.encodePartitionKeyValues(data), this.encoder.encodeRecordFieldDefinitions(data), timestamp.longValue());
        }
        Long partitionId = getId(data, SystemFields.NXC_PARTITION_ID.getValue());
        if (partitionId == null) {
            throw new IllegalRecordRuntimeException("Must have both entityId and partitionId set in a record for entityKey=" + entityKeyValues.getKeyValues() + " system=" + this.systemId + " timestamp=" + timestamp);
        }
        return this.entityService.findOrCreateEntityFor(this.systemId, entityId, partitionId, this.encoder.encodeRecordFieldDefinitions(data), timestamp);
    }

    /**
     * Reads the named entry as an INT64 value.
     *
     * @return the entry value, or {@code null} when the record has no entry with that name
     */
    private Long getId(ImmutableData data, String fieldName) {
        ImmutableEntry entry = data.getEntry(fieldName);
        if (entry == null) {
            return null;
        }
        return (Long) entry.getAs(EntryType.INT64);
    }
}
