# coding:utf-8
"""Flask endpoints that sync customer, call-record and follow-up data to the DB."""
import datetime
import hmac
import json
import re
import urllib.parse
from hashlib import md5
from typing import Dict

from flask import request
from loguru import logger

from apps.models import app
from apps.models import db
from apps.models import Customer
from apps.models import CustomerUpdateLogs
from apps.models import CallRecords, CustomerFollowUp
from apps.normal_field import FieldCleanMap
from apps.send_email import sendMail
from docs.config import voice_secret_key

logger.add('./logs/runtime_{time}.log', rotation='00:00')

# Customer fields that are passed through FieldCleanMap[<type>] before storage.
SpecialField = {"regtime": "time", "lastlogintime": "time",
                "lastUpdateTime": "time", "isclickkf": "int",
                "isDelete": "int", "createTime": "time"}

# Matches every code point outside the Basic Multilingual Plane (emoji etc.).
_NON_BMP_RE = re.compile(u'[\U00010000-\U0010ffff]')

_TIME_FORMAT = "%Y-%m-%d %H:%M:%S"


def _now_str() -> str:
    """Return the current local time formatted as ``YYYY-mm-dd HH:MM:SS``."""
    return datetime.datetime.now().strftime(_TIME_FORMAT)


def insert_body(json_body: Dict) -> bool:
    """Insert a new ``Customer`` row built from *json_body*.

    On any failure the session is rolled back, the error is logged and
    mailed, and ``False`` is returned.

    :param json_body: keyword arguments for the ``Customer`` constructor
    :return: ``True`` if the row was committed, ``False`` otherwise
    """
    try:
        customer = Customer(**json_body)
        db.session.add(customer)
        db.session.commit()
    except Exception as e:
        # The original log line said "update failed" although this is an insert.
        logger.warning(f"{json_body.get('unique_id')}插入失败--->{e}")
        db.session.rollback()
        sendMail(f"{json_body.get('unique_id')}插入失败--->{e}")
        return False
    return True


def insert2db(db_object, errcode: str) -> bool:
    """Add an already-built model instance to the session and commit it.

    :param db_object: model instance to persist (callers in this file pass
        ``CallRecords`` and ``CustomerFollowUp`` instances)
    :param errcode: prefix for the failure notification mail
    :return: ``True`` if the commit succeeded, ``False`` otherwise
    """
    try:
        db.session.add(db_object)
        db.session.commit()
    except Exception as e:
        logger.warning(f"{errcode}插入失败--->{e}")
        db.session.rollback()
        sendMail(f"{errcode}插入失败--->{e}")
        return False
    return True


def update_body(exists_body, json_body: Dict) -> bool:
    """Copy every key of *json_body* onto *exists_body* and commit.

    :param exists_body: existing model instance to update
    :param json_body: attribute name -> new value
    :return: ``True`` if the commit succeeded, ``False`` otherwise
    """
    try:
        for attr_name, attr_value in json_body.items():
            setattr(exists_body, attr_name, attr_value)
        db.session.commit()
    except Exception as e:
        db.session.rollback()
        logger.warning(f"{json_body.get('unique_id')}更新失败--->{e}")
        sendMail(f"{json_body.get('unique_id')}更新失败--->{e}")
        return False
    return True


def insert_logs(json_body: Dict) -> bool:
    """Insert a ``CustomerUpdateLogs`` row built from *json_body*.

    :param json_body: keyword arguments for the ``CustomerUpdateLogs`` constructor
    :return: ``True`` if the row was committed, ``False`` otherwise
    """
    try:
        logs = CustomerUpdateLogs(**json_body)
        db.session.add(logs)
        db.session.commit()
    except Exception as e:
        # Log and mail now carry the same wording ("log insert failed").
        logger.warning(f"{json_body.get('unique_id')}日志插入失败--->{e}")
        db.session.rollback()
        sendMail(f"{json_body.get('unique_id')}日志插入失败--->{e}")
        return False
    return True


def verify_code(verify_body: dict) -> bool:
    """Check the request's signature token.

    The expected token is ``md5(TimestampKey + voice_secret_key)`` in hex.

    :param verify_body: request parameters; must contain the string values
        ``TimestampKey`` and ``EncryptionToken``
    :return: ``True`` when the supplied token matches, ``False`` when it
        does not or when either value is missing / not a string
    """
    timestamp = verify_body.get("TimestampKey")
    code = verify_body.get("EncryptionToken")
    # Missing parameters used to raise AttributeError on ``None.strip()``.
    if not isinstance(timestamp, str) or not isinstance(code, str):
        return False
    verify_data = timestamp.strip() + voice_secret_key
    real_key = md5(verify_data.encode("utf-8")).hexdigest()
    # Compare as bytes: compare_digest rejects non-ASCII str arguments.
    return hmac.compare_digest(real_key.encode("utf-8"),
                               code.strip().encode("utf-8"))


def replace_pic(body, fields):
    """Replace characters outside the BMP in the given fields with a space.

    *body* is modified in place and also returned. Fields that are missing,
    empty or not strings are left untouched.

    :param body: mapping of field name -> value
    :param fields: names of the fields to clean
    :return: the same *body* object
    """
    for field in fields:
        value = body.get(field)
        if value and isinstance(value, str):
            body[field] = _NON_BMP_RE.sub(" ", value)
    return body


