Springboot使用influxDB时序数据库的实现

作者:从此寂静无声 时间:2024-01-18 13:42:10 

目录
  • 引入依赖

  • 配置

  • 构建实体类

  • 保存数据

  • 查询数据

项目中需要存放大量设备日志,且需要对其进行简单的数据分析,信息提取工作.

结合众多考量因素,项目决定使用时序数据库中的领头羊InfluxDB.

引入依赖

项目中使用influxdb-java,在pom文件中添加如下依赖(github地址:https://github.com/influxdata/influxdb-java):


   <dependency>
       <groupId>org.influxdb</groupId>
       <artifactId>influxdb-java</artifactId>
       <version>2.15</version>
   </dependency>

application.yaml文件配置如下所示(请按照实际情况填写):


spring:
 influx:
   url: *
   password: admin
   user: 123
   database: log_management

配置

(1) 创建配置类


@Configuration
public class InfluxDbConfig {

@Value("${spring.influx.url:''}")
   private String influxDBUrl;

@Value("${spring.influx.user:''}")
   private String userName;

@Value("${spring.influx.password:''}")
   private String password;

@Value("${spring.influx.database:''}")
   private String database;

@Bean
   public InfluxDbUtils influxDbUtils() {
       return new InfluxDbUtils(userName, password, influxDBUrl, database, "");
   }
}

@Data
public class InfluxDbUtils {
   private String userName;
   private String password;
   private String url;
   public String database;
   private String retentionPolicy;
   // InfluxDB实例
   private InfluxDB influxDB;

// 数据保存策略
   public static String policyNamePix = "logRetentionPolicy_";

public InfluxDbUtils(String userName, String password, String url, String database,
                        String retentionPolicy) {
       this.userName = userName;
       this.password = password;
       this.url = url;
       this.database = database;
       this.retentionPolicy = retentionPolicy == null || "".equals(retentionPolicy) ? "autogen" : retentionPolicy;
       this.influxDB = influxDbBuild();
   }

/**
    * 连接数据库 ,若不存在则创建
    *
    * @return influxDb实例
    */
   private InfluxDB influxDbBuild() {
       if (influxDB == null) {
           influxDB = InfluxDBFactory.connect(url, userName, password);
       }
       try {
           createDB(database);
           influxDB.setDatabase(database);
       } catch (Exception e) {
           log.error("create influx db failed, error: {}", e.getMessage());
       } finally {
           influxDB.setRetentionPolicy(retentionPolicy);
       }
       influxDB.setLogLevel(InfluxDB.LogLevel.BASIC);
       return influxDB;
   }
}

构建实体类

InfluxDB中,measurement对应于传统关系型数据库中的table(database为配置文件中的log_management).
InfluxDB里存储的数据称为时间序列数据,时序数据有零个或多个数据点.
数据点包括time(一个时间戳),measurement(例如logInfo),零个或多个tag,其对应于level,module,device_id),至少一个field(即日志内容,msg=something error).
InfluxDB会根据tag数值建立时间序列(因此tag数值不能选取诸如UUID作为特征值,易导致时间序列过多,导致InfluxDB崩溃),并建立相应索引,以便优化诸如查询速度.


@Builder
@Data
@Measurement(name = "logInfo")
public class LogInfo {

// Column中的name为measurement中的列名
   // 此外,需要注意InfluxDB中时间戳均是以UTC时保存,在保存以及提取过程中需要注意时区转换
   @Column(name = "time")
   private String time;
   // 注解中添加tag = true,表示当前字段内容为tag内容
   @Column(name = "module", tag = true)
   private String module;
   @Column(name = "level", tag = true)
   private String level;
   @Column(name = "device_id", tag = true)
   private String deviceId;
   @Column(name = "msg")
   private String msg;
}

保存数据

以下代码为单条日志保存,influxdb-java亦支持批量保存(因为与InfluxDB通讯均是通过http,因此建议批量保存以减少性能损耗).


   LogInfo logInfo = LogInfo.builder()
       .level(jsonObject.getString("level"))
       .module(module)
       .deviceId(deviceId)
       .msg(jsonObject.getString("msg"))
       .build();
   Point point = Point.measurementByPOJO(logInfo.getClass())
       .addFieldsFromPOJO(logInfo)
       .time(jsonObject.getLong("time"), TimeUnit.MILLISECONDS)
       .build();
   // 出于业务考量,设备可以设置不同的保存策略(策略名为固定前缀+设备ID)
   influxDB.write(influxDBUtils.database, InfluxDbUtils.policyNamePix + deviceId, point);

查询数据

因为代码与业务耦合比较厉害,因此此处仅截选做概要示范.


   // InfluxDB支持分页查询,因此可以设置分页查询条件
   String pageQuery = " LIMIT " + request.getPageSize() + " OFFSET " + ((request.getPageNum() - 1) * request.getPageSize());
   // 此处查询所有内容,如果
   String queryCmd = "SELECT * FROM "
       // 查询指定设备下的日志信息
       // 要指定从 RetentionPolicyName(保存策略前缀+设备ID).measurement(logInfo) 中查询指定数据)
       + InfluxDbUtils.policyNamePix + request.getDeviceId() + "." + "logInfo"
       // 添加查询条件(注意查询条件选择tag值,选择field数值会严重拖慢查询速度)
       + queryCondition
       // 查询结果需要按照时间排序
       + " ORDER BY time DESC"
       // 添加分页查询条件
       + pageQuery;

选择时序数据库,不建议使用删除以及更新操作,因此不做介绍.

可以通过创建或者RetentionPolicy,来添加或者更新数据的删除时间.

来源:https://www.cnblogs.com/jason1990/p/11076310.html

标签:Springboot,influxDB
0
投稿

猜你喜欢

  • uniapp实现人脸识别功能的具体实现代码

    2024-04-17 09:57:36
  • python时间日期操作方法实例小结

    2021-03-13 11:01:45
  • asp GetString的用法

    2008-06-12 13:46:00
  • ajax的responseText乱码的问题的解决方法

    2024-06-05 09:21:28
  • numpy数组的重塑和转置实现

    2022-11-10 10:43:18
  • python文件读写代码实例

    2023-08-09 19:29:52
  • windows下mysql忘记root密码的解决方法

    2024-01-13 05:05:28
  • 解决Python列表字符不区分大小写的问题

    2022-09-10 20:43:52
  • Python实现读取Properties配置文件的方法

    2021-01-05 17:28:21
  • 浏览器中的内存泄露

    2008-05-03 16:53:00
  • 详解mysql 组合查询

    2024-01-20 10:43:06
  • pytorch 自定义参数不更新方式

    2021-11-11 01:55:55
  • 解决Golang中goroutine执行速度的问题

    2023-08-25 20:12:12
  • 用python 制作图片转pdf工具

    2023-02-13 09:14:51
  • 用asp编写类似搜索引擎功能的代码

    2008-10-23 15:55:00
  • 如何把中文转换为UNICODE?

    2009-11-07 18:39:00
  • 我的论坛源代码(九)

    2023-11-15 05:50:05
  • Javascript 每日测试 - 第五期 callee及function

    2008-07-10 13:22:00
  • scrapy-splash简单使用详解

    2023-06-02 15:22:37
  • Python走楼梯问题解决方法示例

    2021-07-07 22:57:51
  • asp之家 网络编程 m.aspxhome.com