Ver Fonte

用户分发任务

maxiaoshan há 3 anos atrás
pai
commit
a684e64d7c
4 ficheiros alterados com 206 adições e 36 exclusões
  1. 1 0
      src/front/front.go
  2. 30 30
      src/front/project.go
  3. 174 5
      src/front/user.go
  4. 1 1
      src/util/common.go

+ 1 - 0
src/front/front.go

@@ -63,6 +63,7 @@ type Front struct {
 	projectTaskClose      xweb.Mapper `xweb:"/front/project/task/close"`     //用户组任务关闭
 	projectGetEntnameList xweb.Mapper `xweb:"/front/project/getEntnameList"` //模糊查询公司名称
 
+	userTaskSave xweb.Mapper `xweb:"/front/user/task/save"` //用户任务分发
 	userTaskList xweb.Mapper `xweb:"/front/user/task/list"` //用户任务列表
 }
 

+ 30 - 30
src/front/project.go

@@ -192,15 +192,15 @@ func (f *Front) ProjectClear() {
 			f.ServeJson("查询项目信息失败")
 			return
 		}
-		sourceinfo := f.GetString("s_sourceinfo")                                                                      //数据源表
-		noTagAllDataNum := util.Mgo.Count(sourceinfo, map[string]interface{}{"b_istagging": false})                    //达标数据总量
-		noTagGiveDataNum := util.Mgo.Count(sourceinfo, map[string]interface{}{"b_istagging": false, "b_isgive": true}) //达标数据已分发量
-		noTagNoGiveDataNum := noTagAllDataNum - noTagGiveDataNum                                                       //达标待分发量
-		tagAllDataNum := util.Mgo.Count(sourceinfo, map[string]interface{}{"b_istagging": true})                       //未达标数据总量
-		tagGiveDataNum := util.Mgo.Count(sourceinfo, map[string]interface{}{"b_istagging": true, "b_isgive": true})    //未达标数据已分发量
-		tagNoGiveDataNum := tagAllDataNum - tagGiveDataNum                                                             //未达标待分发量
-		allGiveDataNum := noTagGiveDataNum + tagGiveDataNum                                                            //总分发量
-		allNoGiveDataNum := noTagNoGiveDataNum + tagNoGiveDataNum                                                      //总待分发量
+		sourceinfo := f.GetString("s_sourceinfo")                                                                           //数据源表
+		noTagAllDataNum := util.Mgo.Count(sourceinfo, map[string]interface{}{"b_istagging": false})                         //达标数据总量
+		noTagGiveDataNum := util.Mgo.Count(sourceinfo, map[string]interface{}{"b_istagging": false, "b_isgivegroup": true}) //达标数据已分发量
+		noTagNoGiveDataNum := noTagAllDataNum - noTagGiveDataNum                                                            //达标待分发量
+		tagAllDataNum := util.Mgo.Count(sourceinfo, map[string]interface{}{"b_istagging": true})                            //未达标数据总量
+		tagGiveDataNum := util.Mgo.Count(sourceinfo, map[string]interface{}{"b_istagging": true, "b_isgivegroup": true})    //未达标数据已分发量
+		tagNoGiveDataNum := tagAllDataNum - tagGiveDataNum                                                                  //未达标待分发量
+		allGiveDataNum := noTagGiveDataNum + tagGiveDataNum                                                                 //总分发量
+		allNoGiveDataNum := noTagNoGiveDataNum + tagNoGiveDataNum                                                           //总待分发量
 		allDataNum := allGiveDataNum + allNoGiveDataNum
 
 		f.ServeJson(map[string]interface{}{"allDataNum": allDataNum})
@@ -279,7 +279,7 @@ func (f *Front) ProjectTaskSave() {
 	group := f.GetString("s_group")
 	stype := f.GetString("s_type")
 	if err := json.Unmarshal([]byte(group), &groupArr); err != nil {
-		qu.Debug("V_Filelds Unmarshal Failed:", err)
+		qu.Debug("GroupInfo Unmarshal Failed:", err)
 		msg = "用户组信息解析失败"
 	} else {
 		if stype == "notag" { //如果分发的是达标数据且进行了初步质检,将没有质检记录的字段从v_taginfo标注记录中删除
@@ -289,16 +289,15 @@ func (f *Front) ProjectTaskSave() {
 			groupId := qu.ObjToString(groupInfo["s_groupid"])
 			groupIdArr = append(groupIdArr, groupId)
 			givenum := qu.IntAll(groupInfo["i_givenum"])
-			//groupIdMap[groupId] = givenum
-			_id := primitive.NewObjectID()
-			sid := mongodb.BsonIdToSId(_id)
+			groupTaskId := primitive.NewObjectID()
+			groupTaskIdStr := mongodb.BsonIdToSId(groupTaskId)
 			gt := util.Task{
-				GroupId: groupId,
+				UserId:  groupId,
 				GiveNum: givenum,
 			}
-			groupIdTask[sid] = gt
+			groupIdTask[groupTaskIdStr] = gt
 			groupTask := map[string]interface{}{
-				"_id":              _id,                                       //生成任务id
+				"_id":              groupTaskId,                               //生成任务id
 				"s_projectid":      projectid,                                 //项目标识
 				"s_projectname":    projectname,                               //项目名称
 				"s_status":         "未开始",                                     //任务状态
@@ -430,7 +429,7 @@ func (f *Front) ProjectTaskRetrieve() {
 	sourcetaskinfo := f.GetString("s_sourcetaskinfo")
 	if status == "已完成" {
 		count1 := util.Mgo.Count(sourceinfo, map[string]interface{}{"s_groupid": groupId, "b_istag": false})
-		count2 := util.Mgo.Count(sourcetaskinfo, map[string]interface{}{"s_groupid": groupId, "s_complete": false})
+		count2 := util.Mgo.Count(sourcetaskinfo, map[string]interface{}{"s_groupid": groupId, "b_iscomplete": false})
 		if count1 != count2 { //数据源表和临时表数量不一致
 			qu.Debug("Count Is Not Same:", sourceinfo+":", count1, sourceinfo+":", count2)
 		}
@@ -454,15 +453,15 @@ func (f *Front) ProjectTaskRetrieve() {
 			//1、更新源数据表
 			success1 := util.Mgo.Update(sourceinfo, map[string]interface{}{"s_groupid": groupId, "b_istag": false}, map[string]interface{}{
 				"$set": map[string]interface{}{
-					"b_isgive":     false,
-					"i_updatetime": time.Now().Unix(),
+					"b_isgivegroup": false,
+					"i_updatetime":  time.Now().Unix(),
 				},
 				"$unset": map[string]interface{}{
 					"s_groupid": "",
 				},
 			}, false, true)
 			//2、删除临时任务表中对应未完成数据
-			success2 := util.Mgo.Del(sourcetaskinfo, map[string]interface{}{"s_groupid": groupId, "s_complete": false})
+			success2 := util.Mgo.Del(sourcetaskinfo, map[string]interface{}{"s_groupid": groupId, "b_iscomplete": false})
 			if success1 && success2 {
 				//更新任务状态、完成时间
 				success = util.Mgo.UpdateById(util.TASKCOLLNAME, taskId, map[string]interface{}{
@@ -586,7 +585,7 @@ func DeleleDataTagInfo(sourceinfo string) {
 func UpdateSourceinfo(sourceinfo, sourcetaskinfo, stype string, groupIdInfo map[string]util.Task) {
 	defer qu.Catch()
 	for groupTaskId, tInfo := range groupIdInfo {
-		groupid := tInfo.GroupId
+		groupId := tInfo.UserId
 		num := tInfo.GiveNum
 		sess := util.Mgo.GetMgoConn()
 		defer util.Mgo.DestoryMongoConn(sess)
@@ -594,7 +593,7 @@ func UpdateSourceinfo(sourceinfo, sourcetaskinfo, stype string, groupIdInfo map[
 		wg := &sync.WaitGroup{}
 		lock := &sync.Mutex{}
 		query := map[string]interface{}{ //查找未分配对应stype的数据分发
-			"b_isgive": false,
+			"b_isgivegroup": false,
 		}
 		if stype == "notag" { //达标数据
 			query["b_istagging"] = false
@@ -623,18 +622,19 @@ func UpdateSourceinfo(sourceinfo, sourcetaskinfo, stype string, groupIdInfo map[
 				update = append(update, map[string]interface{}{"_id": tmp["_id"]})
 				update = append(update, map[string]interface{}{
 					"$set": map[string]interface{}{
-						"s_groupid":    groupid,
-						"b_isgive":     true,
-						"i_updatetime": time.Now().Unix(),
+						"s_groupid":     groupId,
+						"b_isgivegroup": true,
+						"i_updatetime":  time.Now().Unix(),
 					},
 				})
 				save := map[string]interface{}{
 					"s_infoid":      id,
 					"s_infotitle":   title,
-					"s_groupid":     groupid,
+					"s_groupid":     groupId,
 					"i_createtime":  time.Now().Unix(),
-					"s_complete":    false,
+					"b_iscomplete":  false,
 					"s_grouptaskid": groupTaskId,
+					"b_isgiveuser":  false,
 				}
 				lock.Lock()
 				updateArr = append(updateArr, update)
@@ -944,9 +944,9 @@ func GetDataById(idsInfo map[string]map[string]interface{}, importType, s_source
 					baseInfoMap["v_taginfo"] = tagInfoMap
 				}
 				baseInfoMap["i_createtime"] = time.Now().Unix()
-				baseInfoMap["b_isgive"] = false   //是否分配
-				baseInfoMap["b_istag"] = false    //是否已标注
-				baseInfoMap["b_cleartag"] = false //是否清理标注信息
+				baseInfoMap["b_isgivegroup"] = false //是否分配
+				baseInfoMap["b_istag"] = false       //是否已标注
+				baseInfoMap["b_cleartag"] = false    //是否清理标注信息
 
 				if util.Mgo.SaveByOriID(s_sourceinfo, baseInfoMap) {
 					atomic.AddInt64(successNum, 1) //保存成功计数

+ 174 - 5
src/front/user.go

@@ -1,13 +1,16 @@
 package front
 
 import (
+	"encoding/json"
 	"github.com/dchest/captcha"
 	"github.com/gorilla/sessions"
 	"go.mongodb.org/mongo-driver/bson"
+	"go.mongodb.org/mongo-driver/bson/primitive"
 	mgo "mongodb"
 	qu "qfw/util"
 	"strconv"
 	"strings"
+	"sync"
 	"time"
 	. "util"
 )
@@ -323,18 +326,184 @@ func (f *Front) GroupList() {
 	}
 }
 
+//用户任务分发
+func (f *Front) UserTaskSave() {
+	defer qu.Catch()
+	user := f.GetSession("user").(map[string]interface{})
+	username := qu.ObjToString(user["s_name"])
+	var taskArr []map[string]interface{}
+	userTaskIdInfo := map[string]Task{}
+	success := false
+	msg := ""
+	groupTaskId := f.GetString("grouptaskid") //用户组任务id
+	groupTask, _ := Mgo.FindById(TASKCOLLNAME, groupTaskId, nil)
+	if len(*groupTask) == 0 {
+		qu.Debug("GroupTask Find Error:", groupTaskId)
+		msg = "用户组任务:" + groupTaskId + "查询失败"
+		f.ServeJson(map[string]interface{}{"success": false, "msg": msg})
+		return
+	}
+	sourcetaskinfo := qu.ObjToString((*groupTask)["s_sourcetaskinfo"])
+	userNums := f.GetString("usernums")
+	var userArr []map[string]interface{}
+	if err := json.Unmarshal([]byte(userNums), &userArr); err != nil {
+		qu.Debug("UserInfo Unmarshal Failed:", err)
+		msg = "用户信息解析失败"
+	} else {
+		for _, userInfo := range userArr {
+			userid := qu.ObjToString(userInfo["s_userid"])
+			name := qu.ObjToString(userInfo["s_name"])
+			login := qu.ObjToString(userInfo["s_login"])
+			givenum := qu.IntAll(userInfo["i_givenum"])
+			userTaskId := primitive.NewObjectID()
+			userTaskIdStr := mgo.BsonIdToSId(userTaskId)
+			ut := Task{
+				UserId:  userid,
+				GiveNum: givenum,
+			}
+			userTaskIdInfo[userTaskIdStr] = ut
+			userTask := map[string]interface{}{
+				"_id":              userTaskId,                                    //生成任务id
+				"s_projectid":      qu.ObjToString((*groupTask)["s_projectid"]),   //项目标识
+				"s_projectname":    qu.ObjToString((*groupTask)["s_projectname"]), //项目名称
+				"s_status":         "未开始",                                         //任务状态
+				"s_personid":       userid,                                        //任务负责人标识
+				"s_personname":     name,                                          //任务负责人
+				"s_login":          login,                                         //用户账号
+				"s_groupname":      qu.ObjToString((*groupTask)["s_groupname"]),   //用户组名称
+				"s_groupid":        qu.ObjToString((*groupTask)["s_groupid"]),     //用户组标识
+				"i_givenum":        givenum,                                       //分发数据量
+				"s_createname":     username,                                      //创建人
+				"i_createtime":     time.Now().Unix(),                             //创建时间
+				"s_progress":       "0%",                                          //完成进度
+				"s_sourceinfo":     qu.ObjToString((*groupTask)["s_sourceinfo"]),  //源数据表
+				"s_sourcetaskinfo": sourcetaskinfo,                                //任务日志表
+				"s_stype":          "user",                                        //任务类型
+				"s_parentid":       groupTaskId,                                   //父任务及用户组任务id
+			}
+			taskArr = append(taskArr, userTask)
+		}
+	}
+	if len(taskArr) > 0 {
+		success = Mgo.SaveBulk(TASKCOLLNAME, taskArr...) //用户分发任务
+		if success {
+			//用户分发任务后更新该用户组任务的状态和开始时间
+			Mgo.Update(TASKCOLLNAME,
+				map[string]interface{}{
+					"_id":      mgo.StringTOBsonId(groupTaskId),
+					"s_status": "未开始",
+				},
+				map[string]interface{}{
+					"$set": map[string]interface{}{
+						"s_status":    "进行中",
+						"i_starttime": time.Now().Unix(),
+					}},
+				false, false)
+			msg = "任务分发成功"
+			UpdateSourceTaskInfo(sourcetaskinfo, groupTaskId, userTaskIdInfo) //用户组分发任务成功后,给数据源打上用户组标识,同时生成任务临时表
+		}
+	}
+}
+
 // UerTaskList 用户任务分发列表
 func (f *Front) UerTaskList() {
 	defer qu.Catch()
 	//groupId := f.GetString("s_groupid") //用户组id
-	taskId := f.GetString("id") //用户组任务id
+	groupTaskId := f.GetString("grouptaskid") //用户组任务id
 	if f.Method() == "POST" {
-
+		start, _ := f.GetInteger("start")
+		limit, _ := f.GetInteger("length")
+		draw, _ := f.GetInteger("draw")
+		status := f.GetString("status")
+		login := f.GetString("login")
+		//searchStr := f.GetString("search[value]")
+		//search := strings.TrimSpace(searchStr)
+		query := map[string]interface{}{
+			"s_parentid": groupTaskId,
+		}
+		if status != "-1" { //任务状态
+			query["s_status"] = status
+		}
+		if login != "-1" { //用户账号
+			query["s_login"] = login
+		}
+		list, _ := Mgo.Find(TASKCOLLNAME, query, nil, nil, false, start, limit)
+		count := Mgo.Count(TASKCOLLNAME, query)
+		f.ServeJson(map[string]interface{}{"draw": draw, "data": *list, "recordsFiltered": count, "recordsTotal": count})
 	} else {
-		//f.T["s_groupid"] = groupId
-		f.T["grouptaskid"] = taskId //用户组任务id
+		sourcetaskinfo := f.GetString("s_sourcetaskinfo")
 		//统计数据量
-
+		isGiveNum := Mgo.Count(sourcetaskinfo, map[string]interface{}{"s_grouptaskid": groupTaskId, "b_isgiveuser": true})     //已分发量
+		isNotGiveNum := Mgo.Count(sourcetaskinfo, map[string]interface{}{"s_grouptaskid": groupTaskId, "b_isgiveuser": false}) //待分发量
+		isTagNum := Mgo.Count(sourcetaskinfo, map[string]interface{}{"s_grouptaskid": groupTaskId, "b_iscomplete": true})      //已标注数量
+		isNotTagNum := Mgo.Count(sourcetaskinfo, map[string]interface{}{"s_grouptaskid": groupTaskId, "b_iscomplete": false})  //未标注数量
+		allNum := Mgo.Count(sourcetaskinfo, map[string]interface{}{"s_grouptaskid": groupTaskId})                              //数据总量
+		f.T["grouptaskid"] = groupTaskId
+		f.T["allNum"] = allNum
+		f.T["isGiveNum"] = isGiveNum
+		f.T["isNotGiveNum"] = isNotGiveNum
+		f.T["isTagNum"] = isTagNum
+		f.T["isNotTagNum"] = isNotTagNum
 		_ = f.Render("/user/task.html", &f.T)
 	}
 }
+
+// UpdateSourceTaskInfo 用户分发任务成功后更新临时任务表
+func UpdateSourceTaskInfo(sourcetaskinfo, groupTaskId string, userTaskIdInfo map[string]Task) {
+	defer qu.Catch()
+	for userTaskId, tInfo := range userTaskIdInfo {
+		userId := tInfo.UserId
+		num := tInfo.GiveNum
+		sess := Mgo.GetMgoConn()
+		defer Mgo.DestoryMongoConn(sess)
+		ch := make(chan bool, 5)
+		wg := &sync.WaitGroup{}
+		lock := &sync.Mutex{}
+		query := map[string]interface{}{ //查找用户组任务id关联的临时数据
+			"s_grouptaskid": groupTaskId,
+			"b_isgiveuser":  false,
+		}
+		updateArr := [][]map[string]interface{}{}
+		qu.Debug("Query:", query)
+		it := sess.DB(Mgo.DbName).C(sourcetaskinfo).Find(&query).Limit(int64(num)).Iter()
+		n := 0
+		for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
+			ch <- true
+			wg.Add(1)
+			go func(tmp map[string]interface{}) {
+				defer func() {
+					<-ch
+					wg.Done()
+				}()
+				update := []map[string]interface{}{}
+				update = append(update, map[string]interface{}{"_id": tmp["_id"]})
+				update = append(update, map[string]interface{}{
+					"$set": map[string]interface{}{
+						"s_usertaskid": userTaskId,
+						"s_userid":     userId,
+						"b_isgiveuser": true,
+						"i_updatetime": time.Now().Unix(),
+					},
+				})
+				lock.Lock()
+				updateArr = append(updateArr, update)
+				if len(updateArr) > 500 {
+					Mgo.UpdateBulk(sourcetaskinfo, updateArr...)
+					updateArr = [][]map[string]interface{}{}
+				}
+				lock.Unlock()
+			}(tmp)
+			if n%100 == 0 {
+				qu.Debug("current:", n)
+			}
+			tmp = map[string]interface{}{}
+		}
+		wg.Wait()
+		lock.Lock()
+		if len(updateArr) > 0 {
+			Mgo.UpdateBulk(sourcetaskinfo, updateArr...)
+			updateArr = [][]map[string]interface{}{}
+		}
+		lock.Unlock()
+	}
+}

+ 1 - 1
src/util/common.go

@@ -9,7 +9,7 @@ import (
 )
 
 type Task struct {
-	GroupId string
+	UserId  string
 	GiveNum int
 }