
企业微信

飞书
选择您喜欢的方式加入群聊

扫码添加咨询专家
在企业数据分析场景中,不同用户应该看到不同的数据。销售人员只能查询自己负责地区的数据,财务人员只能查询本部门的数据。传统的表级权限控制过于粗糙,而 AskTable 通过 Policy 系统实现了灵活的行级安全控制。
表级权限:
-- 授予用户查询订单表的权限
GRANT SELECT ON orders TO user_sales;
问题:
视图方案:
-- 为每个地区创建视图
CREATE VIEW orders_beijing AS
SELECT * FROM orders WHERE region = '北京';
CREATE VIEW orders_shanghai AS
SELECT * FROM orders WHERE region = '上海';
-- 授予不同用户不同视图的权限
GRANT SELECT ON orders_beijing TO user_beijing;
GRANT SELECT ON orders_shanghai TO user_shanghai;
问题:
通过 Policy 系统,定义灵活的行级过滤规则:
# 定义策略:销售人员只能查询自己负责地区的数据
policy = RowFilter(
condition="region_filter",
db_regex=".*", # 匹配所有 schema
table_regex="orders|sales", # 匹配订单和销售表
field_regex="region|area", # 匹配地区字段
operator_expression="= '{{ user_region }}'", # 过滤条件
variables=["user_region"] # 需要的变量
)
# 应用策略
# 原始 SQL:SELECT * FROM orders
# 注入后:SELECT * FROM orders WHERE region = '北京'
优势:
上图展示了 Policy 从定义到应用的完整流程:通过正则表达式匹配相关字段,使用 Jinja2 渲染过滤条件,最后将条件注入到用户 SQL 中,实现透明的行级过滤。
class RowFilter:
condition: str # 策略名称
db_regex: str # Schema 正则表达式
table_regex: str # 表名正则表达式
field_regex: str # 字段名正则表达式
operator_expression: str # 过滤表达式
variables: list[str] # 需要的变量
def __init__(
self,
condition: str,
db_regex: str,
table_regex: str,
field_regex: str,
operator_expression: str,
variables: list[str],
):
self.condition = condition
self.db_regex = db_regex
self.table_regex = table_regex
self.field_regex = field_regex
self.operator_expression = operator_expression
self.variables = variables
def __repr__(self):
return f"<Filter {self.condition}>"
设计亮点:
db_regex: str # 匹配 Schema
table_regex: str # 匹配表名
field_regex: str # 匹配字段名
为什么需要三层?
场景 1:跨表统一过滤
# 所有包含 user_id 字段的表都按用户过滤
RowFilter(
condition="user_filter",
db_regex=".*", # 所有 schema
table_regex=".*", # 所有表
field_regex="user_id", # 只匹配 user_id 字段
operator_expression="= '{{ current_user_id }}'",
variables=["current_user_id"]
)
# 应用到:
# - orders 表:WHERE user_id = '123'
# - payments 表:WHERE user_id = '123'
# - reviews 表:WHERE user_id = '123'
场景 2:特定表特定字段
# 只对订单表的地区字段过滤
RowFilter(
condition="region_filter",
db_regex="public", # 只匹配 public schema
table_regex="orders|order_items", # 只匹配订单相关表
field_regex="region|area|province", # 匹配地区相关字段
operator_expression="IN ({{ allowed_regions }})",
variables=["allowed_regions"]
)
场景 3:排除特定表
# 除了管理员表,其他表都按部门过滤
RowFilter(
condition="department_filter",
db_regex=".*",
table_regex="^(?!admin_).*", # 排除 admin_ 开头的表
field_regex="dept_id|department_id",
operator_expression="= '{{ user_dept_id }}'",
variables=["user_dept_id"]
)
operator_expression: str # 支持 Jinja2 模板
variables: list[str] # 需要的变量
简单变量替换:
operator_expression = "= '{{ user_id }}'"
variables = {"user_id": "123"}
# 渲染结果:= '123'
复杂逻辑:
operator_expression = """
{% if is_admin %}
IS NOT NULL
{% else %}
IN ({{ allowed_values | join(', ') }})
{% endif %}
"""
variables = {
"is_admin": False,
"allowed_values": ["'北京'", "'上海'", "'广州'"]
}
# 渲染结果:IN ('北京', '上海', '广州')
日期计算:
operator_expression = ">= DATE_SUB(NOW(), INTERVAL {{ days }} DAY)"
variables = {"days": 30}
# 渲染结果:>= DATE_SUB(NOW(), INTERVAL 30 DAY)
def get_real_conditions_by_table(
self,
meta: MetaAdmin,
table_full_names: list[str],
variables: dict[str, str] | None = None,
):
"""
返回对应的所有表的真实条件
返回结果:
{
"schema1.table1": [
"schema1.table1.user_id = '123'",
"schema1.table1.region = '北京'"
],
"schema1.table2": [
"schema1.table2.user_id = '123'"
]
}
"""
result = {}
log.debug(
f"Filter {self} by {meta} in {table_full_names} with variables {variables}"
)
# 1. 按正则表达式过滤元数据
filtered_meta = meta.filter_by_regex(
self.db_regex, self.table_regex, self.field_regex
)
log.debug(f"Filtered meta: {filtered_meta}")
# 2. 渲染表达式
if variables:
expr = render_jinja2_template(self.operator_expression, variables)
log.debug(
f"Rendered expr: {expr}, operator_expression: {self.operator_expression}, variables: {variables}"
)
else:
expr = self.operator_expression
log.debug(
f"Rendered expr: {expr}, operator_expression: {self.operator_expression}"
)
# 3. 为每个匹配的字段生成条件
for schema in filtered_meta.schemas.values():
for table in schema.tables.values():
if table.full_name in table_full_names:
result[table.full_name] = [
f"{field.full_name} {expr}" for field in table.fields.values()
]
log.debug(f"Conditions: {result}")
total_conds = sum([len(conds) for conds in result.values()])
log.info(
f"Get {total_conds} conditions by {self} in {len(table_full_names)} tables."
)
return result
执行流程:
示例:
# 策略定义
policy = RowFilter(
condition="user_region_filter",
db_regex="public",
table_regex="orders|sales",
field_regex="region",
operator_expression="= '{{ user_region }}'",
variables=["user_region"]
)
# 应用策略
conditions = policy.get_real_conditions_by_table(
meta=meta,
table_full_names=["public.orders", "public.sales"],
variables={"user_region": "北京"}
)
# 结果
{
"public.orders": [
"public.orders.region = '北京'"
],
"public.sales": [
"public.sales.region = '北京'"
]
}
将策略条件注入到用户的 SQL 查询中:
def inject_row_filters(
original_sql: str,
conditions: dict[str, list[str]],
dialect: str = "mysql"
) -> str:
"""
将行级过滤条件注入到 SQL 中
"""
import sqlglot
# 解析 SQL
parsed = sqlglot.parse_one(original_sql, dialect=dialect)
# 遍历所有表引用
for table_node in parsed.find_all(sqlglot.exp.Table):
table_name = table_node.name
schema_name = table_node.db or "public"
full_name = f"{schema_name}.{table_name}"
# 获取该表的过滤条件
if full_name in conditions:
table_conditions = conditions[full_name]
# 构建 WHERE 条件
for condition_str in table_conditions:
condition_expr = sqlglot.parse_one(condition_str, dialect=dialect)
# 注入到 WHERE 子句
if parsed.args.get("where"):
# 已有 WHERE,用 AND 连接
parsed.args["where"] = sqlglot.exp.And(
this=parsed.args["where"],
expression=condition_expr
)
else:
# 没有 WHERE,直接添加
parsed.args["where"] = condition_expr
# 生成新的 SQL
return parsed.sql(dialect=dialect)
示例:
# 原始 SQL
original_sql = """
SELECT
o.order_id,
o.amount,
c.customer_name
FROM orders o
JOIN customers c ON o.customer_id = c.id
WHERE o.status = 'completed'
"""
# 策略条件
conditions = {
"public.orders": ["public.orders.region = '北京'"],
"public.customers": ["public.customers.region = '北京'"]
}
# 注入后的 SQL
injected_sql = inject_row_filters(original_sql, conditions)
# 结果
"""
SELECT
o.order_id,
o.amount,
c.customer_name
FROM orders o
JOIN customers c ON o.customer_id = c.id
WHERE o.status = 'completed'
AND o.region = '北京'
AND c.region = '北京'
"""
需求:SaaS 平台,每个租户只能查询自己的数据
策略定义:
tenant_filter = RowFilter(
condition="tenant_isolation",
db_regex=".*", # 所有 schema
table_regex=".*", # 所有表
field_regex="tenant_id|org_id|company_id", # 租户 ID 字段
operator_expression="= '{{ tenant_id }}'",
variables=["tenant_id"]
)
应用:
# 用户登录时获取租户 ID
user = authenticate(username, password)
tenant_id = user.tenant_id
# 查询时自动注入租户过滤
# 原始:SELECT * FROM orders
# 注入:SELECT * FROM orders WHERE tenant_id = 'tenant_001'
效果:
需求:销售人员只能查询自己负责地区的数据
策略定义:
region_filter = RowFilter(
condition="region_access",
db_regex="public",
table_regex="orders|sales|customers",
field_regex="region|area|province|city",
operator_expression="IN ({{ allowed_regions }})",
variables=["allowed_regions"]
)
应用:
# 用户登录时获取负责地区
user = authenticate(username, password)
allowed_regions = user.get_allowed_regions() # ['北京', '天津', '河北']
# 渲染表达式
variables = {
"allowed_regions": ", ".join([f"'{r}'" for r in allowed_regions])
}
# 结果:IN ('北京', '天津', '河北')
# 查询时自动注入
# 原始:SELECT * FROM orders
# 注入:SELECT * FROM orders WHERE region IN ('北京', '天津', '河北')
需求:普通用户只能查询最近 90 天的数据,管理员无限制
策略定义:
time_filter = RowFilter(
condition="time_range",
db_regex=".*",
table_regex=".*",
field_regex="created_at|updated_at|order_date|sale_date",
operator_expression="""
{% if is_admin %}
IS NOT NULL
{% else %}
>= DATE_SUB(NOW(), INTERVAL {{ days }} DAY)
{% endif %}
""",
variables=["is_admin", "days"]
)
应用:
# 普通用户
variables = {"is_admin": False, "days": 90}
# 渲染:>= DATE_SUB(NOW(), INTERVAL 90 DAY)
# 管理员
variables = {"is_admin": True, "days": 90}
# 渲染:IS NOT NULL(相当于无限制)
需求:用户只能查询本部门及下级部门的数据
策略定义:
dept_filter = RowFilter(
condition="department_hierarchy",
db_regex=".*",
table_regex=".*",
field_regex="dept_id|department_id",
operator_expression="IN ({{ dept_ids }})",
variables=["dept_ids"]
)
应用:
# 获取用户部门及下级部门
user = authenticate(username, password)
dept_tree = get_department_tree(user.dept_id)
dept_ids = [dept.id for dept in dept_tree] # ['D001', 'D001-01', 'D001-02']
# 渲染表达式
variables = {
"dept_ids": ", ".join([f"'{d}'" for d in dept_ids])
}
# 结果:IN ('D001', 'D001-01', 'D001-02')
需求:客服人员可以查询用户信息,但敏感字段需要脱敏
策略定义:
# 方案 1:在 SQL 层面脱敏
mask_filter = RowFilter(
condition="phone_mask",
db_regex="public",
table_regex="users|customers",
field_regex="phone|mobile",
operator_expression="", # 不过滤行,而是修改字段
variables=[]
)
# 方案 2:在结果层面脱敏
def apply_masking(result: pd.DataFrame, meta: MetaAdmin) -> pd.DataFrame:
for field in meta.all_fields():
if field.identifiable_type == IdentifiableType.phone:
result[field.name] = result[field.name].apply(mask_phone)
return result
class PolicyCache:
_cache: dict[str, dict] = {}
@classmethod
def get_conditions(cls, policy_id: str, variables: dict) -> dict | None:
cache_key = f"{policy_id}:{hash(frozenset(variables.items()))}"
return cls._cache.get(cache_key)
@classmethod
def set_conditions(cls, policy_id: str, variables: dict, conditions: dict):
cache_key = f"{policy_id}:{hash(frozenset(variables.items()))}"
cls._cache[cache_key] = conditions
# 确保过滤字段有索引
CREATE INDEX idx_orders_region ON orders(region);
CREATE INDEX idx_orders_tenant_id ON orders(tenant_id);
CREATE INDEX idx_orders_created_at ON orders(created_at);
# 复合索引
CREATE INDEX idx_orders_tenant_region ON orders(tenant_id, region);
# 优化前:多个 AND 条件
WHERE tenant_id = 'T001'
AND region = '北京'
AND created_at >= '2024-01-01'
# 优化后:使用复合索引
# 索引:(tenant_id, region, created_at)
# 查询计划:使用索引扫描,而不是全表扫描
最小权限原则:
# ✅ 默认拒绝,显式授权
default_filter = RowFilter(
condition="default_deny",
db_regex=".*",
table_regex=".*",
field_regex="tenant_id",
operator_expression="= '{{ tenant_id }}'",
variables=["tenant_id"]
)
# ❌ 默认允许,显式拒绝
# 容易遗漏,不安全
分层策略:
# 第一层:租户隔离(所有用户)
tenant_filter = RowFilter(...)
# 第二层:部门隔离(非管理员)
dept_filter = RowFilter(...)
# 第三层:地区隔离(销售人员)
region_filter = RowFilter(...)
# ✅ 从用户会话获取变量
variables = {
"tenant_id": session.get("tenant_id"),
"user_id": session.get("user_id"),
"dept_id": session.get("dept_id"),
"allowed_regions": session.get("allowed_regions"),
}
# ❌ 从用户输入获取变量
# 容易被篡改,不安全
variables = {
"tenant_id": request.args.get("tenant_id") # 危险!
}
def apply_policy(sql: str, policy: RowFilter, variables: dict) -> str:
# 记录策略应用
log.info(f"Applying policy {policy.condition}")
log.info(f"Original SQL: {sql}")
log.info(f"Variables: {variables}")
# 应用策略
injected_sql = inject_row_filters(sql, policy, variables)
# 记录结果
log.info(f"Injected SQL: {injected_sql}")
# 审计日志
audit_log.record({
"user_id": current_user.id,
"policy": policy.condition,
"original_sql": sql,
"injected_sql": injected_sql,
"variables": variables,
"timestamp": datetime.now()
})
return injected_sql
AskTable 的 Policy 系统通过行级安全控制,实现了细粒度的数据权限管理:
行级安全是企业级数据分析平台的必备能力,通过 Policy 系统,AskTable 在保证数据安全的同时,提供了灵活的权限管理能力。