package cern.nxcals.data.access.api;

import cern.nxcals.common.SystemFields;
import cern.nxcals.common.domain.EntityResourcesData;
import cern.nxcals.common.domain.ResourceData;
import cern.nxcals.common.domain.SchemaData;
import cern.nxcals.common.domain.TimeWindow;
import cern.nxcals.common.utils.IllegalCharacterConverter;
import cern.nxcals.common.utils.TimeUtils;
import cern.nxcals.data.access.ExceptionsFactory;
import cern.nxcals.data.access.api.exception.IncompatibleSchemaPromotionException;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Sets;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.StringJoiner;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.function.Function;
import java.util.function.UnaryOperator;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.avro.Schema;
import org.apache.commons.collections.CollectionUtils;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:BOOT-INF/lib/nxcals-data-access-0.1.129.jar:cern/nxcals/data/access/api/DataAccessServiceImpl.class */
public class DataAccessServiceImpl implements DataAccessService {
    private static final Logger log = LoggerFactory.getLogger(DataAccessServiceImpl.class);

    @VisibleForTesting
    static final String EXPANDED_FIELD_TARGET_ERROR_MESSAGE_FORMAT = "No available schema definition for the expanded field '%s' that is present on the latest schema version! Please, adjust your query time window to target an older period.";

    /**
     * Matches names of the form {@code <base>_<n>_<n>_<n>} where each {@code <n>} is a number optionally prefixed
     * with {@code neg}; group 1 captures {@code <base>}.
     */
    @VisibleForTesting
    static final Pattern EXPANDED_FIELD_PATTERN = Pattern.compile("(.+?)(_(neg)?\\d+){3}$");

    private final InternalDataAccessService<ResourcesBasedQuery<HbaseResource>> hbaseService;
    private final InternalDataAccessService<HdfsResourceBasedQuery> hdfsService;

    /**
     * Returns the field unchanged when its name is legal, otherwise an equivalent field (same data type, field
     * schema and alias) whose name has been converted by {@link IllegalCharacterConverter}.
     */
    private final UnaryOperator<DataAccessField> sanitizeField = field -> {
        String fieldName = field.getFieldName();
        if (IllegalCharacterConverter.isLegal(fieldName)) {
            return field;
        }
        return new DataAccessFieldBuilder(IllegalCharacterConverter.get().convertToLegal(fieldName))
                .dataType(field.getDataType())
                .fieldSchema(field.getFieldSchema())
                .alias(field.getAlias())
                .build();
    };

    /**
     * Creates the service.
     *
     * <p>NOTE(review): the decompiler reported this constructor as originally package-private; it is kept public
     * so existing callers keep compiling.
     *
     * @param internalDataAccessService  service used to read the HBase time window of a request; must not be null
     * @param internalDataAccessService2 service used to read the HDFS time window of a request; must not be null
     * @throws NullPointerException if either service is null
     */
    public DataAccessServiceImpl(
            InternalDataAccessService<ResourcesBasedQuery<HbaseResource>> internalDataAccessService,
            InternalDataAccessService<HdfsResourceBasedQuery> internalDataAccessService2) {
        this.hbaseService = Objects.requireNonNull(internalDataAccessService, "hbaseService");
        this.hdfsService = Objects.requireNonNull(internalDataAccessService2, "hdfsService");
    }

    /**
     * Builds one Spark dataset for all the given queries.
     *
     * <p>The overall time range (earliest start to latest end) is split by {@link DataSourceTimeWindow} into an
     * optional HBase window and an optional HDFS window; each present window is read from its service and the
     * resulting datasets are unioned (HBase first).
     *
     * @param set the queries to execute; must not be empty
     * @return the combined dataset; if no query carries entity resources, whatever the HDFS service returns for a
     *         query with no resources and all requested fields
     * @throws IllegalStateException if {@code set} is empty
     * @throws IllegalArgumentException if requested fields are missing from the schemas, or no dataset was built
     */
    @Override // cern.nxcals.data.access.api.DataAccessService
    public Dataset<Row> createDataSetFor(Set<ResourceQueryData> set) {
        log.debug("Create dataset for queries = {}", set);
        if (set.isEmpty()) {
            throw new IllegalStateException("QueryData is empty and the request cannot be performed");
        }
        if (hasNoEntityResources(set)) {
            return getEmptyDataset(set);
        }
        Collection<DataAccessField> fields = getRequiredSchemaFieldsOrThrow(set);
        log.debug("Fields to be extracted = {}", fields);

        DataSourceTimeWindow timeWindows = DataSourceTimeWindow.getTimeWindows(startTime(set), endTime(set));
        log.debug("HBase time window = {}, HDFS time window = {}", timeWindows.getHbaseTimeWindow(), timeWindows.getHdfsTimeWindow());

        List<Dataset<Row>> datasets = new ArrayList<>(2);
        timeWindows.getHbaseTimeWindow().ifPresent(window -> datasets.add(getHBaseData(set, fields, window)));
        timeWindows.getHdfsTimeWindow().ifPresent(window -> datasets.add(getHDFSData(set, fields, window)));
        return datasets.stream()
                .reduce((first, second) -> first.union(second))
                .orElseThrow(() -> new IllegalArgumentException(
                        "No Spark dataSet have been created providing the requested queryDataSet=" + set));
    }

    /** Asks the HDFS service for a dataset with no resources, exposing every field requested by any query. */
    private Dataset<Row> getEmptyDataset(Set<ResourceQueryData> set) {
        return this.hdfsService.getDataFor(new HdfsResourceBasedQuery(Collections.emptySet(), getAllFieldsFrom(set), null));
    }

    /**
     * Reads the part of the queries that falls into {@code timeWindow} from HDFS.
     *
     * <p>Raw types are kept because the generic signatures of {@link ResourcesBasedQuery} and
     * {@link HdfsResourceBasedQuery} are not visible from this class.
     */
    @SuppressWarnings({"rawtypes", "unchecked"})
    private Dataset<Row> getHDFSData(Set<ResourceQueryData> set, Collection<DataAccessField> collection, TimeWindow timeWindow) {
        ResourcesBasedQuery query = getResourceBasedQuery(getDataSourceQueryData(set, timeWindow), collection,
                queryData -> requireEntityResources(queryData).getResourcesData().getHdfsPaths());
        // The timestamp column is derived from the system data of the first query that actually has resources,
        // rather than from an arbitrary element of the (unordered) set that might have none.
        EntityResourcesData timeSource = firstEntityResources(set);
        return this.hdfsService.getDataFor(new HdfsResourceBasedQuery(query.getResources(), query.getFields(),
                ServiceUtils.getTimestampFieldName(timeSource.getSystemData().getTimeKeyDefinitions())));
    }

    /** Reads the part of the queries that falls into {@code timeWindow} from HBase. */
    private Dataset<Row> getHBaseData(Set<ResourceQueryData> set, Collection<DataAccessField> collection, TimeWindow timeWindow) {
        return this.hbaseService.getDataFor(
                getResourceBasedQuery(getDataSourceQueryData(set, timeWindow), collection, this::toHbaseResources));
    }

    /** Maps every HBase table name of the query's resources to an {@link HbaseResource} in the resources' namespace. */
    private Set<HbaseResource> toHbaseResources(ResourceQueryData queryData) {
        ResourceData resourcesData = requireEntityResources(queryData).getResourcesData();
        Set<HbaseResource> resources = new HashSet<>();
        for (String tableName : resourcesData.getHbaseTableNames()) {
            resources.add(new HbaseResource(tableName, resourcesData.getHbaseNamespace()));
        }
        return resources;
    }

    /** Concatenates the requested fields of all queries, keeping duplicates and per-query order. */
    private List<DataAccessField> getAllFieldsFrom(Set<ResourceQueryData> set) {
        List<DataAccessField> fields = new ArrayList<>();
        for (ResourceQueryData queryData : set) {
            fields.addAll(queryData.getFields());
        }
        return fields;
    }

