package cern.c2mon.daq.opcua.connection;

import cern.c2mon.daq.opcua.MessageSender;
import cern.c2mon.daq.opcua.config.AppConfig;
import cern.c2mon.daq.opcua.config.AppConfigProperties;
import cern.c2mon.daq.opcua.exceptions.CommunicationException;
import cern.c2mon.daq.opcua.exceptions.ConfigurationException;
import cern.c2mon.daq.opcua.exceptions.EndpointDisconnectedException;
import cern.c2mon.daq.opcua.exceptions.ExceptionContext;
import cern.c2mon.daq.opcua.exceptions.OPCUAException;
import cern.c2mon.daq.opcua.mapping.ItemDefinition;
import cern.c2mon.daq.opcua.mapping.SubscriptionGroup;
import cern.c2mon.daq.opcua.mapping.TagSubscriptionReader;
import cern.c2mon.shared.common.datatag.SourceDataTagQuality;
import cern.c2mon.shared.common.datatag.ValueUpdate;
import cern.c2mon.shared.common.datatag.util.SourceDataTagQualityCode;
import com.google.common.collect.BiMap;
import com.google.common.collect.HashBiMap;
import io.micrometer.core.annotation.Timed;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.eclipse.milo.opcua.sdk.client.OpcUaClient;
import org.eclipse.milo.opcua.sdk.client.SessionActivityListener;
import org.eclipse.milo.opcua.sdk.client.api.UaSession;
import org.eclipse.milo.opcua.sdk.client.api.subscriptions.UaMonitoredItem;
import org.eclipse.milo.opcua.sdk.client.api.subscriptions.UaSubscription;
import org.eclipse.milo.opcua.sdk.client.api.subscriptions.UaSubscriptionManager;
import org.eclipse.milo.opcua.sdk.client.model.nodes.objects.ServerRedundancyTypeNode;
import org.eclipse.milo.opcua.sdk.client.subscriptions.OpcUaSubscriptionManager;
import org.eclipse.milo.opcua.stack.client.DiscoveryClient;
import org.eclipse.milo.opcua.stack.core.AttributeId;
import org.eclipse.milo.opcua.stack.core.Identifiers;
import org.eclipse.milo.opcua.stack.core.types.builtin.DataValue;
import org.eclipse.milo.opcua.stack.core.types.builtin.DateTime;
import org.eclipse.milo.opcua.stack.core.types.builtin.ExtensionObject;
import org.eclipse.milo.opcua.stack.core.types.builtin.NodeId;
import org.eclipse.milo.opcua.stack.core.types.builtin.QualifiedName;
import org.eclipse.milo.opcua.stack.core.types.builtin.StatusCode;
import org.eclipse.milo.opcua.stack.core.types.builtin.Variant;
import org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.UInteger;
import org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.Unsigned;
import org.eclipse.milo.opcua.stack.core.types.enumerated.BrowseDirection;
import org.eclipse.milo.opcua.stack.core.types.enumerated.BrowseResultMask;
import org.eclipse.milo.opcua.stack.core.types.enumerated.DataChangeTrigger;
import org.eclipse.milo.opcua.stack.core.types.enumerated.MonitoringMode;
import org.eclipse.milo.opcua.stack.core.types.enumerated.NodeClass;
import org.eclipse.milo.opcua.stack.core.types.enumerated.TimestampsToReturn;
import org.eclipse.milo.opcua.stack.core.types.structured.BrowseDescription;
import org.eclipse.milo.opcua.stack.core.types.structured.BrowseResult;
import org.eclipse.milo.opcua.stack.core.types.structured.CallMethodRequest;
import org.eclipse.milo.opcua.stack.core.types.structured.CallMethodResult;
import org.eclipse.milo.opcua.stack.core.types.structured.DataChangeFilter;
import org.eclipse.milo.opcua.stack.core.types.structured.MonitoredItemCreateRequest;
import org.eclipse.milo.opcua.stack.core.types.structured.MonitoringParameters;
import org.eclipse.milo.opcua.stack.core.types.structured.ReadValueId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;

