
企业微信

飞书
选择您喜欢的方式加入群聊

扫码添加咨询专家
在企业数据分析场景中,数据往往分散在不同类型的数据库中:MySQL 存储业务数据,ClickHouse 做实时分析,Oracle 管理财务数据,PostgreSQL 处理用户数据。如何让 AI 无缝访问这些异构数据源?AskTable 通过插件化的数据库适配器架构,实现了对 40+ 种数据库的统一支持。
典型企业的数据库分布:
挑战:
AskTable 通过抽象基类定义统一接口,每种数据库实现自己的适配器:
class DataSourceAdmin:
    """Admin-side view of a datasource: identity, engine, and access config.

    Exposes a lazily-created engine-specific accessor plus dialect and
    metadata helpers. (Field annotations suggest a pydantic-style model —
    TODO confirm against the real project.)
    """

    # Identity / configuration fields.
    project_id: str
    id: str
    name: str
    engine: DBEngine  # database type (MySQL, ClickHouse, Oracle, ...)
    access_config: AccessConfigPrivate | None  # connection credentials, if any
    file_paths: dict[str, str] | None  # for file-based sources such as Excel

    @property
    def accessor(self):
        """Return the engine-specific data accessor, creating it on first use."""
        if self._accessor:
            return self._accessor
        # Cannot build an accessor without connection details.
        if not self.access_config:
            raise errors.DataSourceConfigError
        # Factory dispatch: BaseAccessor.create picks the adapter class
        # matching self.engine; the result is memoized on the instance.
        self._accessor = BaseAccessor.create(
            engine=self.engine,
            access_config=self.access_config.model_dump(),
            file_paths=self.file_paths,
        )
        return self._accessor

    @property
    def dialect(self) -> str:
        """SQL dialect name for this datasource.

        DAP sources delegate to the accessor (the underlying engine varies
        at runtime); every other engine maps statically via
        sql_dialect_adaptor().
        """
        if self.engine == DBEngine.dap:
            return self.accessor.dialect
        return sql_dialect_adaptor(self.engine)

    @property
    def runtime_meta(self) -> MetaBase:
        """Fetch the datasource's metadata live from the database."""
        log.info(f"get runtime meta for {self.id}")
        meta = self.accessor.get_meta()
        return meta

    @property
    def brain_meta(self) -> MetaAdmin:
        """Metadata preloaded into the Brain (by get_admin(with_meta=True))."""
        if not self._meta:
            raise RuntimeError("brain_meta not loaded, use get_admin(with_meta=True)")
        return self._meta
设计亮点:
# Every database is accessed through the same uniform interface.
accessor = datasource.accessor
# Open the connection (context manager releases it on exit).
with accessor.connect():
    # Run a query; results come back as a DataFrame.
    df = accessor.query("SELECT * FROM users LIMIT 10")
    # Introspect schema metadata through the same accessor.
    meta = accessor.get_meta()
优势:
# Excerpt (repeated from DataSourceAdmin.accessor earlier in the article):
# the accessor is built lazily via the BaseAccessor factory and memoized
# on the instance.
self._accessor = BaseAccessor.create(
    engine=self.engine,
    access_config=self.access_config.model_dump(),
    file_paths=self.file_paths,
)
工厂模式:根据 engine 类型动态创建对应的适配器
# Pseudocode sketch of the adapter factory.
class BaseAccessor:
    @classmethod
    def create(cls, engine: DBEngine, access_config: dict, file_paths: dict):
        """Instantiate the accessor subclass registered for *engine*.

        Unknown engines raise UnsupportedDatabaseError.
        """
        registry = {
            DBEngine.mysql: MySQLAccessor,
            DBEngine.postgresql: PostgreSQLAccessor,
            DBEngine.clickhouse: ClickHouseAccessor,
            DBEngine.oracle: OracleAccessor,
            # ... 40+ engines
        }
        accessor_cls = registry.get(engine)
        if accessor_cls is None:
            raise UnsupportedDatabaseError(f"Unsupported engine: {engine}")
        return accessor_cls(access_config)
@property
def dialect(self) -> str:
    # Excerpt (repeated from DataSourceAdmin): DAP datasources delegate
    # dialect detection to their accessor; all other engines map
    # statically through sql_dialect_adaptor().
    if self.engine == DBEngine.dap:
        return self.accessor.dialect
    return sql_dialect_adaptor(self.engine)
