设备属性数据存储
设备属性数据存储指的是设备上报数据的存储,Matrix云平台会为每一个产品创建一个设备属性存储数据集,开发者可以使用UDS JAVA API写入和读取设备属性数据。
基础概念
术语
名字 | 中文描述 | 语义 |
---|---|---|
property | 设备属性 | 设备上报数据 |
deviceId | 设备逻辑ID | 设备在平台的唯一标识 |
timestamp | 时间戳 | 设备上报数据的时间戳,是相对于1970年的毫秒数 |
全量上报 | 全量上报 | 每次上报的属性数据都包含所有预定义属性 |
差量上报 | 差量上报 | 每次上报的属性数据不一定包含所有预定义属性 |
status data | 当前状态数据 | 设备各个属性的最新数据 |
row | 数据行 | 代表一次上报数据 |
column | 数据列 | 代表一个属性 |
filter | 过滤条件 | 根据过滤条件查询数据 |
select | 选择属性 | 选择要查询的属性 |
publish | 发布/推送属性数据 | 如果APP订阅了设备属性,则会收到这条数据 |
null | 空 | 表示这个属性在这次上报中没有数据 |
ACContext | 上下文标识 |
数据模型
deviceId | timestamp | key_1 | key_2 | key_3 |
---|---|---|---|---|
device_1 | t1 | value_1 | value_2 | value_3 |
device_1 | t2 | value_1 | value_2 | value_3 |
device_1 | t3 | value_1 | value_2 | value_3 |
属性类型
类型 | 描述 |
---|---|
整型 | byte,short,int,long |
浮点型 | float,double |
字符串 | String |
布尔型 | Boolean |
接口
名字 | 描述 |
---|---|
create | 写入设备属性数据 |
find | 查找单行设备属性数据 |
scan | 查找多行设备属性数据 |
statistics | 简单的分区间数据统计,可以用于历史数据做图 |
export | 数据导出 |
publish | 发布设备属性数据 |
基础数据结构
ACContext
ACContext 包含了用户的MajorDomain, SubDomain, DeveloperId, TraceId, 时间戳, 签名等信息,每个请求都必须带有ACContext才能与云端交互。单个ACContext可以认为是逻辑上一系列请求的唯一标识。
ACFilter
ACFilter用于过滤结果集中的数据,当前支持:
查询历史数据仅支持: EQUAL, GREATER, GREATER_OR_EQUAL, LESS, LESS_OR_EQUAL, AND。 查询状态数据支持所有类型的ACFilter。
名字 | 数学表示 | 描述 |
---|---|---|
EQUAL | == | 等于 |
NOT_EQUAL | != | 不等于 |
GREATER | > | 大于 |
GREATER_OR_EQUAL | >= | 大于或等于 |
LESS | < | 小于 |
LESS_OR_EQUAL | <= | 小于或等于 |
LIKE | like | 字符串模糊匹配 |
NOT_LIKE | not like | 字符串模糊匹配 |
BINARY_LIKE | binary like | 区分大小写的字符串模糊匹配 |
BINARY_NOT_LIKE | binary not like | 不区分大小写的字符串模糊匹配 |
IN | in | 基于列表进行查找 |
NOT_IN | not int | 基于列表进行查找 |
AND | and | 与(与的优化级高于或) |
OR | or | 或 |
使用实例
// 实例1: 创建一个filter(key1>0 and key1<10)
ACFilter f1 = ac.filter().whereGreaterThan("key1", 0).andLessThan("key1", 10);
// 实例2: 创建一个filter(key1<=0 or key1>=10)
ACFilter f1 = ac.filter().whereLessThanOrEqualTo("key1", 0).orGreaterThanOrEqualTo("key1", 10);
// 实例3: 创建一个filter(key1以 "abcd" 为前缀, 不区分大小写)
ACFilter f1 = ac.filter().whereLike("key1", "abcd%");
// 实例4: 创建一个filter(key1以 "abcd" 为前缀, 不区分大小写)
ACFilter f1 = ac.filter().whereLike("key1", "abcd%");
// 实例5: 创建一个filter(key1以 "abcd" 为后缀, 不区分大小写)
ACFilter f1 = ac.filter().whereLike("key1", "%abcd");
// 实例6: 创建一个filter(key1包含子串 "abcd", 不区分大小写)
ACFilter f1 = ac.filter().whereLike("key1", "%abcd%");
// 实例7: 创建一个filter(key1 为 "v1" 或 "v2" 或 "v3"中的一个)
ACFilter f1 = ac.filter().whereIn("key1", String[]{"v1", "v2", "v3"});}
使用实例
以空气净化器为例来说明。
属性名 | 类型 | 描述 |
---|---|---|
pm25 | 整数 | pm2.5值 |
speed | 整数 | 当前风机转速 |
mode | 字符串 | 当前净化器状态(auto(自动), high(高速), medium(中速), low(低速)) |
数据写入
Create
标准用法
ac.dstore(ctx).create(设备ID,时间戳)
.put(key_1, value_1)
.put(key_2, value_2)
.put(key_3, value_3)
.execute();
使用实例
- 实例1: 写入一条数据 (设备ID: 1, 时间戳: 1469098960000, pm25值: 250, 风机转速: 40, 模式: "low")。
// 使用方式一
ac.dstore(ctx).create(1, 1469098960000L)
.put("pm25", 250)
.put("speed", 40)
.put("mode", "low")
.execute();
// 使用方式二
ac.dstore(ctx).create(1, 1469098960000L, "pm25", 250, "speed", 40, "mode", "low").execute();
// 使用方式三
ACObject obj = new ACObject();
obj.put("pm25", 250);
obj.put("speed", 40);
obj.put("mode", "low");
ac.dstore(ctx).create(1, 1469098960000L, obj).execute();
类比SQL
INSERT INTO `设备属性数据集` SET `设备ID`=1, `时间戳`=1469098960000, `pm25`=250, `speed`=40, `mode`='low';
- 实例2: 写入一条数据 (设备ID: 1, 时间戳: 1469098960000, pm25值: 250, 风机转速: 40,模式: "low"),并发布/推送。
ac.dstore(ctx).create(1, 1469098960000L)
.put("pm25", 250)
.put("speed", 40)
.put("mode", "low")
.execute(true);
- 实例3: 发布/推送一条数据(设备ID: 1, 时间戳: 1469098960000, pm25值: 250, 风机转速: 40, 模式: "low"), 不存储。
ac.dstore(ctx).create(1, 1469098960000L)
.put("pm25", 250)
.put("speed", 40)
.put("mode", "low")
.publish();
- 实例4: 写入一条数据 (设备ID: 1, 时间戳: 取服务器当前时间, pm25值: 250, 风机转速: 40, 模式: "low")。
ac.dstore(ctx).create(1, 0)
.put("pm25", 250)
.put("speed", 40)
.put("mode", "low")
.execute();
数据查询
Scan
范围查找数据,可能返回0~n条,当前最多支持返回1000条数据。
标准用法
// 查询历史数据(不支持offset, 不支持or)
// 如果没有符合条件的数据,则返回一个空的数组
List<ACObject> results = ac.dstore(ctx).scanHistory(设备ID)
.select(key_1, key_2, ..., key_n)
.startTime(开闭区间, 开始时间)
.endTime(开闭区间,结束时间)
.where(filter)
.and(filter)
.count(key_1, ...)
.sum(key_1, ...)
.avg(key_1, ...)
.max(key_1, ...)
.min(key_1, ...)
.orderByTimeAsc()
.orderByTimeDesc()
.limit(limit)
.execute();
// 查询状态数据
// 如果没有符合条件的数据,则返回一个空的数组
List<ACObject> results = ac.dstore(ctx).scanStatus()
.select(key_1, key_2, ..., key_n)
.start(开闭区间, key_1, value_1, key_n, value_n)
.end(开闭区间,key_1, value_1, key_n, value_n)
.where(filter)
.and(filter)
.or(filter)
.count(key_1, ...)
.sum(key_1, ...)
.avg(key_1, ...)
.max(key_1, ...)
.min(key_1, ...)
.orderByAsc(key_1, ...)
.orderByDesc(key_1, ...)
.groupBy(key_1, ...)
.offset(offset)
.limit(limit)
.execute();
使用实例
- 实例1: 查询 (设备ID为1,时间范围在[1469098960000, 1469102560000]间的历史数据(包含pm25值和对应的时间戳),按时间倒序输出)。
List<ACObject> results = ac.dstore(ctx).scanHistory(1)
.select("pm25", ACDStore.TIMESTAMP)
.startTime(true, 1469098960000L)
.endTime(true, 1469102560000L)
.orderByTimeDesc()
.execute();
// 如果没有符合条件的数据,则返回一个空的数组
// 输出方式一(如果是NULL,整数转化为0, 字符串转化为空串,浮点型转化为0f)
for (ACObject result:results) {
long timestamp = result.getLong(ACDStore.TIMESTAMP);
long pm25 = result.getLong("pm25");
System.out.Println(timestamp + ", " + pm25);
}
// 输出方式二(支持NULL)
for (ACObject result:results) {
Long timestamp = result.getNullLong(ACDStore.TIMESTAMP);
Long pm25 = result.getNullLong("pm25");
System.out.Println(timestamp + "," + pm25);
}
类比SQL
SELECT `时间戳`, `pm25` FROM `设备数据集` WHERE `设备ID`=1 AND (`时间戳`>=1469098960000 AND `时间戳`<=1469102560000) ORDER BY `时间戳` DESC;
- 实例2: 查询 (设备ID为1的最近20条历史数据(pm25和时间戳),并按时间正序输出)。
List<ACObject> results = ac.dstore(ctx).scanHistory(1)
.select("pm25", ACDStore.TIMESTAMP)
.endTime(true, 1469102560000L)
.orderByTimeAsc()
.limit(20)
.execute();
// 如果没有符合条件的数据,则返回一个空的数组
// 输出
for (ACObject result:results) {
long timestamp = result.getLong(ACDStore.TIMESTAMP);
long pm25 = result.getLong("pm25");
System.out.Println(timestamp + ", " + pm25);
}
类比SQL
SELECT `时间戳`, `pm25` FROM `设备数据集` WHERE `设备ID`=1 AND `时间戳`<=1469102560000L ORDERY BY `时间戳` DESC LIMIT 20;
- 实例3: 查询 (设备ID为1, 时间范围在[1469098960000, 1469102560000]间,并且pm25>200的数据行数)。
ACFilter filter = ac.filter().WhereGreaterThan("pm25", 200);
List<ACObject> results = ac.dstore(ctx).scanHistory(1)
.startTime(1469098960000L)
.endTime(1469102560000L)
.where(filter)
.count()
.execute();
// 输出
long count = result.get(0).getLong("_count");
System.out.Println(count);
类比SQL
SELECT count(*) as `_count` FROM `设备数据集` WHERE `设备ID`=1 AND (`时间戳`>=1469098960000 AND `时间戳`<=1469102560000) AND `pm25`>200;
- 实例4: 查询 (设备ID为1,时间范围在[1469098960000, 1469102560000]间,pm25的平均值)。
List<ACObject> results = ac.dstore(ctx).scanHistory(1)
.startTime(1469098960000L)
.endTime(1469102560000L)
.avg("pm25")
.execute();
// 输出
long avg = result.get(0).getLong("_avg_pm25");
System.out.Println(avg);
类比SQL
SELECT AVG(`pm25`) as `_avg_pm25` FROM `设备数据集` WHERE `设备ID`=1 AND (`时间戳`>=1469098960000 AND `时间戳`<=1469102560000);
- 实例5: 查询最近1分钟有过上报,并且当前设备模式为"high"的所有设备和数据。
ACFilter filter = ac.filter().WhereEqualTo("mode", "high");
Long curTime = System.currentTimeMillis();
List<ACObject> results = ac.dstore(ctx).scanStatus()
.select(ACDStore.DEVICE_ID, ACDStore.TIMESTAMP, "pm25", "mode", "speed")
.startTime(curTime-60*1000)
.where(filter)
.execute();
// 如果没有符合条件的数据,则返回一个空的数组
// 输出
for (ACObject result:results) {
long deviceId = result.getLong(ACDStore.DEVICE_ID);
long timestamp = result.getLong(ACDStore.TIMESTAMP);
long pm25 = result.getLong("pm25");
String mode = result.getString("mode");
long speed = result.getLong("speed");
System.out.Println(deviceId + ", " + timestamp + ", " + pm25 + ", " + mode + ", " + speed);
}
类比SQL
SELECT `设备ID`, `时间戳`, `pm25`, `mode`, `speed` FROM `设备数据集` WHERE `时间戳`>=当前时间-1分钟 AND `mode`='high';
Find
查找单条数据,历史数据必须指定设备ID和时间戳,状态数据只须指定设备ID。
标准用法
// 查询历史数据
// 如果数据不存在,则返回null
ACObject result = ac.dstore(ctx).find(设备ID,时间戳)
.select(key_1, key_2, ...)
.execute();
// 查询状态数据
ACObject result = ac.dstore(ctx).find(设备ID)
.select(key_, key_2, ...)
.execute();
使用实例
- 实例1: 查询设备ID为1, 时间戳为1469098960000的上报数据(pm25, mode)。
ACObject result = ac.dstore(ctx).findHistroy(1, 1469098960000L)
.select("pm25", "mode")
.execute();
// 如果数据不存在,则result==null
// 输出
if (result != null) {
long pm25 = result.getLong("pm25");
String mode = result.getString("mode");
System.out.Println(pm25 + ", " + mode);
}
类比SQL
SELECT `pm25`, `mode` FROM `设备数据集` WHERE `设备ID`=1 AND `时间戳`=1469098960000;
- 实例2: 查询设备ID为1的设备的当前状态。
ACObject result = ac.dstore(ctx).findStatus(1)
.select("pm25", "mode", "speed")
.execute();
// 如果数据不存在,则result==null
// 输出
if (result != null) {
long pm25 = result.getLong("pm25");
String mode = result.getString("mode");
long speed = result.getLong("speed");
System.out.Println(pm25 + ", " + mode + ", " + speed);
}
类比SQL
SELECT `pm25`, `mode`, `speed` FROM `设备数据集` WHERE `设备ID`=1;
数据统计
statistics
数据统计主要用于对一定时间范围内的数据进行数据统计,可以用于数据报表和基于历史数据做图。
标准用法
// 对历史数据做简单统计
List<ACObject> results = ac.dstore(ctx).statisticsHistory(设备ID)
.startAbsoluteTime(起始绝对时间)
.endAbsoluteTime(结束绝对时间)
.startRelativeTime(相对时间值, 相对时间单位)
.endRelativeTime(相对时间值, 相对时间单位)
.interval(时间间隔值, 时间间隔单位)
.timeZone(时区)
.addStatistic(属性1, 聚集函数1, 聚焦函数2, ...)
.addStatistic(属性2, 聚焦函数1)
.execute();
- 当某个时间区间没有数据时,默认填充0
- 相对时间单位支持: INTERVAL_SECONDS, INTERVAL_MINUTES, INTERVAL_HOURS, INTERVAL_DAYS, INTERVAL_WEEKS, INTERVAL_MONTHS
- 聚集函数支持: AGGR_AVG, AGGR_SUM, AGGR_COUNT, AGGR_MAX, AGGR_MIN, AGGR_FIRST, AGGR_LAST
使用实例
- 实例1: 查询 (设备ID为1, 时间范围在[1487300704000, 1487387104000](24小时)间的历史数据, 每1个小时对pm25算一个平均值,一个最大值,一个最小值
List<ACObject> results = ac.dstore(ctx).statisticsHistory(1)
.startAbsoluteTime(1487300704000L)
.endAbsoluteTime(1487387104000L)
.interval(1, ACDStore.INTERVAL_HOURS)
.addStatistic("pm25", ACDStore.AGGR_AVG, ACDStore.AGGR_MAX, ACDStore.AGGR_MIN)
.execute();
// 如果没有符合条件的数据,则返回一个空的数组
// 输出
for (ACObject result:results) {
long timestamp = result.getLong(ACDStore.TIMESTAMP);
long pm25_avg = result.getLong("_avg_pm25");
long pm25_max = result.getLong("_max_pm25");
long pm25_min = result.getLong("_min_pm25");
System.out.Println(timestamp + ", " + pm25_avg + ", " + pm25_max + ", " + pm25_min);
}
- 实例2: 查询 (设备ID为1, 开始时间为1487300704000后的24小时内的历史数据,每1个小时对pm25求一个平均值,一个最大值,一个最小值
List<ACObject> results = ac.dstore(ctx).statisticsHistroy(1)
.startAbsoluteTime(1487300704000L)
.endRelative(1, ACDStore.INTERVAL_DAYS)
.interval(1, ACDStore.INTERVAL_HOURS)
.addStatistic("pm25", ACDStore.AGGR_AVG, ACDStore.AGGR_MAX, ACDStore.AGGR_MIN)
.execute();
// 如果没有符合条件的数据,则返回一个空的数组
// 输出
for (ACObject result:results) {
long timestamp = result.getLong(ACDStore.TIMESTAMP);
long pm25_avg = result.getLong("_avg_pm25");
long pm25_max = result.getLong("_max_pm25");
long pm25_min = result.getLong("_min_pm25");
System.out.Println(timestamp + ", " + pm25_avg + ", " + pm25_max + ", " + pm25_min);
}
- 实例3: 查询 (设备ID为1, 结束时间为1487387104000前24小时的历史数据, 每一个小时对pm25求一个平均值,一个最大值,一个最小值)
List<ACObject> results = ac.dstore(ctx).statisticsHistory(1)
.startRelative(1, ACDStore.INTERVAL_DAYS)
.endAbsoluteTime(1487387104000L)
.interval(1, ACDStore.INTERVAL_HOURS)
.addStatistic("pm25", ACDStore.AGGR_AVG, ACDStore.AGGR_MAX, ACDStore.AGGR_MIN)
.execute();
// 如果没有符合条件的数据,则返回一个空的数组
// 输出
for (ACObject result:results) {
long timestamp = result.getLong(ACDStore.TIMESTAMP);
long pm25_avg = result.getLong("_avg_pm25");
long pm25_max = result.getLong("_max_pm25");
long pm25_min = result.getLong("_min_pm25");
System.out.Println(timestamp + ", " + pm25_avg + ", " + pm25_max + ", " + pm25_min);
}
数据导出
export
导出单个设备指定时间范围的历史数据或是导出所有设备的状态数据
标准用法
// 导出单个设备的历史数据
ACDStoreIter iter = ac.dstore(ctx).exportHistory(设备ID)
.startTime(开始时间)
.endTime(结束时间)
.execute();
// 导出所有设备的状态数据
ACDStoreIter iter = ac.dsotre(ctx).exportStatus()
.execute();
使用实例
- 导出ID为1的的设备,在[1487300704000, 1487387104000]范围内的历史数据.
ACDStoreIter iter = ac.dstore(ctx)
.exportHistory(1)
.startTime(1487300704000L)
.endTime(1487387104000L)
.execute();
while(true) {
ACObject item = iter.next();
if (item == null) {
break;
} else {
long timestamp = item.getLong(ACDStore.TIMESTAMP);
long pm25 = item.getLong("pm25");
long speed = item.getLong("speed");
String mode = item.getString("mode");
System.out.println(timestamp + ", " + pm25 + ", " + speed + ", " + mode);
}
}
- 导出所有设备的状态数据
ACDStoreIter iter = ac.dstore(ctx)
.exportStatus()
.execute();
while(true) {
ACObject item = item.next();
if (item == null) {
break;
} else {
long deviceId = item.getLong(ACDStore.DEVICE_ID);
long pm25 = item.getLong("pm25");
long speed = item.getLong("speed");
String mode = item.getString("mode");
System.out.println(deviceId + ", " + pm25 + ", " + speed + ", " + mode);
}
}