Эх сурвалжийг харах

完善opc,多线程拉取,多线程保存

hfxc226 2 жил өмнө
parent
commit
d14997a48d

+ 2 - 0
platform-opc/src/main/java/com/platform/opc/OpcApplication.java

@@ -6,6 +6,7 @@ import org.springframework.boot.CommandLineRunner;
 import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
 import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
+import org.springframework.scheduling.annotation.EnableAsync;
 
 import java.util.TimeZone;
 
@@ -20,6 +21,7 @@ import java.util.TimeZone;
 @SpringBootApplication(scanBasePackages = {"com.platform.common","com.platform.dao", "com.platform.opc"}, exclude = {
         DataSourceAutoConfiguration.class, PageHelperAutoConfiguration.class})
 @Slf4j
+@EnableAsync
 public class OpcApplication implements CommandLineRunner {
 
     public static void main(String[] args) {

+ 5 - 81
platform-opc/src/main/java/com/platform/opc/servie/OpcInitService.java → platform-opc/src/main/java/com/platform/opc/servie/OpcInit.java

@@ -1,17 +1,13 @@
 package com.platform.opc.servie;
 
-import com.alibaba.fastjson.JSON;
 import com.platform.common.util.RedisUtils;
-import com.platform.common.util.StringUtils;
 import com.platform.dao.entity.remote.RemoteOpc;
 import com.platform.dao.enums.YesNoEnum;
 import com.platform.dao.mapper.remote.RemoteOpcMapper;
-import com.platform.opc.entity.OpcResult;
 import com.platform.opc.util.OpcDAClient;
 import lombok.AllArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.openscada.opc.lib.da.AddFailedException;
-import org.openscada.opc.lib.da.Group;
 import org.openscada.opc.lib.da.Item;
 import org.springframework.context.annotation.DependsOn;
 import org.springframework.scheduling.annotation.EnableScheduling;
@@ -22,20 +18,17 @@ import tk.mybatis.mapper.weekend.Weekend;
 import tk.mybatis.mapper.weekend.WeekendCriteria;
 
 import javax.annotation.PostConstruct;
-import java.math.BigDecimal;
-import java.time.LocalDateTime;
-import java.util.*;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
 import java.util.stream.Collectors;
 
-@Service("opcInitService")
+@Service("opcInit")
 @DependsOn({"beanUtils", "redisTemplate"})
 @AllArgsConstructor
 @Slf4j
 @EnableScheduling   // 1.开启定时任务
-public class OpcInitService {
+public class OpcInit {
 
     private final RemoteOpcMapper remoteOpcMapper;
 
@@ -52,77 +45,8 @@ public class OpcInitService {
     @PostConstruct
     public void initAddAllItem() {
         RedisUtils.del(OpcDAClient.redis_ok);
-        RedisUtils.del(OpcDAClient.redis_opc_item_values);
         log.info("开始初始化分组");
         addGroupAndItems(findAllItems(true, null));
-        log.info("开始执行分组任务-拉取数据");
-        startFetchDataByGroupLine();
-    }
-
-    private void  startFetchDataByGroupLine() {
-        // 创建核心线程数量为分组数量的线程池
-        // 先判断是否分组完毕
-        String ok = RedisUtils.getString(OpcDAClient.redis_ok);
-        if(StringUtils.isNotBlank(ok)){
-            initAddAllItem();
-            return;
-        }
-        ScheduledExecutorService pool = Executors.newScheduledThreadPool(OpcDAClient.groupList.size());
-        for (int i = 0; i < OpcDAClient.groupList.size(); i++) {
-            Group group = OpcDAClient.groupList.get(i);
-            pool.scheduleAtFixedRate(new TimerTask() {
-                @Override
-                public void run() {
-                    try {
-                        log.info(Thread.currentThread().getName() + "-线程-" + group.getName() + new Date());
-                        saveValue(OpcDAClient.getItemValuesList(group));
-                    } catch (Exception e) {
-                        // 关闭线程池
-                        pool.shutdown();
-                        e.printStackTrace();
-                        log.error("批量获取数据异常:", e);
-                    }
-                }
-            }, 1, 2, TimeUnit.SECONDS);
-        }
-        /*// 任务1,延迟1秒执行,每5秒循环一次
-        pool.scheduleAtFixedRate(new TimerTask() {
-            @Override
-            public void run() {
-                System.out.println(Thread.currentThread().getName() + "线程A" + new Date());
-            }
-        }, 1, 1, TimeUnit.SECONDS);
-
-        // 任务2,延迟3秒执行,每5秒循环一次
-        pool.scheduleAtFixedRate(new TimerTask() {
-            @Override
-            public void run() {
-                System.out.println(Thread.currentThread().getName() + "线程B" + new Date());
-            }
-        }, 1, 1, TimeUnit.SECONDS);*/
-        // 关闭线程池
-
-    }
-
-    private void saveValue(List<OpcResult> resultList){
-        if (!CollectionUtils.isEmpty(resultList)) {
-            log.info("拉取数量:" + resultList.size());
-            RedisUtils.setString(OpcDAClient.redis_opc_item_values, JSON.toJSONString(resultList));
-            // 更新数据库实时数据
-            List<RemoteOpc> remoteOpcList = new ArrayList<>();
-            LocalDateTime localDateTime = LocalDateTime.now();
-            for (OpcResult result : resultList) {
-                RemoteOpc remoteOpc = new RemoteOpc();
-                remoteOpc.setResult(new BigDecimal(result.getValue()).setScale(2));
-                remoteOpc.setPositionNum(result.getId());
-                remoteOpc.setUpdateTime(localDateTime);
-                remoteOpcList.add(remoteOpc);
-            }
-            remoteOpcMapper.updateBatch(remoteOpcList);
-        } else {
-            log.info("初始化启动分组错误,等待下次重新启动分组");
-        }
-
     }
 
     /**

+ 105 - 0
platform-opc/src/main/java/com/platform/opc/servie/OpcService.java

@@ -0,0 +1,105 @@
+package com.platform.opc.servie;
+
+import com.alibaba.fastjson.JSON;
+import com.platform.common.util.DateUtils;
+import com.platform.common.util.RedisUtils;
+import com.platform.common.util.StringUtils;
+import com.platform.dao.entity.remote.RemoteOpcLog;
+import com.platform.dao.mapper.remote.RemoteOpcLogMapper;
+import com.platform.dao.mapper.remote.RemoteOpcMapper;
+import com.platform.opc.entity.OpcResult;
+import com.platform.opc.util.OpcDAClient;
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.jinterop.dcom.common.JIException;
+import org.openscada.opc.lib.da.Group;
+import org.springframework.scheduling.annotation.Async;
+import org.springframework.scheduling.annotation.EnableScheduling;
+import org.springframework.scheduling.annotation.Scheduled;
+import org.springframework.stereotype.Service;
+import org.springframework.util.CollectionUtils;
+import tk.mybatis.mapper.weekend.Weekend;
+import tk.mybatis.mapper.weekend.WeekendCriteria;
+
+import java.math.BigDecimal;
+import java.time.LocalDateTime;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.stream.Collectors;
+
+@Service("opcService")
+@AllArgsConstructor
+@Slf4j
+public class OpcService {
+
+    private final RemoteOpcLogMapper remoteOpcLogMapper;
+
+    /**
+     * 1: 分组获取数据
+     * a:保存到redis,前端页面实时从数据库获取数据,5秒刷新一次
+     */
+    @Async
+    public void getValue(Group group) throws JIException {
+        log.info("拉取-" + group.getName());
+        log.info(Thread.currentThread().getName() + "-线程-" + group.getName() + new Date());
+        List<OpcResult> resultList = OpcDAClient.getItemValuesList(group);
+        RedisUtils.setString("opc-id-" + group.getName(), JSON.toJSONString(resultList));
+        log.info("结束拉取" + group.getName());
+    }
+
+    /**
+     *
+     */
+    @Async
+    public void saveValue(String id) {
+        log.info("保存-" + id);
+        String jsonStr = RedisUtils.getString(id);
+        if (StringUtils.isNotBlank(jsonStr)) {
+            List<OpcResult> resultList = JSON.parseArray(jsonStr, OpcResult.class);
+            List<RemoteOpcLog> addOpcLogList = new ArrayList<>();
+            List<RemoteOpcLog> updateRemoteOpcLogList = new ArrayList<>();
+            Weekend<RemoteOpcLog> weekend = new Weekend<>(RemoteOpcLog.class);
+            WeekendCriteria<RemoteOpcLog, Object> weekendCriteria = weekend.weekendCriteria();
+            // 查询当天是否已经存在了,存在则追加,否则更新
+            LocalDateTime time = LocalDateTime.now();
+            weekendCriteria.andIn(RemoteOpcLog::getPositionNum, resultList.stream().map(OpcResult::getId).collect(Collectors.toList()));
+            weekendCriteria.andEqualTo(RemoteOpcLog::getYear, time.getYear());
+            weekendCriteria.andEqualTo(RemoteOpcLog::getMonth, time.getMonthValue());
+            weekendCriteria.andEqualTo(RemoteOpcLog::getDay, time.getDayOfMonth());
+            List<RemoteOpcLog> checkList = remoteOpcLogMapper.selectByExample(weekend);
+            for (OpcResult result : resultList) {
+                RemoteOpcLog remoteOpcLog = new RemoteOpcLog();
+                remoteOpcLog.setPositionNum(result.getId());
+                remoteOpcLog.setResult(new BigDecimal(result.getValue()));
+                LocalDateTime localDateTime = DateUtils.strToLocalDateTime(result.getTime(), DateUtils.PATTERN_YMD_HMS);
+                remoteOpcLog.setCreatedTime(localDateTime);
+                remoteOpcLog.setYear(localDateTime.getYear());
+                remoteOpcLog.setMonth(localDateTime.getMonthValue());
+                remoteOpcLog.setDay(localDateTime.getDayOfMonth());
+                // remoteOpcLog.setHour(localDateTime.getHour());
+                // remoteOpcLog.setMinute(localDateTime.getMinute());
+                remoteOpcLog.setRemark(result.getTime().split(" ")[1] + "," + result.getValue() + ";");
+                List<RemoteOpcLog> findItemList = checkList.stream().filter(remoteOpcLog1 -> remoteOpcLog1.getPositionNum().equals(result.getId())).collect(Collectors.toList());
+                if (!CollectionUtils.isEmpty(findItemList) && findItemList.size() > 0) {
+                    updateRemoteOpcLogList.add(remoteOpcLog);
+                } else {
+                    addOpcLogList.add(remoteOpcLog);
+                }
+            }
+            if (!CollectionUtils.isEmpty(updateRemoteOpcLogList)) {
+                remoteOpcLogMapper.updateBatch(updateRemoteOpcLogList);
+                log.info("更新数量:" + resultList.size());
+            }
+            if (!CollectionUtils.isEmpty(addOpcLogList)) {
+                remoteOpcLogMapper.insertListforComplex(addOpcLogList);
+                log.info("写入数量:" + resultList.size());
+            }
+            log.info("写入/更新数据库数量:" + resultList.size());
+        }
+        log.info("结束保存-" + id);
+    }
+
+}

+ 59 - 0
platform-opc/src/main/java/com/platform/opc/servie/OpcTask.java

@@ -0,0 +1,59 @@
+package com.platform.opc.servie;
+
+import com.platform.common.util.RedisUtils;
+import com.platform.common.util.StringUtils;
+import com.platform.dao.mapper.remote.RemoteOpcLogMapper;
+import com.platform.opc.util.OpcDAClient;
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.jinterop.dcom.common.JIException;
+import org.openscada.opc.lib.da.Group;
+import org.springframework.scheduling.annotation.EnableScheduling;
+import org.springframework.scheduling.annotation.Scheduled;
+import org.springframework.stereotype.Service;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
+@Service("opcTask")
+@AllArgsConstructor
+@Slf4j
+@EnableScheduling   // 1.开启定时任务
+public class OpcTask {
+
+    private final OpcService opcService;
+
+    /**
+     * 1: 分组获取数据
+     * a:保存到redis,前端页面实时从数据库获取数据,5秒刷新一次
+     */
+    @Scheduled(fixedDelay = 2000)  //间隔2秒
+    public void getValue() throws JIException {
+        String key = RedisUtils.getString(OpcDAClient.redis_opc_update_flag);
+        if(StringUtils.isBlank(key)){
+            for (int i = 0; i < OpcDAClient.groupList.size(); i++) {
+                Group group = OpcDAClient.groupList.get(i);
+                opcService.getValue(group);
+            }
+        }
+    }
+
+    /**
+     * 1: 保存获取的数据
+     * b: 开启新线程,队列写入数据库,每2分钟启动一次
+     * c: 每个点位,每条数据保存一天的
+     * 1)循环查询点位在当天是否存在记录,如果存在,则追加
+     * 2)批量写入数据库,每天的数据追加到一条里面
+     */
+    @Scheduled(fixedDelay = 300000)  //间隔300秒,5分钟保存一次数据到数据库,确保每天不超过700万数据
+    public void saveValue() throws JIException {
+        String key = RedisUtils.getString(OpcDAClient.redis_opc_update_flag);
+        if(StringUtils.isBlank(key)){
+            for (int i = 0; i < OpcDAClient.groupList.size(); i++) {
+                Group group = OpcDAClient.groupList.get(i);
+                opcService.saveValue(group.getName());
+            }
+        }
+    }
+
+}

+ 0 - 130
platform-opc/src/main/java/com/platform/opc/servie/OpcTaskService.java

@@ -1,130 +0,0 @@
-package com.platform.opc.servie;
-
-import com.alibaba.fastjson.JSON;
-import com.platform.common.util.DateUtils;
-import com.platform.common.util.RedisUtils;
-import com.platform.common.util.StringUtils;
-import com.platform.dao.dto.check.CheckProjectDTO;
-import com.platform.dao.entity.remote.RemoteOpc;
-import com.platform.dao.entity.remote.RemoteOpcLog;
-import com.platform.dao.mapper.remote.RemoteOpcLogMapper;
-import com.platform.dao.mapper.remote.RemoteOpcMapper;
-import com.platform.opc.entity.OpcResult;
-import com.platform.opc.util.OpcDAClient;
-import lombok.AllArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import org.springframework.scheduling.annotation.EnableScheduling;
-import org.springframework.scheduling.annotation.Scheduled;
-import org.springframework.stereotype.Service;
-import org.springframework.util.CollectionUtils;
-import tk.mybatis.mapper.weekend.Weekend;
-import tk.mybatis.mapper.weekend.WeekendCriteria;
-
-import java.math.BigDecimal;
-import java.time.LocalDateTime;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.stream.Collectors;
-
-@Service("opcTaskService")
-@AllArgsConstructor
-@Slf4j
-@EnableScheduling   // 1.开启定时任务
-public class OpcTaskService {
-
-    private final RemoteOpcMapper remoteOpcMapper;
-    private final RemoteOpcLogMapper remoteOpcLogMapper;
-
-    /**
-     * 1: 分组获取数据
-     * a:保存到redis,前端页面实时从数据库获取数据,5秒刷新一次
-     */
-    @Scheduled(fixedDelay = 2000)  //间隔2秒
-    public void getValue() {
-        log.info("启动拉取");
-        String key = RedisUtils.getString(OpcDAClient.redis_opc_update_flag);
-        if(StringUtils.isBlank(key)){
-            log.info("开始拉取数据");
-            List<OpcResult> resultList = OpcDAClient.getItemValuesList();
-            if (!CollectionUtils.isEmpty(resultList)) {
-                log.info("拉取数量:" + resultList.size());
-                RedisUtils.setString(OpcDAClient.redis_opc_item_values, JSON.toJSONString(resultList));
-                // 更新数据库实时数据
-                List<RemoteOpc> remoteOpcList = new ArrayList<>();
-                LocalDateTime localDateTime = LocalDateTime.now();
-                for (OpcResult result : resultList) {
-                    RemoteOpc remoteOpc = new RemoteOpc();
-                    remoteOpc.setResult(new BigDecimal(result.getValue()).setScale(2));
-                    remoteOpc.setPositionNum(result.getId());
-                    remoteOpc.setUpdateTime(localDateTime);
-                    remoteOpcList.add(remoteOpc);
-                }
-                remoteOpcMapper.updateBatch(remoteOpcList);
-            } else {
-                log.info("初始化启动分组错误,等待下次重新启动分组");
-            }
-            log.info("结束拉取数据");
-        }
-    }
-
-    /**
-     * 1: 保存获取的数据
-     * b: 开启新线程,队列写入数据库,每2分钟启动一次
-     * c: 每个点位,每条数据保存一天的
-     * 1)循环查询点位在当天是否存在记录,如果存在,则追加
-     * 2)批量写入数据库,每天的数据追加到一条里面
-     */
-    @Scheduled(fixedDelay = 300000)  //间隔300秒,5分钟保存一次数据到数据库,确保每天不超过700万数据
-    public void saveValue() {
-        log.info("启动保存");
-        String key = RedisUtils.getString(OpcDAClient.redis_opc_update_flag);
-        if(StringUtils.isBlank(key)){
-            log.info("开始保存点位");
-            String jsonStr = RedisUtils.getString(OpcDAClient.redis_opc_item_values);
-            if (StringUtils.isNotBlank(jsonStr)) {
-                List<OpcResult> resultList = JSON.parseArray(jsonStr, OpcResult.class);
-                List<RemoteOpcLog> addOpcLogList = new ArrayList<>();
-                List<RemoteOpcLog> updateRemoteOpcLogList = new ArrayList<>();
-                Weekend<RemoteOpcLog> weekend = new Weekend<>(RemoteOpcLog.class);
-                WeekendCriteria<RemoteOpcLog, Object> weekendCriteria = weekend.weekendCriteria();
-                // 查询当天是否已经存在了,存在则追加,否则更新
-                LocalDateTime time = LocalDateTime.now();
-                weekendCriteria.andIn(RemoteOpcLog::getPositionNum, resultList.stream().map(OpcResult::getId).collect(Collectors.toList()));
-                weekendCriteria.andEqualTo(RemoteOpcLog::getYear, time.getYear());
-                weekendCriteria.andEqualTo(RemoteOpcLog::getMonth, time.getMonthValue());
-                weekendCriteria.andEqualTo(RemoteOpcLog::getDay, time.getDayOfMonth());
-                List<RemoteOpcLog> checkList = remoteOpcLogMapper.selectByExample(weekend);
-                for (OpcResult result : resultList) {
-                    RemoteOpcLog remoteOpcLog = new RemoteOpcLog();
-                    remoteOpcLog.setPositionNum(result.getId());
-                    remoteOpcLog.setResult(new BigDecimal(result.getValue()));
-                    LocalDateTime localDateTime = DateUtils.strToLocalDateTime(result.getTime(), DateUtils.PATTERN_YMD_HMS);
-                    remoteOpcLog.setCreatedTime(localDateTime);
-                    remoteOpcLog.setYear(localDateTime.getYear());
-                    remoteOpcLog.setMonth(localDateTime.getMonthValue());
-                    remoteOpcLog.setDay(localDateTime.getDayOfMonth());
-                    // remoteOpcLog.setHour(localDateTime.getHour());
-                    // remoteOpcLog.setMinute(localDateTime.getMinute());
-                    remoteOpcLog.setRemark(result.getTime().split(" ")[1] + "," + result.getValue() + ";");
-                    List<RemoteOpcLog> findItemList = checkList.stream().filter(remoteOpcLog1 -> remoteOpcLog1.getPositionNum().equals(result.getId())).collect(Collectors.toList());
-                    if (!CollectionUtils.isEmpty(findItemList) && findItemList.size()>0) {
-                        updateRemoteOpcLogList.add(remoteOpcLog);
-                    } else {
-                        addOpcLogList.add(remoteOpcLog);
-                    }
-                }
-                if(!CollectionUtils.isEmpty(updateRemoteOpcLogList)){
-                    remoteOpcLogMapper.updateBatch(updateRemoteOpcLogList);
-                    log.info("更新数量:" + resultList.size());
-                }
-                if(!CollectionUtils.isEmpty(addOpcLogList)){
-                    remoteOpcLogMapper.insertListforComplex(addOpcLogList);
-                    log.info("写入数量:" + resultList.size());
-                }
-                log.info("写入/更新数据库数量:" + resultList.size());
-            }
-            log.info("结束保存点位");
-        }
-    }
-
-}