方言映射:
def sql_dialect_adaptor(engine: DBEngine) -> str:
    """Translate a database engine into its SQL dialect name.

    Engines that speak the MySQL wire protocol (Doris, StarRocks, ...)
    share the "mysql" dialect; engines without a registered dialect fall
    back to "generic".
    """
    dialect_by_engine = {
        DBEngine.mysql: "mysql",
        DBEngine.postgresql: "postgres",
        DBEngine.clickhouse: "clickhouse",
        DBEngine.oracle: "oracle",
        DBEngine.sqlserver: "tsql",
        DBEngine.hive: "hive",
        DBEngine.doris: "mysql",       # MySQL-protocol compatible
        DBEngine.starrocks: "mysql",   # MySQL-protocol compatible
        # ...
    }
    if engine in dialect_by_engine:
        return dialect_by_engine[engine]
    return "generic"
用途:
从代码中可以看到 AskTable 支持的数据库类型:
# 关系型数据库
- MySQL / MariaDB
- PostgreSQL
- Oracle
- SQL Server
- SQLite
# 云数据库
- 阿里云 ADB MySQL / ADB PostgreSQL
- 阿里云 PolarDB MySQL / PolarDB PostgreSQL
- 阿里云 Hologres
- 腾讯云 TDSQL MySQL / TDSQL PostgreSQL
- 华为云 GaussDB / GaussDB DWS
# 分析型数据库
- ClickHouse
- Doris
- StarRocks
- SelectDB
- Databend
# 大数据平台
- Hive
- MaxCompute
# 国产数据库
- 达梦 (DaMeng)
- 人大金仓 (KingBase ES)
- 南大通用 (GBase 8a / 8c)
- 瀚高 (HighGo)
- 虚谷 (XuGu)
- 优炫 (UXDB)
- 神舟通用 (Oscar)
- 云和恩墨 (MogDB)
- 海量 (Vastbase)
- 崖山 (YashanDB)
- Greenplum
- 巨杉 (SequoiaDB)
- 柏睿 (RapidsDB)
- 蚂蚁 (OceanBase)
# 文档数据
- Excel
- 飞书多维表格 (Bitable)
# 数据湖
- DAP (Data Access Platform)
总计 40+ 种数据库!
以 MySQL 为例,看看适配器的实现:
class MySQLAccessor(BaseAccessor):
    """Accessor for MySQL databases, built on PyMySQL.

    Illustrative implementation from the article; error handling and the
    final metadata assembly are abbreviated.
    """

    def __init__(self, access_config: dict):
        # Connection parameters; keys are read without defaults, so a
        # missing key raises KeyError — presumably validated upstream.
        self.host = access_config["host"]
        self.port = access_config["port"]
        self.database = access_config["database"]
        self.username = access_config["username"]
        self.password = access_config["password"]
        self.connection = None

    def connect(self):
        """Open the PyMySQL connection; returns self for `with`-style use."""
        import pymysql
        self.connection = pymysql.connect(
            host=self.host,
            port=self.port,
            database=self.database,
            user=self.username,
            password=self.password,
            charset='utf8mb4',
            # DictCursor makes fetchall() return dicts, which feed
            # straight into pd.DataFrame with column names preserved.
            cursorclass=pymysql.cursors.DictCursor
        )
        return self

    def query(self, sql: str) -> pd.DataFrame:
        """Execute *sql* and return the full result set as a DataFrame.

        Raises ConnectionError if connect() has not been called first.
        """
        if not self.connection:
            raise ConnectionError("Not connected to database")
        with self.connection.cursor() as cursor:
            cursor.execute(sql)
            result = cursor.fetchall()
            return pd.DataFrame(result)

    def get_meta(self) -> MetaBase:
        """Collect table and column metadata from information_schema."""
        schemas = []
        # All tables in the configured database.
        tables_sql = """
        SELECT
            TABLE_SCHEMA,
            TABLE_NAME,
            TABLE_COMMENT
        FROM information_schema.TABLES
        WHERE TABLE_SCHEMA = %s
        """
        # NOTE(review): query_raw is not shown in this snippet — presumably
        # a parameterized-query helper on BaseAccessor; confirm.
        tables = self.query_raw(tables_sql, (self.database,))
        for table in tables:
            # Column details for each table, in declared order.
            fields_sql = """
            SELECT
                COLUMN_NAME,
                DATA_TYPE,
                COLUMN_COMMENT,
                IS_NULLABLE,
                COLUMN_KEY
            FROM information_schema.COLUMNS
            WHERE TABLE_SCHEMA = %s AND TABLE_NAME = %s
            ORDER BY ORDINAL_POSITION
            """
            fields = self.query_raw(fields_sql, (self.database, table['TABLE_NAME']))
            # Assembly of the schema structure is elided in the original.
            # ...
        return MetaBase(schemas=schemas)

    def close(self):
        """Close the connection if open; safe to call repeatedly."""
        if self.connection:
            self.connection.close()
            self.connection = None
