package cern.nxcals.data.access.api;

import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.StringJoiner;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.execution.datasources.hbase.HBaseTableCatalog;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:BOOT-INF/lib/nxcals-data-access-0.1.129.jar:cern/nxcals/data/access/api/HbaseDataAccessServiceImpl.class */
/**
 * {@link InternalDataAccessService} implementation that reads data from HBase tables through the
 * Spark HBase data source ({@value #HBASE_FORMAT}).
 *
 * <p>For every HBase resource of a query a table catalog is generated, the table is loaded as a
 * {@link Dataset}, optionally filtered on the row key, and all partial results are unioned.
 *
 * <p>Row keys are compared as strings and are built as
 * {@code <entityId>__<Long.MAX_VALUE - timestamp>} (see {@link #getKeyPredicateFor(long, long)}).
 */
public class HbaseDataAccessServiceImpl implements InternalDataAccessService<ResourcesBasedQuery<HbaseResource>> {
    private static final Logger log = LoggerFactory.getLogger(HbaseDataAccessServiceImpl.class);

    /** Opening part of the catalog body: row key definition plus the {@code entityKey} column. */
    private static final String PREFIX = "\"rowkey\":\"id\",\"columns\":{\"entityKey\":{\"cf\":\"rowkey\",\"col\":\"id\",\"type\":\"string\"},";

    /** Closes the {@code "columns"} object opened by {@link #PREFIX}. */
    private static final String SUFFIX = "}";

    private static final String HBASE_FORMAT = "org.apache.spark.sql.execution.datasources.hbase";

    /** Name of the dataset column that is mapped to the HBase row key. */
    private static final String ENTITY_KEY = "entityKey";

    /** Separator used both inside row keys and inside generated schema names. */
    private static final String KEY_DELIMITER = "__";

    private final SparkSession sparkSession;

    /**
     * Creates the service.
     *
     * @param sparkSession session used to read the HBase tables and to create empty datasets
     */
    public HbaseDataAccessServiceImpl(SparkSession sparkSession) {
        this.sparkSession = sparkSession;
    }

    /**
     * Loads the data of all HBase resources referenced by the query and unions it into one dataset.
     *
     * @param resourcesBasedQuery query holding the resource queries and the requested fields
     * @return union of the per-table datasets, or an empty dataset with the schema derived from the
     *         query fields when the query references no HBase resource at all
     */
    @Override
    public Dataset<Row> getDataFor(ResourcesBasedQuery<HbaseResource> resourcesBasedQuery) {
        Dataset<Row> result = null;
        for (ResourceQuery<HbaseResource> resourceQuery : resourcesBasedQuery.getResources()) {
            String[] columns = resourceQuery.getColumnsAsArray();
            log.debug("Starting querying hbase from {} to {} with fields columns [{}] for entities [{}].",
                    resourceQuery.getStartTime(), resourceQuery.getEndTime(), columns, resourceQuery.getEntityIds());
            // The row-key condition only depends on the resource query, so it is built once per query
            // and reused for each of its tables.
            Column condition = createCondition(resourceQuery);
            for (HbaseResource resource : resourceQuery.getResources()) {
                log.debug("Querying data for table {}", resource.getTableName());
                Map<String, String> catalog =
                        tableCatalog(resource.getNamespace(), resource.getTableName(), resourcesBasedQuery.getFields());
                Dataset<Row> tableData = this.sparkSession.read()
                        .format(HBASE_FORMAT)
                        .options(catalog)
                        .load()
                        .selectExpr(columns);
                // NOTE: a null condition (no entity ids) means the table is read without a row-key filter.
                if (condition != null) {
                    tableData = tableData.where(condition);
                }
                if (result != null) {
                    result = result.union(tableData);
                    log.debug("Performed union on resultDataFrame.union(dataset) successfully!");
                } else {
                    result = tableData;
                    log.debug("Assigned first dataset");
                }
            }
        }
        return result != null ? result : createEmptyDataFrame(resourcesBasedQuery);
    }

    /** Creates an empty dataset whose schema is derived from the fields of the query. */
    private Dataset<Row> createEmptyDataFrame(ResourcesBasedQuery<HbaseResource> resourcesBasedQuery) {
        return this.sparkSession.createDataFrame(Collections.emptyList(),
                SparkTypeUtils.getStructSchemaFor(resourcesBasedQuery.getFields()));
    }

