project.go 54 KB


  1. package front
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "github.com/shopspring/decimal"
  6. "github.com/tealeg/xlsx"
  7. "go.mongodb.org/mongo-driver/bson"
  8. "go.mongodb.org/mongo-driver/bson/primitive"
  9. "io/ioutil"
  10. "mime/multipart"
  11. "mongodb"
  12. qu "qfw/util"
  13. "sort"
  14. "strconv"
  15. "strings"
  16. "sync"
  17. "sync/atomic"
  18. "time"
  19. "util"
  20. )
  21. // ProjectIsExists 获取所有项目名称
  22. func (f *Front) ProjectIsExists() {
  23. defer qu.Catch()
  24. name := f.GetString("s_name")
  25. exists := false
  26. project, _ := util.Mgo.FindOne(util.PROJECTCOLLNAME, map[string]interface{}{"s_name": name})
  27. if project != nil && len(*project) > 0 {
  28. exists = true
  29. }
  30. f.ServeJson(map[string]interface{}{"exists": exists})
  31. }
  32. // ProjectList 项目列表
  33. func (f *Front) ProjectList() {
  34. defer qu.Catch()
  35. if f.Method() == "POST" {
  36. start, _ := f.GetInteger("start")
  37. limit, _ := f.GetInteger("length")
  38. draw, _ := f.GetInteger("draw")
  39. status := f.GetString("s_status")
  40. searchStr := f.GetString("search[value]")
  41. search := strings.TrimSpace(searchStr)
  42. //data := util.GetPostForm(f.Request)
  43. query := map[string]interface{}{}
  44. if status != "-1" {
  45. query["s_status"] = status
  46. } else {
  47. query["s_status"] = map[string]interface{}{
  48. "$in": []string{"未开始", "进行中", "已完成"},
  49. }
  50. }
  51. if search != "" {
  52. query["$or"] = []interface{}{
  53. map[string]interface{}{"s_name": map[string]interface{}{"$regex": search}},
  54. map[string]interface{}{"s_entname": map[string]interface{}{"$regex": search}},
  55. map[string]interface{}{"s_rule": map[string]interface{}{"$regex": search}},
  56. map[string]interface{}{"s_departname": map[string]interface{}{"$regex": search}},
  57. }
  58. }
  59. list, _ := util.Mgo.Find(util.PROJECTCOLLNAME, query, map[string]interface{}{"_id": -1}, nil, false, start, limit)
  60. count := util.Mgo.Count(util.PROJECTCOLLNAME, query)
  61. f.ServeJson(map[string]interface{}{"draw": draw, "data": *list, "recordsFiltered": count, "recordsTotal": count})
  62. } else {
  63. _ = f.Render("project/project_list.html", &f.T)
  64. }
  65. }
  66. // ProjectSave 项目保存
  67. func (f *Front) ProjectSave() {
  68. defer qu.Catch()
  69. s_name := f.GetString("s_name") //项目名称
  70. if s_name == "" {
  71. f.ServeJson(map[string]interface{}{"success": false, "msg": "缺少项目名称字段"})
  72. return
  73. }
  74. success := false //导入数据是否成功
  75. msg := "" //异常信息
  76. successNum := int64(0) //导入成功条数
  77. importDataNum := 0 //查询数量
  78. var s_rulename []string //规则
  79. user := f.GetSession("user").(map[string]interface{})
  80. username := qu.ObjToString(user["s_login"]) //当前登录用户
  81. stype := f.GetString("s_type") //新建项目类型:数据库导入、excel导入
  82. s_sourceinfoTmp := f.GetString("s_sourceinfo") //数据表
  83. s_sourceinfo := "f_sourceinfo_" + s_sourceinfoTmp
  84. if stype != "edit" && s_sourceinfo == "" {
  85. f.ServeJson(map[string]interface{}{"success": false, "msg": "缺少数据存储表名"})
  86. return
  87. }
  88. s_departname, s_entname := "", ""
  89. query := map[string]interface{}{
  90. "s_name": s_name,
  91. }
  92. set := map[string]interface{}{}
  93. //导入数据
  94. if stype == "excel" { //excel导入
  95. s_entname = f.GetString("s_entname") //公司名称
  96. if s_entname == "" {
  97. f.ServeJson(map[string]interface{}{"success": false, "msg": "缺少公司名称字段"})
  98. return
  99. }
  100. s_departname = f.GetString("s_departname") //部门名称
  101. rulename := f.GetString("s_rulename") //规则名称
  102. s_rulename = strings.Split(rulename, ",")
  103. mf, _, err := f.GetFile("xlsx")
  104. qu.Debug(s_entname, s_departname, s_rulename)
  105. if err == nil {
  106. importDataNum = ImportDataByExcel(s_sourceinfo, mf, &success, &msg, &successNum, true)
  107. }
  108. //保存项目信息
  109. set = map[string]interface{}{
  110. "s_name": s_name, //项目名称
  111. "s_entname": s_entname, //公司名称
  112. "s_departname": s_departname, //部门名称
  113. "s_rulename": strings.Join(s_rulename, ","), //规则名称
  114. "i_importnum": importDataNum, //导入数量
  115. "s_sourceinfo": s_sourceinfo, //源数据表
  116. "s_createname": username, //创建人
  117. "s_status": "未开始", //项目状态
  118. "i_createtime": time.Now().Unix(), //创建时间
  119. "s_importtype": "excel", //导入类型
  120. "b_isassessment": false, //是否进行了质量评估
  121. }
  122. } else if stype == "coll" { //数据库导入
  123. historyid := f.GetString("s_historyid")
  124. if historyid == "" {
  125. f.ServeJson(map[string]interface{}{"success": false, "msg": "数据导出ID字段"})
  126. return
  127. }
  128. s_departname, s_entname, s_rulename, importDataNum = ImportDataByColl(s_sourceinfo, historyid, &success, &msg, &successNum, true)
  129. qu.Debug(s_departname, s_entname, s_rulename, importDataNum)
  130. //保存项目信息
  131. set = map[string]interface{}{
  132. "s_name": s_name, //项目名称
  133. "s_entname": s_entname, //公司名称
  134. "s_departname": s_departname, //部门名称
  135. "s_rulename": strings.Join(s_rulename, ","), //规则名称
  136. "i_importnum": importDataNum, //导入数量
  137. "s_sourceinfo": s_sourceinfo, //源数据表
  138. "s_createname": username, //创建人
  139. "s_status": "未开始", //项目状态
  140. "i_createtime": time.Now().Unix(), //创建时间
  141. "s_importtype": "coll", //导入类型
  142. "s_historyid": historyid, //源数据集标识
  143. "b_isassessment": false, //是否进行了质量评估
  144. }
  145. } else if stype == "edit" { //编辑保存
  146. success = true
  147. //s_entname = f.GetString("s_entname") //公司名称
  148. s_departname = f.GetString("s_departname") //部门名称
  149. rulename := f.GetString("s_rulename") //规则名称
  150. s_rulename = strings.Split(rulename, ",")
  151. s_personname := f.GetString("s_personname")
  152. set = map[string]interface{}{
  153. //"s_name": s_name, //项目名称
  154. //"s_entname": s_entname, //公司名称
  155. "s_departname": s_departname, //部门名称
  156. "s_rulename": strings.Join(s_rulename, ","), //规则名称
  157. "s_updateperson": username, //更新人
  158. "i_createtime": time.Now().Unix(), //更新时间
  159. "s_personname": s_personname, //售后人员
  160. //"i_starttime":,//开始时间
  161. //"i_completetime",//结束时间
  162. }
  163. }
  164. if success {
  165. success = util.Mgo.Update(util.PROJECTCOLLNAME, query, map[string]interface{}{"$set": set}, true, false)
  166. qu.Debug("Save Project:", success)
  167. if !success { //保存项目失败
  168. msg = "新建项目失败\n" + msg
  169. } else {
  170. msg = "保存项目成功"
  171. }
  172. }
  173. //qu.Debug("Msg:", msg)
  174. //返回信息
  175. if stype == "edit" {
  176. f.ServeJson(map[string]interface{}{"success": success, "msg": msg})
  177. } else {
  178. qu.Debug("Create Project:", success, "importnum:", importDataNum, "successnum:", successNum, "failnum:", int64(importDataNum)-successNum)
  179. f.ServeJson(map[string]interface{}{"success": success, "msg": msg, "importnum": importDataNum, "successnum": successNum, "failnum": int64(importDataNum) - successNum})
  180. }
  181. }
  182. func (f *Front) ProjectAddData() {
  183. defer qu.Catch()
  184. if f.Method() == "POST" {
  185. user := f.GetSession("user").(map[string]interface{})
  186. username := qu.ObjToString(user["s_login"])
  187. projectid := f.GetString("projectid")
  188. stype := f.GetString("s_type")
  189. info, _ := util.Mgo.FindById(util.PROJECTCOLLNAME, projectid, nil)
  190. if len(*info) > 0 {
  191. success := false //导入数据是否成功
  192. msg := "" //异常信息
  193. importDataNum, successNum := 0, int64(0) //导入成功条数
  194. s_sourceinfo := qu.ObjToString((*info)["s_sourceinfo"])
  195. if stype == "excel" {
  196. mf, _, err := f.GetFile("xlsx")
  197. if err == nil {
  198. importDataNum = ImportDataByExcel(s_sourceinfo, mf, &success, &msg, &successNum, false)
  199. var addDataTag []map[string]interface{}
  200. if (*info)["v_add_tag"] != nil {
  201. arr := qu.ObjArrToMapArr((*info)["v_add_data"].([]interface{}))
  202. addDataTag = append(addDataTag, arr...)
  203. } else {
  204. addDataTag = append(addDataTag, map[string]interface{}{
  205. "s_importtype": stype,
  206. "i_importnum": importDataNum,
  207. "s_updateperson": username,
  208. "i_updatetime": time.Now().Unix(),
  209. })
  210. }
  211. s_status := ""
  212. if status := qu.ObjToString((*info)["s_status"]); status == "未开始" || status == "进行中" {
  213. s_status = status
  214. } else if status == "已完成" {
  215. s_status = "进行中"
  216. }
  217. set := map[string]interface{}{
  218. "i_importnum": importDataNum + qu.IntAll((*info)["i_importnum"]), //导入数量
  219. "s_status": s_status, //项目状态
  220. "i_updatetime": time.Now().Unix(),
  221. "v_add_data": addDataTag,
  222. }
  223. util.Mgo.UpdateById(util.PROJECTCOLLNAME, projectid, map[string]interface{}{"$set": set})
  224. }
  225. } else if stype == "coll" {
  226. historyid := f.GetString("s_historyid")
  227. if historyid == "" {
  228. f.ServeJson(map[string]interface{}{"success": false, "msg": "数据导出ID字段"})
  229. return
  230. }
  231. _, _, _, importDataNum = ImportDataByColl(s_sourceinfo, historyid, &success, &msg, &successNum, false)
  232. if !success {
  233. f.ServeJson(map[string]interface{}{"success": success, "msg": msg})
  234. return
  235. }
  236. var addDataTag []map[string]interface{}
  237. if (*info)["v_add_tag"] != nil {
  238. arr := qu.ObjArrToMapArr((*info)["v_add_data"].([]interface{}))
  239. addDataTag = append(addDataTag, arr...)
  240. } else {
  241. addDataTag = append(addDataTag, map[string]interface{}{
  242. "s_importtype": stype,
  243. "i_importnum": importDataNum,
  244. "s_updateperson": username,
  245. "i_updatetime": time.Now().Unix(),
  246. })
  247. }
  248. s_status := ""
  249. if status := qu.ObjToString((*info)["s_status"]); status == "未开始" || status == "进行中" {
  250. s_status = status
  251. } else if status == "已完成" {
  252. s_status = "进行中"
  253. }
  254. //保存项目信息
  255. set := map[string]interface{}{
  256. "i_importnum": importDataNum + qu.IntAll((*info)["i_importnum"]), //导入数量
  257. "s_status": s_status, //项目状态
  258. "i_updatetime": time.Now().Unix(),
  259. "v_add_data": addDataTag,
  260. }
  261. util.Mgo.UpdateById(util.PROJECTCOLLNAME, projectid, map[string]interface{}{"$set": set})
  262. }
  263. f.ServeJson(map[string]interface{}{"success": success, "msg": msg, "importnum": importDataNum, "successnum": successNum, "failnum": int64(importDataNum) - successNum})
  264. } else {
  265. f.ServeJson(map[string]interface{}{"success": false, "msg": "项目查询失败"})
  266. }
  267. }
  268. }
  269. // ProjectComplete 项目提交完成
  270. func (f *Front) ProjectComplete() {
  271. defer qu.Catch()
  272. user := f.GetSession("user").(map[string]interface{})
  273. username := qu.ObjToString(user["s_login"]) //当前登录用户
  274. success := false
  275. msg := ""
  276. projectId := f.GetString("s_projectid")
  277. sourceInfo := f.GetString("s_sourceinfo")
  278. //status := f.GetString("s_status")
  279. info, _ := util.Mgo.FindById(util.PROJECTCOLLNAME, projectId, `{"s_status": 1}`)
  280. if len(*info) <= 0 {
  281. f.ServeJson(map[string]interface{}{"success": false, "msg": "查询项目失败"})
  282. return
  283. }
  284. status := (*info)["s_status"]
  285. if status == "进行中" {
  286. //查询该项目下未完成的用户组和用户任务
  287. query := map[string]interface{}{
  288. "s_projectid": projectId,
  289. "s_status": map[string]interface{}{
  290. "$in": []string{"未开始", "进行中"},
  291. },
  292. }
  293. taskCount := util.Mgo.Count(util.TASKCOLLNAME, query)
  294. dataCount := util.Mgo.Count(sourceInfo, map[string]interface{}{"b_istag": false}) //未标注数据个数
  295. qu.Debug("No Tag Count:", dataCount)
  296. if dataCount == 0 && taskCount == 0 { //全部完成
  297. success = util.Mgo.UpdateById(util.PROJECTCOLLNAME, projectId, map[string]interface{}{
  298. "$set": map[string]interface{}{
  299. "s_status": "已完成",
  300. "i_completetime": time.Now().Unix(),
  301. "s_updateperson": username,
  302. "i_updatetime": time.Now().Unix(),
  303. },
  304. })
  305. if !success {
  306. msg = "更新项目失败"
  307. }
  308. }
  309. if taskCount != 0 {
  310. msg += "任务未全部完成,"
  311. }
  312. if dataCount != 0 {
  313. msg += "数据未全部标注"
  314. }
  315. } else {
  316. msg = "项目未开始"
  317. }
  318. f.ServeJson(map[string]interface{}{"success": success, "msg": msg})
  319. }
  320. // ProjectQualityAssessment 数据质量评估
  321. func (f *Front) ProjectQualityAssessment() {
  322. defer qu.Catch()
  323. msg := ""
  324. success := false
  325. //质量评估
  326. projectid := f.GetString("pid") //项目id
  327. project, _ := util.Mgo.FindById(util.PROJECTCOLLNAME, projectid, map[string]interface{}{"b_isassessment": 1, "s_sourceinfo": 1, "v_fields": 1})
  328. if project != nil && len(*project) > 0 {
  329. if isAssessment, ok := (*project)["b_isassessment"].(bool); ok && !isAssessment {
  330. if fields := qu.ObjArrToMapArr((*project)["v_fields"].([]interface{})); len(fields) > 0 {
  331. var fieldsArr []string
  332. for _, v := range fields {
  333. fieldsArr = append(fieldsArr, qu.ObjToString(v["key"]))
  334. }
  335. sourceinfo := qu.ObjToString((*project)["s_sourceinfo"])
  336. qu.Debug("质量评估字段:", fieldsArr)
  337. success = QuaFieldScore(fieldsArr, sourceinfo) //调用数据质量评估接口
  338. if success {
  339. //点击清洗更新项目状态为进行中
  340. b := util.Mgo.UpdateById(util.PROJECTCOLLNAME, projectid, map[string]interface{}{"$set": map[string]interface{}{"b_isassessment": true, "s_status": "进行中", "i_starttime": time.Now().Unix()}})
  341. qu.Debug("Update Porject:"+projectid+" Status Success:", b)
  342. } else {
  343. msg = "质量评估失败"
  344. }
  345. } else {
  346. msg = "项目标注字段查询失败"
  347. }
  348. } else if ok && isAssessment {
  349. success = true
  350. } else {
  351. msg = "查询项目失败"
  352. }
  353. } else {
  354. msg = "查询项目失败"
  355. }
  356. f.ServeJson(map[string]interface{}{"success": success, "msg": msg})
  357. }
  358. // ProjectGroupTaskList 用户组任务分发列表
  359. func (f *Front) ProjectGroupTaskList() {
  360. defer qu.Catch()
  361. projectid := f.GetString("pid") //项目id
  362. if f.Method() == "POST" {
  363. status := f.GetString("s_status") //任务状态
  364. searchStr := f.GetString("search[value]")
  365. search := strings.TrimSpace(searchStr)
  366. start, _ := f.GetInteger("start")
  367. limit, _ := f.GetInteger("length")
  368. draw, _ := f.GetInteger("draw")
  369. query := map[string]interface{}{ //查找用户组任务
  370. "s_projectid": projectid,
  371. "s_stype": "group",
  372. }
  373. if status != "-1" {
  374. query["s_status"] = status
  375. }
  376. if search != "" {
  377. query["$or"] = []interface{}{
  378. map[string]interface{}{"s_groupname": map[string]interface{}{"$regex": search}},
  379. }
  380. }
  381. qu.Debug("Query:", query)
  382. list, _ := util.Mgo.Find(util.TASKCOLLNAME, query, map[string]interface{}{"_id": -1}, nil, false, start, limit)
  383. count := util.Mgo.Count(util.TASKCOLLNAME, query)
  384. for _, l := range *list {
  385. if status := qu.ObjToString(l["s_status"]); status == "进行中" { //更新任务进度
  386. //groupId := qu.ObjToString(l["s_groupid"])
  387. groupTaskId := mongodb.BsonIdToSId(l["_id"])
  388. giveNum := qu.IntAll(l["i_givenum"])
  389. sourceinfo := qu.ObjToString(l["s_sourceinfo"])
  390. //tagNum := util.Mgo.Count(sourceinfo, map[string]interface{}{"b_isgivegroup": true, "s_grouptaskid": groupTaskId, "b_istag": true})
  391. tagNum := util.Mgo.Count(sourceinfo, map[string]interface{}{"s_grouptaskid": groupTaskId, "b_istag": true})
  392. progressFloat := float64(tagNum) / float64(giveNum)
  393. value, _ := strconv.ParseFloat(fmt.Sprintf("%.4f", progressFloat), 64)
  394. decimalValue := decimal.NewFromFloat(value)
  395. decimalValue = decimalValue.Mul(decimal.NewFromInt(100))
  396. value, _ = decimalValue.Float64()
  397. progress := fmt.Sprint(value) + "%"
  398. l["s_progress"] = progress
  399. //同步数据库
  400. util.Mgo.UpdateById(util.TASKCOLLNAME, l["_id"], map[string]interface{}{"$set": map[string]interface{}{"s_progress": progress}})
  401. }
  402. }
  403. f.ServeJson(map[string]interface{}{"draw": draw, "data": *list, "recordsFiltered": count, "recordsTotal": count})
  404. } else {
  405. project, _ := util.Mgo.FindById(util.PROJECTCOLLNAME, projectid, map[string]interface{}{"s_sourceinfo": 1})
  406. if project != nil && len(*project) > 0 {
  407. sourceinfo := qu.ObjToString((*project)["s_sourceinfo"]) //数据源表
  408. okAllDataNum := util.Mgo.Count(sourceinfo, map[string]interface{}{"b_istagging": false}) //达标数据总量
  409. okIsGiveDataNum := util.Mgo.Count(sourceinfo, map[string]interface{}{"b_istagging": false, "b_isgivegroup": true}) //达标数据已分发量
  410. okNotGiveDataNum := okAllDataNum - okIsGiveDataNum //达标待分发量
  411. okIsTagDataNum := util.Mgo.Count(sourceinfo, map[string]interface{}{"b_istagging": false, "b_istag": true}) //达标已标注量
  412. IsNoOkAllDataNum := util.Mgo.Count(sourceinfo, map[string]interface{}{"b_istagging": true}) //未达标数据总量
  413. IsNoOkIsGiveDataNum := util.Mgo.Count(sourceinfo, map[string]interface{}{"b_istagging": true, "b_isgivegroup": true}) //未达标数据已分发量
  414. IsNotOkNotGiveDataNum := IsNoOkAllDataNum - IsNoOkIsGiveDataNum //未达标待分发量
  415. IsNotOkIsTagDataNum := util.Mgo.Count(sourceinfo, map[string]interface{}{"b_istagging": true, "b_istag": true}) //未达标已标注量
  416. allGiveDataNum := okIsGiveDataNum + IsNoOkIsGiveDataNum //总分发量
  417. allNoGiveDataNum := okNotGiveDataNum + IsNotOkNotGiveDataNum //总待分发量
  418. allIsTagDataNum := okIsTagDataNum + IsNotOkIsTagDataNum //已标注总量
  419. allDataNum := allGiveDataNum + allNoGiveDataNum
  420. // 查询全部实际可分发数据量(未分发、未标注)
  421. okRealGiveNum := util.Mgo.Count(sourceinfo, map[string]interface{}{"b_isgivegroup": false, "b_istag": false})
  422. //qu.Debug("数据总量:", allDataNum, "已分发总量:", allGiveDataNum, "待分发总量:", allNoGiveDataNum, "已标注总量:", allIsTagDataNum)
  423. //qu.Debug("达标量:", okAllDataNum, "达标已分发量:", okIsGiveDataNum, "达标待分发量:", okNotGiveDataNum, "达标已标注量:", okIsTagDataNum)
  424. //qu.Debug(" 未达标量:", IsNoOkAllDataNum, " 未达标已分发量:", IsNoOkIsGiveDataNum, " 未达标待分发量:", IsNotOkNotGiveDataNum, " 未达标已标注量:", IsNotOkIsTagDataNum)
  425. f.T["s_projectid"] = projectid
  426. f.T["s_sourceinfo"] = sourceinfo
  427. f.T["allDataNum"] = allDataNum
  428. f.T["okAllDataNum"] = okAllDataNum
  429. f.T["okIsGiveDataNum"] = okIsGiveDataNum
  430. f.T["okNotGiveDataNum"] = okNotGiveDataNum
  431. f.T["IsNoOkAllDataNum"] = IsNoOkAllDataNum
  432. f.T["IsNoOkIsGiveDataNum"] = IsNoOkIsGiveDataNum
  433. f.T["IsNotOkNotGiveDataNum"] = IsNotOkNotGiveDataNum
  434. f.T["allGiveDataNum"] = allGiveDataNum
  435. f.T["allNoGiveDataNum"] = allNoGiveDataNum
  436. f.T["allIsTagDataNum"] = allIsTagDataNum
  437. f.T["okIsTagDataNum"] = okIsTagDataNum
  438. f.T["IsNotOkIsTagDataNum"] = IsNotOkIsTagDataNum
  439. f.T["okRealGiveNum"] = okRealGiveNum
  440. _ = f.Render("project/project_clear.html", &f.T)
  441. } else {
  442. qu.Debug("Project Find Error")
  443. f.ServeJson("查询项目失败")
  444. }
  445. }
  446. }
  447. // ProjectGroupTaskSave 用户组任务分发
  448. func (f *Front) ProjectGroupTaskSave() {
  449. defer qu.Catch()
  450. var groupArr []map[string]interface{}
  451. var taskArr []map[string]interface{}
  452. var groupIdArr []string
  453. var groupTaskIdArr []string
  454. groupIdTask := map[string]util.Task{}
  455. success := false
  456. msg := ""
  457. user := f.GetSession("user").(map[string]interface{})
  458. username := qu.ObjToString(user["s_login"]) //当前登录用户
  459. projectid := f.GetString("s_projectid") //项目标识
  460. project, _ := util.Mgo.FindById(util.PROJECTCOLLNAME, projectid, nil)
  461. projectname := qu.ObjToString((*project)["s_name"]) //项目名称
  462. sourceinfo := qu.ObjToString((*project)["s_sourceinfo"]) //源数据表
  463. group := f.GetString("s_group")
  464. stype := f.GetString("s_type")
  465. qu.Debug("项目id:", projectid, " 项目名称:", projectname)
  466. if err := json.Unmarshal([]byte(group), &groupArr); err != nil {
  467. qu.Debug("GroupInfo Unmarshal Failed:", err)
  468. msg = "用户组信息解析失败"
  469. } else {
  470. qu.Debug("用户组信息:", groupArr, stype)
  471. //if stype != "tag" { //如果分发的是达标数据或者全部数据且进行了初步质检,将没有质检记录的字段从v_taginfo标注记录中删除
  472. // DeleleDataTagInfo(sourceinfo)
  473. //}
  474. // 查询实际可分发数据量(未分发、未标注)
  475. realNum := util.Mgo.Count(sourceinfo, map[string]interface{}{"b_isgivegroup": false, "b_istag": false})
  476. for _, groupInfo := range groupArr {
  477. groupId := qu.ObjToString(groupInfo["s_groupid"])
  478. groupIdArr = append(groupIdArr, groupId)
  479. givenum := qu.IntAll(groupInfo["i_givenum"])
  480. if givenum <= 0 {
  481. continue
  482. }
  483. if realNum <= 0 {
  484. break
  485. }
  486. if realNum < givenum {
  487. givenum = realNum
  488. }
  489. groupTaskId := primitive.NewObjectID()
  490. groupTaskIdStr := mongodb.BsonIdToSId(groupTaskId)
  491. groupTaskIdArr = append(groupTaskIdArr, groupTaskIdStr)
  492. gt := util.Task{
  493. UserId: groupId,
  494. GiveNum: givenum,
  495. }
  496. groupIdTask[groupTaskIdStr] = gt
  497. groupTask := map[string]interface{}{
  498. "_id": groupTaskId, //生成任务id
  499. "s_projectid": projectid, //项目标识
  500. "s_projectname": projectname, //项目名称
  501. "s_status": "未开始", //任务状态
  502. "s_personid": qu.ObjToString(groupInfo["s_personid"]), //任务负责人标识
  503. "s_personname": qu.ObjToString(groupInfo["s_personname"]), //任务负责人
  504. "s_groupname": qu.ObjToString(groupInfo["s_groupname"]), //用户组名称
  505. "s_groupid": groupId, //用户组标识
  506. "i_givenum": givenum, //分发数据量
  507. "s_createname": username, //创建人
  508. "i_createtime": time.Now().Unix(), //创建时间
  509. "s_progress": "0%", //完成进度
  510. "s_sourceinfo": sourceinfo, //源数据表
  511. "s_stype": "group", //任务类型
  512. "s_entname": qu.ObjToString((*project)["s_entname"]), //公司名称
  513. "s_departname": qu.ObjToString((*project)["s_departname"]), //部门名称
  514. "s_rulename": qu.ObjToString((*project)["s_rulename"]), //规则名称
  515. "s_datatype": qu.ObjToString((*project)["s_datatype"]), //数据类型
  516. }
  517. realNum = realNum - givenum
  518. taskArr = append(taskArr, groupTask)
  519. }
  520. }
  521. if len(taskArr) > 0 {
  522. //分发数据后更新项目中用户组标识信息和用户组任务id
  523. success = util.Mgo.UpdateById(util.PROJECTCOLLNAME, projectid, map[string]interface{}{
  524. "$push": map[string]interface{}{
  525. "v_groupids": map[string]interface{}{
  526. "$each": groupIdArr,
  527. },
  528. "v_grouptaskids": map[string]interface{}{
  529. "$each": groupTaskIdArr,
  530. },
  531. },
  532. })
  533. if !success {
  534. msg = "更新项目:" + projectname + "用户组标识失败"
  535. } else { //用户组分发任务
  536. success = util.Mgo.SaveBulk(util.TASKCOLLNAME, taskArr...)
  537. if success {
  538. msg = "任务分发成功"
  539. UpdateSourceInfoByGroup(sourceinfo, stype, groupIdTask) //用户组分发任务成功后,给数据源打上用户组标识
  540. } else {
  541. msg = "任务分发失败"
  542. }
  543. }
  544. }
  545. qu.Debug("Success:", success, "Msg:", msg)
  546. f.ServeJson(map[string]interface{}{"success": success, "msg": msg})
  547. }
  548. // ProjectGetEntnameList 模糊查询公司名称
  549. func (f *Front) ProjectGetEntnameList() {
  550. defer qu.Catch()
  551. var entnameList []string
  552. entname := f.GetString("entname")
  553. query := map[string]interface{}{
  554. "username": map[string]interface{}{
  555. "$regex": entname,
  556. },
  557. }
  558. list, _ := util.MgoJy.Find(util.JyUser, query, nil, map[string]interface{}{"username": 1}, false, -1, -1)
  559. if len(*list) > 0 {
  560. for _, l := range *list {
  561. entnameList = append(entnameList, qu.ObjToString(l["username"]))
  562. }
  563. f.ServeJson(map[string]interface{}{"entname": entnameList})
  564. } else {
  565. f.ServeJson(map[string]interface{}{"entname": []string{}})
  566. }
  567. }
  568. // ProjectGroupTaskRepulse 用户组任务打回
  569. func (f *Front) ProjectGroupTaskRepulse() {
  570. defer qu.Catch()
  571. success := false
  572. msg := ""
  573. user := f.GetSession("user").(map[string]interface{})
  574. username := qu.ObjToString(user["s_login"])
  575. //status := f.GetString("s_status")
  576. groupTaskId := f.GetString("taskid")
  577. sourceinfo := f.GetString("s_sourceinfo")
  578. currenttime := time.Now().Unix()
  579. //更新数据源
  580. success = util.Mgo.Update(sourceinfo, map[string]interface{}{"s_grouptaskid": groupTaskId}, map[string]interface{}{
  581. "$set": map[string]interface{}{
  582. "b_istag": false,
  583. "b_check": false, // 质检标记
  584. "i_ckdata": 0,
  585. "b_isgiveuser": false,
  586. "i_updatetime": currenttime,
  587. },
  588. "$unset": map[string]interface{}{
  589. "s_userid": "",
  590. "s_usertaskid": "",
  591. "s_login": "",
  592. "v_taginfo": "",
  593. "v_checkinfo": "",
  594. },
  595. }, false, true)
  596. if success {
  597. util.Mgo.Update(util.TASKCOLLNAME, map[string]interface{}{
  598. "s_stype": "user",
  599. "s_parentid": groupTaskId,
  600. }, map[string]interface{}{
  601. "$set": map[string]interface{}{
  602. "s_status": "已关闭",
  603. "s_progress": "%0",
  604. "i_updatetime": currenttime,
  605. "s_updateperson": username,
  606. },
  607. }, false, true)
  608. //更新用户组任务 清除最迟完成时间,更新任务状态
  609. success = util.Mgo.UpdateById(util.TASKCOLLNAME, groupTaskId, map[string]interface{}{
  610. "$set": map[string]interface{}{
  611. "s_status": "未开始",
  612. "s_updateperson": username,
  613. "i_updatetime": currenttime,
  614. "s_progress": "0%",
  615. },
  616. "$unset": map[string]interface{}{
  617. "i_completetime": "",
  618. },
  619. })
  620. if !success {
  621. msg = "更新用户组任务失败"
  622. }
  623. } else {
  624. msg = "更新数据源信息失败"
  625. }
  626. qu.Debug("Task Repulse:", success, " Msg:", msg)
  627. f.ServeJson(map[string]interface{}{"success": success, "msg": msg})
  628. }
  629. // ProjectGroupTaskRetrieve 用户组任务收回
  630. func (f *Front) ProjectGroupTaskRetrieve() {
  631. defer qu.Catch()
  632. user := f.GetSession("user").(map[string]interface{})
  633. username := qu.ObjToString(user["s_login"])
  634. groupTaskId := f.GetString("taskid")
  635. sourceInfo := f.GetString("s_sourceinfo")
  636. //status := f.GetString("s_status") //未开始、进行中
  637. //giveNum, _ := f.GetInteger("i_givenum") //收回时要更新的分发数据量
  638. msg := ""
  639. success := false
  640. count := 0
  641. groupTask, _ := util.Mgo.FindById(util.TASKCOLLNAME, groupTaskId, map[string]interface{}{"v_sonids": 1, "s_status": 1})
  642. if len(*groupTask) <= 0 {
  643. f.ServeJson(map[string]interface{}{"success": false, "msg": "查询任务失败"})
  644. return
  645. }
  646. status := qu.ObjToString((*groupTask)["s_status"])
  647. if status == "未开始" { //未开始的用户组任务,暂未给用户分发任务
  648. success = true
  649. } else { //进行中的用户组任务需更新其下用户信息
  650. if groupTask != nil && len(*groupTask) > 0 {
  651. if sonIds, ok := (*groupTask)["v_sonids"].([]interface{}); ok && len(sonIds) > 0 { //更新所有用户任务信息
  652. userTaskIdStatus := map[string]string{} //封装要回收的用户任务信息
  653. for _, sId := range sonIds {
  654. userTaskId := qu.ObjToString(sId)
  655. userTask, _ := util.Mgo.FindById(util.TASKCOLLNAME, userTaskId, map[string]interface{}{"s_status": 1})
  656. if userTask != nil && len(*userTask) > 0 {
  657. if statusTmp := qu.ObjToString((*userTask)["s_status"]); statusTmp != "已完成" && statusTmp != "已关闭" { //已完成和已关闭的任务不收回
  658. userTaskIdStatus[userTaskId] = statusTmp
  659. }
  660. } else {
  661. qu.Debug("Find User Task:", userTaskId, "Error")
  662. }
  663. }
  664. qu.Debug("userTaskIdStatus:", len(userTaskIdStatus))
  665. if len(userTaskIdStatus) > 0 { //收回用户组下所有用户信息
  666. //用户组收回时,若已有用户任务在未开始时收回或关闭,调用RetrieveTaskByUser返回的总收回量count就遗漏了用户收回或关闭任务的量
  667. msg, _, success = RetrieveCloseTaskByUser(sourceInfo, username, userTaskIdStatus) //用户信息收回
  668. } else { //用户组下所有用户任务都已完成
  669. success = true
  670. }
  671. } else { //没有分配给用户任务
  672. success = true
  673. }
  674. } else {
  675. msg = "用户组任务查找失败"
  676. }
  677. }
  678. if success { //所有用户信息收回成功后,更新用户组任务相关信息
  679. count = util.Mgo.Count(sourceInfo, map[string]interface{}{ //统计该用户组任务下未标注的数据量
  680. "s_grouptaskid": groupTaskId,
  681. "b_istag": false,
  682. })
  683. UpdateGroupTaskAndSourceInfo(groupTaskId, sourceInfo, username, status, count, &msg, &success)
  684. }
  685. qu.Debug(success, count, msg)
  686. f.ServeJson(map[string]interface{}{"success": success, "msg": msg, "count": count})
  687. }
  688. // ProjectGroupTaskClose 用户组任务关闭
  689. func (f *Front) ProjectGroupTaskClose() {
  690. defer qu.Catch()
  691. user := f.GetSession("user").(map[string]interface{})
  692. username := qu.ObjToString(user["s_login"])
  693. groupTaskId := f.GetString("taskid")
  694. sourceInfo := f.GetString("s_sourceinfo")
  695. //status := f.GetString("s_status") //未开始、进行中
  696. msg := ""
  697. success := false
  698. count := 0
  699. groupTask, _ := util.Mgo.FindById(util.TASKCOLLNAME, groupTaskId, map[string]interface{}{"s_status": 1})
  700. if len(*groupTask) <= 0 {
  701. f.ServeJson(map[string]interface{}{"success": false, "msg": "查询任务失败"})
  702. return
  703. }
  704. status := qu.ObjToString((*groupTask)["s_status"])
  705. if status == "未开始" { //未开始的用户组任务,暂未给用户分发任务;已完成只更新用户组任务
  706. success = true
  707. } else { //进行中的用户组任务需更新其下用户信息
  708. //groupTask, _ := util.Mgo.FindById(util.TASKCOLLNAME, groupTaskId, map[string]interface{}{"v_sonids": 1})
  709. //if groupTask != nil && len(*groupTask) > 0 {
  710. // if sonIds, ok := (*groupTask)["v_sonids"].([]interface{}); ok && len(sonIds) > 0 { //更新所有用户任务信息
  711. // userTaskIdStatus := map[string]string{} //封装要关闭的用户任务信息
  712. // for _, sId := range sonIds {
  713. // userTaskId := qu.ObjToString(sId)
  714. // userTask, _ := util.Mgo.FindById(util.TASKCOLLNAME, userTaskId, map[string]interface{}{"s_status": 1})
  715. // if userTask != nil && len(*userTask) > 0 {
  716. // if statusTmp := qu.ObjToString((*userTask)["s_status"]); statusTmp == "已完成" && statusTmp != "已关闭" { //已关闭的任务不更新
  717. // userTaskIdStatus[userTaskId] = statusTmp
  718. // }
  719. // } else {
  720. // qu.Debug("Find User Task:", userTaskId, "Error")
  721. // }
  722. // }
  723. // if len(userTaskIdStatus) > 0 { //关闭用户组下所有用户信息
  724. // msg, _, success = RetrieveCloseTaskByUser(sourceInfo, username, userTaskIdStatus) //用户信息收回
  725. // } else { //用户组下所有用户任务都已关闭
  726. // success = true
  727. // }
  728. // } else { //没有分配给用户任务
  729. // success = true
  730. // }
  731. //} else {
  732. // msg = "用户组任务查找失败"
  733. //}
  734. }
  735. if success { //所有用户信息关闭成功后,更新用户组任务相关信息
  736. count = util.Mgo.Count(sourceInfo, map[string]interface{}{ //统计该用户组任务下未标注的数据量
  737. "s_grouptaskid": groupTaskId,
  738. "b_istag": false,
  739. })
  740. UpdateGroupTaskAndSourceInfo(groupTaskId, sourceInfo, username, status, count, &msg, &success)
  741. }
  742. qu.Debug(success, count, msg)
  743. f.ServeJson(map[string]interface{}{"success": success, "msg": msg, "count": count})
  744. }
  745. // UpdateGroupTaskAndSourceInfo 更新用户组任务相关信息
  746. func UpdateGroupTaskAndSourceInfo(groupTaskId, sourceInfo, username, status string, count int, msg *string, success *bool) {
  747. defer qu.Catch()
  748. qu.Debug("GroupTaskStatus:", status, " Count:", count)
  749. if count != 0 { //更新数据源
  750. query := map[string]interface{}{
  751. "s_grouptaskid": groupTaskId,
  752. "b_istag": false,
  753. }
  754. set := map[string]interface{}{
  755. "b_isgivegroup": false,
  756. "i_updatetime": time.Now().Unix(),
  757. }
  758. unset := map[string]interface{}{
  759. "s_groupid": "",
  760. "s_grouptaskid": "",
  761. }
  762. *success = util.Mgo.Update(sourceInfo, query, map[string]interface{}{"$set": set, "$unset": unset}, false, true)
  763. }
  764. //更新用户组任务
  765. if *success {
  766. taskSet := map[string]interface{}{
  767. "s_status": "已完成",
  768. "s_updateperson": username,
  769. "i_updatetime": time.Now().Unix(),
  770. "i_completetime": time.Now().Unix(),
  771. "s_progress": "100%",
  772. }
  773. if status == "未开始" {
  774. taskSet["i_starttime"] = time.Now().Unix()
  775. taskSet["s_status"] = "已关闭"
  776. }
  777. inc := map[string]interface{}{
  778. "i_givenum": -count,
  779. }
  780. *success = util.Mgo.UpdateById(util.TASKCOLLNAME, groupTaskId, map[string]interface{}{"$set": taskSet, "$inc": inc})
  781. if !*success {
  782. *msg = "更新用户组任务失败"
  783. }
  784. } else {
  785. *msg = "更新数据源信息失败"
  786. }
  787. }
  788. // DeleleDataTagInfo 删除标注记录
  789. func DeleleDataTagInfo(sourceinfo string) {
  790. defer qu.Catch()
  791. sess := util.Mgo.GetMgoConn()
  792. defer util.Mgo.DestoryMongoConn(sess)
  793. ch := make(chan bool, 5)
  794. wg := &sync.WaitGroup{}
  795. lock := &sync.Mutex{}
  796. query := map[string]interface{}{ //达标数据可能会分发后收回、打回再分发
  797. "b_istagging": false, //达标数据
  798. "b_cleartag": false, //未进行一次标注信息清理
  799. }
  800. fields := map[string]interface{}{
  801. "v_taginfo": 1,
  802. "v_check": 1,
  803. }
  804. updateArr := [][]map[string]interface{}{}
  805. it := sess.DB(util.Mgo.DbName).C(sourceinfo).Find(&query).Select(&fields).Iter()
  806. count, _ := sess.DB(util.Mgo.DbName).C(sourceinfo).Find(&query).Count()
  807. qu.Debug("Find Needs To Clearn Data Count:", count)
  808. n := 0
  809. for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
  810. ch <- true
  811. wg.Add(1)
  812. go func(tmp map[string]interface{}) {
  813. defer func() {
  814. <-ch
  815. wg.Done()
  816. }()
  817. update := []map[string]interface{}{}
  818. update = append(update, map[string]interface{}{"_id": tmp["_id"]})
  819. tagInfo, _ := tmp["v_taginfo"].(map[string]interface{})
  820. checkInfo, _ := tmp["v_check"].(map[string]interface{})
  821. set := map[string]interface{}{
  822. "b_cleartag": true,
  823. }
  824. unset := map[string]interface{}{}
  825. if len(tagInfo) != 0 && len(checkInfo) != 0 { //有质检信息,删除v_taginfo未质检的字段
  826. for f := range tagInfo {
  827. if checkInfo[f] == nil {
  828. delete(tagInfo, f)
  829. }
  830. }
  831. set["v_taginfo"] = tagInfo
  832. } else if len(tagInfo) != 0 && len(checkInfo) == 0 { //没有质检删除v_taginfo字段
  833. unset["v_taginfo"] = ""
  834. }
  835. if len(unset) > 0 {
  836. update = append(update, map[string]interface{}{
  837. "$set": set,
  838. "$unset": unset,
  839. })
  840. } else {
  841. update = append(update, map[string]interface{}{
  842. "$set": set,
  843. })
  844. }
  845. lock.Lock()
  846. updateArr = append(updateArr, update)
  847. if len(updateArr) > 500 {
  848. util.Mgo.UpdateBulk(sourceinfo, updateArr...)
  849. updateArr = [][]map[string]interface{}{}
  850. }
  851. lock.Unlock()
  852. }(tmp)
  853. if n%100 == 0 {
  854. qu.Debug("current:", n)
  855. }
  856. tmp = map[string]interface{}{}
  857. }
  858. wg.Wait()
  859. lock.Lock()
  860. if len(updateArr) > 0 {
  861. util.Mgo.UpdateBulk(sourceinfo, updateArr...)
  862. updateArr = [][]map[string]interface{}{}
  863. }
  864. lock.Unlock()
  865. }
  866. // UpdateSourceInfoByGroup 用户组分发任务成功后,给数据源打上用户组标识
  867. func UpdateSourceInfoByGroup(sourceinfo, stype string, groupIdInfo map[string]util.Task) {
  868. defer qu.Catch()
  869. for groupTaskId, tInfo := range groupIdInfo {
  870. groupId := tInfo.UserId
  871. num := tInfo.GiveNum
  872. sess := util.Mgo.GetMgoConn()
  873. defer util.Mgo.DestoryMongoConn(sess)
  874. ch := make(chan bool, 5)
  875. wg := &sync.WaitGroup{}
  876. lock := &sync.Mutex{}
  877. query := map[string]interface{}{ //查找未分配且未标注对应stype的数据分发
  878. "b_isgivegroup": false,
  879. "b_istag": false,
  880. }
  881. if stype == "notag" { //达标数据
  882. query["b_istagging"] = false
  883. } else if stype == "tag" { //未达标数据
  884. query["b_istagging"] = true
  885. }
  886. fields := map[string]interface{}{
  887. "v_baseinfo": 1,
  888. }
  889. updateArr := [][]map[string]interface{}{}
  890. qu.Debug("Query:", query)
  891. it := sess.DB(util.Mgo.DbName).C(sourceinfo).Find(&query).Select(&fields).Limit(int64(num)).Iter()
  892. n := 0
  893. for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
  894. ch <- true
  895. wg.Add(1)
  896. go func(tmp map[string]interface{}) {
  897. defer func() {
  898. <-ch
  899. wg.Done()
  900. }()
  901. update := []map[string]interface{}{}
  902. update = append(update, map[string]interface{}{"_id": tmp["_id"]})
  903. update = append(update, map[string]interface{}{
  904. "$set": map[string]interface{}{
  905. "s_groupid": groupId,
  906. "s_grouptaskid": groupTaskId,
  907. "b_isgivegroup": true,
  908. "i_updatetime": time.Now().Unix(),
  909. },
  910. // 分发时 删除程序检测的标记
  911. "$unset": map[string]interface{}{
  912. "s_excp": "",
  913. "s_excp_info": "",
  914. },
  915. })
  916. lock.Lock()
  917. updateArr = append(updateArr, update)
  918. //saveArr = append(saveArr, save)
  919. if len(updateArr) > 500 {
  920. util.Mgo.UpdateBulk(sourceinfo, updateArr...)
  921. updateArr = [][]map[string]interface{}{}
  922. }
  923. lock.Unlock()
  924. }(tmp)
  925. if n%100 == 0 {
  926. qu.Debug("current:", n)
  927. }
  928. tmp = map[string]interface{}{}
  929. }
  930. wg.Wait()
  931. lock.Lock()
  932. if len(updateArr) > 0 {
  933. util.Mgo.UpdateBulk(sourceinfo, updateArr...)
  934. updateArr = [][]map[string]interface{}{}
  935. }
  936. lock.Unlock()
  937. }
  938. }
  939. // ImportDataByExcel 通过excel获取数据源
  940. func ImportDataByExcel(s_sourceinfo string, mf multipart.File, success *bool, msg *string, successNum *int64, createindex bool) (importDataNum int) {
  941. defer qu.Catch()
  942. binary, _ := ioutil.ReadAll(mf)
  943. xls, _ := xlsx.OpenBinary(binary)
  944. sheet := xls.Sheets[0]
  945. rows := sheet.Rows
  946. idcolnum := -1
  947. cellFieldName := map[int]string{} //记录客户需求字段所在的列
  948. idInfoArr := []util.Data{} //记录数据id及需要保存的字段信息
  949. for rn, row := range rows {
  950. if rn == 0 {
  951. for index, cell := range row.Cells {
  952. title := cell.Value
  953. if fieldName := util.CustomerFieldMap_HE[title]; fieldName != "" { //客户需求字段
  954. cellFieldName[index] = fieldName
  955. }
  956. if title == "唯一标识" || title == "信息标识" { //id所在列
  957. idcolnum = index
  958. }
  959. }
  960. if idcolnum == -1 {
  961. break
  962. }
  963. continue
  964. }
  965. if len(row.Cells) < len(rows[0].Cells) {
  966. break
  967. }
  968. tmp := map[string]interface{}{}
  969. for index, f := range cellFieldName {
  970. if val := row.Cells[index].Value; val != "" {
  971. if f == "capital" { //注册资金(万元)
  972. cf, _ := row.Cells[index].Float()
  973. tmp[f] = cf
  974. } else if f == "createtime" { //创建时间
  975. ci, _ := row.Cells[index].Int64()
  976. tmp[f] = ci
  977. } else {
  978. tmp[f] = val
  979. }
  980. }
  981. }
  982. id := row.Cells[idcolnum].String() //加密的id
  983. if id == "" {
  984. break
  985. }
  986. id = util.SE.DecodeString(id) //解密后id
  987. idInfoArr = append(idInfoArr, util.Data{
  988. ID: id,
  989. Info: tmp,
  990. })
  991. //idInfoMap[id] = tmp
  992. }
  993. importDataNum = len(idInfoArr)
  994. qu.Debug("Load Excel Count:", importDataNum)
  995. if importDataNum > 0 {
  996. GetDataById(idInfoArr, "excel", s_sourceinfo, success, msg, successNum, !createindex)
  997. if *success && createindex {
  998. if !util.Mgo.CreateIndex(s_sourceinfo, util.SourceInfoIndexArr) { //创建数据源表同时生成字段索引
  999. qu.Debug("创建数据源表:", s_sourceinfo, "失败")
  1000. return
  1001. }
  1002. }
  1003. } else {
  1004. *success = false
  1005. *msg = "查询数据失败"
  1006. }
  1007. idInfoArr = []util.Data{}
  1008. return
  1009. }
  1010. // ImportDataByColl 通过表获取数据源
  1011. func ImportDataByColl(s_sourceinfo, historyid string, success *bool, msg *string, successNum *int64, createindex bool) (departname, entname string, rulename []string, importDataNum int) {
  1012. defer qu.Catch()
  1013. rulenameMap := map[string]bool{}
  1014. sess := util.MgoJy.GetMgoConn()
  1015. defer util.MgoJy.DestoryMongoConn(sess)
  1016. ch := make(chan bool, 3)
  1017. wg := &sync.WaitGroup{}
  1018. lock := &sync.Mutex{}
  1019. idInfoArr := []util.Data{} //记录数据id及需要保存的字段信息
  1020. query := map[string]interface{}{
  1021. "historyId": historyid,
  1022. }
  1023. it := sess.DB(util.MgoJy.DbName).C(util.JyHistory).Find(&query).Iter()
  1024. n := 0
  1025. for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
  1026. ch <- true
  1027. wg.Add(1)
  1028. go func(tmp map[string]interface{}) {
  1029. defer func() {
  1030. <-ch
  1031. wg.Done()
  1032. }()
  1033. id := qu.ObjToString(tmp["id"]) //bidding id
  1034. appid := qu.ObjToString(tmp["appid"]) //根据appid查user表获取公司名称
  1035. departname = qu.ObjToString(tmp["departname"]) //部门名称。所有数据都应部门名称,若不一致,随机取
  1036. needField := map[string]interface{}{}
  1037. for f := range util.CustomerFieldMap_EH {
  1038. if tmp[f] != nil {
  1039. needField[f] = tmp[f]
  1040. }
  1041. }
  1042. if entname == "" { //获取一次公司名称即可
  1043. user, _ := util.MgoJy.FindOne(util.JyUser, map[string]interface{}{"appid": appid})
  1044. entname = qu.ObjToString((*user)["username"]) //公司名称
  1045. }
  1046. rname := qu.ObjToString(tmp["rulename"])
  1047. lock.Lock()
  1048. for _, r := range strings.Split(rname, ",") {
  1049. rulenameMap[r] = true
  1050. }
  1051. //rulename = append(rulename, qu.ObjToString(tmp["rulename"])) //规则名称
  1052. //idInfoMap[id] = needField
  1053. idInfoArr = append(idInfoArr, util.Data{
  1054. ID: id,
  1055. Info: needField,
  1056. })
  1057. lock.Unlock()
  1058. }(tmp)
  1059. if n%100 == 0 {
  1060. qu.Debug("current:", n)
  1061. }
  1062. tmp = map[string]interface{}{}
  1063. }
  1064. wg.Wait()
  1065. for r := range rulenameMap {
  1066. rulename = append(rulename, r)
  1067. }
  1068. importDataNum = len(idInfoArr) //查询数据总数
  1069. if importDataNum > 0 {
  1070. GetDataById(idInfoArr, "coll", s_sourceinfo, success, msg, successNum, !createindex)
  1071. if *success && createindex {
  1072. if !util.Mgo.CreateIndex(s_sourceinfo, util.SourceInfoIndexArr) { //创建数据源表同时生成字段索引
  1073. qu.Debug("创建数据源表:", s_sourceinfo, "失败")
  1074. return
  1075. }
  1076. }
  1077. } else {
  1078. *msg = "查询数据失败"
  1079. }
  1080. idInfoArr = []util.Data{}
  1081. return
  1082. }
  1083. // GetDataById 通过id集从bidding、extract、project获取数据所有信息
  1084. func GetDataById(idInfoArr []util.Data, importType, s_sourceinfo string, success *bool, msg *string, successNum *int64, newAdd bool) {
  1085. *success = true
  1086. var msgArr []string
  1087. wg := &sync.WaitGroup{}
  1088. lock := &sync.Mutex{}
  1089. ch := make(chan bool, 10)
  1090. //num := int64(0) //计数
  1091. for i, data := range idInfoArr {
  1092. wg.Add(1)
  1093. ch <- true
  1094. go func(index int, tmpData util.Data) {
  1095. defer func() {
  1096. wg.Done()
  1097. <-ch
  1098. }()
  1099. /*
  1100. 1.查bidding
  1101. 2.查extract
  1102. 3.extract合并到bidding(删除item字段与客户需要的item不是一个含义)
  1103. 4.对比marked表,替换已标注过的字段值,补充标记
  1104. 5.合并客户所需字段信息,补充id字段
  1105. //6.若为同步时,删除原有id对应的信息,新增该id对应的_id信息
  1106. 6.mgo查询项目信息
  1107. */
  1108. tagInfoMap := map[string]interface{}{} //记录数据已标注过的信息
  1109. baseInfoMap := map[string]interface{}{} //记录其他信息
  1110. id := tmpData.ID
  1111. tmp := tmpData.Info
  1112. //1.查bidding
  1113. tmpBidColl := util.BidColl1 //bidding
  1114. //查询bidding
  1115. if id < util.BIDDINGSTARTID {
  1116. tmpBidColl = util.BidColl2 //bidding_back
  1117. }
  1118. bidData, _ := util.MgoB.FindById(tmpBidColl, id, nil)
  1119. if qu.ObjToString((*bidData)["toptype"]) != "采购意向" {
  1120. // 导入数据时,删掉purchasinglist(排除采购意向数据)
  1121. delete(*bidData, "purchasinglist")
  1122. // 删除多包字段 20230518
  1123. delete(*bidData, "package")
  1124. }
  1125. if bidData != nil && len(*bidData) > 0 { //bidding表数据存在
  1126. //2.查extract
  1127. extData, _ := util.MgoE.FindById(util.ExtColl1, id, map[string]interface{}{"attach_text": 0})
  1128. if extData == nil || len(*extData) == 0 {
  1129. extData, _ = util.MgoE.FindById(util.ExtColl2, id, map[string]interface{}{"attach_text": 0, "field_source": 0})
  1130. }
  1131. //抽取表字段合并到bidding
  1132. if extData != nil && len(*extData) > 0 {
  1133. for k, v := range *extData {
  1134. if k == "publishtime" {
  1135. continue
  1136. }
  1137. (*bidData)[k] = v
  1138. }
  1139. }
  1140. //3.删除item
  1141. //删除item
  1142. delete((*bidData), "item")
  1143. //4.对比marked表,对比marked表是否已标注该数据
  1144. markData, _ := util.Mgo.FindById(util.AllToColl, id, nil)
  1145. if markData != nil && len(*markData) > 0 {
  1146. UpdateMarkColl(bidData, markData, &tagInfoMap, &baseInfoMap) //比对更新数据
  1147. } else {
  1148. baseInfoMap["i_ckdata"] = 0 //设置ck_data默认值0
  1149. //多包、中标候选人、标的信息是否抽取
  1150. //if packageMap, ok := (*bidData)["package"].(map[string]interface{}); ok && len(packageMap) > 0 {
  1151. // baseInfoMap["b_pkgisext"] = true
  1152. //} else {
  1153. // baseInfoMap["b_pkgisext"] = false
  1154. //}
  1155. //if winorderArr, ok := (*bidData)["winnerorder"].([]interface{}); ok && len(winorderArr) > 0 {
  1156. // baseInfoMap["b_wodrisext"] = true
  1157. //} else {
  1158. // baseInfoMap["b_wodrisext"] = false
  1159. //}
  1160. //if purchArr, ok := (*bidData)["purchasinglist"].([]interface{}); ok && len(purchArr) > 0 {
  1161. // baseInfoMap["b_pclisext"] = true
  1162. //} else {
  1163. // baseInfoMap["b_pclisext"] = false
  1164. //}
  1165. }
  1166. //合并导入表中客户所需的字段
  1167. if len(tmp) > 0 {
  1168. for k, v := range tmp {
  1169. (*bidData)[k] = v
  1170. }
  1171. }
  1172. //补充id
  1173. //(*bidData)["id"] = id
  1174. //if stype == "syncoll" { //同步数据时删除原始数据
  1175. // if util.MgoM.Delete(coll, `{"id":"`+id+`"}`) == 0 {
  1176. // lock.Lock()
  1177. // *msg += "同步未删除成功数据id:" + id + ";\n"
  1178. // *success = false
  1179. // lock.Unlock()
  1180. // }
  1181. //}
  1182. // 处理 package winner_all
  1183. if p, o1 := (*bidData)["package"].(map[string]interface{}); o1 {
  1184. for _, v := range p {
  1185. v1 := v.(map[string]interface{})
  1186. t := make(map[string]interface{})
  1187. if v1["winner"] != nil {
  1188. t["winner"] = v1["winner"]
  1189. }
  1190. if v1["bidamount"] != nil {
  1191. t["bidamount"] = qu.Float64All(v1["bidamount"])
  1192. }
  1193. if len(t) > 0 {
  1194. v1["winner_all"] = append([]map[string]interface{}{}, t)
  1195. }
  1196. }
  1197. }
  1198. // 补充filetext
  1199. (*bidData)["filetext"] = util.GetFileText(*bidData)
  1200. // 6.项目合并信息
  1201. project, _ := util.MgoE.Find(util.ProjectColl, bson.M{"ids": id}, nil, nil, true, -1, -1)
  1202. //esQ := `{"query":{"bool":{"must":[{"term":{"ids":"` + id + `"}}]}}}`
  1203. //info := util.Es.Get("projectset", esQ)
  1204. if project != nil && len((*project)[0]) > 0 {
  1205. qu.Debug(*project)
  1206. ids := qu.ObjArrToStringArr((*project)[0]["ids"].([]interface{}))
  1207. if len(ids) > 0 {
  1208. var infolist []map[string]interface{}
  1209. for _, v := range ids {
  1210. if v == id { // 当前公告
  1211. continue
  1212. }
  1213. if v < util.BIDDINGSTARTID {
  1214. tmpBidColl = util.BidColl2 //bidding_back
  1215. }
  1216. bid, b := util.MgoB.FindById(tmpBidColl, v, nil)
  1217. if b && len(*bid) > 0 {
  1218. tmp := make(map[string]interface{})
  1219. tmp["id"] = v
  1220. tmp["title"] = (*bid)["title"]
  1221. tmp["href"] = (*bid)["href"]
  1222. tmp["toptype"] = (*bid)["toptype"]
  1223. tmp["subtype"] = (*bid)["subtype"]
  1224. tmp["publishtime"] = (*bid)["publishtime"]
  1225. tmp["detail"] = (*bid)["detail"]
  1226. tmp["filetext"] = util.GetFileText(*bid)
  1227. infolist = append(infolist, tmp)
  1228. }
  1229. }
  1230. (*bidData)["info"] = infolist
  1231. }
  1232. } else {
  1233. qu.Debug("Projectset Find Error", id)
  1234. }
  1235. baseInfoMap["id"] = id
  1236. _id := (*bidData)["_id"]
  1237. delete(*bidData, "_id")
  1238. //保存数据
  1239. baseInfoMap["_id"] = _id
  1240. baseInfoMap["v_baseinfo"] = bidData
  1241. if len(tagInfoMap) > 0 {
  1242. baseInfoMap["v_taginfo"] = tagInfoMap
  1243. }
  1244. baseInfoMap["i_createtime"] = time.Now().Unix()
  1245. baseInfoMap["b_isgivegroup"] = false //是否分配给用户组
  1246. baseInfoMap["b_istag"] = false //是否已标注
  1247. //baseInfoMap["b_cleartag"] = false //是否清理标注信息
  1248. baseInfoMap["b_isgiveuser"] = false //是否分配给用户
  1249. baseInfoMap["b_check"] = false // 质检标记
  1250. baseInfoMap["b_isEff"] = false // 标的物有效性
  1251. if newAdd {
  1252. // 新增数据带上数据达标标记
  1253. baseInfoMap["b_isprchasing"] = true
  1254. baseInfoMap["b_istagging"] = true
  1255. }
  1256. if util.Mgo.SaveByOriID(s_sourceinfo, baseInfoMap) {
  1257. atomic.AddInt64(successNum, 1) //保存成功计数
  1258. } else {
  1259. lock.Lock()
  1260. //*success = false
  1261. if importType == "excel" {
  1262. msgArr = append(msgArr, "第"+fmt.Sprint(index+2)+"行未导入id:"+id)
  1263. //*msg += "第" + fmt.Sprint(num+2) + "行未保存成功数据_id:" + id + ";\n"
  1264. } else {
  1265. msgArr = append(msgArr, "未导入id:"+id)
  1266. //*msg += "未保存成功数据_id:" + id + ";\n"
  1267. }
  1268. lock.Unlock()
  1269. }
  1270. } else {
  1271. lock.Lock()
  1272. *success = false
  1273. if importType == "excel" {
  1274. msgArr = append(msgArr, "第"+fmt.Sprint(index+2)+"行,未查询到id:"+id)
  1275. //*msg += "第" + fmt.Sprint(num+2) + "行未查询到数据:" + id + ";\n"
  1276. } else {
  1277. msgArr = append(msgArr, "未查询id:"+id)
  1278. //*msg += "未查询到数据_id:" + id + ";\n"
  1279. }
  1280. lock.Unlock()
  1281. }
  1282. }(i, data)
  1283. }
  1284. wg.Wait()
  1285. sort.Strings(msgArr)
  1286. *msg = strings.Join(msgArr, ";<br>")
  1287. }
  1288. // UpdateMarkColl 更新数据
  1289. func UpdateMarkColl(bidData, markData, tagInfoMap, baseInfoMap *map[string]interface{}) {
  1290. defer qu.Catch()
  1291. ckdata := qu.IntAll((*markData)["i_ckdata"])
  1292. v_taginfo := (*markData)["v_taginfo"].(map[string]interface{}) //标注信息
  1293. v_baseinfo := (*markData)["v_baseinfo"].(map[string]interface{}) //基本信息
  1294. for fk := range v_taginfo {
  1295. if v_baseinfo[fk] != nil {
  1296. (*bidData)[fk] = v_baseinfo[fk] //字段更新
  1297. }
  1298. }
  1299. (*tagInfoMap) = v_taginfo //marked中已有的标注信息保存到新数据上
  1300. if ckdata == 2 { //某些字段已标注
  1301. (*baseInfoMap)["i_ckdata"] = 0 //marked表中该条数据如果为字段验证,临时表ck_data:0;若为数据验证ck_data:1
  1302. } else if ckdata == 1 {
  1303. (*baseInfoMap)["i_ckdata"] = 1
  1304. }
  1305. }
  1306. func (f *Front) ProjectCheckSuc() {
  1307. defer qu.Catch()
  1308. if f.Method() == "POST" {
  1309. sourceinfo := f.GetString("s_sourceinfo")
  1310. query := map[string]interface{}{
  1311. "b_istagging": false,
  1312. }
  1313. b := util.Mgo.Update(sourceinfo, query, map[string]interface{}{"$set": map[string]interface{}{"b_istag": true}}, false, true)
  1314. f.ServeJson(map[string]interface{}{"success": b, "msg": "更新数据失败"})
  1315. }
  1316. }
  1317. func (f *Front) ProjectPassSuc() {
  1318. defer qu.Catch()
  1319. if f.Method() == "POST" {
  1320. sourceinfo := f.GetString("s_sourceinfo")
  1321. query := map[string]interface{}{
  1322. "b_istag": false,
  1323. }
  1324. b := util.Mgo.Update(sourceinfo, query, map[string]interface{}{"$set": map[string]interface{}{"b_istag": true, "i_ckdata": 2}}, false, true)
  1325. f.ServeJson(map[string]interface{}{"success": b, "msg": "更新数据失败"})
  1326. }
  1327. }
  1328. func (f *Front) ProjectTagNum() {
  1329. defer qu.Catch()
  1330. if f.Method() == "POST" {
  1331. sourceinfo := f.GetString("s_sourceinfo")
  1332. count := util.Mgo.Count(sourceinfo, map[string]interface{}{"i_ckdata": 2})
  1333. if count > 0 {
  1334. f.ServeJson(map[string]interface{}{"success": true})
  1335. } else {
  1336. f.ServeJson(map[string]interface{}{"success": false, "msg": "暂时没有可质检的数据"})
  1337. }
  1338. }
  1339. }
  1340. func (f *Front) ProjectField() {
  1341. defer qu.Catch()
  1342. if f.Method() == "POST" {
  1343. pid := f.GetString("pid")
  1344. d1 := f.GetString("fields") //配置字段
  1345. var bObj []map[string]interface{}
  1346. err := json.Unmarshal([]byte(d1), &bObj)
  1347. if err != nil {
  1348. qu.Debug("Json Unmarshal Error")
  1349. f.ServeJson(map[string]interface{}{"success": false, "msg": "解析数据失败"})
  1350. return
  1351. }
  1352. b := util.Mgo.UpdateById(util.PROJECTCOLLNAME, pid, bson.M{"$set": bson.M{"v_fields": bObj}})
  1353. if b {
  1354. f.ServeJson(map[string]interface{}{"success": b, "msg": "保存数据成功"})
  1355. } else {
  1356. f.ServeJson(map[string]interface{}{"success": b, "msg": "字段保存失败"})
  1357. }
  1358. } else {
  1359. pid := f.GetString("id")
  1360. project, _ := util.Mgo.FindById(util.PROJECTCOLLNAME, pid, map[string]interface{}{"v_fields": 1})
  1361. if (*project)["v_fields"] != nil {
  1362. fields := util.Copy(util.FieldsArr).([]map[string]interface{})
  1363. var a1 []string // 标注基本字段
  1364. a2 := make(map[string]interface{}) // 多包、标的物、采购意向、中标候选
  1365. for _, m := range qu.ObjArrToMapArr((*project)["v_fields"].([]interface{})) {
  1366. if m["child"] != nil {
  1367. if qu.ObjToString(m["key"]) == "extend" {
  1368. m["enable"] = true
  1369. f.T["diy_fields"] = m["child"]
  1370. } else {
  1371. var a []string
  1372. for _, m2 := range qu.ObjArrToMapArr(m["child"].([]interface{})) {
  1373. a = append(a, qu.ObjToString(m2["key"]))
  1374. }
  1375. a2[qu.ObjToString(m["key"])] = a
  1376. }
  1377. } else {
  1378. a1 = append(a1, qu.ObjToString(m["key"]))
  1379. }
  1380. }
  1381. for _, m := range fields {
  1382. if qu.ObjToString(m["descript"]) == "基本字段" {
  1383. if len(a1) > 0 {
  1384. m["enable"] = true
  1385. s := strings.Join(a1, ",")
  1386. for _, m2 := range m["child"].([]map[string]interface{}) {
  1387. if strings.Contains(s, qu.ObjToString(m2["key"])) {
  1388. m2["enable"] = true
  1389. }
  1390. }
  1391. }
  1392. } else {
  1393. if a3 := a2[qu.ObjToString(m["key"])]; a3 != nil {
  1394. m["enable"] = true
  1395. for _, m2 := range m["child"].([]map[string]interface{}) {
  1396. s := strings.Join(a3.([]string), ",")
  1397. if strings.Contains(s, qu.ObjToString(m2["key"])) {
  1398. m2["enable"] = true
  1399. }
  1400. }
  1401. } else {
  1402. continue
  1403. }
  1404. }
  1405. }
  1406. f.T["fields"] = fields
  1407. } else {
  1408. f.T["fields"] = util.FieldsArr
  1409. }
  1410. f.T["pid"] = pid
  1411. _ = f.Render("project/project_field.html", &f.T)
  1412. }
  1413. }
  1414. func (f *Front) ProjectData() {
  1415. defer qu.Catch() //项目id
  1416. pid := f.GetString("pid")
  1417. sourceinfo := f.GetString("sourceinfo")
  1418. if f.Method() == "POST" {
  1419. start, _ := f.GetInteger("start")
  1420. limit, _ := f.GetInteger("length")
  1421. draw, _ := f.GetInteger("draw")
  1422. searchStr := f.GetString("search[value]")
  1423. searchStr = strings.TrimSpace(searchStr)
  1424. query := map[string]interface{}{}
  1425. if searchStr != "" {
  1426. query["$or"] = []interface{}{
  1427. map[string]interface{}{"v_baseinfo.projectname": map[string]interface{}{"$regex": searchStr}},
  1428. map[string]interface{}{"v_baseinfo.title": map[string]interface{}{"$regex": searchStr}},
  1429. }
  1430. }
  1431. list, _ := util.Mgo.Find(sourceinfo, query, bson.M{"_id": 1}, nil, false, start, limit)
  1432. count := util.Mgo.Count(sourceinfo, query)
  1433. f.ServeJson(map[string]interface{}{"draw": draw, "data": *list, "recordsFiltered": count, "recordsTotal": count})
  1434. } else {
  1435. f.T["pid"] = pid
  1436. f.T["sourceinfo"] = sourceinfo
  1437. _ = f.Render("project/project_data.html", &f.T)
  1438. }
  1439. }