# NOTE(review): a second ``tell`` view (body ``pass``) was registered on this
# same route right after this one. It shadowed this handler and Flask refuses
# to map two different functions to one endpoint name, so it was removed.
@app.route('/', methods=['POST'])
def tell():
    """Insert or update a customer from the posted form and log the change.

    :return: ``"200"`` on success, ``"false"`` when ``unique_id`` is missing
        or the insert/update failed
    """
    body = dict(request.form)
    logger.debug(body)
    body.pop("phoneMemo", None)
    unique_id = body.get("unique_id", "")
    body = replace_pic(body, ["usernickname", "company", "username"])
    if not unique_id:
        return "false"

    # Field type conversion.
    for field, f_type in SpecialField.items():
        body = FieldCleanMap[f_type](body, field)

    # Stamp the update time.
    body["lastUpdateTime"] = _now_str()

    # Look up an existing customer with this id.
    exists_body = Customer.query.filter(Customer.unique_id == unique_id).first()
    if not exists_body:
        body["createTime"] = body["lastUpdateTime"]
        state = insert_body(body)
        body["logCreateTime"] = body["lastUpdateTime"]
        body["updateStatus"] = 1 if state else 0
        body["updateType"] = "插入"
        # For an insert every key present at this point counts as changed.
        body["changeField"] = ";".join(list(body))
        insert_logs(body)
    else:
        change_fields = []
        try:
            stored = vars(exists_body)
            for key, val in body.items():
                if key in ('createTime', 'lastUpdateTime'):
                    continue
                # Compared as strings; stored values may not be str.
                if str(stored.get(key)) != str(val):
                    change_fields.append(key)
        except Exception as e:
            logger.warning(e)
        state = update_body(exists_body, body)
        if change_fields:
            body["logCreateTime"] = body["lastUpdateTime"]
            body["updateStatus"] = 1 if state else 0
            body["updateType"] = "更新"
            body["changeField"] = ";".join(change_fields)
            insert_logs(body)
        else:
            logger.info("未更新")

    if not state:
        return "false"
    return "200"


@app.route('/voice', methods=['GET'])
def voice_sync():
    """Insert or update a call record from the query-string parameters.

    :return: ``"succeed"`` on success, ``"error"`` when the write failed,
        and ``("error", 403)`` when signature verification fails
    """
    time_dict = {"Ring": "time", "RingingTime": "time", "Begin": "time",
                 'End': "time", 'QueueTime': "time"}
    body = dict(request.args)
    if not verify_code(body):
        # A bare ``return`` here made Flask raise, since a view may not
        # return None; answer with an explicit rejection instead.
        return "error", 403

    for field, f_type in time_dict.items():
        body = FieldCleanMap[f_type](body, field)
    call_sheet_id = body.get("CallSheetID")
    body = replace_pic(body, ["CustomerName"])
    # ``callId`` is optional; indexing raised KeyError when it was absent.
    call_id = body.get("callId")
    if call_id:
        body["callHistoryId"] = call_id

    exists_body = CallRecords.query.filter(
        CallRecords.CallSheetID == call_sheet_id).first()
    if exists_body:
        # A failed update was previously still reported as "succeed".
        if not update_body(exists_body, body):
            return "error"
        return "succeed"

    call = CallRecords()
    setattr(call, "createTime", _now_str())
    for attr_key, value in body.items():
        # Only copy keys that exist as attributes on the model class.
        if attr_key in CallRecords.__dict__:
            # Cleaned fields may no longer be strings; strip only strings.
            if isinstance(value, str):
                value = value.strip()
            setattr(call, attr_key, value)
    state = insert2db(call, f"语音同步失败-->{call_sheet_id}")
    if not state:
        return "error"
    return "succeed"


@app.route('/followup', methods=['POST'])
def followup():
    """Store a customer follow-up record from a JSON request body.

    :return: ``"200"`` on success or when there is nothing to store (empty
        body / no ``customerId``), ``"error"`` on bad input or a failed write
    """
    time_fields = {"followRemindTime": "time"}
    raw = request.get_data()
    # get_data() returns bytes, so the old ``== ""`` check never matched.
    if not raw:
        return "200"
    try:
        body = json.loads(raw)
    except ValueError as e:  # covers JSONDecodeError and bad UTF-8
        logger.warning(e)
        return "error"
    if not isinstance(body, dict):
        logger.warning(
            f"followup payload is not a JSON object: {type(body).__name__}")
        return "error"

    body = replace_pic(body, ["customerName"])
    # Values arrive URL-encoded; only strings can be unquoted.
    body = {
        key: urllib.parse.unquote(val) if isinstance(val, str) else val
        for key, val in body.items()
    }
    logger.debug(body)
    # NOTE(review): this loop cleans the customer fields in SpecialField,
    # not the follow-up time field in ``time_fields`` — confirm whether
    # ``time_fields`` was intended here.
    for field, f_type in SpecialField.items():
        body = FieldCleanMap[f_type](body, field)
    customerId = body.get("customerId", "")
    if not customerId:
        return "200"

    try:
        body["createTime"] = _now_str()
        customerFollowUp = CustomerFollowUp()
        for attr in ["customerId", "customerName", "createTime", "remark",
                     "followRemindTime", "isNotFollow", "dataType",
                     "dbTypeName"]:
            if attr not in body:
                continue
            # Leave an empty time field unset rather than storing "".
            if attr in time_fields and not body[attr]:
                continue
            setattr(customerFollowUp, attr, body[attr])
        state = insert2db(customerFollowUp, f"跟进客户{customerId}状态同步错误")
        if not state:
            return "error"
    except Exception as e:
        logger.warning(e)
        return "error"
    return "200"


if __name__ == '__main__':
    # ``processes`` is an integer count; the original passed True (== 1).
    app.run(host="0.0.0.0", port=15001, processes=1)