package main import ( "fmt" "github.com/cron" "mongodb" "os" "qfw/util" "strings" "time" ) var ( Mgo, MgoH, MgoB *mongodb.MongodbSim Sysconfig, bidddingConf, biddingHConf map[string]interface{} noFields string ) func Init() { util.ReadConfig(&Sysconfig) s := Sysconfig fmt.Println(s) bidddingConf = Sysconfig["bidding"].(map[string]interface{}) Mgo = &mongodb.MongodbSim{ MongodbAddr: bidddingConf["addr"].(string), Size: util.IntAllDef(bidddingConf["size"], 5), DbName: bidddingConf["db"].(string), UserName: bidddingConf["username"].(string), Password: bidddingConf["password"].(string), //Direct: true, } Mgo.InitPool() biddingHConf = Sysconfig["bidding_high"].(map[string]interface{}) //高质量库 MgoH = &mongodb.MongodbSim{ MongodbAddr: biddingHConf["addr"].(string), Size: util.IntAllDef(biddingHConf["size"], 5), DbName: biddingHConf["db"].(string), UserName: biddingHConf["username"].(string), Password: biddingHConf["password"].(string), //Direct: true, } MgoH.InitPool() //bidding MgoB = &mongodb.MongodbSim{ MongodbAddr: biddingHConf["addr"].(string), Size: util.IntAllDef(biddingHConf["size"], 5), DbName: "qfw", UserName: biddingHConf["username"].(string), Password: biddingHConf["password"].(string), //Direct: true, } MgoB.InitPool() noFields = util.ObjToString(Sysconfig["no_fields"]) } func main() { Init() c := cron.New() err := c.AddFunc(Sysconfig["spec"].(string), Mark) if err != nil { util.Debug("err", err) } c.Start() defer c.Stop() select {} } func Mark() { go highMark() } func highMark() { defer util.Catch() sess := Mgo.GetMgoConn() defer Mgo.DestoryMongoConn(sess) taskQuery := map[string]interface{}{ "s_stype": "group", "s_status": "已完成", "is_return_highdata": map[string]interface{}{ "$exists": 0, }, } fields, _ := Mgo.Find("high_fields", nil, `{"sort":1}`, nil, false, -1, -1) if len(*fields) == 0 { util.Debug("字段顺序配置为空") os.Exit(1) } tasks, _ := Mgo.Find("f_task", taskQuery, nil, nil, false, -1, -1) util.Debug("本次处理任务总数:", len(*tasks)) for _, task := range *tasks { util.Debug("开始处理任务数据:", task["s_groupname"], task["s_entname"]) taskID := mongodb.BsonIdToSId(task["_id"]) //任务对应的数据表 s_sourceinfo := util.ObjToString(task["s_sourceinfo"]) q := map[string]interface{}{ "s_grouptaskid": map[string]interface{}{ "$exists": 1, }, } query := sess.DB(bidddingConf["db"].(string)).C(s_sourceinfo).Find(&q).Select(nil).Iter() count := 0 for tmp := make(map[string]interface{}); query.Next(tmp); count++ { infoID := mongodb.BsonIdToSId(tmp["_id"]) if count%1000 == 0 { util.Debug(fmt.Sprintf(" %v deal current --- %d", task["s_entname"], count)) } //找到标注数据结果 marked, _ := Mgo.FindById("marked", infoID, nil) markedData := *marked //计算标注 结果 //标注结果,十进制数字 if markedData["v_taginfo"] == nil { continue } taginfo := markedData["v_taginfo"].(map[string]interface{}) res := calculateFlag(taginfo, *fields) //返回标注的十进制数字 if data, ok := markedData["v_baseinfo"].(map[string]interface{}); ok { where := make(map[string]interface{}) if _, ok := data["id"]; ok { bidd, _ := MgoB.FindById("bidding", util.ObjToString(data["id"]), nil) if len(*bidd) > 0 { where["_id"] = mongodb.StringTOBsonId(util.ObjToString(data["id"])) } } else { bidd, _ := MgoB.FindById("bidding", mongodb.BsonIdToSId(markedData["_id"]), nil) if len(*bidd) > 0 { where["_id"] = mongodb.StringTOBsonId(util.ObjToString(data["id"])) } else { continue } } data["field_bitvalue"] = res data["i_comeintime"] = time.Now().Unix() data["i_updatetime"] = time.Now().Unix() //删除多余无用字段 noField := strings.Split(noFields, ",") if len(noField) > 0 { for _, field := range noField { delete(data, field) } } update := make(map[string]interface{}) update["$set"] = data if !MgoH.Update(util.ObjToString(biddingHConf["coll"]), where, update, true, false) { util.Debug("任务 ", task["s_groupname"], infoID, "入库错误,请检查") } else { //1、更新数据源信息 setResult := map[string]interface{}{ //更新字段集 "is_return_highdata": 1, "return_highdatetime": time.Now().Unix(), } set := map[string]interface{}{ "$set": setResult, } Mgo.UpdateById(s_sourceinfo, infoID, set) } } } util.Debug("任务: ", task["s_entname"], "数据表: ", s_sourceinfo, " 处理总数为: ", count, "分配的数据总量为: ", task["i_givenum"]) if count > 0 { //当前任务结束 //3.更新任务表, taskSetResult := map[string]interface{}{ //更新字段集 "is_return_highdata": 1, } taskSet := map[string]interface{}{ "$set": taskSetResult, } Mgo.UpdateById("f_task", taskID, taskSet) //4. 记录任务中入高质量库数据 taskInsert := map[string]interface{}{ "task_id": taskID, //任务ID "high_mark_count": count, // 标注入高质量数据 "given_count": task["i_givenum"], //任务分配数量 "createtime": time.Now().Unix(), "updatetime": time.Now().Unix(), } Mgo.Save("high_result", taskInsert) } else { util.Debug(task["s_entname"], "数据表:", s_sourceinfo, "获取的数据总数为:", count, "分配的数据总量为:", task["i_givenum"]) } util.Debug(task["s_groupname"], "数据处理完毕") } util.Debug("所有任务处理完毕") } // calculateFlag 根据数据,返回被标注的字段数字 func calculateFlag(marked map[string]interface{}, data []map[string]interface{}) uint64 { var result uint64 for _, item := range data { name, ok := item["name"].(string) if !ok { continue } sort, ok := item["sort"].(int32) if !ok { continue } // 根据字段名称查找对应的标记值 _, ok = marked[name] if !ok { continue } // 通过位运算将标记值放置到正确的位置 result |= 1 << (sort - 1) } return result }