1. 前言

调试物联网设备和排查问题时,历史数据是很重要的一环,自己遇到过和解决的问题有:

  • 数据入库:并发较少时直接入库,数据量较大时使用队列缓冲数据
  • 历史数据滚动删除:采取单表、单表分区、分表分库等方案,定时自动创建与删除数据表、分区或数据库
  • 历史数据查询与分页:通过查询条件缩小数据集合,尽可能将查询转化为线性查询,利用数据库缓存优化后续查询
  • 历史数据更新:使用消息ID、设备ID、时间戳等命中索引,执行更新

目前主要使用PostgreSQL 10数据库,下面的内容也会以此展开。

2. 基础字段

物联网设备至少会拥有一个ID,以LoRaWAN为例:

  • 节点设备有devEUI(长度64位,可表示为16位16进制字符串)
  • 网关设备有gatewayID(长度64位,可表示为16位16进制字符串)
  • 每一条数据都有创建时间(createtime)和消息ID(msgID)

因此可以设计出以下的历史数据表结构:

id dev_eui/gateway_id createtime
1 1234567890123456 2019-10-01 00:40:41.431306
2 0034567890123456 2019-10-02 00:40:41.431306
3 1234567890123456 2019-10-03 00:40:41.431306
4 0034567890123456 2019-10-04 00:40:41.431306

我们可以:

  • 通过dev_eui/gateway_id来筛选指定设备的历史数据
  • 通过消息ID(msgID)来获取一条完整的历史数据
  • 通过数据创建时间(createtime)按日期筛选和排序数据
  • 结合三个参数执行粒度更细的操作,例如
    • 获取某个设备的在过去10天内的前100条数据
    • 将包含消息ID、设备ID和创建时间的的简化数据传递给前端,前端需要访问完整数据时将这些参数传递给后台获取数据
    • 按消息ID和创建时间检索数据执行更新

2.1 设备ID

在LoRaWAN中,设备ID都使用了EUI64的格式。这个长度为64位的ID可以使用8个字节以二进制的格式存储,对应PostgreSQL的bytea类型,也可以使用16个字节以字符串格式存储,对应PostgreSQL的char类型。为了便于维护,最后选择了使用字符串格式存储设备ID。

设备ID与用户ID关联,从而支持多用户的结构和HTTP接口权限验证。历史数据用设备ID标记,删除设备时,只解除业务层和协议层的关联,不删除历史数据,避免大量删除数据时导致性能下降。

2.2 消息ID

PostgreSQL内置的数据类型bigserial可用来自动生成序列号,长度64位,取值范围1~9223372036854775807,插入数据时缺省值为下一个序列值。

在实现webscket展示实时数据时,就利用到了消息ID。设备上行的数据首先入库,获取到一个msgID,将消息ID、设备ID、时间戳及其他字段打包成一个体积较小的数据包写入websocket,确保实时性。若用户需要查看某一个消息详情时,就会利用这三个参数获取完整数据。

2.3 时间戳

PostgresSQL内置的了两种时间戳类型

  • timestamp without time zone:长度64位,精度为微秒,可记录西元4713年至公元294276年,使用UTC时区展示
  • timestamp with time zone:长度64位,精度为微秒,可记录西元4713年至公元294276年,使用数据库配置的时区展示

时间戳是一个绝对数值,无论在哪一个时区,同一时刻获取的Unix时间都是一致的。时区不影响时间戳数值的存取,但会影响时间戳的转换,例如分别使用东八区和东七区的时区来解析2019-10-04 00:40:41,得到的Unix时间戳就相差一个小时。

为了简化时间转换,我们一开始就选择了timestamp without time zone类型,并确保:

  • 所有的时间戳数值存取使用UTC时区
  • 数据入库前截断原始时间戳数据,舍弃微秒单位后的数值
  • 前端根据PC设置的时区展示时间控件
  • 前后端使用精度为毫秒的Unix时间戳传参

所有的业务使用UTC时间戳,前端根据本地时区展示数据。

PostgreSQL 10时间类型文档:https://www.postgresql.org/docs/10/datatype-datetime.html

3. PostgreSQL分区方案

内置的数据分区功能在PostgreSQL 10引入,在PostgreSQL 11中得到完善,分区相对于分表的好处在于不用处理跨分区的数据查询与插入,对用户代码透明,只需要操作分区所属的表即可。

关于PostgreSQL的文章当属德哥的最多,可以在他的GitHub博客上看到,这里只简单记录下自己的思考。

3.1 PosgreSQL 9.6

PostgreSQL 9.6是我第一个接触的PostgreSQL数据库版本,当时只给loraserver做数据存储使用,在这个版本下可用的分区手段有

