
企业微信

飞书
选择您喜欢的方式加入群聊

扫码添加咨询专家
在多租户数据分析系统中,SQL 权限控制是保障数据安全的核心机制。传统的权限控制往往依赖数据库层面的视图或行级安全策略,但这种方式存在灵活性不足、难以动态调整等问题。AskTable 采用了一种创新的方法:基于 AST(抽象语法树)的 SQL 权限控制系统,通过 SQLGlot 库的 Scope 分析能力,在 SQL 执行前动态注入权限条件。
SQL Guard 的设计遵循"Clean Architecture"原则,将权限控制逻辑与业务逻辑完全分离。其核心思想是:
* 通配符匹配任意 schema 或 table权限规则采用类 SQL 的表达式语法,支持 Jinja2 变量模板:
# 示例规则
rules = [
"users.orders.user_id = {{ current_user_id }}", # 用户只能查看自己的订单
"*.*.region IN ('CN', 'US')", # 限制地区范围
"sales.*.created_at >= '2024-01-01'", # 时间范围限制
]
规则解析器首先将规则字符串转换为 AST,并识别其中的 schema、table、field 信息:
def parse_rule(rule: str, variables: dict, sql: str, dialect: str) -> list[Rule]:
# 1. 渲染 Jinja2 变量
template = Template(rule)
rule_str = template.render(**variables)
# 2. 替换通配符 * 为 ALL
rule_str = rule_str.replace("*", "ALL")
# 3. 解析规则和 SQL 为 AST
rule_ast = sqlglot.parse_one(rule_str, read=dialect)
sql_ast = sqlglot.parse_one(sql, read=dialect)
# 4. 提取规则中的列信息
rule_column = extract_column_from_rule(rule_ast)
# 5. 遍历 SQL 中的所有表,匹配规则
rules = []
for table in sql_ast.find_all(exp.Table):
if matches(rule_column, table):
qualified_ast = qualify_rule_ast(rule_ast, table)
rules.append(Rule(
schema=table.db,
table=table.name,
field=rule_column.name,
ast=qualified_ast
))
return rules
SQLGlot 的 Scope 提供了强大的作用域分析能力,可以准确识别表的别名、子查询、CTE 等复杂结构:
def _process_scope(self, scope: Scope, rules: list[Rule]):
# 1. 为当前 scope 构建权限条件
conditions = self._build_conditions_for_scope(scope, rules)
if conditions:
self._inject_conditions(scope, conditions)
# 2. 递归处理所有子 scope
for subscope in self._get_all_subscopes(scope):
self._process_scope(subscope, rules)
def _get_all_subscopes(self, scope: Scope) -> list[Scope]:
return (
scope.derived_table_scopes + # FROM 子查询
scope.cte_scopes + # WITH CTE
scope.subquery_scopes + # WHERE/SELECT 子查询
scope.union_scopes + # UNION
scope.udtf_scopes # 表函数
)
条件注入需要处理表别名、引号、以及与现有 WHERE 子句的合并:
def _inject_conditions(self, scope: Scope, conditions: list[exp.Expression]):
# 1. 找到 SELECT 语句
select = scope.expression if isinstance(scope.expression, exp.Select) \
else scope.expression.find(exp.Select)
# 2. 合并所有条件(去重)
combined = self._combine_conditions(conditions)
# 3. 注入到 WHERE 子句
where = select.find(exp.Where)
if where:
# 与现有条件用 AND 连接
where.set("this", exp.And(this=where.this, expression=combined))
else:
# 创建新的 WHERE 子句
select.set("where", exp.Where(this=combined))
# 原始 SQL
sql = """
SELECT o.id, o.amount, c.name
FROM orders o
JOIN customers c ON o.customer_id = c.id
WHERE o.status = 'completed'
"""
# 权限规则
rules = ["*.orders.tenant_id = '{{ tenant_id }}'"]
variables = {"tenant_id": "tenant_123"}
# 应用权限后的 SQL
protected_sql = guard(sql, "postgres", rules, variables)
生成的 SQL:
SELECT o.id, o.amount, c.name
FROM orders o
JOIN customers c ON o.customer_id = c.id
WHERE o.status = 'completed'
AND o.tenant_id = 'tenant_123' -- 自动注入
sql = """
SELECT *
FROM (
SELECT user_id, SUM(amount) as total
FROM orders
GROUP BY user_id
) subq
WHERE total > 1000
"""
rules = ["*.orders.region = 'CN'"]
protected_sql = guard(sql, "mysql", rules)
生成的 SQL:
SELECT *
FROM (
SELECT user_id, SUM(amount) as total
FROM orders
WHERE region = 'CN' -- 注入到子查询内部
GROUP BY user_id
) subq
WHERE total > 1000
sql = """
WITH recent_orders AS (
SELECT * FROM orders WHERE created_at > '2024-01-01'
)
SELECT * FROM recent_orders
UNION ALL
SELECT * FROM orders WHERE status = 'pending'
"""
rules = ["*.orders.user_id = {{ user_id }}"]
variables = {"user_id": "123"}
protected_sql = guard(sql, "postgres", rules, variables)
生成的 SQL:
WITH recent_orders AS (
SELECT * FROM orders
WHERE created_at > '2024-01-01'
AND user_id = '123' -- CTE 内部注入
)
SELECT * FROM recent_orders
UNION ALL
SELECT * FROM orders
WHERE status = 'pending'
AND user_id = '123' -- UNION 分支注入
* 可匹配任意 schema 或 tableSQL Guard 的性能开销主要来自 AST 解析和 Scope 构建:
基于 AST 的 SQL 权限控制系统是 AskTable 安全架构的核心组件。通过 SQLGlot 的强大能力,我们实现了:
这种设计不仅保障了数据安全,还为多租户、行级安全等复杂场景提供了优雅的解决方案。