package org.apache.nifi.processor.util.bin;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;

/* loaded from: input_file:org/apache/nifi/processor/util/bin/BinFiles.class */
public abstract class BinFiles extends AbstractSessionFactoryProcessor {
    /** Required data-size property giving the minimum size of a bundle; defaults to "0 B". */
    public static final PropertyDescriptor MIN_SIZE = new PropertyDescriptor.Builder()
            .name("Minimum Group Size")
            .description("The minimum size for the bundle")
            .required(true)
            .defaultValue("0 B")
            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
            .build();

    /** Optional data-size property giving the maximum size of a bundle; no default value. */
    public static final PropertyDescriptor MAX_SIZE = new PropertyDescriptor.Builder()
            .name("Maximum Group Size")
            .description("The maximum size for the bundle. If not specified, there is no maximum.")
            .required(false)
            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
            .build();

    /** Required positive-integer property giving the minimum number of files in a bundle; defaults to "1". */
    public static final PropertyDescriptor MIN_ENTRIES = new PropertyDescriptor.Builder()
            .name("Minimum Number of Entries")
            .description("The minimum number of files to include in a bundle")
            .required(true)
            .defaultValue("1")
            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
            .build();

    /** Required positive-integer property giving the maximum number of files in a bundle; defaults to "1000". */
    public static final PropertyDescriptor MAX_ENTRIES = new PropertyDescriptor.Builder()
            .name("Maximum Number of Entries")
            .description("The maximum number of files to include in a bundle")
            .defaultValue("1000")
            .required(true)
            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
            .build();

    /** Required positive-integer property limiting how many bins are kept at once; defaults to "5". */
    public static final PropertyDescriptor MAX_BIN_COUNT = new PropertyDescriptor.Builder()
            .name("Maximum number of Bins")
            .description("Specifies the maximum number of bins that can be held in memory at any one time")
            .defaultValue("5")
            .required(true)
            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
            .build();

    /**
     * Optional time-period property giving the bin age that completes a bin. The validator is
     * created with the bounds 1 second and {@link Integer#MAX_VALUE} seconds.
     */
    public static final PropertyDescriptor MAX_BIN_AGE = new PropertyDescriptor.Builder()
            .name("Max Bin Age")
            .description("The maximum age of a Bin that will trigger a Bin to be complete. Expected format is <duration> <time unit> where <duration> is a positive integer and time unit is one of seconds, minutes, hours")
            .required(false)
            .addValidator(StandardValidators.createTimePeriodValidator(1, TimeUnit.SECONDS, Integer.MAX_VALUE, TimeUnit.SECONDS))
            .build();

    /** Relationship that receives the FlowFiles a bundle was built from. */
    public static final Relationship REL_ORIGINAL = new Relationship.Builder()
            .name("original")
            .description("The FlowFiles that were used to create the bundle")
            .build();

    /** Relationship that receives the FlowFiles of a bundle that could not be created. */
    public static final Relationship REL_FAILURE = new Relationship.Builder()
            .name("failure")
            .description("If the bundle cannot be created, all FlowFiles that would have been used to created the bundle will be transferred to failure")
            .build();
    /** Manages the bins that are still being filled; configured in {@link #onScheduled(ProcessContext)}. */
    private final BinManager binManager = new BinManager();

    /**
     * Bins that have left the bin manager and are waiting to be handled by
     * {@link #processBins(ProcessContext, ProcessSessionFactory)}.
     */
    private final Queue<Bin> readyBins = new LinkedBlockingQueue<>();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/nifi/processor/util/bin/BinFiles$BinningResult.class */
    /**
     * Immutable outcome of one binning pass: the number of FlowFiles that were binned and
     * a flag recording whether the pass reported that a new bin is needed.
     */
    public static class BinningResult {
        /**
         * Shared result for a pass that binned nothing and needs no new bin. Declared
         * {@code final} so the shared instance cannot be reassigned.
         */
        public static final BinningResult EMPTY = new BinningResult(0, false);

        private final int flowFilesBinned;
        private final boolean newBinNeeded;

        /**
         * @param flowFilesBinned number of FlowFiles binned during the pass
         * @param newBinNeeded whether the pass reported that a new bin is needed
         */
        public BinningResult(final int flowFilesBinned, final boolean newBinNeeded) {
            this.flowFilesBinned = flowFilesBinned;
            this.newBinNeeded = newBinNeeded;
        }

