
Save DCS data to InfluxDB

hfxc226 · 2 years ago
Commit ebbe253a5a

+ 5 - 5
platform-dao/src/main/java/com/platform/dao/influxdb/InfluxDBService.java

@@ -1,6 +1,7 @@
 package com.platform.dao.influxdb;
 
 import com.influxdb.query.FluxTable;
+import com.platform.dao.vo.query.remote.RemoteOpcVO;
 
 import java.time.LocalDateTime;
 import java.util.List;
@@ -9,14 +10,13 @@ import java.util.Map;
 public interface InfluxDBService {
 
     /**
-     * *
      *
-     * @param tagName     tag name
-     * @param fields      fields
      */
-    void writeOne(String tagName, Map<String, Object> fields);
+    void writeOne(RemoteOpcVO vo);
+
+    void writeBatch(List<RemoteOpcVO> resultList);
 
     void delete(LocalDateTime start, LocalDateTime stop);
 
-    void select(String tagName, LocalDateTime start, LocalDateTime stop);
+    RemoteOpcVO select(String id, Integer type);
 }
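
Seen from a caller, the reshaped interface looks roughly like the sketch below (hypothetical caller class; setPositionNum() is an assumed setter matching the getPositionNum() that InfluxDbBuilder reads, and the id/type values are illustrative):

    import com.platform.dao.influxdb.InfluxDBService;
    import com.platform.dao.vo.query.remote.RemoteOpcVO;

    import java.math.BigDecimal;
    import java.util.Collections;

    class InfluxDBServiceUsageSketch {               // hypothetical caller, not part of the commit
        void demo(InfluxDBService influxDBService) {
            RemoteOpcVO vo = new RemoteOpcVO();
            vo.setPositionNum("TE1001_AV");          // assumed setter; InfluxDbBuilder reads getPositionNum()
            vo.setResult(new BigDecimal("23.43"));   // setResult() is used the same way in InfluxDBController
            influxDBService.writeOne(vo);                               // one point
            influxDBService.writeBatch(Collections.singletonList(vo));  // many points in one call
            // id is the RemoteOpc primary key; type selects the lookback window (1 = last 5 minutes)
            RemoteOpcVO history = influxDBService.select("1001", 1);
        }
    }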

+ 69 - 0
platform-dao/src/main/java/com/platform/dao/influxdb/builder/InfluxDbBuilder.java

@@ -0,0 +1,69 @@
+package com.platform.dao.influxdb.builder;
+
+import com.alibaba.fastjson.JSONArray;
+import com.alibaba.fastjson.JSONObject;
+import com.influxdb.client.domain.WritePrecision;
+import com.influxdb.client.write.Point;
+import com.influxdb.query.FluxRecord;
+import com.influxdb.query.FluxTable;
+import com.platform.dao.util.influxDB.InfluxDBFluxExpression;
+import com.platform.dao.vo.query.remote.RemoteOpcVO;
+import lombok.extern.slf4j.Slf4j;
+
+import java.math.BigDecimal;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.List;
+
+@Slf4j
+public class InfluxDbBuilder {
+
+    // Convert a RemoteOpcVO into a Point that can be written to InfluxDB
+    public static Point bulidOnePoint(RemoteOpcVO vo){
+        Instant now = Instant.now();
+        Point point = Point
+                .measurement(InfluxDBFluxExpression.measurement)
+                .addTag(InfluxDBFluxExpression.tag, vo.getPositionNum())
+                .addField(InfluxDBFluxExpression.field_result, vo.getResult())
+                .time(now, WritePrecision.NS);
+        return point;
+    }
+
+    // Convert a list of RemoteOpcVOs into Points that can be written to InfluxDB
+    public static List<Point> bulidBatchPoint(List<RemoteOpcVO> resultList){
+        List<Point> points = new ArrayList<>();
+        Instant now = Instant.now();
+        for(RemoteOpcVO vo: resultList){
+            Point point = Point
+                    .measurement(InfluxDBFluxExpression.measurement)
+                    .addTag(InfluxDBFluxExpression.tag, vo.getPositionNum())
+                    .addField(InfluxDBFluxExpression.field_result, vo.getResult())
+                    .time(now, WritePrecision.NS);
+            points.add(point);
+        }
+        return points;
+    }
+
+    /**
+     * Convert Flux query results into data that can be rendered as a table or chart.
+     * Only the history of a single point can be queried at a time.
+     * @param tables Flux result tables returned by the query API
+     * @return JSON array string of {time, value} objects
+     */
+    public static String bulidRemoteOpcVO(List<FluxTable> tables ){
+        JSONArray array = new JSONArray();
+        Instant now = Instant.now();
+        for (FluxTable table : tables) {
+            List<FluxRecord> records = table.getRecords();
+            for (FluxRecord record : records) {
+                RemoteOpcVO vo = new RemoteOpcVO();
+                log.info("{}---{}---{}---{}", record.getMeasurement(),record.getField(),record.getValue(),record.getTime());
+                JSONObject object = new JSONObject();
+                object.put("time",record.getTime().toString() );
+                object.put("value",new BigDecimal(record.getValue().toString()));
+                array.add(object);
+            }
+        }
+        return array.toJSONString();
+    }
+}
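
Two things worth noting about the builder. First, bulidBatchPoint stamps every point in a batch with the same Instant.now(), so two readings for the same positionNum in one batch share measurement, tag set and timestamp and would collapse into a single row in InfluxDB. Second, a built point serializes to line protocol roughly as in this sketch (values and timestamp are illustrative, the helper class is hypothetical):

    import com.influxdb.client.write.Point;
    import com.platform.dao.influxdb.builder.InfluxDbBuilder;
    import com.platform.dao.vo.query.remote.RemoteOpcVO;

    class LineProtocolSketch {                       // illustration only, not part of the commit
        static void demo(RemoteOpcVO vo) {           // assume positionNum = "TE1001_AV", result = 23.43
            Point point = InfluxDbBuilder.bulidOnePoint(vo);
            System.out.println(point.toLineProtocol());
            // prints roughly: tb_dcs,positionNum=TE1001_AV result=23.43 1700000000000000000
        }
    }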

+ 67 - 35
platform-dao/src/main/java/com/platform/dao/influxdb/impl/InfluxDBServiceImpl.java

@@ -10,9 +10,14 @@ import com.influxdb.client.write.Point;
 import com.influxdb.query.FluxRecord;
 import com.influxdb.query.FluxTable;
 import com.platform.common.exception.BusinessException;
+import com.platform.common.util.BeanConverterUtil;
 import com.platform.common.util.DateUtils;
+import com.platform.dao.entity.remote.RemoteOpc;
 import com.platform.dao.influxdb.InfluxDBService;
+import com.platform.dao.influxdb.builder.InfluxDbBuilder;
+import com.platform.dao.mapper.remote.RemoteOpcMapper;
 import com.platform.dao.util.influxDB.InfluxDBFluxExpression;
+import com.platform.dao.vo.query.remote.RemoteOpcVO;
 import lombok.AllArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -21,6 +26,7 @@ import org.springframework.stereotype.Service;
 import java.text.ParseException;
 import java.time.Instant;
 import java.time.LocalDateTime;
+import java.time.OffsetDateTime;
 import java.time.ZoneOffset;
 import java.util.ArrayList;
 import java.util.List;
@@ -33,15 +39,16 @@ public class InfluxDBServiceImpl implements InfluxDBService {
 
     @Autowired
     private InfluxDBClient influxDBClient;
+    @Autowired
+    private RemoteOpcMapper remoteOpcMapper;
 
     /**
      * *
-     * @param tagName     tag name: the positionNum value
-     * @param fields      fields
+     *
+     * @param vo the OPC reading to write
      */
     @Override
-    public void writeOne(String tagName, Map<String, Object> fields) {
-
+    public void writeOne(RemoteOpcVO vo) {
         System.out.println("=============start writing data=============");
         WriteOptions writeOptions = WriteOptions.builder()
                 .batchSize(5000)
@@ -51,13 +58,8 @@ public class InfluxDBServiceImpl implements InfluxDBService {
                 .retryInterval(5000)
                 .build();
         try (WriteApi writeApi = influxDBClient.getWriteApi(writeOptions)) {
-            Point point = Point
-                    .measurement(InfluxDBFluxExpression.measurement)
-                    .addTag(InfluxDBFluxExpression.tag, tagName)
-                    .addFields(fields)
-                    .time(Instant.now(), WritePrecision.NS);
-            writeApi.writePoint(InfluxDBFluxExpression.bucket, InfluxDBFluxExpression.org, point);
-        }catch(Exception e){
+            writeApi.writePoint(InfluxDBFluxExpression.bucket, InfluxDBFluxExpression.org, InfluxDbBuilder.bulidOnePoint(vo));
+        } catch (Exception e) {
             e.printStackTrace();
             throw new BusinessException("db write error");
 
@@ -67,14 +69,23 @@ public class InfluxDBServiceImpl implements InfluxDBService {
     /**
      * Batch write
      */
-    public void writeBatch(List<Point> list){
-        /*Point point = Point
-                .measurement(InfluxDBFluxExpression.measurement)
-                .addTag(InfluxDBFluxExpression.tag, InfluxDBFluxExpression.tag)
-                .addField(InfluxDBFluxExpression.field, 23.43234543)
-                .time(Instant.now(), WritePrecision.NS);
-        list.add(point);*/
-        influxDBClient.getWriteApi().writePoints(list);
+    @Override
+    public void writeBatch(List<RemoteOpcVO> resultList) {
+        System.out.println("=============start writing data=============");
+        WriteOptions writeOptions = WriteOptions.builder()
+                .batchSize(5000)
+                .flushInterval(1000)
+                .bufferLimit(10000)
+                .jitterInterval(1000)
+                .retryInterval(5000)
+                .build();
+        try (WriteApi writeApi = influxDBClient.getWriteApi(writeOptions)) {
+            writeApi.writePoints(InfluxDBFluxExpression.bucket, InfluxDBFluxExpression.org, InfluxDbBuilder.bulidBatchPoint(resultList));
+        } catch (Exception e) {
+            e.printStackTrace();
+            throw new BusinessException("db write error");
+
+        }
     }
 
     /**
@@ -84,37 +95,58 @@ public class InfluxDBServiceImpl implements InfluxDBService {
     public void delete(LocalDateTime start, LocalDateTime stop) {
         DeletePredicateRequest deletePredicateRequest = new DeletePredicateRequest();
         // deletePredicateRequest.start(start.atOffset(ZoneOffset.ofHours(0)).minusDays(5));
-        deletePredicateRequest.start(start.atOffset(ZoneOffset.ofHours(0)));
-        deletePredicateRequest.stop(stop.atOffset(ZoneOffset.ofHours(0)));
+        deletePredicateRequest.start(start.atOffset(ZoneOffset.ofHours(-8)).minusDays(5));
+        deletePredicateRequest.stop(stop.atOffset(ZoneOffset.ofHours(-8)));
         influxDBClient.getDeleteApi().delete(deletePredicateRequest, InfluxDBFluxExpression.bucket, InfluxDBFluxExpression.org);
     }
 
     /**
-     * Query data: query one point's data for a given time range
-     * @param tagName positionNum value
-     * @param start start time
-     * @param stop end time
+     * Query data: query one point's data for a given lookback window, for table and chart display
+     *
+     * @param id   primary key of the RemoteOpc record (its positionNum is used as the tag filter)
+     * @param type lookback window: 1=5 min, 2=15 min, 3=1 h, 4=3 h, 5=6 h, 6=12 h, 7=24 h, 8=2 days, 9=7 days, other=30 days
      */
     @Override
-    public void select(String tagName, LocalDateTime start, LocalDateTime stop){
+    public RemoteOpcVO select(String id, Integer type) {
+        RemoteOpc remoteOpc = remoteOpcMapper.selectByPrimaryKey(id);
+        RemoteOpcVO vo = BeanConverterUtil.copyObjectProperties(remoteOpc, RemoteOpcVO.class);
         StringBuffer stringBuilder = new StringBuffer();
         try {
+
+            LocalDateTime start = LocalDateTime.now().minusHours(8);
+            LocalDateTime stop = LocalDateTime.now().minusHours(8);
+            if (type == 1) {          // 5 minutes
+                start = start.minusMinutes(5);
+            } else if (type == 2) {   // 15 minutes
+                start = start.minusMinutes(15);
+            } else if (type == 3) {   // 1 hour
+                start = start.minusHours(1);
+            } else if (type == 4) {   // 3 hours
+                start = start.minusHours(3);
+            } else if (type == 5) {   // 6 hours
+                start = start.minusHours(6);
+            } else if (type == 6) {   // 12 hours
+                start = start.minusHours(12);
+            } else if (type == 7) {   // 24 hours
+                start = start.minusHours(24);
+            } else if (type == 8) {   // 2 days
+                start = start.minusDays(2);
+            } else if (type == 9) {   // 7 days
+                start = start.minusDays(7);
+            } else {                  // default: 30 days
+                start = start.minusDays(30);
+            }
             InfluxDBFluxExpression.appendCommonFlux(stringBuilder, InfluxDBFluxExpression.bucket, InfluxDBFluxExpression.measurement, DateUtils.localDateTimeToUTC(start), DateUtils.localDateTimeToUTC(stop));
         } catch (ParseException e) {
             e.printStackTrace();
         }
-        InfluxDBFluxExpression.appendTagFlux(stringBuilder, tagName);
+        InfluxDBFluxExpression.appendTagFlux(stringBuilder, remoteOpc.getPositionNum());
         InfluxDBFluxExpression.appendTimeShiftFlux(stringBuilder);
         log.info("查询sql :{}", stringBuilder);
         // 通过时间分组  查询时间段的数据
-        List<FluxTable> tables = influxDBClient.getQueryApi().query(stringBuilder.toString());
-        List<Map<String, Object>> list = new ArrayList<>();
-        for (FluxTable table : tables) {
-            List<FluxRecord> records = table.getRecords();
-            for (FluxRecord record : records) {
-                log.info("{}---{}---{}---{}", record.getMeasurement(),record.getField(),record.getValue(),record.getTime());
-            }
-        }
+        List<FluxTable> tables = influxDBClient.getQueryApi().query(stringBuilder.toString(), InfluxDBFluxExpression.org);
+        vo.setDataJsonStr(InfluxDbBuilder.bulidRemoteOpcVO(tables));
+        return vo;
     }
 
   /*  @Override
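
The if/else chain in select() encodes a fixed type-to-lookback table. An equivalent, more compact way to express that mapping, as a sketch only (not part of the commit; the minusHours(8) shift and DateUtils.localDateTimeToUTC conversion above would stay as written):

    import java.time.Duration;

    class LookbackTable {                            // sketch only, not part of the commit
        // type -> lookback window used when querying a point's history
        static Duration lookbackFor(Integer type) {
            switch (type) {
                case 1:  return Duration.ofMinutes(5);
                case 2:  return Duration.ofMinutes(15);
                case 3:  return Duration.ofHours(1);
                case 4:  return Duration.ofHours(3);
                case 5:  return Duration.ofHours(6);
                case 6:  return Duration.ofHours(12);
                case 7:  return Duration.ofHours(24);
                case 8:  return Duration.ofDays(2);
                case 9:  return Duration.ofDays(7);
                default: return Duration.ofDays(30);
            }
        }
    }

With such a table, the branches reduce to start = start.minus(LookbackTable.lookbackFor(type)); this is purely a readability observation.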

+ 1 - 1
platform-dao/src/main/java/com/platform/dao/util/influxDB/InfluxDBFluxExpression.java

@@ -21,7 +21,7 @@ public class InfluxDBFluxExpression {
     public static final String bucket = "qykh_dcs";
     public static final String measurement = "tb_dcs";
     public static final String tag = "positionNum";
-    public static final String field = "result";
+    public static final String field_result = "result";
 
     /**
      * Common expressions
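
These constants feed the Flux query assembled by appendCommonFlux/appendTagFlux/appendTimeShiftFlux, whose bodies are not part of this commit. Presumably the generated query is shaped roughly like the string below; this is an assumption about the helper's output, not a copy of it:

    // Rough shape of the query built from bucket/measurement/tag/field_result above
    // (time range and positionNum are illustrative).
    String flux =
            "from(bucket: \"qykh_dcs\")\n" +
            "  |> range(start: 2023-01-01T00:00:00Z, stop: 2023-01-01T00:05:00Z)\n" +
            "  |> filter(fn: (r) => r._measurement == \"tb_dcs\")\n" +
            "  |> filter(fn: (r) => r.positionNum == \"TE1001_AV\")\n" +
            "  |> filter(fn: (r) => r._field == \"result\")";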

+ 8 - 1
platform-dao/src/main/java/com/platform/dao/vo/query/remote/RemoteOpcVO.java

@@ -175,6 +175,13 @@ public class RemoteOpcVO extends BaseVO implements Serializable {
      * 更新日期
      */
     private LocalDateTime updateTime;
-
+    /**
+     * JSON array of the day's data, as built by InfluxDbBuilder.bulidRemoteOpcVO, e.g.
+     * [{"time":"2023-01-01T12:03:00Z","value":20.0},
+     *  {"time":"2023-01-01T12:04:00Z","value":20.1}]
+     */
+    private String dataJsonStr;
 
 }

+ 3 - 47
platform-opc/src/main/java/com/platform/opc/servie/OpcService.java

@@ -6,30 +6,24 @@ import com.platform.common.util.DateUtils;
 import com.platform.common.util.RedisUtils;
 import com.platform.common.util.StringUtils;
 import com.platform.dao.entity.remote.RemoteOpcLog;
+import com.platform.dao.influxdb.InfluxDBService;
 import com.platform.dao.mapper.remote.RemoteOpcLogMapper;
-import com.platform.dao.mapper.remote.RemoteOpcMapper;
 import com.platform.dao.vo.query.remote.RemoteOpcVO;
-import com.platform.opc.entity.OpcResult;
 import com.platform.opc.util.OpcDAClient;
 import lombok.AllArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.jinterop.dcom.common.JIException;
 import org.openscada.opc.lib.da.Group;
 import org.springframework.scheduling.annotation.Async;
-import org.springframework.scheduling.annotation.EnableScheduling;
-import org.springframework.scheduling.annotation.Scheduled;
 import org.springframework.stereotype.Service;
 import org.springframework.util.CollectionUtils;
 import tk.mybatis.mapper.weekend.Weekend;
 import tk.mybatis.mapper.weekend.WeekendCriteria;
 
-import java.math.BigDecimal;
 import java.time.LocalDateTime;
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.List;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
 import java.util.stream.Collectors;
 
 @Service("opcService")
@@ -39,6 +33,7 @@ public class OpcService {
 
     private final RemoteOpcLogMapper remoteOpcLogMapper;
 
+    private final InfluxDBService influxDBService;
     /**
      * 1: fetch data by group
      * a: save to redis; the front-end page fetches the data in real time, refreshing every 5 seconds
@@ -60,47 +55,8 @@ public class OpcService {
         if (StringUtils.isNotBlank(jsonStr)) {
             log.info("保存-" + id);
             List<RemoteOpcVO> resultList = JSON.parseArray(jsonStr, RemoteOpcVO.class);
-            List<RemoteOpcLog> addOpcLogList = new ArrayList<>();
-            List<RemoteOpcLog> updateRemoteOpcLogList = new ArrayList<>();
-            Weekend<RemoteOpcLog> weekend = new Weekend<>(RemoteOpcLog.class);
-            WeekendCriteria<RemoteOpcLog, Object> weekendCriteria = weekend.weekendCriteria();
-            // check whether a record for the current day already exists: if so, append to it; otherwise insert a new one
-            LocalDateTime time = LocalDateTime.now();
-            weekendCriteria.andIn(RemoteOpcLog::getPositionNum, resultList.stream().map(RemoteOpcVO::getPositionNum).collect(Collectors.toList()));
-            weekendCriteria.andEqualTo(RemoteOpcLog::getYear, time.getYear());
-            weekendCriteria.andEqualTo(RemoteOpcLog::getMonth, time.getMonthValue());
-            weekendCriteria.andEqualTo(RemoteOpcLog::getDay, time.getDayOfMonth());
-            List<RemoteOpcLog> checkList = remoteOpcLogMapper.selectByExample(weekend);
-            for (RemoteOpcVO result : resultList) {
-                RemoteOpcLog remoteOpcLog = new RemoteOpcLog();
-                remoteOpcLog.setPositionNum(result.getPositionNum());
-                remoteOpcLog.setResult(result.getResult());
-                LocalDateTime localDateTime = DateUtils.strToLocalDateTime(result.getTime(), DateUtils.PATTERN_YMD_HMS);
-                remoteOpcLog.setCreatedTime(localDateTime);
-                remoteOpcLog.setYear(localDateTime.getYear());
-                remoteOpcLog.setMonth(localDateTime.getMonthValue());
-                remoteOpcLog.setDay(localDateTime.getDayOfMonth());
-                // remoteOpcLog.setHour(localDateTime.getHour());
-                // remoteOpcLog.setMinute(localDateTime.getMinute());
-                remoteOpcLog.setRemark(result.getTime().split(" ")[1] + "," + result.getResult() + ";");
-                List<RemoteOpcLog> findItemList = checkList.stream().filter(remoteOpcLog1 -> remoteOpcLog1.getPositionNum().equals(result.getPositionNum())).collect(Collectors.toList());
-                if (!CollectionUtils.isEmpty(findItemList) && findItemList.size() > 0) {
-                    updateRemoteOpcLogList.add(remoteOpcLog);
-                } else {
-                    addOpcLogList.add(remoteOpcLog);
-                }
-            }
-            if (!CollectionUtils.isEmpty(updateRemoteOpcLogList)) {
-                remoteOpcLogMapper.updateBatch(updateRemoteOpcLogList);
-                log.info("更新数量:" + resultList.size());
-            }
-            if (!CollectionUtils.isEmpty(addOpcLogList)) {
-                remoteOpcLogMapper.insertListforComplex(addOpcLogList);
-                log.info("写入数量:" + resultList.size());
-            }
-            log.info("写入/更新数据库数量:" + resultList.size());
+            influxDBService.writeBatch(resultList);
         }
-        log.info("结束保存-" + id);
     }
 
 }
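
After this change, saveValue() hands the cached JSON array straight to InfluxDB. The exact Redis payload is not shown in this commit; judging by the fields used here and in the removed code (positionNum, result, time), it is presumably shaped like the hypothetical example below:

    import com.alibaba.fastjson.JSON;
    import com.platform.dao.influxdb.InfluxDBService;
    import com.platform.dao.vo.query.remote.RemoteOpcVO;

    import java.util.List;

    class RedisPayloadSketch {                       // hypothetical payload, for illustration only
        static void demo(InfluxDBService influxDBService) {
            // assumed element shape; the real payload written to Redis is not part of this diff
            String jsonStr = "[{\"positionNum\":\"TE1001_AV\",\"result\":23.43,\"time\":\"2024-01-01 12:03:05\"}]";
            List<RemoteOpcVO> resultList = JSON.parseArray(jsonStr, RemoteOpcVO.class);
            influxDBService.writeBatch(resultList);
        }
    }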

+ 1 - 1
platform-opc/src/main/java/com/platform/opc/servie/OpcTask.java

@@ -108,7 +108,7 @@ public class OpcTask {
      * 1) loop over the points and check whether a record exists for the current day; if it does, append to it
      * 2) batch-write to the database, appending each day's data to a single row
      */
-    @Scheduled(fixedDelay = 300000)  // every 300 s (5 minutes), save data to the database, keeping it under 7 million rows per day
+    @Scheduled(fixedDelay = 10000)  // every 10 s, save data to the database, keeping it under 7 million rows per day
     public void saveValue() throws JIException {
         String key = RedisUtils.getString(RedisKeyConstants.redis_opc_update_flag);
         String ok = RedisUtils.getString(RedisKeyConstants.redis_opc_update_flag);
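
A back-of-the-envelope check of the "under 7 million rows per day" budget at the new 10-second delay (ignoring the job's own runtime between delays):

    long runsPerDay = 24L * 60 * 60 / 10;            // fixedDelay = 10 s  ->  8 640 runs per day
    long maxPointsPerRun = 7_000_000L / runsPerDay;  // ~810 points (OPC tags) per batch stays under budget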

+ 74 - 17
platform-rest/src/main/java/com/platform/rest/controller/influxdb/InfluxDBController.java

@@ -1,14 +1,32 @@
 package com.platform.rest.controller.influxdb;
 
+import com.alibaba.fastjson.JSONArray;
+import com.alibaba.fastjson.JSONObject;
+import com.platform.common.util.BeanConverterUtil;
 import com.platform.common.util.R;
+import com.platform.common.validation.group.UpdateGroup;
+import com.platform.dao.dto.remote.RemoteOpcDTO;
+import com.platform.dao.dto.remote.RemoteOpcLogDTO;
+import com.platform.dao.entity.remote.RemoteOpc;
 import com.platform.dao.influxdb.InfluxDBService;
+import com.platform.dao.util.ExcelUtil;
+import com.platform.dao.vo.export.remote.ExportRemoteOpcLogOneDayVO;
+import com.platform.dao.vo.query.remote.RemoteOpcLogVO;
+import com.platform.dao.vo.query.remote.RemoteOpcVO;
 import com.platform.rest.log.annotation.SysLog;
+import com.platform.service.remote.RemoteOpcService;
 import lombok.AllArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang.math.RandomUtils;
+import org.springframework.security.access.prepost.PreAuthorize;
+import org.springframework.validation.annotation.Validated;
 import org.springframework.web.bind.annotation.*;
 
+import javax.servlet.http.HttpServletResponse;
+import java.math.BigDecimal;
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 /**
@@ -24,35 +42,74 @@ import java.util.Map;
 public class InfluxDBController {
 
     private final InfluxDBService influxDBService;
+    private final RemoteOpcService remoteOpcService;
 
     /**
-     * Add a record
+     * Write one point record (random test value)
      *
+     * @param remoteMeasureDTO OPC point mapping DTO
      * @return R
      */
-    @SysLog("新增记录")
-    @PostMapping
-    public R save() {
-        String measurement = "tb_dcs";
-        String tag = "positionNum";
-        String tagName = "TE1001_AV";
-        Map<String, Object> fields = new HashMap<>();
+    @SysLog("新增点位记录")
+    @PutMapping("/{id}")
+    public R update(@PathVariable("id") String id, @Validated({UpdateGroup.class}) @RequestBody RemoteOpcDTO remoteMeasureDTO) {
+        RemoteOpcVO vo = BeanConverterUtil.copyObjectProperties(remoteOpcService.getModelById(id), RemoteOpcVO.class);
         float value = RandomUtils.nextFloat();
-        log.info(value + "");
-        fields.put("result", value);
-        influxDBService.writeOne(tagName, fields);
-        return new R<>("插入成功");
+        vo.setResult(new BigDecimal(value).setScale(2));
+        influxDBService.writeOne(vo);
+        return new R<>();
     }
 
     /**
-     * Get list
+     * Write point records in batch (random test values)
      *
+     * @param remoteOpcDTO OPC point mapping DTO
      * @return R
      */
-    @GetMapping("")
-    public R query() {
-        // return new R<>();
-        return null;
+    @SysLog("新增点位记录")
+    @PutMapping("/batch")
+    public R updateBatch(@RequestBody RemoteOpcDTO remoteOpcDTO, @RequestParam(defaultValue = "1") int pageNum, @RequestParam(defaultValue = "20") int pageSize) {
+        List<RemoteOpcVO> voList = remoteOpcService.selectPageList(null, pageNum, pageSize).getRows();
+        for (RemoteOpcVO vo : voList) {
+            float value = RandomUtils.nextFloat();
+            vo.setResult(new BigDecimal(value).setScale(2));
+        }
+        influxDBService.writeBatch(voList);
+        return new R<>("批量新增点位记录成功");
+    }
+
+    /**
+     * Query a point's historical data (the RemoteOpc id is resolved to its positionNum)
+     *
+     * @param id   primary key of the RemoteOpc record
+     * @param type lookback window (see InfluxDBService.select)
+     * @return R
+     */
+    @GetMapping("/{id}/{type}")
+    public R<RemoteOpcVO> getById(@PathVariable("id") String id, @PathVariable("type") Integer type) {
+        return new R<>(influxDBService.select(id, type));
+    }
+
+    /**
+     * OPC record export: export one point's data for the selected time window
+     * @param id   primary key of the RemoteOpc record
+     * @param type lookback window (see InfluxDBService.select)
+     */
+    @GetMapping("/export/{id}/{type}")
+    @SysLog("opc导出某个点位的某天数据")
+    @PreAuthorize("@pms.hasPermission('remote-opc-logs-export')")
+    public void exportOneDay(HttpServletResponse response, @PathVariable("id") String id, @PathVariable("type") Integer type) {
+        RemoteOpcVO vo = influxDBService.select(id, type);
+        String str = vo.getDataJsonStr();
+        JSONArray array = JSONArray.parseArray(str);
+        List<ExportRemoteOpcLogOneDayVO> list = new ArrayList<>();
+        for (int i = 0; i < array.size(); i++) {
+            JSONObject jsonObject = array.getJSONObject(i);
+            ExportRemoteOpcLogOneDayVO data = new ExportRemoteOpcLogOneDayVO();
+            data.setTime(jsonObject.getString("time"));
+            data.setValue(jsonObject.getString("value"));
+            list.add(data);
+        }
+        ExcelUtil.exportResponseDict(response, ExportRemoteOpcLogOneDayVO.class, list, vo.getSbName());
     }
 
 }
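
A rough client-side sketch of the new endpoints. The controller's class-level @RequestMapping prefix is not visible in this diff, so the "/influxdb" base path below is an assumption, and the empty RemoteOpcDTO ignores whatever UpdateGroup validation the PUT body must satisfy:

    import com.platform.common.util.R;
    import com.platform.dao.dto.remote.RemoteOpcDTO;
    import org.springframework.web.client.RestTemplate;

    class InfluxDBControllerClientSketch {           // illustration only; base path and DTO contents are assumed
        static void demo() {
            RestTemplate rest = new RestTemplate();
            String base = "http://localhost:8080/influxdb";                      // assumed prefix
            rest.put(base + "/{id}", new RemoteOpcDTO(), "1001");                // write one random reading for point 1001
            rest.put(base + "/batch?pageNum=1&pageSize=20", new RemoteOpcDTO()); // write a page of random readings
            R history = rest.getForObject(base + "/{id}/{type}", R.class, "1001", 1); // last 5 minutes of history
        }
    }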