package cern.nxcals.data.access.api;

import cern.nxcals.common.SystemFields;
import java.io.IOException;
import java.net.URI;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.Objects;
import java.util.Set;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:BOOT-INF/lib/nxcals-data-access-0.1.129.jar:cern/nxcals/data/access/api/HdfsDataAccessServiceImpl.class */
public class HdfsDataAccessServiceImpl implements InternalDataAccessService<HdfsResourceBasedQuery> {
    private static final Logger log = LoggerFactory.getLogger(HdfsDataAccessServiceImpl.class);
    private final SparkSession sparkSession;
    private final FileSystem fileSystem;

    /* JADX INFO: Access modifiers changed from: package-private */
    public HdfsDataAccessServiceImpl(SparkSession sparkSession, FileSystem fileSystem) {
        this.sparkSession = (SparkSession) Objects.requireNonNull(sparkSession);
        this.fileSystem = (FileSystem) Objects.requireNonNull(fileSystem);
    }

    @Override // cern.nxcals.data.access.api.InternalDataAccessService
    public Dataset<Row> getDataFor(HdfsResourceBasedQuery hdfsResourceBasedQuery) {
        Dataset<Row> dataset = null;
        String timeStampFieldName = getTimeStampFieldName(hdfsResourceBasedQuery.getTimestampField(), hdfsResourceBasedQuery.getFields());
        for (ResourceQuery<URI> resourceQuery : hdfsResourceBasedQuery.getResources()) {
            String[] columnsAsArray = resourceQuery.getColumnsAsArray();
            String[] strArr = (String[]) resourceQuery.getResources().stream().filter(this::pathExists).map((v0) -> {
                return v0.toString();
            }).toArray(i -> {
                return new String[i];
            });
            if (log.isDebugEnabled()) {
                log.debug("Calls getHdfsDataFor entity_ids = {} , timestampField = {}, timeWindow (startTime = {} : endTime = {}),  fields = {}", resourceQuery.getEntityIds(), timeStampFieldName, Long.valueOf(resourceQuery.getStartTime()), Long.valueOf(resourceQuery.getEndTime()), hdfsResourceBasedQuery.getFields());
                log.debug("Columns = {}", resourceQuery.getColumns());
            }
            if (strArr.length > 0) {
                log.debug("Start loading parquet files");
                Dataset<Row> selectExpr = this.sparkSession.read().load(strArr).selectExpr(columnsAsArray);
                log.debug("Loaded parquet files");
                Dataset<Row> where = selectExpr.where(createPredicate(resourceQuery.getEntityIds(), timeStampFieldName, resourceQuery.getStartTime(), resourceQuery.getEndTime()));
                log.debug("Filtered dataset by entity and timestamps");
                if (dataset != null) {
                    dataset = dataset.union(where);
                    log.debug("Performed union on resultSet.union(dataset) successfully!");
                } else {
                    dataset = where;
                    log.debug("Assigned first dataset");
                }
            }
        }
        return dataset != null ? dataset : createEmptyDataFrame(hdfsResourceBasedQuery);
    }

    private Dataset<Row> createEmptyDataFrame(HdfsResourceBasedQuery hdfsResourceBasedQuery) {
        return this.sparkSession.createDataFrame(Collections.emptyList(), SparkTypeUtils.getStructSchemaFor(hdfsResourceBasedQuery.getFields()));
    }

    private String getTimeStampFieldName(String str, Collection<DataAccessField> collection) {
        for (DataAccessField dataAccessField : collection) {
            if (dataAccessField.getFieldName().equals(str) && dataAccessField.hasAlias()) {
                return dataAccessField.getAlias();
            }
        }
        return str;
    }

    private boolean pathExists(URI uri) {
        Path path = new Path(uri);
        try {
            log.debug("Checking paths start");
            boolean z = false;
            if (this.fileSystem.exists(path.getParent())) {
                RemoteIterator<LocatedFileStatus> listFiles = this.fileSystem.listFiles(path.getParent(), false);
                z = listFiles.hasNext() && listFiles.next().getPath().getName().endsWith(".parquet");
            }
            log.debug("Checking paths finished");
            return z;
        } catch (IOException e) {
            throw new IllegalStateException("Cannot access hdfs file system to check for path " + path, e);
        }
    }

    private Column createPredicate(Set<Long> set, String str, long j, long j2) {
        Column column = null;
        if (set.isEmpty()) {
            throw new IllegalArgumentException("Empty list of ids is illegal");
        }
        Iterator<Long> it = set.iterator();
        while (it.hasNext()) {
            long longValue = it.next().longValue();
            column = column == null ? new Column(SystemFields.NXC_EXTR_ENTITY_ID.getValue()).equalTo(Long.valueOf(longValue)) : column.or(new Column(SystemFields.NXC_EXTR_ENTITY_ID.getValue()).equalTo(Long.valueOf(longValue)));
        }
        if (column != null) {
            return column.and(new Column(str).geq(Long.valueOf(j))).and(new Column(str).leq(Long.valueOf(j2)));
        }
        throw new IllegalStateException("ColumnIdPredicate is null");
    }
}
