package org.apache.nifi.processors.standard;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.Stateful;
import org.apache.nifi.annotation.behavior.TriggerSerially;
import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.DescribedValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;

@CapabilityDescription("Monitors the flow for activity and sends out an indicator when the flow has not had any data for some specified amount of time and again when the flow's activity is restored")
@WritesAttributes({@WritesAttribute(attribute = "inactivityStartMillis", description = "The time at which Inactivity began, in the form of milliseconds since Epoch"), @WritesAttribute(attribute = "inactivityDurationMillis", description = "The number of milliseconds that the inactivity has spanned")})
@Stateful(scopes = {Scope.CLUSTER, Scope.LOCAL}, description = "MonitorActivity stores the last timestamp at each node as state, so that it can examine activity at cluster wide. If 'Copy Attribute' is set to true, then flow file attributes are also persisted. In local scope, it stores last known activity timestamp if the flow is inactive.")
@TriggerWhenEmpty
@TriggerSerially
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@Tags({"monitor", "flow", "active", "inactive", "activity", "detection"})
@SideEffectFree
/* loaded from: input_file:org/apache/nifi/processors/standard/MonitorActivity.class */
public class MonitorActivity extends AbstractProcessor {
    /** State key under which {@code CommonFlowActivityInfo} reads and writes the last-transfer timestamp in {@link Scope#CLUSTER} state. */
    public static final String STATE_KEY_COMMON_FLOW_ACTIVITY_INFO = "CommonFlowActivityInfo.lastSuccessfulTransfer";
    /** State key under which the last-activity timestamp is written to {@link Scope#LOCAL} state when the flow is reported inactive. */
    public static final String STATE_KEY_LOCAL_FLOW_ACTIVITY_INFO = "LocalFlowActivityInfo.lastSuccessfulTransfer";
    /** 'Monitoring Scope' value: activity is examined on each node separately. */
    public static final AllowableValue SCOPE_NODE = new AllowableValue("node");
    /** 'Monitoring Scope' value: activity is examined across the cluster. */
    public static final AllowableValue SCOPE_CLUSTER = new AllowableValue("cluster");
    /** 'Reporting Node' value: every node sends the indicator FlowFiles. */
    public static final AllowableValue REPORT_NODE_ALL = new AllowableValue("all");
    /** 'Reporting Node' value: only the primary node sends the indicator FlowFiles. */
    public static final AllowableValue REPORT_NODE_PRIMARY = new AllowableValue("primary");
    /** How long the flow may stay without data before it is considered inactive. */
    public static final PropertyDescriptor THRESHOLD = new PropertyDescriptor.Builder()
            .name("Threshold Duration")
            .description("Determines how much time must elapse before considering the flow to be inactive")
            .required(true)
            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
            .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
            .defaultValue("5 min")
            .build();

    /** Whether the inactivity indicator is repeated every threshold period or sent only once. */
    public static final PropertyDescriptor CONTINUALLY_SEND_MESSAGES = new PropertyDescriptor.Builder()
            .name("Continually Send Messages")
            .description("If true, will send inactivity indicator continually every Threshold Duration amount of time until activity is restored; if false, will send an indicator only when the flow first becomes inactive")
            .required(true)
            .allowableValues("true", "false")
            .defaultValue("false")
            .build();

    /** Content of the FlowFile routed to 'activity.restored'. */
    public static final PropertyDescriptor ACTIVITY_RESTORED_MESSAGE = new PropertyDescriptor.Builder()
            .name("Activity Restored Message")
            .description("The message that will be the content of FlowFiles that are sent to 'activity.restored' relationship")
            .required(true)
            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .defaultValue("Activity restored at time: ${now():format('yyyy/MM/dd HH:mm:ss')} after being inactive for ${inactivityDurationMillis:toNumber():divide(60000)} minutes")
            .build();

    /** Whether an inactivity indicator may be sent before any activity has been seen. */
    public static final PropertyDescriptor WAIT_FOR_ACTIVITY = new PropertyDescriptor.Builder()
            .name("Wait for Activity")
            .description("When the processor gets started or restarted, if set to true, only send an inactive indicator if there had been activity beforehand. Otherwise send an inactive indicator even if there had not been activity beforehand.")
            .required(true)
            .allowableValues("true", "false")
            .defaultValue("false")
            .build();

    /** Whether the previously reported flow state is discarded when the processor is (re)started. */
    public static final PropertyDescriptor RESET_STATE_ON_RESTART = new PropertyDescriptor.Builder()
            .name("Reset State on Restart")
            .description("When the processor gets started or restarted, if set to true, the initial state will always be active. Otherwise, the last reported flow state will be preserved.")
            .required(true)
            .allowableValues("true", "false")
            .defaultValue("true")
            .build();