    /**
     * Builds the row-key filter for a resource query: one key range per entity id, OR-ed together.
     *
     * @return the combined condition, or {@code null} when the query contains no entity ids
     */
    private Column createCondition(ResourceQuery<HbaseResource> resourceQuery) {
        Column condition = null;
        for (Long entityId : resourceQuery.getEntityIds()) {
            String startKey = getKeyPredicateFor(entityId, resourceQuery.getStartTime());
            String endKey = getKeyPredicateFor(entityId, resourceQuery.getEndTime());
            Column period = createPeriodFilteredColumn(startKey, endKey);
            condition = condition == null ? period : condition.or(period);
        }
        return condition;
    }

    /**
     * Builds the row key used as a range boundary for the given entity and timestamp.
     *
     * <p>The timestamp is stored reversed ({@code Long.MAX_VALUE - timestamp}), so a later timestamp
     * produces a numerically smaller key suffix.
     *
     * @param j  entity id
     * @param j2 timestamp
     * @return key in the form {@code <entityId>__<Long.MAX_VALUE - timestamp>}
     */
    String getKeyPredicateFor(long j, long j2) {
        String key = j + KEY_DELIMITER + (Long.MAX_VALUE - j2);
        log.debug("getKeyPredicateFor(entityId = {} ,timestamp = {}) returns = {}", j, j2, key);
        return key;
    }

    /**
     * Creates the condition {@code endKey <= entityKey <= startKey}.
     *
     * <p>Because timestamps are reversed inside the key, the key built from the end time is the
     * lower bound and the key built from the start time is the upper bound.
     */
    private Column createPeriodFilteredColumn(String startKey, String endKey) {
        return new Column(ENTITY_KEY).geq(endKey).and(new Column(ENTITY_KEY).leq(startKey));
    }

    /**
     * Builds the options map passed to the HBase data source: the JSON table catalog plus one
     * entry per field that maps the generated schema name to the field schema.
     *
     * @param namespace  HBase namespace; a blank value falls back to {@code "default"}
     * @param tableName  HBase table name
     * @param fields     fields to expose; each one must carry a schema
     * @return insertion-ordered options map (catalog first, then the field schemas)
     * @throws IllegalStateException if at least one field has no schema
     */
    Map<String, String> tableCatalog(String namespace, String tableName, Collection<DataAccessField> fields) {
        String effectiveNamespace;
        if (StringUtils.isBlank(namespace)) {
            log.warn("The provided namespace is blank string, will use 'default' namespace instead!");
            effectiveNamespace = "default";
        } else {
            effectiveNamespace = namespace;
        }
        validateFields(fields);
        log.debug("Create catalog for namespace = {} table = {} with fields = {})", effectiveNamespace, tableName, fields);
        String header = catalogHeader(tableName, fields);
        Map<String, String> options = new LinkedHashMap<>();
        options.put(HBaseTableCatalog.tableCatalog(),
                String.format("{\"table\": {\"namespace\": \"%s\", \"name\":\"%s\"},%s}", effectiveNamespace, tableName, header));
        for (DataAccessField field : fields) {
            String schemaName = schemaName(tableName, field);
            options.put(schemaName, field.getFieldSchema().toString());
            if (log.isDebugEnabled()) {
                log.debug("Add schema for field \"{}\" to tablesCatalog: {} -> {}", field.getFieldName(), schemaName, field.getFieldSchema());
            }
        }
        return options;
    }

    /** Builds the row-key/columns part of the catalog JSON, with one column entry per field. */
    private String catalogHeader(String tableName, Collection<DataAccessField> fields) {
        StringJoiner columns = new StringJoiner(",", PREFIX, SUFFIX);
        for (DataAccessField field : fields) {
            columns.add(String.format("\"%s\":{\"cf\":\"data\",\"col\":\"%s\",\"avro\":\"%s\"}",
                    field.getFieldName(), field.getFieldName(), schemaName(tableName, field)));
        }
        return columns.toString();
    }

    /**
     * Returns the options-map key under which the schema of the field is registered:
     * {@code <tableName>__<lower-cased field name>}.
     */
    private String schemaName(String tableName, DataAccessField field) {
        // Locale.ROOT keeps the generated key independent of the JVM default locale.
        return tableName + KEY_DELIMITER + field.getFieldName().toLowerCase(Locale.ROOT);
    }

    /**
     * Verifies that every field carries a schema.
     *
     * @throws IllegalStateException listing the names of all fields without a schema
     */
    private void validateFields(Collection<DataAccessField> fields) {
        List<String> fieldsWithoutSchema = fields.stream()
                .filter(field -> Objects.isNull(field.getFieldSchema()))
                .map(DataAccessField::getFieldName)
                .collect(Collectors.toList());
        if (!fieldsWithoutSchema.isEmpty()) {
            throw new IllegalStateException("There is no schema present for fields = " + fieldsWithoutSchema);
        }
    }
}