        /** @return the number of FlowFiles binned during the pass */
        public int getFlowFilesBinned() {
            return flowFilesBinned;
        }

        /** @return {@code true} if the pass reported that a new bin is needed */
        public boolean isNewBinNeeded() {
            return newBinNeeded;
        }
    }

    /**
     * Clears all binning state when the processor is stopped: purges the bin manager, then
     * drains the ready queue and rolls back the session of every bin found there.
     */
    @OnStopped
    public final void resetState() {
        binManager.purge();

        for (Bin pending = readyBins.poll(); pending != null; pending = readyBins.poll()) {
            pending.getSession().rollback();
        }
    }

    /**
     * Hook called once for each FlowFile pulled from the incoming session, before the
     * FlowFile is assigned to a group. The FlowFile returned here is the one that is
     * subsequently grouped and offered to the bin manager.
     *
     * @param processContext the context the processor was triggered with
     * @param processSession the session the FlowFile was obtained from
     * @param flowFile the FlowFile as pulled from the session
     * @return the FlowFile to use for grouping and binning
     */
    protected abstract FlowFile preprocessFlowFile(ProcessContext processContext, ProcessSession processSession, FlowFile flowFile);

    /**
     * Determines the group a FlowFile belongs to. FlowFiles of one batch that yield the same
     * value are collected together and offered to the bin manager under that identifier.
     * If this method throws any {@link Exception}, the FlowFile is routed to
     * {@link #REL_FAILURE}.
     *
     * @param processContext the context the processor was triggered with
     * @param flowFile the (already preprocessed) FlowFile to classify
     * @param processSession the session the FlowFile was obtained from
     * @return the group identifier for the FlowFile
     */
    protected abstract String getGroupId(ProcessContext processContext, FlowFile flowFile, ProcessSession processSession);

    /**
     * Hook for subclass-specific configuration of the bin manager. It is called as the last
     * step of {@link #onScheduled(ProcessContext)}, after the size, entry-count and age limits
     * have been applied to the manager.
     *
     * @param binManager the bin manager to configure
     * @param processContext the context the processor was scheduled with
     */
    protected abstract void setUpBinManager(BinManager binManager, ProcessContext processContext);

    /**
     * Processes a single bin taken from the ready queue. The caller,
     * {@link #processBins(ProcessContext, ProcessSessionFactory)}, inspects the result: when
     * {@code isCommitted()} is {@code false} it applies the result's attributes to every FlowFile
     * in the bin, transfers the bin's contents to {@link #REL_ORIGINAL} and commits the bin's
     * session asynchronously.
     *
     * @param bin the bin to process
     * @param processContext the context the processor was triggered with
     * @return the result describing whether the bin's session was already committed and which
     *         attributes to apply to the bin's FlowFiles
     * @throws ProcessException if the bin cannot be processed; the caller then routes the bin's
     *         FlowFiles to {@link #REL_FAILURE} (any other exception makes the caller roll the
     *         bin's session back instead)
     */
    protected abstract BinProcessingResult processBin(Bin bin, ProcessContext processContext) throws ProcessException;

    /**
     * Extension point for subclass-specific validation. The results are appended to the
     * standard checks performed by {@link #customValidate(ValidationContext)}, which also
     * tolerates a {@code null} return value.
     *
     * @param validationContext the context holding the property values being validated
     * @return additional validation results; this default implementation returns a new,
     *         empty, mutable list
     */
    protected Collection<ValidationResult> additionalCustomValidation(ValidationContext validationContext) {
        return new ArrayList<>();
    }