    /** Returns {@code true} when none of the queries carries entity resources. */
    private boolean hasNoEntityResources(Set<ResourceQueryData> set) {
        return set.stream().noneMatch(queryData -> queryData.getEntityResources().isPresent());
    }

    /** Returns the latest end time among the queries. */
    private Instant endTime(Set<ResourceQueryData> set) {
        return set.stream()
                .map(ResourceQueryData::getEndTime)
                .max(Comparator.naturalOrder())
                .orElseThrow(() -> new IllegalArgumentException("EndTime is not set for the requested NXCALS DataAccess query"));
    }

    /** Returns the earliest start time among the queries. */
    private Instant startTime(Set<ResourceQueryData> set) {
        return set.stream()
                .map(ResourceQueryData::getStartTime)
                .min(Comparator.naturalOrder())
                .orElseThrow(() -> new IllegalArgumentException("StartTime is not set for the requested NXCALS DataAccess query"));
    }

    /**
     * Groups the queries per partition/schema and builds one {@code ResourceQuery} per group, spanning the
     * group's earliest start to latest end (in nanoseconds) and covering all the group's resources and entity ids.
     *
     * <p>NOTE(review): the column mapping of a group is derived from its first query only; queries of the same
     * group that differ in requested fields or variable name share that mapping — confirm this is intended.
     *
     * @param set        queries already limited to a single data source time window
     * @param collection the fields to extract
     * @param function   extracts the data-source-specific resources of one query
     * @param <T>        data-source-specific resource type
     */
    @SuppressWarnings({"rawtypes", "unchecked"})
    private <T> ResourcesBasedQuery<T> getResourceBasedQuery(Set<ResourceQueryData> set, Collection<DataAccessField> collection,
            Function<ResourceQueryData, Set<T>> function) {
        // Raw: the generic signature of ResourceQuery is not visible from this class.
        Set resourceQueries = new HashSet();
        for (List<ResourceQueryData> group : getQueriesPerResource(set)) {
            Instant groupStart = Instant.MAX;
            Instant groupEnd = Instant.MIN;
            Set<T> resources = new HashSet<>();
            Set<Long> entityIds = new HashSet<>();
            Map<String, String> columnMappings = getColumnMappingsFor(collection, group.get(0));
            for (ResourceQueryData queryData : group) {
                if (groupStart.isAfter(queryData.getStartTime())) {
                    groupStart = queryData.getStartTime();
                }
                if (groupEnd.isBefore(queryData.getEndTime())) {
                    groupEnd = queryData.getEndTime();
                }
                resources.addAll(function.apply(queryData));
                entityIds.add(requireEntityResources(queryData).getId());
            }
            resourceQueries.add(new ResourceQuery(resources, entityIds, columnMappings,
                    TimeUtils.getNanosFromInstant(groupStart), TimeUtils.getNanosFromInstant(groupEnd)));
        }
        return new ResourcesBasedQuery<>(resourceQueries, collection);
    }

    /**
     * Groups the queries by {@code "<partitionId>/<schemaId>"} of their entity resources.
     *
     * @throws RuntimeException (from {@link ExceptionsFactory}) if a query has no entity resources
     */
    private Collection<List<ResourceQueryData>> getQueriesPerResource(Set<ResourceQueryData> set) {
        Map<String, List<ResourceQueryData>> queriesByResource = new HashMap<>();
        for (ResourceQueryData queryData : set) {
            EntityResourcesData resources = requireEntityResources(queryData);
            String key = resources.getPartitionData().getId() + "/" + resources.getSchemaData().getId();
            queriesByResource.computeIfAbsent(key, ignored -> new ArrayList<>()).add(queryData);
        }
        return queriesByResource.values();
    }

