forked from mirror/MrDoc
273 lines
12 KiB
Python
273 lines
12 KiB
Python
# coding:utf-8
|
||
# @文件: import_utils.py
|
||
# @创建者:州的先生
|
||
# #日期:2020/6/17
|
||
# 博客地址:zmister.com
|
||
# 文集导入相关方法
|
||
|
||
from django.utils.translation import gettext_lazy as _
|
||
from app_doc.models import Doc,Project,Image
|
||
from app_doc.util_upload_img import upload_generation_dir
|
||
from django.db import transaction
|
||
from django.conf import settings
|
||
from loguru import logger
|
||
from markdownify import markdownify
|
||
import mammoth
|
||
import shutil
|
||
import os
|
||
import time
|
||
import re
|
||
import yaml
|
||
|
||
|
||
# 导入Zip文集
|
||
class ImportZipProject():
|
||
# 读取 Zip 压缩包
|
||
def read_zip(self,zip_file_path,create_user):
|
||
# 导入流程:
|
||
# 1、解压zip压缩包文件到temp文件夹
|
||
# 2、遍历temp文件夹内的解压后的.md文件
|
||
# 3、读取.md文件的文本内容
|
||
# 4、如果里面匹配到相对路径的静态文件,从指定文件夹里面读取
|
||
# 5、上传图片,写入数据库,修改.md文件里面的url路径
|
||
|
||
# 新建一个临时文件夹,用于存放解压的文件
|
||
self.temp_dir = zip_file_path[:-3]
|
||
os.mkdir(self.temp_dir)
|
||
# 解压 zip 文件到指定临时文件夹
|
||
shutil.unpack_archive(zip_file_path, extract_dir=self.temp_dir)
|
||
|
||
# 处理文件夹和文件名的中文乱码
|
||
for root, dirs, files in os.walk(self.temp_dir):
|
||
for dir in dirs:
|
||
try:
|
||
new_dir = dir.encode('cp437').decode('gbk')
|
||
except:
|
||
new_dir = dir.encode('utf-8').decode('utf-8')
|
||
# print(new_dir)
|
||
os.rename(os.path.join(root, dir), os.path.join(root, new_dir))
|
||
|
||
for file in files:
|
||
try:
|
||
new_file = file.encode('cp437').decode('gbk')
|
||
except:
|
||
new_file = file.encode('utf-8').decode('utf-8')
|
||
# print(root, new_file)
|
||
os.rename(os.path.join(root, file), os.path.join(root, new_file))
|
||
|
||
# 读取yaml文件
|
||
try:
|
||
with open(os.path.join(self.temp_dir ,'mrdoc.yaml'),'r',encoding='utf-8') as yaml_file:
|
||
yaml_str = yaml.load(yaml_file.read())
|
||
project_name = yaml_str['project_name'] \
|
||
if 'project_name' in yaml_str.keys() else zip_file_path[:-4].split('/')[-1]
|
||
project_desc = yaml_str['project_desc'] if 'project_desc' in yaml_str.keys() else ''
|
||
project_role = yaml_str['project_role'] if 'project_role' in yaml_str.keys() else 1
|
||
editor_mode = yaml_str['editor_mode'] if 'editor_mode' in yaml_str.keys() else 1
|
||
project_toc = yaml_str['toc']
|
||
toc_item_list = []
|
||
for toc in project_toc:
|
||
# print(toc)
|
||
item = {
|
||
'name': toc['name'],
|
||
'file': toc['file'],
|
||
'parent': 0,
|
||
}
|
||
toc_item_list.append(item)
|
||
if 'children' in toc.keys():
|
||
for b in toc['children']:
|
||
item = {
|
||
'name': b['name'],
|
||
'file': b['file'],
|
||
'parent': toc['name']
|
||
}
|
||
toc_item_list.append(item)
|
||
if 'children' in b.keys():
|
||
for c in b['children']:
|
||
item = {
|
||
'name': c['name'],
|
||
'file': c['file'],
|
||
'parent': b['name']
|
||
}
|
||
toc_item_list.append(item)
|
||
|
||
|
||
except:
|
||
logger.error(_("未发现yaml文件"))
|
||
project_name = zip_file_path[:-4].split('/')[-1]
|
||
project_desc = ''
|
||
project_role = 1
|
||
editor_mode = 1
|
||
project_toc = False
|
||
|
||
# 开启事务
|
||
with transaction.atomic():
|
||
save_id = transaction.savepoint()
|
||
try:
|
||
# 新建文集
|
||
project = Project.objects.create(
|
||
name=project_name,
|
||
intro=project_desc,
|
||
role=project_role,
|
||
create_user=create_user
|
||
)
|
||
if project_toc is False:
|
||
# 遍历临时文件夹中的所有文件和文件夹
|
||
for f in os.listdir(self.temp_dir):
|
||
# 获取 .md 文件
|
||
if f.endswith('.md'):
|
||
# print(f)
|
||
# 读取 .md 文件文本内容
|
||
with open(os.path.join(self.temp_dir,f),'r',encoding='utf-8') as md_file:
|
||
md_content = md_file.read()
|
||
md_content = self.operat_md_media(md_content,create_user)
|
||
# 新建文档
|
||
doc = Doc.objects.create(
|
||
name = f[:-3],
|
||
pre_content = md_content,
|
||
top_doc = project.id,
|
||
status = 0,
|
||
editor_mode = editor_mode,
|
||
create_user = create_user
|
||
)
|
||
else:
|
||
for i in toc_item_list:
|
||
with open(os.path.join(self.temp_dir,i['file']),'r',encoding='utf-8') as md_file:
|
||
md_content = md_file.read()
|
||
md_content = self.operat_md_media(md_content, create_user)
|
||
# 新建文档
|
||
doc = Doc.objects.create(
|
||
name=i['name'],
|
||
pre_content=md_content,
|
||
top_doc=project.id,
|
||
parent_doc = (Doc.objects.get(top_doc=project.id,name=i['parent'])).id \
|
||
if i['parent'] != 0 else 0,
|
||
status=0,
|
||
editor_mode=editor_mode,
|
||
create_user=create_user
|
||
)
|
||
except:
|
||
logger.exception(_("解析导入文件异常"))
|
||
# 回滚事务
|
||
transaction.savepoint_rollback(save_id)
|
||
|
||
transaction.savepoint_commit(save_id)
|
||
try:
|
||
shutil.rmtree(self.temp_dir)
|
||
os.remove(zip_file_path)
|
||
return project.id
|
||
except:
|
||
logger.exception(_("删除临时文件异常"))
|
||
return None
|
||
|
||
# 处理MD内容中的静态文件
|
||
def operat_md_media(self,md_content,create_user):
|
||
# 查找MD内容中的静态文件
|
||
pattern = r"\!\[.*?\]\(.*?\)"
|
||
media_list = re.findall(pattern, md_content)
|
||
# print(media_list)
|
||
# 存在静态文件,进行遍历
|
||
if len(media_list) > 0:
|
||
for media in media_list:
|
||
media_filename = media.split("(")[-1].split(")")[0] # 媒体文件的文件名
|
||
# 存在本地图片路径
|
||
if media_filename.startswith("./") or media_filename.startswith("/"):
|
||
# 获取文件后缀
|
||
file_suffix = media_filename.split('.')[-1]
|
||
if file_suffix.lower() not in settings.ALLOWED_IMG:
|
||
continue
|
||
# 判断本地图片路径是否存在
|
||
if media_filename.startswith("./"):
|
||
temp_media_file_path = os.path.join(self.temp_dir,media_filename[2:])
|
||
else :
|
||
temp_media_file_path = os.path.join(self.temp_dir, media_filename[1:])
|
||
if os.path.exists(temp_media_file_path):
|
||
# 如果存在,上传本地图片
|
||
dir_name = upload_generation_dir() # 获取当月文件夹名称
|
||
|
||
# 复制文件到媒体文件夹
|
||
copy2_filename = dir_name + '/' + str(time.time()) + '.' + file_suffix
|
||
new_media_file_path = shutil.copy2(
|
||
temp_media_file_path,
|
||
settings.MEDIA_ROOT + copy2_filename
|
||
)
|
||
|
||
# 替换MD内容的静态文件链接
|
||
new_media_filename = new_media_file_path.split(settings.MEDIA_ROOT,1)[-1]
|
||
new_media_filename = '/media' + new_media_filename
|
||
|
||
# 图片数据写入数据库
|
||
Image.objects.create(
|
||
user=create_user,
|
||
file_path=new_media_filename,
|
||
file_name=str(time.time())+'.'+file_suffix,
|
||
remark=_('本地上传'),
|
||
)
|
||
md_content = md_content.replace(media_filename, new_media_filename)
|
||
else:
|
||
pass
|
||
return md_content
|
||
# 不存在静态文件,直接返回MD内容
|
||
else:
|
||
return md_content
|
||
|
||
|
||
# 导入Word文档(.docx)
|
||
class ImportDocxDoc():
|
||
def __init__(self,docx_file_path,editor_mode,create_user):
|
||
self.docx_file_path = docx_file_path # docx文件绝对路径
|
||
self.tmp_img_dir = self.docx_file_path.split('.')
|
||
self.create_user = create_user
|
||
self.editor_mode = int(editor_mode)
|
||
|
||
# 转存docx文件中的图片
|
||
def convert_img(self,image):
|
||
with image.open() as image_bytes:
|
||
file_suffix = image.content_type.split("/")[1]
|
||
file_time_name = str(time.time())
|
||
dir_name = upload_generation_dir() # 获取当月文件夹名称
|
||
# 图片在媒体文件夹内的路径,形如 /202012/12542542.jpg
|
||
copy2_filename = dir_name + '/' + file_time_name + '.' + file_suffix
|
||
# 文件的绝对路径 形如/home/MrDoc/media/202012/12542542.jpg
|
||
new_media_file_path = settings.MEDIA_ROOT + copy2_filename
|
||
# 图片文件的相对url路径
|
||
new_media_filename = '/media' + copy2_filename
|
||
|
||
# 图片数据写入数据库
|
||
Image.objects.create(
|
||
user=self.create_user,
|
||
file_path=new_media_filename,
|
||
file_name=file_time_name + '.' + file_suffix,
|
||
remark=_('本地上传'),
|
||
)
|
||
with open(new_media_file_path, 'wb') as f:
|
||
f.write(image_bytes.read())
|
||
return {"src": new_media_filename}
|
||
|
||
# 转换docx文件内容为HTML和Markdown
|
||
def convert_docx(self):
|
||
# 读取Word文件
|
||
with open(self.docx_file_path, "rb") as docx_file:
|
||
# 转化Word文档为HTML
|
||
result = mammoth.convert_to_html(docx_file, convert_image=mammoth.images.img_element(self.convert_img))
|
||
# 获取HTML内容
|
||
html = result.value
|
||
if self.editor_mode in [1,2]:
|
||
# 转化HTML为Markdown
|
||
md = markdownify(html, heading_style="ATX")
|
||
return md
|
||
else:
|
||
return html
|
||
|
||
def run(self):
|
||
try:
|
||
result = self.convert_docx()
|
||
os.remove(self.docx_file_path)
|
||
return {'status':True,'data':result}
|
||
except:
|
||
os.remove(self.docx_file_path)
|
||
return {'status':False,'data':_('读取异常')}
|
||
|
||
if __name__ == '__main__':
|
||
imp = ImportZipProject()
|
||
imp.read_zip(r"D:\Python XlsxWriter模块中文文档_2020-06-16.zip") |