@Scope("prototype")
@Component
/* loaded from: input_file:cern/c2mon/daq/opcua/connection/MiloEndpoint.class */
public class MiloEndpoint implements Endpoint, SessionActivityListener, UaSubscriptionManager.SubscriptionListener {
    private static final Logger log = LoggerFactory.getLogger(MiloEndpoint.class);
    private final SecurityModule securityModule;
    private final TagSubscriptionReader mapper;
    private final MessageSender messageSender;
    private final AppConfigProperties properties;
    private final AppConfig config;
    private OpcUaClient client;
    private boolean updateEquipmentStateOnSessionChanges;
    private String uri;
    private final AtomicLong disconnectedOn = new AtomicLong(0);
    private final BiMap<Integer, UaSubscription> subscriptionMap = HashBiMap.create();
    private final Collection<SessionActivityListener> sessionActivityListeners = new ArrayList();
    private MonitoringMode mode = MonitoringMode.Reporting;

    @Override // cern.c2mon.daq.opcua.connection.Endpoint
    @Timed("connect")
    public void initialize(String str) throws OPCUAException {
        log.info("Initializing Endpoint at {}", str);
        this.disconnectedOn.set(0L);
        this.uri = str;
        this.client = this.securityModule.createClient(str, (Collection) processSupplier(ExceptionContext.CONNECT, () -> {
            return DiscoveryClient.getEndpoints(str);
        }));
        this.client.addSessionActivityListener(this);
        this.sessionActivityListeners.add(this);
        OpcUaSubscriptionManager subscriptionManager = this.client.getSubscriptionManager();
        subscriptionManager.addSubscriptionListener(this);
        subscriptionManager.resumeDelivery();
    }

    @Override // cern.c2mon.daq.opcua.connection.Endpoint
    public void manageSessionActivityListener(boolean z, SessionActivityListener sessionActivityListener) {
        if (z && !this.sessionActivityListeners.contains(sessionActivityListener)) {
            this.sessionActivityListeners.add(sessionActivityListener);
            this.client.addSessionActivityListener(sessionActivityListener);
            if (this.disconnectedOn.get() <= 1) {
                sessionActivityListener.onSessionActive((UaSession) null);
                return;
            }
            return;
        }
        if (z || !this.sessionActivityListeners.contains(sessionActivityListener)) {
            log.info("Nothing to do.");
        } else {
            this.sessionActivityListeners.remove(sessionActivityListener);
            this.client.removeSessionActivityListener(sessionActivityListener);
        }
    }

    @Override // cern.c2mon.daq.opcua.connection.Endpoint
    public void setUpdateEquipmentStateOnSessionChanges(boolean z) {
        this.updateEquipmentStateOnSessionChanges = z;
        if (!z || this.disconnectedOn.get() > 1) {
            return;
        }
        this.messageSender.onEquipmentStateUpdate(MessageSender.EquipmentState.OK);
    }

    @Override // cern.c2mon.daq.opcua.connection.Endpoint
    public void disconnect() {
        log.info("Disconnecting endpoint at {}", this.uri);
        if (this.client != null) {
            try {
                this.client.getSubscriptionManager().clearSubscriptions();
                this.client.getSubscriptionManager().removeSubscriptionListener(this);
                this.sessionActivityListeners.forEach(sessionActivityListener -> {
                    this.client.removeSessionActivityListener(sessionActivityListener);
                });
                ExceptionContext exceptionContext = ExceptionContext.DISCONNECT;
                OpcUaClient opcUaClient = this.client;
                Objects.requireNonNull(opcUaClient);
                retryOnConnection(exceptionContext, opcUaClient::disconnect);
            } catch (OPCUAException e) {
                log.debug("Disconnection failed with exception: ", e);
                log.error("Error disconnecting from endpoint with uri {}: ", this.uri);
            }
        } else {
            log.info("Client not connected, skipping disconnection attempt.");
        }
        this.sessionActivityListeners.clear();
        this.subscriptionMap.clear();
        this.disconnectedOn.set(-1L);
        this.updateEquipmentStateOnSessionChanges = false;
        log.info("Completed disconnecting endpoint {}", this.uri);
    }