    /** Clips every query to {@code timeWindow} and drops those whose clipped start is after their clipped end. */
    private Set<ResourceQueryData> getDataSourceQueryData(Set<ResourceQueryData> set, TimeWindow timeWindow) {
        return set.stream()
                .map(queryData -> limitTo(queryData, timeWindow))
                .filter(queryData -> !queryData.getStartTime().isAfter(queryData.getEndTime()))
                .collect(Collectors.toSet());
    }

    /**
     * Returns the query restricted to its intersection with {@code timeWindow}; the same instance is returned when
     * the intersection leaves its bounds unchanged.
     */
    private ResourceQueryData limitTo(ResourceQueryData resourceQueryData, TimeWindow timeWindow) {
        TimeWindow intersect = TimeWindow.between(resourceQueryData.getStartTime(), resourceQueryData.getEndTime()).intersect(timeWindow);
        boolean unchanged = intersect.getStartTime().equals(resourceQueryData.getStartTime())
                && intersect.getEndTime().equals(resourceQueryData.getEndTime());
        if (unchanged) {
            return resourceQueryData;
        }
        return new ResourceQueryData(resourceQueryData.getEntityResources(), intersect.getStartTime(), intersect.getEndTime(),
                resourceQueryData.getFields(), resourceQueryData.getVariableName());
    }

    /**
     * Resolves the requested fields of all queries against the schemas of the queries that have entity resources,
     * appending the per-query time windows to the message of any {@link IncompatibleSchemaPromotionException}.
     */
    private Collection<DataAccessField> getRequiredSchemaFieldsOrThrow(Set<ResourceQueryData> set) {
        Set<SchemaData> schemas = set.stream()
                .map(ResourceQueryData::getEntityResources)
                .filter(Optional::isPresent)
                .map(resources -> resources.get().getSchemaData())
                .collect(Collectors.toSet());
        Set<DataAccessField> requestedFields = set.stream()
                .flatMap(queryData -> queryData.getFields().stream())
                .collect(Collectors.toSet());
        try {
            return getRequiredSchemaFields(schemas, requestedFields);
        } catch (IncompatibleSchemaPromotionException e) {
            StringJoiner windows = new StringJoiner("; ", ". Please extract separate datasets for time windows : ", "");
            for (ResourceQueryData queryData : set) {
                windows.add(queryData.getStartTime() + " - " + queryData.getEndTime());
            }
            throw new IncompatibleSchemaPromotionException(e.getMessage() + windows.toString(), e);
        }
    }

    /**
     * Checks that every requested field exists in at least one of the given schemas and enriches it with the type
     * information found there.
     *
     * @param set  the schemas to look the fields up in; must not be empty
     * @param set2 the requested fields; names containing illegal characters are converted to legal ones first
     * @return the enriched fields, ordered (and de-duplicated) by qualified name
     * @throws IllegalArgumentException if {@code set} is empty, a requested field is missing, or an expanded field
     *                                  targets a base field that only exists in the given schemas
     */
    public Collection<DataAccessField> getRequiredSchemaFields(Set<SchemaData> set, Set<DataAccessField> set2) {
        if (set.isEmpty()) {
            throw new IllegalArgumentException("There is no schema present for the requested entity! Please contact the Logging support!");
        }
        log.debug("Check existence of the requested fields and type compatibility!");
        Set<DataAccessField> legalFields = convertFieldNamesToLegal(set2);
        // Parse every schema once; both the lookup and the expanded-field check work on the parsed fields.
        List<Schema.Field> schemaFields = parseSchemaFields(set);
        Map<String, Set<Schema>> fieldSchemas = getFieldSchemas(schemaFields, legalFields);
        Set<DataAccessField> enrichedFields = legalFields.stream()
                .map(field -> {
                    Set<Schema> schemas = fieldSchemas.get(field.getQualifiedName());
                    checkIfExpandedFieldTarget(field, schemas, schemaFields);
                    return FieldTypeResolver.INSTANCE.enrichFieldWithSchema(field, schemas);
                })
                .collect(Collectors.toSet());
        log.debug("Return verified set of requested fields");
        verifyNoneMissing(legalFields, enrichedFields);
        return sortFieldsByQualifiedName(enrichedFields);
    }

