
企业微信

飞书
选择您喜欢的方式加入群聊

扫码添加咨询专家
在企业级数据分析场景中,数据安全是头等大事。不同角色的用户应该只能看到属于自己权限范围内的数据:销售只能看自己区域的订单,财务只能看已审核的报表,管理层可以看全局数据。
传统的权限控制方案往往需要在应用层手动拼接 WHERE 条件,不仅容易出错,还难以应对复杂的 SQL 查询(子查询、CTE、UNION 等)。AskTable 的 SQL 权限守卫通过 SQLGlot AST 改写技术,实现了透明、安全、高性能的行级权限控制。
本文将深入剖析这套权限引擎的设计与实现。
应用层拼接 WHERE 条件:
# 传统做法:手动拼接权限条件
if user.role == "sales":
sql = f"SELECT * FROM orders WHERE region = '{user.region}'"
else:
sql = "SELECT * FROM orders"
问题:
数据库视图方案:
-- 为每个角色创建视图
CREATE VIEW sales_orders AS
SELECT * FROM orders WHERE region = 'East';
问题:
user_id)✅ 透明性:权限控制对 AI 和用户透明,无需手动处理 ✅ 安全性:自动注入权限条件,无遗漏、无注入风险 ✅ 完整性:支持所有 SQL 语法(子查询、CTE、UNION、JOIN) ✅ 灵活性:支持动态变量(如
{{user_id}})和通配符规则
✅ 高性能:AST 改写开销小(< 10ms)
SQLGlot 是一个强大的 SQL 解析器和转换器,支持 20+ 种 SQL 方言。
核心优势:
为什么不用正则表达式?
SQLGlot 的 Scope 是权限注入的关键。Scope 表示 SQL 中的一个查询范围(SELECT、子查询、CTE 等),包含:
sources:当前 Scope 中的表和别名derived_table_scopes:派生表(子查询)的 Scopecte_scopes:CTE 的 Scopesubquery_scopes:WHERE/HAVING 中的子查询 Scopeunion_scopes:UNION 的 Scope通过递归遍历 Scope 树,我们可以在每个查询层级注入权限条件。
AskTable 的 SQL 权限守卫采用清晰的分层架构:
权限规则是一个简单的 SQL 条件表达式:
# 示例规则
rules = [
"orders.region = 'East'", # 固定值
"orders.user_id = {{user_id}}", # Jinja2 变量
"*.*.status IN ('approved', 'completed')", # 通配符(所有表的 status 字段)
]
规则格式:
schema.table.field <operator> <value>=, !=, >, <, >=, <=, IN, LIKE, IS, NOT IN, NOT LIKE, IS NOT* 表示匹配所有 schema 或 tabledef parse_rule(
rule: str, variables: dict[str, str], sql: str, dialect: str
) -> list[Rule]:
# 1. 渲染 Jinja2 变量
template = Template(rule)
rule_str = template.render(**variables)
# 2. 替换通配符(* -> ALL)
rule_str = rule_str.replace("*", "ALL")
# 3. 解析规则 AST
rule_ast = sqlglot.parse_one(rule_str, read=dialect)
sql_ast = sqlglot.parse_one(sql, read=dialect)
# 4. 提取规则中的列信息
rule_column = extract_column_from_rule(rule_ast)
# 5. 遍历 SQL 中的所有表,匹配规则
rules: list[Rule] = []
for table in sql_ast.find_all(exp.Table):
# 检查 schema 和 table 是否匹配
schema_matches = rule_column.db == "ALL" or rule_column.db == table.db
table_matches = rule_column.table == "ALL" or rule_column.table == table.name
if schema_matches and table_matches:
# 生成针对该表的规则
qualified_ast = qualify_rule_for_table(rule_ast, table)
rules.append(Rule(
schema=table.db,
table=table.name,
field=rule_column.name,
ast=qualified_ast,
))
return rules
关键点:
{{user_id}})*.*.* 可以匹配所有表的指定字段示例:
# 输入规则
rule = "*.*.status = 'approved'"
# 输入 SQL
sql = "SELECT * FROM orders JOIN products ON orders.product_id = products.id"
# 解析后的规则
[
Rule(schema="public", table="orders", field="status", ast="orders.status = 'approved'"),
Rule(schema="public", table="products", field="status", ast="products.status = 'approved'"),
]
class SqlPermissionGuard:
def apply_rules(
self, sql: str, rules: list[str], variables: dict[str, str] | None = None
) -> str:
# 1. 解析 SQL 并构建 Scope 树
ast = sqlglot.parse_one(sql, read=self.dialect).copy()
root_scope = build_scope(ast)
# 2. 解析所有规则
parsed_rules = list(
itertools.chain.from_iterable(
parse_rule(r, variables, sql, self.dialect) for r in rules
)
)
# 3. 递归处理 Scope 树
self._process_scope(root_scope, parsed_rules)
# 4. 生成新的 SQL
return ast.sql(dialect=self.dialect)
def _process_scope(self, scope: Scope, rules: list[Rule]):
"""递归处理 Scope 及其所有子 Scope"""
# 1. 为当前 Scope 构建权限条件
conditions = self._build_conditions_for_scope(scope, rules)
if conditions:
self._inject_conditions(scope, conditions)
# 2. 递归处理所有子 Scope
for subscope in self._get_all_subscopes(scope):
self._process_scope(subscope, rules)
def _get_all_subscopes(self, scope: Scope) -> list[Scope]:
"""获取所有子 Scope(子查询、CTE、UNION 等)"""
return (
scope.derived_table_scopes # 派生表(子查询)
+ scope.cte_scopes # CTE
+ scope.subquery_scopes # WHERE/HAVING 中的子查询
+ scope.union_scopes # UNION
+ scope.udtf_scopes # 用户定义表函数
)
关键点:
def _inject_conditions(self, scope: Scope, conditions: list[exp.Expression]):
"""将权限条件注入到 WHERE 子句"""
# 1. 找到 SELECT 语句
select = (
scope.expression
if isinstance(scope.expression, exp.Select)
else scope.expression.find(exp.Select)
)
if not select:
return
# 2. 合并所有条件(去重)
combined = self._combine_conditions(conditions)
if not combined:
return
# 3. 注入到 WHERE 子句
where = select.find(exp.Where)
if where:
# 已有 WHERE:用 AND 连接
where.set("this", exp.And(this=where.this, expression=combined))
else:
# 无 WHERE:创建新的 WHERE 子句
select.set("where", exp.Where(this=combined))
def _combine_conditions(self, conditions: list[exp.Expression]) -> exp.Expression | None:
"""合并多个条件,去重并用 AND 连接"""
if not conditions:
return None
# 去重(基于 SQL 字符串)
unique_conditions = []
seen = set()
for cond in conditions:
cond_str = cond.sql(dialect=self.dialect)
if cond_str not in seen:
seen.add(cond_str)
unique_conditions.append(cond)
# 用 AND 连接
if len(unique_conditions) == 1:
return unique_conditions[0]
result = unique_conditions[0]
for cond in unique_conditions[1:]:
result = exp.And(this=result, expression=cond)
return result
关键点:
示例:
# 原始 SQL
sql = "SELECT * FROM orders WHERE status = 'pending'"
# 权限规则
rules = ["orders.region = 'East'"]
# 改写后的 SQL
"SELECT * FROM orders WHERE status = 'pending' AND orders.region = 'East'"
SQL 中的表可能有别名,权限条件需要使用正确的引用:
def build_conditions(
self, rule: Rule, tables: list[tuple[str | None, exp.Table]]
) -> list[exp.Expression]:
"""为匹配的表构建权限条件"""
conditions = []
for alias, table in tables:
condition = rule.ast.copy()
column = condition.find(exp.Column)
if column and column.table:
if alias:
# 有别名:使用别名
column.set("table", exp.to_identifier(alias))
column.set("db", None)
else:
# 无别名:使用完整表名
column.set("table", exp.to_identifier(table.this.this))
column.set("db", exp.to_identifier(table.db))
column.set("catalog", None)
conditions.append(condition)
return conditions
示例:
# 原始 SQL(有别名)
sql = "SELECT * FROM orders o WHERE o.status = 'pending'"
# 权限规则
rules = ["orders.region = 'East'"]
# 改写后的 SQL(使用别名 o)
"SELECT * FROM orders o WHERE o.status = 'pending' AND o.region = 'East'"
# 原始 SQL
sql = """
SELECT * FROM (
SELECT * FROM orders WHERE status = 'pending'
) AS pending_orders
"""
# 权限规则
rules = ["orders.region = 'East'"]
# 改写后的 SQL(子查询内部也被注入)
"""
SELECT * FROM (
SELECT * FROM orders WHERE status = 'pending' AND orders.region = 'East'
) AS pending_orders
"""
关键点:
# 原始 SQL
sql = """
WITH pending_orders AS (
SELECT * FROM orders WHERE status = 'pending'
)
SELECT * FROM pending_orders
"""
# 权限规则
rules = ["orders.region = 'East'"]
# 改写后的 SQL
"""
WITH pending_orders AS (
SELECT * FROM orders WHERE status = 'pending' AND orders.region = 'East'
)
SELECT * FROM pending_orders
"""
# 原始 SQL
sql = """
SELECT * FROM orders WHERE status = 'pending'
UNION
SELECT * FROM orders WHERE status = 'approved'
"""
# 权限规则
rules = ["orders.region = 'East'"]
# 改写后的 SQL(两个 SELECT 都被注入)
"""
SELECT * FROM orders WHERE status = 'pending' AND orders.region = 'East'
UNION
SELECT * FROM orders WHERE status = 'approved' AND orders.region = 'East'
"""
# 原始 SQL
sql = """
SELECT o.*, p.name
FROM orders o
JOIN products p ON o.product_id = p.id
WHERE o.status = 'pending'
"""
# 权限规则
rules = [
"orders.region = 'East'",
"products.category = 'Electronics'"
]
# 改写后的 SQL(两个表都被注入)
"""
SELECT o.*, p.name
FROM orders o
JOIN products p ON o.product_id = p.id
WHERE o.status = 'pending' AND o.region = 'East' AND p.category = 'Electronics'
"""
AST 改写开销:
优化策略:
SQL 注入防护:
权限遗漏防护:
兼容性保障:
# 用户信息
user = {"role": "sales", "region": "East"}
# 权限规则
rules = ["orders.region = '{{region}}'"]
# 原始 SQL(AI 生成)
sql = "SELECT SUM(amount) FROM orders WHERE status = 'completed'"
# 应用权限
filtered_sql = guard(
sql=sql,
sql_dialect="mysql",
rules=rules,
variables={"region": user["region"]}
)
# 结果
"SELECT SUM(amount) FROM orders WHERE status = 'completed' AND orders.region = 'East'"
# 权限规则(所有表的 deleted 字段必须为 0)
rules = ["*.*.deleted = 0"]
# 原始 SQL
sql = """
SELECT o.*, c.name
FROM orders o
JOIN customers c ON o.customer_id = c.id
WHERE o.status = 'pending'
"""
# 应用权限
filtered_sql = guard(sql=sql, sql_dialect="postgres", rules=rules)
# 结果(两个表都被注入)
"""
SELECT o.*, c.name
FROM orders o
JOIN customers c ON o.customer_id = c.id
WHERE o.status = 'pending' AND o.deleted = 0 AND c.deleted = 0
"""
# 权限规则
rules = ["orders.user_id = {{user_id}}"]
# 原始 SQL(包含子查询和 CTE)
sql = """
WITH monthly_sales AS (
SELECT DATE_TRUNC('month', order_date) AS month, SUM(amount) AS total
FROM orders
WHERE status = 'completed'
GROUP BY month
)
SELECT * FROM monthly_sales
WHERE total > (
SELECT AVG(total) FROM monthly_sales
)
"""
# 应用权限
filtered_sql = guard(
sql=sql,
sql_dialect="postgres",
rules=rules,
variables={"user_id": "12345"}
)
# 结果(CTE 内部被注入)
"""
WITH monthly_sales AS (
SELECT DATE_TRUNC('month', order_date) AS month, SUM(amount) AS total
FROM orders
WHERE status = 'completed' AND orders.user_id = '12345'
GROUP BY month
)
SELECT * FROM monthly_sales
WHERE total > (
SELECT AVG(total) FROM monthly_sales
)
"""
| 方案 | 透明性 | 完整性 | 安全性 | 性能 | 维护成本 |
|---|---|---|---|---|---|
| AST 改写 | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ |
| 应用层拼接 | ⭐⭐ | ⭐⭐ | ⭐⭐ | ⭐⭐⭐⭐⭐ | ⭐⭐ |
| 数据库视图 | ⭐⭐⭐ | ⭐⭐⭐ | ⭐⭐⭐⭐ | ⭐⭐⭐⭐ | ⭐⭐ |
| ORM 过滤器 | ⭐⭐⭐⭐ | ⭐⭐⭐ | ⭐⭐⭐⭐ | ⭐⭐⭐⭐ | ⭐⭐⭐ |
| Row-Level Security | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ | ⭐⭐⭐ | ⭐⭐⭐ |
AskTable 选择 AST 改写的原因:
问题:不同数据库的标识符引号规则不同。
解决方案:
`table`"table""TABLE"(大写)# 根据方言决定是否加引号
should_quote = self.dialect != "oracle"
column.set("table", exp.to_identifier(name=table_name, quoted=should_quote))
问题:如何区分真实别名和表名?
-- 真实别名
SELECT * FROM orders o
-- 表名(无别名)
SELECT * FROM orders
解决方案:
has_real_alias = source.alias and source.alias != source.name
问题:同一个表在 SQL 中出现多次,会重复注入条件。
解决方案:
set 记录已注入的条件问题:AST 改写失败时如何处理?
解决方案:
try:
filtered_sql = guard(sql, dialect, rules, variables)
except Exception:
# 降级到 LLM 改写
filtered_sql = await llm_rewrite_sql(sql, rules)
AskTable 的 SQL 权限守卫,通过 SQLGlot AST 改写技术,实现了:
✅ 透明性:AI 和用户无需感知权限控制 ✅ 完整性:支持所有 SQL 语法(子查询、CTE、UNION、JOIN) ✅ 安全性:无 SQL 注入风险,无权限遗漏 ✅ 灵活性:支持动态变量和通配符规则 ✅ 高性能:AST 改写开销 < 10ms
我们计划将 SQL 权限守卫 开源,帮助更多团队构建安全的数据分析系统。敬请期待!
相关阅读:
技术交流: