醋醋百科网

Good Luck To You!

Spring Boot 中使用 InfluxDB (InfluxQL 方式) 详解

Spring Boot 中使用 InfluxDB (InfluxQL 方式) 详解

InfluxDB 是一个开源的时间序列数据库,特别适合处理带有时间戳的监控数据、指标数据等。下面详细介绍如何在 Spring Boot 项目中集成 InfluxDB 并使用 InfluxQL 进行数据操作。

1. 添加依赖

首先,在 pom.xml 中添加必要的依赖:

<dependencies>
    <!-- Spring Boot core starter (version is not declared here, so it must come from the parent POM / BOM) -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
    </dependency>
    
    <!-- InfluxDB Java client (org.influxdb:influxdb-java), pinned to 2.23 -->
    <dependency>
        <groupId>org.influxdb</groupId>
        <artifactId>influxdb-java</artifactId>
        <version>2.23</version>
    </dependency>
    
    <!-- Optional: Spring Boot test support, only on the test classpath -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

2. 配置 InfluxDB 连接

在 application.properties 或 application.yml 中配置 InfluxDB 连接信息:

# application.properties
# Connection settings read by the configuration class through @Value("${influxdb.*}").
# NOTE(review): admin/admin looks like a local-development placeholder; supply real
# credentials from the environment or a secret store rather than committing them.
influxdb.url=http://localhost:8086
influxdb.username=admin
influxdb.password=admin
influxdb.database=mydb

或者 YAML 格式:

# application.yml
# Same four influxdb.* settings as the properties variant, in YAML form.
# NOTE(review): admin/admin looks like a local-development placeholder; supply real
# credentials from the environment or a secret store rather than committing them.
influxdb:
  url: http://localhost:8086
  username: admin
  password: admin
  database: mydb

3. 创建配置类

创建一个配置类来初始化 InfluxDB 连接:

import org.influxdb.InfluxDB;
import org.influxdb.InfluxDBFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Spring configuration that builds the shared {@link InfluxDB} client from the
 * {@code influxdb.*} properties.
 */
@Configuration
public class InfluxDbConfig {

    /** Connection URL handed to {@link InfluxDBFactory#connect}. */
    @Value("${influxdb.url}")
    private String influxDbUrl;

    /** User name used when connecting. */
    @Value("${influxdb.username}")
    private String username;

    /** Password used when connecting. */
    @Value("${influxdb.password}")
    private String password;

    /** Database selected on the client right after it is created. */
    @Value("${influxdb.database}")
    private String database;

    /**
     * Creates the client, selects the configured database and switches the client's
     * log level to {@code BASIC}.
     *
     * @return the configured client, registered as a Spring bean
     */
    @Bean
    public InfluxDB influxDB() {
        // Both setters return the client itself, so the setup reads as one fluent chain.
        return InfluxDBFactory.connect(influxDbUrl, username, password)
                .setDatabase(database)
                .setLogLevel(InfluxDB.LogLevel.BASIC);
    }
}

4. 创建实体类

InfluxDB 中的数据点由 measurement(类似表)、tags(索引字段)、fields(数据字段)和时间戳组成:

import java.util.concurrent.TimeUnit;
import org.influxdb.annotation.Column;
import org.influxdb.annotation.Measurement;

/**
 * One row of the {@code cpu_usage} measurement.
 *
 * <p>{@code host} and {@code region} are tags, {@code usage} is a field, and {@code time}
 * holds the point's timestamp as text.
 */
@Measurement(name = "cpu_usage", timeUnit = TimeUnit.MILLISECONDS)
public class CpuUsage {

    /** Timestamp of the point, kept as a string exactly as the result mapper delivers it. */
    @Column(name = "time")
    private String time;

    /** Tag: host the sample was taken on. */
    @Column(name = "host", tag = true)
    private String host;

    /** Tag: region the host belongs to. */
    @Column(name = "region", tag = true)
    private String region;

    /** Field: the measured CPU usage value. */
    @Column(name = "usage")
    private Double usage;

    // getters and setters
    public String getTime() { return time; }
    public void setTime(String time) { this.time = time; }
    public String getHost() { return host; }
    public void setHost(String host) { this.host = host; }
    public String getRegion() { return region; }
    public void setRegion(String region) { this.region = region; }
    public Double getUsage() { return usage; }
    public void setUsage(Double usage) { this.usage = usage; }

    /**
     * Renders all four properties so that printing a query result shows the data
     * instead of the default {@code ClassName@hash} text.
     */
    @Override
    public String toString() {
        return "CpuUsage{time=" + time
                + ", host=" + host
                + ", region=" + region
                + ", usage=" + usage
                + '}';
    }
}

5. 创建 Repository 类

创建一个操作 InfluxDB 的 Repository 类:

import org.influxdb.InfluxDB;
import org.influxdb.dto.BatchPoints;
import org.influxdb.dto.BoundParameterQuery;
import org.influxdb.dto.Point;
import org.influxdb.dto.Query;
import org.influxdb.dto.QueryResult;
import org.influxdb.impl.InfluxDBResultMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Repository;

import java.util.List;
import java.util.concurrent.TimeUnit;

@Repository
public class InfluxDbRepository {
    
    @Autowired
    private InfluxDB influxDB;
    
    private final InfluxDBResultMapper resultMapper = new InfluxDBResultMapper();
    
    /**
     * 写入单条数据
     */
    public void insert(CpuUsage cpuUsage) {
        Point point = Point.measurement("cpu_usage")
                .time(System.currentTimeMillis(), TimeUnit.MILLISECONDS)
                .tag("host", cpuUsage.getHost())
                .tag("region", cpuUsage.getRegion())
                .addField("usage", cpuUsage.getUsage())
                .build();
        
        influxDB.write(point);
    }
    
    /**
     * 批量写入数据
     */
    public void batchInsert(List<CpuUsage> cpuUsages) {
        BatchPoints batchPoints = BatchPoints.database(influxDB.getDatabase())
                .consistency(InfluxDB.ConsistencyLevel.ALL)
                .build();
        
        for (CpuUsage cpuUsage : cpuUsages) {
            Point point = Point.measurement("cpu_usage")
                    .time(System.currentTimeMillis(), TimeUnit.MILLISECONDS)
                    .tag("host", cpuUsage.getHost())
                    .tag("region", cpuUsage.getRegion())
                    .addField("usage", cpuUsage.getUsage())
                    .build();
            batchPoints.point(point);
        }
        
        influxDB.write(batchPoints);
    }
    
    /**
     * 查询数据 - 使用 InfluxQL
     */
    public List<CpuUsage> queryCpuUsage(String host, long startTime, long endTime) {
        String queryString = String.format("SELECT * FROM cpu_usage WHERE host='%s' AND time >= %dms AND time <= %dms", 
                host, startTime, endTime);
        
        Query query = new Query(queryString, influxDB.getDatabase());
        QueryResult queryResult = influxDB.query(query);
        
        return resultMapper.toPOJO(queryResult, CpuUsage.class);
    }
    
    /**
     * 创建数据库
     */
    public void createDatabase(String dbName) {
        influxDB.query(new Query("CREATE DATABASE " + dbName));
    }
    
    /**
     * 删除数据库
     */
    public void deleteDatabase(String dbName) {
        influxDB.query(new Query("DROP DATABASE " + dbName));
    }
    
    /**
     * 创建保留策略
     */
    public void createRetentionPolicy(String policyName, String duration, int replication) {
        String query = String.format("CREATE RETENTION POLICY \"%s\" ON \"%s\" DURATION %s REPLICATION %d DEFAULT", 
                policyName, influxDB.getDatabase(), duration, replication);
        influxDB.query(new Query(query));
    }
}

6. 创建 Service 类

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.List;

/**
 * Service layer for CPU usage samples; every call is delegated to
 * {@link InfluxDbRepository}.
 */
@Service
public class CpuUsageService {

    private final InfluxDbRepository influxDbRepository;

    /**
     * Constructor injection keeps the dependency explicit and final, and lets the service
     * be created without a Spring context (for example in a plain unit test).
     *
     * @param influxDbRepository repository that performs the InfluxDB calls
     */
    @Autowired
    public CpuUsageService(InfluxDbRepository influxDbRepository) {
        this.influxDbRepository = influxDbRepository;
    }

    /**
     * Stores a single sample.
     *
     * @param cpuUsage sample to store
     */
    public void saveCpuUsage(CpuUsage cpuUsage) {
        influxDbRepository.insert(cpuUsage);
    }

    /**
     * Stores several samples through the repository's batch write.
     *
     * @param cpuUsages samples to store
     */
    public void batchSaveCpuUsage(List<CpuUsage> cpuUsages) {
        influxDbRepository.batchInsert(cpuUsages);
    }

    /**
     * Looks up the samples of one host inside a time window.
     *
     * @param host      host to filter on
     * @param startTime window start, passed through to the repository
     * @param endTime   window end, passed through to the repository
     * @return the samples returned by the repository
     */
    public List<CpuUsage> getCpuUsageByHost(String host, long startTime, long endTime) {
        return influxDbRepository.queryCpuUsage(host, startTime, endTime);
    }
}

7. 创建 Controller

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.List;

@Service
public class CpuUsageService {
    
    @Autowired
    private InfluxDbRepository influxDbRepository;
    
    public void saveCpuUsage(CpuUsage cpuUsage) {
        influxDbRepository.insert(cpuUsage);
    }
    
    public void batchSaveCpuUsage(List<CpuUsage> cpuUsages) {
        influxDbRepository.batchInsert(cpuUsages);
    }
    
    public List<CpuUsage> getCpuUsageByHost(String host, long startTime, long endTime) {
        return influxDbRepository.queryCpuUsage(host, startTime, endTime);
    }
}

8. 常用 InfluxQL 查询示例

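以下是一些常用的 InfluxQL 查询示例,可以在 Repository 中添加相应方法。示例中的 database 指保存当前数据库名的字段(例如通过 @Value("${influxdb.database}") 注入),influxDB 和 resultMapper 即 Repository 中已有的同名字段: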

/**
 * Example 1: returns every {@code cpu_usage} point whose timestamp is newer than
 * {@code now() - 1h}.
 */
public List<CpuUsage> findRecentOneHour() {
    Query lastHour = new Query("SELECT * FROM cpu_usage WHERE time > now() - 1h", database);
    return resultMapper.toPOJO(influxDB.query(lastHour), CpuUsage.class);
}

/**
 * Example 2: returns every {@code cpu_usage} point inside a closed time window.
 *
 * @param start window start, epoch milliseconds (inclusive)
 * @param end   window end, epoch milliseconds (inclusive)
 * @return the matching points mapped to {@link CpuUsage}
 */
public List<CpuUsage> findByTimeRange(long start, long end) {
    // Plain concatenation always yields ASCII digits; String.format("%d") follows the
    // default locale and can emit localized digits that InfluxQL cannot parse.
    String query = "SELECT * FROM cpu_usage WHERE time >= " + start + "ms AND time <= " + end + "ms";
    QueryResult result = influxDB.query(new Query(query, database));
    return resultMapper.toPOJO(result, CpuUsage.class);
}

/**
 * Example 3: returns the mean usage per host.
 *
 * @return one {@link CpuUsage} per {@code host} group; {@code usage} carries the mean
 */
public List<CpuUsage> findGroupByHost() {
    // Without the alias the result column is named "mean", which matches no @Column of
    // CpuUsage, so usage would stay null after mapping.
    String query = "SELECT MEAN(usage) AS usage FROM cpu_usage GROUP BY host";
    QueryResult result = influxDB.query(new Query(query, database));
    return resultMapper.toPOJO(result, CpuUsage.class);
}

/**
 * Example 4: returns the points of one host whose usage exceeds a threshold.
 *
 * @param host      value of the {@code host} tag to match
 * @param threshold exclusive lower bound for {@code usage}
 * @return the matching points mapped to {@link CpuUsage}
 */
public List<CpuUsage> findByHostAndUsageGreaterThan(String host, double threshold) {
    // Both values are bound as parameters: a quote inside host can no longer break or
    // alter the statement, and the threshold is no longer rendered by "%f", which prints
    // a decimal comma under many default locales.
    Query query = BoundParameterQuery.QueryBuilder
            .newQuery("SELECT * FROM cpu_usage WHERE host = $host AND usage > $threshold")
            .forDatabase(database)
            .bind("host", host)
            .bind("threshold", threshold)
            .create();
    QueryResult result = influxDB.query(query);
    return resultMapper.toPOJO(result, CpuUsage.class);
}

/**
 * Example 5: returns one page of {@code cpu_usage} points.
 *
 * @param limit  maximum number of points to return; must not be negative
 * @param offset number of points to skip; must not be negative
 * @return the page mapped to {@link CpuUsage}
 * @throws IllegalArgumentException if {@code limit} or {@code offset} is negative
 */
public List<CpuUsage> findWithLimit(int limit, int offset) {
    // Fail fast instead of sending a statement with a negative LIMIT/OFFSET to the server.
    if (limit < 0 || offset < 0) {
        throw new IllegalArgumentException(
                "limit and offset must not be negative: limit=" + limit + ", offset=" + offset);
    }
    // Concatenation keeps the digits ASCII regardless of the default locale.
    String query = "SELECT * FROM cpu_usage LIMIT " + limit + " OFFSET " + offset;
    QueryResult result = influxDB.query(new Query(query, database));
    return resultMapper.toPOJO(result, CpuUsage.class);
}

9. 测试

创建一个测试类来验证功能:

import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

import java.util.Arrays;
import java.util.List;

@SpringBootTest
public class InfluxDbTest {
    
    @Autowired
    private CpuUsageService cpuUsageService;
    
    @Test
    public void testInsertAndQuery() {
        // 插入单条数据
        CpuUsage cpu1 = new CpuUsage();
        cpu1.setHost("server1");
        cpu1.setRegion("north");
        cpu1.setUsage(0.65);
        cpuUsageService.saveCpuUsage(cpu1);
        
        // 批量插入
        CpuUsage cpu2 = new CpuUsage();
        cpu2.setHost("server2");
        cpu2.setRegion("south");
        cpu2.setUsage(0.72);
        
        CpuUsage cpu3 = new CpuUsage();
        cpu3.setHost("server3");
        cpu3.setRegion("east");
        cpu3.setUsage(0.58);
        
        cpuUsageService.batchSaveCpuUsage(Arrays.asList(cpu2, cpu3));
        
        // 查询
        long now = System.currentTimeMillis();
        long oneHourAgo = now - 3600 * 1000;
        List<CpuUsage> results = cpuUsageService.getCpuUsageByHost("server1", oneHourAgo, now);
        
        System.out.println("查询结果:");
        results.forEach(System.out::println);
    }
}

10. 注意事项

  1. 时间戳处理:InfluxDB 中的时间戳非常重要,确保正确设置时间戳和时区。
  2. 批量写入:对于大量数据,使用批量写入可以提高性能。
  3. 保留策略:根据数据保留需求设置合适的保留策略。
  4. 连接管理:生产环境中需要处理连接池和连接超时等问题。
  5. 错误处理:添加适当的错误处理逻辑,特别是网络问题和查询语法错误。
  6. 性能优化:对于高频写入场景,可以启用批处理和缓冲机制。

11. 高级用法

11.1 连续查询(Continuous Queries)

/**
 * Creates a continuous query on the current database that writes the mean of
 * {@code usage} from one measurement into another, grouped by a time interval.
 *
 * @param cqName            name of the continuous query
 * @param sourceMeasurement measurement to read from
 * @param targetMeasurement measurement to write into
 * @param interval          argument placed inside {@code GROUP BY time(...)}
 */
public void createContinuousQuery(String cqName, String sourceMeasurement, 
                                String targetMeasurement, String interval) {
    final String template =
            "CREATE CONTINUOUS QUERY \"%s\" ON \"%s\" " +
            "BEGIN SELECT MEAN(usage) INTO \"%s\" FROM \"%s\" GROUP BY time(%s) END";
    final Query createCq = new Query(
            String.format(template, cqName, database, targetMeasurement, sourceMeasurement, interval));
    influxDB.query(createCq);
}

11.2 用户管理

/**
 * Creates a database user, optionally with all privileges.
 *
 * @param username name of the new user
 * @param password password of the new user
 * @param admin    when {@code true}, {@code WITH ALL PRIVILEGES} is appended
 */
public void createUser(String username, String password, boolean admin) {
    // Escape backslashes and the delimiting quote of each literal; otherwise a quote in
    // the name or password would end the literal early and break or alter the statement.
    String quotedName = '"' + username.replace("\\", "\\\\").replace("\"", "\\\"") + '"';
    String quotedPassword = "'" + password.replace("\\", "\\\\").replace("'", "\\'") + "'";

    String query = "CREATE USER " + quotedName + " WITH PASSWORD " + quotedPassword;
    if (admin) {
        query += " WITH ALL PRIVILEGES";
    }
    // Third constructor argument = requiresPost: statements that modify state go via POST.
    influxDB.query(new Query(query, null, true));
}

11.3 数据备份与恢复

// Back up data: sends a "BACKUP DATABASE ... TO ..." statement for the current database.
// NOTE(review): the InfluxQL reference does not appear to define a BACKUP statement;
// InfluxDB 1.x backups are normally taken with the `influxd backup` command-line tool.
// Confirm against the target server before relying on this method.
public void backup(String backupPath) {
    String query = String.format("BACKUP DATABASE \"%s\" TO \"%s\"", database, backupPath);
    influxDB.query(new Query(query));
}

// Restore data: sends a "RESTORE FROM ..." statement with the given path.
// NOTE(review): the InfluxQL reference does not appear to define a RESTORE statement;
// InfluxDB 1.x restores are normally done with the `influxd restore` command-line tool.
// Confirm against the target server before relying on this method.
public void restore(String backupPath) {
    String query = String.format("RESTORE FROM \"%s\"", backupPath);
    influxDB.query(new Query(query));
}

通过以上步骤,您已经可以在 Spring Boot 项目中成功集成 InfluxDB 并使用 InfluxQL 进行数据操作了。根据实际需求,您可以进一步扩展和优化这些基础功能。

控制面板
您好,欢迎到访网站!
  查看权限
网站分类
最新留言