Browse Source

Merge branch 'dev2.0' of http://192.168.3.207:8080/data_processing/data_validation into dev2.0

* 'dev2.0' of http://192.168.3.207:8080/data_processing/data_validation:
  用户分发任务
Jianghan 3 years ago
parent
commit
d9e9c3779f
4 changed files with 269 additions and 62 deletions
  1. 4 1
      src/front/front.go
  2. 90 55
      src/front/project.go
  3. 174 5
      src/front/user.go
  4. 1 1
      src/util/common.go

+ 4 - 1
src/front/front.go

@@ -63,8 +63,11 @@ type Front struct {
 	projectTaskClose      xweb.Mapper `xweb:"/front/project/task/close"`     //用户组任务关闭
 	projectGetEntnameList xweb.Mapper `xweb:"/front/project/getEntnameList"` //模糊查询公司名称
 
+	userTaskSave xweb.Mapper `xweb:"/front/user/task/save"` //用户任务分发
+	userTaskList xweb.Mapper `xweb:"/front/user/task/list"` //用户任务列表
+
 	groupTaskList xweb.Mapper `xweb:"/front/group/task/list"` //用户组任务列表
-	userTaskList  xweb.Mapper `xweb:"/front/user/task/list"`  //用户任务列表
+
 }
 
 func (f *Front) Index() {

+ 90 - 55
src/front/project.go

@@ -192,6 +192,7 @@ func (f *Front) ProjectClear() {
 			f.ServeJson("查询项目信息失败")
 			return
 		}
+
 		sourceinfo := qu.ObjToString((*project)["s_sourceinfo"])                                                       //数据源表
 		noTagAllDataNum := util.Mgo.Count(sourceinfo, map[string]interface{}{"b_istagging": false})                    //达标数据总量
 		noTagGiveDataNum := util.Mgo.Count(sourceinfo, map[string]interface{}{"b_istagging": false, "b_isgive": true}) //达标数据已分发量
@@ -227,40 +228,74 @@ func (f *Front) ProjectClear() {
 // ProjectTaskList 用户组任务分发列表
 func (f *Front) ProjectTaskList() {
 	defer qu.Catch()
-	projectid := f.GetString("s_projectid") //项目id
-	status := f.GetString("s_status")       //任务状态
-	searchStr := f.GetString("search[value]")
-	search := strings.TrimSpace(searchStr)
-	start, _ := f.GetInteger("start")
-	limit, _ := f.GetInteger("length")
-	draw, _ := f.GetInteger("draw")
-	query := map[string]interface{}{ //查找用户组任务
-		"s_projectid": projectid,
-		"s_stype":     "group",
-	}
-	if status != "-1" {
-		query["s_status"] = status
-	}
-	if search != "" {
-		query["$or"] = []interface{}{
-			map[string]interface{}{"s_groupname": map[string]interface{}{"$regex": search}},
-		}
-	}
-	list, _ := util.Mgo.Find(util.TASKCOLLNAME, query, nil, nil, false, start, limit)
-	count := util.Mgo.Count(util.TASKCOLLNAME, query)
-	for _, l := range *list {
-		if status := qu.ObjToString(l["s_status"]); status == "进行中" { //更新任务进度
-			groupId := qu.ObjToString(l["s_groupid"])
-			giveNum := qu.IntAll(l["i_givenum"])
-			sourceinfo := qu.ObjToString(l["s_sourceinfo"])
-			tagNum := util.Mgo.Count(sourceinfo, map[string]interface{}{"s_groupid": groupId, "b_istag": true})
-			progress := fmt.Sprint(math.Ceil(float64(tagNum)/float64(giveNum))) + "%"
-			l["s_progress"] = progress
-			//同步数据库
-			util.Mgo.UpdateById(util.TASKCOLLNAME, l["_id"], map[string]interface{}{"$set": map[string]interface{}{"s_progress": progress}})
-		}
-	}
-	f.ServeJson(map[string]interface{}{"draw": draw, "data": *list, "recordsFiltered": count, "recordsTotal": count})
+	projectid := f.GetString("pid") //项目id
+	if f.Method() == "POST" {
+		status := f.GetString("s_status") //任务状态
+		searchStr := f.GetString("search[value]")
+		search := strings.TrimSpace(searchStr)
+		start, _ := f.GetInteger("start")
+		limit, _ := f.GetInteger("length")
+		draw, _ := f.GetInteger("draw")
+		query := map[string]interface{}{ //查找用户组任务
+			"s_projectid": projectid,
+			"s_stype":     "group",
+		}
+		if status != "-1" {
+			query["s_status"] = status
+		}
+		if search != "" {
+			query["$or"] = []interface{}{
+				map[string]interface{}{"s_groupname": map[string]interface{}{"$regex": search}},
+			}
+		}
+		list, _ := util.Mgo.Find(util.TASKCOLLNAME, query, nil, nil, false, start, limit)
+		count := util.Mgo.Count(util.TASKCOLLNAME, query)
+		for _, l := range *list {
+			if status := qu.ObjToString(l["s_status"]); status == "进行中" { //更新任务进度
+				groupId := qu.ObjToString(l["s_groupid"])
+				giveNum := qu.IntAll(l["i_givenum"])
+				sourceinfo := qu.ObjToString(l["s_sourceinfo"])
+				tagNum := util.Mgo.Count(sourceinfo, map[string]interface{}{"s_groupid": groupId, "b_istag": true})
+				progress := fmt.Sprint(math.Ceil(float64(tagNum)/float64(giveNum))) + "%"
+				l["s_progress"] = progress
+				//同步数据库
+				util.Mgo.UpdateById(util.TASKCOLLNAME, l["_id"], map[string]interface{}{"$set": map[string]interface{}{"s_progress": progress}})
+			}
+		}
+		f.ServeJson(map[string]interface{}{"draw": draw, "data": *list, "recordsFiltered": count, "recordsTotal": count})
+	} else {
+		project, _ := util.Mgo.FindById(util.PROJECTCOLLNAME, projectid, map[string]interface{}{"s_status": 1, "s_sourceinfo": 1})
+		if project != nil && len(*project) > 0 {
+			if status := qu.ObjToString((*project)["s_status"]); status == "未开始" {
+				//TODO:调用数据质量评估接口
+				//点击清洗更新项目状态为进行中
+				b := util.Mgo.UpdateById(util.PROJECTCOLLNAME, projectid, map[string]interface{}{"$set": map[string]interface{}{"s_status": "进行中", "i_starttime": time.Now().Unix()}})
+				qu.Debug("Update Porject:"+projectid+"	Status Success:", b)
+			}
+		}
+		sourceinfo := qu.ObjToString((*project)["s_sourceinfo"])                                                       //数据源表
+		noTagAllDataNum := util.Mgo.Count(sourceinfo, map[string]interface{}{"b_istagging": false})                    //达标数据总量
+		noTagGiveDataNum := util.Mgo.Count(sourceinfo, map[string]interface{}{"b_istagging": false, "b_isgive": true}) //达标数据已分发量
+		noTagNoGiveDataNum := noTagAllDataNum - noTagGiveDataNum                                                       //达标待分发量
+		tagAllDataNum := util.Mgo.Count(sourceinfo, map[string]interface{}{"b_istagging": true})                       //未达标数据总量
+		tagGiveDataNum := util.Mgo.Count(sourceinfo, map[string]interface{}{"b_istagging": true, "b_isgive": true})    //未达标数据已分发量
+		tagNoGiveDataNum := tagAllDataNum - tagGiveDataNum                                                             //未达标待分发量
+		allGiveDataNum := noTagGiveDataNum + tagGiveDataNum                                                            //总分发量
+		allNoGiveDataNum := noTagNoGiveDataNum + tagNoGiveDataNum                                                      //总待分发量
+		allDataNum := allGiveDataNum + allNoGiveDataNum
+		f.T["s_projectid"] = projectid
+		f.T["allDataNum"] = allDataNum
+		f.T["noTagAllDataNum"] = noTagAllDataNum
+		f.T["noTagGiveDataNum"] = noTagGiveDataNum
+		f.T["noTagNoGiveDataNum"] = noTagNoGiveDataNum
+		f.T["tagAllDataNum"] = tagAllDataNum
+		f.T["tagGiveDataNum"] = tagGiveDataNum
+		f.T["tagNoGiveDataNum"] = tagNoGiveDataNum
+		f.T["allGiveDataNum"] = allGiveDataNum
+		f.T["allNoGiveDataNum"] = allNoGiveDataNum
+		_ = f.Render("project/project_clear.html", &f.T)
+	}
+
 }
 
 // ProjectTaskSave 用户组任务分发
@@ -281,7 +316,7 @@ func (f *Front) ProjectTaskSave() {
 	group := f.GetString("s_group")
 	stype := f.GetString("s_type")
 	if err := json.Unmarshal([]byte(group), &groupArr); err != nil {
-		qu.Debug("V_Filelds Unmarshal Failed:", err)
+		qu.Debug("GroupInfo Unmarshal Failed:", err)
 		msg = "用户组信息解析失败"
 	} else {
 		if stype == "notag" { //如果分发的是达标数据且进行了初步质检,将没有质检记录的字段从v_taginfo标注记录中删除
@@ -291,16 +326,15 @@ func (f *Front) ProjectTaskSave() {
 			groupId := qu.ObjToString(groupInfo["s_groupid"])
 			groupIdArr = append(groupIdArr, groupId)
 			givenum := qu.IntAll(groupInfo["i_givenum"])
-			//groupIdMap[groupId] = givenum
-			_id := primitive.NewObjectID()
-			sid := mongodb.BsonIdToSId(_id)
+			groupTaskId := primitive.NewObjectID()
+			groupTaskIdStr := mongodb.BsonIdToSId(groupTaskId)
 			gt := util.Task{
-				GroupId: groupId,
+				UserId:  groupId,
 				GiveNum: givenum,
 			}
-			groupIdTask[sid] = gt
+			groupIdTask[groupTaskIdStr] = gt
 			groupTask := map[string]interface{}{
-				"_id":              _id,                                       //生成任务id
+				"_id":              groupTaskId,                               //生成任务id
 				"s_projectid":      projectid,                                 //项目标识
 				"s_projectname":    projectname,                               //项目名称
 				"s_status":         "未开始",                                     //任务状态
@@ -432,7 +466,7 @@ func (f *Front) ProjectTaskRetrieve() {
 	sourcetaskinfo := f.GetString("s_sourcetaskinfo")
 	if status == "已完成" {
 		count1 := util.Mgo.Count(sourceinfo, map[string]interface{}{"s_groupid": groupId, "b_istag": false})
-		count2 := util.Mgo.Count(sourcetaskinfo, map[string]interface{}{"s_groupid": groupId, "s_complete": false})
+		count2 := util.Mgo.Count(sourcetaskinfo, map[string]interface{}{"s_groupid": groupId, "b_iscomplete": false})
 		if count1 != count2 { //数据源表和临时表数量不一致
 			qu.Debug("Count Is Not Same:", sourceinfo+":", count1, sourceinfo+":", count2)
 		}
@@ -456,15 +490,15 @@ func (f *Front) ProjectTaskRetrieve() {
 			//1、更新源数据表
 			success1 := util.Mgo.Update(sourceinfo, map[string]interface{}{"s_groupid": groupId, "b_istag": false}, map[string]interface{}{
 				"$set": map[string]interface{}{
-					"b_isgive":     false,
-					"i_updatetime": time.Now().Unix(),
+					"b_isgivegroup": false,
+					"i_updatetime":  time.Now().Unix(),
 				},
 				"$unset": map[string]interface{}{
 					"s_groupid": "",
 				},
 			}, false, true)
 			//2、删除临时任务表中对应未完成数据
-			success2 := util.Mgo.Del(sourcetaskinfo, map[string]interface{}{"s_groupid": groupId, "s_complete": false})
+			success2 := util.Mgo.Del(sourcetaskinfo, map[string]interface{}{"s_groupid": groupId, "b_iscomplete": false})
 			if success1 && success2 {
 				//更新任务状态、完成时间
 				success = util.Mgo.UpdateById(util.TASKCOLLNAME, taskId, map[string]interface{}{
@@ -588,7 +622,7 @@ func DeleleDataTagInfo(sourceinfo string) {
 func UpdateSourceinfo(sourceinfo, sourcetaskinfo, stype string, groupIdInfo map[string]util.Task) {
 	defer qu.Catch()
 	for groupTaskId, tInfo := range groupIdInfo {
-		groupid := tInfo.GroupId
+		groupId := tInfo.UserId
 		num := tInfo.GiveNum
 		sess := util.Mgo.GetMgoConn()
 		defer util.Mgo.DestoryMongoConn(sess)
@@ -596,7 +630,7 @@ func UpdateSourceinfo(sourceinfo, sourcetaskinfo, stype string, groupIdInfo map[
 		wg := &sync.WaitGroup{}
 		lock := &sync.Mutex{}
 		query := map[string]interface{}{ //查找未分配对应stype的数据分发
-			"b_isgive": false,
+			"b_isgivegroup": false,
 		}
 		if stype == "notag" { //达标数据
 			query["b_istagging"] = false
@@ -625,18 +659,19 @@ func UpdateSourceinfo(sourceinfo, sourcetaskinfo, stype string, groupIdInfo map[
 				update = append(update, map[string]interface{}{"_id": tmp["_id"]})
 				update = append(update, map[string]interface{}{
 					"$set": map[string]interface{}{
-						"s_groupid":    groupid,
-						"b_isgive":     true,
-						"i_updatetime": time.Now().Unix(),
+						"s_groupid":     groupId,
+						"b_isgivegroup": true,
+						"i_updatetime":  time.Now().Unix(),
 					},
 				})
 				save := map[string]interface{}{
 					"s_infoid":      id,
 					"s_infotitle":   title,
-					"s_groupid":     groupid,
+					"s_groupid":     groupId,
 					"i_createtime":  time.Now().Unix(),
-					"s_complete":    false,
+					"b_iscomplete":  false,
 					"s_grouptaskid": groupTaskId,
+					"b_isgiveuser":  false,
 				}
 				lock.Lock()
 				updateArr = append(updateArr, update)
@@ -946,9 +981,9 @@ func GetDataById(idsInfo map[string]map[string]interface{}, importType, s_source
 					baseInfoMap["v_taginfo"] = tagInfoMap
 				}
 				baseInfoMap["i_createtime"] = time.Now().Unix()
-				baseInfoMap["b_isgive"] = false   //是否分配
-				baseInfoMap["b_istag"] = false    //是否已标注
-				baseInfoMap["b_cleartag"] = false //是否清理标注信息
+				baseInfoMap["b_isgivegroup"] = false //是否分配
+				baseInfoMap["b_istag"] = false       //是否已标注
+				baseInfoMap["b_cleartag"] = false    //是否清理标注信息
 
 				if util.Mgo.SaveByOriID(s_sourceinfo, baseInfoMap) {
 					atomic.AddInt64(successNum, 1) //保存成功计数

+ 174 - 5
src/front/user.go

@@ -1,13 +1,16 @@
 package front
 
 import (
+	"encoding/json"
 	"github.com/dchest/captcha"
 	"github.com/gorilla/sessions"
 	"go.mongodb.org/mongo-driver/bson"
+	"go.mongodb.org/mongo-driver/bson/primitive"
 	mgo "mongodb"
 	qu "qfw/util"
 	"strconv"
 	"strings"
+	"sync"
 	"time"
 	. "util"
 )
@@ -323,18 +326,184 @@ func (f *Front) GroupList() {
 	}
 }
 
+//用户任务分发
+func (f *Front) UserTaskSave() {
+	defer qu.Catch()
+	user := f.GetSession("user").(map[string]interface{})
+	username := qu.ObjToString(user["s_name"])
+	var taskArr []map[string]interface{}
+	userTaskIdInfo := map[string]Task{}
+	success := false
+	msg := ""
+	groupTaskId := f.GetString("grouptaskid") //用户组任务id
+	groupTask, _ := Mgo.FindById(TASKCOLLNAME, groupTaskId, nil)
+	if len(*groupTask) == 0 {
+		qu.Debug("GroupTask Find Error:", groupTaskId)
+		msg = "用户组任务:" + groupTaskId + "查询失败"
+		f.ServeJson(map[string]interface{}{"success": false, "msg": msg})
+		return
+	}
+	sourcetaskinfo := qu.ObjToString((*groupTask)["s_sourcetaskinfo"])
+	userNums := f.GetString("usernums")
+	var userArr []map[string]interface{}
+	if err := json.Unmarshal([]byte(userNums), &userArr); err != nil {
+		qu.Debug("UserInfo Unmarshal Failed:", err)
+		msg = "用户信息解析失败"
+	} else {
+		for _, userInfo := range userArr {
+			userid := qu.ObjToString(userInfo["s_userid"])
+			name := qu.ObjToString(userInfo["s_name"])
+			login := qu.ObjToString(userInfo["s_login"])
+			givenum := qu.IntAll(userInfo["i_givenum"])
+			userTaskId := primitive.NewObjectID()
+			userTaskIdStr := mgo.BsonIdToSId(userTaskId)
+			ut := Task{
+				UserId:  userid,
+				GiveNum: givenum,
+			}
+			userTaskIdInfo[userTaskIdStr] = ut
+			userTask := map[string]interface{}{
+				"_id":              userTaskId,                                    //生成任务id
+				"s_projectid":      qu.ObjToString((*groupTask)["s_projectid"]),   //项目标识
+				"s_projectname":    qu.ObjToString((*groupTask)["s_projectname"]), //项目名称
+				"s_status":         "未开始",                                         //任务状态
+				"s_personid":       userid,                                        //任务负责人标识
+				"s_personname":     name,                                          //任务负责人
+				"s_login":          login,                                         //用户账号
+				"s_groupname":      qu.ObjToString((*groupTask)["s_groupname"]),   //用户组名称
+				"s_groupid":        qu.ObjToString((*groupTask)["s_groupid"]),     //用户组标识
+				"i_givenum":        givenum,                                       //分发数据量
+				"s_createname":     username,                                      //创建人
+				"i_createtime":     time.Now().Unix(),                             //创建时间
+				"s_progress":       "0%",                                          //完成进度
+				"s_sourceinfo":     qu.ObjToString((*groupTask)["s_sourceinfo"]),  //源数据表
+				"s_sourcetaskinfo": sourcetaskinfo,                                //任务日志表
+				"s_stype":          "user",                                        //任务类型
+				"s_parentid":       groupTaskId,                                   //父任务及用户组任务id
+			}
+			taskArr = append(taskArr, userTask)
+		}
+	}
+	if len(taskArr) > 0 {
+		success = Mgo.SaveBulk(TASKCOLLNAME, taskArr...) //用户分发任务
+		if success {
+			//用户分发任务后更新该用户组任务的状态和开始时间
+			Mgo.Update(TASKCOLLNAME,
+				map[string]interface{}{
+					"_id":      mgo.StringTOBsonId(groupTaskId),
+					"s_status": "未开始",
+				},
+				map[string]interface{}{
+					"$set": map[string]interface{}{
+						"s_status":    "进行中",
+						"i_starttime": time.Now().Unix(),
+					}},
+				false, false)
+			msg = "任务分发成功"
+			UpdateSourceTaskInfo(sourcetaskinfo, groupTaskId, userTaskIdInfo) //用户组分发任务成功后,给数据源打上用户组标识,同时生成任务临时表
+		}
+	}
+}
+
 // UerTaskList 用户任务分发列表
 func (f *Front) UerTaskList() {
 	defer qu.Catch()
 	//groupId := f.GetString("s_groupid") //用户组id
-	taskId := f.GetString("id") //用户组任务id
+	groupTaskId := f.GetString("grouptaskid") //用户组任务id
 	if f.Method() == "POST" {
-
+		start, _ := f.GetInteger("start")
+		limit, _ := f.GetInteger("length")
+		draw, _ := f.GetInteger("draw")
+		status := f.GetString("status")
+		login := f.GetString("login")
+		//searchStr := f.GetString("search[value]")
+		//search := strings.TrimSpace(searchStr)
+		query := map[string]interface{}{
+			"s_parentid": groupTaskId,
+		}
+		if status != "-1" { //任务状态
+			query["s_status"] = status
+		}
+		if login != "-1" { //用户账号
+			query["s_login"] = login
+		}
+		list, _ := Mgo.Find(TASKCOLLNAME, query, nil, nil, false, start, limit)
+		count := Mgo.Count(TASKCOLLNAME, query)
+		f.ServeJson(map[string]interface{}{"draw": draw, "data": *list, "recordsFiltered": count, "recordsTotal": count})
 	} else {
-		//f.T["s_groupid"] = groupId
-		f.T["grouptaskid"] = taskId //用户组任务id
+		sourcetaskinfo := f.GetString("s_sourcetaskinfo")
 		//统计数据量
-
+		isGiveNum := Mgo.Count(sourcetaskinfo, map[string]interface{}{"s_grouptaskid": groupTaskId, "b_isgiveuser": true})     //已分发量
+		isNotGiveNum := Mgo.Count(sourcetaskinfo, map[string]interface{}{"s_grouptaskid": groupTaskId, "b_isgiveuser": false}) //待分发量
+		isTagNum := Mgo.Count(sourcetaskinfo, map[string]interface{}{"s_grouptaskid": groupTaskId, "b_iscomplete": true})      //已标注数量
+		isNotTagNum := Mgo.Count(sourcetaskinfo, map[string]interface{}{"s_grouptaskid": groupTaskId, "b_iscomplete": false})  //未标注数量
+		allNum := Mgo.Count(sourcetaskinfo, map[string]interface{}{"s_grouptaskid": groupTaskId})                              //数据总量
+		f.T["grouptaskid"] = groupTaskId
+		f.T["allNum"] = allNum
+		f.T["isGiveNum"] = isGiveNum
+		f.T["isNotGiveNum"] = isNotGiveNum
+		f.T["isTagNum"] = isTagNum
+		f.T["isNotTagNum"] = isNotTagNum
 		_ = f.Render("/user/task.html", &f.T)
 	}
 }
+
+// UpdateSourceTaskInfo 用户分发任务成功后更新临时任务表
+func UpdateSourceTaskInfo(sourcetaskinfo, groupTaskId string, userTaskIdInfo map[string]Task) {
+	defer qu.Catch()
+	for userTaskId, tInfo := range userTaskIdInfo {
+		userId := tInfo.UserId
+		num := tInfo.GiveNum
+		sess := Mgo.GetMgoConn()
+		defer Mgo.DestoryMongoConn(sess)
+		ch := make(chan bool, 5)
+		wg := &sync.WaitGroup{}
+		lock := &sync.Mutex{}
+		query := map[string]interface{}{ //查找用户组任务id关联的临时数据
+			"s_grouptaskid": groupTaskId,
+			"b_isgiveuser":  false,
+		}
+		updateArr := [][]map[string]interface{}{}
+		qu.Debug("Query:", query)
+		it := sess.DB(Mgo.DbName).C(sourcetaskinfo).Find(&query).Limit(int64(num)).Iter()
+		n := 0
+		for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
+			ch <- true
+			wg.Add(1)
+			go func(tmp map[string]interface{}) {
+				defer func() {
+					<-ch
+					wg.Done()
+				}()
+				update := []map[string]interface{}{}
+				update = append(update, map[string]interface{}{"_id": tmp["_id"]})
+				update = append(update, map[string]interface{}{
+					"$set": map[string]interface{}{
+						"s_usertaskid": userTaskId,
+						"s_userid":     userId,
+						"b_isgiveuser": true,
+						"i_updatetime": time.Now().Unix(),
+					},
+				})
+				lock.Lock()
+				updateArr = append(updateArr, update)
+				if len(updateArr) > 500 {
+					Mgo.UpdateBulk(sourcetaskinfo, updateArr...)
+					updateArr = [][]map[string]interface{}{}
+				}
+				lock.Unlock()
+			}(tmp)
+			if n%100 == 0 {
+				qu.Debug("current:", n)
+			}
+			tmp = map[string]interface{}{}
+		}
+		wg.Wait()
+		lock.Lock()
+		if len(updateArr) > 0 {
+			Mgo.UpdateBulk(sourcetaskinfo, updateArr...)
+			updateArr = [][]map[string]interface{}{}
+		}
+		lock.Unlock()
+	}
+}

+ 1 - 1
src/util/common.go

@@ -9,7 +9,7 @@ import (
 )
 
 type Task struct {
-	GroupId string
+	UserId  string
 	GiveNum int
 }