    /** Content of the FlowFile routed to 'inactive'. */
    public static final PropertyDescriptor INACTIVITY_MESSAGE = new PropertyDescriptor.Builder()
            .name("Inactivity Message")
            .description("The message that will be the content of FlowFiles that are sent to the 'inactive' relationship")
            .required(true)
            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .defaultValue("Lacking activity as of time: ${now():format('yyyy/MM/dd HH:mm:ss')}; flow has been inactive for ${inactivityDurationMillis:toNumber():divide(60000)} minutes")
            .build();

    /** Whether attributes of the FlowFile that resumed activity are copied onto the indicator FlowFile. */
    public static final PropertyDescriptor COPY_ATTRIBUTES = new PropertyDescriptor.Builder()
            .name("Copy Attributes")
            .description("If true, will copy all flow file attributes from the flow file that resumed activity to the newly created indicator flow file")
            .required(true)
            .allowableValues("true", "false")
            .defaultValue("false")
            .build();

    /** Selects between per-node and cluster-wide activity monitoring. */
    public static final PropertyDescriptor MONITORING_SCOPE = new PropertyDescriptor.Builder()
            .name("Monitoring Scope")
            .description("Specify how to determine activeness of the flow. 'node' means that activeness is examined at individual node separately. It can be useful if DFM expects each node should receive flow files in a distributed manner. With 'cluster', it defines the flow is active while at least one node receives flow files actively. If NiFi is running as standalone mode, this should be set as 'node', if it's 'cluster', NiFi logs a warning message and act as 'node' scope.")
            .required(true)
            .allowableValues(SCOPE_NODE, SCOPE_CLUSTER)
            .defaultValue(SCOPE_NODE.getValue())
            .build();

    /** Selects which node(s) emit the indicator FlowFiles; only relevant for the 'cluster' monitoring scope. */
    public static final PropertyDescriptor REPORTING_NODE = new PropertyDescriptor.Builder()
            .name("Reporting Node")
            .description("Specify which node should send notification flow-files to inactive and activity.restored relationships. With 'all', every node in this cluster send notification flow-files. 'primary' means flow-files will be sent only from a primary node. If NiFi is running as standalone mode, this should be set as 'all', even if it's 'primary', NiFi act as 'all'.")
            .required(true)
            .allowableValues(REPORT_NODE_ALL, REPORT_NODE_PRIMARY)
            .dependsOn(MONITORING_SCOPE, new AllowableValue[]{SCOPE_CLUSTER})
            .defaultValue(REPORT_NODE_ALL.getValue())
            .build();

    /** Supported properties, in the order they are exposed by {@code getSupportedPropertyDescriptors()}. */
    private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = List.of(
            THRESHOLD,
            CONTINUALLY_SEND_MESSAGES,
            INACTIVITY_MESSAGE,
            ACTIVITY_RESTORED_MESSAGE,
            WAIT_FOR_ACTIVITY,
            RESET_STATE_ON_RESTART,
            COPY_ATTRIBUTES,
            MONITORING_SCOPE,
            REPORTING_NODE
    );
    /** Destination of every incoming FlowFile. */
    public static final Relationship REL_SUCCESS = new Relationship.Builder()
            .name("success")
            .description("All incoming FlowFiles are routed to success")
            .build();

    /** Destination of the inactivity indicator FlowFile. */
    public static final Relationship REL_INACTIVE = new Relationship.Builder()
            .name("inactive")
            .description("This relationship is used to transfer an Inactivity indicator when no FlowFiles are routed to 'success' for Threshold Duration amount of time")
            .build();

    /** Destination of the activity-restored indicator FlowFile. */
    public static final Relationship REL_ACTIVITY_RESTORED = new Relationship.Builder()
            .name("activity.restored")
            .description("This relationship is used to transfer an Activity Restored indicator when FlowFiles are routing to 'success' following a period of inactivity")
            .build();

    /** All relationships exposed by this processor. */
    private static final Set<Relationship> RELATIONSHIPS = Set.of(
            REL_SUCCESS,
            REL_INACTIVE,
            REL_ACTIVITY_RESTORED
    );
    // Cluster connection status observed by the previous synchronizeState() call; used to detect a reconnect.
    private final AtomicBoolean connectedWhenLastTriggered = new AtomicBoolean(false);
    // Time (epoch millis) at which the inactive-flow handling last ran; drives the "continually send" repeat interval.
    private final AtomicLong lastInactiveMessage = new AtomicLong();
    // Last-activity timestamp recorded at the end of the previous onTrigger(); reported as 'inactivityStartMillis'.
    // NOTE(review): the initializer calls the overridable nowMillis() during construction — a subclass override
    // runs before the subclass's own fields are initialized.
    private final AtomicLong inactivityStartMillis = new AtomicLong(nowMillis());
    // Activity state determined by the previous onTrigger() (or restored in onScheduled()).
    private final AtomicBoolean wasActive = new AtomicBoolean(true);
    // Per-node activity tracker; (re)created in onScheduled().
    private volatile LocalFlowActivityInfo localFlowActivityInfo;

