package cern.nxcals.api.extraction.data.spark;

import cern.nxcals.api.domain.TimeWindow;
import cern.nxcals.api.extraction.data.ExtractionUtils;
import cern.nxcals.common.utils.AvroUtils;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Iterables;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.StringJoiner;
import org.apache.avro.Schema;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.execution.datasources.hbase.HBaseTableCatalog;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:BOOT-INF/lib/nxcals-extraction-api-0.4.3.jar:cern/nxcals/api/extraction/data/spark/HBaseDatasetCreator.class */
public class HBaseDatasetCreator extends AbstractDatasetCreator<HBaseResource> {
    private static final Logger log = LoggerFactory.getLogger(HBaseDatasetCreator.class);

    /** Opening of the catalog body: declares the row key and opens the "columns" object. */
    private static final String PREFIX = "\"rowkey\":\"id\",\"columns\":{";
    /** Catalog column definition mapping the HBase row key to the {@value #ENTITY_KEY} column. */
    private static final String ENTITY_KEY_COLUMN =
            "\"entityKey\":{\"cf\":\"rowkey\",\"col\":\"id\",\"type\":\"string\"}";
    /** Closes the "columns" object opened by {@link #PREFIX}. */
    private static final String SUFFIX = "}";
    private static final String HBASE_FORMAT = "org.apache.spark.sql.execution.datasources.hbase";
    private static final String ENTITY_KEY = "entityKey";
    private static final String KEY_DELIMITER = "__";
    /** Maximum number of entity ids OR-ed together into a single Spark filter. */
    private static final int ENTITY_ID_BATCH_SIZE = 100;
    public static final String CATALOG_FORMAT = "{\"table\": {\"namespace\": \"%s\", \"name\":\"%s\"},%s}";

    /**
     * Creates a dataset creator bound to the given Spark session.
     *
     * @param sparkSession session used to read from HBase
     */
    public HBaseDatasetCreator(SparkSession sparkSession) {
        super(sparkSession);
    }

    /**
     * Reads the requested projection from the HBase table described by the criteria's resource.
     *
     * <p>Entity ids are split into batches of {@value #ENTITY_ID_BATCH_SIZE}; each batch becomes one
     * row-key range filter and the filtered reads are combined with {@code union}. When there is no
     * selection at all, an empty data frame for the projection is returned instead.
     *
     * @param datasetCriteria resource, projection and selection to query
     * @return the union of all batch reads, or an empty data frame when nothing is selected
     */
    @Override
    Dataset<Row> createDataset(DatasetCriteria<HBaseResource> datasetCriteria) {
        List<ColumnMapping> projection = datasetCriteria.getProjection();
        HBaseResource resource = datasetCriteria.getResources();
        String[] columnsFrom = getColumnsFrom(projection);
        Map<String, String> catalog = tableCatalog(resource, projection);
        List<Column> selections = getSelection(datasetCriteria.getSelectionCriteria());
        if (selections.isEmpty()) {
            return createEmptyDataFrame(projection);
        }
        // The source read is identical for every batch, so build it once and only vary the filter.
        Dataset<Row> source = getSparkSession().read()
                .format(HBASE_FORMAT)
                .options(catalog)
                .load()
                .selectExpr(columnsFrom);
        Dataset<Row> result = null;
        for (Column selection : selections) {
            if (log.isDebugEnabled()) {
                log.debug("Querying {} with condition: {} and columns: {}", resource, selection, Arrays.toString(columnsFrom));
            }
            Dataset<Row> filtered = source.where(selection);
            result = result == null ? filtered : result.union(filtered);
        }
        return result;
    }

    /**
     * Builds one filter per batch of entity ids; each filter ORs the per-entity period filters.
     *
     * @param selectionCriteria entity ids and time window to select
     * @return one non-null filter per batch; empty when there are no entity ids
     */
    private List<Column> getSelection(SelectionCriteria selectionCriteria) {
        TimeWindow timeWindow = selectionCriteria.getTimeWindow();
        List<Column> batchFilters = new ArrayList<>();
        for (List<Long> batch : Iterables.partition(selectionCriteria.getEntityIds(), ENTITY_ID_BATCH_SIZE)) {
            Column batchFilter = null;
            for (long entityId : batch) {
                Column periodFilter = createPeriodFilter(entityId, timeWindow);
                batchFilter = batchFilter == null ? periodFilter : batchFilter.or(periodFilter);
            }
            // Iterables.partition never yields empty batches, so batchFilter is non-null here.
            batchFilters.add(batchFilter);
        }
        return batchFilters;
    }

    /**
     * Computes the row-key bounds selecting {@code entityId} within {@code timeWindow} (inclusive).
     *
     * <p>Because the key embeds {@code Long.MAX_VALUE - timestamp}, later timestamps give smaller keys,
     * so the bounds are swapped relative to the window:
     * <ul>
     *   <li>left: exclusive upper key bound, derived from {@code startTimeNanos - 1};</li>
     *   <li>right: inclusive lower key bound, derived from {@code endTimeNanos}.</li>
     * </ul>
     *
     * @param entityId   entity whose rows are selected
     * @param timeWindow time range to select
     * @return pair of (exclusive upper bound, inclusive lower bound) row keys
     */
    @VisibleForTesting
    Pair<String, String> getReverseTimestampRowKeyPredicatesFor(long entityId, TimeWindow timeWindow) {
        String upperExclusive = buildReverseTimestampRowKeyPredicate(entityId, timeWindow.getStartTimeNanos() - 1);
        String lowerInclusive = buildReverseTimestampRowKeyPredicate(entityId, timeWindow.getEndTimeNanos());
        return Pair.of(upperExclusive, lowerInclusive);
    }