    @Override // cern.c2mon.daq.opcua.connection.Endpoint
    public Map<Integer, SourceDataTagQuality> subscribe(SubscriptionGroup subscriptionGroup, Collection<ItemDefinition> collection) throws OPCUAException {
        try {
            log.info("Subscribing definitions with publishing interval {}.", Integer.valueOf(subscriptionGroup.getPublishInterval()));
            return subscribeWithCallback(subscriptionGroup.getPublishInterval(), collection, this::defaultSubscriptionCallback);
        } catch (ConfigurationException | EndpointDisconnectedException e) {
            throw e;
        } catch (OPCUAException e2) {
            log.error("Tags with IDs {} could not be subscribed on endpoint with uri {}. ", new Object[]{(String) collection.stream().map(itemDefinition -> {
                return this.mapper.getTagId(itemDefinition.getClientHandle()).toString();
            }).collect(Collectors.joining(", ")), getUri(), e2});
            return (Map) collection.stream().map(itemDefinition2 -> {
                return new AbstractMap.SimpleEntry(Integer.valueOf(itemDefinition2.getClientHandle()), new SourceDataTagQuality(SourceDataTagQualityCode.DATA_UNAVAILABLE));
            }).collect(Collectors.toMap((v0) -> {
                return v0.getKey();
            }, (v0) -> {
                return v0.getValue();
            }));
        }
    }

    @Override // cern.c2mon.daq.opcua.connection.Endpoint
    public void recreateAllSubscriptions() throws CommunicationException {
        boolean z;
        Collection<SubscriptionGroup> collection = (Collection) this.mapper.getGroups().stream().filter(subscriptionGroup -> {
            return subscriptionGroup.size() > 0;
        }).collect(Collectors.toList());
        boolean isEmpty = collection.isEmpty();
        for (SubscriptionGroup subscriptionGroup2 : collection) {
            if (!isEmpty) {
                try {
                } catch (EndpointDisconnectedException e) {
                    log.debug("Failed with exception: ", e);
                    log.info("Session was closed, abort subscription recreation process.");
                    return;
                } catch (OPCUAException e2) {
                    log.info("Could not resubscribe group with time Deadband {}.", Integer.valueOf(subscriptionGroup2.getPublishInterval()), e2);
                }
                if (!resubscribeGroupsAndReportSuccess(subscriptionGroup2)) {
                    z = false;
                    isEmpty = z;
                }
            }
            z = true;
            isEmpty = z;
        }
        if (!isEmpty) {
            log.error("Could not recreate any subscriptions. Connect to next server... ");
            throw new CommunicationException(ExceptionContext.NO_REDUNDANT_SERVER);
        }
        log.info("Recreated subscriptions on server {}.", this.uri);
    }

    @Override // cern.c2mon.daq.opcua.connection.Endpoint
    public Map<Integer, SourceDataTagQuality> subscribeWithCallback(int i, Collection<ItemDefinition> collection, Consumer<UaMonitoredItem> consumer) throws OPCUAException {
        UaSubscription orCreateSubscription = getOrCreateSubscription(i);
        List list = (List) collection.stream().map(this::toMonitoredItemCreateRequest).collect(Collectors.toList());
        return (Map) ((List) retryOnConnection(ExceptionContext.CREATE_MONITORED_ITEM, () -> {
            return orCreateSubscription.createMonitoredItems(TimestampsToReturn.Both, list, (uaMonitoredItem, num) -> {
                consumer.accept(uaMonitoredItem);
            });
        })).stream().collect(Collectors.toMap(uaMonitoredItem -> {
            return Integer.valueOf(uaMonitoredItem.getClientHandle().intValue());
        }, uaMonitoredItem2 -> {
            return MiloMapper.getDataTagQuality(uaMonitoredItem2.getStatusCode());
        }));
    }