关键点:
ClickHouse 是列式数据库,有特殊的查询优化:
class ClickHouseAccessor(BaseAccessor):
    """Accessor for ClickHouse, using its HTTP interface."""

    def query(self, sql: str) -> pd.DataFrame:
        """Run *sql* over HTTP and return the rows as a DataFrame.

        ClickHouse requires an output FORMAT; JSONEachRow emits one JSON
        object per line, which parses trivially into a DataFrame.
        """
        if "FORMAT" not in sql.upper():
            sql = f"{sql} FORMAT JSONEachRow"
        response = requests.post(
            f"http://{self.host}:{self.port}",
            params={"database": self.database},
            data=sql,
            auth=(self.username, self.password)
        )
        # Bug fix: a failed query returns a plain-text error body with a
        # non-2xx status; parsing it as JSON produced a confusing crash.
        # Fail fast with the HTTP error instead.
        response.raise_for_status()
        text = response.text.strip()
        # Bug fix: an empty result made split() yield [''] and
        # json.loads('') raised — return an empty frame instead.
        if not text:
            return pd.DataFrame()
        data = [json.loads(line) for line in text.split('\n')]
        return pd.DataFrame(data)

    def get_meta(self) -> MetaBase:
        """Read table metadata from system.tables / system.columns.

        (Implementation elided in the original article.)
        """
        tables_sql = """
        SELECT
            database,
            name as table_name,
            comment
        FROM system.tables
        WHERE database = '{database}'
        """
        # ...
特点:
Oracle 有独特的分页和元数据查询方式:
class OracleAccessor(BaseAccessor):
    """Accessor for Oracle; shows Oracle-specific pagination and metadata."""

    def query(self, sql: str, limit: int = None) -> pd.DataFrame:
        """Execute *sql*, optionally capped to *limit* rows.

        Oracle (pre-12c) has no LIMIT clause, so the query is wrapped in a
        ROWNUM filter. On 12c+ FETCH FIRST could be used instead.
        """
        if limit:
            # Oracle 11g: wrap the query and filter by ROWNUM.
            sql = f"""
            SELECT * FROM (
                {sql}
            ) WHERE ROWNUM <= {limit}
            """
            # Oracle 12c+ alternative:
            # sql = f"{sql} FETCH FIRST {limit} ROWS ONLY"
        return super().query(sql)

    def get_meta(self) -> MetaBase:
        """Read table metadata from the ALL_* data-dictionary views.

        (Implementation elided in the original article.)
        """
        tables_sql = """
        SELECT
            OWNER,
            TABLE_NAME,
            COMMENTS
        FROM ALL_TAB_COMMENTS
        WHERE OWNER = :owner
        """
        # ...
特点:
Excel 不是数据库,但 AskTable 也支持:
class ExcelAccessor(BaseAccessor):
    """Accessor that exposes an Excel workbook as SQL-queryable tables.

    Each worksheet becomes one in-memory table named after the sheet;
    queries run through DuckDB against the materialized DataFrames.
    """

    def __init__(self, access_config: dict, file_paths: dict):
        # access_config is unused for file sources; the workbook location
        # comes from file_paths ("excel_file" may be absent -> None).
        self.file_path = file_paths.get("excel_file")
        self.sheets = {}

    def connect(self):
        """Load the workbook and materialize every sheet as a DataFrame."""
        import openpyxl
        self.workbook = openpyxl.load_workbook(self.file_path)
        for sheet_name in self.workbook.sheetnames:
            sheet = self.workbook[sheet_name]
            data = sheet.values
            # First row is the header. Bug fix: next(data) raised
            # StopIteration on a completely empty sheet — skip those.
            cols = next(data, None)
            if cols is None:
                continue
            self.sheets[sheet_name] = pd.DataFrame(data, columns=cols)
        return self

    def query(self, sql: str) -> pd.DataFrame:
        """Run *sql* against the loaded sheets via an in-memory DuckDB."""
        import duckdb
        conn = duckdb.connect(':memory:')
        try:
            # Register every sheet as a queryable table.
            for sheet_name, df in self.sheets.items():
                conn.register(sheet_name, df)
            return conn.execute(sql).fetchdf()
        finally:
            # Bug fix: close() was skipped when execute() raised, leaking
            # the connection on every failed query.
            conn.close()

    def get_meta(self) -> MetaBase:
        """Infer column metadata from each sheet's DataFrame dtypes."""
        schemas = []
        for sheet_name, df in self.sheets.items():
            fields = []
            for col in df.columns:
                fields.append({
                    "name": col,
                    "data_type": str(df[col].dtype),
                    "description": ""
                })
            # Assembly of the per-sheet schema objects is elided in the
            # original article.
            # ...
        return MetaBase(schemas=schemas)