    /**
     * Builds the row-key bound {@code <entityId>__<Long.MAX_VALUE - timestampNanos>}.
     *
     * <p>The reversed timestamp is rendered as an unsigned decimal. For every {@code timestampNanos >= 0}
     * this is identical to the signed value. For {@code timestampNanos == -1} (the exclusive bound of a
     * window starting at 0) the subtraction wraps to {@link Long#MIN_VALUE}. Printed signed, that yields
     * a key containing {@code '-'}, which sorts before every digit and made the selected range empty.
     * The unsigned rendering yields the mathematically correct value instead.
     *
     * @param entityId       entity id prefix of the row key
     * @param timestampNanos timestamp to reverse
     * @return the row key bound as a string
     */
    private String buildReverseTimestampRowKeyPredicate(long entityId, long timestampNanos) {
        return entityId + KEY_DELIMITER + Long.toUnsignedString(Long.MAX_VALUE - timestampNanos);
    }

    /**
     * Creates the filter {@code lowerInclusive <= entityKey < upperExclusive} for one entity.
     * Keys are compared as strings.
     */
    private Column createPeriodFilter(long entityId, TimeWindow timeWindow) {
        Pair<String, String> bounds = getReverseTimestampRowKeyPredicatesFor(entityId, timeWindow);
        Column entityKey = new Column(ENTITY_KEY);
        return entityKey.geq(bounds.getRight()).and(entityKey.lt(bounds.getLeft()));
    }

    /**
     * Builds the data source options for the HBase connector.
     *
     * <p>The returned map contains:
     * <ul>
     *   <li>the JSON table catalog under {@link HBaseTableCatalog#tableCatalog()};</li>
     *   <li>one entry per non-primitive column, mapping the generated Avro schema name to the
     *       schema's JSON.</li>
     * </ul>
     * The catalog always declares the {@value #ENTITY_KEY} row-key column, so it remains valid JSON
     * even when {@code columnMappings} is empty.
     *
     * @param resource       HBase namespace and table to read
     * @param columnMappings columns to expose in the catalog
     * @return insertion-ordered options map with the catalog entry first
     */
    @VisibleForTesting
    Map<String, String> tableCatalog(HBaseResource resource, Collection<ColumnMapping> columnMappings) {
        String namespace = resource.getNamespace();
        String tableName = resource.getTableName();
        log.debug("Creating catalog for namespace = {} table = {} with fields = {})", namespace, tableName, columnMappings);
        Map<String, String> options = new LinkedHashMap<>();
        // Reserve the first slot for the catalog key. Re-putting the key later in a LinkedHashMap keeps
        // its original position, so the catalog stays first ahead of the Avro schema entries.
        // NOTE(review): presumably some consumer relies on this order — confirm.
        options.put(HBaseTableCatalog.tableCatalog(), null);
        StringJoiner columns = new StringJoiner(",", PREFIX, SUFFIX);
        columns.add(ENTITY_KEY_COLUMN);
        for (ColumnMapping mapping : columnMappings) {
            columns.add(headerRow(tableName, options, mapping, mapping.getFieldSchema().getType()));
        }
        options.put(HBaseTableCatalog.tableCatalog(), String.format(CATALOG_FORMAT, namespace, tableName, columns));
        if (log.isDebugEnabled()) {
            log.debug("Creating catalog for namespace = {} table = {}: {})", namespace, tableName, options);
        }
        return options;
    }

    /**
     * Produces the catalog column definition for one field.
     *
     * <ul>
     *   <li>Unions are first resolved to a concrete type via {@link AvroUtils#getSchemaTypeFor}.</li>
     *   <li>Primitive types are declared with an HBase type name.</li>
     *   <li>Any other type is declared as Avro. Its schema JSON is registered in {@code options}
     *       under a generated name.</li>
     * </ul>
     */
    private String headerRow(String tableName, Map<String, String> options, ColumnMapping mapping, Schema.Type type) {
        String fieldName = mapping.getFieldName();
        Schema fieldSchema = mapping.getFieldSchema();
        if (AvroUtils.isUnion(type)) {
            return headerRow(tableName, options, mapping, AvroUtils.getSchemaTypeFor(fieldSchema));
        }
        if (AvroUtils.isPrimitiveType(type)) {
            return String.format("\"%s\":{\"cf\":\"data\",\"col\":\"%s\",\"type\":\"%s\"}", fieldName, fieldName, ExtractionUtils.getHbaseTypeNameFor(fieldSchema));
        }
        String avroSchemaName = schemaName(tableName, mapping);
        options.put(avroSchemaName, fieldSchema.toString());
        return String.format("\"%s\":{\"cf\":\"data\",\"col\":\"%s\",\"avro\":\"%s\"}", fieldName, fieldName, avroSchemaName);
    }

    /**
     * Returns {@code <tableName>__<lower-cased qualified name>}, the key under which an Avro schema is
     * registered. Lower-casing uses {@link Locale#ROOT} so the key does not depend on the JVM locale
     * (e.g. the Turkish dotless i).
     */
    private String schemaName(String tableName, ColumnMapping mapping) {
        return tableName + KEY_DELIMITER + mapping.getQualifiedName().toLowerCase(Locale.ROOT);
    }
}
