Django Rbac 权限设计

相关概念

  • ACL

ACL 是 Access Control List 的缩写,称为访问控制列表,包含了对一个对象或一条记录可进行何种操作的权限定义。

例如一个文件对象的 ACL 为 { “Alice”: { “read”: true, “write”: true }, “Bob”: { “read”: true } },

这代表 Alice 对该文件既能读又能写,而 Bob 只能读取。

  • RBAC

RBAC基于角色的权限访问控制(Role-Based Access Control)不同于赋予使用者权限,而是将权限赋予角色。

RBAC模型中「权限」只和「角色」对应,而用户也和「角色」对应,为用户赋予角色,然后管理角色的权限,完成了权限与用户的解耦。

  • RBAC0/RBAC1/RBAC2/RBAC3

RBAC0主要特点是:

用户和角色之间是多对一还是多对多的关系。

RBAC1主要特点是:

角色可以继承,形成树状。

RBAC2主要特定是:

角色可以互斥。(出纳和会计) 基数约束。(ceo) 先决条件。(逐层升级) 运行时互斥。(运行时只允许一个角色,水的三态)

RBAC3,统一rabac1和rbac2。

  • 数据权限
1
2
object (row) level permissions 
model (table) level permissions
  • 权限的直观表现

操作权限: web系统页面的菜单和按钮 数据权限: web系统中对数据记录操作

django的默认权限

Django 带有一个简单的权限系统。它提供了为指定的用户和用户组分配权限的方法。

这里的group和角色实际上为一个概念

User 对象有两个多对多字段:groups 和 user_permissions。 User 对象可以像访问其他 Django model : 一样访问他们的相关对象。

1
2
3
4
5
6
7
8
myuser.groups.set([group_list])
myuser.groups.add(group, group, ...)
myuser.groups.remove(group, group, ...)
myuser.groups.clear()
myuser.user_permissions.set([permission_list])
myuser.user_permissions.add(permission, permission, ...)
myuser.user_permissions.remove(permission, permission, ...)
myuser.user_permissions.clear()

注意这里:可以直接对用户进行授权。

假设你有一个名为 foo 应用程序和一个名为 Bar 的模型,要测试基础权限,你应该使用:

1
2
3
4
添加:user.has_perm('foo.add_bar')
修改:user.has_perm('foo.change_bar')
删除:user.has_perm('foo.delete_bar')
查看:user.has_perm('foo.view_bar')

可以扩展/屏蔽默认的权限:

1
2
3
4
class Person(models.Model):
    class Meta:
        default_permissions = ()
        permissions = [('can_eat_pizzas', 'Can eat pizzas')]

default_permissions = ()屏蔽了Person默认的add_personchange_persondelete_personview_person。而permissions = [('can_eat_pizzas', 'Can eat pizzas')]Person增加了can_eat_pizzas权限。

系统需求分析

  1. 角色有编码,可以便捷的在编程过程中使用, 比如:
1
2
if user.role.has("manager") :
    dosomething()
  1. 权限可以使用菜单组织成二级目录,比如:
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
* 权限管理
    - 用户管理
        * 增加
        * 编辑
        * 删除
        * 搜索
    - 角色管理
    - 权限管理
* 论坛管理
    - 版面管理
        * 新增版面
        * 修改版面
        * 查看版面
        * 关闭
    - 文章管理
        * ...
  1. 权限可以配合RESTFul规范进行远程拦截(没有model情况下)
1
2
GET /articles/
DELETE /articles/1/

模型实现

操作权限部分

  1. model部分如下:
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
class Permission(models.Model):
    """约定一级代表目录,二级代表页面,三级代表按钮"""
    name = models.CharField(verbose_name='名称', max_length=32, blank=True, null=True)
    code = models.CharField(verbose_name='编码', max_length=32, blank=True, null=True)
    higher = models.ForeignKey('self', verbose_name='上级', on_delete=models.CASCADE)
    url = models.CharField(verbose_name='路径', max_length=32, blank=True, null=True)
    action = models.CharField(verbose_name='方法', max_length=32, blank=True, null=True)
    ...


class Role(models.Model):
    name = models.CharField(verbose_name='名称', max_length=32, blank=True, null=True)
    code = models.CharField(verbose_name='编码', max_length=32, blank=True, null=True)
    permissions = models.ManyToManyField(
        Permission,
        verbose_name='permissions',
        blank=True,
    )
    ...