    /**
     * View of the flow activity information that is kept in {@link Scope#CLUSTER} state.
     *
     * <p>The cluster state is read once, in the constructor; the accessors operate on that snapshot.
     * {@link #update(LocalFlowActivityInfo)} writes back with a compare-and-replace against the same snapshot.
     */
    public static class CommonFlowActivityInfo {
        private final StateManager stateManager;
        private final StateMap storedState;
        private final Map<String, String> newState = new HashMap<>();

        /**
         * Loads the current cluster-scoped state.
         *
         * @param processContext context supplying the {@link StateManager}
         * @throws ProcessException if the cluster state cannot be read
         */
        public CommonFlowActivityInfo(ProcessContext processContext) {
            this.stateManager = processContext.getStateManager();
            try {
                this.storedState = this.stateManager.getState(Scope.CLUSTER);
            } catch (IOException e) {
                throw new ProcessException("Cannot load common flow activity info.", e);
            }
        }

        /**
         * @return {@code true} if the loaded state contains a last-transfer timestamp
         */
        public boolean hasSuccessfulTransfer() {
            return this.storedState.get(MonitorActivity.STATE_KEY_COMMON_FLOW_ACTIVITY_INFO) != null;
        }

        /**
         * @return the last-transfer timestamp stored in the loaded state
         * @throws NumberFormatException if no timestamp is stored (check {@link #hasSuccessfulTransfer()} first)
         *         or the stored value is not a number
         */
        public long getLastSuccessfulTransfer() {
            return Long.parseLong(this.storedState.get(MonitorActivity.STATE_KEY_COMMON_FLOW_ACTIVITY_INFO));
        }

        /**
         * @return a new mutable map holding every entry of the loaded state except the timestamp entry
         */
        public Map<String, String> getLastSuccessfulTransferAttributes() {
            final Map<String, String> attributes = new HashMap<>(this.storedState.toMap());
            attributes.remove(MonitorActivity.STATE_KEY_COMMON_FLOW_ACTIVITY_INFO);
            return attributes;
        }

        /**
         * Publishes the local activity information to cluster state when it is newer than the loaded snapshot.
         * Does nothing if the local info has no transfer, or its timestamp is not newer than the stored one.
         *
         * @param localFlowActivityInfo the node-local activity information
         * @throws SaveSharedFlowStateException if the replace is rejected or the state store fails
         */
        public void update(LocalFlowActivityInfo localFlowActivityInfo) {
            if (!localFlowActivityInfo.hasSuccessfulTransfer()) {
                return;
            }

            final long localLastTransfer = localFlowActivityInfo.getLastSuccessfulTransfer();
            if (hasSuccessfulTransfer() && localLastTransfer <= getLastSuccessfulTransfer()) {
                return;
            }

            this.newState.putAll(localFlowActivityInfo.getLastSuccessfulTransferAttributes());
            this.newState.put(MonitorActivity.STATE_KEY_COMMON_FLOW_ACTIVITY_INFO, String.valueOf(localLastTransfer));

            final boolean replaced;
            try {
                // replace() succeeds only if the cluster state still equals the snapshot loaded in the constructor.
                replaced = this.stateManager.replace(this.storedState, this.newState, Scope.CLUSTER);
            } catch (IOException e) {
                throw new SaveSharedFlowStateException("Caught exception while saving state.", e);
            }
            if (!replaced) {
                throw new SaveSharedFlowStateException("Failed to save state. Probably there was a concurrent update.");
            }
        }
    }

    /**
     * Node-local record of flow activity: when the last FlowFile was transferred, which attributes it carried,
     * and when the next synchronization with the cluster state is due.
     *
     * <p>This is an inner class because it reads the current time through the enclosing processor's
     * {@code nowMillis()}.
     */
    public class LocalFlowActivityInfo {
        /** Marker meaning "no timestamp recorded yet". */
        private static final long NO_VALUE = 0L;
        /** The sync period is the threshold divided by this value. */
        private static final int TIMES_SYNC_WITHIN_THRESHOLD = 3;

        private final long startupTimeMillis;
        private final long thresholdMillis;
        private final boolean saveAttributes;
        private final long syncPeriodMillis;
        private long nextSyncMillis = NO_VALUE;
        private long lastSuccessfulTransfer = NO_VALUE;
        private Map<String, String> lastSuccessfulTransferAttributes = new HashMap<>();

