cube-studio/myapp/security.py
2022-11-01 19:23:27 +08:00

864 lines
30 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from flask_login import current_user
import logging
import jwt
from flask_babel import lazy_gettext
from flask import current_app
from flask_appbuilder.security.sqla import models as ab_models
from flask_appbuilder.security.sqla.manager import SecurityManager
from flask_babel import lazy_gettext as _
from flask_appbuilder.security.views import (
PermissionModelView,
PermissionViewModelView,
RoleModelView,
UserModelView
)
from flask_appbuilder.security.sqla.models import assoc_user_role
from flask_appbuilder.security.decorators import has_access
from flask_appbuilder.models.sqla.interface import SQLAInterface
from flask_appbuilder.widgets import ListWidget
from sqlalchemy import or_
from flask_appbuilder.security.views import expose
from flask import g, flash, request
from flask_appbuilder.security.sqla.models import assoc_permissionview_role
from sqlalchemy import select
from flask_appbuilder.const import (
AUTH_DB,
AUTH_LDAP,
AUTH_OAUTH,
AUTH_OID,
AUTH_REMOTE_USER,
LOGMSG_WAR_SEC_LOGIN_FAILED
)
# user list page template
class MyappSecurityListWidget(ListWidget):
"""
Redeclaring to avoid circular imports
"""
template = "myapp/fab_overrides/list.html"
# role list page template
class MyappRoleListWidget(ListWidget):
"""
Role model view from FAB already uses a custom list widget override
So we override the override
"""
template = "myapp/fab_overrides/list_role.html"
def __init__(self, **kwargs):
kwargs["appbuilder"] = current_app.appbuilder
super().__init__(**kwargs)
# customize list,add,edit page
UserModelView.list_columns= ["username", "active", "roles"]
UserModelView.edit_columns= ["first_name", "last_name", "username", "active", "email"]
UserModelView.add_columns= ["first_name", "last_name", "username", "email", "active", "roles"]
UserModelView.list_widget = MyappSecurityListWidget
RoleModelView.list_widget = MyappRoleListWidget
PermissionViewModelView.list_widget = MyappSecurityListWidget
PermissionModelView.list_widget = MyappSecurityListWidget
# expand user
from flask_appbuilder.security.sqla.models import User,Role
from sqlalchemy import Column, String
class MyUser(User):
__tablename__ = 'ab_user'
org = Column(String(200)) # Organization
def get_full_name(self):
return self.username
def __repr__(self):
return self.username
def is_admin(self):
user_roles = [role.name.lower() for role in list(self.roles)]
if "admin" in user_roles:
return True
return False
@property
def secret(self):
if self.changed_on:
pass
# help(self.changed_on)
# timestamp = int(func.date_format(self.changed_on))
timestamp = int(self.changed_on.timestamp())
payload = {
"iss": self.username
# "iat": timestamp, # Issue period
# "nbf": timestamp, # Effective Date
# "exp": timestamp + 60 * 60 * 24 * 30 * 12, # Valid for 12 months
}
global_password = 'myapp'
encoded_jwt = jwt.encode(payload, global_password, algorithm='HS256')
encoded_jwt = encoded_jwt.decode('utf-8')
return encoded_jwt
return ''
# customize role view
class MyRoleModelView(RoleModelView):
datamodel = SQLAInterface(Role)
order_columns = ["id"]
route_base = "/roles"
list_columns = ["name", "permissions"]
class MyUserRemoteUserModelView(UserModelView):
list_columns = ["username", "active", "roles", ]
edit_columns = ["first_name", "last_name", "username", "active", "email", "roles",'org' ]
add_columns = ["first_name", "last_name", "username", "email", "active", "roles",'org' ]
show_columns = ["username", "active", "roles", "login_count"]
list_widget = MyappSecurityListWidget
label_columns = {
"get_full_name": lazy_gettext("Full Name"),
"first_name": lazy_gettext("First Name"),
"last_name": lazy_gettext("Last Name"),
"username": lazy_gettext("User Name"),
"password": lazy_gettext("Password"),
"active": lazy_gettext("Is Active?"),
"email": lazy_gettext("Email"),
"roles": lazy_gettext("Role"),
"last_login": lazy_gettext("Last login"),
"login_count": lazy_gettext("Login count"),
"fail_login_count": lazy_gettext("Failed login count"),
"created_on": lazy_gettext("Created on"),
"created_by": lazy_gettext("Created by"),
"changed_on": lazy_gettext("Changed on"),
"changed_by": lazy_gettext("Changed by"),
"secret": lazy_gettext("Authorization"),
}
show_fieldsets = [
(
lazy_gettext("User info"),
{"fields": ["username", "active", "roles", "login_count",'secret']},
),
(
lazy_gettext("Personal Info"),
{"fields": ["first_name", "last_name", "email",'org'], "expanded": True},
),
(
lazy_gettext("Audit Info"),
{
"fields": [
"last_login",
"fail_login_count",
"created_on",
"created_by",
"changed_on",
"changed_by",
],
"expanded": False,
},
),
]
user_show_fieldsets = [
(
lazy_gettext("User info"),
{"fields": ["username", "active", "roles", "login_count",'secret']},
),
(
lazy_gettext("Personal Info"),
{"fields": ["first_name", "last_name", "email"], "expanded": True},
),
]
@expose("/userinfo/")
@has_access
def userinfo(self):
item = self.datamodel.get(g.user.id, self._base_filters)
widgets = self._get_show_widget(
g.user.id, item, show_fieldsets=self.user_show_fieldsets
)
self.update_redirect()
return self.render_template(
self.show_template,
title=self.user_info_title,
widgets=widgets,
appbuilder=self.appbuilder,
)
from flask_appbuilder.security.views import SimpleFormView
from flask_appbuilder._compat import as_unicode
from flask_babel import lazy_gettext
from wtforms import StringField
from wtforms.validators import DataRequired
from flask_appbuilder.fieldwidgets import BS3TextFieldWidget
from flask_appbuilder.forms import DynamicForm
class UserInfoEditView(SimpleFormView):
class UserInfoEdit(DynamicForm):
first_name = StringField(
lazy_gettext("First Name"),
validators=[DataRequired()],
widget=BS3TextFieldWidget(),
description=lazy_gettext("Write the user first name or names"),
)
last_name = StringField(
lazy_gettext("Last Name"),
validators=[DataRequired()],
widget=BS3TextFieldWidget(),
description=lazy_gettext("Write the user last name"),
)
username = StringField(
lazy_gettext("User Name"),
validators=[DataRequired()],
widget=BS3TextFieldWidget(),
description=lazy_gettext("Write the Username"),
)
password = StringField(
lazy_gettext("Password"),
validators=[DataRequired()],
widget=BS3TextFieldWidget(),
description=lazy_gettext("Password"),
)
email = StringField(
lazy_gettext("Email"),
validators=[DataRequired()],
widget=BS3TextFieldWidget(),
description=lazy_gettext("Write the Email"),
)
org = StringField(
lazy_gettext("Org"),
widget=BS3TextFieldWidget(),
description=lazy_gettext("organization name"),
)
form = UserInfoEdit
form_title = lazy_gettext("Edit User Information")
redirect_url = "/"
message = lazy_gettext("User information changed")
def form_get(self, form):
item = self.appbuilder.sm.get_user_by_id(g.user.id)
# fills the form generic solution
for key, value in form.data.items():
if key == "csrf_token":
continue
form_field = getattr(form, key)
form_field.data = getattr(item, key)
def form_post(self, form):
form = self.form.refresh(request.form)
item = self.appbuilder.sm.get_user_by_id(g.user.id)
form.populate_obj(item)
self.appbuilder.sm.update_user(item)
flash(as_unicode(self.message), "info")
from myapp.project import MyCustomRemoteUserView
from myapp.project import Myauthdbview
# myapp自带的角色和角色权限自定义了各种权限
# 基础类fab-Security-Manager中 def load_user(self, pk): 是用来认证用户的
# before_request是user赋值给g.user
# @pysnooper.snoop()
class MyappSecurityManager(SecurityManager):
user_model = MyUser
rolemodelview = MyRoleModelView #
# Remote Authentication
userremoteusermodelview = MyUserRemoteUserModelView
authremoteuserview = MyCustomRemoteUserView
# Account password authentication
userdbmodelview = MyUserRemoteUserModelView
authdbview = Myauthdbview
# userinfo edit view
userinfoeditview = UserInfoEditView
# 构建启动前工作,认证
@staticmethod
def before_request():
g.user = current_user
def __init__(self, appbuilder):
super(MyappSecurityManager, self).__init__(appbuilder)
# 添加从header中进行认证的方式
self.lm.header_loader(self.load_user_from_header)
# 使用header 认证通过rtx名获取用户
# @pysnooper.snoop()
def load_user_from_header(self, authorization_value):
# token=None
# if 'token' in request.headers:
# token = request.headers['token']
if authorization_value:
# rtx 免认证
if len(authorization_value) < 20:
username = authorization_value
if username:
user = self.find_user(username)
g.user = user
return user
else: # token 认证
encoded_jwt = authorization_value.encode('utf-8')
payload = jwt.decode(encoded_jwt, 'myapp', algorithms=['HS256'])
# if payload['iat'] > time.time():
# return
# elif payload['exp'] < time.time():
# return
# else:
user = self.find_user(payload['iss'])
g.user = user
return user
# 自定义登录用户
def load_user(self, pk):
user = self.get_user_by_id(int(pk))
# set cookie
return user
# 注册security菜单栏下的子菜单和链接
# @pysnooper.snoop()
def register_views(self):
if not self.appbuilder.app.config.get('FAB_ADD_SECURITY_VIEWS', True):
return
# Security APIs
self.appbuilder.add_api(self.security_api)
if self.auth_user_registration:
if self.auth_type == AUTH_DB:
self.registeruser_view = self.registeruserdbview()
elif self.auth_type == AUTH_OID:
self.registeruser_view = self.registeruseroidview()
elif self.auth_type == AUTH_OAUTH:
self.registeruser_view = self.registeruseroauthview()
if self.registeruser_view:
self.appbuilder.add_view_no_menu(self.registeruser_view)
self.appbuilder.add_view_no_menu(self.resetpasswordview())
self.appbuilder.add_view_no_menu(self.resetmypasswordview())
self.appbuilder.add_view_no_menu(self.userinfoeditview())
if self.auth_type == AUTH_DB:
self.user_view = self.userdbmodelview
self.auth_view = self.authdbview()
elif self.auth_type == AUTH_LDAP:
self.user_view = self.userldapmodelview
self.auth_view = self.authldapview()
elif self.auth_type == AUTH_OAUTH:
self.user_view = self.useroauthmodelview
self.auth_view = self.authoauthview()
elif self.auth_type == AUTH_REMOTE_USER:
self.user_view = self.userremoteusermodelview
self.auth_view = self.authremoteuserview()
else:
self.user_view = self.useroidmodelview
self.auth_view = self.authoidview()
if self.auth_user_registration:
pass
self.registeruser_view = self.registeruseroidview()
self.appbuilder.add_view_no_menu(self.registeruser_view)
self.appbuilder.add_view_no_menu(self.auth_view)
self.user_view = self.appbuilder.add_view(
self.user_view,
"List Users",
icon="fa-user",
href="/users/list/?_flt_2_username=",
label=_("List Users"),
category="Security",
category_icon="fa-cogs",
category_label=_("Security"),
)
role_view = self.appbuilder.add_view(
self.rolemodelview,
"List Roles",
icon="fa-group",
href="/roles/list/?_flt_2_name=",
label=_("List Roles"),
category="Security",
category_icon="fa-cogs",
)
role_view.related_views = [self.user_view.__class__]
if self.userstatschartview:
self.appbuilder.add_view(
self.userstatschartview,
"User's Statistics",
icon="fa-bar-chart-o",
label=_("User's Statistics"),
category="Security",
)
if self.auth_user_registration:
self.appbuilder.add_view(
self.registerusermodelview,
"User's Statistics",
icon="fa-user-plus",
label=_("User Registrations"),
category="Security",
)
self.appbuilder.menu.add_separator("Security")
self.appbuilder.add_view(
self.permissionmodelview,
"Base Permissions",
icon="fa-lock",
label=_("Base Permissions"),
category="Security",
)
self.appbuilder.add_view(
self.viewmenumodelview,
"Views/Menus",
icon="fa-list-alt",
label=_("Views/Menus"),
category="Security",
)
self.appbuilder.add_view(
self.permissionviewmodelview,
"Permission on Views/Menus",
icon="fa-link",
label=_("Permission on Views/Menus"),
category="Security",
)
# @pysnooper.snoop()
def add_org_user(self,username,first_name,last_name,org,email,roles,password="",hashed_password=""):
"""
Generic function to create user
"""
try:
user = self.user_model()
user.first_name = first_name
user.org = org
user.last_name = last_name
user.username = username
user.email = email
user.active = True
user.roles+=roles # 添加默认注册角色
user.password=password
# if hashed_password:
# user.password = hashed_password
# else:
# user.password = generate_password_hash(password)
self.get_session.add(user)
self.get_session.commit()
try:
from myapp.models.model_team import Project_User, Project
public_project = self.get_session.query(Project).filter(Project.name == "public").filter(Project.type == "org").first()
if public_project:
project_user = Project_User()
project_user.project = public_project
project_user.role = 'dev'
project_user.user_id = user.id
self.get_session.add(project_user)
self.get_session.commit()
except Exception:
self.get_session.rollback()
return user
except Exception:
self.get_session.rollback()
return False
# 添加public项目组
# 添加注册远程用户
# @pysnooper.snoop()
def auth_user_remote_org_user(self, username,org_name='',password='',email='',first_name='',last_name=''):
if not username:
return None
# 查找用户
user = self.find_user(username=username)
# 添加以组织同名的角色,同时添加上级角色
# 注册rtx同名角色
rtx_role = self.add_role(username)
# 如果用户不存在就注册用户
if user is None:
user = self.add_org_user(
username=username,
first_name=first_name if first_name else username,
last_name=last_name if last_name else username,
password=password,
org=org_name, # 添加组织架构
email=username + "@tencent.com" if not email else email,
roles=[self.find_role(self.auth_user_registration_role),rtx_role] if self.find_role(self.auth_user_registration_role) else [rtx_role,] # org_role 添加gamma默认角色, 组织架构角色先不自动添加
)
elif not user.is_active: # 如果用户未激活不允许接入
print(LOGMSG_WAR_SEC_LOGIN_FAILED.format(username))
return None
if user:
user.org = org_name if org_name else user.org
user.email = email if email else user.email
user.first_name = first_name if first_name else user.first_name
user.last_name = last_name if last_name else user.last_name
gamma_role = self.find_role(self.auth_user_registration_role)
if gamma_role and gamma_role not in user.roles:
user.roles.append(gamma_role)
if rtx_role and rtx_role not in user.roles:
user.roles.append(rtx_role)
# 更新用户信息
if org_name:
user.org = org_name # 更新组织架构字段
org_role = self.add_role(org_name)
if org_role not in user.roles:
user.roles.append(org_role)
self.update_user_auth_stat(user)
return user
READ_ONLY_MODEL_VIEWS = {
'link','Minio','Kubernetes Dashboard','Granfana','Wiki'
}
USER_MODEL_VIEWS = {
"UserDBModelView",
"UserLDAPModelView",
"UserOAuthModelView",
"UserOIDModelView",
"UserRemoteUserModelView",
}
# 只有admin才能看到的menu
ADMIN_ONLY_VIEW_MENUS = {
"ResetPasswordView",
"RoleModelView",
"List Users",
"List Roles",
"UserStatsChartView",
"Base Permissions",
"Permission on Views/Menus",
"Action Log",
"Views/Menus",
"ViewMenuModelView",
"User's Statistics",
"Security",
} | USER_MODEL_VIEWS
ALPHA_ONLY_VIEW_MENUS = {}
# 只有admin才有的权限
ADMIN_ONLY_PERMISSIONS = {
"can_override_role_permissions",
"can_override_role_permissions",
# "can_approve", # db owner需要授权approve 权限后才能授权
"can_update_role",
}
READ_ONLY_PERMISSION = {"can_show", "can_list",'can_add'}
ALPHA_ONLY_PERMISSIONS = {
"muldelete"
}
# 用户创建menu才有的权限
OBJECT_SPEC_PERMISSIONS = {
"can_only_access_owned_queries",
}
# 所有人都可以有的基本权限
ACCESSIBLE_PERMS = {"can_userinfo","can_request_access","can_approve"}
# 获取用户是否有在指定视图上的指定权限名
# @pysnooper.snoop()
def can_access(self, permission_name, view_name):
"""Protecting from has_access failing from missing perms/view"""
user = g.user
if user.is_anonymous:
return self.is_item_public(permission_name, view_name)
return self._has_view_access(user, permission_name, view_name)
# 获取用户具有指定权限的视图
def user_view_menu_names(self, permission_name: str):
from myapp import db
base_query = (
db.session.query(self.viewmenu_model.name)
.join(self.permissionview_model)
.join(self.permission_model)
.join(assoc_permissionview_role)
.join(self.role_model)
)
# 非匿名用户
if not g.user.is_anonymous:
# filter by user id
view_menu_names = (
base_query.join(assoc_user_role)
.join(self.user_model)
.filter(self.user_model.id == g.user.id)
.filter(self.permission_model.name == permission_name)
).all()
return set([s.name for s in view_menu_names])
# Properly treat anonymous user 匿名用户
public_role = self.get_public_role()
if public_role:
# filter by public role
view_menu_names = (
base_query.filter(self.role_model.id == public_role.id).filter(
self.permission_model.name == permission_name
)
).all()
return set([s.name for s in view_menu_names])
return set()
# 在视图上添加权限
def merge_perm(self, permission_name, view_menu_name):
logging.warning(
"This method 'merge_perm' is deprecated use add_permission_view_menu"
)
self.add_permission_view_menu(permission_name, view_menu_name)
# 判断权限是否是user自定义权限
def is_user_defined_permission(self, perm):
return perm.permission.name in self.OBJECT_SPEC_PERMISSIONS
# 初始化自定义角色,将对应的权限加到对应的角色上
# @pysnooper.snoop()
def sync_role_definitions(self):
"""Inits the Myapp application with security roles and such"""
logging.info("Syncing role definition")
# Creating default roles
self.set_role("Admin", self.is_admin_pvm)
self.set_role("Gamma", self.is_gamma_pvm)
self.set_role("granter", self.is_granter_pvm)
# commit role and view menu updates
self.get_session.commit()
self.clean_perms()
# 清理权限
def clean_perms(self):
"""FAB leaves faulty permissions that need to be cleaned up"""
logging.info("Cleaning faulty perms")
sesh = self.get_session
pvms = sesh.query(ab_models.PermissionView).filter(
or_(
ab_models.PermissionView.permission == None, # NOQA
ab_models.PermissionView.view_menu == None, # NOQA
)
)
deleted_count = pvms.delete()
sesh.commit()
if deleted_count:
logging.info("Deleted {} faulty permissions".format(deleted_count))
# 为角色添加权限pvm_check为自定义权限校验函数。这样变量权限就能通过pvm_check函数知道时候应该把权限加到角色上
def set_role(self, role_name, pvm_check):
logging.info("Syncing {} perms".format(role_name))
sesh = self.get_session
# 获取所有的pv记录
pvms = sesh.query(ab_models.PermissionView).all()
# 获取权限和视图都有值的pv
pvms = [p for p in pvms if p.permission and p.view_menu]
# 添加或者获取role
role = self.add_role(role_name)
# 检查pv是否归属于该role
role_pvms = [p for p in pvms if pvm_check(p)]
role.permissions = role_pvms
# 添加pv-role记录
sesh.merge(role)
sesh.commit()
# 看一个权限是否是只有admin角色该有的权限
def is_admin_only(self, pvm):
# not readonly operations on read only model views allowed only for admins
if (
pvm.view_menu.name in self.READ_ONLY_MODEL_VIEWS
and pvm.permission.name not in self.READ_ONLY_PERMISSION
):
return True
return (
pvm.view_menu.name in self.ADMIN_ONLY_VIEW_MENUS
or pvm.permission.name in self.ADMIN_ONLY_PERMISSIONS
)
# 校验权限是否是默认所有人可接受的
def is_accessible_to_all(self, pvm):
return pvm.permission.name in self.ACCESSIBLE_PERMS
# 看一个权限是否是admin角色该有的权限
def is_admin_pvm(self, pvm):
return not self.is_user_defined_permission(pvm)
# 看一个权限是否是gamma角色该有的权限
def is_gamma_pvm(self, pvm):
return not (
self.is_user_defined_permission(pvm)
or self.is_admin_only(pvm)
) or self.is_accessible_to_all(pvm)
def is_granter_pvm(self, pvm):
return pvm.permission.name in {"can_override_role_permissions", "can_approve"}
# 创建视图,创建权限,创建视图-权限绑定记录。
def set_perm(self, mapper, connection, target,permission_name): # noqa
#
# connection is sql
# target is tables/db model
if target.perm != target.get_perm():
link_table = target.__table__
connection.execute(
link_table.update()
.where(link_table.c.id == target.id)
.values(perm=target.get_perm())
)
# add to view menu if not already exists
permission_name = permission_name
view_menu_name = target.get_perm()
permission = self.find_permission(permission_name)
view_menu = self.find_view_menu(view_menu_name)
pv = None
# 如果权限不存存在就创建
if not permission:
permission_table = (
self.permission_model.__table__ # pylint: disable=no-member
)
connection.execute(permission_table.insert().values(name=permission_name))
permission = self.find_permission(permission_name)
# 如果视图不存在就创建
if not view_menu:
view_menu_table = self.viewmenu_model.__table__ # pylint: disable=no-member
connection.execute(view_menu_table.insert().values(name=view_menu_name))
view_menu = self.find_view_menu(view_menu_name)
# 获取是否存在 视图-权限绑定 记录
if permission and view_menu:
pv = (
self.get_session.query(self.permissionview_model)
.filter_by(permission=permission, view_menu=view_menu)
.first()
)
# 如果没有视图-权限绑定 记录,就创建
if not pv and permission and view_menu:
permission_view_table = (
self.permissionview_model.__table__ # pylint: disable=no-member
)
connection.execute(
permission_view_table.insert().values(
permission_id=permission.id, view_menu_id=view_menu.id
)
)
# 重新获取权限视图绑定记录
pv = (
self.get_session.query(self.permissionview_model)
.filter_by(permission=permission, view_menu=view_menu)
.first()
)
return pv
# 根据权限视图添加到相关pv-role
@classmethod
def add_pv_role(self,permission_name,view_menu_name,session):
permission = session.query(self.permission_model).filter_by(name=permission_name).first()
view_menu = session.query(self.viewmenu_model).filter_by(name=view_menu_name).first()
# 获取是否存在 视图-权限绑定 记录
if permission and view_menu:
pv = (
session.query(self.permissionview_model)
.filter_by(permission=permission, view_menu=view_menu)
.first()
)
try:
# 为用户所属组织架构都添加该pv
if pv and g.user and g.user.org:
roles = session.query(self.role_model).all() # 获取所有角色自动在相应角色下面添加pv
if roles:
for role in roles:
if role.name in g.user.org:
# 为pvm-role表中添加记录
pv_role = session.execute(select([assoc_permissionview_role.c.id]).where(assoc_permissionview_role.c.permission_view_id==pv.id)
.where(assoc_permissionview_role.c.role_id==role.id)
.limit(1)
).fetchall()
if not pv_role:
session.execute(assoc_permissionview_role.insert().values(
permission_view_id=pv.id, role_id=role.id
)
)
except Exception as e:
logging.error(e)
@classmethod
def get_join_projects_id(self,session):
from myapp.models.model_team import Project_User
if g.user:
projects_id = session.query(Project_User.project_id).filter(Project_User.user_id == User.get_user_id()).all()
projects_id = [project_id[0] for project_id in projects_id]
return projects_id
else:
return []
@classmethod
def get_create_pipeline_ids(self,session):
from myapp.models.model_job import Pipeline
if g.user:
pipeline_ids = session.query(Pipeline.id).filter(Pipeline.created_by_fk == User.get_user_id()).all()
pipeline_ids = [pipeline_id[0] for pipeline_id in pipeline_ids]
return pipeline_ids
else:
return []