特点:
需求:用户想知道"MySQL 中的订单数据和 ClickHouse 中的用户行为数据的关联分析"
实现:
# 1. Pull order data from MySQL.
mysql_ds = get_datasource("mysql_orders")
orders_df = mysql_ds.accessor.query("""
    SELECT user_id, order_amount, order_date
    FROM orders
    WHERE order_date >= '2024-01-01'
""")
# 2. Pull user-behavior data from ClickHouse.
clickhouse_ds = get_datasource("clickhouse_events")
events_df = clickhouse_ds.accessor.query("""
    SELECT user_id, event_type, event_count
    FROM user_events
    WHERE event_date >= '2024-01-01'
    GROUP BY user_id, event_type
""")
# 3. Join the two frames in memory with DuckDB.
import duckdb
conn = duckdb.connect(':memory:')
try:
    conn.register('orders', orders_df)
    conn.register('events', events_df)
    # Bug fix: the original filtered the LEFT-joined table in WHERE
    # (e.event_type = 'page_view'), which discards users with no matching
    # events and silently turns the LEFT JOIN into an INNER JOIN. The
    # predicate belongs in the ON clause to preserve all orders.
    result = conn.execute("""
        SELECT
            o.user_id,
            SUM(o.order_amount) as total_amount,
            e.event_count
        FROM orders o
        LEFT JOIN events e
            ON o.user_id = e.user_id AND e.event_type = 'page_view'
        GROUP BY o.user_id, e.event_count
        ORDER BY total_amount DESC
        LIMIT 100
    """).fetchdf()
finally:
    # Bug fix: the in-memory connection was never closed.
    conn.close()
背景:企业从 Oracle 迁移到达梦数据库
AskTable 的优势:
# Only the connection config changes on migration — no code changes.
# Before: Oracle
datasource_config = {
    "engine": "oracle",
    "host": "oracle.example.com",
    "port": 1521,
    "database": "ORCL",
    # ...
}
# After migration: DaMeng
datasource_config = {
    "engine": "dameng",  # the only line that must change (plus endpoint details)
    "host": "dameng.example.com",
    "port": 5236,
    "database": "DMDB",
    # ...
}
无缝切换:
背景:
AskTable 统一接入:
# Register multiple heterogeneous datasources in one place.
datasources = [
    {
        "name": "本地业务库",
        "engine": "mysql",
        "host": "192.168.1.100",
        # ...
    },
    {
        "name": "阿里云分析库",
        "engine": "clickhouse",
        "host": "xxx.clickhouse.aliyuncs.com",
        # ...
    },
    {
        "name": "腾讯云日志",
        "engine": "cls",
        "endpoint": "xxx.cls.tencentyun.com",
        # ...
    }
]
# User asks: "compare local orders with the cloud analytics data".
# AskTable automatically:
# 1. identifies which datasources are needed,
# 2. generates SQL in each source's dialect,
# 3. queries them in parallel,
# 4. merges the results.
class ConnectionPool:
    """Fixed-size pool of pre-connected accessors.

    All connections are opened eagerly at construction time, so the first
    query pays no connect latency and concurrent users can never exceed
    max_connections.
    """

    def __init__(self, accessor_factory, max_connections=10):
        # accessor_factory: zero-arg callable returning an object with a
        # .connect() method.
        self.accessor_factory = accessor_factory
        self.max_connections = max_connections
        self.pool = queue.Queue(maxsize=max_connections)
        self._init_pool()

    def _init_pool(self):
        # Eagerly create and connect every pooled accessor.
        for _ in range(self.max_connections):
            accessor = self.accessor_factory()
            accessor.connect()
            self.pool.put(accessor)

    def get_connection(self, timeout=30):
        """Borrow a connection, blocking up to *timeout* seconds.

        Raises queue.Empty when no connection frees up in time.
        Generalized: the timeout was previously hard-coded to 30 s; it is
        now a parameter with the same default, so callers are unaffected.
        """
        return self.pool.get(timeout=timeout)

    def return_connection(self, accessor):
        """Give a borrowed connection back to the pool.

        Uses put_nowait: returns of the pool's own connections can never
        overfill the queue, so a Full error exposes a caller bug instead
        of blocking forever.
        """
        self.pool.put_nowait(accessor)