        /**
         * Creates an info object with no recorded transfer.
         *
         * @param startupTimeMillis time the processor was started, in epoch millis
         * @param thresholdMillis   inactivity threshold in millis
         * @param saveAttributes    whether FlowFile attributes are remembered alongside the timestamp
         */
        public LocalFlowActivityInfo(long startupTimeMillis, long thresholdMillis, boolean saveAttributes) {
            this.startupTimeMillis = startupTimeMillis;
            this.thresholdMillis = thresholdMillis;
            this.saveAttributes = saveAttributes;
            this.syncPeriodMillis = thresholdMillis / TIMES_SYNC_WITHIN_THRESHOLD;
        }

        /**
         * Creates an info object seeded with a previously recorded transfer timestamp.
         *
         * @param monitorActivity        unused; kept so existing callers that pass the enclosing instance
         *                               explicitly keep compiling
         * @param startupTimeMillis      time the processor was started, in epoch millis
         * @param thresholdMillis        inactivity threshold in millis
         * @param saveAttributes         whether FlowFile attributes are remembered alongside the timestamp
         * @param lastSuccessfulTransfer timestamp of the last known transfer, in epoch millis
         */
        public LocalFlowActivityInfo(MonitorActivity monitorActivity, long startupTimeMillis, long thresholdMillis,
                                     boolean saveAttributes, long lastSuccessfulTransfer) {
            this(startupTimeMillis, thresholdMillis, saveAttributes);
            this.lastSuccessfulTransfer = lastSuccessfulTransfer;
        }

        /** @return {@code true} once the scheduled sync time has been reached */
        public boolean syncNeeded() {
            return this.nextSyncMillis <= MonitorActivity.this.nowMillis();
        }

        /** Schedules the next sync one sync period from now. */
        public void setNextSyncMillis() {
            this.nextSyncMillis = MonitorActivity.this.nowMillis() + this.syncPeriodMillis;
        }

        /** Makes the next {@link #syncNeeded()} call return {@code true}. */
        public void forceSync() {
            this.nextSyncMillis = MonitorActivity.this.nowMillis();
        }

        /**
         * @return {@code true} while less than the threshold has elapsed since the last activity
         *         (the last transfer, or startup if there has been no transfer)
         */
        public boolean isActive() {
            return MonitorActivity.this.nowMillis() < getLastActivity() + this.thresholdMillis;
        }

        /** @return {@code true} if a transfer timestamp has been recorded */
        public boolean hasSuccessfulTransfer() {
            return this.lastSuccessfulTransfer != NO_VALUE;
        }

        /** @return the recorded transfer timestamp, or {@code 0} if none */
        public long getLastSuccessfulTransfer() {
            return this.lastSuccessfulTransfer;
        }

        /** @return the last transfer timestamp if there is one, otherwise the startup time */
        public long getLastActivity() {
            return hasSuccessfulTransfer() ? this.lastSuccessfulTransfer : this.startupTimeMillis;
        }

        /** @return the live (not copied) attribute map of the last transfer; empty if none were saved */
        public Map<String, String> getLastSuccessfulTransferAttributes() {
            return this.lastSuccessfulTransferAttributes;
        }

        /**
         * Records a transfer happening now.
         *
         * @param flowFile the transferred FlowFile; its attributes (minus the UUID) are remembered when
         *                 attribute saving is enabled
         */
        public void update(FlowFile flowFile) {
            final long now = MonitorActivity.this.nowMillis();
            // A gap longer than one sync period since the previous activity triggers an immediate sync.
            if (now - getLastActivity() > this.syncPeriodMillis) {
                forceSync();
            }
            this.lastSuccessfulTransfer = now;
            if (this.saveAttributes) {
                final Map<String, String> attributes = new HashMap<>(flowFile.getAttributes());
                attributes.remove(CoreAttributes.UUID.key());
                this.lastSuccessfulTransferAttributes = attributes;
            }
        }

        /**
         * Adopts the cluster-wide transfer information when it is newer than the local one.
         *
         * @param commonFlowActivityInfo snapshot of the cluster-scoped activity information
         */
        public void update(CommonFlowActivityInfo commonFlowActivityInfo) {
            if (!commonFlowActivityInfo.hasSuccessfulTransfer()) {
                return;
            }
            final long commonLastTransfer = commonFlowActivityInfo.getLastSuccessfulTransfer();
            if (commonLastTransfer <= getLastSuccessfulTransfer()) {
                return;
            }
            this.lastSuccessfulTransfer = commonLastTransfer;
            if (this.saveAttributes) {
                this.lastSuccessfulTransferAttributes = commonFlowActivityInfo.getLastSuccessfulTransferAttributes();
            }
        }
    }

    /**
     * Signals that the shared (cluster-scoped) flow activity state could not be saved.
     */
    public static class SaveSharedFlowStateException extends ProcessException {

