|
@@ -1,196 +0,0 @@
|
|
|
-package com.platform.service.opc.impl;
|
|
|
-
|
|
|
-import com.google.common.collect.ImmutableList;
|
|
|
-import com.platform.service.opc.OpcUAClientService;
|
|
|
-import lombok.extern.slf4j.Slf4j;
|
|
|
-import org.eclipse.milo.opcua.sdk.client.OpcUaClient;
|
|
|
-import org.eclipse.milo.opcua.sdk.client.api.subscriptions.UaSubscription;
|
|
|
-import org.eclipse.milo.opcua.sdk.client.api.subscriptions.UaSubscriptionManager;
|
|
|
-import org.eclipse.milo.opcua.sdk.client.subscriptions.ManagedDataItem;
|
|
|
-import org.eclipse.milo.opcua.sdk.client.subscriptions.ManagedSubscription;
|
|
|
-import org.eclipse.milo.opcua.stack.core.AttributeId;
|
|
|
-import org.eclipse.milo.opcua.stack.core.UaException;
|
|
|
-import org.eclipse.milo.opcua.stack.core.types.builtin.*;
|
|
|
-import org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.UInteger;
|
|
|
-import org.eclipse.milo.opcua.stack.core.types.enumerated.MonitoringMode;
|
|
|
-import org.eclipse.milo.opcua.stack.core.types.enumerated.TimestampsToReturn;
|
|
|
-import org.eclipse.milo.opcua.stack.core.types.structured.MonitoredItemCreateRequest;
|
|
|
-import org.eclipse.milo.opcua.stack.core.types.structured.MonitoringParameters;
|
|
|
-import org.eclipse.milo.opcua.stack.core.types.structured.ReadValueId;
|
|
|
-import org.springframework.stereotype.Service;
|
|
|
-
|
|
|
-import java.util.ArrayList;
|
|
|
-import java.util.List;
|
|
|
-import java.util.Map;
|
|
|
-import java.util.concurrent.CompletableFuture;
|
|
|
-import java.util.concurrent.CountDownLatch;
|
|
|
-import java.util.concurrent.atomic.AtomicInteger;
|
|
|
-
|
|
|
-@Service("opcUAClientService")
|
|
|
-@Slf4j
|
|
|
-public class OpcUAClientServiceImpl implements OpcUAClientService {
|
|
|
- private static AtomicInteger atomicInteger = new AtomicInteger(0);
|
|
|
- /**
|
|
|
- * 读取数据支持单个,多个节点数据
|
|
|
- */
|
|
|
- @Override
|
|
|
- public void readNodeList(OpcUaClient client, CompletableFuture<OpcUaClient> future) throws Exception {
|
|
|
- // 同步建立连接
|
|
|
- client.connect().get();
|
|
|
- // 异步读取数据
|
|
|
- NodeId nodeId_Tag1 = new NodeId(2, "Channel1.Device1.Tag1");
|
|
|
- NodeId nodeId_Tag2 = new NodeId(2, "Channel1.Device1.Tag2");
|
|
|
- List<NodeId> nodeIds = ImmutableList.of(nodeId_Tag1, nodeId_Tag2);
|
|
|
- CompletableFuture<List<DataValue>> listCompletableFuture = client.readValues(0.0, TimestampsToReturn.Both, nodeIds);
|
|
|
- listCompletableFuture.thenAccept(values -> {
|
|
|
- DataValue dataValue1 = values.get(0);
|
|
|
- DataValue dataValue2 = values.get(1);
|
|
|
- System.out.println("#########Tag1=" + dataValue1.getValue().getValue());
|
|
|
- System.out.println("#########Tag2=" + dataValue2.getValue().getValue());
|
|
|
- future.complete(client);
|
|
|
- });
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * 写入单个节点数据
|
|
|
- *
|
|
|
- * @param client
|
|
|
- * @throws Exception
|
|
|
- */
|
|
|
- private static void writeNodeValue(OpcUaClient client) throws Exception {
|
|
|
- //节点
|
|
|
- NodeId nodeId = new NodeId(2, "TD-01.SB-01.AG-01");
|
|
|
- short i = 3;
|
|
|
- //创建数据对象,此处的数据对象一定要定义类型,不然会出现类型错误,导致无法写入
|
|
|
- DataValue nowValue = new DataValue(new Variant(i), null, null);
|
|
|
- //写入节点数据
|
|
|
- StatusCode statusCode = client.writeValue(nodeId, nowValue).join();
|
|
|
- System.out.println("结果:" + statusCode.isGood());
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * 订阅(单个)
|
|
|
- *
|
|
|
- * @param client
|
|
|
- * @throws Exception
|
|
|
- */
|
|
|
- private static void subscribe(OpcUaClient client) throws Exception {
|
|
|
- //创建发布间隔1000ms的订阅对象
|
|
|
- client.getSubscriptionManager()
|
|
|
- .createSubscription(1000.0)
|
|
|
- .thenAccept(t -> {
|
|
|
- //节点
|
|
|
- NodeId nodeId = new NodeId(2, "TD-01.SB-01.AG-01");
|
|
|
- ReadValueId readValueId = new ReadValueId(nodeId, AttributeId.Value.uid(), null, null);
|
|
|
- //创建监控的参数
|
|
|
- MonitoringParameters parameters = new MonitoringParameters(UInteger.valueOf(atomicInteger.getAndIncrement()), 1000.0, null, UInteger.valueOf(10), true);
|
|
|
- //创建监控项请求
|
|
|
- //该请求最后用于创建订阅。
|
|
|
- MonitoredItemCreateRequest request = new MonitoredItemCreateRequest(readValueId, MonitoringMode.Reporting, parameters);
|
|
|
- List<MonitoredItemCreateRequest> requests = new ArrayList<>();
|
|
|
- requests.add(request);
|
|
|
- //创建监控项,并且注册变量值改变时候的回调函数。
|
|
|
- t.createMonitoredItems(
|
|
|
- TimestampsToReturn.Both,
|
|
|
- requests,
|
|
|
- (item, id) -> item.setValueConsumer((it, val) -> {
|
|
|
- System.out.println("nodeid :" + it.getReadValueId().getNodeId());
|
|
|
- System.out.println("value :" + val.getValue().getValue());
|
|
|
- })
|
|
|
- );
|
|
|
- }).get();
|
|
|
-
|
|
|
- //持续订阅
|
|
|
- Thread.sleep(Long.MAX_VALUE);
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * 批量订阅
|
|
|
- *
|
|
|
- * @param client
|
|
|
- * @throws Exception
|
|
|
- */
|
|
|
- private static void managedSubscriptionEvent(OpcUaClient client) throws Exception {
|
|
|
- final CountDownLatch eventLatch = new CountDownLatch(1);
|
|
|
- //添加订阅监听器,用于处理断线重连后的订阅问题
|
|
|
- client.getSubscriptionManager().addSubscriptionListener(new CustomSubscriptionListener(client));
|
|
|
- //处理订阅业务
|
|
|
- handlerNode(client);
|
|
|
- //持续监听
|
|
|
- eventLatch.await();
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * 处理订阅业务
|
|
|
- *
|
|
|
- * @param client OPC UA客户端
|
|
|
- */
|
|
|
- private static void handlerNode(OpcUaClient client) {
|
|
|
- try {
|
|
|
- //创建订阅
|
|
|
- ManagedSubscription subscription = ManagedSubscription.create(client);
|
|
|
-
|
|
|
- //你所需要订阅的key
|
|
|
- List<String> key = new ArrayList<>();
|
|
|
- key.add("TD-01.SB-01.AG-01");
|
|
|
- key.add("TD-01.SB-01.AG-02");
|
|
|
-
|
|
|
- List<NodeId> nodeIdList = new ArrayList<>();
|
|
|
- for (String s : key) {
|
|
|
- nodeIdList.add(new NodeId(2, s));
|
|
|
- }
|
|
|
-
|
|
|
- //监听
|
|
|
- List<ManagedDataItem> dataItemList = subscription.createDataItems(nodeIdList);
|
|
|
- for (ManagedDataItem managedDataItem : dataItemList) {
|
|
|
- managedDataItem.addDataValueListener((t) -> {
|
|
|
- System.out.println(managedDataItem.getNodeId().getIdentifier().toString() + ":" + t.getValue().getValue().toString());
|
|
|
- });
|
|
|
- }
|
|
|
- } catch (Exception e) {
|
|
|
- e.printStackTrace();
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * 自定义订阅监听
|
|
|
- */
|
|
|
- private static class CustomSubscriptionListener implements UaSubscriptionManager.SubscriptionListener {
|
|
|
-
|
|
|
- private OpcUaClient client;
|
|
|
-
|
|
|
- CustomSubscriptionListener(OpcUaClient client) {
|
|
|
- this.client = client;
|
|
|
- }
|
|
|
-
|
|
|
- public void onKeepAlive(UaSubscription subscription, DateTime publishTime) {
|
|
|
- log.debug("onKeepAlive");
|
|
|
- }
|
|
|
-
|
|
|
- public void onStatusChanged(UaSubscription subscription, StatusCode status) {
|
|
|
- log.debug("onStatusChanged");
|
|
|
- }
|
|
|
-
|
|
|
- public void onPublishFailure(UaException exception) {
|
|
|
- log.debug("onPublishFailure");
|
|
|
- }
|
|
|
-
|
|
|
- public void onNotificationDataLost(UaSubscription subscription) {
|
|
|
- log.debug("onNotificationDataLost");
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * 重连时 尝试恢复之前的订阅失败时 会调用此方法
|
|
|
- * @param uaSubscription 订阅
|
|
|
- * @param statusCode 状态
|
|
|
- */
|
|
|
- public void onSubscriptionTransferFailed(UaSubscription uaSubscription, StatusCode statusCode) {
|
|
|
- log.debug("恢复订阅失败 需要重新订阅");
|
|
|
- //在回调方法中重新订阅
|
|
|
- handlerNode(client);
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
-
|
|
|
-}
|
|
|
-
|