    @Override // cern.c2mon.daq.opcua.connection.Endpoint
    public boolean deleteItemFromSubscription(int i, int i2) {
        UaSubscription uaSubscription = (UaSubscription) this.subscriptionMap.get(Integer.valueOf(i2));
        if (uaSubscription == null) {
            log.info("Item cannot be mapped to a subscription. Skipping deletion.");
            return false;
        }
        try {
            if (uaSubscription.getMonitoredItems().size() <= 1) {
                deleteSubscription(i2);
                return true;
            }
            List list = (List) uaSubscription.getMonitoredItems().stream().filter(uaMonitoredItem -> {
                return uaMonitoredItem.getClientHandle().intValue() == i;
            }).collect(Collectors.toList());
            return ((List) retryOnConnection(ExceptionContext.DELETE_MONITORED_ITEM, () -> {
                return uaSubscription.deleteMonitoredItems(list);
            })).stream().allMatch((v0) -> {
                return v0.isGood();
            });
        } catch (OPCUAException e) {
            log.error("Tag with ID {} could not be completed successfully on endpoint {}.", new Object[]{this.mapper.getTagId(i), getUri(), e});
            return false;
        }
    }

    @Override // cern.c2mon.daq.opcua.connection.Endpoint
    public Map.Entry<ValueUpdate, SourceDataTagQuality> read(NodeId nodeId) throws OPCUAException {
        DataValue dataValue = (DataValue) retryOnConnection(ExceptionContext.READ, () -> {
            return this.client.readValue(0.0d, TimestampsToReturn.Both, nodeId);
        });
        if (dataValue == null) {
            throw new ConfigurationException(ExceptionContext.READ);
        }
        return new AbstractMap.SimpleEntry(MiloMapper.toValueUpdate(dataValue, this.properties.getTimeRecordMode()), MiloMapper.getDataTagQuality(dataValue.getStatusCode()));
    }

    @Override // cern.c2mon.daq.opcua.connection.Endpoint
    public boolean write(NodeId nodeId, Object obj) throws OPCUAException {
        DataValue dataValue = new DataValue(new Variant(obj), (StatusCode) null, (DateTime) null);
        StatusCode statusCode = (StatusCode) retryOnConnection(ExceptionContext.WRITE, () -> {
            return this.client.writeValue(nodeId, dataValue);
        });
        log.info("Writing value {} to node {} yielded status code {}.", new Object[]{obj, nodeId, statusCode});
        return statusCode.isGood();
    }

    @Override // cern.c2mon.daq.opcua.connection.Endpoint
    public Map.Entry<Boolean, Object[]> callMethod(ItemDefinition itemDefinition, Object obj) throws OPCUAException {
        return itemDefinition.getMethodNodeId() == null ? callMethod(getParentObjectNodeId(itemDefinition.getNodeId()), itemDefinition.getNodeId(), obj) : callMethod(itemDefinition.getNodeId(), itemDefinition.getMethodNodeId(), obj);
    }

    @Override // cern.c2mon.daq.opcua.connection.Endpoint
    public ServerRedundancyTypeNode getServerRedundancyNode() throws OPCUAException {
        return (ServerRedundancyTypeNode) retryOnConnection(ExceptionContext.SERVER_NODE, () -> {
            return this.client.getAddressSpace().getObjectNode(Identifiers.Server_ServerRedundancy, ServerRedundancyTypeNode.class);
        });
    }

    public void onSessionActive(UaSession uaSession) {
        log.info("Session activated");
        if (this.updateEquipmentStateOnSessionChanges) {
            this.messageSender.onEquipmentStateUpdate(MessageSender.EquipmentState.OK);
        }
        this.disconnectedOn.getAndUpdate(j -> {
            return Math.min(j, 0L);
        });
    }

    public void onSessionInactive(UaSession uaSession) {
        log.info("Session deactivated");
        if (this.updateEquipmentStateOnSessionChanges) {
            this.messageSender.onEquipmentStateUpdate(MessageSender.EquipmentState.CONNECTION_LOST);
        }
        this.disconnectedOn.getAndUpdate(j -> {
            return j < 0 ? j : System.currentTimeMillis();
        });
    }