        /**
         * @param message description of the failure
         */
        public SaveSharedFlowStateException(String message) {
            super(message);
        }

        /**
         * @param message description of the failure
         * @param cause   the underlying exception
         */
        public SaveSharedFlowStateException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    /**
     * @return the fixed set of relationships of this processor: 'success', 'inactive' and 'activity.restored'
     */
    @Override
    public Set<Relationship> getRelationships() {
        return RELATIONSHIPS;
    }

    /**
     * @return the fixed, ordered list of properties supported by this processor
     */
    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return PROPERTY_DESCRIPTORS;
    }

    /**
     * Prepares the local activity tracker when the processor is scheduled.
     *
     * <p>Unless 'Reset State on Restart' is true, the last-activity timestamp saved in local state is
     * loaded; when one is found, the activity flag and inactivity start are restored from it. Otherwise
     * the processor starts out as active with no recorded transfer.
     *
     * @param processContext the context providing property values and state
     */
    @OnScheduled
    public void onScheduled(ProcessContext processContext) {
        // Result intentionally ignored: with 'true' this call logs a warning for a 'cluster' scope on a standalone node.
        isClusterScope(processContext, true);

        final long thresholdMillis = processContext.getProperty(THRESHOLD)
                .evaluateAttributeExpressions()
                .asTimePeriod(TimeUnit.MILLISECONDS)
                .longValue();
        final boolean copyAttributes = processContext.getProperty(COPY_ATTRIBUTES).asBoolean().booleanValue();
        final boolean resetState = processContext.getProperty(RESET_STATE_ON_RESTART).asBoolean().booleanValue();

        String storedLastTransfer = null;
        if (!resetState) {
            storedLastTransfer = tryLoadLastSuccessfulTransfer(processContext);
        }

        if (storedLastTransfer != null) {
            this.localFlowActivityInfo = new LocalFlowActivityInfo(
                    this, getStartupTime(), thresholdMillis, copyAttributes, Long.parseLong(storedLastTransfer));
            this.wasActive.set(this.localFlowActivityInfo.isActive());
            this.inactivityStartMillis.set(this.localFlowActivityInfo.getLastActivity());
            return;
        }

        this.localFlowActivityInfo = new LocalFlowActivityInfo(getStartupTime(), thresholdMillis, copyAttributes);
        this.wasActive.set(true);
    }

    /**
     * Clears the cluster-scoped state when the processor is stopped, provided this node is configured
     * for clustering and currently connected. A failure to clear is logged, not propagated.
     *
     * @param processContext the context providing the state manager and connection status
     */
    @OnStopped
    public void onStopped(ProcessContext processContext) {
        final boolean clustered = getNodeTypeProvider().isConfiguredForClustering();
        if (!clustered || !processContext.isConnectedToCluster()) {
            return;
        }

        final StateManager stateManager = processContext.getStateManager();
        try {
            stateManager.clear(Scope.CLUSTER);
        } catch (IOException e) {
            getLogger().error("Failed to clear cluster state", e);
        }
    }