class User(AbstractUser):
    roles = models.ManyToManyField(
        Role,
        verbose_name='roles',
        blank=True,
    )
    ...
  1. 操作权限拦截如下:
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
class RBACMiddleware:

    def __call__(self, request):
        request_url = request.path_info
        request_user = request.user
        for url in settings.SAFE_URL:
            if re.match(url, request_url):
                pass
        # 读取数据库/缓存
        if has_permission_url(request_user, request_url):
            pass
        else:
            return render(request, 'page403.html')

数据权限部分

数据权限和业务结合紧密,一般不需要做统一的数据权限拦截,各个业务自由使用。 不过可以将数据权限抽象成下面几种类型,规范使用,实现可配置化。

1
2
3
4
5
6
7
* 行限制(根据某列的条件控制可影响的行数)
    - 所有者 is_owner_required 只能够删除自己的数据行
    - 协作者 is_teamworker_required 可以编辑team(部门)所属的数据行
    - 受限者 is_manager_required 可以批准3天内请假
* 列限制 (控制可影响的列)
    - 电话号码保密 filter_phone
    - 薪资保密 filter_salary
  1. model 部分
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
class Checker(models.Model):
    CHECKER_CLAZZ = (
        (1, 函数),
        (2, 表达式),
    )

    clazz = models.CharField(verbose_name='类别', choices=CHECKER_TYPE, max_length=15, blank=True, null=True)
    name = models.CharField(verbose_name='名称', max_length=32, blank=True, null=True)
    code = models.CharField(verbose_name='编码', max_length=32, blank=True, null=True)
    value = models.CharField(verbose_name='数值', max_length=32, blank=True, null=True)
    ...
  1. 实现部分

简单的拦截:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
def is_owner_required(model, pk_name='pk'):
    def decorator(view_func):
        def wrap(request, *args, **kwargs):
            pk = kwargs.get(pk_name, None)
            o=model.objects.get(pk=pk) #raises ObjectDoesNotExist
            if o.is_owner(request.user):
                return view_func(request, *args, **kwargs)
            else:
                raise PermissionDenied
        return wraps(view_func)(wrap)
    return decorator

def is_teamworker_required(model, pk_name='pk'):
    def decorator(view_func):
        def wrap(request, *args, **kwargs):
            pk = kwargs.get(pk_name, None)
            o=model.objects.get(pk=pk) #raises ObjectDoesNotExist
            if o.is_teamworker(request.user):
                return view_func(request, *args, **kwargs)
            else:
                raise PermissionDenied
        return wraps(view_func)(wrap)
    return decorator

@is_owner_required(Comment)
def delete_comment(request, pk):
    pass
    
@is_teamworker_required(Comment)
def edit_comment(request, pk):
    pass

复杂点的拦截:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
def is_manager_required(code, pk_name='pk'):
    def decorator(view_func):
        def wrap(request, *args, **kwargs):
            pk = kwargs.get(pk_name, None)
            o=model.objects.get(pk=pk) #raises ObjectDoesNotExist
            c=checkModel.objects.get(code=code)
            # check request user role limit value
            if c.check(request.user):
                return view_func(request, *args, **kwargs)
            else:
                raise PermissionDenied
        return wraps(view_func)(wrap)
    return decorator
    
@is_manager_required(code="manager_limit_3")
def audit_holiday(request, pk):
    pass

完全动态的拦截:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
def common_required(code, pk_name='pk'):
    def decorator(view_func):
        def wrap(request, *args, **kwargs):
            pk = kwargs.get(pk_name, None)
            o=model.objects.get(pk=pk) #raises ObjectDoesNotExist
            c=checkModel.objects.get(code=code)
            # 动态获取模块
            module = __import__(c.value.module_name, fromlist=[c.value.module_class])
            # 动态获取验证函数
            checker = getattr(module, c.value.name)
            # 执行验证函数
            if checker.check(request.user, c.value.number):
                return view_func(request, *args, **kwargs)
            else:
                raise PermissionDenied
        return wraps(view_func)(wrap)
    return decorator
    
@common_required(code="check_user_level")
def dosomething(request, pk):
    pass

最后,欢迎加下面的微信和我互动交流,一起进阶: wx