    /**
     * Runs one processing cycle: bins incoming FlowFiles (unless too many bins already exist),
     * then moves bins to the ready queue and processes them. Yields the processor when the
     * cycle binned, migrated and processed nothing.
     *
     * @param processContext the context the processor was triggered with
     * @param processSessionFactory factory used to create sessions for binning
     * @throws ProcessException declared by the signature; not thrown directly by this method
     */
    public final void onTrigger(ProcessContext processContext, ProcessSessionFactory processSessionFactory) throws ProcessException {
        // Bins still being filled plus bins already queued for processing.
        final int totalBinCount = binManager.getBinCount() + readyBins.size();
        final int maxBinCount = processContext.getProperty(MAX_BIN_COUNT).asInteger();

        final BinningResult binningResult;
        if (totalBinCount > maxBinCount) {
            binningResult = BinningResult.EMPTY;
            getLogger().debug("Will not bin any FlowFiles because {} bins already exist; will wait until bins have been emptied before any more are created", new Object[]{totalBinCount});
        } else {
            binningResult = binFlowFiles(processContext, processSessionFactory);
            getLogger().debug("Binned {} FlowFiles", new Object[]{binningResult.getFlowFilesBinned()});
        }

        // Skip migration and processing when the processor is no longer scheduled.
        if (!isScheduled()) {
            return;
        }

        final boolean nothingBinned = binningResult.getFlowFilesBinned() == 0;
        final int binsMigrated = migrateBins(processContext, nothingBinned, binningResult.isNewBinNeeded());
        final int binsProcessed = processBins(processContext, processSessionFactory);

        final boolean accomplishedNothing = binningResult.getFlowFilesBinned() == 0
                && binsMigrated == 0
                && binsProcessed == 0;
        if (accomplishedNothing) {
            processContext.yield();
        }
    }

    /**
     * Moves bins from the bin manager to the ready queue. First every bin the manager reports
     * as ready is moved. If none was moved and the manager holds more bins than allowed (or
     * exactly the allowed number while a new bin is needed), the oldest bin is evicted with
     * reason {@code BIN_MANAGER_FULL} and queued as well.
     *
     * @param noFlowFilesBinned flag forwarded to {@code BinManager.removeReadyBins}; the caller
     *        passes {@code true} when the preceding binning pass binned no FlowFiles
     * @param newBinNeeded whether the preceding binning pass reported that a new bin is needed
     * @return the number of bins added to the ready queue
     */
    private int migrateBins(ProcessContext processContext, boolean noFlowFilesBinned, boolean newBinNeeded) {
        int migrated = 0;
        for (Bin readyBin : binManager.removeReadyBins(noFlowFilesBinned)) {
            readyBins.add(readyBin);
            migrated++;
        }

        final int currentBinCount = binManager.getBinCount();
        final int maxBinCount = processContext.getProperty(MAX_BIN_COUNT).asInteger();

        // Eviction is only considered when no bin was ready on its own.
        if (migrated != 0) {
            return migrated;
        }

        final boolean managerFull = currentBinCount > maxBinCount
                || (currentBinCount == maxBinCount && newBinNeeded);
        if (!managerFull) {
            return migrated;
        }

        final Bin oldestBin = binManager.removeOldestBin();
        if (oldestBin == null) {
            return migrated;
        }

        migrated++;
        oldestBin.setEvictionReason(EvictionReason.BIN_MANAGER_FULL);
        readyBins.add(oldestBin);
        return migrated;
    }

    /**
     * Gives subclasses access to the queue of bins awaiting processing.
     *
     * @return the queue instance used by this processor itself (not a copy)
     */
    protected Queue<Bin> getReadyBins() {
        return readyBins;
    }

    /**
     * Drains the ready queue while the processor is scheduled, handing each bin to
     * {@link #processBin(Bin, ProcessContext)}.
     * <ul>
     *   <li>If the result is not yet committed, its attributes are put on every FlowFile of the
     *       bin, the contents are transferred to {@link #REL_ORIGINAL} and the bin's session is
     *       committed asynchronously.</li>
     *   <li>On a {@link ProcessException} the bin's FlowFiles are transferred to
     *       {@link #REL_FAILURE} and the bin's session is committed asynchronously.</li>
     *   <li>On any other exception the bin's session is rolled back.</li>
     * </ul>
     *
     * @param processContext the context the processor was triggered with
     * @param processSessionFactory not used by this implementation
     * @return the number of bins processed without an exception
     */
    protected int processBins(ProcessContext processContext, ProcessSessionFactory processSessionFactory) {
        final ComponentLog log = getLogger();
        int processedBins = 0;

        while (isScheduled()) {
            final Bin bin = readyBins.poll();
            if (bin == null) {
                break;
            }

            try {
                final BinProcessingResult result = processBin(bin, processContext);
                if (!result.isCommitted()) {
                    final ProcessSession binSession = bin.getSession();
                    for (FlowFile flowFile : bin.getContents()) {
                        binSession.putAllAttributes(flowFile, result.getAttributes());
                    }
                    binSession.transfer(bin.getContents(), REL_ORIGINAL);
                    binSession.commitAsync();
                }
                processedBins++;
            } catch (ProcessException processFailure) {
                log.error("Failed to process bundle of {} files", new Object[]{bin.getContents().size(), processFailure});
                final ProcessSession binSession = bin.getSession();
                for (FlowFile flowFile : bin.getContents()) {
                    binSession.transfer(flowFile, REL_FAILURE);
                }
                binSession.commitAsync();
            } catch (Exception unexpected) {
                log.error("Failed to process bundle of {} files; rolling back sessions", new Object[]{bin.getContents().size(), unexpected});
                bin.getSession().rollback();
            }
        }

        return processedBins;
    }

