|
@@ -1,6 +1,8 @@
|
|
|
package cn.iocoder.yudao.module.iot.service.tdengine;
|
|
|
|
|
|
import cn.hutool.core.date.DateUtil;
|
|
|
+import cn.hutool.core.util.StrUtil;
|
|
|
+import cn.iocoder.yudao.framework.common.util.collection.CollectionUtils;
|
|
|
import cn.iocoder.yudao.framework.tenant.core.aop.TenantIgnore;
|
|
|
import cn.iocoder.yudao.module.iot.controller.admin.device.vo.device.IotDeviceStatusUpdateReqVO;
|
|
|
import cn.iocoder.yudao.module.iot.dal.dataobject.device.IotDeviceDO;
|
|
@@ -11,6 +13,9 @@ import cn.iocoder.yudao.module.iot.dal.dataobject.tdengine.TdTableDO;
|
|
|
import cn.iocoder.yudao.module.iot.dal.dataobject.tdengine.ThingModelMessage;
|
|
|
import cn.iocoder.yudao.module.iot.dal.dataobject.thinkmodelfunction.IotThinkModelFunctionDO;
|
|
|
import cn.iocoder.yudao.module.iot.dal.redis.deviceData.DeviceDataRedisDAO;
|
|
|
+import cn.iocoder.yudao.module.iot.dal.tdengine.TdEngineDDLMapper;
|
|
|
+import cn.iocoder.yudao.module.iot.dal.tdengine.TdEngineDMLMapper;
|
|
|
+import cn.iocoder.yudao.module.iot.enums.IotConstants;
|
|
|
import cn.iocoder.yudao.module.iot.enums.device.IotDeviceStatusEnum;
|
|
|
import cn.iocoder.yudao.module.iot.enums.product.IotProductFunctionTypeEnum;
|
|
|
import cn.iocoder.yudao.module.iot.service.device.IotDeviceService;
|
|
@@ -30,6 +35,14 @@ import java.util.stream.Collectors;
|
|
|
@Service
|
|
|
public class IotThingModelMessageServiceImpl implements IotThingModelMessageService {
|
|
|
|
|
|
+ private static final String TAG_NOTE = "TAG";
|
|
|
+ private static final String NOTE = "note";
|
|
|
+ private static final String TIME = "time";
|
|
|
+ private static final String DEVICE_KEY = "device_key";
|
|
|
+ private static final String DEVICE_NAME = "device_name";
|
|
|
+ private static final String PRODUCT_KEY = "product_key";
|
|
|
+ private static final String DEVICE_TYPE = "device_type";
|
|
|
+
|
|
|
@Value("${spring.datasource.dynamic.datasource.tdengine.url}")
|
|
|
private String url;
|
|
|
|
|
@@ -38,11 +51,9 @@ public class IotThingModelMessageServiceImpl implements IotThingModelMessageServ
|
|
|
@Resource
|
|
|
private IotDeviceService iotDeviceService;
|
|
|
@Resource
|
|
|
- private TdEngineTableService tdEngineTableService;
|
|
|
- @Resource
|
|
|
- private TdEngineSuperTableService tdEngineSuperTableService;
|
|
|
+ private TdEngineDDLMapper tdEngineDDLMapper;
|
|
|
@Resource
|
|
|
- private TdEngineDataWriterService tdEngineDataWriterService;
|
|
|
+ private TdEngineDMLMapper tdEngineDMLMapper;
|
|
|
|
|
|
@Resource
|
|
|
private DeviceDataRedisDAO deviceDataRedisDAO;
|
|
@@ -51,62 +62,64 @@ public class IotThingModelMessageServiceImpl implements IotThingModelMessageServ
|
|
|
@Override
|
|
|
@TenantIgnore
|
|
|
public void saveThingModelMessage(IotDeviceDO device, ThingModelMessage thingModelMessage) {
|
|
|
- // 判断设备状态,如果为未激活状态,创建数据表
|
|
|
+ // 1. 判断设备状态,如果为未激活状态,创建数据表并更新设备状态
|
|
|
if (IotDeviceStatusEnum.INACTIVE.getStatus().equals(device.getStatus())) {
|
|
|
- // 创建设备数据表
|
|
|
createDeviceTable(device.getDeviceType(), device.getProductKey(), device.getDeviceName(), device.getDeviceKey());
|
|
|
- // 更新设备状态
|
|
|
- // TODO @haohao:下面可以考虑,链式调用。iotDeviceService.updateDeviceStatus(new IotDeviceStatusUpdateReqVO().setid().setstatus())
|
|
|
- IotDeviceStatusUpdateReqVO updateReqVO = new IotDeviceStatusUpdateReqVO();
|
|
|
- updateReqVO.setId(device.getId());
|
|
|
- updateReqVO.setStatus(IotDeviceStatusEnum.ONLINE.getStatus());
|
|
|
- iotDeviceService.updateDeviceStatus(updateReqVO);
|
|
|
+ iotDeviceService.updateDeviceStatus(new IotDeviceStatusUpdateReqVO()
|
|
|
+ .setId(device.getId())
|
|
|
+ .setStatus(IotDeviceStatusEnum.ONLINE.getStatus()));
|
|
|
}
|
|
|
|
|
|
- // TODO @haohao:这个变量,可以和 “过滤并收集有效的属性字段” 那块,因为关联度高一点。
|
|
|
- // 获取设备属性
|
|
|
+ // 2. 获取设备属性并进行物模型校验,过滤非物模型属性
|
|
|
Map<String, Object> params = thingModelMessage.dataToMap();
|
|
|
+ List<IotThinkModelFunctionDO> functionList = getValidFunctionList(thingModelMessage.getProductKey());
|
|
|
+ if (functionList.isEmpty()) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ // 3. 过滤并收集有效的属性字段,缓存设备属性
|
|
|
+ List<TdFieldDO> schemaFieldValues = filterAndCollectValidFields(params, functionList, device, thingModelMessage.getTime());
|
|
|
+ if (schemaFieldValues.size() == 1) { // 仅有时间字段,无需保存
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ // 4. 构建并保存设备属性数据
|
|
|
+ tdEngineDMLMapper.insertData(TdTableDO.builder()
|
|
|
+ .dataBaseName(getDatabaseName())
|
|
|
+ .tableName(getDeviceTableName(device.getProductKey(), device.getDeviceName()))
|
|
|
+ .columns(schemaFieldValues)
|
|
|
+ .build());
|
|
|
+ }
|
|
|
|
|
|
- // 物模型校验,过滤非物模型属性
|
|
|
- List<IotThinkModelFunctionDO> functionList = iotThinkModelFunctionService
|
|
|
- .getThinkModelFunctionListByProductKey(thingModelMessage.getProductKey())
|
|
|
+ private List<IotThinkModelFunctionDO> getValidFunctionList(String productKey) {
|
|
|
+ return iotThinkModelFunctionService
|
|
|
+ .getThinkModelFunctionListByProductKey(productKey)
|
|
|
.stream()
|
|
|
.filter(function -> IotProductFunctionTypeEnum.PROPERTY.getType().equals(function.getType()))
|
|
|
.toList();
|
|
|
- if (functionList.isEmpty()) {
|
|
|
- return;
|
|
|
- }
|
|
|
+ }
|
|
|
|
|
|
- // 获取属性标识符集合
|
|
|
- // TODO @haohao:这个变量,可以和 “过滤并收集有效的属性字段” 那块,因为关联度高一点。另外,可以使用 CollectionUtils。convertSet
|
|
|
- Set<String> propertyIdentifiers = functionList.stream()
|
|
|
- .map(IotThinkModelFunctionDO::getIdentifier)
|
|
|
- .collect(Collectors.toSet());
|
|
|
+ private List<TdFieldDO> filterAndCollectValidFields(Map<String, Object> params, List<IotThinkModelFunctionDO> functionList, IotDeviceDO device, Long time) {
|
|
|
+ // 1. 获取属性标识符集合
|
|
|
+ Set<String> propertyIdentifiers = CollectionUtils.convertSet(functionList, IotThinkModelFunctionDO::getIdentifier);
|
|
|
|
|
|
+ // 2. 构建属性标识符和属性的映射
|
|
|
Map<String, IotThinkModelFunctionDO> functionMap = functionList.stream()
|
|
|
.collect(Collectors.toMap(IotThinkModelFunctionDO::getIdentifier, function -> function));
|
|
|
|
|
|
- // 过滤并收集有效的属性字段
|
|
|
+ // 3. 过滤并收集有效的属性字段
|
|
|
List<TdFieldDO> schemaFieldValues = new ArrayList<>();
|
|
|
- schemaFieldValues.add(new TdFieldDO("time", thingModelMessage.getTime()));
|
|
|
+ schemaFieldValues.add(new TdFieldDO(TIME, time));
|
|
|
params.forEach((key, val) -> {
|
|
|
if (propertyIdentifiers.contains(key)) {
|
|
|
schemaFieldValues.add(new TdFieldDO(key.toLowerCase(), val));
|
|
|
// 缓存设备属性
|
|
|
// TODO @haohao:这个缓存的写入,可以使用的时候 cache 么?被动读
|
|
|
- setDeviceDataCache(device, functionMap.get(key), val, thingModelMessage.getTime());
|
|
|
+ setDeviceDataCache(device, functionMap.get(key), val, time);
|
|
|
}
|
|
|
});
|
|
|
- // TODO @haohao:疑问,为什么 1 不继续哈?
|
|
|
- if (schemaFieldValues.size() == 1) {
|
|
|
- return;
|
|
|
- }
|
|
|
-
|
|
|
- // 构建并保存设备属性数据
|
|
|
- tdEngineDataWriterService.insertData(TdTableDO.builder().build()
|
|
|
- .setDataBaseName(getDatabaseName())
|
|
|
- .setTableName(getDeviceTableName(device.getProductKey(), device.getDeviceName()))
|
|
|
- .setColumns(schemaFieldValues));
|
|
|
+ return schemaFieldValues;
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -132,7 +145,6 @@ public class IotThingModelMessageServiceImpl implements IotThingModelMessageServ
|
|
|
deviceDataRedisDAO.set(deviceData);
|
|
|
}
|
|
|
|
|
|
- // TODO @haohao:实现没问题哈。这个方法的空行有点多,逻辑分块上没这么明显。看看能不能改下。
|
|
|
/**
|
|
|
* 创建设备数据表
|
|
|
*
|
|
@@ -142,36 +154,37 @@ public class IotThingModelMessageServiceImpl implements IotThingModelMessageServ
|
|
|
* @param deviceKey 设备 Key
|
|
|
*/
|
|
|
private void createDeviceTable(Integer deviceType, String productKey, String deviceName, String deviceKey) {
|
|
|
+ // 1. 获取超级表名和数据库名
|
|
|
String superTableName = getProductPropertySTableName(deviceType, productKey);
|
|
|
String dataBaseName = getDatabaseName();
|
|
|
|
|
|
- List<Map<String, Object>> maps = tdEngineSuperTableService.describeSuperTable(new TdTableDO(dataBaseName, superTableName));
|
|
|
+ // 2. 获取超级表的结构信息
|
|
|
+ List<Map<String, Object>> maps = tdEngineDDLMapper.describeSuperTable(new TdTableDO(dataBaseName, superTableName));
|
|
|
List<TdFieldDO> tagsFieldValues = new ArrayList<>();
|
|
|
-
|
|
|
if (maps != null) {
|
|
|
- // TODO @haohao:一些字符串,是不是可以枚举起来哈。
|
|
|
- // TODO @haohao:这种过滤的,常用的,可以考虑用 CollectionUtils.filterList。一些常用的 stream 操作,适合封装哈
|
|
|
- List<Map<String, Object>> taggedNotesList = maps.stream()
|
|
|
- .filter(map -> "TAG".equals(map.get("note")))
|
|
|
- .toList();
|
|
|
+ // 2.1 过滤出 TAG 类型的字段
|
|
|
+ List<Map<String, Object>> taggedNotesList = CollectionUtils.filterList(maps, map -> TAG_NOTE.equals(map.get(NOTE)));
|
|
|
+
|
|
|
|
|
|
+ // 2.2 解析字段信息
|
|
|
tagsFieldValues = FieldParser.parse(taggedNotesList.stream()
|
|
|
.map(map -> List.of(map.get("field"), map.get("type"), map.get("length")))
|
|
|
.collect(Collectors.toList()));
|
|
|
|
|
|
+ // 2.3 设置 TAG 字段的值
|
|
|
for (TdFieldDO tagsFieldValue : tagsFieldValues) {
|
|
|
switch (tagsFieldValue.getFieldName()) {
|
|
|
- case "product_key" -> tagsFieldValue.setFieldValue(productKey);
|
|
|
- case "device_key" -> tagsFieldValue.setFieldValue(deviceKey);
|
|
|
- case "device_name" -> tagsFieldValue.setFieldValue(deviceName);
|
|
|
- case "device_type" -> tagsFieldValue.setFieldValue(deviceType);
|
|
|
+ case PRODUCT_KEY -> tagsFieldValue.setFieldValue(productKey);
|
|
|
+ case DEVICE_KEY -> tagsFieldValue.setFieldValue(deviceKey);
|
|
|
+ case DEVICE_NAME -> tagsFieldValue.setFieldValue(deviceName);
|
|
|
+ case DEVICE_TYPE -> tagsFieldValue.setFieldValue(deviceType);
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- // 创建设备数据表
|
|
|
+ // 3. 创建设备数据表
|
|
|
String tableName = getDeviceTableName(productKey, deviceName);
|
|
|
- tdEngineTableService.createTable(TdTableDO.builder().build()
|
|
|
+ tdEngineDDLMapper.createTable(TdTableDO.builder().build()
|
|
|
.setDataBaseName(dataBaseName)
|
|
|
.setSuperTableName(superTableName)
|
|
|
.setTableName(tableName)
|
|
@@ -184,8 +197,7 @@ public class IotThingModelMessageServiceImpl implements IotThingModelMessageServ
|
|
|
* @return 数据库名称
|
|
|
*/
|
|
|
private String getDatabaseName() {
|
|
|
- // TODO @haohao:可以使用 StrUtil.subAftetLast 这种方法
|
|
|
- return url.substring(url.lastIndexOf("/") + 1);
|
|
|
+ return StrUtil.subAfter(url, "/", true);
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -198,9 +210,9 @@ public class IotThingModelMessageServiceImpl implements IotThingModelMessageServ
|
|
|
private static String getProductPropertySTableName(Integer deviceType, String productKey) {
|
|
|
// TODO @haohao:枚举下,会好点哈。
|
|
|
return switch (deviceType) {
|
|
|
- case 1 -> String.format("gateway_sub_%s", productKey).toLowerCase();
|
|
|
- case 2 -> String.format("gateway_%s", productKey).toLowerCase();
|
|
|
- default -> String.format("device_%s", productKey).toLowerCase();
|
|
|
+ case 1 -> String.format(IotConstants.GATEWAY_SUB_STABLE_NAME_FORMAT, productKey).toLowerCase();
|
|
|
+ case 2 -> String.format(IotConstants.GATEWAY_STABLE_NAME_FORMAT, productKey).toLowerCase();
|
|
|
+ default -> String.format(IotConstants.DEVICE_STABLE_NAME_FORMAT, productKey).toLowerCase();
|
|
|
};
|
|
|
}
|
|
|
|
|
@@ -212,7 +224,7 @@ public class IotThingModelMessageServiceImpl implements IotThingModelMessageServ
|
|
|
* @return 设备表名
|
|
|
*/
|
|
|
private static String getDeviceTableName(String productKey, String deviceName) {
|
|
|
- return String.format("device_%s_%s", productKey.toLowerCase(), deviceName.toLowerCase());
|
|
|
+ return String.format(IotConstants.DEVICE_TABLE_NAME_FORMAT, productKey.toLowerCase(), deviceName.toLowerCase());
|
|
|
}
|
|
|
|
|
|
}
|