    /**
     * Routes incoming FlowFiles to 'success', keeps the activity information up to date (synchronizing
     * it with cluster state in 'cluster' scope) and emits an indicator FlowFile when the flow changes
     * between active and inactive.
     *
     * <p>No state transition is evaluated when this node is in cluster scope, disconnected and received
     * nothing, or when 'Wait for Activity' is true and no transfer has been recorded yet.
     *
     * @param processContext the context providing property values, state and cluster status
     * @param processSession the session used to fetch, create and transfer FlowFiles
     */
    public void onTrigger(ProcessContext processContext, ProcessSession processSession) {
        final ComponentLog logger = getLogger();
        final boolean isClusterScope = isClusterScope(processContext, false);
        final boolean isConnectedToCluster = processContext.isConnectedToCluster();
        final boolean previouslyActive = this.wasActive.get();

        // Pull up to 50 FlowFiles per invocation.
        final List<FlowFile> flowFiles = processSession.get(50);
        if (flowFiles.isEmpty()) {
            processContext.yield();
        } else {
            // Evaluated before update(), which records the transfer.
            final boolean mustBecomeActive = !previouslyActive || !this.localFlowActivityInfo.hasSuccessfulTransfer();
            this.localFlowActivityInfo.update(flowFiles.getFirst());
            if (isClusterScope && mustBecomeActive) {
                this.localFlowActivityInfo.forceSync();
            }
            processSession.transfer(flowFiles, REL_SUCCESS);
            logger.info("Transferred {} FlowFiles to 'success'", new Object[]{Integer.valueOf(flowFiles.size())});
        }

        if (isClusterScope) {
            // NOTE(review): this forces a sync on every trigger while the flow is inactive, bypassing the
            // sync period — confirm this is intended rather than "previouslyActive && !isActive()".
            if (!previouslyActive || !this.localFlowActivityInfo.isActive()) {
                this.localFlowActivityInfo.forceSync();
            }
            synchronizeState(processContext);
        }

        final long thresholdMillis = processContext.getProperty(THRESHOLD)
                .evaluateAttributeExpressions()
                .asTimePeriod(TimeUnit.MILLISECONDS)
                .longValue();
        final boolean continuallySendMessages = processContext.getProperty(CONTINUALLY_SEND_MESSAGES).asBoolean().booleanValue();
        final boolean waitForActivity = processContext.getProperty(WAIT_FOR_ACTIVITY).asBoolean().booleanValue();

        final boolean isActive = this.localFlowActivityInfo.isActive() || !flowFiles.isEmpty();
        final long lastActivity = this.localFlowActivityInfo.getLastActivity();
        final long inactivityStart = this.inactivityStartMillis.get();
        final boolean timeToRepeatInactiveMessage = this.lastInactiveMessage.get() + thresholdMillis <= nowMillis();

        final boolean canReport = !isClusterScope || isConnectedToCluster || !flowFiles.isEmpty();
        final boolean canChangeState = !waitForActivity || this.localFlowActivityInfo.hasSuccessfulTransfer();

        if (!canReport) {
            logger.trace("State transition is blocked, because we are not connected to the cluster.");
            return;
        }
        if (!canChangeState) {
            // Previously this case was logged with the "not connected to the cluster" message as well.
            logger.trace("State transition is blocked, because there has been no activity yet and 'Wait for Activity' is enabled.");
            return;
        }

        if (isActive) {
            onTriggerActiveFlow(processContext, processSession, previouslyActive, isClusterScope, inactivityStart);
        } else if (previouslyActive || (continuallySendMessages && timeToRepeatInactiveMessage)) {
            onTriggerInactiveFlow(processContext, processSession, isClusterScope, lastActivity);
        }

        this.wasActive.set(isActive);
        this.inactivityStartMillis.set(lastActivity);
    }

    /**
     * Time source used throughout this processor; protected so that subclasses can substitute another clock.
     *
     * @return the current time in milliseconds since the epoch
     */
    protected long nowMillis() {
        final long currentMillis = System.currentTimeMillis();
        return currentMillis;
    }

    /**
     * @return the time to record as processor startup, taken from {@link #nowMillis()}
     */
    protected long getStartupTime() {
        final long startupMillis = nowMillis();
        return startupMillis;
    }

    /**
     * @return the last-transfer timestamp held by the local activity tracker
     */
    protected final long getLastSuccessfulTransfer() {
        final LocalFlowActivityInfo activityInfo = this.localFlowActivityInfo;
        return activityInfo.getLastSuccessfulTransfer();
    }

    /**
     * Reads the last-activity timestamp that was saved in local state.
     *
     * @param processContext the context providing the state manager
     * @return the stored value, or {@code null} if the key is absent
     * @throws ProcessException if the local state cannot be read
     */
    private String tryLoadLastSuccessfulTransfer(ProcessContext processContext) {
        final StateManager stateManager = processContext.getStateManager();
        final StateMap localState;
        try {
            localState = stateManager.getState(Scope.LOCAL);
        } catch (IOException e) {
            throw new ProcessException("Failed to load local state due to " + String.valueOf(e), e);
        }
        return localState.get(STATE_KEY_LOCAL_FLOW_ACTIVITY_INFO);
    }

    /**
     * Exchanges activity information with the cluster state when a sync is due.
     *
     * <p>While disconnected, only the connection flag is cleared. On the first call after a reconnect a
     * sync is forced. A failed write of the shared state is logged at debug level and leaves the next
     * sync time unchanged, so the sync is attempted again on the next call.
     *
     * @param processContext the context providing cluster status and state
     */
    private void synchronizeState(ProcessContext processContext) {
        final ComponentLog logger = getLogger();
        final boolean connected = processContext.isConnectedToCluster();

        if (!connected) {
            this.connectedWhenLastTriggered.set(false);
            return;
        }

        if (isReconnectedToCluster(connected)) {
            this.localFlowActivityInfo.forceSync();
            this.connectedWhenLastTriggered.set(true);
        }

        if (!this.localFlowActivityInfo.syncNeeded()) {
            return;
        }

        final CommonFlowActivityInfo sharedInfo = new CommonFlowActivityInfo(processContext);
        // Pull first, so the subsequent push only happens when the local info is the newer one.
        this.localFlowActivityInfo.update(sharedInfo);
        try {
            sharedInfo.update(this.localFlowActivityInfo);
            this.localFlowActivityInfo.setNextSyncMillis();
        } catch (SaveSharedFlowStateException e) {
            logger.debug("Failed to update common state.", e);
        }
    }