    /** Parses each schema JSON (with a fresh parser per schema) and returns all of their top-level fields. */
    private List<Schema.Field> parseSchemaFields(Set<SchemaData> schemas) {
        List<Schema.Field> fields = new ArrayList<>();
        for (SchemaData schemaData : schemas) {
            fields.addAll(new Schema.Parser().parse(schemaData.getSchemaJson()).getFields());
        }
        return fields;
    }

    /**
     * Fails when a field without any schema looks like an expanded field (see {@link #EXPANDED_FIELD_PATTERN})
     * whose base field is present in the schemas.
     */
    private void checkIfExpandedFieldTarget(DataAccessField dataAccessField, Set<Schema> schemas, List<Schema.Field> schemaFields) {
        if (!CollectionUtils.isEmpty(schemas)) {
            return;
        }
        String fieldName = dataAccessField.getFieldName();
        Matcher matcher = EXPANDED_FIELD_PATTERN.matcher(fieldName);
        if (!matcher.matches()) {
            return;
        }
        String baseName = matcher.group(1);
        if (schemaFields.stream().anyMatch(field -> baseName.equals(field.name()))) {
            throw new IllegalArgumentException(String.format(EXPANDED_FIELD_TARGET_ERROR_MESSAGE_FORMAT, fieldName));
        }
    }

    /** Throws when some requested field has no counterpart among the resolved fields. */
    private void verifyNoneMissing(Set<DataAccessField> requested, Set<DataAccessField> resolved) {
        Sets.SetView<DataAccessField> missing = Sets.difference(requested, resolved);
        if (!missing.isEmpty()) {
            throw new IllegalArgumentException(String.format("Field(s) %s is(are) not present in any schema for the requested entity and time window!", missing.toString()));
        }
    }

    /**
     * Returns the fields in a {@link TreeSet} ordered by qualified name; fields sharing a qualified name are
     * collapsed to the first one encountered.
     */
    private Collection<DataAccessField> sortFieldsByQualifiedName(Collection<DataAccessField> collection) {
        Set<DataAccessField> sorted = new TreeSet<>(Comparator.comparing(DataAccessField::getQualifiedName));
        sorted.addAll(collection);
        return sorted;
    }

    /**
     * Maps the qualified name of each requested field to the set of Avro schemas found for its field name.
     * Several requested fields may share a field name (e.g. different aliases); each of them receives the schemas.
     */
    private Map<String, Set<Schema>> getFieldSchemas(List<Schema.Field> schemaFields, Set<DataAccessField> fields) {
        // groupingBy instead of toMap: toMap throws IllegalStateException on duplicate field names.
        Map<String, List<DataAccessField>> fieldsByName = fields.stream()
                .collect(Collectors.groupingBy(DataAccessField::getFieldName));
        Map<String, Set<Schema>> schemasByQualifiedName = new HashMap<>();
        for (Schema.Field schemaField : schemaFields) {
            List<DataAccessField> matching = fieldsByName.getOrDefault(schemaField.name(), Collections.emptyList());
            for (DataAccessField field : matching) {
                schemasByQualifiedName.computeIfAbsent(field.getQualifiedName(), ignored -> new HashSet<>())
                        .add(schemaField.schema());
            }
        }
        return schemasByQualifiedName;
    }

    /** Applies {@link #sanitizeField} to every field. */
    private Set<DataAccessField> convertFieldNamesToLegal(Set<DataAccessField> set) {
        return set.stream().map(this.sanitizeField).collect(Collectors.toSet());
    }

