|
@@ -5,7 +5,9 @@ import (
|
|
|
"fmt"
|
|
|
"github.com/tealeg/xlsx"
|
|
|
"io/ioutil"
|
|
|
+ "math"
|
|
|
"mime/multipart"
|
|
|
+ "mongodb"
|
|
|
qu "qfw/util"
|
|
|
"sort"
|
|
|
"strings"
|
|
@@ -52,6 +54,11 @@ func (f *Front) ProjectList() {
|
|
|
// ProjectSave 项目保存
|
|
|
func (f *Front) ProjectSave() {
|
|
|
defer qu.Catch()
|
|
|
+ s_name := f.GetString("s_name") //项目名称
|
|
|
+ if s_name == "" {
|
|
|
+ f.ServeJson(map[string]interface{}{"success": false, "msg": "缺少项目名称字段"})
|
|
|
+ return
|
|
|
+ }
|
|
|
success := false //导入数据是否成功
|
|
|
msg := "" //异常信息
|
|
|
successNum := int64(0) //导入成功条数
|
|
@@ -60,9 +67,12 @@ func (f *Front) ProjectSave() {
|
|
|
user := f.GetSession("user").(map[string]interface{})
|
|
|
username := qu.ObjToString(user["s_name"]) //当前登录用户
|
|
|
stype := f.GetString("s_type") //新建项目类型:数据库导入、excel导入
|
|
|
- s_name := f.GetString("s_name") //项目名称
|
|
|
s_sourceinfo := f.GetString("s_sourceinfo") //数据表
|
|
|
s_sourceinfo = "f_sourceinfo_" + s_sourceinfo
|
|
|
+ if s_sourceinfo == "" {
|
|
|
+ f.ServeJson(map[string]interface{}{"success": false, "msg": "缺少数据存储表名"})
|
|
|
+ return
|
|
|
+ }
|
|
|
s_departname, s_entname := "", ""
|
|
|
query := map[string]interface{}{
|
|
|
"s_name": s_name,
|
|
@@ -70,7 +80,11 @@ func (f *Front) ProjectSave() {
|
|
|
set := map[string]interface{}{}
|
|
|
//导入数据
|
|
|
if stype == "excel" { //excel导入
|
|
|
- s_entname = f.GetString("s_entname") //公司名称
|
|
|
+ s_entname = f.GetString("s_entname") //公司名称
|
|
|
+ if s_entname == "" {
|
|
|
+ f.ServeJson(map[string]interface{}{"success": false, "msg": "缺少公司名称字段"})
|
|
|
+ return
|
|
|
+ }
|
|
|
s_departname = f.GetString("s_departname") //部门名称
|
|
|
rulename := f.GetString("s_rulename") //规则名称
|
|
|
s_rulename = strings.Split(rulename, ",")
|
|
@@ -93,8 +107,12 @@ func (f *Front) ProjectSave() {
|
|
|
}
|
|
|
} else if stype == "coll" { //数据库导入
|
|
|
historyid := f.GetString("s_historyid")
|
|
|
+ if historyid == "" {
|
|
|
+ f.ServeJson(map[string]interface{}{"success": false, "msg": "数据导出ID字段"})
|
|
|
+ return
|
|
|
+ }
|
|
|
s_departname, s_entname, s_rulename, importDataNum = ImportDataByColl(s_sourceinfo, historyid, &success, &msg, &successNum)
|
|
|
- qu.Debug(s_departname, s_entname, s_rulename)
|
|
|
+ qu.Debug(s_departname, s_entname, s_rulename, importDataNum)
|
|
|
//保存项目信息
|
|
|
set = map[string]interface{}{
|
|
|
"s_name": s_name, //项目名称
|
|
@@ -110,6 +128,7 @@ func (f *Front) ProjectSave() {
|
|
|
"s_historyid": historyid, //源数据集标识
|
|
|
}
|
|
|
} else if stype == "edit" { //编辑保存
|
|
|
+ success = true
|
|
|
//s_entname = f.GetString("s_entname") //公司名称
|
|
|
s_departname = f.GetString("s_departname") //部门名称
|
|
|
rulename := f.GetString("s_rulename") //规则名称
|
|
@@ -135,11 +154,20 @@ func (f *Front) ProjectSave() {
|
|
|
//"i_completetime",//结束时间
|
|
|
}
|
|
|
}
|
|
|
- b := util.Mgo.Update(util.PROJECTCOLLNAME, query, map[string]interface{}{"$set": set}, true, false)
|
|
|
- qu.Debug("Create Project:", b)
|
|
|
+ if success {
|
|
|
+ success = util.Mgo.Update(util.PROJECTCOLLNAME, query, map[string]interface{}{"$set": set}, true, false)
|
|
|
+ qu.Debug("Save Project:", success)
|
|
|
+ if !success { //保存项目失败
|
|
|
+ msg = "新建项目失败\n" + msg
|
|
|
+ } else {
|
|
|
+ msg = "新建项目成功\n" + msg
|
|
|
+ }
|
|
|
+ }
|
|
|
+ qu.Debug("Create Project:", success, "importnum:", importDataNum, "successnum:", successNum, "failnum:", int64(importDataNum)-successNum)
|
|
|
+ qu.Debug("Msg:", msg)
|
|
|
//返回信息
|
|
|
if stype == "edit" {
|
|
|
- f.ServeJson(map[string]interface{}{"success": b})
|
|
|
+ f.ServeJson(map[string]interface{}{"success": success, "msg": msg})
|
|
|
} else {
|
|
|
f.ServeJson(map[string]interface{}{"success": success, "msg": msg, "importnum": importDataNum, "successnum": successNum, "failnum": int64(importDataNum) - successNum})
|
|
|
}
|
|
@@ -195,8 +223,9 @@ func (f *Front) ProjectTaskList() {
|
|
|
start, _ := f.GetInteger("start")
|
|
|
limit, _ := f.GetInteger("length")
|
|
|
draw, _ := f.GetInteger("draw")
|
|
|
- query := map[string]interface{}{
|
|
|
+ query := map[string]interface{}{ //查找用户组任务
|
|
|
"s_projectid": projectid,
|
|
|
+ "s_stype": "group",
|
|
|
}
|
|
|
if status != "-1" {
|
|
|
query["s_status"] = status
|
|
@@ -208,6 +237,18 @@ func (f *Front) ProjectTaskList() {
|
|
|
}
|
|
|
list, _ := util.Mgo.Find(util.TASKCOLLNAME, query, nil, nil, false, start, limit)
|
|
|
count := util.Mgo.Count(util.TASKCOLLNAME, query)
|
|
|
+ for _, l := range *list {
|
|
|
+ if status := qu.ObjToString(l["s_status"]); status == "进行中" { //更新任务进度
|
|
|
+ groupId := qu.ObjToString(l["s_groupid"])
|
|
|
+ giveNum := qu.IntAll(l["i_givenum"])
|
|
|
+ sourceinfo := qu.ObjToString(l["s_sourceinfo"])
|
|
|
+ tagNum := util.Mgo.Count(sourceinfo, map[string]interface{}{"s_groupid": groupId, "b_istag": true})
|
|
|
+ progress := fmt.Sprint(math.Ceil(float64(tagNum)/float64(giveNum))) + "%"
|
|
|
+ l["s_progress"] = progress
|
|
|
+ //同步数据库
|
|
|
+ util.Mgo.UpdateById(util.TASKCOLLNAME, l["_id"], map[string]interface{}{"$set": map[string]interface{}{"s_progress": progress}})
|
|
|
+ }
|
|
|
+ }
|
|
|
f.ServeJson(map[string]interface{}{"draw": draw, "data": *list, "recordsFiltered": count, "recordsTotal": count})
|
|
|
}
|
|
|
|
|
@@ -217,6 +258,7 @@ func (f *Front) ProjectTaskSave() {
|
|
|
var groupArr []map[string]interface{}
|
|
|
var taskArr []map[string]interface{}
|
|
|
var groupIdArr []string
|
|
|
+ groupIdMap := map[string]int{}
|
|
|
success := false
|
|
|
msg := ""
|
|
|
user := f.GetSession("user").(map[string]interface{})
|
|
@@ -226,12 +268,19 @@ func (f *Front) ProjectTaskSave() {
|
|
|
sourceinfo := f.GetString("s_sourceinfo") //源数据表
|
|
|
sourcetaskinfo := "f_sourcetaskinfo_" + strings.ReplaceAll(sourceinfo, "f_sourceinfo_", "") //任务日志表
|
|
|
group := f.GetString("s_group")
|
|
|
+ stype := f.GetString("s_type")
|
|
|
if err := json.Unmarshal([]byte(group), &groupArr); err != nil {
|
|
|
qu.Debug("V_Filelds Unmarshal Failed:", err)
|
|
|
+ msg = "用户组信息解析失败"
|
|
|
} else {
|
|
|
+ if stype == "notag" { //如果分发的是达标数据且进行了初步质检,将没有质检记录的字段从v_taginfo标注记录中删除
|
|
|
+ DeleleDataTagInfo(sourceinfo)
|
|
|
+ }
|
|
|
for _, groupInfo := range groupArr {
|
|
|
groupId := qu.ObjToString(groupInfo["s_groupid"])
|
|
|
groupIdArr = append(groupIdArr, groupId)
|
|
|
+ givenum := qu.IntAll(groupInfo["i_givenum"])
|
|
|
+ groupIdMap[groupId] = givenum
|
|
|
task := map[string]interface{}{
|
|
|
"s_projectid": projectid, //项目标识
|
|
|
"s_projectname": projectname, //项目名称
|
|
@@ -240,29 +289,35 @@ func (f *Front) ProjectTaskSave() {
|
|
|
"s_personname": qu.ObjToString(groupInfo["s_personname"]), //任务负责人
|
|
|
"s_groupname": qu.ObjToString(groupInfo["s_groupname"]), //用户组名称
|
|
|
"s_groupid": groupId, //用户组标识
|
|
|
- "i_givenum": qu.ObjToString(groupInfo["i_givenum"]), //分发数据量
|
|
|
+ "i_givenum": givenum, //分发数据量
|
|
|
"s_createname": username, //创建人
|
|
|
"i_createtime": time.Now().Unix(), //创建时间
|
|
|
"s_progress": "0%", //完成进度
|
|
|
"s_sourceinfo": sourceinfo, //源数据表
|
|
|
"s_sourcetaskinfo": sourcetaskinfo, //任务日志表
|
|
|
+ "s_stype": "group", //任务类型
|
|
|
}
|
|
|
taskArr = append(taskArr, task)
|
|
|
}
|
|
|
}
|
|
|
- //分发数据后更新项目中用户组标识信息
|
|
|
- success = util.Mgo.UpdateById(util.PROJECTCOLLNAME, projectid, map[string]interface{}{
|
|
|
- "$push": map[string]interface{}{
|
|
|
- "v_groupids": map[string]interface{}{
|
|
|
- "$each": groupIdArr,
|
|
|
+ if len(taskArr) > 0 {
|
|
|
+ //分发数据后更新项目中用户组标识信息
|
|
|
+ success = util.Mgo.UpdateById(util.PROJECTCOLLNAME, projectid, map[string]interface{}{
|
|
|
+ "$push": map[string]interface{}{
|
|
|
+ "v_groupids": map[string]interface{}{
|
|
|
+ "$each": groupIdArr,
|
|
|
+ },
|
|
|
},
|
|
|
- },
|
|
|
- })
|
|
|
- if !success {
|
|
|
- msg = "更新项目:" + projectname + "用户组标识失败"
|
|
|
- } else { //分发任务
|
|
|
- success = util.Mgo.SaveBulk(util.TASKCOLLNAME, taskArr...)
|
|
|
- msg = "任务分发成功"
|
|
|
+ })
|
|
|
+ if !success {
|
|
|
+ msg = "更新项目:" + projectname + "用户组标识失败"
|
|
|
+ } else { //用户组分发任务
|
|
|
+ success = util.Mgo.SaveBulk(util.TASKCOLLNAME, taskArr...)
|
|
|
+ if success {
|
|
|
+ msg = "任务分发成功"
|
|
|
+ UpdateSourceinfo(sourceinfo, sourcetaskinfo, stype, groupIdMap) //用户组分发任务成功后,给数据源打上用户组标识,同时生成任务临时表
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
qu.Debug("Msg:", msg)
|
|
|
f.ServeJson(map[string]interface{}{"success": success, "msg": msg})
|
|
@@ -285,6 +340,312 @@ func (f *Front) ProjectGetEntnameList() {
|
|
|
f.ServeJson(map[string]interface{}{"entname": entnameList})
|
|
|
}
|
|
|
|
|
|
+// ProjectTaskRepulse 用户组任务打回
|
|
|
+func (f *Front) ProjectTaskRepulse() {
|
|
|
+ defer qu.Catch()
|
|
|
+ success := false
|
|
|
+ msg := ""
|
|
|
+ user := f.GetSession("user").(map[string]interface{})
|
|
|
+ username := qu.ObjToString(user["s_name"])
|
|
|
+ status := f.GetString("s_status")
|
|
|
+ taskId := f.GetString("id")
|
|
|
+ groupId := f.GetString("s_groupid")
|
|
|
+ sourceinfo := f.GetString("s_sourceinfo")
|
|
|
+ sourcetaskinfo := f.GetString("s_sourcetaskinfo")
|
|
|
+ if status == "已完成" {
|
|
|
+ //更新数据状态
|
|
|
+ //1、更新源数据表
|
|
|
+ success1 := util.Mgo.Update(sourceinfo, map[string]interface{}{"s_groupid": groupId}, map[string]interface{}{
|
|
|
+ "$set": map[string]interface{}{
|
|
|
+ "b_istag": false,
|
|
|
+ "i_updatetime": time.Now().Unix(),
|
|
|
+ },
|
|
|
+ "$unset": map[string]interface{}{
|
|
|
+ "s_userid": "",
|
|
|
+ },
|
|
|
+ }, false, true)
|
|
|
+ //2、删除临时任务表中对应数据
|
|
|
+ success2 := util.Mgo.Del(sourcetaskinfo, map[string]interface{}{"s_groupid": groupId})
|
|
|
+ if success1 && success2 {
|
|
|
+ //清除最迟完成时间,更新任务状态
|
|
|
+ success = util.Mgo.UpdateById(util.TASKCOLLNAME, taskId, map[string]interface{}{
|
|
|
+ "$set": map[string]interface{}{
|
|
|
+ "s_status": "未开始",
|
|
|
+ "s_updateperson": username,
|
|
|
+ "i_updatetime": time.Now().Unix(),
|
|
|
+ "s_progress": "0%",
|
|
|
+ },
|
|
|
+ "$unset": map[string]interface{}{
|
|
|
+ "i_completetime": "",
|
|
|
+ },
|
|
|
+ })
|
|
|
+ if !success {
|
|
|
+ msg += "更新任务:" + taskId + "失败"
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ qu.Debug("Update "+sourceinfo+":", success1, " Delete "+sourcetaskinfo+":", success2)
|
|
|
+ if !success1 {
|
|
|
+ msg += "更新" + sourceinfo + "数据失败;"
|
|
|
+ }
|
|
|
+ if !success2 {
|
|
|
+ msg += "删除" + sourcetaskinfo + "数据失败;"
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ qu.Debug("Task Repulse:", success, " Msg:", msg)
|
|
|
+ f.ServeJson(map[string]interface{}{"success": success, "msg": msg})
|
|
|
+}
|
|
|
+
|
|
|
+// ProjectTaskRetrieve 用户组任务收回
|
|
|
+func (f *Front) ProjectTaskRetrieve() {
|
|
|
+ defer qu.Catch()
|
|
|
+ success := false
|
|
|
+ msg := ""
|
|
|
+ num := 0
|
|
|
+ user := f.GetSession("user").(map[string]interface{})
|
|
|
+ username := qu.ObjToString(user["s_name"])
|
|
|
+ status := f.GetString("s_status")
|
|
|
+ taskId := f.GetString("id")
|
|
|
+ groupId := f.GetString("s_groupid")
|
|
|
+ sourceinfo := f.GetString("s_sourceinfo")
|
|
|
+ sourcetaskinfo := f.GetString("s_sourcetaskinfo")
|
|
|
+ if status == "已完成" {
|
|
|
+ count1 := util.Mgo.Count(sourceinfo, map[string]interface{}{"s_groupid": groupId, "b_istag": false})
|
|
|
+ count2 := util.Mgo.Count(sourcetaskinfo, map[string]interface{}{"s_groupid": groupId, "s_complete": false})
|
|
|
+ if count1 != count2 { //数据源表和临时表数量不一致
|
|
|
+ qu.Debug("Count Is Not Same:", sourceinfo+":", count1, sourceinfo+":", count2)
|
|
|
+ }
|
|
|
+ num = count1
|
|
|
+ if count1 == 0 { //收回数据量为0
|
|
|
+ success = true
|
|
|
+ //更新任务状态、完成时间
|
|
|
+ success = util.Mgo.UpdateById(util.TASKCOLLNAME, taskId, map[string]interface{}{
|
|
|
+ "$set": map[string]interface{}{
|
|
|
+ "s_status": "已完成",
|
|
|
+ "s_updateperson": username,
|
|
|
+ "i_updatetime": time.Now().Unix(),
|
|
|
+ "i_completetime": time.Now().Unix(),
|
|
|
+ "s_progress": "100%",
|
|
|
+ },
|
|
|
+ })
|
|
|
+ if !success {
|
|
|
+ msg += "更新任务:" + taskId + "失败"
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ //1、更新源数据表
|
|
|
+ success1 := util.Mgo.Update(sourceinfo, map[string]interface{}{"s_groupid": groupId, "b_istag": false}, map[string]interface{}{
|
|
|
+ "$set": map[string]interface{}{
|
|
|
+ "b_isgive": false,
|
|
|
+ "i_updatetime": time.Now().Unix(),
|
|
|
+ },
|
|
|
+ "$unset": map[string]interface{}{
|
|
|
+ "s_groupid": "",
|
|
|
+ },
|
|
|
+ }, false, true)
|
|
|
+ //2、删除临时任务表中对应未完成数据
|
|
|
+ success2 := util.Mgo.Del(sourcetaskinfo, map[string]interface{}{"s_groupid": groupId, "s_complete": false})
|
|
|
+ if success1 && success2 {
|
|
|
+ //更新任务状态、完成时间
|
|
|
+ success = util.Mgo.UpdateById(util.TASKCOLLNAME, taskId, map[string]interface{}{
|
|
|
+ "$set": map[string]interface{}{
|
|
|
+ "s_status": "已完成",
|
|
|
+ "s_updateperson": username,
|
|
|
+ "i_updatetime": time.Now().Unix(),
|
|
|
+ "i_completetime": time.Now().Unix(),
|
|
|
+ },
|
|
|
+ })
|
|
|
+ if !success {
|
|
|
+ msg += "更新任务:" + taskId + "失败"
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ qu.Debug("Update "+sourceinfo+":", success1, " Delete "+sourcetaskinfo+":", success2)
|
|
|
+ if !success1 {
|
|
|
+ msg += "更新" + sourceinfo + "数据失败;"
|
|
|
+ }
|
|
|
+ if !success2 {
|
|
|
+ msg += "删除" + sourcetaskinfo + "数据失败;"
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ qu.Debug("Task Retrieve:", success, " num:", num, " Msg:", msg)
|
|
|
+ f.ServeJson(map[string]interface{}{"success": success, "msg": msg, "num": num})
|
|
|
+}
|
|
|
+
|
|
|
+// ProjectTaskClose 用户组任务关闭
|
|
|
+func (f *Front) ProjectTaskClose() {
|
|
|
+ defer qu.Catch()
|
|
|
+ success := false
|
|
|
+ user := f.GetSession("user").(map[string]interface{})
|
|
|
+ username := qu.ObjToString(user["s_name"])
|
|
|
+ status := f.GetString("s_status")
|
|
|
+ taskId := f.GetString("id")
|
|
|
+ if status == "已完成" {
|
|
|
+ //更新任务状态
|
|
|
+ success = util.Mgo.UpdateById(util.TASKCOLLNAME, taskId, map[string]interface{}{
|
|
|
+ "$set": map[string]interface{}{
|
|
|
+ "s_status": "已关闭",
|
|
|
+ "s_updateperson": username,
|
|
|
+ "i_updatetime": time.Now().Unix(),
|
|
|
+ "s_progress": "100%",
|
|
|
+ },
|
|
|
+ })
|
|
|
+ }
|
|
|
+ f.ServeJson(map[string]interface{}{"success": success})
|
|
|
+}
|
|
|
+
|
|
|
+// DeleleDataTagInfo 删除标注记录
|
|
|
+func DeleleDataTagInfo(sourceinfo string) {
|
|
|
+ defer qu.Catch()
|
|
|
+ sess := util.Mgo.GetMgoConn()
|
|
|
+ defer util.Mgo.DestoryMongoConn(sess)
|
|
|
+ ch := make(chan bool, 5)
|
|
|
+ wg := &sync.WaitGroup{}
|
|
|
+ lock := &sync.Mutex{}
|
|
|
+ query := map[string]interface{}{ //达标数据可能会分发后收回、打回再分发
|
|
|
+ "b_istagging": false, //达标数据
|
|
|
+ "b_cleartag": false, //未进行一次标注信息清理
|
|
|
+ }
|
|
|
+ fields := map[string]interface{}{
|
|
|
+ "v_taginfo": 1,
|
|
|
+ "v_check": 1,
|
|
|
+ }
|
|
|
+ updateArr := [][]map[string]interface{}{}
|
|
|
+ it := sess.DB(util.Mgo.DbName).C(sourceinfo).Find(&query).Select(&fields).Iter()
|
|
|
+ count, _ := sess.DB(util.Mgo.DbName).C(sourceinfo).Find(&query).Count()
|
|
|
+ qu.Debug("Find Needs To Clearn Data Count:", count)
|
|
|
+ n := 0
|
|
|
+ for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
|
|
|
+ ch <- true
|
|
|
+ wg.Add(1)
|
|
|
+ go func(tmp map[string]interface{}) {
|
|
|
+ defer func() {
|
|
|
+ <-ch
|
|
|
+ wg.Done()
|
|
|
+ }()
|
|
|
+ update := []map[string]interface{}{}
|
|
|
+ update = append(update, map[string]interface{}{"_id": tmp["_id"]})
|
|
|
+ tagInfo, _ := tmp["v_taginfo"].(map[string]interface{})
|
|
|
+ checkInfo, _ := tmp["v_check"].(map[string]interface{})
|
|
|
+ if len(tagInfo) == 0 || len(checkInfo) == 0 {
|
|
|
+ return
|
|
|
+ }
|
|
|
+ for f, _ := range tagInfo {
|
|
|
+ if checkInfo[f] == nil {
|
|
|
+ delete(tagInfo, f)
|
|
|
+ }
|
|
|
+ }
|
|
|
+ update = append(update, map[string]interface{}{
|
|
|
+ "$set": map[string]interface{}{
|
|
|
+ "v_taginfo": tagInfo,
|
|
|
+ "b_cleartag": true,
|
|
|
+ },
|
|
|
+ })
|
|
|
+ lock.Lock()
|
|
|
+ updateArr = append(updateArr, update)
|
|
|
+ if len(updateArr) > 500 {
|
|
|
+ util.Mgo.UpdateBulk(sourceinfo, updateArr...)
|
|
|
+ updateArr = [][]map[string]interface{}{}
|
|
|
+ }
|
|
|
+ lock.Unlock()
|
|
|
+ }(tmp)
|
|
|
+ if n%100 == 0 {
|
|
|
+ qu.Debug("current:", n)
|
|
|
+ }
|
|
|
+ tmp = map[string]interface{}{}
|
|
|
+ }
|
|
|
+ wg.Wait()
|
|
|
+ lock.Lock()
|
|
|
+ if len(updateArr) > 0 {
|
|
|
+ util.Mgo.UpdateBulk(sourceinfo, updateArr...)
|
|
|
+ updateArr = [][]map[string]interface{}{}
|
|
|
+ }
|
|
|
+ lock.Unlock()
|
|
|
+}
|
|
|
+
|
|
|
+// UpdateSourceinfo 用户组分发任务成功后,给数据源打上用户组标识,同时生成任务临时表
|
|
|
+func UpdateSourceinfo(sourceinfo, sourcetaskinfo, stype string, groupIdMap map[string]int) {
|
|
|
+ defer qu.Catch()
|
|
|
+ for groupid, num := range groupIdMap {
|
|
|
+ sess := util.Mgo.GetMgoConn()
|
|
|
+ defer util.Mgo.DestoryMongoConn(sess)
|
|
|
+ ch := make(chan bool, 5)
|
|
|
+ wg := &sync.WaitGroup{}
|
|
|
+ lock := &sync.Mutex{}
|
|
|
+ query := map[string]interface{}{ //查找未分配对应stype的数据分发
|
|
|
+ "b_isgive": false,
|
|
|
+ }
|
|
|
+ if stype == "notag" { //达标数据
|
|
|
+ query["b_istagging"] = false
|
|
|
+ } else if stype == "tag" { //未达标数据
|
|
|
+ query["b_istagging"] = true
|
|
|
+ }
|
|
|
+ fields := map[string]interface{}{
|
|
|
+ "title": 1,
|
|
|
+ }
|
|
|
+ saveArr := []map[string]interface{}{}
|
|
|
+ updateArr := [][]map[string]interface{}{}
|
|
|
+ qu.Debug("Query:", query)
|
|
|
+ it := sess.DB(util.Mgo.DbName).C(sourceinfo).Find(&query).Select(&fields).Limit(int64(num)).Iter()
|
|
|
+ n := 0
|
|
|
+ for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
|
|
|
+ ch <- true
|
|
|
+ wg.Add(1)
|
|
|
+ go func(tmp map[string]interface{}) {
|
|
|
+ defer func() {
|
|
|
+ <-ch
|
|
|
+ wg.Done()
|
|
|
+ }()
|
|
|
+ id := mongodb.BsonIdToSId(tmp["_id"])
|
|
|
+ title := qu.ObjToString(tmp["title"])
|
|
|
+ update := []map[string]interface{}{}
|
|
|
+ update = append(update, map[string]interface{}{"_id": tmp["_id"]})
|
|
|
+ update = append(update, map[string]interface{}{
|
|
|
+ "$set": map[string]interface{}{
|
|
|
+ "s_groupid": groupid,
|
|
|
+ "b_isgive": true,
|
|
|
+ "i_updatetime": time.Now().Unix(),
|
|
|
+ },
|
|
|
+ })
|
|
|
+ save := map[string]interface{}{
|
|
|
+ "s_infoid": id,
|
|
|
+ "s_infotitle": title,
|
|
|
+ "s_groupid": groupid,
|
|
|
+ "i_createtime": time.Now().Unix(),
|
|
|
+ "s_complete": false,
|
|
|
+ }
|
|
|
+ lock.Lock()
|
|
|
+ updateArr = append(updateArr, update)
|
|
|
+ saveArr = append(saveArr, save)
|
|
|
+ if len(updateArr) > 500 {
|
|
|
+ util.Mgo.UpdateBulk(sourceinfo, updateArr...)
|
|
|
+ updateArr = [][]map[string]interface{}{}
|
|
|
+ }
|
|
|
+ if len(saveArr) > 500 {
|
|
|
+ util.Mgo.SaveBulk(sourcetaskinfo, saveArr...)
|
|
|
+ saveArr = []map[string]interface{}{}
|
|
|
+ }
|
|
|
+ lock.Unlock()
|
|
|
+ }(tmp)
|
|
|
+ if n%100 == 0 {
|
|
|
+ qu.Debug("current:", n)
|
|
|
+ }
|
|
|
+ tmp = map[string]interface{}{}
|
|
|
+ }
|
|
|
+ wg.Wait()
|
|
|
+ lock.Lock()
|
|
|
+ if len(updateArr) > 0 {
|
|
|
+ util.Mgo.UpdateBulk(sourceinfo, updateArr...)
|
|
|
+ updateArr = [][]map[string]interface{}{}
|
|
|
+ }
|
|
|
+ if len(saveArr) > 0 {
|
|
|
+ util.Mgo.SaveBulk(sourcetaskinfo, saveArr...)
|
|
|
+ saveArr = []map[string]interface{}{}
|
|
|
+ }
|
|
|
+ lock.Unlock()
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
//ImportDataByExcel 通过excel获取数据源
|
|
|
func ImportDataByExcel(s_sourceinfo string, mf multipart.File, success *bool, msg *string, successNum *int64) (importDataNum int) {
|
|
|
defer qu.Catch()
|
|
@@ -390,7 +751,7 @@ func ImportDataByColl(s_sourceinfo, historyid string, success *bool, msg *string
|
|
|
idInfoMap[id] = needField
|
|
|
lock.Unlock()
|
|
|
}(tmp)
|
|
|
- if n%1000 == 0 {
|
|
|
+ if n%100 == 0 {
|
|
|
qu.Debug("current:", n)
|
|
|
}
|
|
|
tmp = map[string]interface{}{}
|
|
@@ -556,13 +917,14 @@ func GetDataById(idsInfo map[string]map[string]interface{}, importType, s_source
|
|
|
delete(*bidData, "_id")
|
|
|
//保存数据
|
|
|
baseInfoMap["_id"] = _id
|
|
|
- baseInfoMap["v_datainfo"] = bidData
|
|
|
+ baseInfoMap["v_baseinfo"] = bidData
|
|
|
if len(tagInfoMap) > 0 {
|
|
|
baseInfoMap["v_taginfo"] = tagInfoMap
|
|
|
}
|
|
|
baseInfoMap["i_createtime"] = time.Now().Unix()
|
|
|
- baseInfoMap["b_isgive"] = false //是否分配
|
|
|
- baseInfoMap["b_istag"] = false //是否已标注
|
|
|
+ baseInfoMap["b_isgive"] = false //是否分配
|
|
|
+ baseInfoMap["b_istag"] = false //是否已标注
|
|
|
+ baseInfoMap["b_cleartag"] = false //是否清理标注信息
|
|
|
|
|
|
if util.Mgo.SaveByOriID(s_sourceinfo, baseInfoMap) {
|
|
|
atomic.AddInt64(successNum, 1) //保存成功计数
|