package cern.nxcals.api.ingestion;

import cern.cmw.datax.EntryType;
import cern.cmw.datax.ImmutableData;
import cern.cmw.datax.ImmutableEntry;
import cern.nxcals.api.domain.Entity;
import cern.nxcals.api.domain.KeyValues;
import cern.nxcals.common.SystemFields;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import lombok.NonNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:BOOT-INF/lib/nxcals-ingestion-api-0.4.45.jar:cern/nxcals/api/ingestion/PublisherImpl.class */
/**
 * {@link Publisher} implementation that converts each value into an {@link ImmutableData} record,
 * validates it, resolves the {@link Entity} it belongs to and sends it to the data sink provided by the
 * {@link PublisherHelper}.
 *
 * @param <V> type of the values accepted for publication
 */
class PublisherImpl<V> implements Publisher<V> {
    private static final Logger log = LoggerFactory.getLogger(PublisherImpl.class);

    /** Maximum number of entries a single record may contain before it is rejected. */
    private static final int MAX_RECORD_FIELDS = 1000;

    @NonNull
    private final PublisherHelper<V> helper;

    /**
     * Creates a publisher backed by the given helper.
     *
     * @param publisherHelper source of the converter, encoder, entity service, data sink and system id
     * @throws NullPointerException if {@code publisherHelper} is {@code null}
     */
    public PublisherImpl(@NonNull PublisherHelper<V> publisherHelper) {
        if (publisherHelper == null) {
            throw new NullPointerException("helper is marked @NonNull but is null");
        }
        this.helper = publisherHelper;
    }

    /** Does nothing. */
    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
    }

    /**
     * Converts, validates and sends {@code v} on the given executor.
     *
     * @param v        value to publish
     * @param executor executor that runs the conversion, validation and send
     * @return a future completed with the result reported by the data sink, or completed exceptionally
     *         when the record is rejected or any step of the processing fails
     * @throws NullPointerException if {@code executor} or {@code v} is {@code null}
     */
    @Override // cern.nxcals.api.ingestion.Publisher
    public CompletableFuture<Result> publishAsync(V v, Executor executor) {
        Objects.requireNonNull(executor, "Executor must not be null");
        Objects.requireNonNull(v, "Input value must not be null");
        CompletableFuture<Result> future = new CompletableFuture<>();
        executor.execute(() -> {
            try {
                publish(v, future);
            } catch (Throwable t) {
                // Catch Throwable rather than Exception: an Error raised by a collaborator must still
                // complete the future, otherwise the caller would wait on it forever.
                future.completeExceptionally(t);
            }
        });
        return future;
    }

    /** Converts the value to a record, validates it and sends it to the data sink, reporting into {@code future}. */
    private void publish(V value, CompletableFuture<Result> future) {
        ImmutableData data = helper.getConverter().apply(value);
        if (data.getEntryCount() > MAX_RECORD_FIELDS) {
            future.completeExceptionally(new IllegalRecordRuntimeException("Record has " + data.getEntryCount()
                    + " fields with limit of " + MAX_RECORD_FIELDS
                    + ". This large number of fields cannot be processed with Spark at extraction."
                    + " Please re-model your data."));
            return;
        }

        KeyValues entityKeyValues = helper.getEncoder().encodeEntityKeyValues(data);
        Long timestamp = helper.getEncoder().encodeTimeKeyValues(data);
        if (timestamp == null || timestamp <= 0) {
            future.completeExceptionally(new IllegalRecordRuntimeException("Illegal record timestamp for entityKey="
                    + entityKeyValues + " system id=" + helper.getSystemId() + " timestamp=" + timestamp));
            return;
        }

        helper.getDataSink().send(
                new RecordData(findEntityData(data, entityKeyValues, timestamp), data, timestamp.longValue()),
                (result, exc) -> {
                    if (exc != null) {
                        future.completeExceptionally(exc);
                    } else {
                        future.complete(result);
                    }
                });
    }

    /**
     * Resolves the entity for a record.
     *
     * <p>When the record carries no entity id entry, the entity is looked up (or created) from the entity
     * and partition key values. When it carries an entity id it must also carry a partition id, and the
     * lookup is done by those two ids.
     *
     * @throws IllegalRecordRuntimeException if the record has an entity id but no partition id
     */
    private Entity findEntityData(ImmutableData data, KeyValues entityKeyValues, Long recordTimestamp) {
        Long entityId = getId(data, SystemFields.NXC_ENTITY_ID.getValue());
        if (entityId == null) {
            return helper.getEntityService().findOrCreateEntityFor(helper.getSystemId(), entityKeyValues,
                    helper.getEncoder().encodePartitionKeyValues(data),
                    helper.getEncoder().encodeRecordFieldDefinitions(data), recordTimestamp.longValue());
        }

        Long partitionId = getId(data, SystemFields.NXC_PARTITION_ID.getValue());
        if (partitionId == null) {
            throw new IllegalRecordRuntimeException("Must have both entityId and partitionId set in a record for entityKey="
                    + entityKeyValues.getKeyValues() + " system id=" + helper.getSystemId()
                    + " timestamp=" + recordTimestamp);
        }
        return helper.getEntityService().findOrCreateEntityFor(helper.getSystemId(), entityId.longValue(),
                partitionId.longValue(), helper.getEncoder().encodeRecordFieldDefinitions(data),
                recordTimestamp.longValue());
    }

    /** Returns the INT64 value of the entry named {@code fieldName}, or {@code null} when the record has no such entry. */
    private Long getId(ImmutableData data, String fieldName) {
        ImmutableEntry entry = data.getEntry(fieldName);
        if (entry == null) {
            return null;
        }
        return (Long) entry.getAs(EntryType.INT64);
    }
}