    public void onStatusChanged(UaSubscription uaSubscription, StatusCode statusCode) {
        log.info("onStatusChanged event for {} : StatusCode {}", uaSubscription.toString(), statusCode);
        if (statusCode.isBad() && statusCode.getValue() == 2148139008L) {
            recreate(uaSubscription);
        }
    }

    public void onSubscriptionTransferFailed(UaSubscription uaSubscription, StatusCode statusCode) {
        log.info("onSubscriptionTransferFailed event for {} : StatusCode {}", uaSubscription.toString(), statusCode);
        recreate(uaSubscription);
    }

    private NodeId getParentObjectNodeId(NodeId nodeId) throws OPCUAException {
        BrowseDescription browseDescription = new BrowseDescription(nodeId, BrowseDirection.Inverse, Identifiers.References, true, Unsigned.uint(NodeClass.Object.getValue()), Unsigned.uint(BrowseResultMask.All.getValue()));
        BrowseResult browseResult = (BrowseResult) retryOnConnection(ExceptionContext.BROWSE, () -> {
            return this.client.browse(browseDescription);
        });
        if (browseResult.getReferences() != null && browseResult.getReferences().length > 0) {
            Optional local = browseResult.getReferences()[0].getNodeId().local(this.client.getNamespaceTable());
            if (browseResult.getStatusCode().isGood() && local.isPresent()) {
                return (NodeId) local.get();
            }
        }
        throw new ConfigurationException(ExceptionContext.OBJ_INVALID);
    }

    private void deleteSubscription(int i) throws OPCUAException {
        UaSubscription uaSubscription = (UaSubscription) this.subscriptionMap.remove(Integer.valueOf(i));
        if (uaSubscription != null) {
            retryOnConnection(ExceptionContext.DELETE_SUBSCRIPTION, () -> {
                return this.client.getSubscriptionManager().deleteSubscription(uaSubscription.getSubscriptionId());
            });
        }
    }

    private boolean resubscribeGroupsAndReportSuccess(SubscriptionGroup subscriptionGroup) throws OPCUAException {
        Map<Integer, SourceDataTagQuality> subscribe = subscribe(subscriptionGroup, subscriptionGroup.getTagIds().values());
        return subscribe != null && subscribe.values().stream().anyMatch((v0) -> {
            return v0.isValid();
        });
    }

    private Map.Entry<Boolean, Object[]> callMethod(NodeId nodeId, NodeId nodeId2, Object obj) throws OPCUAException {
        Variant[] variantArr = obj == null ? null : new Variant[]{new Variant(obj)};
        CallMethodResult callMethodResult = (CallMethodResult) retryOnConnection(ExceptionContext.METHOD, () -> {
            return this.client.call(new CallMethodRequest(nodeId, nodeId2, variantArr));
        });
        StatusCode statusCode = callMethodResult.getStatusCode();
        log.info("Calling method {} on object {} returned status code {}.", new Object[]{nodeId2, nodeId, statusCode});
        return new AbstractMap.SimpleEntry(Boolean.valueOf(statusCode.isGood()), MiloMapper.toObject(callMethodResult.getOutputArguments()));
    }

    private void recreate(UaSubscription uaSubscription) {
        log.info("Attempt to recreate the subscription.");
        Integer num = (Integer) this.subscriptionMap.inverse().getOrDefault(uaSubscription, -1);
        SubscriptionGroup group = this.mapper.getGroup(num.intValue());
        if (group == null || group.size() == 0) {
            log.info("The subscription cannot be recreated, since it cannot be associated with any DataTags.");
            return;
        }
        try {
            deleteSubscription(num.intValue());
        } catch (OPCUAException e) {
            log.error("Could not delete subscription. Proceed with recreation.", e);
        }
        try {
            this.config.exceptionClassifierTemplate(this.properties).execute(retryContext -> {
                if (resubscribeGroupsAndReportSuccess(group)) {
                    return null;
                }
                throw new CommunicationException(ExceptionContext.CREATE_SUBSCRIPTION);
            });
        } catch (OPCUAException e2) {
            log.error("Subscription recreation aborted: ", e2);
        }
    }