    /**
     * Handles a trigger on which the flow is inactive: sends the inactivity indicator if this node is a
     * reporting node, records the time of the message and persists the inactivity flag in local state.
     *
     * @param processContext the context providing property values and state
     * @param processSession the session used to create the indicator FlowFile
     * @param isClusterScope whether cluster-scope monitoring is in effect
     * @param lastActivity   timestamp of the last activity, in epoch millis
     */
    private void onTriggerInactiveFlow(ProcessContext processContext, ProcessSession processSession, boolean isClusterScope, long lastActivity) {
        final ComponentLog logger = getLogger();
        final boolean reportFromThisNode = shouldThisNodeReport(isClusterScope, processContext);
        if (reportFromThisNode) {
            sendInactivityMarker(processContext, processSession, lastActivity, logger);
        }

        final long messageTime = nowMillis();
        this.lastInactiveMessage.set(messageTime);

        setInactivityFlag(processContext.getStateManager());
    }

    /**
     * Handles a trigger on which the flow is active. When the flow was inactive before, the
     * activity-restored indicator is sent (if this node is a reporting node) and the inactivity flag
     * in local state is cleared; when it was already active, nothing is done.
     *
     * @param processContext  the context providing property values and state
     * @param processSession  the session used to create the indicator FlowFile
     * @param wasActive       whether the flow was active on the previous trigger
     * @param isClusterScope  whether cluster-scope monitoring is in effect
     * @param inactivityStart timestamp at which the inactivity began, in epoch millis
     */
    private void onTriggerActiveFlow(ProcessContext processContext, ProcessSession processSession, boolean wasActive, boolean isClusterScope, long inactivityStart) {
        final ComponentLog logger = getLogger();
        final boolean reportFromThisNode = shouldThisNodeReport(isClusterScope, processContext);

        if (!wasActive) {
            if (reportFromThisNode) {
                final Map<String, String> attributes = this.localFlowActivityInfo.getLastSuccessfulTransferAttributes();
                sendActivationMarker(processContext, processSession, attributes, inactivityStart, logger);
            }
            clearInactivityFlag(processContext.getStateManager());
        }
    }

    /**
     * Stores the last-activity timestamp in local state as the marker of an inactive flow.
     * A failure to store is logged, not propagated.
     *
     * @param stateManager the state manager to write to
     */
    private void setInactivityFlag(StateManager stateManager) {
        final String lastActivity = String.valueOf(this.localFlowActivityInfo.getLastActivity());
        final Map<String, String> localState = Collections.singletonMap(STATE_KEY_LOCAL_FLOW_ACTIVITY_INFO, lastActivity);
        try {
            stateManager.setState(localState, Scope.LOCAL);
        } catch (IOException e) {
            getLogger().error("Failed to set local state", e);
        }
    }

    /**
     * Removes the inactivity marker by clearing the local state.
     *
     * @param stateManager the state manager to clear
     * @throws ProcessException if the local state cannot be cleared
     */
    private void clearInactivityFlag(StateManager stateManager) {
        try {
            stateManager.clear(Scope.LOCAL);
        } catch (IOException ioFailure) {
            final String message = "Failed to clear local state due to " + String.valueOf(ioFailure);
            throw new ProcessException(message, ioFailure);
        }
    }

    /**
     * Decides whether cluster-scope monitoring is in effect: the 'Monitoring Scope' property must be
     * 'cluster' and the node must be configured for clustering.
     *
     * @param processContext   the context providing the property value
     * @param logInvalidConfig whether to log a warning when 'cluster' is configured on a node that is
     *                         not configured for clustering
     * @return {@code true} only if cluster scope is configured and the node is configured for clustering
     */
    private boolean isClusterScope(ProcessContext processContext, boolean logInvalidConfig) {
        final String configuredScope = processContext.getProperty(MONITORING_SCOPE).getValue();
        final boolean clusterScopeRequested = SCOPE_CLUSTER.getValue().equals(configuredScope);
        if (!clusterScopeRequested) {
            return false;
        }

        if (getNodeTypeProvider().isConfiguredForClustering()) {
            return true;
        }

        if (logInvalidConfig) {
            getLogger().warn("NiFi is running as a Standalone mode, but 'cluster' scope is set. Fallback to 'node' scope. Fix configuration to stop this message.");
        }
        return false;
    }