优势:
class DataSourceAdmin:
    """Datasource admin with a process-wide, TTL-bounded metadata cache."""

    # Class-level cache shared by all instances:
    # cache_key -> (stored_at_unix_seconds, MetaAdmin).
    _meta_cache: dict = {}
    _cache_ttl: int = 3600  # seconds; entries older than this are reloaded

    @property
    def brain_meta(self) -> "MetaAdmin":
        """Return this datasource's metadata, using the shared cache.

        The cache key embeds modified_at, so any edit to the datasource
        invalidates its old entry; the TTL bounds staleness for entries
        whose key has not changed.
        """
        import time  # local import keeps this snippet self-contained

        cache_key = f"{self.id}:{self.modified_at}"
        cached = self._meta_cache.get(cache_key)
        if cached is not None:
            stored_at, meta = cached
            # Bug fix: _cache_ttl was declared but never checked, so cached
            # metadata was served stale forever. Enforce the TTL here.
            if time.time() - stored_at < self._cache_ttl:
                return meta
        # Cache miss or expired entry: reload from the database.
        meta = self._load_meta_from_db()
        self._meta_cache[cache_key] = (time.time(), meta)
        return meta
优势:
def optimize_query(sql: str, dialect: str) -> str:
    """Apply dialect-specific rewrites to *sql*.

    All optimization passes are currently stubs, so the SQL is returned
    unchanged. Bug fix: the original returned the undefined name
    `optimized_sql`, raising NameError on every call; the (so far
    unmodified) SQL is now tracked in a local and returned.
    """
    optimized_sql = sql
    if dialect == "clickhouse":
        # ClickHouse ideas: PREWHERE clause, FINAL for deduplication,
        # JOIN reordering.
        pass
    elif dialect == "mysql":
        # MySQL ideas: index hints, subquery flattening, STRAIGHT_JOIN.
        pass
    # ...
    return optimized_sql
安全性:
# ✅ Recommended: resolve credentials from a secrets manager at runtime.
access_config = {
    "host": "db.example.com",
    "port": 3306,
    "database": "mydb",
    "username": "user",
    "password": get_secret("db_password")  # fetched from the secrets service
}
# ❌ Not recommended: plaintext passwords in configuration.
access_config = {
    "password": "plain_text_password"
}
# Error handling: catch narrow, engine-agnostic exception types.
try:
    result = datasource.accessor.query(sql)
except ConnectionError:
    # Connection failed — retry with exponential backoff.
    retry_with_backoff()
except QueryTimeoutError:
    # Query timed out — tune the SQL or raise the timeout budget.
    log.warning(f"Query timeout: {sql}")
except PermissionError:
    # Insufficient privileges — check the database user's grants.
    log.error(f"Permission denied for user {username}")
# Record query performance for observability.
@monitor_query_performance
def query(self, sql: str) -> pd.DataFrame:
    """Execute *sql*, logging slow queries and reporting duration metrics."""
    start_time = time.time()
    result = self._execute_query(sql)
    duration = time.time() - start_time
    # Flag anything slower than 5 s as a slow query.
    if duration > 5.0:
        log.warning(f"Slow query ({duration:.2f}s): {sql}")
    # Report the duration to the metrics backend, tagged by engine and
    # datasource id for per-source dashboards.
    metrics.record("query_duration", duration, tags={
        "database": self.engine,
        "datasource": self.id
    })
    return result
AskTable 通过插件化的数据库适配器架构,实现了对 40+ 种数据库的统一支持:
这种架构让 AskTable 能够适应各种复杂的企业数据环境,无论是传统数据库、云数据库、国产数据库,还是大数据平台,都能无缝接入,为用户提供统一的 AI 数据分析体验。