project.go 36 KB


  1. package front
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "github.com/tealeg/xlsx"
  6. "go.mongodb.org/mongo-driver/bson/primitive"
  7. "io/ioutil"
  8. "math"
  9. "mime/multipart"
  10. "mongodb"
  11. qu "qfw/util"
  12. "sort"
  13. "strings"
  14. "sync"
  15. "sync/atomic"
  16. "time"
  17. "util"
  18. )
  19. // ProjectList 项目列表
  20. func (f *Front) ProjectList() {
  21. defer qu.Catch()
  22. if f.Method() == "POST" {
  23. start, _ := f.GetInteger("start")
  24. limit, _ := f.GetInteger("length")
  25. draw, _ := f.GetInteger("draw")
  26. status := f.GetString("status")
  27. searchStr := f.GetString("search[value]")
  28. search := strings.TrimSpace(searchStr)
  29. //data := util.GetPostForm(f.Request)
  30. query := map[string]interface{}{}
  31. if status != "-1" {
  32. query["s_status"] = status
  33. }
  34. if search != "" {
  35. query["$or"] = []interface{}{
  36. map[string]interface{}{"s_name": map[string]interface{}{"$regex": search}},
  37. map[string]interface{}{"s_entname": map[string]interface{}{"$regex": search}},
  38. map[string]interface{}{"s_rule": map[string]interface{}{"$regex": search}},
  39. map[string]interface{}{"s_departname": map[string]interface{}{"$regex": search}},
  40. }
  41. }
  42. list, _ := util.Mgo.Find(util.PROJECTCOLLNAME, query, nil, nil, false, start, limit)
  43. count := util.Mgo.Count(util.PROJECTCOLLNAME, query)
  44. f.ServeJson(map[string]interface{}{"draw": draw, "data": *list, "recordsFiltered": count, "recordsTotal": count})
  45. } else {
  46. query := map[string]interface{}{"s_type": "tag"}
  47. info, _ := util.Mgo.Find("v_field", query, nil, map[string]interface{}{"s_name": 1, "s_code": 1}, false, -1, -1)
  48. f.T["fields"] = *info
  49. _ = f.Render("project/project_list.html", &f.T)
  50. }
  51. }
  52. // ProjectSave 项目保存
  53. func (f *Front) ProjectSave() {
  54. defer qu.Catch()
  55. s_name := f.GetString("s_name") //项目名称
  56. if s_name == "" {
  57. f.ServeJson(map[string]interface{}{"success": false, "msg": "缺少项目名称字段"})
  58. return
  59. }
  60. success := false //导入数据是否成功
  61. msg := "" //异常信息
  62. successNum := int64(0) //导入成功条数
  63. importDataNum := 0 //查询数量
  64. var s_rulename []string //规则
  65. user := f.GetSession("user").(map[string]interface{})
  66. username := qu.ObjToString(user["s_name"]) //当前登录用户
  67. stype := f.GetString("s_type") //新建项目类型:数据库导入、excel导入
  68. s_sourceinfo := f.GetString("s_sourceinfo") //数据表
  69. s_sourceinfo = "f_sourceinfo_" + s_sourceinfo
  70. if s_sourceinfo == "" {
  71. f.ServeJson(map[string]interface{}{"success": false, "msg": "缺少数据存储表名"})
  72. return
  73. }
  74. s_departname, s_entname := "", ""
  75. query := map[string]interface{}{
  76. "s_name": s_name,
  77. }
  78. set := map[string]interface{}{}
  79. //导入数据
  80. if stype == "excel" { //excel导入
  81. s_entname = f.GetString("s_entname") //公司名称
  82. if s_entname == "" {
  83. f.ServeJson(map[string]interface{}{"success": false, "msg": "缺少公司名称字段"})
  84. return
  85. }
  86. s_departname = f.GetString("s_departname") //部门名称
  87. rulename := f.GetString("s_rulename") //规则名称
  88. s_rulename = strings.Split(rulename, ",")
  89. mf, _, err := f.GetFile("xlsx")
  90. if err == nil {
  91. importDataNum = ImportDataByExcel(s_sourceinfo, mf, &success, &msg, &successNum)
  92. }
  93. //保存项目信息
  94. set = map[string]interface{}{
  95. "s_name": s_name, //项目名称
  96. "s_entname": s_entname, //公司名称
  97. "s_departname": s_departname, //部门名称
  98. "s_rulename": strings.Join(s_rulename, ","), //规则名称
  99. "i_importnum": importDataNum, //导入数量
  100. "s_sourceinfo": s_sourceinfo, //源数据表
  101. "s_createname": username, //创建人
  102. "s_status": "未开始", //项目状态
  103. "i_createtime": time.Now().Unix(), //创建时间
  104. "s_importtype": "excel", //导入类型
  105. }
  106. } else if stype == "coll" { //数据库导入
  107. historyid := f.GetString("s_historyid")
  108. if historyid == "" {
  109. f.ServeJson(map[string]interface{}{"success": false, "msg": "数据导出ID字段"})
  110. return
  111. }
  112. s_departname, s_entname, s_rulename, importDataNum = ImportDataByColl(s_sourceinfo, historyid, &success, &msg, &successNum)
  113. qu.Debug(s_departname, s_entname, s_rulename, importDataNum)
  114. //保存项目信息
  115. set = map[string]interface{}{
  116. "s_name": s_name, //项目名称
  117. "s_entname": s_entname, //公司名称
  118. "s_departname": s_departname, //部门名称
  119. "s_rulename": strings.Join(s_rulename, ","), //规则名称
  120. "i_importnum": importDataNum, //导入数量
  121. "s_sourceinfo": s_sourceinfo, //源数据表
  122. "s_createname": username, //创建人
  123. "s_status": "未开始", //项目状态
  124. "i_createtime": time.Now().Unix(), //创建时间
  125. "s_importtype": "coll", //导入类型
  126. "s_historyid": historyid, //源数据集标识
  127. }
  128. } else if stype == "edit" { //编辑保存
  129. success = true
  130. //s_entname = f.GetString("s_entname") //公司名称
  131. s_departname = f.GetString("s_departname") //部门名称
  132. rulename := f.GetString("s_rulename") //规则名称
  133. s_rulename = strings.Split(rulename, ",")
  134. s_personname := f.GetString("s_personname")
  135. fields := f.GetString("v_fields")
  136. v_fields := map[string]interface{}{}
  137. if err := json.Unmarshal([]byte(fields), &v_fields); err != nil {
  138. qu.Debug("V_Filelds Unmarshal Failed:", err)
  139. f.ServeJson(map[string]interface{}{"success": false})
  140. return
  141. }
  142. set = map[string]interface{}{
  143. //"s_name": s_name, //项目名称
  144. //"s_entname": s_entname, //公司名称
  145. "s_departname": s_departname, //部门名称
  146. "s_rulename": strings.Join(s_rulename, ","), //规则名称
  147. "v_fields": v_fields, //标注字段
  148. "i_updatetime": username, //更新人
  149. "i_createtime": time.Now().Unix(), //更新时间
  150. "s_personname": s_personname, //售后人员
  151. //"i_starttime":,//开始时间
  152. //"i_completetime",//结束时间
  153. }
  154. }
  155. if success {
  156. success = util.Mgo.Update(util.PROJECTCOLLNAME, query, map[string]interface{}{"$set": set}, true, false)
  157. qu.Debug("Save Project:", success)
  158. if !success { //保存项目失败
  159. msg = "新建项目失败\n" + msg
  160. } else {
  161. msg = "新建项目成功\n" + msg
  162. }
  163. }
  164. qu.Debug("Create Project:", success, "importnum:", importDataNum, "successnum:", successNum, "failnum:", int64(importDataNum)-successNum)
  165. qu.Debug("Msg:", msg)
  166. //返回信息
  167. if stype == "edit" {
  168. f.ServeJson(map[string]interface{}{"success": success, "msg": msg})
  169. } else {
  170. f.ServeJson(map[string]interface{}{"success": success, "msg": msg, "importnum": importDataNum, "successnum": successNum, "failnum": int64(importDataNum) - successNum})
  171. }
  172. }
  173. // ProjectClear 项目清洗
  174. func (f *Front) ProjectClear() {
  175. defer qu.Catch()
  176. if f.Method() == "POST" {
  177. projectid := f.GetString("s_projectid") //项目id
  178. project, _ := util.Mgo.FindById(util.PROJECTCOLLNAME, projectid, map[string]interface{}{"s_status": 1, "s_sourceinfo": 1})
  179. if project != nil && len(*project) > 0 {
  180. if status := qu.ObjToString((*project)["s_status"]); status == "未开始" {
  181. //TODO:调用数据质量评估接口
  182. //点击清洗更新项目状态为进行中
  183. b := util.Mgo.UpdateById(util.PROJECTCOLLNAME, projectid, map[string]interface{}{"$set": map[string]interface{}{"s_status": "进行中", "i_starttime": time.Now().Unix()}})
  184. qu.Debug("Update Porject:"+projectid+" Status Success:", b)
  185. }
  186. } else {
  187. qu.Debug("Search Porject Failed:", projectid)
  188. f.ServeJson("查询项目信息失败")
  189. return
  190. }
  191. sourceinfo := qu.ObjToString((*project)["s_sourceinfo"]) //数据源表
  192. noTagAllDataNum := util.Mgo.Count(sourceinfo, map[string]interface{}{"b_istagging": false}) //达标数据总量
  193. noTagGiveDataNum := util.Mgo.Count(sourceinfo, map[string]interface{}{"b_istagging": false, "b_isgive": true}) //达标数据已分发量
  194. noTagNoGiveDataNum := noTagAllDataNum - noTagGiveDataNum //达标待分发量
  195. tagAllDataNum := util.Mgo.Count(sourceinfo, map[string]interface{}{"b_istagging": true}) //未达标数据总量
  196. tagGiveDataNum := util.Mgo.Count(sourceinfo, map[string]interface{}{"b_istagging": true, "b_isgive": true}) //未达标数据已分发量
  197. tagNoGiveDataNum := tagAllDataNum - tagGiveDataNum //未达标待分发量
  198. allGiveDataNum := noTagGiveDataNum + tagGiveDataNum //总分发量
  199. allNoGiveDataNum := noTagNoGiveDataNum + tagNoGiveDataNum //总待分发量
  200. allDataNum := allGiveDataNum + allNoGiveDataNum
  201. data := make(map[string]interface{})
  202. data["allDataNum"] = allDataNum
  203. data["allGiveDataNum"] = allGiveDataNum
  204. data["allNoGiveDataNum"] = allNoGiveDataNum
  205. data["noTagAllDataNum"] = noTagAllDataNum
  206. data["noTagGiveDataNum"] = noTagGiveDataNum
  207. data["noTagNoGiveDataNum"] = noTagNoGiveDataNum
  208. data["tagAllDataNum"] = tagAllDataNum
  209. data["tagGiveDataNum"] = tagGiveDataNum
  210. data["tagNoGiveDataNum"] = tagNoGiveDataNum
  211. data["s_projectid"] = projectid
  212. qu.Debug(data)
  213. f.ServeJson(data)
  214. } else {
  215. pid := f.GetString("pid")
  216. f.T["s_projectid"] = pid
  217. _ = f.Render("project/project_clear.html", &f.T)
  218. }
  219. }
  220. // ProjectTaskList 用户组任务分发列表
  221. func (f *Front) ProjectTaskList() {
  222. defer qu.Catch()
  223. projectid := f.GetString("s_projectid") //项目id
  224. status := f.GetString("s_status") //任务状态
  225. searchStr := f.GetString("search[value]")
  226. search := strings.TrimSpace(searchStr)
  227. start, _ := f.GetInteger("start")
  228. limit, _ := f.GetInteger("length")
  229. draw, _ := f.GetInteger("draw")
  230. query := map[string]interface{}{ //查找用户组任务
  231. "s_projectid": projectid,
  232. "s_stype": "group",
  233. }
  234. if status != "-1" {
  235. query["s_status"] = status
  236. }
  237. if search != "" {
  238. query["$or"] = []interface{}{
  239. map[string]interface{}{"s_groupname": map[string]interface{}{"$regex": search}},
  240. }
  241. }
  242. list, _ := util.Mgo.Find(util.TASKCOLLNAME, query, nil, nil, false, start, limit)
  243. count := util.Mgo.Count(util.TASKCOLLNAME, query)
  244. for _, l := range *list {
  245. if status := qu.ObjToString(l["s_status"]); status == "进行中" { //更新任务进度
  246. groupId := qu.ObjToString(l["s_groupid"])
  247. giveNum := qu.IntAll(l["i_givenum"])
  248. sourceinfo := qu.ObjToString(l["s_sourceinfo"])
  249. tagNum := util.Mgo.Count(sourceinfo, map[string]interface{}{"s_groupid": groupId, "b_istag": true})
  250. progress := fmt.Sprint(math.Ceil(float64(tagNum)/float64(giveNum))) + "%"
  251. l["s_progress"] = progress
  252. //同步数据库
  253. util.Mgo.UpdateById(util.TASKCOLLNAME, l["_id"], map[string]interface{}{"$set": map[string]interface{}{"s_progress": progress}})
  254. }
  255. }
  256. f.ServeJson(map[string]interface{}{"draw": draw, "data": *list, "recordsFiltered": count, "recordsTotal": count})
  257. }
  258. // ProjectTaskSave 用户组任务分发
  259. func (f *Front) ProjectTaskSave() {
  260. defer qu.Catch()
  261. var groupArr []map[string]interface{}
  262. var taskArr []map[string]interface{}
  263. var groupIdArr []string
  264. groupIdTask := map[string]util.Task{}
  265. success := false
  266. msg := ""
  267. user := f.GetSession("user").(map[string]interface{})
  268. username := qu.ObjToString(user["s_name"]) //当前登录用户
  269. projectid := f.GetString("s_projectid") //项目标识
  270. projectname := f.GetString("s_projectname") //项目名称
  271. sourceinfo := f.GetString("s_sourceinfo") //源数据表
  272. sourcetaskinfo := "f_sourcetaskinfo_" + strings.ReplaceAll(sourceinfo, "f_sourceinfo_", "") //任务日志表
  273. group := f.GetString("s_group")
  274. stype := f.GetString("s_type")
  275. if err := json.Unmarshal([]byte(group), &groupArr); err != nil {
  276. qu.Debug("V_Filelds Unmarshal Failed:", err)
  277. msg = "用户组信息解析失败"
  278. } else {
  279. if stype == "notag" { //如果分发的是达标数据且进行了初步质检,将没有质检记录的字段从v_taginfo标注记录中删除
  280. DeleleDataTagInfo(sourceinfo)
  281. }
  282. for _, groupInfo := range groupArr {
  283. groupId := qu.ObjToString(groupInfo["s_groupid"])
  284. groupIdArr = append(groupIdArr, groupId)
  285. givenum := qu.IntAll(groupInfo["i_givenum"])
  286. //groupIdMap[groupId] = givenum
  287. _id := primitive.NewObjectID()
  288. sid := mongodb.BsonIdToSId(_id)
  289. gt := util.Task{
  290. GroupId: groupId,
  291. GiveNum: givenum,
  292. }
  293. groupIdTask[sid] = gt
  294. groupTask := map[string]interface{}{
  295. "_id": _id, //生成任务id
  296. "s_projectid": projectid, //项目标识
  297. "s_projectname": projectname, //项目名称
  298. "s_status": "未开始", //任务状态
  299. "s_personid": qu.ObjToString(groupInfo["s_personid"]), //任务负责人标识
  300. "s_personname": qu.ObjToString(groupInfo["s_personname"]), //任务负责人
  301. "s_groupname": qu.ObjToString(groupInfo["s_groupname"]), //用户组名称
  302. "s_groupid": groupId, //用户组标识
  303. "i_givenum": givenum, //分发数据量
  304. "s_createname": username, //创建人
  305. "i_createtime": time.Now().Unix(), //创建时间
  306. "s_progress": "0%", //完成进度
  307. "s_sourceinfo": sourceinfo, //源数据表
  308. "s_sourcetaskinfo": sourcetaskinfo, //任务日志表
  309. "s_stype": "group", //任务类型
  310. }
  311. taskArr = append(taskArr, groupTask)
  312. }
  313. }
  314. if len(taskArr) > 0 {
  315. //分发数据后更新项目中用户组标识信息
  316. success = util.Mgo.UpdateById(util.PROJECTCOLLNAME, projectid, map[string]interface{}{
  317. "$push": map[string]interface{}{
  318. "v_groupids": map[string]interface{}{
  319. "$each": groupIdArr,
  320. },
  321. },
  322. })
  323. if !success {
  324. msg = "更新项目:" + projectname + "用户组标识失败"
  325. } else { //用户组分发任务
  326. success = util.Mgo.SaveBulk(util.TASKCOLLNAME, taskArr...)
  327. if success {
  328. msg = "任务分发成功"
  329. UpdateSourceinfo(sourceinfo, sourcetaskinfo, stype, groupIdTask) //用户组分发任务成功后,给数据源打上用户组标识,同时生成任务临时表
  330. }
  331. }
  332. }
  333. qu.Debug("Msg:", msg)
  334. f.ServeJson(map[string]interface{}{"success": success, "msg": msg})
  335. }
  336. // ProjectGetEntnameList 模糊查询公司名称
  337. func (f *Front) ProjectGetEntnameList() {
  338. defer qu.Catch()
  339. var entnameList []string
  340. entname := f.GetString("entname")
  341. query := map[string]interface{}{
  342. "username": map[string]interface{}{
  343. "$regex": entname,
  344. },
  345. }
  346. list, _ := util.MgoJy.Find(util.JyUser, query, nil, map[string]interface{}{"username": 1}, false, -1, -1)
  347. for _, l := range *list {
  348. entnameList = append(entnameList, qu.ObjToString(l["username"]))
  349. }
  350. f.ServeJson(map[string]interface{}{"entname": entnameList})
  351. }
  352. // ProjectTaskRepulse 用户组任务打回
  353. // TODO 关联用户组下所有用户任务的打回
  354. func (f *Front) ProjectTaskRepulse() {
  355. defer qu.Catch()
  356. success := false
  357. msg := ""
  358. user := f.GetSession("user").(map[string]interface{})
  359. username := qu.ObjToString(user["s_name"])
  360. status := f.GetString("s_status")
  361. taskId := f.GetString("id")
  362. groupId := f.GetString("s_groupid")
  363. sourceinfo := f.GetString("s_sourceinfo")
  364. sourcetaskinfo := f.GetString("s_sourcetaskinfo")
  365. if status == "已完成" {
  366. //更新数据状态
  367. //1、更新源数据表
  368. success1 := util.Mgo.Update(sourceinfo, map[string]interface{}{"s_groupid": groupId}, map[string]interface{}{
  369. "$set": map[string]interface{}{
  370. "b_istag": false,
  371. "i_updatetime": time.Now().Unix(),
  372. },
  373. "$unset": map[string]interface{}{
  374. "s_userid": "",
  375. },
  376. }, false, true)
  377. //2、删除临时任务表中对应数据
  378. success2 := util.Mgo.Del(sourcetaskinfo, map[string]interface{}{"s_groupid": groupId})
  379. if success1 && success2 {
  380. //清除最迟完成时间,更新任务状态
  381. success = util.Mgo.UpdateById(util.TASKCOLLNAME, taskId, map[string]interface{}{
  382. "$set": map[string]interface{}{
  383. "s_status": "未开始",
  384. "s_updateperson": username,
  385. "i_updatetime": time.Now().Unix(),
  386. "s_progress": "0%",
  387. },
  388. "$unset": map[string]interface{}{
  389. "i_completetime": "",
  390. },
  391. })
  392. if !success {
  393. msg += "更新任务:" + taskId + "失败"
  394. }
  395. } else {
  396. qu.Debug("Update "+sourceinfo+":", success1, " Delete "+sourcetaskinfo+":", success2)
  397. if !success1 {
  398. msg += "更新" + sourceinfo + "数据失败;"
  399. }
  400. if !success2 {
  401. msg += "删除" + sourcetaskinfo + "数据失败;"
  402. }
  403. }
  404. }
  405. qu.Debug("Task Repulse:", success, " Msg:", msg)
  406. f.ServeJson(map[string]interface{}{"success": success, "msg": msg})
  407. }
  408. // ProjectTaskRetrieve 用户组任务收回
  409. // TODO 关联用户组下所有用户任务的收回
  410. func (f *Front) ProjectTaskRetrieve() {
  411. defer qu.Catch()
  412. success := false
  413. msg := ""
  414. num := 0
  415. user := f.GetSession("user").(map[string]interface{})
  416. username := qu.ObjToString(user["s_name"])
  417. status := f.GetString("s_status")
  418. taskId := f.GetString("id")
  419. groupId := f.GetString("s_groupid")
  420. sourceinfo := f.GetString("s_sourceinfo")
  421. sourcetaskinfo := f.GetString("s_sourcetaskinfo")
  422. if status == "已完成" {
  423. count1 := util.Mgo.Count(sourceinfo, map[string]interface{}{"s_groupid": groupId, "b_istag": false})
  424. count2 := util.Mgo.Count(sourcetaskinfo, map[string]interface{}{"s_groupid": groupId, "s_complete": false})
  425. if count1 != count2 { //数据源表和临时表数量不一致
  426. qu.Debug("Count Is Not Same:", sourceinfo+":", count1, sourceinfo+":", count2)
  427. }
  428. num = count1
  429. if count1 == 0 { //收回数据量为0
  430. success = true
  431. //更新任务状态、完成时间
  432. success = util.Mgo.UpdateById(util.TASKCOLLNAME, taskId, map[string]interface{}{
  433. "$set": map[string]interface{}{
  434. "s_status": "已完成",
  435. "s_updateperson": username,
  436. "i_updatetime": time.Now().Unix(),
  437. "i_completetime": time.Now().Unix(),
  438. "s_progress": "100%",
  439. },
  440. })
  441. if !success {
  442. msg += "更新任务:" + taskId + "失败"
  443. }
  444. } else {
  445. //1、更新源数据表
  446. success1 := util.Mgo.Update(sourceinfo, map[string]interface{}{"s_groupid": groupId, "b_istag": false}, map[string]interface{}{
  447. "$set": map[string]interface{}{
  448. "b_isgive": false,
  449. "i_updatetime": time.Now().Unix(),
  450. },
  451. "$unset": map[string]interface{}{
  452. "s_groupid": "",
  453. },
  454. }, false, true)
  455. //2、删除临时任务表中对应未完成数据
  456. success2 := util.Mgo.Del(sourcetaskinfo, map[string]interface{}{"s_groupid": groupId, "s_complete": false})
  457. if success1 && success2 {
  458. //更新任务状态、完成时间
  459. success = util.Mgo.UpdateById(util.TASKCOLLNAME, taskId, map[string]interface{}{
  460. "$set": map[string]interface{}{
  461. "s_status": "已完成",
  462. "s_updateperson": username,
  463. "i_updatetime": time.Now().Unix(),
  464. "i_completetime": time.Now().Unix(),
  465. },
  466. })
  467. if !success {
  468. msg += "更新任务:" + taskId + "失败"
  469. }
  470. } else {
  471. qu.Debug("Update "+sourceinfo+":", success1, " Delete "+sourcetaskinfo+":", success2)
  472. if !success1 {
  473. msg += "更新" + sourceinfo + "数据失败;"
  474. }
  475. if !success2 {
  476. msg += "删除" + sourcetaskinfo + "数据失败;"
  477. }
  478. }
  479. }
  480. }
  481. qu.Debug("Task Retrieve:", success, " num:", num, " Msg:", msg)
  482. f.ServeJson(map[string]interface{}{"success": success, "msg": msg, "num": num})
  483. }
  484. // ProjectTaskClose 用户组任务关闭
  485. func (f *Front) ProjectTaskClose() {
  486. defer qu.Catch()
  487. success := false
  488. user := f.GetSession("user").(map[string]interface{})
  489. username := qu.ObjToString(user["s_name"])
  490. status := f.GetString("s_status")
  491. taskId := f.GetString("id")
  492. if status == "已完成" {
  493. //更新任务状态
  494. success = util.Mgo.UpdateById(util.TASKCOLLNAME, taskId, map[string]interface{}{
  495. "$set": map[string]interface{}{
  496. "s_status": "已关闭",
  497. "s_updateperson": username,
  498. "i_updatetime": time.Now().Unix(),
  499. "s_progress": "100%",
  500. },
  501. })
  502. }
  503. f.ServeJson(map[string]interface{}{"success": success})
  504. }
  505. // DeleleDataTagInfo 删除标注记录
  506. func DeleleDataTagInfo(sourceinfo string) {
  507. defer qu.Catch()
  508. sess := util.Mgo.GetMgoConn()
  509. defer util.Mgo.DestoryMongoConn(sess)
  510. ch := make(chan bool, 5)
  511. wg := &sync.WaitGroup{}
  512. lock := &sync.Mutex{}
  513. query := map[string]interface{}{ //达标数据可能会分发后收回、打回再分发
  514. "b_istagging": false, //达标数据
  515. "b_cleartag": false, //未进行一次标注信息清理
  516. }
  517. fields := map[string]interface{}{
  518. "v_taginfo": 1,
  519. "v_check": 1,
  520. }
  521. updateArr := [][]map[string]interface{}{}
  522. it := sess.DB(util.Mgo.DbName).C(sourceinfo).Find(&query).Select(&fields).Iter()
  523. count, _ := sess.DB(util.Mgo.DbName).C(sourceinfo).Find(&query).Count()
  524. qu.Debug("Find Needs To Clearn Data Count:", count)
  525. n := 0
  526. for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
  527. ch <- true
  528. wg.Add(1)
  529. go func(tmp map[string]interface{}) {
  530. defer func() {
  531. <-ch
  532. wg.Done()
  533. }()
  534. update := []map[string]interface{}{}
  535. update = append(update, map[string]interface{}{"_id": tmp["_id"]})
  536. tagInfo, _ := tmp["v_taginfo"].(map[string]interface{})
  537. checkInfo, _ := tmp["v_check"].(map[string]interface{})
  538. if len(tagInfo) == 0 || len(checkInfo) == 0 {
  539. return
  540. }
  541. for f, _ := range tagInfo {
  542. if checkInfo[f] == nil {
  543. delete(tagInfo, f)
  544. }
  545. }
  546. update = append(update, map[string]interface{}{
  547. "$set": map[string]interface{}{
  548. "v_taginfo": tagInfo,
  549. "b_cleartag": true,
  550. },
  551. })
  552. lock.Lock()
  553. updateArr = append(updateArr, update)
  554. if len(updateArr) > 500 {
  555. util.Mgo.UpdateBulk(sourceinfo, updateArr...)
  556. updateArr = [][]map[string]interface{}{}
  557. }
  558. lock.Unlock()
  559. }(tmp)
  560. if n%100 == 0 {
  561. qu.Debug("current:", n)
  562. }
  563. tmp = map[string]interface{}{}
  564. }
  565. wg.Wait()
  566. lock.Lock()
  567. if len(updateArr) > 0 {
  568. util.Mgo.UpdateBulk(sourceinfo, updateArr...)
  569. updateArr = [][]map[string]interface{}{}
  570. }
  571. lock.Unlock()
  572. }
  573. // UpdateSourceinfo 用户组分发任务成功后,给数据源打上用户组标识,同时生成任务临时表
  574. func UpdateSourceinfo(sourceinfo, sourcetaskinfo, stype string, groupIdInfo map[string]util.Task) {
  575. defer qu.Catch()
  576. for groupTaskId, tInfo := range groupIdInfo {
  577. groupid := tInfo.GroupId
  578. num := tInfo.GiveNum
  579. sess := util.Mgo.GetMgoConn()
  580. defer util.Mgo.DestoryMongoConn(sess)
  581. ch := make(chan bool, 5)
  582. wg := &sync.WaitGroup{}
  583. lock := &sync.Mutex{}
  584. query := map[string]interface{}{ //查找未分配对应stype的数据分发
  585. "b_isgive": false,
  586. }
  587. if stype == "notag" { //达标数据
  588. query["b_istagging"] = false
  589. } else if stype == "tag" { //未达标数据
  590. query["b_istagging"] = true
  591. }
  592. fields := map[string]interface{}{
  593. "title": 1,
  594. }
  595. saveArr := []map[string]interface{}{}
  596. updateArr := [][]map[string]interface{}{}
  597. qu.Debug("Query:", query)
  598. it := sess.DB(util.Mgo.DbName).C(sourceinfo).Find(&query).Select(&fields).Limit(int64(num)).Iter()
  599. n := 0
  600. for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
  601. ch <- true
  602. wg.Add(1)
  603. go func(tmp map[string]interface{}) {
  604. defer func() {
  605. <-ch
  606. wg.Done()
  607. }()
  608. id := mongodb.BsonIdToSId(tmp["_id"])
  609. title := qu.ObjToString(tmp["title"])
  610. update := []map[string]interface{}{}
  611. update = append(update, map[string]interface{}{"_id": tmp["_id"]})
  612. update = append(update, map[string]interface{}{
  613. "$set": map[string]interface{}{
  614. "s_groupid": groupid,
  615. "b_isgive": true,
  616. "i_updatetime": time.Now().Unix(),
  617. },
  618. })
  619. save := map[string]interface{}{
  620. "s_infoid": id,
  621. "s_infotitle": title,
  622. "s_groupid": groupid,
  623. "i_createtime": time.Now().Unix(),
  624. "s_complete": false,
  625. "s_grouptaskid": groupTaskId,
  626. }
  627. lock.Lock()
  628. updateArr = append(updateArr, update)
  629. saveArr = append(saveArr, save)
  630. if len(updateArr) > 500 {
  631. util.Mgo.UpdateBulk(sourceinfo, updateArr...)
  632. updateArr = [][]map[string]interface{}{}
  633. }
  634. if len(saveArr) > 500 {
  635. util.Mgo.SaveBulk(sourcetaskinfo, saveArr...)
  636. saveArr = []map[string]interface{}{}
  637. }
  638. lock.Unlock()
  639. }(tmp)
  640. if n%100 == 0 {
  641. qu.Debug("current:", n)
  642. }
  643. tmp = map[string]interface{}{}
  644. }
  645. wg.Wait()
  646. lock.Lock()
  647. if len(updateArr) > 0 {
  648. util.Mgo.UpdateBulk(sourceinfo, updateArr...)
  649. updateArr = [][]map[string]interface{}{}
  650. }
  651. if len(saveArr) > 0 {
  652. util.Mgo.SaveBulk(sourcetaskinfo, saveArr...)
  653. saveArr = []map[string]interface{}{}
  654. }
  655. lock.Unlock()
  656. }
  657. }
  658. //ImportDataByExcel 通过excel获取数据源
  659. func ImportDataByExcel(s_sourceinfo string, mf multipart.File, success *bool, msg *string, successNum *int64) (importDataNum int) {
  660. defer qu.Catch()
  661. binary, _ := ioutil.ReadAll(mf)
  662. xls, _ := xlsx.OpenBinary(binary)
  663. sheet := xls.Sheets[0]
  664. rows := sheet.Rows
  665. idcolnum := -1
  666. cellFieldName := map[int]string{} //记录客户需求字段所在的列
  667. idInfoMap := map[string]map[string]interface{}{} //记录数据id及需要保存的字段信息
  668. for rn, row := range rows {
  669. if rn == 0 {
  670. for index, cell := range row.Cells {
  671. title := cell.Value
  672. if fieldName := util.CustomerFieldMap_HE[title]; fieldName != "" { //客户需求字段
  673. cellFieldName[index] = fieldName
  674. }
  675. if title == "唯一标识" || title == "信息标识" { //id所在列
  676. idcolnum = index
  677. }
  678. }
  679. if idcolnum == -1 {
  680. break
  681. }
  682. continue
  683. }
  684. if len(row.Cells) < len(rows[0].Cells) {
  685. break
  686. }
  687. tmp := map[string]interface{}{}
  688. for index, f := range cellFieldName {
  689. if val := row.Cells[index].Value; val != "" {
  690. if f == "capital" { //注册资金(万元)
  691. cf, _ := row.Cells[index].Float()
  692. tmp[f] = cf
  693. } else if f == "createtime" { //创建时间
  694. ci, _ := row.Cells[index].Int64()
  695. tmp[f] = ci
  696. } else {
  697. tmp[f] = val
  698. }
  699. }
  700. }
  701. id := row.Cells[idcolnum].String() //加密的id
  702. if id == "" {
  703. break
  704. }
  705. id = util.SE.DecodeString(id) //解密后id
  706. idInfoMap[id] = tmp
  707. }
  708. importDataNum = len(idInfoMap)
  709. qu.Debug("Load Excel Count:", importDataNum)
  710. if importDataNum > 0 {
  711. GetDataById(idInfoMap, "excel", s_sourceinfo, success, msg, successNum)
  712. } else {
  713. *success = false
  714. *msg = "查询数据失败"
  715. }
  716. idInfoMap = map[string]map[string]interface{}{}
  717. return
  718. }
  719. //ImportDataByColl 通过表获取数据源
  720. func ImportDataByColl(s_sourceinfo, historyid string, success *bool, msg *string, successNum *int64) (departname, entname string, rulename []string, importDataNum int) {
  721. defer qu.Catch()
  722. rulenameMap := map[string]bool{}
  723. sess := util.MgoJy.GetMgoConn()
  724. defer util.MgoJy.DestoryMongoConn(sess)
  725. ch := make(chan bool, 3)
  726. wg := &sync.WaitGroup{}
  727. lock := &sync.Mutex{}
  728. idInfoMap := map[string]map[string]interface{}{} //记录数据id及需要保存的字段信息
  729. query := map[string]interface{}{
  730. "historyId": historyid,
  731. }
  732. it := sess.DB(util.MgoJy.DbName).C(util.JyHistory).Find(&query).Iter()
  733. n := 0
  734. for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
  735. ch <- true
  736. wg.Add(1)
  737. go func(tmp map[string]interface{}) {
  738. defer func() {
  739. <-ch
  740. wg.Done()
  741. }()
  742. id := qu.ObjToString(tmp["id"]) //bidding id
  743. appid := qu.ObjToString(tmp["appid"]) //根据appid查user表获取公司名称
  744. departname = qu.ObjToString(tmp["departname"]) //部门名称。所有数据都应部门名称,若不一致,随机取
  745. needField := map[string]interface{}{}
  746. for f, _ := range util.CustomerFieldMap_EH {
  747. if tmp[f] != nil {
  748. needField[f] = tmp[f]
  749. }
  750. }
  751. if entname == "" { //获取一次公司名称即可
  752. user, _ := util.MgoJy.FindOne(util.JyUser, map[string]interface{}{"appid": appid})
  753. entname = qu.ObjToString((*user)["username"]) //公司名称
  754. }
  755. rname := qu.ObjToString(tmp["rulename"])
  756. lock.Lock()
  757. rulenameMap[rname] = true
  758. //rulename = append(rulename, qu.ObjToString(tmp["rulename"])) //规则名称
  759. idInfoMap[id] = needField
  760. lock.Unlock()
  761. }(tmp)
  762. if n%100 == 0 {
  763. qu.Debug("current:", n)
  764. }
  765. tmp = map[string]interface{}{}
  766. }
  767. wg.Wait()
  768. for r, _ := range rulenameMap {
  769. rulename = append(rulename, r)
  770. }
  771. importDataNum = len(idInfoMap) //查询数据总数
  772. if importDataNum > 0 {
  773. GetDataById(idInfoMap, "coll", s_sourceinfo, success, msg, successNum)
  774. } else {
  775. *msg = "查询数据失败"
  776. }
  777. idInfoMap = map[string]map[string]interface{}{}
  778. return
  779. }
  780. //GetDataById 通过id集从bidding、extract、project获取数据所有信息
  781. func GetDataById(idsInfo map[string]map[string]interface{}, importType, s_sourceinfo string, success *bool, msg *string, successNum *int64) {
  782. *success = true
  783. var msgArr []string
  784. wg := &sync.WaitGroup{}
  785. lock := &sync.Mutex{}
  786. ch := make(chan bool, 10)
  787. num := int64(0) //计数
  788. for id, info := range idsInfo {
  789. wg.Add(1)
  790. ch <- true
  791. go func(id string, tmp map[string]interface{}) {
  792. defer func() {
  793. wg.Done()
  794. <-ch
  795. }()
  796. /*
  797. 1.查bidding
  798. 2.查extract
  799. 3.extract合并到bidding(删除item字段与客户需要的item不是一个含义)
  800. 4.对比marked表,替换已标注过的字段值,补充标记
  801. 5.合并客户所需字段信息,补充id字段
  802. //6.若为同步时,删除原有id对应的信息,新增该id对应的_id信息
  803. 6.mgo查询项目信息
  804. */
  805. tagInfoMap := map[string]interface{}{} //记录数据已标注过的信息
  806. baseInfoMap := map[string]interface{}{} //记录其他信息
  807. //1.查bidding
  808. tmpBidColl := util.BidColl1 //bidding
  809. //查询bidding
  810. if id < util.BIDDINGSTARTID {
  811. tmpBidColl = util.BidColl2 //bidding_back
  812. }
  813. bidData, _ := util.MgoB.FindById(tmpBidColl, id, nil)
  814. if bidData != nil && len(*bidData) > 0 { //bidding表数据存在
  815. //2.查extract
  816. extData, _ := util.MgoE.FindById(util.ExtColl1, id, nil)
  817. if extData == nil || len(*extData) == 0 {
  818. extData, _ = util.MgoE.FindById(util.ExtColl2, id, nil)
  819. }
  820. //抽取表字段合并到bidding
  821. if extData != nil && len(*extData) > 0 {
  822. for k, v := range *extData {
  823. (*bidData)[k] = v
  824. }
  825. }
  826. //3.删除item
  827. //删除item
  828. delete((*bidData), "item")
  829. //4.对比marked表,对比marked表是否已标注该数据
  830. markData, _ := util.Mgo.FindById(util.AllToColl, id, nil)
  831. if markData != nil && len(*markData) > 0 {
  832. UpdateMarkColl(bidData, markData, &tagInfoMap, &baseInfoMap) //比对更新数据
  833. } else {
  834. baseInfoMap["i_ckdata"] = 0 //设置ck_data默认值0
  835. //多包、中标候选人、标的信息是否抽取
  836. //if packageMap, ok := (*bidData)["package"].(map[string]interface{}); ok && len(packageMap) > 0 {
  837. // baseInfoMap["b_pkgisext"] = true
  838. //} else {
  839. // baseInfoMap["b_pkgisext"] = false
  840. //}
  841. //if winorderArr, ok := (*bidData)["winnerorder"].([]interface{}); ok && len(winorderArr) > 0 {
  842. // baseInfoMap["b_wodrisext"] = true
  843. //} else {
  844. // baseInfoMap["b_wodrisext"] = false
  845. //}
  846. //if purchArr, ok := (*bidData)["purchasinglist"].([]interface{}); ok && len(purchArr) > 0 {
  847. // baseInfoMap["b_pclisext"] = true
  848. //} else {
  849. // baseInfoMap["b_pclisext"] = false
  850. //}
  851. }
  852. //合并导入表中客户所需的字段
  853. if len(tmp) > 0 {
  854. for k, v := range tmp {
  855. (*bidData)[k] = v
  856. }
  857. }
  858. //补充id
  859. //(*bidData)["id"] = id
  860. //if stype == "syncoll" { //同步数据时删除原始数据
  861. // if util.MgoM.Delete(coll, `{"id":"`+id+`"}`) == 0 {
  862. // lock.Lock()
  863. // *msg += "同步未删除成功数据id:" + id + ";\n"
  864. // *success = false
  865. // lock.Unlock()
  866. // }
  867. //}
  868. // 处理 package winner_all
  869. if p, o1 := (*bidData)["package"].(map[string]interface{}); o1 {
  870. for _, v := range p {
  871. v1 := v.(map[string]interface{})
  872. t := make(map[string]interface{})
  873. if v1["winner"] != nil {
  874. t["winner"] = v1["winner"]
  875. }
  876. if v1["bidamount"] != nil {
  877. t["bidamount"] = qu.Float64All(v1["bidamount"])
  878. }
  879. if len(t) > 0 {
  880. v1["winner_all"] = append([]map[string]interface{}{}, t)
  881. }
  882. }
  883. }
  884. // 补充filetext
  885. (*bidData)["filetext"] = util.GetFileText(*bidData)
  886. // 6.es查询项目合并信息
  887. //esQ := `{"query":{"bool":{"must":[{"term":{"ids":"` + id + `"}}]}}}`
  888. //info := util.Es.Get("projectset", "projectset", esQ)
  889. projectId := qu.ObjToString((*bidData)["projectId"])
  890. project, _ := util.MgoE.FindById(util.ProjectColl, projectId, map[string]interface{}{"ids": 1})
  891. if project != nil && len(*project) > 0 {
  892. ids := qu.ObjArrToStringArr((*project)["ids"].([]interface{}))
  893. if len(ids) > 0 {
  894. var infolist []map[string]interface{}
  895. for _, v := range ids {
  896. if v == id { // 当前公告
  897. continue
  898. }
  899. if v < util.BIDDINGSTARTID {
  900. tmpBidColl = util.BidColl2 //bidding_back
  901. }
  902. bid, b := util.MgoB.FindById(tmpBidColl, v, nil)
  903. if b && len(*bid) > 0 {
  904. tmp := make(map[string]interface{})
  905. tmp["id"] = v
  906. tmp["title"] = (*bid)["title"]
  907. tmp["href"] = (*bid)["href"]
  908. tmp["toptype"] = (*bid)["toptype"]
  909. tmp["subtype"] = (*bid)["subtype"]
  910. tmp["publishtime"] = (*bid)["publishtime"]
  911. tmp["detail"] = (*bid)["detail"]
  912. tmp["filetext"] = util.GetFileText(*bid)
  913. infolist = append(infolist, tmp)
  914. }
  915. }
  916. (*bidData)["info"] = infolist
  917. }
  918. } else {
  919. qu.Debug("Projectset Find Error", projectId)
  920. }
  921. baseInfoMap["id"] = id
  922. _id := (*bidData)["_id"]
  923. delete(*bidData, "_id")
  924. //保存数据
  925. baseInfoMap["_id"] = _id
  926. baseInfoMap["v_baseinfo"] = bidData
  927. if len(tagInfoMap) > 0 {
  928. baseInfoMap["v_taginfo"] = tagInfoMap
  929. }
  930. baseInfoMap["i_createtime"] = time.Now().Unix()
  931. baseInfoMap["b_isgive"] = false //是否分配
  932. baseInfoMap["b_istag"] = false //是否已标注
  933. baseInfoMap["b_cleartag"] = false //是否清理标注信息
  934. if util.Mgo.SaveByOriID(s_sourceinfo, baseInfoMap) {
  935. atomic.AddInt64(successNum, 1) //保存成功计数
  936. } else {
  937. lock.Lock()
  938. *success = false
  939. if importType == "excel" {
  940. msgArr = append(msgArr, "第"+fmt.Sprint(num+2)+"行未导入id:"+id)
  941. //*msg += "第" + fmt.Sprint(num+2) + "行未保存成功数据_id:" + id + ";\n"
  942. } else {
  943. msgArr = append(msgArr, "未导入id:"+id)
  944. //*msg += "未保存成功数据_id:" + id + ";\n"
  945. }
  946. lock.Unlock()
  947. }
  948. } else {
  949. lock.Lock()
  950. *success = false
  951. if importType == "excel" {
  952. msgArr = append(msgArr, "第"+fmt.Sprint(num+2)+"行未查询id:"+id)
  953. //*msg += "第" + fmt.Sprint(num+2) + "行未查询到数据:" + id + ";\n"
  954. } else {
  955. msgArr = append(msgArr, "未查询id:"+id)
  956. //*msg += "未查询到数据_id:" + id + ";\n"
  957. }
  958. lock.Unlock()
  959. }
  960. }(id, info)
  961. }
  962. wg.Wait()
  963. sort.Strings(msgArr)
  964. *msg = strings.Join(msgArr, ";\n")
  965. }
  966. // UpdateMarkColl 更新数据
  967. func UpdateMarkColl(bidData, markData, tagInfoMap, baseInfoMap *map[string]interface{}) {
  968. defer qu.Catch()
  969. ckdata := qu.IntAll((*markData)["i_ckdata"])
  970. v_taginfo := (*markData)["v_taginfo"].(map[string]interface{}) //标注信息
  971. v_datainfo := (*markData)["v_datainfo"].(map[string]interface{}) //基本信息
  972. for fk, _ := range v_taginfo {
  973. if v_datainfo[fk] != nil {
  974. (*bidData)[fk] = v_datainfo[fk] //字段更新
  975. }
  976. }
  977. (*tagInfoMap) = v_taginfo //marked中已有的标注信息保存到新数据上
  978. if ckdata == 2 { //某些字段已标注
  979. (*baseInfoMap)["i_ckdata"] = 0 //marked表中该条数据如果为字段验证,临时表ck_data:0;若为数据验证ck_data:1
  980. } else if ckdata == 1 {
  981. (*baseInfoMap)["i_ckdata"] = 1
  982. }
  983. }