mirror of
https://github.com/tencentmusic/cube-studio.git
synced 2025-01-24 14:04:01 +08:00
1728 lines
64 KiB
Python
1728 lines
64 KiB
Python
import functools
|
||
import json
|
||
import logging
|
||
import re
|
||
import traceback
|
||
import urllib.parse
|
||
import os
|
||
from inspect import isfunction
|
||
from sqlalchemy import create_engine
|
||
from flask_appbuilder.actions import action
|
||
from apispec import yaml_utils
|
||
from flask_babel import gettext as __
|
||
from flask_babel import lazy_gettext as _
|
||
from flask_appbuilder.actions import ActionItem
|
||
from flask import Blueprint, current_app, jsonify, make_response, request
|
||
from flask import flash
|
||
from flask import Flask, current_app, send_from_directory, make_response,send_file
|
||
from flask.globals import session
|
||
from flask_babel import lazy_gettext as _
|
||
import jsonschema
|
||
from marshmallow import ValidationError
|
||
from marshmallow_sqlalchemy.fields import Related, RelatedList
|
||
import prison
|
||
from sqlalchemy.exc import IntegrityError
|
||
from sqlalchemy.orm.attributes import InstrumentedAttribute
|
||
from sqlalchemy.orm.properties import ColumnProperty
|
||
from sqlalchemy.orm.relationships import RelationshipProperty
|
||
from werkzeug.exceptions import BadRequest
|
||
from flask import render_template,redirect
|
||
import yaml
|
||
from marshmallow import validate
|
||
from wtforms import validators
|
||
from flask_appbuilder.api.convert import Model2SchemaConverter
|
||
from flask_appbuilder.api.schemas import get_info_schema, get_item_schema, get_list_schema
|
||
from flask_appbuilder._compat import as_unicode
|
||
from flask_appbuilder.const import (
|
||
API_ADD_COLUMNS_RES_KEY,
|
||
API_ADD_COLUMNS_RIS_KEY,
|
||
API_ADD_TITLE_RES_KEY,
|
||
API_ADD_TITLE_RIS_KEY,
|
||
API_DESCRIPTION_COLUMNS_RES_KEY,
|
||
API_DESCRIPTION_COLUMNS_RIS_KEY,
|
||
API_EDIT_COLUMNS_RES_KEY,
|
||
API_EDIT_COLUMNS_RIS_KEY,
|
||
API_EDIT_TITLE_RES_KEY,
|
||
API_EDIT_TITLE_RIS_KEY,
|
||
API_FILTERS_RES_KEY,
|
||
API_FILTERS_RIS_KEY,
|
||
API_LABEL_COLUMNS_RES_KEY,
|
||
API_LABEL_COLUMNS_RIS_KEY,
|
||
API_LIST_COLUMNS_RES_KEY,
|
||
API_LIST_COLUMNS_RIS_KEY,
|
||
API_LIST_TITLE_RES_KEY,
|
||
API_LIST_TITLE_RIS_KEY,
|
||
API_ORDER_COLUMN_RIS_KEY,
|
||
API_ORDER_COLUMNS_RES_KEY,
|
||
API_ORDER_COLUMNS_RIS_KEY,
|
||
API_ORDER_DIRECTION_RIS_KEY,
|
||
API_PAGE_INDEX_RIS_KEY,
|
||
API_PAGE_SIZE_RIS_KEY,
|
||
API_PERMISSIONS_RES_KEY,
|
||
API_PERMISSIONS_RIS_KEY,
|
||
API_RESULT_RES_KEY,
|
||
API_SELECT_COLUMNS_RIS_KEY,
|
||
API_SHOW_COLUMNS_RES_KEY,
|
||
API_SHOW_COLUMNS_RIS_KEY,
|
||
API_SHOW_TITLE_RES_KEY,
|
||
API_SHOW_TITLE_RIS_KEY,
|
||
API_URI_RIS_KEY,
|
||
PERMISSION_PREFIX,
|
||
)
|
||
from flask import (
|
||
abort,
|
||
g
|
||
)
|
||
from flask_appbuilder.exceptions import FABException, InvalidOrderByColumnFABException
|
||
from flask_appbuilder.security.decorators import permission_name, protect,has_access
|
||
from flask_appbuilder.api import BaseModelApi,BaseApi,ModelRestApi
|
||
from sqlalchemy.sql import sqltypes
|
||
from myapp import app, appbuilder,db,event_logger
|
||
conf = app.config
|
||
|
||
log = logging.getLogger(__name__)
|
||
API_COLUMNS_INFO_RIS_KEY = 'columns_info'
|
||
API_ADD_FIELDSETS_RIS_KEY = 'add_fieldsets'
|
||
API_EDIT_FIELDSETS_RIS_KEY = 'edit_fieldsets'
|
||
API_SHOW_FIELDSETS_RIS_KEY = 'show_fieldsets'
|
||
API_HELP_URL_RIS_KEY = 'help_url'
|
||
API_ACTION_RIS_KEY='action'
|
||
API_ROUTE_RIS_KEY ='route_base'
|
||
|
||
API_PERMISSIONS_RIS_KEY="permissions"
|
||
API_USER_PERMISSIONS_RIS_KEY="user_permissions"
|
||
API_RELATED_RIS_KEY="related"
|
||
API_COLS_WIDTH_RIS_KEY='cols_width'
|
||
API_IMPORT_DATA_RIS_KEY = 'import_data'
|
||
|
||
def get_error_msg():
|
||
if current_app.config.get("FAB_API_SHOW_STACKTRACE"):
|
||
return traceback.format_exc()
|
||
return "Fatal error"
|
||
|
||
|
||
def safe(f):
|
||
|
||
def wraps(self, *args, **kwargs):
|
||
try:
|
||
return f(self, *args, **kwargs)
|
||
except BadRequest as e:
|
||
return self.response_error(400,message=str(e))
|
||
except Exception as e:
|
||
logging.exception(e)
|
||
return self.response_error(500,message=get_error_msg())
|
||
|
||
return functools.update_wrapper(wraps, f)
|
||
|
||
|
||
def rison(schema=None):
|
||
"""
|
||
Use this decorator to parse URI *Rison* arguments to
|
||
a python data structure, your method gets the data
|
||
structure on kwargs['rison']. Response is HTTP 400
|
||
if *Rison* is not correct::
|
||
|
||
class ExampleApi(BaseApi):
|
||
@expose('/risonjson')
|
||
@rison()
|
||
def rison_json(self, **kwargs):
|
||
return self.response(200, result=kwargs['rison'])
|
||
|
||
You can additionally pass a JSON schema to
|
||
validate Rison arguments::
|
||
|
||
schema = {
|
||
"type": "object",
|
||
"properties": {
|
||
"arg1": {
|
||
"type": "integer"
|
||
}
|
||
}
|
||
}
|
||
|
||
class ExampleApi(BaseApi):
|
||
@expose('/risonjson')
|
||
@rison(schema)
|
||
def rison_json(self, **kwargs):
|
||
return self.response(200, result=kwargs['rison'])
|
||
"""
|
||
|
||
def _rison(f):
|
||
def wraps(self, *args, **kwargs):
|
||
value = request.args.get(API_URI_RIS_KEY, None)
|
||
kwargs["rison"] = dict()
|
||
if value:
|
||
try:
|
||
kwargs["rison"] = prison.loads(value)
|
||
except prison.decoder.ParserException:
|
||
if current_app.config.get("FAB_API_ALLOW_JSON_QS", True):
|
||
# Rison failed try json encoded content
|
||
try:
|
||
kwargs["rison"] = json.loads(
|
||
urllib.parse.parse_qs(f"{API_URI_RIS_KEY}={value}").get(
|
||
API_URI_RIS_KEY
|
||
)[0]
|
||
)
|
||
except Exception:
|
||
return self.response_error(400,message="Not a valid rison/json argument"
|
||
)
|
||
else:
|
||
return self.response_error(400,message="Not a valid rison argument")
|
||
if schema:
|
||
try:
|
||
jsonschema.validate(instance=kwargs["rison"], schema=schema)
|
||
except jsonschema.ValidationError as e:
|
||
return self.response_error(400,message=f"Not a valid rison schema {e}")
|
||
return f(self, *args, **kwargs)
|
||
|
||
return functools.update_wrapper(wraps, f)
|
||
|
||
return _rison
|
||
|
||
|
||
def expose(url="/", methods=("GET",)):
|
||
"""
|
||
Use this decorator to expose API endpoints on your API classes.
|
||
|
||
:param url:
|
||
Relative URL for the endpoint
|
||
:param methods:
|
||
Allowed HTTP methods. By default only GET is allowed.
|
||
"""
|
||
|
||
def wrap(f):
|
||
if not hasattr(f, "_urls"):
|
||
f._urls = []
|
||
f._urls.append((url, methods))
|
||
return f
|
||
|
||
return wrap
|
||
|
||
|
||
# 在响应体重添加字段和数据
|
||
def merge_response_func(func, key):
|
||
"""
|
||
Use this decorator to set a new merging
|
||
response function to HTTP endpoints
|
||
|
||
candidate function must have the following signature
|
||
and be childs of BaseApi:
|
||
```
|
||
def merge_some_function(self, response, rison_args):
|
||
```
|
||
|
||
:param func: Name of the merge function where the key is allowed
|
||
:param key: The key name for rison selection
|
||
:return: None
|
||
"""
|
||
|
||
def wrap(f):
|
||
if not hasattr(f, "_response_key_func_mappings"):
|
||
f._response_key_func_mappings = dict()
|
||
f._response_key_func_mappings[key] = func
|
||
return f
|
||
|
||
return wrap
|
||
|
||
|
||
def json_response(message,status,result):
|
||
return jsonify(
|
||
{
|
||
"message":message,
|
||
"status":status,
|
||
"result":result
|
||
}
|
||
)
|
||
|
||
|
||
|
||
import pysnooper
|
||
# @pysnooper.snoop(depth=5)
|
||
# 暴露url+视图函数。视图函数会被覆盖,暴露url也会被覆盖
|
||
class MyappModelRestApi(ModelRestApi):
|
||
|
||
# 定义主键列
|
||
label_title=''
|
||
primary_key = 'id'
|
||
api_type = 'json'
|
||
allow_browser_login = True
|
||
base_filters = []
|
||
page_size = 100
|
||
src_item_object = None # 原始model对象
|
||
src_item_json={} # 原始model对象的json
|
||
check_edit_permission = None
|
||
datamodel=None
|
||
post_list=None
|
||
pre_json_load=None
|
||
edit_form_extra_fields={}
|
||
add_form_extra_fields = {}
|
||
add_fieldsets = []
|
||
edit_fieldsets=[]
|
||
show_fieldsets = []
|
||
pre_add_get=None
|
||
pre_update_get=None
|
||
help_url = None
|
||
pre_show = None
|
||
default_filter={}
|
||
actions = {}
|
||
pre_list=None
|
||
user_permissions = {
|
||
"add": True,
|
||
"edit": True,
|
||
"delete": True,
|
||
"show": True
|
||
}
|
||
add_form_query_rel_fields = {}
|
||
edit_form_query_rel_fields={}
|
||
related_views=[]
|
||
add_more_info=None
|
||
remember_columns=[]
|
||
spec_label_columns={}
|
||
base_permissions=['can_add','can_show','can_edit','can_list','can_delete']
|
||
cols_width={}
|
||
import_data=False
|
||
pre_upload = None
|
||
|
||
# def pre_list(self,**kargs):
|
||
# return
|
||
|
||
# @pysnooper.snoop()
|
||
def csv_response(self,file_path,file_name=None):
|
||
# 下载csv
|
||
response = make_response(send_file(file_path,as_attachment=True,conditional=True))
|
||
if not file_name:
|
||
file_name = os.path.basename(file_path)
|
||
if '.csv' not in file_name:
|
||
file_name=file_name+".csv"
|
||
response.headers["Content-Disposition"] = f"attachment; filename={file_name}".format(file_name=file_name)
|
||
return response
|
||
|
||
# 建构响应体
|
||
@staticmethod
|
||
# @pysnooper.snoop()
|
||
def response(code, **kwargs):
|
||
"""
|
||
Generic HTTP JSON response method
|
||
|
||
:param code: HTTP code (int)
|
||
:param kwargs: Data structure for response (dict)
|
||
:return: HTTP Json response
|
||
"""
|
||
# 添flash的信息
|
||
flashes = session.get("_flashes", [])
|
||
|
||
# flashes.append((category, message))
|
||
session["_flashes"] = []
|
||
|
||
_ret_json = jsonify(kwargs)
|
||
resp = make_response(_ret_json, code)
|
||
flash_json=[]
|
||
for flash in flashes:
|
||
flash_json.append([flash[0],flash[1]])
|
||
resp.headers["api_flashes"] = json.dumps(flash_json)
|
||
resp.headers["Content-Type"] = "application/json; charset=utf-8"
|
||
return resp
|
||
|
||
def _init_titles(self):
|
||
"""
|
||
Init Titles if not defined
|
||
"""
|
||
super(ModelRestApi, self)._init_titles()
|
||
class_name = self.datamodel.model_name
|
||
if self.label_title:
|
||
self.list_title = "遍历 " + self.label_title
|
||
self.add_title = "添加 " + self.label_title
|
||
self.edit_title = "编辑 " + self.label_title
|
||
self.show_title = "查看 " + self.label_title
|
||
|
||
if not self.list_title:
|
||
self.list_title = "List " + self._prettify_name(class_name)
|
||
if not self.add_title:
|
||
self.add_title = "Add " + self._prettify_name(class_name)
|
||
if not self.edit_title:
|
||
self.edit_title = "Edit " + self._prettify_name(class_name)
|
||
if not self.show_title:
|
||
self.show_title = "Show " + self._prettify_name(class_name)
|
||
self.title = self.list_title
|
||
|
||
# @pysnooper.snoop()
|
||
def _init_properties(self):
|
||
"""
|
||
Init Properties
|
||
"""
|
||
super(MyappModelRestApi, self)._init_properties()
|
||
# 初始化action自耦段
|
||
self.actions = {}
|
||
for attr_name in dir(self):
|
||
func = getattr(self, attr_name)
|
||
if hasattr(func, "_action"):
|
||
action = ActionItem(*func._action, func=func)
|
||
self.actions[action.name] = action
|
||
|
||
|
||
# 初始化label字段
|
||
# 全局的label
|
||
if hasattr(self.datamodel.obj,'label_columns') and self.datamodel.obj.label_columns:
|
||
for col in self.datamodel.obj.label_columns:
|
||
self.label_columns[col] = self.datamodel.obj.label_columns[col]
|
||
|
||
# 本view特定的label
|
||
for col in self.spec_label_columns:
|
||
self.label_columns[col] = self.spec_label_columns[col]
|
||
|
||
self.primary_key = self.datamodel.get_pk_name()
|
||
|
||
self._init_cols_width()
|
||
|
||
# 帮助地址
|
||
self.help_url = conf.get('HELP_URL', {}).get(self.datamodel.obj.__tablename__, '') if self.datamodel else ''
|
||
|
||
|
||
def _init_model_schemas(self):
|
||
# Create Marshmalow schemas if one is not specified
|
||
if self.list_model_schema is None:
|
||
self.list_model_schema = self.model2schemaconverter.convert(
|
||
self.list_columns
|
||
)
|
||
if self.add_model_schema is None:
|
||
self.add_model_schema = self.model2schemaconverter.convert(
|
||
self.add_columns, nested=False, enum_dump_by_name=True
|
||
)
|
||
if self.edit_model_schema is None:
|
||
self.edit_model_schema = self.model2schemaconverter.convert(
|
||
list(set(list(self.edit_columns+self.show_columns+self.list_columns+self.search_columns))), nested=False, enum_dump_by_name=True
|
||
)
|
||
if self.show_model_schema is None:
|
||
self.show_model_schema = self.model2schemaconverter.convert(
|
||
self.show_columns
|
||
)
|
||
|
||
|
||
# @pysnooper.snoop(watch_explode=('value','column_type'))
|
||
def _init_cols_width(self):
|
||
# return
|
||
columns = self.datamodel.obj.__table__._columns
|
||
for column in columns:
|
||
if column.name not in self.cols_width and column.name in self.list_columns: # 只需要配置没有配置过的list_columns
|
||
column_type = column.type
|
||
if column_type.__class__.__name__ in ['Integer','Float','Numeric','Integer','Date','Enum']:
|
||
self.cols_width[column.name] = {
|
||
"type":'ellip2',
|
||
"width":100
|
||
}
|
||
elif column_type.__class__.__name__ in ['Time','Datetime']:
|
||
self.cols_width[column.name] = {
|
||
"type": 'ellip2',
|
||
"width": 300
|
||
}
|
||
elif column_type.__class__.__name__ in ['String','Text']:
|
||
width=100
|
||
if column_type.length and column_type.length>100 and column_type.length<500:
|
||
width = column_type.length
|
||
if column_type.length and column_type.length>500:
|
||
width =500
|
||
|
||
self.cols_width[column.name] = {
|
||
"type":'ellip2',
|
||
"width": width
|
||
}
|
||
|
||
|
||
for attr in self.list_columns:
|
||
if attr not in self.cols_width:
|
||
self.cols_width[attr] = {
|
||
"type": 'ellip2',
|
||
"width": 100
|
||
}
|
||
|
||
# 固定常用的几个字段的宽度
|
||
# print(self.cols_width)
|
||
|
||
|
||
def merge_cols_width(self, response, **kwargs):
|
||
response[API_COLS_WIDTH_RIS_KEY] = self.cols_width
|
||
|
||
def merge_import_data(self, response, **kwargs):
|
||
response[API_IMPORT_DATA_RIS_KEY] = self.import_data
|
||
|
||
|
||
# 根据columnsfields 转化为 info的json信息
|
||
# @pysnooper.snoop()
|
||
def columnsfield2info(self,columnsfields):
|
||
ret = list()
|
||
for col_name in columnsfields:
|
||
column_field = columnsfields[col_name]
|
||
# print(column_field)
|
||
col_info={}
|
||
col_info['name']=col_name
|
||
|
||
column_field_kwargs = column_field.kwargs
|
||
# type 类型 EnumField values
|
||
# aa = column_field
|
||
col_info['type'] = column_field.field_class.__name__.replace('Field', '')
|
||
# ret['description']=column_field_kwargs.get('description','')
|
||
col_info['description'] = self.description_columns.get(col_name, column_field_kwargs.get('description', ''))
|
||
col_info['label'] = self.label_columns.get(col_name, column_field_kwargs.get('label', ''))
|
||
col_info['default'] = column_field_kwargs.get('default', '')
|
||
col_info['validators'] = column_field_kwargs.get('validators', [])
|
||
col_info['choices'] = column_field_kwargs.get('choices', [])
|
||
if 'widget' in column_field_kwargs:
|
||
col_info['widget'] = column_field_kwargs['widget'].__class__.__name__.replace('Widget', '').replace('Field','').replace('My', '')
|
||
col_info['disable'] = column_field_kwargs['widget'].readonly if hasattr(column_field_kwargs['widget'],'readonly') else False
|
||
# if hasattr(column_field_kwargs['widget'],'can_input'):
|
||
# print(field.name,column_field_kwargs['widget'].can_input)
|
||
col_info['ui-type'] = 'input-select' if hasattr(column_field_kwargs['widget'], 'can_input') and column_field_kwargs['widget'].can_input else False
|
||
|
||
col_info = self.make_ui_info(col_info)
|
||
ret.append(col_info)
|
||
return ret
|
||
|
||
|
||
# 每个用户对当前记录的权限,base_permissions 是对所有记录的权限
|
||
def check_item_permissions(self,item):
|
||
self.user_permissions = {
|
||
"add": True,
|
||
"edit": True,
|
||
"delete": True,
|
||
"show": True
|
||
}
|
||
|
||
def merge_base_permissions(self, response, **kwargs):
|
||
response[API_PERMISSIONS_RIS_KEY] = [
|
||
permission
|
||
for permission in self.base_permissions
|
||
# if self.appbuilder.sm.has_access(permission, self.class_permission_name)
|
||
]
|
||
|
||
# @pysnooper.snoop()
|
||
def merge_user_permissions(self, response, **kwargs):
|
||
# print(self.user_permissions)
|
||
response[API_USER_PERMISSIONS_RIS_KEY] = self.user_permissions
|
||
|
||
# add_form_extra_fields 里面的字段要能拿到才对
|
||
# @pysnooper.snoop(watch_explode=())
|
||
def merge_add_field_info(self, response, **kwargs):
|
||
_kwargs = kwargs.get("add_columns", {})
|
||
# 将关联字段的查询限制条件加入
|
||
if self.add_form_query_rel_fields:
|
||
self.add_query_rel_fields = self.add_form_query_rel_fields
|
||
|
||
add_columns = self._get_fields_info(
|
||
self.add_columns,
|
||
self.add_model_schema,
|
||
self.add_query_rel_fields,
|
||
**_kwargs,
|
||
)
|
||
|
||
response[API_ADD_COLUMNS_RES_KEY]=add_columns
|
||
|
||
|
||
# @pysnooper.snoop(watch_explode=('edit_columns'))
|
||
def merge_edit_field_info(self, response, **kwargs):
|
||
_kwargs = kwargs.get("edit_columns", {})
|
||
if self.edit_form_query_rel_fields:
|
||
self.edit_query_rel_fields = self.edit_form_query_rel_fields
|
||
edit_columns = self._get_fields_info(
|
||
self.edit_columns,
|
||
self.edit_model_schema,
|
||
self.edit_query_rel_fields,
|
||
**_kwargs,
|
||
)
|
||
response[API_EDIT_COLUMNS_RES_KEY] = edit_columns
|
||
|
||
|
||
# @pysnooper.snoop(watch_explode=('edit_columns'))
|
||
def merge_add_fieldsets_info(self, response, **kwargs):
|
||
# if self.pre_add_get:
|
||
# self.pre_add_get()
|
||
add_fieldsets=[]
|
||
if self.add_fieldsets:
|
||
for group in self.add_fieldsets:
|
||
group_name = group[0]
|
||
group_fieldsets=group[1]
|
||
add_fieldsets.append({
|
||
"group":group_name,
|
||
"expanded":group_fieldsets.get('expanded',True),
|
||
"fields":group_fieldsets['fields']
|
||
})
|
||
|
||
response[API_ADD_FIELDSETS_RIS_KEY] = add_fieldsets
|
||
|
||
# @pysnooper.snoop(watch_explode=('edit_columns'))
|
||
def merge_edit_fieldsets_info(self, response, **kwargs):
|
||
edit_fieldsets=[]
|
||
if self.edit_fieldsets:
|
||
for group in self.edit_fieldsets:
|
||
group_name = group[0]
|
||
group_fieldsets=group[1]
|
||
edit_fieldsets.append({
|
||
"group":group_name,
|
||
"expanded":group_fieldsets.get('expanded',True),
|
||
"fields":group_fieldsets['fields']
|
||
})
|
||
response[API_EDIT_FIELDSETS_RIS_KEY] = edit_fieldsets
|
||
|
||
def merge_show_fieldsets_info(self, response, **kwargs):
|
||
show_fieldsets=[]
|
||
if self.show_fieldsets:
|
||
for group in self.show_fieldsets:
|
||
group_name = group[0]
|
||
group_fieldsets=group[1]
|
||
show_fieldsets.append({
|
||
"group":group_name,
|
||
"expanded":group_fieldsets.get('expanded',True),
|
||
"fields":group_fieldsets['fields']
|
||
})
|
||
response[API_SHOW_FIELDSETS_RIS_KEY] = show_fieldsets
|
||
|
||
# @pysnooper.snoop()
|
||
def merge_search_filters(self, response, **kwargs):
|
||
# Get possible search fields and all possible operations
|
||
search_filters = dict()
|
||
dict_filters = self._filters.get_search_filters()
|
||
for col in self.search_columns:
|
||
search_filters[col]={}
|
||
search_filters[col]['filter'] = [
|
||
{"name": as_unicode(flt.name), "operator": flt.arg_name}
|
||
for flt in dict_filters[col]
|
||
]
|
||
|
||
# print(col)
|
||
# print(self.datamodel.list_columns)
|
||
# 对于外键全部可选值返回,或者还需要现场查询(现场查询用哪个字段是个问题)
|
||
if self.datamodel and self.edit_model_schema: # 根据edit_column 生成的model_schema,编辑里面才会读取外键对象列表
|
||
# ao = self.edit_model_schema.fields
|
||
if col in self.edit_model_schema.fields:
|
||
|
||
field = self.edit_model_schema.fields[col]
|
||
# print(field)
|
||
if isinstance(field, Related) or isinstance(field, RelatedList):
|
||
filter_rel_field = self.edit_query_rel_fields.get(col, [])
|
||
# 获取外键对象list
|
||
search_filters[col]["count"], search_filters[col]["values"] = self._get_list_related_field(
|
||
field, filter_rel_field, page=0, page_size=1000
|
||
)
|
||
# if col in self.datamodel.list_columns:
|
||
# search_filters[col]["type"] = self.datamodel.list_columns[col].type
|
||
|
||
search_filters[col]["type"] = field.__class__.__name__ if 'type' not in search_filters[col] else search_filters[col]["type"]
|
||
|
||
|
||
# 用户可能会自定义字段的操作格式,比如字符串类型,显示和筛选可能是menu
|
||
if col in self.edit_form_extra_fields:
|
||
column_field = self.edit_form_extra_fields[col]
|
||
column_field_kwargs = column_field.kwargs
|
||
# type 类型 EnumField values
|
||
# aa = column_field
|
||
search_filters[col]['type'] = column_field.field_class.__name__.replace('Field', '').replace('My','')
|
||
search_filters[col]['choices'] = column_field_kwargs.get('choices', [])
|
||
# 选-填 字段在搜索时为填写字段
|
||
search_filters[col]['ui-type'] = 'input' if hasattr(column_field_kwargs.get('widget',{}),'can_input') and column_field_kwargs['widget'].can_input else False
|
||
|
||
search_filters[col] = self.make_ui_info(search_filters[col])
|
||
# 多选字段在搜索时为单选字段
|
||
if search_filters[col].get('ui-type','')=='select2':
|
||
search_filters[col]['ui-type']='select'
|
||
|
||
search_filters[col]['default']=self.default_filter.get(col,'')
|
||
response[API_FILTERS_RES_KEY] = search_filters
|
||
|
||
|
||
def merge_add_title(self, response, **kwargs):
|
||
response[API_ADD_TITLE_RES_KEY] = self.add_title
|
||
|
||
def merge_edit_title(self, response, **kwargs):
|
||
response[API_EDIT_TITLE_RES_KEY] = self.edit_title
|
||
|
||
def merge_label_columns(self, response, **kwargs):
|
||
_pruned_select_cols = kwargs.get(API_SELECT_COLUMNS_RIS_KEY, [])
|
||
if _pruned_select_cols:
|
||
columns = _pruned_select_cols
|
||
else:
|
||
# Send the exact labels for the caller operation
|
||
if kwargs.get("caller") == "list":
|
||
columns = self.list_columns
|
||
elif kwargs.get("caller") == "show":
|
||
columns = self.show_columns
|
||
else:
|
||
columns = self.label_columns # pragma: no cover
|
||
response[API_LABEL_COLUMNS_RES_KEY] = self._label_columns_json(columns)
|
||
|
||
def merge_list_label_columns(self, response, **kwargs):
|
||
self.merge_label_columns(response, caller="list", **kwargs)
|
||
|
||
def merge_show_label_columns(self, response, **kwargs):
|
||
self.merge_label_columns(response, caller="show", **kwargs)
|
||
|
||
# @pysnooper.snoop()
|
||
def merge_show_columns(self, response, **kwargs):
|
||
_pruned_select_cols = kwargs.get(API_SELECT_COLUMNS_RIS_KEY, [])
|
||
if _pruned_select_cols:
|
||
response[API_SHOW_COLUMNS_RES_KEY] = _pruned_select_cols
|
||
else:
|
||
response[API_SHOW_COLUMNS_RES_KEY] = self.show_columns
|
||
|
||
def merge_description_columns(self, response, **kwargs):
|
||
_pruned_select_cols = kwargs.get(API_SELECT_COLUMNS_RIS_KEY, [])
|
||
if _pruned_select_cols:
|
||
response[API_DESCRIPTION_COLUMNS_RES_KEY] = self._description_columns_json(
|
||
_pruned_select_cols
|
||
)
|
||
else:
|
||
# Send all descriptions if cols are or request pruned
|
||
response[API_DESCRIPTION_COLUMNS_RES_KEY] = self._description_columns_json(
|
||
self.description_columns
|
||
)
|
||
|
||
def merge_list_columns(self, response, **kwargs):
|
||
_pruned_select_cols = kwargs.get(API_SELECT_COLUMNS_RIS_KEY, [])
|
||
if _pruned_select_cols:
|
||
response[API_LIST_COLUMNS_RES_KEY] = _pruned_select_cols
|
||
else:
|
||
response[API_LIST_COLUMNS_RES_KEY] = self.list_columns
|
||
|
||
def merge_order_columns(self, response, **kwargs):
|
||
_pruned_select_cols = kwargs.get(API_SELECT_COLUMNS_RIS_KEY, [])
|
||
if _pruned_select_cols:
|
||
response[API_ORDER_COLUMNS_RES_KEY] = [
|
||
order_col
|
||
for order_col in self.order_columns
|
||
if order_col in _pruned_select_cols
|
||
]
|
||
else:
|
||
response[API_ORDER_COLUMNS_RES_KEY] = self.order_columns
|
||
|
||
# @pysnooper.snoop(watch_explode=('aa'))
|
||
def merge_columns_info(self, response, **kwargs):
|
||
columns_info={}
|
||
for attr in dir(self.datamodel.obj):
|
||
value = getattr(self.datamodel.obj, attr) if hasattr(self.datamodel.obj,attr) else None
|
||
if type(value)==InstrumentedAttribute:
|
||
if type(value.comparator)==ColumnProperty.Comparator:
|
||
columns_info[value.key]={
|
||
"type":str(value.comparator.type)
|
||
}
|
||
if type(value.comparator)==RelationshipProperty.Comparator:
|
||
columns_info[value.key] = {
|
||
"type":"Relationship"
|
||
}
|
||
response[API_COLUMNS_INFO_RIS_KEY] = columns_info
|
||
|
||
def merge_help_url_info(self, response, **kwargs):
|
||
response[API_HELP_URL_RIS_KEY] = self.help_url
|
||
|
||
# @pysnooper.snoop(watch_explode='aa')
|
||
def merge_action_info(self, response, **kwargs):
|
||
actions_info = {}
|
||
for attr_name in self.actions:
|
||
action = self.actions[attr_name]
|
||
actions_info[action.name] = {
|
||
"name":action.name,
|
||
"text":action.text,
|
||
"confirmation":action.confirmation,
|
||
"icon":action.icon,
|
||
"multiple":action.multiple,
|
||
"single":action.single
|
||
}
|
||
response[API_ACTION_RIS_KEY] = actions_info
|
||
|
||
|
||
def merge_route_info(self, response, **kwargs):
|
||
response[API_ROUTE_RIS_KEY] = "/"+self.route_base.strip('/')+"/"
|
||
response['primary_key']=self.primary_key
|
||
response['label_title'] = self.label_title or self._prettify_name(self.datamodel.model_name)
|
||
|
||
# @pysnooper.snoop(watch_explode=())
|
||
# 添加关联model的字段
|
||
def merge_related_field_info(self, response, **kwargs):
|
||
try:
|
||
add_info={}
|
||
if self.related_views:
|
||
for related_views_class in self.related_views:
|
||
related_views = related_views_class()
|
||
related_views._init_model_schemas()
|
||
if related_views.add_form_query_rel_fields:
|
||
related_views.add_query_rel_fields = related_views.add_form_query_rel_fields
|
||
|
||
# print(related_views.add_columns)
|
||
# print(related_views.add_model_schema)
|
||
# print(related_views.add_query_rel_fields)
|
||
add_columns = related_views._get_fields_info(
|
||
cols=related_views.add_columns,
|
||
model_schema=related_views.add_model_schema,
|
||
filter_rel_fields=related_views.add_query_rel_fields,
|
||
**kwargs,
|
||
)
|
||
add_info[str(related_views.datamodel.obj.__name__).lower()] = add_columns
|
||
# add_info[related_views.__class__.__name__]=add_columns
|
||
response[API_RELATED_RIS_KEY] = add_info
|
||
except Exception as e:
|
||
print(e)
|
||
pass
|
||
|
||
def merge_list_title(self, response, **kwargs):
|
||
response[API_LIST_TITLE_RES_KEY] = self.list_title
|
||
|
||
def merge_show_title(self, response, **kwargs):
|
||
response[API_SHOW_TITLE_RES_KEY] = self.show_title
|
||
|
||
|
||
def merge_more_info(self,response,**kwargs):
|
||
# 将 配置根据历史填写值作为选项的字段配置出来。
|
||
if self.add_more_info:
|
||
try:
|
||
self.add_more_info(response,**kwargs)
|
||
except Exception as e:
|
||
print(e)
|
||
|
||
|
||
|
||
def response_error(self,code,message='error',status=1,result={}):
|
||
back_data = {
|
||
'result': result,
|
||
"status": status,
|
||
'message': message
|
||
}
|
||
return self.response(code, **back_data)
|
||
|
||
|
||
|
||
@expose("/_info", methods=["GET"])
|
||
@merge_response_func(merge_more_info,'more_info')
|
||
@merge_response_func(merge_import_data, API_IMPORT_DATA_RIS_KEY)
|
||
@merge_response_func(merge_cols_width, API_COLS_WIDTH_RIS_KEY)
|
||
@merge_response_func(merge_base_permissions, API_PERMISSIONS_RIS_KEY)
|
||
@merge_response_func(merge_user_permissions, API_USER_PERMISSIONS_RIS_KEY)
|
||
@merge_response_func(merge_add_field_info, API_ADD_COLUMNS_RIS_KEY)
|
||
@merge_response_func(merge_edit_field_info, API_EDIT_COLUMNS_RIS_KEY)
|
||
@merge_response_func(merge_add_fieldsets_info, API_ADD_FIELDSETS_RIS_KEY)
|
||
@merge_response_func(merge_edit_fieldsets_info, API_EDIT_FIELDSETS_RIS_KEY)
|
||
@merge_response_func(merge_show_fieldsets_info, API_SHOW_FIELDSETS_RIS_KEY)
|
||
@merge_response_func(merge_search_filters, API_FILTERS_RIS_KEY)
|
||
@merge_response_func(merge_show_label_columns, API_LABEL_COLUMNS_RIS_KEY)
|
||
@merge_response_func(merge_show_columns, API_SHOW_COLUMNS_RIS_KEY)
|
||
@merge_response_func(merge_list_label_columns, API_LABEL_COLUMNS_RIS_KEY)
|
||
@merge_response_func(merge_list_columns, API_LIST_COLUMNS_RIS_KEY)
|
||
@merge_response_func(merge_list_title, API_LIST_TITLE_RIS_KEY)
|
||
@merge_response_func(merge_show_title, API_SHOW_TITLE_RIS_KEY)
|
||
@merge_response_func(merge_add_title, API_ADD_TITLE_RIS_KEY)
|
||
@merge_response_func(merge_edit_title, API_EDIT_TITLE_RIS_KEY)
|
||
@merge_response_func(merge_description_columns, API_DESCRIPTION_COLUMNS_RIS_KEY)
|
||
@merge_response_func(merge_order_columns, API_ORDER_COLUMNS_RIS_KEY)
|
||
@merge_response_func(merge_columns_info, API_COLUMNS_INFO_RIS_KEY)
|
||
@merge_response_func(merge_help_url_info, API_HELP_URL_RIS_KEY)
|
||
@merge_response_func(merge_action_info, API_ACTION_RIS_KEY)
|
||
@merge_response_func(merge_route_info, API_ROUTE_RIS_KEY)
|
||
@merge_response_func(merge_related_field_info, API_RELATED_RIS_KEY)
|
||
def api_info(self, **kwargs):
|
||
_response = dict()
|
||
_args = kwargs.get("rison", {})
|
||
_args.update(request.args)
|
||
id = _args.get(self.primary_key,'')
|
||
if id:
|
||
item = self.datamodel.get(id)
|
||
if item and self.pre_update_get:
|
||
try:
|
||
self.pre_update_get(item)
|
||
except Exception as e:
|
||
print(e)
|
||
if item and self.check_item_permissions:
|
||
try:
|
||
self.check_item_permissions(item)
|
||
except Exception as e:
|
||
print(e)
|
||
elif self.pre_add_get:
|
||
try:
|
||
self.pre_add_get()
|
||
except Exception as e:
|
||
print(e)
|
||
|
||
self.set_response_key_mappings(_response, self.api_info, _args, **_args)
|
||
return self.response(200, **_response)
|
||
|
||
|
||
@expose("/<int:pk>", methods=["GET"])
|
||
# @pysnooper.snoop(depth=4)
|
||
def api_get(self, pk, **kwargs):
|
||
if self.pre_show:
|
||
src_item_object = self.datamodel.get(pk, self._base_filters)
|
||
self.pre_show(src_item_object)
|
||
|
||
# from flask_appbuilder.models.sqla.interface import SQLAInterface
|
||
item = self.datamodel.get(pk, self._base_filters)
|
||
if not item:
|
||
return self.response_error(404, "Not found")
|
||
|
||
_response = dict()
|
||
_args = kwargs.get("rison", {})
|
||
if 'form_data' in request.args:
|
||
_args.update(json.loads(request.args.get('form_data')))
|
||
|
||
select_cols = _args.get(API_SELECT_COLUMNS_RIS_KEY, [])
|
||
_pruned_select_cols = [col for col in select_cols if col in self.show_columns]
|
||
self.set_response_key_mappings(
|
||
_response,
|
||
self.get,
|
||
_args,
|
||
**{API_SELECT_COLUMNS_RIS_KEY: _pruned_select_cols},
|
||
)
|
||
if _pruned_select_cols:
|
||
_show_model_schema = self.model2schemaconverter.convert(_pruned_select_cols)
|
||
else:
|
||
_show_model_schema = self.show_model_schema
|
||
|
||
data = _show_model_schema.dump(item, many=False).data
|
||
if int(_args.get('str_related',0)):
|
||
for key in data:
|
||
if type(data[key])==dict:
|
||
data[key]=str(getattr(item,key))
|
||
|
||
# 按show_columns的顺序显示
|
||
data = sorted(data.items(), key=lambda kv: self.show_columns.index(kv[0]) if kv[0] in self.show_columns else 1000)
|
||
data = dict(zip([x[0] for x in data], [x[1] for x in data]))
|
||
|
||
_response['data'] = data # item.to_json()
|
||
_response['data'][self.primary_key] = pk
|
||
|
||
back = self.pre_get(_response)
|
||
back_data = {
|
||
'result': back['data'] if back else _response['data'],
|
||
"status": 0,
|
||
'message': "success"
|
||
}
|
||
return self.response(200, **back_data)
|
||
|
||
|
||
@expose("/", methods=["GET"])
|
||
# @pysnooper.snoop(watch_explode=('_response','lst'))
|
||
def api_list(self, **kwargs):
|
||
_response = dict()
|
||
if self.pre_json_load:
|
||
req_json = self.pre_json_load(request.json)
|
||
else:
|
||
try:
|
||
req_json = request.json or {}
|
||
except Exception as e:
|
||
print(e)
|
||
req_json={}
|
||
|
||
_args = req_json or {}
|
||
_args.update(request.args)
|
||
# 应对那些get无法传递body的请求,也可以把body放在url里面
|
||
if 'form_data' in request.args:
|
||
_args.update(json.loads(request.args.get('form_data')))
|
||
|
||
if self.pre_list:
|
||
self.pre_list(**_args)
|
||
|
||
|
||
# handle select columns
|
||
select_cols = _args.get(API_SELECT_COLUMNS_RIS_KEY, [])
|
||
_pruned_select_cols = [col for col in select_cols if col in self.list_columns]
|
||
self.set_response_key_mappings(
|
||
_response,
|
||
self.get_list,
|
||
_args,
|
||
**{API_SELECT_COLUMNS_RIS_KEY: _pruned_select_cols},
|
||
)
|
||
|
||
if _pruned_select_cols:
|
||
_list_model_schema = self.model2schemaconverter.convert(_pruned_select_cols)
|
||
else:
|
||
_list_model_schema = self.list_model_schema
|
||
# handle filters
|
||
try:
|
||
# 参数缩写都在每个filter的arg_name
|
||
from flask_appbuilder.models.sqla.filters import FilterEqualFunction, FilterStartsWith
|
||
|
||
joined_filters = self._handle_filters_args(_args)
|
||
except FABException as e:
|
||
return self.response_error(400,message=str(e))
|
||
# handle base order
|
||
try:
|
||
order_column, order_direction = self._handle_order_args(_args)
|
||
except InvalidOrderByColumnFABException as e:
|
||
return self.response_error(400,message=str(e))
|
||
# handle pagination
|
||
page_index, page_size = self._handle_page_args(_args)
|
||
# Make the query
|
||
query_select_columns = _pruned_select_cols or self.list_columns
|
||
count, lst = self.datamodel.query(
|
||
joined_filters,
|
||
order_column,
|
||
order_direction,
|
||
page=page_index,
|
||
page_size=page_size,
|
||
select_columns=query_select_columns,
|
||
)
|
||
if self.post_list:
|
||
lst = self.post_list(lst)
|
||
# pks = self.datamodel.get_keys(lst)
|
||
# import marshmallow.schema
|
||
import marshmallow.marshalling
|
||
# for item in lst:
|
||
# if self.datamodel.is_relation(item)
|
||
# aa =
|
||
# item.project = 'aaa'
|
||
|
||
data = _list_model_schema.dump(lst, many=True).data
|
||
|
||
# 把外键换成字符串
|
||
if int(_args.get('str_related',0)):
|
||
for index in range(len(data)):
|
||
for key in data[index]:
|
||
if type(data[index][key])==dict:
|
||
data[index][key]=str(getattr(lst[index],key))
|
||
|
||
_response['data'] = data # [item.to_json() for item in lst]
|
||
# _response["ids"] = pks
|
||
_response["count"] = count # 这个是总个数
|
||
for index in range(len(lst)):
|
||
|
||
_response['data'][index][self.primary_key]= getattr(lst[index],self.primary_key)
|
||
|
||
try:
|
||
self.pre_get_list(_response)
|
||
except Exception as e:
|
||
print(e)
|
||
|
||
back_data = {
|
||
'result': _response,# _response['data']
|
||
"status": 0,
|
||
'message': "success"
|
||
}
|
||
# print(back_data)
|
||
return self.response(200, **back_data)
|
||
|
||
# @pysnooper.snoop()
|
||
def json_to_item(self,data):
|
||
class Back:
|
||
pass
|
||
back = Back()
|
||
try:
|
||
item = self.datamodel.obj(**data)
|
||
# for key in data:
|
||
# if hasattr(item,key):
|
||
# setattr(item,key,data[key])
|
||
|
||
setattr(back,'data',item)
|
||
except Exception as e:
|
||
setattr(back, 'data', data)
|
||
setattr(back, 'errors', str(e))
|
||
return back
|
||
|
||
|
||
# @expose("/add", methods=["POST"])
|
||
# def add(self):
|
||
@expose("/", methods=["POST"])
|
||
# @pysnooper.snoop(watch_explode=('item', 'json_data'))
|
||
def api_add(self):
|
||
self.src_item_json = {}
|
||
if not request.is_json:
|
||
return self.response_error(400,message="Request is not JSON")
|
||
try:
|
||
if self.pre_json_load:
|
||
json_data = self.pre_json_load(request.json)
|
||
else:
|
||
json_data = request.json
|
||
|
||
item = self.add_model_schema.load(json_data)
|
||
# item = self.add_model_schema.load(data)
|
||
except ValidationError as err:
|
||
return self.response_error(422,message=err.messages)
|
||
# This validates custom Schema with custom validations
|
||
if isinstance(item.data, dict):
|
||
return self.response_error(422,message=item.errors)
|
||
try:
|
||
self.pre_add(item.data)
|
||
self.datamodel.add(item.data, raise_exception=True)
|
||
self.post_add(item.data)
|
||
result_data = self.add_model_schema.dump(
|
||
item.data, many=False
|
||
).data
|
||
result_data[self.primary_key] = self.datamodel.get_pk_value(item.data)
|
||
back_data={
|
||
'result': result_data,
|
||
"status":0,
|
||
'message':"success"
|
||
}
|
||
return self.response(
|
||
200,
|
||
**back_data,
|
||
)
|
||
except IntegrityError as e:
|
||
return self.response_error(422,message=str(e.orig))
|
||
except Exception as e1:
|
||
return self.response_error(500, message=str(e1))
|
||
|
||
|
||
@expose("/<int:pk>", methods=["PUT"])
|
||
# @pysnooper.snoop(watch_explode=('item','data'))
|
||
def api_edit(self, pk):
|
||
|
||
item = self.datamodel.get(pk, self._base_filters)
|
||
self.src_item_json = item.to_json()
|
||
|
||
# if self.check_redirect_list_url:
|
||
try:
|
||
if self.check_edit_permission:
|
||
has_permission = self.check_edit_permission(item)
|
||
if not has_permission:
|
||
return json_response(message='no permission to edit',status=1,result={})
|
||
|
||
except Exception as e:
|
||
print(e)
|
||
return json_response(message='check edit permission'+str(e),status=1,result={})
|
||
|
||
|
||
if not request.is_json:
|
||
return self.response_error(400, message="Request is not JSON")
|
||
if not item:
|
||
return self.response_error(404,message='Not found')
|
||
try:
|
||
if self.pre_json_load:
|
||
json_data = self.pre_json_load(request.json)
|
||
else:
|
||
json_data = request.json
|
||
data = self._merge_update_item(item, json_data)
|
||
item = self.edit_model_schema.load(data, instance=item)
|
||
except ValidationError as err:
|
||
return self.response_error(422,message=err.messages)
|
||
# This validates custom Schema with custom validations
|
||
if isinstance(item.data, dict):
|
||
return self.response_error(422,message=item.errors)
|
||
self.pre_update(item.data)
|
||
|
||
|
||
try:
|
||
self.datamodel.edit(item.data, raise_exception=True)
|
||
self.post_update(item.data)
|
||
result = self.edit_model_schema.dump(
|
||
item.data, many=False
|
||
).data
|
||
result[self.primary_key] = self.datamodel.get_pk_value(item.data)
|
||
back_data={
|
||
"status":0,
|
||
"message":"success",
|
||
"result":result
|
||
}
|
||
return self.response(
|
||
200,
|
||
**back_data,
|
||
)
|
||
except IntegrityError as e:
|
||
return self.response_error(422,message=str(e.orig))
|
||
|
||
@expose("/<int:pk>", methods=["DELETE"])
|
||
# @pysnooper.snoop()
|
||
def api_delete(self, pk):
|
||
item = self.datamodel.get(pk, self._base_filters)
|
||
if not item:
|
||
return self.response_error(404,message='Not found')
|
||
self.pre_delete(item)
|
||
try:
|
||
self.datamodel.delete(item, raise_exception=True)
|
||
self.post_delete(item)
|
||
back_data={
|
||
"status":0,
|
||
"message":"success",
|
||
"result":item.to_json()
|
||
}
|
||
return self.response(200, **back_data)
|
||
except IntegrityError as e:
|
||
return self.response_error(422,message=str(e.orig))
|
||
|
||
|
||
|
||
|
||
@expose("/action/<string:name>/<int:pk>", methods=["GET"])
|
||
def single_action(self, name, pk):
|
||
"""
|
||
Action method to handle actions from a show view
|
||
"""
|
||
pk = self._deserialize_pk_if_composite(pk)
|
||
action = self.actions.get(name)
|
||
try:
|
||
res = action.func(self.datamodel.get(pk))
|
||
back = {
|
||
"status": 0,
|
||
"result": {},
|
||
"message": 'success'
|
||
}
|
||
return self.response(200,**back)
|
||
except Exception as e:
|
||
print(e)
|
||
back = {
|
||
"status": -1,
|
||
"message": str(e),
|
||
"result": {}
|
||
}
|
||
return self.response(200,**back)
|
||
|
||
|
||
@expose("/multi_action/<string:name>", methods=["POST"])
|
||
def multi_action(self,name):
|
||
"""
|
||
Action method to handle multiple records selected from a list view
|
||
"""
|
||
pks = request.json["ids"]
|
||
action = self.actions.get(name)
|
||
items = [
|
||
self.datamodel.get(self._deserialize_pk_if_composite(int(pk))) for pk in pks
|
||
]
|
||
try:
|
||
back = action.func(items)
|
||
message = back if type(back)==str else 'success'
|
||
back={
|
||
"status":0,
|
||
"result":{},
|
||
"message":message
|
||
}
|
||
return self.response(200,**back)
|
||
except Exception as e:
|
||
print(e)
|
||
back = {
|
||
"status":-1,
|
||
"message": str(e),
|
||
"result":{}
|
||
}
|
||
return self.response(200,**back)
|
||
|
||
|
||
|
||
@expose("/upload_demo/", methods=["GET"])
|
||
def upload_demo(self):
|
||
|
||
demostr=','.join(list(self.add_columns))+"\n"+','.join(['xx' for x in list(self.add_columns)])
|
||
|
||
file_name = self.datamodel.obj.__tablename__
|
||
csv_file='%s.csv'%file_name
|
||
if os.path.exists(csv_file):
|
||
os.remove(csv_file)
|
||
file = open(csv_file,mode='w',encoding='utf-8-sig')
|
||
file.writelines(demostr)
|
||
file.close()
|
||
csv_file = os.path.abspath(csv_file)
|
||
response = self.csv_response(csv_file,file_name=file_name)
|
||
return response
|
||
|
||
@expose("/upload/", methods=["POST"])
|
||
def upload(self):
|
||
csv_file = request.files.get('csv_file') # FileStorage
|
||
# 文件保存至指定路径
|
||
i_path = csv_file.filename
|
||
if os.path.exists(i_path):
|
||
os.remove(i_path)
|
||
csv_file.save(i_path)
|
||
# 读取csv,读取header,按行处理
|
||
import csv
|
||
csv_reader = csv.reader(open(i_path, mode='r', encoding='utf-8-sig'))
|
||
header = None
|
||
result = []
|
||
for line in csv_reader:
|
||
if not header:
|
||
header = line
|
||
continue
|
||
# 判断header里面的字段是否在数据库都有
|
||
for col_name in header:
|
||
# attr = self.datamodel.obj
|
||
if not hasattr(self.datamodel.obj,col_name):
|
||
flash('csv首行header与数据库字段不对应','warning')
|
||
back = {
|
||
"status":1,
|
||
"result":[],
|
||
"message":"csv首行header与数据库字段不对应"
|
||
}
|
||
return self.response(200,**back)
|
||
|
||
# 个数不对的去掉
|
||
if len(line)!=len(header):
|
||
continue
|
||
|
||
# 全是空值的去掉
|
||
ll = [l.strip() for l in line if l.strip()]
|
||
if not ll:
|
||
continue
|
||
|
||
data = dict(zip(header, line))
|
||
|
||
try:
|
||
if self.pre_upload:
|
||
data = self.pre_upload(data)
|
||
model = self.datamodel.obj(**data)
|
||
self.pre_add(model)
|
||
db.session.add(model)
|
||
self.post_add(model)
|
||
db.session.commit()
|
||
result.append('success')
|
||
except Exception as e:
|
||
db.session.rollback()
|
||
print(e)
|
||
result.append('fail')
|
||
|
||
flash('成功导入%s行,失败导入%s行'%(len([x for x in result if x=='success']),len([x for x in result if x=='fail'])), 'warning')
|
||
back = {
|
||
"status": 0,
|
||
"message": "result为上传成功行,共成功%s" % len([x for x in result if x == 'success']),
|
||
"result": result
|
||
}
|
||
return self.response(200,**back)
|
||
|
||
|
||
@expose("/download/", methods=["GET"])
|
||
# @pysnooper.snoop()
|
||
def download(self):
|
||
import pandas
|
||
sqllchemy_uri = conf.get('SQLALCHEMY_DATABASE_URI','')
|
||
bind_key = getattr(self.datamodel.obj,'__bind_key__',None)
|
||
if bind_key:
|
||
bind_sqllchemy_uri = conf['SQLALCHEMY_BINDS'].get(bind_key,'')
|
||
if bind_sqllchemy_uri:
|
||
sqllchemy_uri=bind_sqllchemy_uri
|
||
|
||
import sqlalchemy.engine.url as url
|
||
uri = url.make_url(sqllchemy_uri)
|
||
sql_engine = create_engine(uri)
|
||
table_name = self.datamodel.obj.__tablename__
|
||
sql = 'select `%s` from %s' % ('`,`'.join(self.show_columns), table_name)
|
||
# print(sql)
|
||
results = pandas.read_sql_query(sql, sql_engine)
|
||
|
||
file_path = '%s.csv' % table_name
|
||
csv_file = os.path.abspath(file_path)
|
||
if os.path.exists(csv_file):
|
||
os.remove(csv_file)
|
||
results.to_csv(csv_file, index=False, sep=",") # index 是第几行的表示
|
||
response = self.csv_response(csv_file, file_name=table_name)
|
||
return response
|
||
|
||
|
||
@action("muldelete", __("Delete"), __("Delete all Really?"), "fa-trash", single=False)
|
||
# @pysnooper.snoop(watch_explode=('items'))
|
||
def muldelete(self, items):
|
||
if not items:
|
||
abort(404)
|
||
success=[]
|
||
fail=[]
|
||
for item in items:
|
||
try:
|
||
self.pre_delete(item)
|
||
db.session.delete(item)
|
||
success.append(item.to_json())
|
||
except Exception as e:
|
||
flash(str(e), "danger")
|
||
fail.append(item.to_json())
|
||
db.session.commit()
|
||
return json.dumps(
|
||
{
|
||
"success":success,
|
||
"fail":fail
|
||
},indent=4,ensure_ascii=False
|
||
)
|
||
|
||
"""
|
||
------------------------------------------------
|
||
HELPER FUNCTIONS
|
||
------------------------------------------------
|
||
"""
|
||
|
||
def _deserialize_pk_if_composite(self, pk):
|
||
def date_deserializer(obj):
|
||
if "_type" not in obj:
|
||
return obj
|
||
|
||
from dateutil import parser
|
||
|
||
if obj["_type"] == "datetime":
|
||
return parser.parse(obj["value"])
|
||
elif obj["_type"] == "date":
|
||
return parser.parse(obj["value"]).date()
|
||
return obj
|
||
|
||
if self.datamodel.is_pk_composite():
|
||
try:
|
||
pk = json.loads(pk, object_hook=date_deserializer)
|
||
except Exception:
|
||
pass
|
||
return pk
|
||
|
||
|
||
def _handle_page_args(self, rison_args):
|
||
"""
|
||
Helper function to handle rison page
|
||
arguments, sets defaults and impose
|
||
FAB_API_MAX_PAGE_SIZE
|
||
|
||
:param rison_args:
|
||
:return: (tuple) page, page_size
|
||
"""
|
||
page = rison_args.get(API_PAGE_INDEX_RIS_KEY, 0)
|
||
page_size = rison_args.get(API_PAGE_SIZE_RIS_KEY, self.page_size)
|
||
return self._sanitize_page_args(page, page_size)
|
||
|
||
# @pysnooper.snoop()
|
||
def _sanitize_page_args(self, page, page_size):
|
||
_page = page or 0
|
||
_page_size = page_size or self.page_size
|
||
max_page_size = self.max_page_size or current_app.config.get(
|
||
"FAB_API_MAX_PAGE_SIZE"
|
||
)
|
||
# Accept special -1 to uncap the page size
|
||
if max_page_size == -1:
|
||
if _page_size == -1:
|
||
return None, None
|
||
else:
|
||
return _page, _page_size
|
||
if _page_size > max_page_size or _page_size < 1:
|
||
_page_size = max_page_size
|
||
return _page, _page_size
|
||
|
||
def _handle_order_args(self, rison_args):
|
||
"""
|
||
Help function to handle rison order
|
||
arguments
|
||
|
||
:param rison_args:
|
||
:return:
|
||
"""
|
||
order_column = rison_args.get(API_ORDER_COLUMN_RIS_KEY, "")
|
||
order_direction = rison_args.get(API_ORDER_DIRECTION_RIS_KEY, "")
|
||
if not order_column and self.base_order:
|
||
return self.base_order
|
||
if not order_column:
|
||
return "", ""
|
||
elif order_column not in self.order_columns:
|
||
raise InvalidOrderByColumnFABException(
|
||
f"Invalid order by column: {order_column}"
|
||
)
|
||
return order_column, order_direction
|
||
|
||
def _handle_filters_args(self, rison_args):
|
||
self._filters.clear_filters()
|
||
self._filters.rest_add_filters(rison_args.get(API_FILTERS_RIS_KEY, []))
|
||
return self._filters.get_joined_filters(self._base_filters)
|
||
|
||
|
||
# @pysnooper.snoop(watch_explode=("column"))
|
||
def _description_columns_json(self, cols=None):
|
||
"""
|
||
Prepares dict with col descriptions to be JSON serializable
|
||
"""
|
||
ret = {}
|
||
cols = cols or []
|
||
d = {k: v for (k, v) in self.description_columns.items() if k in cols}
|
||
for key, value in d.items():
|
||
ret[key] = as_unicode(_(value).encode("UTF-8"))
|
||
|
||
edit_form_extra_fields = self.edit_form_extra_fields
|
||
for col in edit_form_extra_fields:
|
||
column = edit_form_extra_fields[col]
|
||
if hasattr(column, 'kwargs') and column.kwargs:
|
||
description = column.kwargs.get('description','')
|
||
if description:
|
||
ret[col] = description
|
||
|
||
return ret
|
||
|
||
|
||
def _label_columns_json(self, cols=None):
|
||
"""
|
||
Prepares dict with labels to be JSON serializable
|
||
"""
|
||
ret = {}
|
||
# 自动生成的label
|
||
cols = cols or []
|
||
d = {k: v for (k, v) in self.label_columns.items() if k in cols}
|
||
for key, value in d.items():
|
||
ret[key] = as_unicode(_(value).encode("UTF-8"))
|
||
|
||
# 全局的label
|
||
if hasattr(self.datamodel.obj,'label_columns') and self.datamodel.obj.label_columns:
|
||
for col in self.datamodel.obj.label_columns:
|
||
ret[col] = self.datamodel.obj.label_columns[col]
|
||
|
||
# 本view特定的label
|
||
for col in self.label_columns:
|
||
ret[col] = self.label_columns[col]
|
||
|
||
# 本view特定的label
|
||
for col in self.spec_label_columns:
|
||
ret[col] = self.spec_label_columns[col]
|
||
|
||
return ret
|
||
|
||
def make_ui_info(self,ret):
|
||
|
||
# 可序列化处理
|
||
if ret.get('default',None) and isfunction(ret['default']):
|
||
ret['default'] = None # 函数没法序列化
|
||
|
||
# 统一处理校验器
|
||
local_validators=[]
|
||
for v in ret.get('validators',[]):
|
||
# print(v)
|
||
# if ret.get('name', '') == "warehouse_level":
|
||
# print(ret)
|
||
# print(v)
|
||
# # from wtforms.validators import DataRequired
|
||
# print(type(v))
|
||
# print(v.__class__.__name__)
|
||
val = {}
|
||
val['type'] = v.__class__.__name__
|
||
|
||
if type(v) == validators.Regexp or type(v) == validate.Regexp: # 一种是数据库的校验器,一种是可视化的校验器
|
||
val['regex'] = str(v.regex.pattern)
|
||
elif type(v) == validators.Length or type(v) == validate.Length:
|
||
val['min'] = v.min
|
||
val['max'] = v.max
|
||
elif type(v) == validators.NumberRange or type(v) == validate.Range :
|
||
val['min'] = v.min
|
||
val['max'] = v.max
|
||
else:
|
||
pass
|
||
|
||
local_validators.append(val)
|
||
ret['validators']=local_validators
|
||
|
||
# 统一规范前端type和选择时value
|
||
# 选择器
|
||
if ret.get('type','') in ['QuerySelect','Select','Related','MySelectMultiple','SelectMultiple','Enum']:
|
||
choices = ret.get('choices',[])
|
||
values = ret.get('values',[])
|
||
if choices:
|
||
values=[]
|
||
for choice in choices:
|
||
if len(choice)==2:
|
||
values.append({
|
||
"id":choice[0],
|
||
"value":choice[1]
|
||
})
|
||
ret['values']=values
|
||
if not ret.get('ui-type',''):
|
||
ret['ui-type']='select2' if 'SelectMultiple' in ret['type'] else 'select'
|
||
|
||
# 字符串
|
||
if ret.get('type','') in ['String',]:
|
||
if ret.get('widget','BS3Text')=='BS3Text':
|
||
ret['ui-type'] = 'input'
|
||
else:
|
||
ret['ui-type'] = 'textArea'
|
||
# 长文本输入
|
||
if 'text' in ret.get('type','').lower():
|
||
ret['ui-type'] = 'textArea'
|
||
if 'varchar' in ret.get('type','').lower():
|
||
ret['ui-type'] = 'input'
|
||
|
||
# bool类型
|
||
if 'boolean' in ret.get('type','').lower():
|
||
ret['ui-type'] = 'radio'
|
||
ret['values']=[
|
||
{
|
||
"id":True,
|
||
"value":"yes",
|
||
},
|
||
{
|
||
"id": False,
|
||
"value": "no",
|
||
},
|
||
]
|
||
ret['default']=True if ret.get('default',0) else False
|
||
|
||
# 处理正则自动输入
|
||
default = ret.get('default',None)
|
||
if default and re.match('\$\{.*\}',str(default)):
|
||
ret['ui-type']='match-input'
|
||
|
||
return ret
|
||
|
||
# @pysnooper.snoop(watch_explode=('field_contents'))
|
||
def _get_field_info(self, field, filter_rel_field, page=None, page_size=None):
|
||
"""
|
||
Return a dict with field details
|
||
ready to serve as a response
|
||
|
||
:param field: marshmallow field
|
||
:return: dict with field details
|
||
"""
|
||
ret = dict()
|
||
ret["name"] = field.name
|
||
# print(ret["name"])
|
||
# print(type(field))
|
||
# print(field)
|
||
|
||
# 根据数据库信息添加
|
||
if self.datamodel:
|
||
list_columns = self.datamodel.list_columns # 只有数据库存储的字段,没有外键字段
|
||
if field.name in list_columns:
|
||
column = list_columns[field.name]
|
||
default = column.default
|
||
# print(type(column.type))
|
||
column_type=column.type
|
||
# aa=column_type
|
||
column_type_str = str(column_type.__class__.__name__)
|
||
if column_type_str=='Enum':
|
||
ret['values']=[
|
||
{
|
||
"id":x,
|
||
"value":x
|
||
} for x in column.type.enums
|
||
]
|
||
# print(column_type)
|
||
# if type(column_type)==
|
||
# print(column.__class__.__name__)
|
||
from sqlalchemy.sql.schema import Column
|
||
ret['type']=column_type_str
|
||
if default:
|
||
ret['default'] = default.arg
|
||
|
||
# print(column)
|
||
# print(column.type)
|
||
# print(type(column))
|
||
# from sqlalchemy.sql.schema import Column
|
||
# # if column
|
||
if field.name in self.remember_columns:
|
||
ret["remember"]=True
|
||
else:
|
||
ret["remember"] = False
|
||
ret["label"] = _(self.label_columns.get(field.name, ""))
|
||
ret["description"] = _(self.description_columns.get(field.name, ""))
|
||
# Handles related fields
|
||
if isinstance(field, Related) or isinstance(field, RelatedList):
|
||
ret["count"], ret["values"] = self._get_list_related_field(
|
||
field, filter_rel_field, page=page, page_size=page_size
|
||
)
|
||
|
||
if field.validate and isinstance(field.validate, list):
|
||
ret["validators"] = [v for v in field.validate]
|
||
elif field.validate:
|
||
ret["validators"] = [field.validate]
|
||
|
||
# 对于非数据库中字段使用字段信息描述类型
|
||
ret["type"] = field.__class__.__name__ if 'type' not in ret else ret["type"]
|
||
ret["required"] = field.required
|
||
ret["unique"] = getattr(field, "unique", False)
|
||
# When using custom marshmallow schemas fields don't have unique property
|
||
|
||
|
||
# 根据edit_form_extra_fields来确定
|
||
if self.edit_form_extra_fields:
|
||
if field.name in self.edit_form_extra_fields:
|
||
column_field = self.edit_form_extra_fields[field.name]
|
||
column_field_kwargs = column_field.kwargs
|
||
# type 类型 EnumField values
|
||
# aa = column_field
|
||
ret['type']=column_field.field_class.__name__.replace('Field','')
|
||
# ret['description']=column_field_kwargs.get('description','')
|
||
ret['description'] = self.description_columns.get(field.name,column_field_kwargs.get('description', ''))
|
||
ret['label'] = self.label_columns.get(field.name,column_field_kwargs.get('label', ''))
|
||
ret['default'] = column_field_kwargs.get('default', '')
|
||
ret['validators'] = column_field_kwargs.get('validators', [])
|
||
ret['choices'] = column_field_kwargs.get('choices', [])
|
||
if 'widget' in column_field_kwargs:
|
||
ret['widget']=column_field_kwargs['widget'].__class__.__name__.replace('Widget','').replace('Field','').replace('My','')
|
||
ret['disable']=column_field_kwargs['widget'].readonly if hasattr(column_field_kwargs['widget'],'readonly') else False
|
||
# if hasattr(column_field_kwargs['widget'],'can_input'):
|
||
# print(field.name,column_field_kwargs['widget'].can_input)
|
||
ret['ui-type'] = 'input-select' if hasattr(column_field_kwargs['widget'],'can_input') and column_field_kwargs['widget'].can_input else False
|
||
# 对于那种配置使用过往记录作为可选值的参数进行处理
|
||
if hasattr(column_field_kwargs['widget'], 'conten2choices') and column_field_kwargs['widget'].conten2choices:
|
||
try:
|
||
field_contents = db.session.query(getattr(self.datamodel.obj,field.name)).group_by(getattr(self.datamodel.obj,field.name)).all()
|
||
field_contents = [item[0] for item in field_contents]
|
||
if field_contents:
|
||
ret['choices']=[[x,x] for x in list(set(field_contents))]
|
||
except Exception as e:
|
||
print(e)
|
||
|
||
|
||
# 补充数据库model中定义的是否必填
|
||
columns = [column for column in self.datamodel.obj.__table__._columns if column.name==field.name and hasattr(column,'nullable') and not column.nullable]
|
||
if columns:
|
||
if 'validators' in ret:
|
||
ret['validators'].append(validators.DataRequired())
|
||
else:
|
||
ret['validators']=[validators.DataRequired()]
|
||
|
||
# print(ret)
|
||
ret=self.make_ui_info(ret)
|
||
|
||
return ret
|
||
|
||
# @pysnooper.snoop()
|
||
def _get_fields_info(self, cols, model_schema, filter_rel_fields, **kwargs):
|
||
"""
|
||
Returns a dict with fields detail
|
||
from a marshmallow schema
|
||
|
||
:param cols: list of columns to show info for
|
||
:param model_schema: Marshmallow model schema
|
||
:param filter_rel_fields: expects add_query_rel_fields or
|
||
edit_query_rel_fields
|
||
:param kwargs: Receives all rison arguments for pagination
|
||
:return: dict with all fields details
|
||
"""
|
||
ret = list()
|
||
for col in cols:
|
||
page = page_size = None
|
||
col_args = kwargs.get(col, {})
|
||
if col_args:
|
||
page = col_args.get(API_PAGE_INDEX_RIS_KEY, None)
|
||
page_size = col_args.get(API_PAGE_SIZE_RIS_KEY, None)
|
||
|
||
page_size=1000
|
||
ret.append(
|
||
self._get_field_info(
|
||
model_schema.fields[col],
|
||
filter_rel_fields.get(col, []),
|
||
page=page,
|
||
page_size=page_size,
|
||
)
|
||
)
|
||
return ret
|
||
|
||
def _get_list_related_field(
|
||
self, field, filter_rel_field, page=None, page_size=None
|
||
):
|
||
"""
|
||
Return a list of values for a related field
|
||
|
||
:param field: Marshmallow field
|
||
:param filter_rel_field: Filters for the related field
|
||
:param page: The page index
|
||
:param page_size: The page size
|
||
:return: (int, list) total record count and list of dict with id and value
|
||
"""
|
||
ret = list()
|
||
if isinstance(field, Related) or isinstance(field, RelatedList):
|
||
datamodel = self.datamodel.get_related_interface(field.name)
|
||
filters = datamodel.get_filters(datamodel.get_search_columns_list())
|
||
page, page_size = self._sanitize_page_args(page, page_size)
|
||
order_field = self.order_rel_fields.get(field.name)
|
||
if order_field:
|
||
order_column, order_direction = order_field
|
||
else:
|
||
order_column, order_direction = "", ""
|
||
if filter_rel_field:
|
||
filters = filters.add_filter_list(filter_rel_field)
|
||
count, values = datamodel.query(
|
||
filters, order_column, order_direction, page=page, page_size=page_size
|
||
)
|
||
for value in values:
|
||
ret.append({"id": datamodel.get_pk_value(value), "value": str(value)})
|
||
return count, ret
|
||
|
||
def _merge_update_item(self, model_item, data):
|
||
"""
|
||
Merge a model with a python data structure
|
||
This is useful to turn PUT method into a PATCH also
|
||
:param model_item: SQLA Model
|
||
:param data: python data structure
|
||
:return: python data structure
|
||
"""
|
||
data_item = self.edit_model_schema.dump(model_item, many=False).data
|
||
for _col in data_item:
|
||
if _col not in data.keys():
|
||
data[_col] = data_item[_col]
|
||
return data
|
||
|