    /**
     * @param isClusterScope whether cluster-scope monitoring is in effect
     * @param processContext the context providing the 'Reporting Node' property
     * @return {@code true} if 'Reporting Node' is 'primary' and cluster scope is in effect
     */
    private boolean shouldReportOnlyOnPrimary(boolean isClusterScope, ProcessContext processContext) {
        final String reportingNode = processContext.getProperty(REPORTING_NODE).getValue();
        final boolean primaryConfigured = REPORT_NODE_PRIMARY.getValue().equals(reportingNode);
        return primaryConfigured && isClusterScope;
    }

    /**
     * @param isConnectedToCluster the current connection status
     * @return {@code true} if the node is connected now but was not when last triggered
     */
    private boolean isReconnectedToCluster(boolean isConnectedToCluster) {
        final boolean wasConnected = this.connectedWhenLastTriggered.get();
        if (wasConnected) {
            return false;
        }
        return isConnectedToCluster;
    }

    /**
     * Decides whether this node emits indicator FlowFiles.
     *
     * @param isClusterScope whether cluster-scope monitoring is in effect
     * @param processContext the context providing the 'Reporting Node' property
     * @return {@code false} only when cluster scope is in effect, reporting is restricted to the
     *         primary node and this node is not the primary; {@code true} otherwise
     */
    private boolean shouldThisNodeReport(boolean isClusterScope, ProcessContext processContext) {
        if (!isClusterScope) {
            return true;
        }
        if (!shouldReportOnlyOnPrimary(isClusterScope, processContext)) {
            return true;
        }
        return getNodeTypeProvider().isPrimary();
    }

    /**
     * Creates the inactivity indicator FlowFile and routes it to 'inactive'.
     *
     * <p>The FlowFile carries the attributes {@code inactivityStartMillis} and
     * {@code inactivityDurationMillis}; its content is the evaluated 'Inactivity Message' in UTF-8.
     *
     * @param processContext  the context providing the message property
     * @param processSession  the session used to create and transfer the FlowFile
     * @param inactivityStart timestamp at which the inactivity began, in epoch millis
     * @param componentLog    logger for the transfer message
     */
    private void sendInactivityMarker(ProcessContext processContext, ProcessSession processSession, long inactivityStart, ComponentLog componentLog) {
        FlowFile marker = processSession.create();
        marker = processSession.putAttribute(marker, "inactivityStartMillis", String.valueOf(inactivityStart));
        marker = processSession.putAttribute(marker, "inactivityDurationMillis", String.valueOf(nowMillis() - inactivityStart));

        // The message is evaluated against the marker so that it can reference the two attributes above.
        final String message = processContext.getProperty(INACTIVITY_MESSAGE).evaluateAttributeExpressions(marker).getValue();
        final byte[] content = message.getBytes(StandardCharsets.UTF_8);
        marker = processSession.write(marker, out -> out.write(content));

        processSession.getProvenanceReporter().create(marker);
        processSession.transfer(marker, REL_INACTIVE);
        componentLog.info("Transferred {} to 'inactive'", new Object[]{marker});
    }

    /**
     * Creates the activity-restored indicator FlowFile and routes it to 'activity.restored'.
     *
     * <p>The FlowFile receives the given attributes (without the UUID) plus {@code inactivityStartMillis}
     * and {@code inactivityDurationMillis}; its content is the evaluated 'Activity Restored Message' in
     * UTF-8. The supplied map is not modified.
     *
     * @param processContext  the context providing the message property
     * @param processSession  the session used to create and transfer the FlowFile
     * @param map             attributes to copy onto the indicator FlowFile
     * @param inactivityStart timestamp at which the inactivity began, in epoch millis
     * @param componentLog    logger for the transfer message
     */
    private void sendActivationMarker(ProcessContext processContext, ProcessSession processSession, Map<String, String> map, long inactivityStart, ComponentLog componentLog) {
        // Work on a copy: the caller passes the live attribute map held by the local activity tracker,
        // which must not be altered as a side effect of sending the indicator.
        final Map<String, String> attributes = new HashMap<>(map);
        attributes.remove(CoreAttributes.UUID.key());

        FlowFile marker = processSession.create();
        marker = processSession.putAllAttributes(marker, attributes);
        marker = processSession.putAttribute(marker, "inactivityStartMillis", String.valueOf(inactivityStart));
        marker = processSession.putAttribute(marker, "inactivityDurationMillis", String.valueOf(nowMillis() - inactivityStart));

        // The message is evaluated against the marker so that it can reference the attributes set above.
        final String message = processContext.getProperty(ACTIVITY_RESTORED_MESSAGE).evaluateAttributeExpressions(marker).getValue();
        final byte[] content = message.getBytes(StandardCharsets.UTF_8);
        marker = processSession.write(marker, out -> out.write(content));

        processSession.getProvenanceReporter().create(marker);
        processSession.transfer(marker, REL_ACTIVITY_RESTORED);
        componentLog.info("Transferred {} to 'activity.restored'", new Object[]{marker});
    }
}