    /**
     * Pulls FlowFiles from a newly created session in batches of up to 1000, groups each batch
     * by {@link #getGroupId(ProcessContext, FlowFile, ProcessSession)} (insertion order is kept)
     * and offers every group to the bin manager. The loop continues while the manager holds no
     * more than the configured maximum number of bins, the processor is scheduled and the
     * session still returns FlowFiles.
     *
     * <p>Each FlowFile returned by {@code BinManager.offer} (presumably one the manager could
     * not place in a bin; confirm against {@code BinManager}) is put alone into a new bin with
     * no size or entry limits, and that bin is added directly to the ready queue.
     *
     * @param processContext the context the processor was triggered with
     * @param processSessionFactory factory used to create the shared session and per-bin sessions
     * @return the number of grouped FlowFiles offered, and whether any offer returned FlowFiles
     */
    private BinningResult binFlowFiles(ProcessContext processContext, ProcessSessionFactory processSessionFactory) {
        final int batchSize = 1000;

        int flowFilesBinned = 0;
        final ProcessSession session = processSessionFactory.createSession();
        final int maxBinCount = processContext.getProperty(MAX_BIN_COUNT).asInteger();
        boolean newBinNeeded = false;

        while (binManager.getBinCount() <= maxBinCount && isScheduled()) {
            final List<FlowFile> flowFiles = session.get(batchSize);
            if (flowFiles.isEmpty()) {
                break;
            }

            // LinkedHashMap keeps groups in the order their first FlowFile was seen.
            final Map<String, List<FlowFile>> flowFileGroups = new LinkedHashMap<>();
            for (FlowFile incoming : flowFiles) {
                final FlowFile flowFile = preprocessFlowFile(processContext, session, incoming);
                try {
                    final String groupId = getGroupId(processContext, flowFile, session);
                    flowFileGroups.computeIfAbsent(groupId, id -> new ArrayList<>()).add(flowFile);
                } catch (Exception e) {
                    getLogger().error("Could not determine which Bin to add {} to; will route to failure", new Object[]{flowFile, e});
                    session.transfer(flowFile, REL_FAILURE);
                    // NOTE(review): this commits the shared session while other FlowFiles of the
                    // same batch may still be held by it - confirm this is intended.
                    session.commitAsync();
                }
            }

            for (Map.Entry<String, List<FlowFile>> group : flowFileGroups.entrySet()) {
                final Set<FlowFile> unbinned = binManager.offer(group.getKey(), group.getValue(), session, processSessionFactory);
                if (!unbinned.isEmpty()) {
                    newBinNeeded = true;
                }

                // Every FlowFile handed back gets its own unlimited bin that is ready immediately.
                for (FlowFile flowFile : unbinned) {
                    final Bin bin = new Bin(processSessionFactory.createSession(), 0L, Long.MAX_VALUE, 0, Integer.MAX_VALUE, null);
                    bin.offer(flowFile, session);
                    readyBins.add(bin);
                }

                flowFilesBinned += group.getValue().size();
            }
        }

        return new BinningResult(flowFilesBinned, newBinNeeded);
    }

    /**
     * Applies the configured limits to the bin manager when the processor is scheduled, then
     * lets the subclass finish the configuration via
     * {@link #setUpBinManager(BinManager, ProcessContext)}.
     *
     * @param processContext the context holding the configured property values
     * @throws IOException declared by the signature; not thrown directly by this method
     */
    @OnScheduled
    public final void onScheduled(ProcessContext processContext) throws IOException {
        final BinManager manager = binManager;

        // Size limits
        manager.setMinimumSize(getMinBytes(processContext));
        manager.setMaximumSize(getMaxBytes(processContext));

        // Age limit
        manager.setMaxBinAge(getMaxBinAgeSeconds(processContext));

        // Entry-count limits
        manager.setMinimumEntries(getMinEntries(processContext));
        manager.setMaximumEntries(getMaxEntries(processContext));

        setUpBinManager(manager, processContext);
    }