前者需要自行安装拓展插件,如果是一些云服务提供的数据库,可能无法支持。后者需要手写大量的规则,需要的操作如下

  • 创建主表,所有分区继承该表
  • 创建子表,添加约束定义每个分区中允许的键值
  • 在每个分区上添加所需的索引
  • 定义触发器,将插入主表的数据重定向到相应的分区
  • 在分区增删时刷新触发器

总的来说,都很复杂,如果当时需要在PostgreSQL 9.6版本做数据分区的话,我可能会直接选择分表,然后在代码中利用时间戳的年月日来处理数据的插入与查询重定向。

3.2 PostgreSQL 10

PostgreSQL 10的分区使用起来还是有继承的影子,需要的操作如下

  • 创建主表,可选range和list分区
  • 创建分区,定义每个分区中允许的键值
  • 在每个子表上添加所需的索引

我关注的内置的分区表特性如下

  • 自动处理了数据插入路由,无需手写触发器,但找不到分区时会报错
  • 删除分区时直接drop对应的分区表名即可,比直接删除数据快很多
  • 不可在主表上添加索引,需要在每个子表上逐一添加

总的来说,比PostgreSQL 9.6简化很多,实际应用时分区表和索引的添加放在一块处理,也没有太多的麻烦,更多的相关文章如下:

3.3 PostgreSQL 11及后续版本

PostgreSQL 11于2018年10月18日发布,在分区方面带来了以下改善

  • 支持哈希分区
  • 支持缺省分区,保存不含有分区键值的记录
  • 支持在主表执行创建主键、外键、索引和触发器时自动应用所有分区表
  • 性能提升

也就是说在这个版本下,我们所需的操作只有

  • 创建主表,添加索引
  • 创建缺省分区,存放不含有分区键值的记录
  • 创建分区,定义每个分区中允许的键值

更多的相关文章如下

4. 迭代过程

历史数据的消息ID、设备ID及创建时间在一开始就固定下来并添加了索引,但是随业务需求和数据量的增加,对查询和更新的优化手段也在改变,演变如下。

4.1 第一阶段

直接使用设备ID筛选数据,按消息ID倒序排列,使用offset和limit字段确定返回的分页

所有历史数据的查询语句都类似如下的格式,其中pageNo和pageSize由前端计算生成

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
select 
  xxx,
  xxx,
  xxx 
from 
  device_uplink 
where 
  dev_eui='xxxxxxxxxxxxxxxx' 
order by id desc 
  offset (pageNo-1)*pageSize limit pageSize;

然后使用count语句计算总数据量

1
select count(id) from device_uplink where dev_eui='xxxxxxxxxxxxxxxx';

这样的查询方式

  • 最简单的线性查找
  • 第一个分页的内容随上行数据更新
  • 第二页及以后的分页内容受上行的数据影响,刷新时会变动
  • device_uplink表数据量增加到千万级后,查询总时长明显增加
  • offset越大,需要略过的数据越多,查询越慢
  • 数据量过大,对数据库缓存不友好

这一个阶段也尝试过一些优化手段

  • 子查询与CTE:首先按设备ID筛选出所有数据,然后按offset和limit取分页
  • 数据总量估计:创建一个函数,从历史的EXPLAIN查询中获取大致的数据总量
  • 使用消息ID作为游标:无法准确计算分页数量,只能展示上一页及下一页的信息

然而还是无法解决数据太多的问题。

4.2 第二阶段

按创建时间过滤数据

LoRaWAN适合低速通信、大量设备的环境,某种程度上与Nginx类似(适合大量不活跃连接的场景),但是LoRaWAN也支持高速、低延迟的场景。在高速场景下每天每个网关设备可以上传高达二十几万条的数据,累积一段时间就可以达到百万级。如果直接查询通过偏移来获取分页,就会出现分页越取越慢,最后几乎停滞。

若将查询的时间段固定,除了减少数据总量外,还可以利用到数据库缓存。于是新的历史数据的查询语句如下

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
select 
  xxx,
  xxx,
  xxx 
from 
  device_uplink 
where 
  dev_eui = 'xxxxxxxxxxxxxxxx' 
  and 
  createtime between 'xxxx-xx-xx xx:xx:xx' and 'xxxx-xx-xx xx:xx:xx'
order by createtime desc 
  offset (pageNo-1)*pageSize limit pageSize;

然后使用count语句计算总数据量

1
2
3
4
5
6
7
8
select 
  count(createtime) 
from 
  device_uplink 
where 
  dev_eui = 'xxxxxxxxxxxxxxxx' 
  and 
  createtime between 'xxxx-xx-xx xx:xx:xx' and 'xxxx-xx-xx xx:xx:xx';

