123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165 |
- # -*- coding: utf-8 -*-
- """
- Created on 2018-08-28 17:38:43
- ---------
- @summary: Create item classes from MySQL table definitions
- ---------
- @author: Boris
- @email: boris_liu@foxmail.com
- """
- import getpass
- import os
- import feapder.utils.tools as tools
- from feapder import setting
- from feapder.db.mysqldb import MysqlDB
- from .create_init import CreateInit
def deal_file_info(file):
    """Fill in the header placeholders of a template text.

    Every ``{DATE}`` marker is replaced with the value returned by
    ``tools.get_current_date()`` and every ``{USER}`` marker with the login
    name reported by ``getpass.getuser()``.

    @param file: template text (a string, despite the parameter name)
    @result: the text with both placeholders substituted
    """
    return file.replace("{DATE}", tools.get_current_date()).replace(
        "{USER}", getpass.getuser()
    )
class CreateItem:
    """Generate ``<table>_item.py`` files from MySQL table definitions.

    Column metadata is queried from ``INFORMATION_SCHEMA`` through ``MysqlDB``,
    rendered into the ``item_template.tmpl`` template and written to a file
    named after the table.
    """

    # Separator between generated attribute lines: a newline followed by eight
    # spaces. NOTE(review): presumably matches the indentation of the
    # ``${propertys}`` placeholder inside the template - confirm.
    _PROPERTY_SEPARATOR = "\n" + " " * 8

    def __init__(self):
        self._db = MysqlDB()
        self._create_init = CreateInit()

    def select_columns(self, table_name):
        """Fetch the column metadata of one table in ``setting.MYSQL_DB``.

        Each returned row is expected to hold, in this order: column name,
        column type, nullability, default, extra, key and comment.
        ``create_item`` reads the rows by position, so the SELECT order must
        not change.

        NOTE(review): ``table_name`` is interpolated into the SQL text as-is;
        only pass trusted names.

        @param table_name: exact name of the table
        @result: whatever ``MysqlDB.find`` returns for the query
        """
        sql = (
            "SELECT COLUMN_NAME, COLUMN_TYPE, IS_NULLABLE, COLUMN_DEFAULT, "
            "EXTRA, COLUMN_KEY, COLUMN_COMMENT FROM INFORMATION_SCHEMA.Columns "
            f"WHERE table_name = '{table_name}' "
            f"and table_schema = '{setting.MYSQL_DB}'"
        )
        return self._db.find(sql)

    def select_tables_name(self, tables_name):
        """Look up the tables of ``setting.MYSQL_DB`` whose name matches a pattern.

        The pattern is placed verbatim inside a SQL ``LIKE`` clause, so SQL
        wildcard rules apply to it.

        NOTE(review): ``tables_name`` is interpolated into the SQL text as-is;
        only pass trusted patterns.

        @param tables_name: table name or LIKE pattern covering a family of tables
        @result: whatever ``MysqlDB.find`` returns; ``create`` reads the table
                 name from position 0 of every row
        """
        sql = (
            "select table_name from information_schema.tables "
            f"where table_name like '{tables_name}' "
            f"and table_schema = '{setting.MYSQL_DB}'"
        )
        return self._db.find(sql)

    def convert_table_name_to_hump(self, table_name):
        """Convert an underscore separated table name to camel case.

        Every ``_`` separated word is passed through ``str.capitalize``, so
        the first letter is upper-cased and the rest of the word lower-cased.

        @param table_name: table name such as ``spider_data``
        @result: camel case name such as ``SpiderData``
        """
        return "".join(word.capitalize() for word in table_name.split("_"))

    def get_item_template(self):
        """Read the item template that ships next to this package.

        @result: content of ``templates/item_template.tmpl`` as text
        """
        template_path = os.path.abspath(
            os.path.join(__file__, "../../../templates/item_template.tmpl")
        )
        with open(template_path, "r", encoding="utf-8") as file:
            return file.read()

    def _parse_column_default(self, column_default):
        """Return the value displayed for a column default.

        An empty default and ``CURRENT_TIMESTAMP`` are shown as ``None``.
        Anything else is parsed as a Python literal; text that is not a
        literal is returned unchanged.
        """
        import ast  # local import: only this helper needs it

        if not column_default or column_default == "CURRENT_TIMESTAMP":
            return None
        try:
            # literal_eval instead of eval: the text comes from the database
            # schema, and eval would execute it or resolve it against names
            # that happen to be in scope (a default of "id" became the
            # built-in function).
            return ast.literal_eval(column_default)
        except (ValueError, SyntaxError, TypeError, MemoryError, RecursionError):
            return column_default

    def _build_propertys(self, columns, support_dict):
        """Render one attribute assignment line per column.

        Columns that are auto-incremented or have a default are emitted as
        commented-out lines. The column comment, if any, is appended as a
        trailing ``#`` comment.
        """
        lines = []
        for column in columns:
            column_name = column[0]
            column_default = column[3]
            column_extra = column[4]
            column_comment = column[6]

            if support_dict:
                value = f"kwargs.get('{column_name}')"
            else:
                value = self._parse_column_default(column_default)

            if column_extra == "auto_increment" or column_default is not None:
                line = f"# self.{column_name} = {value}"
            else:
                # Without a default the value is either None or the
                # kwargs.get(...) expression, so it is never quoted.
                line = f"self.{column_name} = {value}"

            if column_comment:
                # Keep the comment on one line; a line break here would turn
                # the remainder of the comment into code.
                comment = " ".join(str(column_comment).splitlines())
                line += f" # {comment}"
            lines.append(line)

        return self._PROPERTY_SEPARATOR.join(lines).strip()

    def create_item(self, item_template, columns, table_name, support_dict):
        """Fill the item template for one table.

        Replaces ``${item_name}`` with the camel case table name,
        ``${table_name}`` with the table name and ``${propertys}`` with the
        generated attribute lines, then passes the text through
        ``deal_file_info``.

        @param item_template: template text containing the placeholders
        @param columns: rows in the layout produced by ``select_columns``
        @param table_name: name of the table the item is generated for
        @param support_dict: when true, attributes are initialised from
                             ``kwargs.get('<column>')``
        @result: the rendered item source code
        """
        # Class name.
        item_name = self.convert_table_name_to_hump(table_name)
        item_template = item_template.replace("${item_name}", item_name)

        # NOTE(review): the " 1" suffix presumably reproduces the command line
        # argument that enables dict support - confirm against the template.
        table_name_text = table_name + " 1" if support_dict else table_name
        item_template = item_template.replace("${table_name}", table_name_text)

        # Attributes.
        propertys = self._build_propertys(columns, support_dict)
        item_template = item_template.replace("${propertys}", propertys)

        return deal_file_info(item_template)

    def save_template_to_file(self, item_template, table_name):
        """Write the rendered item to ``<table_name>_item.py``.

        The path is relative, so the file is created in the current working
        directory. If the file already exists the user is asked to confirm;
        any answer other than ``y`` cancels and returns without writing.
        After a successful write ``CreateInit.create`` is invoked.

        @param item_template: rendered item source code
        @param table_name: table name used to build the file name
        """
        item_file = table_name + "_item.py"
        if os.path.exists(item_file):
            confirm = input("%s 文件已存在 是否覆盖 (y/n). " % item_file)
            if confirm != "y":
                print("取消覆盖 退出")
                return

        with open(item_file, "w", encoding="utf-8") as file:
            file.write(item_template)
        print("\n%s 生成成功" % item_file)

        self._create_init.create()

    def create(self, tables_name, support_dict):
        """Generate an item file for every table matching ``tables_name``.

        @param tables_name: table name or LIKE pattern, see ``select_tables_name``
        @param support_dict: forwarded to ``create_item``
        @raise KeyError: when no table matches
        """
        matched_tables = self.select_tables_name(tables_name)
        if not matched_tables:
            tip = "mysql数据库中无 %s 表 " % tables_name
            raise KeyError(tip)

        # The template does not depend on the table, so read it only once.
        item_template = self.get_item_template()
        for row in matched_tables:
            table_name = row[0]
            columns = self.select_columns(table_name)
            item_content = self.create_item(
                item_template, columns, table_name, support_dict
            )
            self.save_template_to_file(item_content, table_name)
|