SpringBoot与Calcite整合:实现多数据源统一查询系统
为什么选择Apache Calcite?
简化开发流程
- 高层次抽象: Apache Calcite 提供高层次抽象,开发者可专注业务逻辑,无需处理底层数据库连接和查询执行细节。
- 减少重复工作: 使用 Calcite 避免重复造轮子,节省开发时间和成本。
强大的SQL解析和优化能力
- SQL标准支持: 支持多种 SQL 方言(如 MySQL、PostgreSQL 等),无缝处理不同数据库的 SQL 语句。
- 查询优化: 内置查询优化器根据数据源特性智能优化,提升查询性能。
灵活性和可扩展性
- 自定义模式和表: 通过编程动态添加和管理多数据源,支持不同模式和表结构。
- 插件机制: 支持多种插件,可灵活扩展功能,如自定义函数、聚合操作等。
高性能
- 内存计算: 支持内存数据处理,减少 I/O 开销,提升查询速度。
- 分布式计算: 可扩展到分布式环境,支持大规模数据集处理(本项目聚焦单机实现)。
集成性强
- 与其他工具集成: 支持与 Apache Flink、Presto 等大数据工具集成,构建完整数据分析解决方案。
哪些公司使用了Apache Calcite?
- Google: 用于高性能、灵活的数据处理系统。
- IBM: 增强数据仓库和分析解决方案的查询性能。
- Intel: 支持大数据分析工具,专注内存计算。
- Alibaba Cloud: 在大数据平台中提供查询优化和执行能力。
- MaxCompute (ODPS): 阿里巴巴大规模数据计算服务使用 Calcite 处理 SQL 查询。
- Elasticsearch: Kibana 的复杂查询依赖 Calcite 进行 SQL 解析和优化。
- Netflix: 构建数据虚拟化层,支持复杂查询和分析。
- Microsoft: 用于 Azure Synapse Analytics 等大数据产品。
- Teradata: 增强数据库系统查询优化和性能。
- Uber: 处理庞大数据集,支持复杂查询和分析。
应用场景
数据虚拟化
- 虚拟数据层: 集中分散数据,提供统一视图。
- 动态数据源管理: 动态添加和管理数据源,支持灵活架构。
商业智能 (BI) 工具
- 报表生成: 支持复杂报表生成和数据分析。
- 自助服务分析: 提供非技术人员数据探索功能。
机器学习与人工智能
- 特征工程: 用于特征提取和数据准备。
- 模型训练: 结合 AI 框架处理大规模数据集。
多数据源查询
- 统一接口: 通过单一接口查询 MySQL、PostgreSQL、Oracle 等数据库。
- 联合查询: 支持跨数据源复杂 SQL 查询。
大数据平台集成
- Hadoop 生态: 与 Hive、HBase、Druid 等结合,提供统一查询接口。
- 流处理与批处理: 支持 Apache Flink、Apache Beam 实现实时分析。
嵌入式数据库
- 轻量级 SQL 引擎: 适用于嵌入式应用和内存数据库。
- 内存计算: 加速查询性能,适合快速响应场景。
数据湖解决方案
- 统一元数据管理: 提供统一元数据管理和查询接口。
- 多样化数据格式: 支持 JSON、Parquet、ORC 等格式。
代码实操
Maven 依赖
<dependencies>
<!-- Spring Boot Starter Web -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- Apache Calcite Core -->
<dependency>
<groupId>org.apache.calcite</groupId>
<artifactId>calcite-core</artifactId>
<version>1.32.0</version>
</dependency>
<!-- HikariCP Connection Pool -->
<dependency>
<groupId>com.zaxxer</groupId>
<artifactId>HikariCP</artifactId>
</dependency>
<!-- MySQL Connector Java -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<scope>runtime</scope>
</dependency>
<!-- PostgreSQL JDBC Driver -->
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<scope>runtime</scope>
</dependency>
<!-- Test Dependencies -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
配置文件 (application.yml
)
spring:
datasource:
order-db:
url: jdbc:mysql://localhost:3306/order_db?useSSL=false&serverTimezone=UTC
username: root
password: root
driver-class-name: com.mysql.cj.jdbc.Driver
user-db:
url: jdbc:postgresql://localhost:5432/user_db
username: postgres
password: postgres
driver-class-name: org.postgresql.Driver
jpa:
show-sql: true
hibernate:
ddl-auto: update
properties:
hibernate:
dialect: org.hibernate.dialect.MySQL8Dialect
数据源配置
package com.example.multids.config;
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.sql.DataSource;
@Configuration
public class DataSourceConfig {
@Bean(name = "mysqlDataSource")
public DataSource mysqlDataSource() {
HikariConfig config = new HikariConfig();
config.setJdbcUrl("jdbc:mysql://localhost:3306/order_db?useSSL=false&serverTimezone=UTC");
config.setUsername("root");
config.setPassword("root");
return new HikariDataSource(config);
}
@Bean(name = "postgresDataSource")
public DataSource postgresDataSource() {
HikariConfig config = new HikariConfig();
config.setJdbcUrl("jdbc:postgresql://localhost:5432/user_db");
config.setUsername("postgres");
config.setPassword("postgres");
return new HikariDataSource(config);
}
}
自定义数据源工厂
package com.example.multids.factory;
import com.example.multids.schema.MySchemas;
import org.apache.calcite.adapter.jdbc.JdbcSchema;
import org.apache.calcite.adapter.java.ReflectiveSchema;
import org.apache.calcite.config.Lex;
import org.apache.calcite.jdbc.CalciteConnection;
import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
public class DataSourceFactory {
public static CalciteConnection createConnection(DataSource mysqlDataSource, DataSource postgresDataSource) throws SQLException {
String modelJson = "{\n" +
" \"version\": \"1.0\",\n" +
" \"defaultSchema\": \"my_schemas\",\n" +
```markdown
" \"schemas\": [\n" +
" {\n" +
" \"name\": \"my_schemas\",\n" +
" \"type\": \"custom\",\n" +
" \"factory\": \"" + ReflectiveSchema.Factory.class.getName() + "\",\n" +
" \"operand\": {\n" +
" \"class\": \"" + MySchemas.class.getName() + "\"\n" +
" }\n" +
" }\n" +
" ]\n" +
"}";
Connection connection = DriverManager.getConnection("jdbc:calcite:model=" + modelJson);
CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class);
SchemaPlus schema = calciteConnection.getRootSchema().getSubSchema("my_schemas");
schema.add("orders", JdbcSchema.create(calciteConnection.getRootSchema(), "orders", mysqlDataSource, null, Lex.MYSQL));
schema.add("users", JdbcSchema.create(calciteConnection.getRootSchema(), "users", postgresDataSource, null, Lex.POSTGRESQL));
return calciteConnection;
}
}
自定义模式
package com.example.multids.schema;
import org.apache.calcite.schema.impl.AbstractSchema;
import java.util.Map;
public class MySchemas extends AbstractSchema {
@Override
protected Map<String, org.apache.calcite.schema.Table> getTableMap() {
return super.getTableMap();
}
}
查询控制器
package com.example.multids.controller;
import com.example.multids.factory.DataSourceFactory;
import org.apache.calcite.jdbc.CalciteConnection;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import javax.sql.DataSource;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;
@RestController
public class QueryController {
private final DataSource mysqlDataSource;
private final DataSource postgresDataSource;
@Autowired
public QueryController(@Qualifier("mysqlDataSource") DataSource mysqlDataSource,
@Qualifier("postgresDataSource") DataSource postgresDataSource) {
this.mysqlDataSource = mysqlDataSource;
this.postgresDataSource = postgresDataSource;
}
@GetMapping("/query")
public List<List<String>> query(@RequestParam String sql) throws SQLException {
CalciteConnection connection = DataSourceFactory.createConnection(mysqlDataSource, postgresDataSource);
Statement statement = connection.createStatement();
ResultSet resultSet = statement.executeQuery(sql);
List<List<String>> result = new ArrayList<>();
while (resultSet.next()) {
int columnCount = resultSet.getMetaData().getColumnCount();
List<String> row = new ArrayList<>();
for (int i = 1; i <= columnCount; i++) {
row.add(resultSet.getString(i));
}
result.add(row);
}
resultSet.close();
statement.close();
connection.close();
return result;
}
}
主应用
package com.example.multids;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class MultidsApplication {
public static void main(String[] args) {
SpringApplication.run(MultidsApplication.class, args);
}
}
测试
MySQL orders
表
CREATE TABLE orders (
id INT PRIMARY KEY,
user_id INT,
amount DECIMAL(10, 2),
order_date DATETIME
);
PostgreSQL users
表
CREATE TABLE users (
id SERIAL PRIMARY KEY,
name VARCHAR(100),
email VARCHAR(100)
);
测试联合查询
SQL 查询:
SELECT o.id AS order_id, u.name AS user_name, o.amount, o.order_date
FROM orders o
JOIN users u ON o.user_id = u.id;
测试结果
$ curl -X GET "http://localhost:8080/query?sql=SELECT%20o.id%20AS%20order_id,%20u.name%20AS%20user_name,%20o.amount,%20o.order_date%20FROM%20orders%20o%20JOIN%20users%20u%20ON%20o.user_id%20=%20u.id"
[
["1", "Alice", "199.99", "2025-04-10 21:30:00"],
["2", "Bob", "250.75", "2025-04-10 20:45:00"]
]
评论区