123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241 |
- package main
- import (
- "fmt"
- "github.com/cron"
- "mongodb"
- "os"
- "qfw/util"
- "strings"
- "time"
- )
- var (
- Mgo, MgoH, MgoB *mongodb.MongodbSim
- Sysconfig, bidddingConf, biddingHConf map[string]interface{}
- noFields string
- )
- func Init() {
- util.ReadConfig(&Sysconfig)
- s := Sysconfig
- fmt.Println(s)
- bidddingConf = Sysconfig["bidding"].(map[string]interface{})
- Mgo = &mongodb.MongodbSim{
- MongodbAddr: bidddingConf["addr"].(string),
- Size: util.IntAllDef(bidddingConf["size"], 5),
- DbName: bidddingConf["db"].(string),
- UserName: bidddingConf["username"].(string),
- Password: bidddingConf["password"].(string),
- //Direct: true,
- }
- Mgo.InitPool()
- biddingHConf = Sysconfig["bidding_high"].(map[string]interface{})
- //高质量库
- MgoH = &mongodb.MongodbSim{
- MongodbAddr: biddingHConf["addr"].(string),
- Size: util.IntAllDef(biddingHConf["size"], 5),
- DbName: biddingHConf["db"].(string),
- UserName: biddingHConf["username"].(string),
- Password: biddingHConf["password"].(string),
- //Direct: true,
- }
- MgoH.InitPool()
- //bidding
- MgoB = &mongodb.MongodbSim{
- MongodbAddr: biddingHConf["addr"].(string),
- Size: util.IntAllDef(biddingHConf["size"], 5),
- DbName: "qfw",
- UserName: biddingHConf["username"].(string),
- Password: biddingHConf["password"].(string),
- //Direct: true,
- }
- MgoB.InitPool()
- noFields = util.ObjToString(Sysconfig["no_fields"])
- }
- func main() {
- Init()
- c := cron.New()
- err := c.AddFunc(Sysconfig["spec"].(string), Mark)
- if err != nil {
- util.Debug("err", err)
- }
- c.Start()
- defer c.Stop()
- select {}
- }
- func Mark() {
- go highMark()
- }
- func highMark() {
- defer util.Catch()
- sess := Mgo.GetMgoConn()
- defer Mgo.DestoryMongoConn(sess)
- taskQuery := map[string]interface{}{
- "s_stype": "group",
- "s_status": "已完成",
- "is_return_highdata": map[string]interface{}{
- "$exists": 0,
- },
- }
- fields, _ := Mgo.Find("high_fields", nil, `{"sort":1}`, nil, false, -1, -1)
- if len(*fields) == 0 {
- util.Debug("字段顺序配置为空")
- os.Exit(1)
- }
- tasks, _ := Mgo.Find("f_task", taskQuery, nil, nil, false, -1, -1)
- util.Debug("本次处理任务总数:", len(*tasks))
- for _, task := range *tasks {
- util.Debug("开始处理任务数据:", task["s_groupname"], task["s_entname"], task["s_sourceinfo"])
- taskID := mongodb.BsonIdToSId(task["_id"])
- //任务对应的数据表
- s_sourceinfo := util.ObjToString(task["s_sourceinfo"])
- q := map[string]interface{}{
- "s_grouptaskid": map[string]interface{}{
- "$exists": 1,
- },
- "is_return_highdata": map[string]interface{}{
- "$exists": 0,
- },
- }
- query := sess.DB(bidddingConf["db"].(string)).C(s_sourceinfo).Find(&q).Select(nil).Iter()
- count := 0
- taskFinish := false
- for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
- infoID := mongodb.BsonIdToSId(tmp["_id"])
- if count%1000 == 0 {
- util.Debug(fmt.Sprintf(" %v deal current --- %d", task["s_entname"], count))
- }
- //找到标注数据结果
- marked, _ := Mgo.FindById("marked", infoID, nil)
- markedData := *marked
- //计算标注 结果
- //标注结果,十进制数字
- if markedData["v_taginfo"] == nil {
- continue
- }
- taginfo := markedData["v_taginfo"].(map[string]interface{})
- res := calculateFlag(taginfo, *fields) //返回标注的十进制数字
- if data, ok := markedData["v_baseinfo"].(map[string]interface{}); ok {
- delete(data, "_id")
- where := make(map[string]interface{})
- if _, ok := data["id"]; ok {
- bidd, _ := MgoB.FindById("bidding", util.ObjToString(data["id"]), nil)
- if len(*bidd) > 0 {
- where["_id"] = mongodb.StringTOBsonId(util.ObjToString(data["id"]))
- }
- } else {
- bidd, _ := MgoB.FindById("bidding", mongodb.BsonIdToSId(markedData["_id"]), nil)
- if len(*bidd) > 0 {
- where["_id"] = mongodb.StringTOBsonId(util.ObjToString(data["id"]))
- } else {
- continue
- }
- }
- data["field_bitvalue"] = res
- data["i_comeintime"] = time.Now().Unix()
- data["i_updatetime"] = time.Now().Unix()
- //删除多余无用字段
- noField := strings.Split(noFields, ",")
- if len(noField) > 0 {
- for _, field := range noField {
- delete(data, field)
- }
- }
- update := make(map[string]interface{})
- update["$set"] = data
- if !MgoH.Update(util.ObjToString(biddingHConf["coll"]), where, update, true, false) {
- taskFinish = false
- util.Debug("任务 ", task["s_groupname"], task["s_sourceinfo"], infoID, "入库错误,请检查")
- } else {
- taskFinish = true
- //1、更新数据源信息
- setResult := map[string]interface{}{ //更新字段集
- "is_return_highdata": 1,
- "return_highdatetime": time.Now().Unix(),
- }
- set := map[string]interface{}{
- "$set": setResult,
- }
- Mgo.UpdateById(s_sourceinfo, infoID, set)
- }
- }
- }
- util.Debug("任务: ", task["s_entname"], "数据表: ", s_sourceinfo, " 处理总数为: ", count, "分配的数据总量为: ", task["i_givenum"])
- if count > 0 && taskFinish {
- //当前任务结束
- //3.更新任务表,
- taskSetResult := map[string]interface{}{ //更新字段集
- "is_return_highdata": 1,
- }
- taskSet := map[string]interface{}{
- "$set": taskSetResult,
- }
- Mgo.UpdateById("f_task", taskID, taskSet)
- //4. 记录任务中入高质量库数据
- taskInsert := map[string]interface{}{
- "task_id": taskID, //任务ID
- "high_mark_count": count, // 标注入高质量数据
- "given_count": task["i_givenum"], //任务分配数量
- "createtime": time.Now().Unix(),
- "updatetime": time.Now().Unix(),
- }
- Mgo.Save("high_result", taskInsert)
- } else {
- util.Debug(task["s_entname"], "数据表:", s_sourceinfo, "获取的数据总数为:", count, "分配的数据总量为:", task["i_givenum"])
- }
- util.Debug(task["s_groupname"], "数据处理完毕")
- }
- util.Debug("所有任务处理完毕")
- }
// calculateFlag encodes which configured fields appear in marked as a bitmask.
//
// Each entry of data describes one field: "name" (string) is the key looked up
// in marked, and "sort" is its 1-based bit position. A field sets bit
// (sort-1) when its name is present as a key in marked, whatever the value.
//
// An entry is ignored when any of these holds:
//   - its name is not a string;
//   - its sort is not a whole number in [1, 64] stored as int, int32, int64
//     or float64;
//   - its name is absent from marked.
//
// Out-of-range positions are skipped instead of causing a negative-shift
// panic.
func calculateFlag(marked map[string]interface{}, data []map[string]interface{}) uint64 {
	var result uint64
	for _, item := range data {
		name, ok := item["name"].(string)
		if !ok {
			continue
		}
		pos, ok := bitPosition(item["sort"])
		if !ok {
			continue
		}
		if _, ok := marked[name]; !ok {
			continue
		}
		result |= 1 << (pos - 1)
	}
	return result
}

// bitPosition converts a stored "sort" value into a 1-based bit position in
// [1, 64]. Depending on the driver and how the document was written, numbers
// may decode as int, int32, int64 or float64, so all are accepted as long as
// the value is a whole number in range.
func bitPosition(v interface{}) (uint, bool) {
	var n int64
	switch x := v.(type) {
	case int:
		n = int64(x)
	case int32:
		n = int64(x)
	case int64:
		n = x
	case float64:
		if x < 1 || x > 64 || x != float64(int64(x)) {
			return 0, false
		}
		n = int64(x)
	default:
		return 0, false
	}
	if n < 1 || n > 64 {
		return 0, false
	}
	return uint(n), true
}
|