Influxdb基础
作者:互联网
Influxdb基础
InfluxDB是目前流行的时间序列数据库(TSDB,常见 TSDB:Influxdb、opentsdb、timeScaladb、Druid 等),时间序列数据库最简单的定义是数据格式里包含Timestamp字段的数据,比如某一时间环境的温度,CPU的使用率等,几乎所有的数据都可以打上一个Timestamp字段。时间序列数据更重要的一个属性是如何去查询它,包括数据的过滤,计算等等。
Influxdb是一个开源的分布式时序、时间和指标数据库,使用go语言编写,无需外部依赖。其设计目标是实现分布式和水平伸缩扩展,是 InfluxData 的核心产品。
特性:
- 时序性(Time Series):与时间相关的函数的灵活使用(诸如最大、最小、求和等)。
- 度量(Metrics):对实时大量数据进行计算。
- 事件(Event):支持任意的事件数据,换句话说,任意事件的数据我们都可以做操作。
特点:
- schemaless(无结构),可以是任意数量的列;
- min, max, sum, count, mean, median 一系列函数,方便统计;
- Native HTTP API, 内置http支持,使用http读写;
- Powerful Query Language 类似sql;
- Built-in Explorer 自带管理工具。
- 基于时间序列,支持与时间有关的相关函数(如最大,最小,求和等);
- 可度量性:你可以实时对大量数据进行计算;
- 基于事件:它支持任意的事件数据;
- 无结构(无模式):可以是任意数量的列;
- 支持min, max, sum, count, mean, median 等一系列函数;
- 内置http支持,使用http读写;
- 强大的类SQL语法;
- 自带管理界面,方便使用(新版本需要通过Chronograf)
Influxdb 完整的上下游产业还包括:Chronograf、Telegraf、Kapacitor,其具体作用及关系如下:
Influxdb与传统数据库的区别
point的数据结构由时间戳(time)、标签(tags)、数据(fields)三部分组成,具体含义如下:
Influxdb还有个特有的概念:series(一般由:retention policy, measurement, tagset就共同组成),其含义如下:
所有在数据库中的数据,都需要通过图表来展示,而这个series表示这个表里面的数据,可以在图表上画成几条线:通过tags排列组合算出来。
需要注意的是,influxdb不需要像传统数据库一样创建各种表,其表的创建主要是通过第一次数据插入时自动创建,如下:
insert mytest, server=serverA count=1,name=5 //自动创建表
“mytest”,“server” 是 tags,“count”、“name” 是 fields
fields 中的 value 基本不用于索引
保留策略(retention policy)
- 每个数据库刚开始会自动创建一个默认的存储策略 autogen,数据保留时间为永久,在集群中的副本个数为1,之后用户可以自己设置(查看、新建、修改、删除),例如保留最近2小时的数据。插入和查询数据时如果不指定存储策略,则使用默认存储策略,且默认存储策略可以修改。InfluxDB 会定期清除过期的数据。
- 每个数据库可以有多个过期策略:
show retention policies on “db_name” - Shard 在 influxdb中是一个比较重要的概念,它和 retention policy 相关联。每一个存储策略下会存在许多 shard,每一个 shard 存储一个指定时间段内的数据,并且不重复,例如 7点-8点 的数据落入 shard0 中,8点-9点的数据则落入 shard1 中。每一个 shard 都对应一个底层的 tsm 存储引擎,有独立的 cache、wal、tsm file。
这样做的目的就是为了可以通过时间来快速定位到要查询数据的相关资源,加速查询的过程,并且也让之后的批量删除数据的操作变得非常简单且高效。 - 建议在数据库建立的时候设置存储策略,不建议设置过多且随意切换
create database testdb2 with duration 30d
存储引擎(Timestamp-Structure Merge Tree)
TSM是在LSM的基础上优化改善的,引入了serieskey的概念,对数据实现了很好的分类组织。TSM主要由四个部分组成: cache、wal、tsm file、compactor:
- cache:插入数据时,先往 cache 中写入再写入wal中,可以认为 cache 是 wal 文件中的数据在内存中的缓存,cache 中的数据并不是无限增长的,有一个 maxSize 参数用于控制当 cache 中的数据占用多少内存后就会将数据写入 tsm 文件。如果不配置的话,默认上限为 25MB
- wal:预写日志,对比MySQL的 binlog,其内容与内存中的 cache 相同,作用就是为了持久化数据,当系统崩溃后可以通过 wal 文件恢复还没有写入到 tsm 文件中的数据,当 InfluxDB 启动时,会遍历所有的 wal 文件,重新构造 cache。
- tsm file:每个 tsm 文件的大小上限是 2GB。当达到 cache-snapshot-memory-size,cache-max-memory-size 的限制时会触发将 cache 写入 tsm 文件。
- compactor:主要进行两种操作,一种是 cache 数据达到阀值后,进行快照,生成一个新的 tsm 文件。另外一种就是合并当前的 tsm 文件,将多个小的 tsm 文件合并成一个,减少文件的数量,并且进行一些数据删除操作。 这些操作都在后台自动完成,一般每隔 1 秒会检查一次是否有需要压缩合并的数据。
存储目录
influxdb的数据存储有三个目录,分别是meta、wal、data:
- meta 用于存储数据库的一些元数据,meta 目录下有一个 meta.db 文件;
- wal 目录存放预写日志文件,以 .wal 结尾;
- data 目录存放实际存储的数据文件,以 .tsm 结尾。
Influxdb和其他时序数据库比较
从部署、集群、资源占用、存储模型、性能等方面比较influxdb和opentsdb,具体如下:
访问方式
influxdb访问本质上都是 HTTP 方式,具体有如下:
客户端命令行
HTTP API 接口
各语言API 库(对 go 语言 API 封装)
基于 WEB 管理页面操作
连续查询
influxdb 的连续查询是在数据库中自动定时启动的一组语句,语句中必须包含
SELECT 等关键词。influxdb 会将查询结果放在指定的数据表中。
目的:使用连续查询是最优的降低采样率的方式,连续查询和存储策略搭配使用将会大大降低 InfluxDB 的系统占用量。而且使用连续查询后,数据会存放到指定的数据表中,这样就为以后统计不同精度的数据提供了方便。
CREATE CONTINUOUS QUERY wj_30m ON shhnwangjian
BEGIN
SELECT mean(connected_clients), MEDIAN(connected_clients),
MAX(connected_clients), MIN(connected_clients)
INTO redis_clients_30m
FROM redis_clients
GROUP BY ip,port,time(30m)
END
/*在shhnwangjian库中新建了一个名为 wj_30m 的连续查询,
每三十分钟取一个connected_clients字段的平均值、中位值、最大值、最小值 redis_clients_30m 表中,
使用的数据保留策略都是 default。*/
当数据超过保存策略里指定的时间之后就会被删除,但是这时候可能并不想数据被完全删掉,可以使用连续查询将数据聚合储存。
操作优化
控制 series 的数量;
使用批量写;
使用恰当的时间粒度;
存储的时候尽量对 Tag 进行排序;
根据数据情况,调整 shard 的 duration;
无关的数据写不同的database;
控制 Tag Key, 与 Tag Value 值的大小;
存储分离 ,将 wal 目录与 data 目录分别映射到不同的磁盘上,以减少读写操作的相互影响。
go 语言开发
go语言开发只需要一个依赖包:github.com/influxdata/influxdb/client/v2,需要注意是v1.8版本,直接clone会失败,
可先到:github.com/influxdata/influxdb中选择版本号V1.8,然后clone下载
对influxdb的操作主要有连接、插入、查询、关闭等几个步骤,其中查询的时候需要注意时间,要设置相应的时区,不然可能显示的时间结果不同.
import (
"github.com/influxdata/influxdb/client/v2"
...
)
//连接influxdb
func ConnectInflux()(client.Client, error){
conn, err := client.NewHTTPClient(client.HTTPConfig{
Addr:"http://localhost:8086",
Username:username,
Password:password,
})
if nil != err{
fmt.Println(err)
return nil, err
}
return conn, nil
}
//写入point
func WritePoints(con client.Client)error{
batchpoint ,err := client.NewBatchPoints(client.BatchPointsConfig{
Precision:"s",
Database:MyDB,
})
if nil != err{
fmt.Println(err)
return err
}
record := Record{AssertId:"assert_aaaaa", ModelId:"model0", PoinntId:"point1",
ModelPath:"model0/model1/point1", Attr:"", ModelTime:"123456789"}
tags := map[string]string{Tag1:record.AssertId, Tag2:record.ModelId}
fields := map[string]interface{}{Field1:record.PoinntId, Field2:record.ModelPath,
Field3:record.Attr, Field4:record.ModelTime}
point, err := client.NewPoint(Measurement, tags, fields, time.Now())
if nil != err{
fmt.Println(err)
return err
}
batchpoint.AddPoint(point)
if err := con.Write(batchpoint); err != nil{
fmt.Println(err)
return err
}
}
//查询时要注意时区,东八区设置为:tz('Asia/Shanghai'),命令行需要:precision rfc3339
query := fmt.Sprintf("select * from %s limit %d tz('Asia/Shanghai')", Measurement, 5)
res, err := querydb(conn, query)
需要注意的是,influxdb虽然很多时候都可以通过SQL语句操作,但数据更新例外。influxdb中数据更新只能重新Insert,且需要tags和时间戳相同,所以不建议大量更新数据。
极少出现删除数据的情况,删除数据基本都是清理过期数据。
参考文章:
InfluxDB-介绍
InfluxDB 入门
标签:存储,tsm,err,cache,基础,influxdb,Influxdb,数据 来源: https://blog.csdn.net/weixin_41924879/article/details/111354187