package org.apache.nifi.processors.influxdb;

import java.io.ByteArrayOutputStream;
import java.net.SocketTimeoutException;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.influxdb.InfluxDB;
import org.influxdb.InfluxDBIOException;

@CapabilityDescription("Processor to write the content of a FlowFile in 'line protocol'.  Please check details of the 'line protocol' in InfluxDB documentation (https://www.influxdb.com/).   The flow file can contain single measurement point or multiple measurement points separated by line seperator.  The timestamp (last field) should be in nano-seconds resolution.")
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@SupportsBatching
@Tags({"influxdb", "measurement", "insert", "write", "put", "timeseries"})
@WritesAttributes({@WritesAttribute(attribute = AbstractInfluxDBProcessor.INFLUX_DB_ERROR_MESSAGE, description = "InfluxDB error message")})
/* loaded from: input_file:org/apache/nifi/processors/influxdb/PutInfluxDB.class */
public class PutInfluxDB extends AbstractInfluxDBProcessor {
    public static AllowableValue CONSISTENCY_LEVEL_ALL = new AllowableValue("ALL", "All", "Return success when all nodes have responded with write success");
    public static AllowableValue CONSISTENCY_LEVEL_ANY = new AllowableValue("ANY", "Any", "Return success when any nodes have responded with write success");
    public static AllowableValue CONSISTENCY_LEVEL_ONE = new AllowableValue("ONE", "One", "Return success when one node has responded with write success");
    public static AllowableValue CONSISTENCY_LEVEL_QUORUM = new AllowableValue("QUORUM", "Quorum", "Return success when a majority of nodes have responded with write success");
    public static final PropertyDescriptor CONSISTENCY_LEVEL = new PropertyDescriptor.Builder().name("influxdb-consistency-level").displayName("Consistency Level").description("InfluxDB consistency level").required(true).defaultValue(CONSISTENCY_LEVEL_ONE.getValue()).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).allowableValues(new AllowableValue[]{CONSISTENCY_LEVEL_ONE, CONSISTENCY_LEVEL_ANY, CONSISTENCY_LEVEL_ALL, CONSISTENCY_LEVEL_QUORUM}).build();
    public static final PropertyDescriptor RETENTION_POLICY = new PropertyDescriptor.Builder().name("influxdb-retention-policy").displayName("Retention Policy").description("Retention policy for the saving the records").defaultValue("autogen").required(true).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
    static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("Successful FlowFiles that are saved to InfluxDB are routed to this relationship").build();
    static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("FlowFiles were not saved to InfluxDB are routed to this relationship").build();
    static final Relationship REL_RETRY = new Relationship.Builder().name("retry").description("FlowFiles were not saved to InfluxDB due to retryable exception are routed to this relationship").build();
    static final Relationship REL_MAX_SIZE_EXCEEDED = new Relationship.Builder().name("failure-max-size").description("FlowFiles exceeding max records size are routed to this relationship").build();
    private static final Set<Relationship> relationships;
    private static final List<PropertyDescriptor> propertyDescriptors;

    public Set<Relationship> getRelationships() {
        return relationships;
    }

    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return propertyDescriptors;
    }

    @Override // org.apache.nifi.processors.influxdb.AbstractInfluxDBProcessor
    @OnScheduled
    public void onScheduled(ProcessContext processContext) {
        super.onScheduled(processContext);
        this.maxRecordsSize = processContext.getProperty(MAX_RECORDS_SIZE).evaluateAttributeExpressions().asDataSize(DataUnit.B).longValue();
    }

    public void onTrigger(ProcessContext processContext, ProcessSession processSession) throws ProcessException {
        FlowFile flowFile = processSession.get();
        if (flowFile == null) {
            return;
        }
        if (flowFile.getSize() == 0) {
            getLogger().error("Empty measurements");
            processSession.transfer(processSession.putAttribute(flowFile, AbstractInfluxDBProcessor.INFLUX_DB_ERROR_MESSAGE, "Empty measurement size " + flowFile.getSize()), REL_FAILURE);
            return;
        }
        if (flowFile.getSize() > this.maxRecordsSize) {
            getLogger().error("Message size of records exceeded {} max allowed is {}", new Object[]{Long.valueOf(flowFile.getSize()), Long.valueOf(this.maxRecordsSize)});
            processSession.transfer(processSession.putAttribute(flowFile, AbstractInfluxDBProcessor.INFLUX_DB_ERROR_MESSAGE, "Max records size exceeded " + flowFile.getSize()), REL_MAX_SIZE_EXCEEDED);
            return;
        }
        Charset forName = Charset.forName(processContext.getProperty(CHARSET).evaluateAttributeExpressions(flowFile).getValue());
        String value = processContext.getProperty(CONSISTENCY_LEVEL).evaluateAttributeExpressions(flowFile).getValue();
        String value2 = processContext.getProperty(DB_NAME).evaluateAttributeExpressions(flowFile).getValue();
        String value3 = processContext.getProperty(RETENTION_POLICY).evaluateAttributeExpressions(flowFile).getValue();
        try {
            long currentTimeMillis = System.currentTimeMillis();
            ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
            processSession.exportTo(flowFile, byteArrayOutputStream);
            String str = new String(byteArrayOutputStream.toByteArray(), forName);
            writeToInfluxDB(processContext, value, value2, value3, str);
            long currentTimeMillis2 = System.currentTimeMillis();
            getLogger().debug("Records {} inserted", new Object[]{str});
            processSession.transfer(flowFile, REL_SUCCESS);
            processSession.getProvenanceReporter().send(flowFile, "influxdb://" + processContext.getProperty(INFLUX_DB_URL).evaluateAttributeExpressions().getValue() + "/" + value2, currentTimeMillis2 - currentTimeMillis);
        } catch (Exception e) {
            getLogger().error("Failed to insert into influxDB due to {}", new Object[]{e.getLocalizedMessage(), e});
            processSession.transfer(processSession.putAttribute(flowFile, AbstractInfluxDBProcessor.INFLUX_DB_ERROR_MESSAGE, String.valueOf(e.getMessage())), REL_FAILURE);
            processContext.yield();
        } catch (InfluxDBIOException e2) {
            FlowFile putAttribute = processSession.putAttribute(flowFile, AbstractInfluxDBProcessor.INFLUX_DB_ERROR_MESSAGE, String.valueOf(e2.getMessage()));
            if (e2.getCause() instanceof SocketTimeoutException) {
                getLogger().error("Failed to insert into influxDB due SocketTimeoutException to {} and retrying", new Object[]{e2.getLocalizedMessage(), e2});
                processSession.transfer(putAttribute, REL_RETRY);
            } else {
                getLogger().error("Failed to insert into influxDB due to {}", new Object[]{e2.getLocalizedMessage(), e2});
                processSession.transfer(putAttribute, REL_FAILURE);
            }
            processContext.yield();
        }
    }

    protected void writeToInfluxDB(ProcessContext processContext, String str, String str2, String str3, String str4) {
        getInfluxDB(processContext).write(str2, str3, InfluxDB.ConsistencyLevel.valueOf(str), str4);
    }

    @Override // org.apache.nifi.processors.influxdb.AbstractInfluxDBProcessor
    @OnStopped
    public void close() {
        super.close();
    }

    static {
        HashSet hashSet = new HashSet();
        hashSet.add(REL_SUCCESS);
        hashSet.add(REL_FAILURE);
        hashSet.add(REL_RETRY);
        hashSet.add(REL_MAX_SIZE_EXCEEDED);
        relationships = Collections.unmodifiableSet(hashSet);
        ArrayList arrayList = new ArrayList();
        arrayList.add(DB_NAME);
        arrayList.add(INFLUX_DB_URL);
        arrayList.add(INFLUX_DB_CONNECTION_TIMEOUT);
        arrayList.add(USERNAME);
        arrayList.add(PASSWORD);
        arrayList.add(CHARSET);
        arrayList.add(CONSISTENCY_LEVEL);
        arrayList.add(RETENTION_POLICY);
        arrayList.add(MAX_RECORDS_SIZE);
        propertyDescriptors = Collections.unmodifiableList(arrayList);
    }
}