这样的查询方式

  • 前端需要传递起始和结束时间
  • 使用指定时间段内的数据进行分页,切换分页时只要起始和结束时间不变,分页的内容不会因为节点上行数据而变动
  • 若指定时间段内的数据总量不大,首次查询会得到缓存,进而加快后续查询
  • 以创建时间代替消息ID排序,消息创建时间在入库前就确定了,假设入库时阻塞延时了,也能够保证查询时顺序展示

于是来到了最后一个问题,如何快速、大量地删除历史数据?

4.3 第三阶段

按创建时间执行数据分区

存储所有历史数据是为了方便排查问题,虽然用户删除设备时一并删除历史数据,但数据库的磁盘空间不是无限的,还需要定期删除一些过期数据,如下所示

1
2
delete from device_uplink where dev_eui = 'xxxxxxxxxxxxxxxx';
delete from device_uplink where createtime < 'xxxx-xx-xx xx:xx:xx';

这样删除数据时

  • 由于删除设备时一并删除关联的记录,对用户来说,历史数据删除的时间也体现在设备删除中
  • 数据删除时间随数据量增加,线上测试环境一周就可以累积700万条左右的数据,这部分删除工作之前还是手动维护的
  • 删除操作会锁全表,其他读写操作会被挂起
  • 已删除的数据需要等待自动vacuum触发时才能释放磁盘空间,因为在业务层有一些会查询历史数据的操作,这部分数据被一些事务引用,迟迟得不到释放

在重构历史数据存储时,有几个目标

  • 删除设备时只解除业务层的关联,避免删除历史数据
  • 历史数据按天分区或分表,程序启动的时自动创建分区
  • 定时任务自动创建和删除数据,
  • 删除数据时直接删除指定的分区或表,避免长时间锁表

由于去年阿里云上只有PostgreSQL 10可选,因此只能基于它已有的分区功能实现

  • 通过github.com/rubenv/sql-migrate包执行数据表迁移
  • 通过code migration执行数据迁移
  • 通过github.com/robfig/cron包执行定时任务

以下是数据表自动创建和迁移逻辑:

(1)执行数据库迁移,创建分区主表

之前提到的历史数据表结构如下:

id dev_eui/gateway_id createtime
1 1234567890123456 2019-10-01 00:40:41.431306
2 0034567890123456 2019-10-02 00:40:41.431306
3 1234567890123456 2019-10-03 00:40:41.431306
4 0034567890123456 2019-10-04 00:40:41.431306

如下所示

1
2
3
4
5
6
create table device_uplink_v2 (
	id bigserial not null,
	dev_eui char(16) not null,
	...
	createtime timestamp
) partition by range (createtime);

利用PostgreSQL 10的range分区创建所有的主表

(2)执行code migration

code migration的逻辑取自broccar的代码,如下所示

创建code_migration

1
2
3
4
create table code_migration (
    id text primary key,
    applied_at timestamp with time zone not null
);

Migrate函数

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
func Migrate(db *common.DBLogger, name string, f func() error) error {
	return storage.Transaction(db, func(tx sqlx.Ext) error {
		_, err := tx.Exec(`lock table code_migration`)
		if err != nil {
			return errors.Wrap(err, "lock code migration table")
		}

		res, err := tx.Exec(`
			insert into code_migration (
				id,
				applied_at
			) values ($1, $2)
			on conflict
				do nothing
		`, name, time.Now())
		if err != nil {
			switch err := err.(type) {
			case *pq.Error:
				switch err.Code.Name() {
				case "unique_violation":
					return nil
				}
			}

			return err
		}

		ra, err := res.RowsAffected()
		if err != nil {
			return err
		}

		if ra == 0 {
			return nil
		}

		return f()
	})
}

首先主动锁表,然后在一个事务中创建数据库记录,并执行传入的函数,这个函数即为我们的迁移代码。

在迁移代码中,我们执行以下操作

  • 分区名格式统一为主表名+yyyy_mm_dd
  • 按日创建分区,用to_regclass函数检查分区表是否存在
  • 创建过去N天及未来M天的分区,确保当前可获取的时间戳内的数据都能够入库
  • 为分区创建索引
  • 执行数据迁移,从老表中select出过去N天内数据,插入到新分区表中
  • 同步自增的消息ID主键值

如下所示

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
/*创建分区并添加索引*/
create table 
  device_uplink_v2_2019_06_01 
partition of 
  device_uplink_v2 
for values from 
  ('2019-06-01 00:00:00') 
to 
  ('2019-06-02 00:00:00');
create index xxxxxxxxxx on device_uplink_v2_2019_06_01 (xxxxx); 
...
create table 
  device_uplink_v2_2019_07_03
partition of 
  device_uplink_v2 
for values from 
  ('2019-07-03 00:00:00') 
to 
  ('2019-07-04 00:00:00');
