|
@@ -4,20 +4,15 @@ import com.alibaba.fastjson.JSON;
|
|
|
import com.platform.common.util.DateUtils;
|
|
|
import com.platform.common.util.RedisUtils;
|
|
|
import com.platform.common.util.StringUtils;
|
|
|
+import com.platform.dao.dto.check.CheckProjectDTO;
|
|
|
import com.platform.dao.entity.remote.RemoteOpc;
|
|
|
import com.platform.dao.entity.remote.RemoteOpcLog;
|
|
|
-import com.platform.dao.enums.YesNoEnum;
|
|
|
import com.platform.dao.mapper.remote.RemoteOpcLogMapper;
|
|
|
import com.platform.dao.mapper.remote.RemoteOpcMapper;
|
|
|
import com.platform.opc.entity.OpcResult;
|
|
|
import com.platform.opc.util.OpcDAClient;
|
|
|
-import org.openscada.opc.lib.da.AddFailedException;
|
|
|
-import org.openscada.opc.lib.da.Item;
|
|
|
import lombok.AllArgsConstructor;
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
-import org.springframework.context.annotation.DependsOn;
|
|
|
-import org.springframework.scheduling.annotation.Async;
|
|
|
-import org.springframework.scheduling.annotation.EnableAsync;
|
|
|
import org.springframework.scheduling.annotation.EnableScheduling;
|
|
|
import org.springframework.scheduling.annotation.Scheduled;
|
|
|
import org.springframework.stereotype.Service;
|
|
@@ -25,15 +20,13 @@ import org.springframework.util.CollectionUtils;
|
|
|
import tk.mybatis.mapper.weekend.Weekend;
|
|
|
import tk.mybatis.mapper.weekend.WeekendCriteria;
|
|
|
|
|
|
-import javax.annotation.PostConstruct;
|
|
|
+import java.math.BigDecimal;
|
|
|
import java.time.LocalDateTime;
|
|
|
import java.util.ArrayList;
|
|
|
import java.util.List;
|
|
|
-import java.util.Map;
|
|
|
import java.util.stream.Collectors;
|
|
|
|
|
|
@Service("opcTaskService")
|
|
|
-@DependsOn({"beanUtils", "redisTemplate"})
|
|
|
@AllArgsConstructor
|
|
|
@Slf4j
|
|
|
@EnableScheduling // 1.开启定时任务
|
|
@@ -42,84 +35,35 @@ public class OpcTaskService {
|
|
|
private final RemoteOpcMapper remoteOpcMapper;
|
|
|
private final RemoteOpcLogMapper remoteOpcLogMapper;
|
|
|
|
|
|
- /**
|
|
|
- * 初始化redis和点位分组数据,并启动循环判断
|
|
|
- */
|
|
|
- @PostConstruct
|
|
|
- public void initClients() {
|
|
|
- log.info("开始初始化");
|
|
|
- // 判断开发环境还是本地环境
|
|
|
- // 启动分组,按照车间line分组,选择已经在opc server中配置的
|
|
|
- Weekend<RemoteOpc> weekend = new Weekend<>(RemoteOpc.class);
|
|
|
- WeekendCriteria<RemoteOpc, Object> weekendCriteria = weekend.weekendCriteria();
|
|
|
- weekendCriteria.andEqualTo(RemoteOpc::getCreatedFlag, YesNoEnum.YES.getValue());
|
|
|
- List<RemoteOpc> remoteOpcList = remoteOpcMapper.selectByExample(weekend);
|
|
|
- log.info("remoteOpcList: " + remoteOpcList.size());
|
|
|
- Map<String, List<RemoteOpc>> listMap = remoteOpcList.stream().collect(Collectors.groupingBy(RemoteOpc::getLine));
|
|
|
- OpcDAClient.connect();
|
|
|
- OpcDAClient.findAllItem();
|
|
|
- List<RemoteOpc> remoteOpcFailList = new ArrayList<>();
|
|
|
- AddFailedException exception = OpcDAClient.addGroupList(listMap);
|
|
|
- if (exception != null) {
|
|
|
- Map<String, Integer> failedItems = exception.getErrors();
|
|
|
- Map<String, Item> addItems = exception.getItems();
|
|
|
- if (failedItems != null) {// 有不存在的item,需要更新对应的点位信息
|
|
|
- for (Map.Entry<String, Integer> entry : failedItems.entrySet()) {
|
|
|
- RemoteOpc remoteOpc = new RemoteOpc();
|
|
|
- // 因为有些标签是:PT_9836_AV,不能用 entry.getKey().split("_")[0],需要找到最后一个_
|
|
|
- int index = entry.getKey().lastIndexOf("_");
|
|
|
- remoteOpc.setPositionNum(entry.getKey());
|
|
|
- remoteOpc.setCreatedFlag(0);
|
|
|
- remoteOpc.setRemark("opc server未找到改点位。可能原因1:AV/DV配置错误,2:opc server中未配置");
|
|
|
- log.error("opc server未找到该点位。key: " + remoteOpc.getPositionNum() + ", value: " + entry.getValue());
|
|
|
- remoteOpcFailList.add(remoteOpc);
|
|
|
- }
|
|
|
- }
|
|
|
- if (addItems != null) {// 有不存在的item,需要更新对应的点位信息
|
|
|
- for (Map.Entry<String, Item> entry : addItems.entrySet()) {
|
|
|
- RemoteOpc remoteOpc = new RemoteOpc();
|
|
|
- // 因为有些标签是:PT_9836_AV,不能用 entry.getKey().split("_")[0],需要找到最后一个_
|
|
|
- // int index = entry.getKey().lastIndexOf("_");
|
|
|
- remoteOpc.setPositionNum(entry.getKey());
|
|
|
- remoteOpc.setCreatedFlag(1);
|
|
|
- remoteOpc.setRemark("opc server已配置,AV/DV配置正确");
|
|
|
- log.error("opc server已配置。key: " + remoteOpc.getPositionNum() + ", value: " + entry.getValue());
|
|
|
- remoteOpcFailList.add(remoteOpc);
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- if (!CollectionUtils.isEmpty(remoteOpcFailList)) {
|
|
|
- remoteOpcMapper.updateBatch(remoteOpcFailList);
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
/**
|
|
|
* 1: 分组获取数据
|
|
|
* a:保存到redis,前端页面实时从数据库获取数据,5秒刷新一次
|
|
|
*/
|
|
|
- @Scheduled(fixedDelay = 5000) //间隔5秒
|
|
|
+ @Scheduled(fixedDelay = 2000) //间隔2秒
|
|
|
public void getValue() {
|
|
|
- log.info("开始定时任务");
|
|
|
- List<OpcResult> resultList = OpcDAClient.getItemValuesList();
|
|
|
- if (!CollectionUtils.isEmpty(resultList)) {
|
|
|
- //log.info("resultList数量:" + resultList.size());
|
|
|
- RedisUtils.setString("opcList", JSON.toJSONString(resultList));
|
|
|
- // 更新数据库实时数据
|
|
|
- List<RemoteOpc> remoteOpcList = new ArrayList<>();
|
|
|
- LocalDateTime localDateTime = LocalDateTime.now();
|
|
|
- for (OpcResult result : resultList) {
|
|
|
- RemoteOpc remoteOpc = new RemoteOpc();
|
|
|
- remoteOpc.setResult(result.getValue());
|
|
|
- remoteOpc.setPositionNum(result.getId());
|
|
|
- remoteOpc.setUpdateTime(localDateTime);
|
|
|
- remoteOpcList.add(remoteOpc);
|
|
|
+ String key = RedisUtils.getString(OpcDAClient.redis_opc_update_flag);
|
|
|
+ if(StringUtils.isBlank(key)){
|
|
|
+ log.info("开始拉取数据");
|
|
|
+ List<OpcResult> resultList = OpcDAClient.getItemValuesList();
|
|
|
+ if (!CollectionUtils.isEmpty(resultList)) {
|
|
|
+ log.info("拉取数量:" + resultList.size());
|
|
|
+ RedisUtils.setString(OpcDAClient.redis_opc_item_values, JSON.toJSONString(resultList));
|
|
|
+ // 更新数据库实时数据
|
|
|
+ List<RemoteOpc> remoteOpcList = new ArrayList<>();
|
|
|
+ LocalDateTime localDateTime = LocalDateTime.now();
|
|
|
+ for (OpcResult result : resultList) {
|
|
|
+ RemoteOpc remoteOpc = new RemoteOpc();
|
|
|
+ remoteOpc.setResult(new BigDecimal(result.getValue()));
|
|
|
+ remoteOpc.setPositionNum(result.getId());
|
|
|
+ remoteOpc.setUpdateTime(localDateTime);
|
|
|
+ remoteOpcList.add(remoteOpc);
|
|
|
+ }
|
|
|
+ remoteOpcMapper.updateBatch(remoteOpcList);
|
|
|
+ } else {
|
|
|
+ log.info("初始化启动分组错误,等待下次重新启动分组");
|
|
|
}
|
|
|
- remoteOpcMapper.updateBatch(remoteOpcList);
|
|
|
- }else{
|
|
|
- log.info("初始化启动分组错误,重新启动分组");
|
|
|
- initClients();
|
|
|
+ log.info("结束拉取数据");
|
|
|
}
|
|
|
- log.info("结束定时任务");
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -131,54 +75,54 @@ public class OpcTaskService {
|
|
|
*/
|
|
|
@Scheduled(fixedDelay = 300000) //间隔300秒,5分钟保存一次数据到数据库,确保每天不超过700万数据
|
|
|
public void saveValue() {
|
|
|
- log.info("开始读取redis1");
|
|
|
- String jsonStr = RedisUtils.getString("opcList");
|
|
|
- if (StringUtils.isNotBlank(jsonStr)) {
|
|
|
- List<OpcResult> resultList = JSON.parseArray(jsonStr, OpcResult.class);
|
|
|
- /*for(OpcResult result: resultList){
|
|
|
- log.info("id: " + result.getId() + ", value: " + result.getValue() + ", time: " + result.getTime());
|
|
|
- }*/
|
|
|
- // 保存入库
|
|
|
- // 查询当天是否已经存在了,存在则追加,否则则更新
|
|
|
- Weekend<RemoteOpcLog> weekend = new Weekend<>(RemoteOpcLog.class);
|
|
|
- WeekendCriteria<RemoteOpcLog, Object> weekendCriteria = weekend.weekendCriteria();
|
|
|
- OpcResult opcResult = resultList.get(0);
|
|
|
- LocalDateTime time = DateUtils.strToLocalDateTime(opcResult.getTime(), DateUtils.PATTERN_YMD_HMS);
|
|
|
- weekendCriteria.andEqualTo(RemoteOpcLog::getPositionNum, opcResult.getId());
|
|
|
- weekendCriteria.andEqualTo(RemoteOpcLog::getYear, time.getYear());
|
|
|
- weekendCriteria.andEqualTo(RemoteOpcLog::getMonth, time.getMonthValue());
|
|
|
- weekendCriteria.andEqualTo(RemoteOpcLog::getDay, time.getDayOfMonth());
|
|
|
- int count = remoteOpcLogMapper.selectCountByExample(weekend);
|
|
|
- List<RemoteOpcLog> remoteOpcLogList = new ArrayList<>();
|
|
|
- for (OpcResult result : resultList) {
|
|
|
- RemoteOpcLog remoteOpcLog = new RemoteOpcLog();
|
|
|
- remoteOpcLog.setPositionNum(result.getId());
|
|
|
- remoteOpcLog.setResult(result.getValue());
|
|
|
- LocalDateTime localDateTime = DateUtils.strToLocalDateTime(result.getTime(), DateUtils.PATTERN_YMD_HMS);
|
|
|
- remoteOpcLog.setCreatedTime(localDateTime);
|
|
|
- remoteOpcLog.setYear(localDateTime.getYear());
|
|
|
- remoteOpcLog.setMonth(localDateTime.getMonthValue());
|
|
|
- remoteOpcLog.setDay(localDateTime.getDayOfMonth());
|
|
|
- // remoteOpcLog.setHour(localDateTime.getHour());
|
|
|
- // remoteOpcLog.setMinute(localDateTime.getMinute());
|
|
|
- remoteOpcLog.setRemark(result.getTime().split(" ")[1] + "," + result.getValue() + ";");
|
|
|
- remoteOpcLogList.add(remoteOpcLog);
|
|
|
- //log.info("id: " + result.getId() + ", value: " + result.getValue() + ", time: " + result.getTime());
|
|
|
- }
|
|
|
-
|
|
|
- // TODO:判断remoteOpcLogList是否大于5000,大于5000,就分多次存入
|
|
|
- log.info("count:" + count);
|
|
|
- if (count > 0) {
|
|
|
- remoteOpcLogMapper.updateBatch(remoteOpcLogList);
|
|
|
- } else {
|
|
|
- remoteOpcLogMapper.insertListforComplex(remoteOpcLogList);
|
|
|
+ String key = RedisUtils.getString(OpcDAClient.redis_opc_update_flag);
|
|
|
+ if(StringUtils.isBlank(key)){
|
|
|
+ log.info("开始保存点位");
|
|
|
+ String jsonStr = RedisUtils.getString(OpcDAClient.redis_opc_item_values);
|
|
|
+ if (StringUtils.isNotBlank(jsonStr)) {
|
|
|
+ List<OpcResult> resultList = JSON.parseArray(jsonStr, OpcResult.class);
|
|
|
+ List<RemoteOpcLog> addOpcLogList = new ArrayList<>();
|
|
|
+ List<RemoteOpcLog> updateRemoteOpcLogList = new ArrayList<>();
|
|
|
+ Weekend<RemoteOpcLog> weekend = new Weekend<>(RemoteOpcLog.class);
|
|
|
+ WeekendCriteria<RemoteOpcLog, Object> weekendCriteria = weekend.weekendCriteria();
|
|
|
+ // 查询当天是否已经存在了,存在则追加,否则更新
|
|
|
+ LocalDateTime time = LocalDateTime.now();
|
|
|
+ weekendCriteria.andIn(RemoteOpcLog::getPositionNum, resultList.stream().map(OpcResult::getId).collect(Collectors.toList()));
|
|
|
+ weekendCriteria.andEqualTo(RemoteOpcLog::getYear, time.getYear());
|
|
|
+ weekendCriteria.andEqualTo(RemoteOpcLog::getMonth, time.getMonthValue());
|
|
|
+ weekendCriteria.andEqualTo(RemoteOpcLog::getDay, time.getDayOfMonth());
|
|
|
+ List<RemoteOpcLog> checkList = remoteOpcLogMapper.selectByExample(weekend);
|
|
|
+ for (OpcResult result : resultList) {
|
|
|
+ RemoteOpcLog remoteOpcLog = new RemoteOpcLog();
|
|
|
+ remoteOpcLog.setPositionNum(result.getId());
|
|
|
+ remoteOpcLog.setResult(new BigDecimal(result.getValue()));
|
|
|
+ LocalDateTime localDateTime = DateUtils.strToLocalDateTime(result.getTime(), DateUtils.PATTERN_YMD_HMS);
|
|
|
+ remoteOpcLog.setCreatedTime(localDateTime);
|
|
|
+ remoteOpcLog.setYear(localDateTime.getYear());
|
|
|
+ remoteOpcLog.setMonth(localDateTime.getMonthValue());
|
|
|
+ remoteOpcLog.setDay(localDateTime.getDayOfMonth());
|
|
|
+ // remoteOpcLog.setHour(localDateTime.getHour());
|
|
|
+ // remoteOpcLog.setMinute(localDateTime.getMinute());
|
|
|
+ remoteOpcLog.setRemark(result.getTime().split(" ")[1] + "," + result.getValue() + ";");
|
|
|
+ List<RemoteOpcLog> findItemList = checkList.stream().filter(remoteOpcLog1 -> remoteOpcLog1.getPositionNum().equals(result.getId())).collect(Collectors.toList());
|
|
|
+ if (!CollectionUtils.isEmpty(findItemList) && findItemList.size()>0) {
|
|
|
+ updateRemoteOpcLogList.add(remoteOpcLog);
|
|
|
+ } else {
|
|
|
+ addOpcLogList.add(remoteOpcLog);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if(!CollectionUtils.isEmpty(updateRemoteOpcLogList)){
|
|
|
+ remoteOpcLogMapper.updateBatch(updateRemoteOpcLogList);
|
|
|
+ log.info("更新数量:" + resultList.size());
|
|
|
+ }
|
|
|
+ if(!CollectionUtils.isEmpty(addOpcLogList)){
|
|
|
+ remoteOpcLogMapper.insertListforComplex(addOpcLogList);
|
|
|
+ log.info("写入数量:" + resultList.size());
|
|
|
+ }
|
|
|
+ log.info("写入/更新数据库数量:" + resultList.size());
|
|
|
}
|
|
|
+ log.info("结束保存点位");
|
|
|
}
|
|
|
- log.info("结束读取redis1");
|
|
|
}
|
|
|
|
|
|
- public static void main(String[] args) {
|
|
|
- String time = "2022-11-12 12:04:06";
|
|
|
- System.out.println(time.split(" ")[1]);
|
|
|
- }
|
|
|
}
|