    /**
     * Reads the {@link #MAX_BIN_AGE} property as whole seconds.
     *
     * @param propertyContext source of the property values
     * @return the configured age in seconds, or {@link Integer#MAX_VALUE} when the property is not set
     */
    protected int getMaxBinAgeSeconds(PropertyContext propertyContext) {
        return propertyContext.getProperty(MAX_BIN_AGE).isSet()
                ? propertyContext.getProperty(MAX_BIN_AGE).asTimePeriod(TimeUnit.SECONDS).intValue()
                : Integer.MAX_VALUE;
    }

    /**
     * Reads the {@link #MIN_SIZE} property in bytes.
     *
     * @param propertyContext source of the property values
     * @return the configured minimum size, converted to a {@code long} number of bytes
     */
    protected long getMinBytes(PropertyContext propertyContext) {
        final Number minBytes = propertyContext.getProperty(MIN_SIZE).asDataSize(DataUnit.B);
        return minBytes.longValue();
    }

    /**
     * Reads the {@link #MAX_SIZE} property in bytes.
     *
     * @param propertyContext source of the property values
     * @return the configured maximum size in bytes, or {@link Long#MAX_VALUE} when the property is not set
     */
    protected long getMaxBytes(PropertyContext propertyContext) {
        if (!propertyContext.getProperty(MAX_SIZE).isSet()) {
            return Long.MAX_VALUE;
        }
        return propertyContext.getProperty(MAX_SIZE).asDataSize(DataUnit.B).longValue();
    }

    /**
     * Reads the {@link #MIN_ENTRIES} property.
     *
     * @param propertyContext source of the property values
     * @return the configured minimum number of entries
     */
    protected int getMinEntries(PropertyContext propertyContext) {
        // Auto-unboxing of the Integer value is equivalent to the explicit intValue() call.
        return propertyContext.getProperty(MIN_ENTRIES).asInteger();
    }

    /**
     * Reads the {@link #MAX_ENTRIES} property.
     *
     * @param propertyContext source of the property values
     * @return the configured maximum number of entries, or {@link Integer#MAX_VALUE} when the
     *         property is not set
     */
    protected int getMaxEntries(PropertyContext propertyContext) {
        return propertyContext.getProperty(MAX_ENTRIES).isSet()
                ? propertyContext.getProperty(MAX_ENTRIES).asInteger().intValue()
                : Integer.MAX_VALUE;
    }

    /**
     * Validates the relationship between the minimum and maximum settings on top of the
     * superclass validation, then appends whatever
     * {@link #additionalCustomValidation(ValidationContext)} returns (a {@code null} return is
     * ignored).
     *
     * @param validationContext the context holding the property values being validated
     * @return the superclass results plus an invalid result when the minimum size exceeds the
     *         maximum size, an invalid result when the minimum entry count exceeds the maximum
     *         entry count, and any subclass-provided results
     */
    protected final Collection<ValidationResult> customValidate(ValidationContext validationContext) {
        final List<ValidationResult> results = new ArrayList<>(super.customValidate(validationContext));

        if (getMaxBytes(validationContext) < getMinBytes(validationContext)) {
            results.add(new ValidationResult.Builder()
                    .subject(MIN_SIZE.getName())
                    .input(validationContext.getProperty(MIN_SIZE).getValue())
                    .valid(false)
                    .explanation("Min Size must be less than or equal to Max Size")
                    .build());
        }

        if (getMinEntries(validationContext) > getMaxEntries(validationContext)) {
            results.add(new ValidationResult.Builder()
                    .subject(MIN_ENTRIES.getName())
                    .input(validationContext.getProperty(MIN_ENTRIES).getValue())
                    .valid(false)
                    .explanation("Min Entries must be less than or equal to Max Entries")
                    .build());
        }

        final Collection<ValidationResult> additionalResults = additionalCustomValidation(validationContext);
        if (additionalResults != null) {
            results.addAll(additionalResults);
        }

        return results;
    }
}
