|
@@ -6,39 +6,42 @@ import com.influxdb.client.domain.WritePrecision;
|
|
|
import com.influxdb.client.write.Point;
|
|
|
import com.influxdb.query.FluxRecord;
|
|
|
import com.influxdb.query.FluxTable;
|
|
|
+import com.platform.dao.enums.RemoteOpcTypeEnum;
|
|
|
import com.platform.dao.util.influxDB.InfluxDBFluxExpression;
|
|
|
import com.platform.dao.vo.query.remote.RemoteOpcVO;
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
+import org.apache.commons.lang.math.RandomUtils;
|
|
|
+import org.springframework.util.CollectionUtils;
|
|
|
|
|
|
import java.math.BigDecimal;
|
|
|
+import java.math.RoundingMode;
|
|
|
import java.time.Instant;
|
|
|
-import java.util.ArrayList;
|
|
|
-import java.util.List;
|
|
|
+import java.util.*;
|
|
|
+import java.util.concurrent.TimeUnit;
|
|
|
+import java.util.stream.Collectors;
|
|
|
|
|
|
@Slf4j
|
|
|
public class InfluxDbBuilder {
|
|
|
-
|
|
|
// 数据转化为可以存储的
|
|
|
- public static Point bulidOnePoint(RemoteOpcVO vo){
|
|
|
+ public static Point bulidOnePoint(RemoteOpcVO vo) {
|
|
|
Instant now = Instant.now();
|
|
|
Point point = Point
|
|
|
.measurement(InfluxDBFluxExpression.measurement)
|
|
|
.addTag(InfluxDBFluxExpression.tag, vo.getPositionNum())
|
|
|
.addField(InfluxDBFluxExpression.field_result, vo.getResult())
|
|
|
- .time(now, WritePrecision.NS);
|
|
|
+ .time(vo.getTestInstant(), WritePrecision.NS);
|
|
|
return point;
|
|
|
}
|
|
|
|
|
|
// 数据转化为可以存储的
|
|
|
- public static List<Point> bulidBatchPoint(List<RemoteOpcVO> resultList){
|
|
|
+ public static List<Point> bulidBatchPoint(List<RemoteOpcVO> resultList) {
|
|
|
List<Point> points = new ArrayList<>();
|
|
|
- Instant now = Instant.now();
|
|
|
- for(RemoteOpcVO vo: resultList){
|
|
|
+ for (RemoteOpcVO vo : resultList) {
|
|
|
Point point = Point
|
|
|
.measurement(InfluxDBFluxExpression.measurement)
|
|
|
.addTag(InfluxDBFluxExpression.tag, vo.getPositionNum())
|
|
|
.addField(InfluxDBFluxExpression.field_result, vo.getResult())
|
|
|
- .time(now, WritePrecision.NS);
|
|
|
+ .time(vo.getTestInstant(), WritePrecision.NS);
|
|
|
points.add(point);
|
|
|
}
|
|
|
return points;
|
|
@@ -46,24 +49,58 @@ public class InfluxDbBuilder {
|
|
|
|
|
|
/**
|
|
|
* 数据转化为可以表格展示、chart展示的
|
|
|
+ * 最多5000个点位,取样点位,但是导出的时候不取样
|
|
|
* 只能查询某一个点位的历史数据
|
|
|
+ *
|
|
|
* @param tables
|
|
|
* @return
|
|
|
*/
|
|
|
- public static String bulidRemoteOpcVO(List<FluxTable> tables ){
|
|
|
- JSONArray array = new JSONArray();
|
|
|
- Instant now = Instant.now();
|
|
|
+ public static String bulidRemoteOpcVO(List<FluxTable> tables) {
|
|
|
+ List<RemoteOpcVO> list = new ArrayList<>();
|
|
|
for (FluxTable table : tables) {
|
|
|
List<FluxRecord> records = table.getRecords();
|
|
|
for (FluxRecord record : records) {
|
|
|
RemoteOpcVO vo = new RemoteOpcVO();
|
|
|
- log.info("{}---{}---{}---{}", record.getMeasurement(),record.getField(),record.getValue(),record.getTime());
|
|
|
- JSONObject object = new JSONObject();
|
|
|
+ vo.setTime(record.getTime().toString());
|
|
|
+ vo.setResult(new BigDecimal(record.getValue().toString()));
|
|
|
+ list.add(vo);
|
|
|
+ // log.info("{}---{}---{}---{}", record.getMeasurement(),record.getField(),record.getValue(),record.getTime());
|
|
|
+ /*JSONObject object = new JSONObject();
|
|
|
object.put("time",record.getTime().toString() );
|
|
|
object.put("value",new BigDecimal(record.getValue().toString()));
|
|
|
- array.add(object);
|
|
|
+ array.add(object);*/
|
|
|
}
|
|
|
}
|
|
|
+ if(list.size()>5000){
|
|
|
+ log.info("collect.size(): " + list.size());
|
|
|
+ Collections.shuffle(list);
|
|
|
+ list = list.stream().limit(5000).collect(Collectors.toList());
|
|
|
+ log.info("collect.size(): " + list.size());
|
|
|
+ list.sort(Comparator.comparing(RemoteOpcVO::getTime));
|
|
|
+ }
|
|
|
+ JSONArray array = new JSONArray();
|
|
|
+ for (RemoteOpcVO record : list) {
|
|
|
+ JSONObject object = new JSONObject();
|
|
|
+ object.put("time", record.getTime());
|
|
|
+ object.put("value", record.getResult());
|
|
|
+ array.add(object);
|
|
|
+ }
|
|
|
return array.toJSONString();
|
|
|
}
|
|
|
+
|
|
|
+ public static void main(String[] args) {
|
|
|
+ int total= 1000000;
|
|
|
+ List<RemoteOpcVO> voList = new ArrayList<>();
|
|
|
+ for(int i =0;i<total;i++){
|
|
|
+ RemoteOpcVO test = new RemoteOpcVO();
|
|
|
+ test.setPositionNum("11111111");
|
|
|
+ float value = RandomUtils.nextFloat();
|
|
|
+ test.setResult(new BigDecimal(value).setScale(2, RoundingMode.HALF_UP));
|
|
|
+ voList.add(test);
|
|
|
+ }
|
|
|
+ log.info("collect.size(): " + voList.size());
|
|
|
+ Collections.shuffle(voList);
|
|
|
+ List<RemoteOpcVO> newList1 =voList.stream().limit(5000).collect(Collectors.toList());
|
|
|
+ log.info("collect.size(): " + newList1.size());
|
|
|
+ }
|
|
|
}
|