目 录CONTENT

文章目录

SpringBoot与Calcite整合:实现多数据源统一查询系统

在等晚風吹
2025-05-09 / 0 评论 / 0 点赞 / 6 阅读 / 0 字 / 正在检测是否收录...

SpringBoot与Calcite整合:实现多数据源统一查询系统

为什么选择Apache Calcite?

简化开发流程

  • 高层次抽象: Apache Calcite 提供高层次抽象,开发者可专注业务逻辑,无需处理底层数据库连接和查询执行细节。
  • 减少重复工作: 使用 Calcite 避免重复造轮子,节省开发时间和成本。

强大的SQL解析和优化能力

  • SQL标准支持: 支持多种 SQL 方言(如 MySQL、PostgreSQL 等),无缝处理不同数据库的 SQL 语句。
  • 查询优化: 内置查询优化器根据数据源特性智能优化,提升查询性能。

灵活性和可扩展性

  • 自定义模式和表: 通过编程动态添加和管理多数据源,支持不同模式和表结构。
  • 插件机制: 支持多种插件,可灵活扩展功能,如自定义函数、聚合操作等。

高性能

  • 内存计算: 支持内存数据处理,减少 I/O 开销,提升查询速度。
  • 分布式计算: 可扩展到分布式环境,支持大规模数据集处理(本项目聚焦单机实现)。

集成性强

  • 与其他工具集成: 支持与 Apache Flink、Presto 等大数据工具集成,构建完整数据分析解决方案。

哪些公司使用了Apache Calcite?

  • Google: 用于高性能、灵活的数据处理系统。
  • IBM: 增强数据仓库和分析解决方案的查询性能。
  • Intel: 支持大数据分析工具,专注内存计算。
  • Alibaba Cloud: 在大数据平台中提供查询优化和执行能力。
  • MaxCompute (ODPS): 阿里巴巴大规模数据计算服务使用 Calcite 处理 SQL 查询。
  • Elasticsearch: Kibana 的复杂查询依赖 Calcite 进行 SQL 解析和优化。
  • Netflix: 构建数据虚拟化层,支持复杂查询和分析。
  • Microsoft: 用于 Azure Synapse Analytics 等大数据产品。
  • Teradata: 增强数据库系统查询优化和性能。
  • Uber: 处理庞大数据集,支持复杂查询和分析。

应用场景

数据虚拟化

  • 虚拟数据层: 集中分散数据,提供统一视图。
  • 动态数据源管理: 动态添加和管理数据源,支持灵活架构。

商业智能 (BI) 工具

  • 报表生成: 支持复杂报表生成和数据分析。
  • 自助服务分析: 提供非技术人员数据探索功能。

机器学习与人工智能

  • 特征工程: 用于特征提取和数据准备。
  • 模型训练: 结合 AI 框架处理大规模数据集。

多数据源查询

  • 统一接口: 通过单一接口查询 MySQL、PostgreSQL、Oracle 等数据库。
  • 联合查询: 支持跨数据源复杂 SQL 查询。

大数据平台集成

  • Hadoop 生态: 与 Hive、HBase、Druid 等结合,提供统一查询接口。
  • 流处理与批处理: 支持 Apache Flink、Apache Beam 实现实时分析。

嵌入式数据库

  • 轻量级 SQL 引擎: 适用于嵌入式应用和内存数据库。
  • 内存计算: 加速查询性能,适合快速响应场景。

数据湖解决方案

  • 统一元数据管理: 提供统一元数据管理和查询接口。
  • 多样化数据格式: 支持 JSON、Parquet、ORC 等格式。

代码实操

Maven 依赖

<dependencies>
    <!-- Spring Boot Starter Web -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <!-- Apache Calcite Core -->
    <dependency>
        <groupId>org.apache.calcite</groupId>
        <artifactId>calcite-core</artifactId>
        <version>1.32.0</version>
    </dependency>
    <!-- HikariCP Connection Pool -->
    <dependency>
        <groupId>com.zaxxer</groupId>
        <artifactId>HikariCP</artifactId>
    </dependency>
    <!-- MySQL Connector Java -->
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <scope>runtime</scope>
    </dependency>
    <!-- PostgreSQL JDBC Driver -->
    <dependency>
        <groupId>org.postgresql</groupId>
        <artifactId>postgresql</artifactId>
        <scope>runtime</scope>
    </dependency>
    <!-- Test Dependencies -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

配置文件 (application.yml)

spring:
  datasource:
    order-db:
      url: jdbc:mysql://localhost:3306/order_db?useSSL=false&serverTimezone=UTC
      username: root
      password: root
      driver-class-name: com.mysql.cj.jdbc.Driver
    user-db:
      url: jdbc:postgresql://localhost:5432/user_db
      username: postgres
      password: postgres
      driver-class-name: org.postgresql.Driver
jpa:
  show-sql: true
  hibernate:
    ddl-auto: update
  properties:
    hibernate:
      dialect: org.hibernate.dialect.MySQL8Dialect

数据源配置

package com.example.multids.config;

import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.sql.DataSource;

@Configuration
public class DataSourceConfig {

