@@ -1,13 +1,17 @@
 package com.platform.opc.servie;

+import com.alibaba.fastjson.JSON;
 import com.platform.common.util.RedisUtils;
+import com.platform.common.util.StringUtils;
 import com.platform.dao.entity.remote.RemoteOpc;
 import com.platform.dao.enums.YesNoEnum;
 import com.platform.dao.mapper.remote.RemoteOpcMapper;
+import com.platform.opc.entity.OpcResult;
 import com.platform.opc.util.OpcDAClient;
 import lombok.AllArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.openscada.opc.lib.da.AddFailedException;
+import org.openscada.opc.lib.da.Group;
 import org.openscada.opc.lib.da.Item;
 import org.springframework.context.annotation.DependsOn;
 import org.springframework.scheduling.annotation.EnableScheduling;
@@ -18,9 +22,12 @@ import tk.mybatis.mapper.weekend.Weekend;
 import tk.mybatis.mapper.weekend.WeekendCriteria;

 import javax.annotation.PostConstruct;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
+import java.math.BigDecimal;
+import java.time.LocalDateTime;
+import java.util.*;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;

 @Service("opcInitService")
@@ -44,26 +51,79 @@ public class OpcInitService {
      */
     @PostConstruct
     public void initAddAllItem() {
+        RedisUtils.del(OpcDAClient.redis_ok);
         RedisUtils.del(OpcDAClient.redis_opc_item_values);
         log.info("Starting group initialization");
         addGroupAndItems(findAllItems(true, null));
+        log.info("Starting per-group tasks to pull data");
+        startFetchDataByGroupLine();
     }

-    /**
-     * 1: Add newly created points into the groups
-     *    a: the point was added earlier but not configured on the server, and the server has only now been configured (set createdFlag = 1)
-     *    b: a new point that is also configured on the server, createdFlag = 1
-     *    c: a new point with createdFlag = 1, but the server has no configuration for it when it is added to the group; in that case update createdFlag = 0
-     *    d: a point previously assigned to line 1 that has been moved to line 2 must be re-added to the new group and removed from the old one, otherwise both groups read its data
-     * <p>
-     * Note: while values are being read, a key is written to mark the read in progress; no points are added during that time
-     */
-    /*@Scheduled(fixedDelay = 300000) // runs every 5 minutes (300 s) to sync newly added points
-    public void addItems() {
-        RedisUtils.setString(OpcDAClient.redis_opc_update_flag, "1");
-        addGroupAndItems(findAllItems(true, null));
-        RedisUtils.del(OpcDAClient.redis_opc_update_flag);
-    }*/
+    private void startFetchDataByGroupLine() {
+        // Create a scheduled pool with one core thread per group
+        // First check whether the groups were built successfully
+        String ok = RedisUtils.getString(OpcDAClient.redis_ok);
+        if (StringUtils.isNotBlank(ok)) {
+            initAddAllItem();
+            return;
+        }
+        ScheduledExecutorService pool = Executors.newScheduledThreadPool(OpcDAClient.groupList.size());
+        for (int i = 0; i < OpcDAClient.groupList.size(); i++) {
+            Group group = OpcDAClient.groupList.get(i);
+            pool.scheduleAtFixedRate(new TimerTask() {
+                @Override
+                public void run() {
+                    try {
+                        log.info(Thread.currentThread().getName() + "-thread-" + group.getName() + new Date());
+                        saveValue(OpcDAClient.getItemValuesList(group));
+                    } catch (Exception e) {
+                        // Shut down the thread pool
+                        pool.shutdown();
+                        e.printStackTrace();
+                        log.error("Batch data fetch failed:", e);
+                    }
+                }
+            }, 1, 2, TimeUnit.SECONDS);
+        }
+        /*// Task 1: start after 1 second, repeat every second
+        pool.scheduleAtFixedRate(new TimerTask() {
+            @Override
+            public void run() {
+                System.out.println(Thread.currentThread().getName() + "Thread A" + new Date());
+            }
+        }, 1, 1, TimeUnit.SECONDS);
+
+        // Task 2: start after 1 second, repeat every second
+        pool.scheduleAtFixedRate(new TimerTask() {
+            @Override
+            public void run() {
+                System.out.println(Thread.currentThread().getName() + "Thread B" + new Date());
+            }
+        }, 1, 1, TimeUnit.SECONDS);*/
+        // Shut down the thread pool
+
+    }
+
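A note on the scheduler above: java.util.TimerTask works here only because it implements Runnable, and scheduleAtFixedRate lets delayed ticks fire back to back if an OPC read ever takes longer than the 2-second period. Below is a minimal sketch of the same per-group polling written with a lambda and scheduleWithFixedDelay, reusing the OpcDAClient, Group and saveValue names from this class (the method name itself is illustrative only):

    private void startPollingSketch() {
        ScheduledExecutorService pool = Executors.newScheduledThreadPool(OpcDAClient.groupList.size());
        for (Group group : OpcDAClient.groupList) {
            // Fixed delay: the next read starts 2 seconds after the previous one finishes,
            // so a slow read never stacks up behind the next tick for the same group.
            pool.scheduleWithFixedDelay(() -> {
                try {
                    saveValue(OpcDAClient.getItemValuesList(group));
                } catch (Exception e) {
                    log.error("Batch read failed for group " + group.getName(), e);
                }
            }, 1, 2, TimeUnit.SECONDS);
        }
    }

Unlike the version above, this sketch only logs the failure instead of shutting the pool down, so one bad read does not stop polling for every group.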
+    private void saveValue(List<OpcResult> resultList) {
+        if (!CollectionUtils.isEmpty(resultList)) {
+            log.info("Number of values pulled: " + resultList.size());
+            RedisUtils.setString(OpcDAClient.redis_opc_item_values, JSON.toJSONString(resultList));
+            // Update the real-time values in the database
+            List<RemoteOpc> remoteOpcList = new ArrayList<>();
+            LocalDateTime localDateTime = LocalDateTime.now();
+            for (OpcResult result : resultList) {
+                RemoteOpc remoteOpc = new RemoteOpc();
+                remoteOpc.setResult(new BigDecimal(result.getValue()).setScale(2));
+                remoteOpc.setPositionNum(result.getId());
+                remoteOpc.setUpdateTime(localDateTime);
+                remoteOpcList.add(remoteOpc);
+            }
+            remoteOpcMapper.updateBatch(remoteOpcList);
+        } else {
+            log.info("Group startup returned no data; waiting for the next group restart");
+        }
+
+    }

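One caveat in saveValue above: new BigDecimal(result.getValue()).setScale(2) specifies no RoundingMode, so any value that actually needs rounding throws ArithmeticException, and a non-numeric OPC value throws NumberFormatException and aborts the whole batch. A defensive conversion could look like the sketch below, assuming the raw value arrives as a String (toScaledValue is an illustrative helper, and java.math.RoundingMode would need to be imported):

    private BigDecimal toScaledValue(String raw) {
        try {
            // HALF_UP keeps two decimals instead of throwing when rounding is required
            return new BigDecimal(raw).setScale(2, RoundingMode.HALF_UP);
        } catch (NumberFormatException e) {
            log.warn("Skipping unparseable OPC value: " + raw);
            return null; // caller can skip this point instead of failing the whole batch
        }
    }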
     /**
      * Add points immediately: after a new point is created and the "apply now" button is clicked, the addItems() method above must not run at the same time
@@ -137,6 +197,7 @@ public class OpcInitService {
         List<RemoteOpc> remoteOpcFailList = new ArrayList<>();
         AddFailedException exception = OpcDAClient.addGroupList(listMap);
         if (exception != null) {
+            RedisUtils.setString(OpcDAClient.redis_ok, "0");
             Map<String, Integer> failedItems = exception.getErrors();
             Map<String, Item> addItems = exception.getItems();
             if (failedItems != null) {// some items do not exist on the server, so the corresponding point records must be updated
@@ -157,7 +218,7 @@ public class OpcInitService {
                 remoteOpc.setPositionNum(entry.getKey());
                 remoteOpc.setCreatedFlag(1);
                 remoteOpc.setRemark("OPC server configured; AV/DV configuration is correct");
-                log.error("OPC server configured. key: " + remoteOpc.getPositionNum() + ", value: " + entry.getValue());
+                // log.error("OPC server configured. key: " + remoteOpc.getPositionNum() + ", value: " + entry.getValue());
                 remoteOpcFailList.add(remoteOpc);
             }
         }
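The redis_ok key is the handshake between these methods: addGroupAndItems writes "0" when AddFailedException reports unconfigured items, and startFetchDataByGroupLine calls initAddAllItem() again whenever the key is present, so initialization keeps re-entering itself for as long as the failure persists. Below is a rough sketch of the same handshake with a bounded number of retries (MAX_INIT_RETRIES and initWithRetry are hypothetical names used only for illustration):

    // Illustrative only: bounded re-initialization instead of re-entering
    // initAddAllItem() every time the redis_ok failure flag is still present.
    private static final int MAX_INIT_RETRIES = 3; // assumed limit, not taken from the original code

    private void initWithRetry() {
        for (int attempt = 1; attempt <= MAX_INIT_RETRIES; attempt++) {
            RedisUtils.del(OpcDAClient.redis_ok);
            RedisUtils.del(OpcDAClient.redis_opc_item_values);
            addGroupAndItems(findAllItems(true, null));
            if (!StringUtils.isNotBlank(RedisUtils.getString(OpcDAClient.redis_ok))) {
                startFetchDataByGroupLine(); // grouping succeeded, start polling
                return;
            }
            log.warn("Grouping reported failed items, retrying, attempt " + attempt);
        }
        log.error("Grouping still failing after " + MAX_INIT_RETRIES + " attempts");
    }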