create index xxxxxxxxxx on device_uplink_v2_2019_07_03 (xxxxx);
/*迁移数据*/
insert into 
  device_uplink_v2 
  ('xxxxxxxxxxxxxxxx',...,'xxxx-xx-xx xx:xx:xx') 
select 
  ('xxxxxxxxxxxxxxxx',...,'xxxx-xx-xx xx:xx:xx') 
from 
  device_uplink 
where 
  createtime > '2019-06-01 00:00:00';
/*同步消息ID*/
select setval('device_uplink_v2_id_seq', (select max(id) from 'device_uplink_id_seq')+1);

(3)启动定时任务

到这一个步骤时,我们已经有了分区主表,过去N天及未来M天的分区,因此需要执行以下操作

  • 使用to_regclass函数检查未来M天的分区表是否存在,若不存在则创建分区、添加索引
  • 启动定时任务,在一个事务中
    • 删除过去N+P天至N天的分区表
    • 检查未来M天的分区是否存在,若不存在则创建分区、添加索引

上述的逻辑确保即使数据迁移阶段失败,也不会影响从当前时间戳开始的后续事务,分区表将会持续地被删除和创建,即使全球各地的时区不同,未来M天的分区表也足以覆盖这些时区。潜在的风险是:

  • 若删除过去第N+P天至第N天失败,这些数据会一直残留在历史数据中
  • 若创建未来M天的分区持续失败,则对应时间段的数据会一直无法入库

至此完成历史数据的自动维护功能,但为了更好应对数据膨胀的问题,最后还是为所有历史数据入库操作添加了开关,在无需存储历史数据的场景下,直接关闭数据入库。

5. 历史数据与实时消息推送

之前提到,所有数据首先入库,然后将一部分简化数据通过websocket推送到前端,其中

  • 消息ID可用于从简化数据获取对应的完整数据
  • 设备ID可用于权限验证
  • 创建时间可用于过滤数据

在引入按创建时间过滤数据时,依旧保持只用消息ID和设备ID命中历史数据的逻辑,如下所示

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
select 
  xxx,
  xxx,
  xxx 
from 
  device_uplink 
where 
  dev_eui = 'xxxxxxxxxxxxxxxx' 
and
  id = xxxx;

引入按创建时间执行数据分区后,如果不确定时间参数,就会执行全分区扫描。

我们知道PostgreSQL数据库的时间戳精度为微秒,前后端传参的时间戳精度为毫秒,则一条数据库中的历史数据的创建时间戳,必定落在前后端传参用的毫秒时间戳毫秒其实和结束之间,于是可以做以下优化:

  • 假定有一条简化消息,创建时间为2019-07-01 00:00:00.222对应的毫秒级Unix时间戳
  • 则必定存在一条完整消息,其创建时间介于2019-07-01 00:00:00.2222019-07-01 00:00:00.223

于是查询语句可修改如下

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
select 
  xxx,
  xxx,
  xxx 
from 
  device_uplink_v2 
where 
  dev_eui = 'xxxxxxxxxxxxxxxx' 
and
  id = xxxx
and 
  createtime between '2019-07-01 00:00:00.222' and '2019-07-01 00:00:00.223';

上述语句利用创建时间首先命中分区device_uplink_v2_2019_07_01,然后根据消息ID及设备ID筛选出目标数据。

6. 历史数据与下行队列

在开发LoRaWAN协议栈过程中,遇到一个比较难处理的业务是应用层下行数据的追踪。

  • A类设备可能间隔漫长的时间才会上行数据,触发下行数据捎带
  • B/C类设备需要尽可能利用下行窗口执行主动轮询下发

因此至少需要两个数据表,一个存储完整的历史数据,另一个存储可增删的节点下行数据队列,前者数据量远大于后者。

在重构代码前,只能利用fCnt(帧计数器)和设备ID两个字段来关联这两个处于不同数据库的表,反向寻找最近一条匹配的记录,而引入数据分区后,原本在索引上的扫描由于缺少时间字段,变成全分区扫描。

有了上一节中的优化心得,这次直接使用微秒级精度的时间戳来关联两个表,下行队列中的记录增加两个历史数据表的索引字段:消息ID创建时间,形成如下的工作机制:

  • 用户下发数据时,在一个事务中
    • 创建历史数据表记录,获取消消息ID创建时间
    • 创建下行队列表记录,保存对应的消息ID创建时间
  • 数据从服务器下发至网关时,利用消息ID创建时间命中历史数据表记录,更新历史数据的已发送字段
  • 节点响应确认下行时,利用消息ID创建时间命中历史数据表记录,更新历史数据的已确认字段
  • 关闭历史数据入库时,由于不存在消息ID创建时间,数据下发及节点回复确认下行时,不执行更新操作