    @Bean(name = "mysqlDataSource")
    public DataSource mysqlDataSource() {
        HikariConfig config = new HikariConfig();
        config.setJdbcUrl("jdbc:mysql://localhost:3306/order_db?useSSL=false&serverTimezone=UTC");
        config.setUsername("root");
        config.setPassword("root");
        return new HikariDataSource(config);
    }

    @Bean(name = "postgresDataSource")
    public DataSource postgresDataSource() {
        HikariConfig config = new HikariConfig();
        config.setJdbcUrl("jdbc:postgresql://localhost:5432/user_db");
        config.setUsername("postgres");
        config.setPassword("postgres");
        return new HikariDataSource(config);
    }
}

自定义数据源工厂

package com.example.multids.factory;

import com.example.multids.schema.MySchemas;
import org.apache.calcite.adapter.jdbc.JdbcSchema;
import org.apache.calcite.adapter.java.ReflectiveSchema;
import org.apache.calcite.config.Lex;
import org.apache.calcite.jdbc.CalciteConnection;

import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;

public class DataSourceFactory {

    public static CalciteConnection createConnection(DataSource mysqlDataSource, DataSource postgresDataSource) throws SQLException {
        String modelJson = "{\n" +
                "  \"version\": \"1.0\",\n" +
                "  \"defaultSchema\": \"my_schemas\",\n" +
```markdown
                "  \"schemas\": [\n" +
                "    {\n" +
                "      \"name\": \"my_schemas\",\n" +
                "      \"type\": \"custom\",\n" +
                "      \"factory\": \"" + ReflectiveSchema.Factory.class.getName() + "\",\n" +
                "      \"operand\": {\n" +
                "        \"class\": \"" + MySchemas.class.getName() + "\"\n" +
                "      }\n" +
                "    }\n" +
                "  ]\n" +
                "}";
        
        Connection connection = DriverManager.getConnection("jdbc:calcite:model=" + modelJson);
        CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class);

        SchemaPlus schema = calciteConnection.getRootSchema().getSubSchema("my_schemas");
        schema.add("orders", JdbcSchema.create(calciteConnection.getRootSchema(), "orders", mysqlDataSource, null, Lex.MYSQL));
        schema.add("users", JdbcSchema.create(calciteConnection.getRootSchema(), "users", postgresDataSource, null, Lex.POSTGRESQL));

        return calciteConnection;
    }
}

自定义模式

package com.example.multids.schema;

import org.apache.calcite.schema.impl.AbstractSchema;

import java.util.Map;

public class MySchemas extends AbstractSchema {
    @Override
    protected Map<String, org.apache.calcite.schema.Table> getTableMap() {
        return super.getTableMap();
    }
}

查询控制器

package com.example.multids.controller;

import com.example.multids.factory.DataSourceFactory;
import org.apache.calcite.jdbc.CalciteConnection;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import javax.sql.DataSource;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;

@RestController
public class QueryController {

    private final DataSource mysqlDataSource;
    private final DataSource postgresDataSource;

    @Autowired
    public QueryController(@Qualifier("mysqlDataSource") DataSource mysqlDataSource,
                           @Qualifier("postgresDataSource") DataSource postgresDataSource) {
        this.mysqlDataSource = mysqlDataSource;
        this.postgresDataSource = postgresDataSource;
    }

    @GetMapping("/query")
    public List<List<String>> query(@RequestParam String sql) throws SQLException {
        CalciteConnection connection = DataSourceFactory.createConnection(mysqlDataSource, postgresDataSource);
        Statement statement = connection.createStatement();
        ResultSet resultSet = statement.executeQuery(sql);

        List<List<String>> result = new ArrayList<>();
        while (resultSet.next()) {
            int columnCount = resultSet.getMetaData().getColumnCount();
            List<String> row = new ArrayList<>();
            for (int i = 1; i <= columnCount; i++) {
                row.add(resultSet.getString(i));
            }
            result.add(row);
        }

        resultSet.close();
        statement.close();
        connection.close();

        return result;
    }
}

主应用

package com.example.multids;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class MultidsApplication {
    public static void main(String[] args) {
        SpringApplication.run(MultidsApplication.class, args);
    }
}

测试

MySQL orders

CREATE TABLE orders (
    id INT PRIMARY KEY,
    user_id INT,
    amount DECIMAL(10, 2),
    order_date DATETIME
);

PostgreSQL users

CREATE TABLE users (
    id SERIAL PRIMARY KEY,
    name VARCHAR(100),
    email VARCHAR(100)
);

测试联合查询

SQL 查询:

SELECT o.id AS order_id, u.name AS user_name, o.amount, o.order_date
FROM orders o
JOIN users u ON o.user_id = u.id;

测试结果

$ curl -X GET "http://localhost:8080/query?sql=SELECT%20o.id%20AS%20order_id,%20u.name%20AS%20user_name,%20o.amount,%20o.order_date%20FROM%20orders%20o%20JOIN%20users%20u%20ON%20o.user_id%20=%20u.id"
[
    ["1", "Alice", "199.99", "2025-04-10 21:30:00"],
    ["2", "Bob", "250.75", "2025-04-10 20:45:00"]
]
0

评论区