package cern.nxcals.api.extraction.data.spark;

import cern.nxcals.api.domain.TimeWindow;
import cern.nxcals.api.extraction.data.ExtractionUtils;
import cern.nxcals.common.SystemFields;
import com.google.common.annotations.VisibleForTesting;
import java.net.URI;
import java.util.Arrays;
import java.util.Objects;
import java.util.Set;
import java.util.function.Function;
import java.util.function.Predicate;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:BOOT-INF/lib/nxcals-extraction-api-0.4.3.jar:cern/nxcals/api/extraction/data/spark/HdfsDatasetCreator.class */
/**
 * Builds Spark datasets from a set of resource URIs (the class name suggests HDFS paths).
 *
 * <p>Only URIs accepted by the configured path predicate are loaded. The loaded rows are projected
 * onto the requested columns and filtered by entity id and by a time window applied to the
 * timestamp field that the stamp-field provider returns for the queried system.
 */
public class HdfsDatasetCreator extends AbstractDatasetCreator<Set<URI>> {
    private static final Logger log = LoggerFactory.getLogger(HdfsDatasetCreator.class);

    /** Decides which of the candidate resource URIs are actually handed to Spark. */
    private final Predicate<URI> pathPredicate;

    /** Maps a system name to the name of the column holding the record timestamp. */
    private final Function<String, String> stampFieldProvider;

    /**
     * Creates a dataset creator that resolves the timestamp column through
     * {@link ExtractionUtils#getTimestampFieldName}.
     *
     * <p>NOTE(review): the decompiler reports that the access modifier was widened from
     * package-private; it is kept public here so that existing callers keep compiling.
     *
     * @param sparkSession session passed on to the superclass
     * @param predicate    filter applied to every candidate resource URI; must not be {@code null}
     * @throws NullPointerException if {@code predicate} is {@code null}
     */
    public HdfsDatasetCreator(SparkSession sparkSession, Predicate<URI> predicate) {
        this(sparkSession, predicate, ExtractionUtils::getTimestampFieldName);
    }

    /**
     * Creates a dataset creator with an explicit timestamp-field lookup (exposed for tests).
     *
     * @param sparkSession session passed on to the superclass
     * @param predicate    filter applied to every candidate resource URI; must not be {@code null}
     * @param function     maps a system name to its timestamp column name; must not be {@code null}
     * @throws NullPointerException if {@code predicate} or {@code function} is {@code null}
     */
    @VisibleForTesting
    HdfsDatasetCreator(SparkSession sparkSession, Predicate<URI> predicate, Function<String, String> function) {
        super(sparkSession);
        // No raw-type casts: requireNonNull is generic and already returns the argument's type.
        this.pathPredicate = Objects.requireNonNull(predicate, "predicate");
        this.stampFieldProvider = Objects.requireNonNull(function, "function");
    }

    /**
     * Loads the valid resources of the given criteria, projects them onto the requested columns
     * and applies the entity/time-window filter.
     *
     * @param datasetCriteria resources, projection and selection criteria of the query
     * @return the filtered dataset, or the result of {@code createEmptyDataFrame} for the requested
     *         projection when none of the resources passes the path predicate
     */
    @Override
    Dataset<Row> createDataset(DatasetCriteria<Set<URI>> datasetCriteria) {
        String[] validPaths = getValidPaths(datasetCriteria);
        if (validPaths.length == 0) {
            log.warn("Nothing to be queried for {} as the relevant paths {} do not exists", datasetCriteria, datasetCriteria.getResources());
            return createEmptyDataFrame(datasetCriteria.getProjection());
        }

        String[] columns = getColumnsFrom(datasetCriteria.getProjection());
        Column condition = getSelection(datasetCriteria.getSelectionCriteria());
        // Guarded explicitly because Arrays.toString would otherwise run even with debug disabled.
        if (log.isDebugEnabled()) {
            log.debug("Querying {} with condition: {} and columns: {}", Arrays.toString(validPaths), condition, Arrays.toString(columns));
        }
        return getSparkSession().read().load(validPaths).selectExpr(columns).where(condition);
    }

    /**
     * Returns the string form of every resource URI accepted by the path predicate.
     *
     * @param datasetCriteria criteria whose resources are inspected
     * @return the accepted URIs as strings; empty when no URI is accepted
     */
    @VisibleForTesting
    String[] getValidPaths(DatasetCriteria<Set<URI>> datasetCriteria) {
        return datasetCriteria.getResources().stream()
                .filter(this.pathPredicate)
                .map(URI::toString)
                .toArray(String[]::new);
    }

    /**
     * Builds the row filter: the entity-id column must be one of the requested entity ids and the
     * timestamp column must lie between the start and end of the time window (in nanoseconds,
     * compared with {@link Column#between}).
     *
     * @param selectionCriteria entity ids, system spec and time window of the query
     * @return the combined Spark filter column
     */
    @VisibleForTesting
    Column getSelection(SelectionCriteria selectionCriteria) {
        TimeWindow timeWindow = selectionCriteria.getTimeWindow();
        long startNanos = timeWindow.getStartTimeNanos();
        long endNanos = timeWindow.getEndTimeNanos();

        Column entityFilter = new Column(SystemFields.NXC_EXTR_ENTITY_ID.getValue())
                .isInCollection(selectionCriteria.getEntityIds());
        String stampField = this.stampFieldProvider.apply(selectionCriteria.getSystemSpec().getName());
        // The primitive longs are auto-boxed to Long for Column.between(Object, Object).
        Column timeFilter = new Column(stampField).between(startNanos, endNanos);

        return entityFilter.and(timeFilter);
    }
}