    /**
     * Builds, for one query, the map from each extractable field's qualified name to the string form of its Spark
     * column expression; the {@link TreeMap} keeps the entries sorted by qualified name.
     *
     * <p>NOTE(review): the decompiled lambda body here was empty, so the map was always returned empty. The
     * {@code put(qualifiedName, column.toString())} below is a reconstruction — confirm key and value against the
     * consumer of the {@code ResourceQuery} column mapping.
     */
    private Map<String, String> getColumnMappingsFor(Collection<DataAccessField> collection, ResourceQueryData resourceQueryData) {
        Map<String, String> columnMappings = new TreeMap<>();
        for (DataAccessField field : collection) {
            getColumn(field, resourceQueryData)
                    .ifPresent(column -> columnMappings.put(field.getQualifiedName(), column.toString()));
        }
        return columnMappings;
    }

    /**
     * Chooses the Spark column for {@code dataAccessField} in the context of one query:
     * <ul>
     *   <li>the variable-name system field becomes the quoted variable name when the query has one;</li>
     *   <li>a field the query requests by name is read as-is;</li>
     *   <li>otherwise a {@code null} column is produced, unless another requested field of the query already
     *       uses the same alias, in which case no column is produced.</li>
     * </ul>
     */
    private Optional<Column> getColumn(DataAccessField dataAccessField, ResourceQueryData resourceQueryData) {
        String fieldName = dataAccessField.getFieldName();
        Optional<String> variableName = resourceQueryData.getVariableName();
        if (fieldName.equals(SystemFields.NXC_EXTR_VARIABLE_NAME.getValue()) && variableName.isPresent()) {
            return createSparkColumn(dataAccessField, convertToStringDataValue(variableName.get()));
        }
        boolean requestedByName = resourceQueryData.getFields().stream()
                .anyMatch(requested -> requested.getFieldName().equals(fieldName));
        if (requestedByName) {
            return createSparkColumn(dataAccessField, fieldName);
        }
        boolean aliasTaken = resourceQueryData.getFields().stream()
                .anyMatch(requested -> requested.hasAlias() && requested.getAlias().equals(dataAccessField.getAlias()));
        if (aliasTaken) {
            return Optional.empty();
        }
        return createSparkColumn(dataAccessField, convertToStringDataValue(null));
    }

    /** Renders a value as a double-quoted expression literal, or the bare text {@code null} for a null value. */
    private String convertToStringDataValue(String str) {
        return str == null ? "null" : "\"" + str + "\"";
    }

    /** Builds a column from {@code str}, cast to the field's data type and aliased with its desanitized qualified name. */
    private Optional<Column> createSparkColumn(DataAccessField dataAccessField, String str) {
        return Optional.of(new Column(str).cast(dataAccessField.getDataType()).alias(desanitizeIfNeeded(dataAccessField.getQualifiedName())));
    }

    /** Reverts {@link IllegalCharacterConverter} encoding when {@code str} is encoded; otherwise returns it as-is. */
    private String desanitizeIfNeeded(String str) {
        return IllegalCharacterConverter.isEncoded(str) ? IllegalCharacterConverter.get().convertFromLegal(str) : str;
    }

    /**
     * Returns the entity resources of a query.
     *
     * @throws RuntimeException (from {@link ExceptionsFactory}) when the query has none
     */
    private static EntityResourcesData requireEntityResources(ResourceQueryData queryData) {
        return queryData.getEntityResources()
                .orElseThrow(() -> ExceptionsFactory.createValueNotPresentException(EntityResourcesData.class, queryData));
    }

    /**
     * Returns the entity resources of the first query (in iteration order) that has any.
     *
     * @throws RuntimeException (from {@link ExceptionsFactory}) when no query has entity resources
     */
    private static EntityResourcesData firstEntityResources(Set<ResourceQueryData> set) {
        return set.stream()
                .map(ResourceQueryData::getEntityResources)
                .filter(Optional::isPresent)
                .map(Optional::get)
                .findFirst()
                .orElseThrow(() -> ExceptionsFactory.createValueNotPresentException(EntityResourcesData.class, set));
    }
}
