|
@@ -0,0 +1,384 @@
|
|
|
|
+
|
|
|
|
+package com.anjiplus.template.gaea.business.modules.data.dataSource.service.impl;
|
|
|
|
+
|
|
|
|
+import com.alibaba.fastjson.JSONArray;
|
|
|
|
+import com.alibaba.fastjson.JSONObject;
|
|
|
|
+import com.anji.plus.gaea.constant.Enabled;
|
|
|
|
+import com.anji.plus.gaea.curd.mapper.GaeaBaseMapper;
|
|
|
|
+import com.anji.plus.gaea.exception.BusinessExceptionBuilder;
|
|
|
|
+import com.anji.plus.gaea.utils.GaeaAssert;
|
|
|
|
+import com.anjiplus.template.gaea.business.code.ResponseCode;
|
|
|
|
+import com.anjiplus.template.gaea.business.modules.data.dataSet.controller.dto.DataSetDto;
|
|
|
|
+import com.anjiplus.template.gaea.business.modules.data.dataSetParam.service.DataSetParamService;
|
|
|
|
+import com.anjiplus.template.gaea.business.modules.data.dataSource.controller.dto.DataSourceDto;
|
|
|
|
+import com.anjiplus.template.gaea.business.modules.data.dataSource.controller.param.ConnectionParam;
|
|
|
|
+import com.anjiplus.template.gaea.business.modules.data.dataSource.dao.DataSourceMapper;
|
|
|
|
+import com.anjiplus.template.gaea.business.modules.data.dataSource.dao.entity.DataSource;
|
|
|
|
+import com.anjiplus.template.gaea.business.modules.data.dataSource.pool.constant.JdbcConstants;
|
|
|
|
+import com.anjiplus.template.gaea.business.modules.data.dataSource.pool.util.JdbcUtil;
|
|
|
|
+import com.anjiplus.template.gaea.business.modules.data.dataSource.service.DataSourceService;
|
|
|
|
+import com.anjiplus.template.gaea.common.RespCommonCode;
|
|
|
|
+import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
|
|
|
+import com.baomidou.mybatisplus.core.toolkit.Wrappers;
|
|
|
|
+import lombok.extern.slf4j.Slf4j;
|
|
|
|
+import org.springframework.beans.factory.annotation.Autowired;
|
|
|
|
+import org.springframework.http.HttpEntity;
|
|
|
|
+import org.springframework.http.HttpHeaders;
|
|
|
|
+import org.springframework.http.HttpMethod;
|
|
|
|
+import org.springframework.http.ResponseEntity;
|
|
|
|
+import org.springframework.stereotype.Service;
|
|
|
|
+import org.springframework.web.client.RestClientException;
|
|
|
|
+import org.springframework.web.client.RestTemplate;
|
|
|
|
+
|
|
|
|
+import javax.annotation.Resource;
|
|
|
|
+import java.sql.Connection;
|
|
|
|
+import java.sql.PreparedStatement;
|
|
|
|
+import java.sql.ResultSet;
|
|
|
|
+import java.sql.SQLException;
|
|
|
|
+import java.util.ArrayList;
|
|
|
|
+import java.util.List;
|
|
|
|
+import java.util.Map;
|
|
|
|
+
|
|
|
|
+/**
|
|
|
|
+ * @author Raod
|
|
|
|
+ * @desc DataSource 数据集服务实现
|
|
|
|
+ * @date 2021-03-18 12:09:57.728203200
|
|
|
|
+ **/
|
|
|
|
+@Service
|
|
|
|
+@Slf4j
|
|
|
|
+public class DataSourceServiceImpl implements DataSourceService {
|
|
|
|
+
|
|
|
|
+ @Autowired
|
|
|
|
+ private DataSourceMapper dataSourceMapper;
|
|
|
|
+
|
|
|
|
+ @Resource(name = "dataSourceRestTemplate")
|
|
|
|
+ private RestTemplate restTemplate;
|
|
|
|
+
|
|
|
|
+ @Autowired
|
|
|
|
+ private DataSetParamService dataSetParamService;
|
|
|
|
+
|
|
|
|
+ @Override
|
|
|
|
+ public GaeaBaseMapper<DataSource> getMapper() {
|
|
|
|
+ return dataSourceMapper;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * 获取所有数据源
|
|
|
|
+ * @return
|
|
|
|
+ */
|
|
|
|
+ @Override
|
|
|
|
+ public List<DataSource> queryAllDataSource() {
|
|
|
|
+ LambdaQueryWrapper<DataSource> wrapper = Wrappers.lambdaQuery();
|
|
|
|
+ wrapper.select(DataSource::getSourceCode, DataSource::getSourceName)
|
|
|
|
+ .eq(DataSource::getEnableFlag, Enabled.YES.getValue());
|
|
|
|
+ return dataSourceMapper.selectList(wrapper);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * 测试 连接
|
|
|
|
+ *
|
|
|
|
+ * @param connectionParam
|
|
|
|
+ * @return
|
|
|
|
+ */
|
|
|
|
+ @Override
|
|
|
|
+ public Boolean testConnection(ConnectionParam connectionParam) {
|
|
|
|
+ String sourceType = connectionParam.getSourceType();
|
|
|
|
+ String sourceConfig = connectionParam.getSourceConfig();
|
|
|
|
+ DataSourceDto dto = new DataSourceDto();
|
|
|
|
+ dto.setSourceConfig(sourceConfig);
|
|
|
|
+ switch (sourceType) {
|
|
|
|
+ case JdbcConstants.ELASTIC_SEARCH_SQL:
|
|
|
|
+ testElasticsearchSqlConnection(dto);
|
|
|
|
+ break;
|
|
|
|
+ case JdbcConstants.MYSQL:
|
|
|
|
+ case JdbcConstants.KUDU_IMAPLA:
|
|
|
|
+ testRelationalDb(dto);
|
|
|
|
+ break;
|
|
|
|
+ case JdbcConstants.HTTP:
|
|
|
|
+ testHttp(dto);
|
|
|
|
+ break;
|
|
|
|
+ default:
|
|
|
|
+ throw BusinessExceptionBuilder.build(RespCommonCode.DATA_SOURCE_TYPE_DOES_NOT_MATCH_TEMPORARILY);
|
|
|
|
+ }
|
|
|
|
+ return true;
|
|
|
|
+
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Override
|
|
|
|
+ public List<JSONObject> execute(DataSourceDto dto) {
|
|
|
|
+ String sourceType = dto.getSourceType();
|
|
|
|
+ switch (sourceType) {
|
|
|
|
+ case JdbcConstants.ELASTIC_SEARCH_SQL:
|
|
|
|
+ return executeElasticsearchSql(dto);
|
|
|
|
+ case JdbcConstants.MYSQL:
|
|
|
|
+ case JdbcConstants.KUDU_IMAPLA:
|
|
|
|
+ return executeRelationalDb(dto);
|
|
|
|
+ case JdbcConstants.HTTP:
|
|
|
|
+ return executeHttp(dto);
|
|
|
|
+ default:
|
|
|
|
+ throw BusinessExceptionBuilder.build(RespCommonCode.DATA_SOURCE_TYPE_DOES_NOT_MATCH_TEMPORARILY);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * 执行sql,统计数据total
|
|
|
|
+ *
|
|
|
|
+ * @param dto
|
|
|
|
+ * @return
|
|
|
|
+ */
|
|
|
|
+ @Override
|
|
|
|
+ public long total(DataSourceDto sourceDto, DataSetDto dto) {
|
|
|
|
+ //区分数据类型
|
|
|
|
+ String sourceType = sourceDto.getSourceType();
|
|
|
|
+ switch (sourceType) {
|
|
|
|
+ case JdbcConstants.ELASTIC_SEARCH_SQL:
|
|
|
|
+ return 0;
|
|
|
|
+ case JdbcConstants.MYSQL:
|
|
|
|
+ return mysqlTotal(sourceDto, dto);
|
|
|
|
+ default:
|
|
|
|
+ throw BusinessExceptionBuilder.build(RespCommonCode.DATA_SOURCE_TYPE_DOES_NOT_MATCH_TEMPORARILY);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * 获取mysql count 和添加limit分页信息
|
|
|
|
+ * @param sourceDto
|
|
|
|
+ * @param dto
|
|
|
|
+ * @return
|
|
|
|
+ */
|
|
|
|
+ public long mysqlTotal(DataSourceDto sourceDto, DataSetDto dto){
|
|
|
|
+ String dynSentence = sourceDto.getDynSentence();
|
|
|
|
+ String sql = "select count(1) as count from (" + dynSentence + ") as gaeaExecute";
|
|
|
|
+ sourceDto.setDynSentence(sql);
|
|
|
|
+ List<JSONObject> result = execute(sourceDto);
|
|
|
|
+
|
|
|
|
+ //sql 拼接 limit 分页信息
|
|
|
|
+ int pageNumber = Integer.parseInt(dto.getContextData().getOrDefault("pageNumber", "1").toString());
|
|
|
|
+ int pageSize = Integer.parseInt(dto.getContextData().getOrDefault("pageSize", "10").toString());
|
|
|
|
+ String sqlLimit = " limit " + (pageNumber - 1) * pageSize + "," + pageSize;
|
|
|
|
+ sourceDto.setDynSentence(dynSentence.concat(sqlLimit));
|
|
|
|
+ log.info("当前total:{}, 添加分页参数,sql语句:{}", JSONObject.toJSONString(result), sourceDto.getDynSentence());
|
|
|
|
+ return result.get(0).getLongValue("count");
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+ public List<JSONObject> executeElasticsearchSql(DataSourceDto dto) {
|
|
|
|
+ analysisHttpConfig(dto);
|
|
|
|
+ HttpHeaders headers = new HttpHeaders();
|
|
|
|
+ headers.setAll(JSONObject.parseObject(dto.getHeader(), Map.class));
|
|
|
|
+ HttpEntity<String> entity = new HttpEntity<>(dto.getDynSentence(), headers);
|
|
|
|
+ ResponseEntity<JSONObject> exchange;
|
|
|
|
+ try {
|
|
|
|
+ exchange = restTemplate.exchange(dto.getApiUrl(), HttpMethod.valueOf(dto.getMethod()), entity, JSONObject.class);
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
+ throw BusinessExceptionBuilder.build(RespCommonCode.DATA_SOURCE_CONNECTION_FAILED, e.getMessage());
|
|
|
|
+ }
|
|
|
|
+ if (exchange.getStatusCode().isError()) {
|
|
|
|
+ throw BusinessExceptionBuilder.build(RespCommonCode.DATA_SOURCE_CONNECTION_FAILED, exchange.getBody());
|
|
|
|
+ }
|
|
|
|
+ List<JSONObject> result;
|
|
|
|
+ try {
|
|
|
|
+ JSONObject body = exchange.getBody();
|
|
|
|
+ //解析es sql数据
|
|
|
|
+ if (null == body) {
|
|
|
|
+ return null;
|
|
|
|
+ }
|
|
|
|
+ JSONArray columns = body.getJSONArray("columns");
|
|
|
|
+ JSONArray rows = body.getJSONArray("rows");
|
|
|
|
+ result = new ArrayList<>();
|
|
|
|
+ for (int i = 0; i < rows.size(); i++) {
|
|
|
|
+ JSONArray row = rows.getJSONArray(i);
|
|
|
|
+ JSONObject jsonObject = new JSONObject();
|
|
|
|
+ for (int j = 0; j < row.size(); j++) {
|
|
|
|
+ String name = columns.getJSONObject(j).getString("name");
|
|
|
|
+ String value = row.getString(j);
|
|
|
|
+ jsonObject.put(name, value);
|
|
|
|
+ }
|
|
|
|
+ result.add(jsonObject);
|
|
|
|
+ }
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
+ throw BusinessExceptionBuilder.build(RespCommonCode.ANALYSIS_DATA_ERROR, e.getMessage());
|
|
|
|
+ }
|
|
|
|
+ return result;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ public List<JSONObject> executeRelationalDb(DataSourceDto dto) {
|
|
|
|
+ analysisRelationalDbConfig(dto);
|
|
|
|
+ Connection pooledConnection = null;
|
|
|
|
+ try {
|
|
|
|
+ pooledConnection = JdbcUtil.getPooledConnection(dto);
|
|
|
|
+
|
|
|
|
+ PreparedStatement statement = pooledConnection.prepareStatement(dto.getDynSentence());
|
|
|
|
+ ResultSet rs = statement.executeQuery();
|
|
|
|
+
|
|
|
|
+ int columnCount = rs.getMetaData().getColumnCount();
|
|
|
|
+
|
|
|
|
+ List<String> columns = new ArrayList<>();
|
|
|
|
+ for (int i = 1; i <= columnCount; i++) {
|
|
|
|
+ String columnName = rs.getMetaData().getColumnLabel(i);
|
|
|
|
+ columns.add(columnName);
|
|
|
|
+ }
|
|
|
|
+ List<JSONObject> list = new ArrayList<>();
|
|
|
|
+ while (rs.next()) {
|
|
|
|
+ JSONObject jo = new JSONObject();
|
|
|
|
+ columns.forEach(t -> {
|
|
|
|
+ try {
|
|
|
|
+ Object value = rs.getObject(t);
|
|
|
|
+ jo.put(t, value);
|
|
|
|
+ } catch (SQLException throwable) {
|
|
|
|
+ throw BusinessExceptionBuilder.build(RespCommonCode.EXECUTE_SQL_ERROR, throwable.getMessage());
|
|
|
|
+ }
|
|
|
|
+ });
|
|
|
|
+ list.add(jo);
|
|
|
|
+ }
|
|
|
|
+ return list;
|
|
|
|
+ } catch (Exception throwable) {
|
|
|
|
+ throw BusinessExceptionBuilder.build(RespCommonCode.EXECUTE_SQL_ERROR, throwable.getMessage());
|
|
|
|
+ } finally {
|
|
|
|
+ try {
|
|
|
|
+ pooledConnection.close();
|
|
|
|
+ } catch (SQLException throwable) {
|
|
|
|
+ throw BusinessExceptionBuilder.build(RespCommonCode.DATA_SOURCE_CONNECTION_FAILED, throwable.getMessage());
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * http 执行获取数据
|
|
|
|
+ *
|
|
|
|
+ * @param dto
|
|
|
|
+ */
|
|
|
|
+ public List<JSONObject> executeHttp(DataSourceDto dto) {
|
|
|
|
+ analysisHttpConfig(dto);
|
|
|
|
+ HttpHeaders headers = new HttpHeaders();
|
|
|
|
+ headers.setAll(JSONObject.parseObject(dto.getHeader(), Map.class));
|
|
|
|
+ HttpEntity<String> entity = new HttpEntity<>(dto.getDynSentence(), headers);
|
|
|
|
+ ResponseEntity<JSONObject> exchange;
|
|
|
|
+ try {
|
|
|
|
+ exchange = restTemplate.exchange(dto.getApiUrl(), HttpMethod.valueOf(dto.getMethod()), entity, JSONObject.class);
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
+ throw BusinessExceptionBuilder.build(RespCommonCode.DATA_SOURCE_CONNECTION_FAILED, e.getMessage());
|
|
|
|
+ }
|
|
|
|
+ if (exchange.getStatusCode().isError()) {
|
|
|
|
+ throw BusinessExceptionBuilder.build(RespCommonCode.DATA_SOURCE_CONNECTION_FAILED, exchange.getBody());
|
|
|
|
+ }
|
|
|
|
+ JSONObject body = exchange.getBody();
|
|
|
|
+ List<JSONObject> result = new ArrayList<>();
|
|
|
|
+ result.add(body);
|
|
|
|
+ return result;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * 关系型数据库 测试连接
|
|
|
|
+ *
|
|
|
|
+ * @param dto
|
|
|
|
+ */
|
|
|
|
+ public void testRelationalDb(DataSourceDto dto) {
|
|
|
|
+ analysisRelationalDbConfig(dto);
|
|
|
|
+ try {
|
|
|
|
+ Connection unPooledConnection = JdbcUtil.getUnPooledConnection(dto);
|
|
|
|
+ String catalog = unPooledConnection.getCatalog();
|
|
|
|
+ log.info("数据库测试连接成功:{}", catalog);
|
|
|
|
+ unPooledConnection.close();
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
+ throw BusinessExceptionBuilder.build(RespCommonCode.DATA_SOURCE_CONNECTION_FAILED, e.getMessage());
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * http 测试连接
|
|
|
|
+ *
|
|
|
|
+ * @param dto
|
|
|
|
+ */
|
|
|
|
+ public void testHttp(DataSourceDto dto) {
|
|
|
|
+ analysisHttpConfig(dto);
|
|
|
|
+ String apiUrl = dto.getApiUrl();
|
|
|
|
+ String method = dto.getMethod();
|
|
|
|
+ String body = dto.getBody();
|
|
|
|
+ HttpHeaders headers = new HttpHeaders();
|
|
|
|
+ headers.setAll(JSONObject.parseObject(dto.getHeader(), Map.class));
|
|
|
|
+ HttpEntity<String> entity = new HttpEntity<>(body, headers);
|
|
|
|
+ ResponseEntity<Object> exchange;
|
|
|
|
+ try {
|
|
|
|
+ exchange = restTemplate.exchange(apiUrl, HttpMethod.valueOf(method), entity, Object.class);
|
|
|
|
+ if (exchange.getStatusCode().isError()) {
|
|
|
|
+ throw BusinessExceptionBuilder.build(RespCommonCode.DATA_SOURCE_CONNECTION_FAILED, exchange.getBody());
|
|
|
|
+ }
|
|
|
|
+ } catch (RestClientException e) {
|
|
|
|
+ throw BusinessExceptionBuilder.build(RespCommonCode.DATA_SOURCE_CONNECTION_FAILED, e.getMessage());
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * 关系型数据库 测试连接
|
|
|
|
+ *
|
|
|
|
+ * @param dto
|
|
|
|
+ */
|
|
|
|
+ public void testElasticsearchSqlConnection(DataSourceDto dto) {
|
|
|
|
+ analysisHttpConfig(dto);
|
|
|
|
+ String apiUrl = dto.getApiUrl();
|
|
|
|
+ String method = dto.getMethod();
|
|
|
|
+ String body = dto.getBody();
|
|
|
|
+ HttpHeaders headers = new HttpHeaders();
|
|
|
|
+ headers.setAll(JSONObject.parseObject(dto.getHeader(), Map.class));
|
|
|
|
+ HttpEntity<String> entity = new HttpEntity<>(body, headers);
|
|
|
|
+ ResponseEntity<Object> exchange;
|
|
|
|
+ try {
|
|
|
|
+ exchange = restTemplate.exchange(apiUrl, HttpMethod.valueOf(method), entity, Object.class);
|
|
|
|
+ if (exchange.getStatusCode().isError()) {
|
|
|
|
+ throw BusinessExceptionBuilder.build(RespCommonCode.DATA_SOURCE_CONNECTION_FAILED, exchange.getBody());
|
|
|
|
+ }
|
|
|
|
+ } catch (RestClientException e) {
|
|
|
|
+ throw BusinessExceptionBuilder.build(RespCommonCode.DATA_SOURCE_CONNECTION_FAILED, e.getMessage());
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+ public void analysisRelationalDbConfig(DataSourceDto dto) {
|
|
|
|
+ JSONObject json = JSONObject.parseObject(dto.getSourceConfig());
|
|
|
|
+ GaeaAssert.isFalse(json.containsKey("jdbcUrl"), ResponseCode.PARAM_IS_NULL,"jdbcUrl not empty");
|
|
|
|
+ GaeaAssert.isFalse(json.containsKey("driverName"), ResponseCode.PARAM_IS_NULL,"driverName not empty");
|
|
|
|
+ String jdbcUrl = json.getString("jdbcUrl");
|
|
|
|
+ String username = json.getString("username");
|
|
|
|
+ String password = json.getString("password");
|
|
|
|
+ String driverName = json.getString("driverName");
|
|
|
|
+ dto.setJdbcUrl(jdbcUrl);
|
|
|
|
+ dto.setDriverName(driverName);
|
|
|
|
+ dto.setUsername(username);
|
|
|
|
+ dto.setPassword(password);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * es通过api获取数据
|
|
|
|
+ *
|
|
|
|
+ * @param dto
|
|
|
|
+ * @return
|
|
|
|
+ */
|
|
|
|
+ public void analysisHttpConfig(DataSourceDto dto) {
|
|
|
|
+ JSONObject json = JSONObject.parseObject(dto.getSourceConfig());
|
|
|
|
+ GaeaAssert.isFalse(json.containsKey("apiUrl"), ResponseCode.PARAM_IS_NULL,"apiUrl not empty");
|
|
|
|
+ GaeaAssert.isFalse(json.containsKey("method"), ResponseCode.PARAM_IS_NULL,"method not empty");
|
|
|
|
+ GaeaAssert.isFalse(json.containsKey("header"), ResponseCode.PARAM_IS_NULL,"header not empty");
|
|
|
|
+ GaeaAssert.isFalse(json.containsKey("body"), ResponseCode.PARAM_IS_NULL,"body not empty");
|
|
|
|
+ String apiUrl = json.getString("apiUrl");
|
|
|
|
+ String method = json.getString("method");
|
|
|
|
+ String header = json.getString("header");
|
|
|
|
+ String body = json.getString("body");
|
|
|
|
+ //解决url中存在的动态参数
|
|
|
|
+ apiUrl = dataSetParamService.transform(dto.getContextData(), apiUrl);
|
|
|
|
+ //请求头中动态参数
|
|
|
|
+ header = dataSetParamService.transform(dto.getContextData(), header);
|
|
|
|
+ dto.setApiUrl(apiUrl);
|
|
|
|
+ dto.setMethod(method);
|
|
|
|
+ dto.setHeader(header);
|
|
|
|
+ dto.setBody(body);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+}
|