    private UaSubscription getOrCreateSubscription(int i) throws OPCUAException {
        UaSubscription uaSubscription = (UaSubscription) this.subscriptionMap.get(Integer.valueOf(i));
        if (uaSubscription == null || !this.client.getSubscriptionManager().getSubscriptions().contains(uaSubscription)) {
            uaSubscription = (UaSubscription) retryOnConnection(ExceptionContext.CREATE_SUBSCRIPTION, () -> {
                return this.client.getSubscriptionManager().createSubscription(i * 1000);
            });
            this.subscriptionMap.put(Integer.valueOf(i), uaSubscription);
        }
        return uaSubscription;
    }

    private void defaultSubscriptionCallback(UaMonitoredItem uaMonitoredItem) {
        Long tagId = this.mapper.getTagId(uaMonitoredItem.getClientHandle().intValue());
        if (tagId == null) {
            log.info("Receives a value update that could not be associated with a DataTag.");
        } else {
            uaMonitoredItem.setValueConsumer(dataValue -> {
                if (dataValue == null) {
                    log.info("Received a null update.");
                    return;
                }
                this.messageSender.onValueUpdate(tagId.longValue(), MiloMapper.getDataTagQuality(dataValue.getStatusCode()), MiloMapper.toValueUpdate(dataValue, this.properties.getTimeRecordMode()));
            });
        }
    }

    private MonitoredItemCreateRequest toMonitoredItemCreateRequest(ItemDefinition itemDefinition) {
        return new MonitoredItemCreateRequest(new ReadValueId(itemDefinition.getNodeId(), AttributeId.Value.uid(), (String) null, QualifiedName.NULL_VALUE), this.mode, new MonitoringParameters(UInteger.valueOf(itemDefinition.getClientHandle()), Double.valueOf(0.0d), ExtensionObject.encode(this.client.getSerializationContext(), DataChangeFilter.builder().trigger(DataChangeTrigger.StatusValue).deadbandType(Unsigned.uint(itemDefinition.getValueDeadbandType().getValue())).deadbandValue(Double.valueOf(itemDefinition.getValueDeadband())).build()), Unsigned.uint(this.properties.getQueueSize()), true));
    }

    private <T> T retryOnConnection(ExceptionContext exceptionContext, Supplier<CompletableFuture<T>> supplier) throws OPCUAException {
        return (T) this.config.simpleRetryPolicy(this.properties).execute(retryContext -> {
            if (this.disconnectedOn.get() >= 0) {
                return processSupplier(exceptionContext, supplier);
            }
            log.info("Endpoint was stopped, cease retries.");
            throw new EndpointDisconnectedException(exceptionContext);
        });
    }

    private <T> T processSupplier(ExceptionContext exceptionContext, Supplier<CompletableFuture<T>> supplier) throws OPCUAException {
        try {
            return supplier.get().get(this.properties.getRequestTimeout(), TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            log.debug("Execution {} failed with interrupted exception; ", exceptionContext.name(), e);
            Thread.currentThread().interrupt();
            throw new EndpointDisconnectedException(exceptionContext, e.getCause());
        } catch (ExecutionException | TimeoutException e2) {
            log.debug("Execution {} failed with exception; ", exceptionContext.name(), e2);
            throw OPCUAException.of(exceptionContext, e2.getCause(), false);
        } catch (Exception e3) {
            log.info("An unexpected exception occurred during {}.", exceptionContext.name(), e3);
            throw OPCUAException.of(exceptionContext, e3, false);
        }
    }

    public MiloEndpoint(SecurityModule securityModule, TagSubscriptionReader tagSubscriptionReader, MessageSender messageSender, AppConfigProperties appConfigProperties, AppConfig appConfig) {
        this.securityModule = securityModule;
        this.mapper = tagSubscriptionReader;
        this.messageSender = messageSender;
        this.properties = appConfigProperties;
        this.config = appConfig;
    }

    @Override // cern.c2mon.daq.opcua.connection.Endpoint
    public String getUri() {
        return this.uri;
    }

    public void setMode(MonitoringMode monitoringMode) {
        this.mode = monitoringMode;
    }
}
