main.go 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203
  1. package highMark
  2. import (
  3. "fmt"
  4. "mongodb"
  5. "os"
  6. "qfw/util"
  7. "time"
  8. )
  9. var (
  10. Mgo, MgoH *mongodb.MongodbSim
  11. Sysconfig, bidddingConf, biddingHConf map[string]interface{}
  12. )
  13. func Init() {
  14. util.ReadConfig(&Sysconfig)
  15. s := Sysconfig
  16. fmt.Println(s)
  17. bidddingConf = Sysconfig["bidding"].(map[string]interface{})
  18. Mgo = &mongodb.MongodbSim{
  19. MongodbAddr: bidddingConf["addr"].(string),
  20. Size: util.IntAllDef(bidddingConf["size"], 5),
  21. DbName: bidddingConf["db"].(string),
  22. UserName: bidddingConf["username"].(string),
  23. Password: bidddingConf["password"].(string),
  24. //Direct: true,
  25. }
  26. Mgo.InitPool()
  27. biddingHConf = Sysconfig["bidding_high"].(map[string]interface{})
  28. //高质量库
  29. MgoH = &mongodb.MongodbSim{
  30. MongodbAddr: biddingHConf["addr"].(string),
  31. Size: util.IntAllDef(biddingHConf["size"], 5),
  32. DbName: biddingHConf["db"].(string),
  33. UserName: biddingHConf["username"].(string),
  34. Password: biddingHConf["password"].(string),
  35. //Direct: true,
  36. }
  37. MgoH.InitPool()
  38. }
  39. //func main() {
  40. // Init()
  41. // c := cron.New()
  42. // err := c.AddFunc(Sysconfig["spec"].(string), Mark)
  43. // if err != nil {
  44. // util.Debug("err", err)
  45. // }
  46. //
  47. // c.Start()
  48. // defer c.Stop()
  49. //
  50. // select {}
  51. //
  52. // //highMark()
  53. //}
  54. func Mark() {
  55. go highMark()
  56. }
  57. func highMark() {
  58. defer util.Catch()
  59. sess := Mgo.GetMgoConn()
  60. defer Mgo.DestoryMongoConn(sess)
  61. taskQuery := map[string]interface{}{
  62. "s_stype": "group",
  63. "s_status": "已完成",
  64. "is_return_highdata": map[string]interface{}{
  65. "$exists": 0,
  66. },
  67. }
  68. fields, _ := Mgo.Find("high_fields", nil, `{"sort":1}`, nil, false, -1, -1)
  69. if len(*fields) == 0 {
  70. util.Debug("字段顺序配置为空")
  71. os.Exit(1)
  72. }
  73. tasks, _ := Mgo.Find("f_task", taskQuery, nil, nil, false, -1, -1)
  74. util.Debug("本次处理任务总数:", len(*tasks))
  75. for _, task := range *tasks {
  76. util.Debug("开始处理任务数据:", task["s_groupname"], task["s_entname"])
  77. taskID := mongodb.BsonIdToSId(task["_id"])
  78. //任务对应的数据表
  79. s_sourceinfo := util.ObjToString(task["s_sourceinfo"])
  80. q := map[string]interface{}{
  81. "s_grouptaskid": map[string]interface{}{
  82. "$exists": 1,
  83. },
  84. }
  85. query := sess.DB(bidddingConf["db"].(string)).C(s_sourceinfo).Find(&q).Select(nil).Iter()
  86. count := 0
  87. for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
  88. infoID := mongodb.BsonIdToSId(tmp["_id"])
  89. if count%1000 == 0 {
  90. util.Debug(fmt.Sprintf(" %v deal current --- %d", task["s_entname"], count))
  91. }
  92. //找到标注数据结果
  93. marked, _ := Mgo.FindById("marked", infoID, nil)
  94. markedData := *marked
  95. //计算标注 结果
  96. //标注结果,十进制数字
  97. if markedData["v_taginfo"] == nil {
  98. continue
  99. }
  100. taginfo := markedData["v_taginfo"].(map[string]interface{})
  101. res := calculateFlag(taginfo, *fields) //返回标注的十进制数字
  102. if data, ok := markedData["v_baseinfo"].(map[string]interface{}); ok {
  103. data["_id"] = tmp["_id"]
  104. data["field_bitvalue"] = res
  105. data["i_comeintime"] = time.Now().Unix()
  106. data["i_updatetime"] = time.Now().Unix()
  107. update := make(map[string]interface{})
  108. update["$set"] = data
  109. where := map[string]interface{}{
  110. "_id": tmp["_id"],
  111. }
  112. if !MgoH.Update(util.ObjToString(biddingHConf["coll"]), where, update, true, false) {
  113. util.Debug("任务 ", task["s_groupname"], infoID, "入库错误,请检查")
  114. } else {
  115. //1、更新数据源信息
  116. setResult := map[string]interface{}{ //更新字段集
  117. "is_return_highdata": 1,
  118. "return_highdatetime": time.Now().Unix(),
  119. }
  120. set := map[string]interface{}{
  121. "$set": setResult,
  122. }
  123. Mgo.UpdateById(s_sourceinfo, infoID, set)
  124. }
  125. }
  126. }
  127. util.Debug("任务: ", task["s_entname"], "数据表: ", s_sourceinfo, " 处理总数为: ", count, "分配的数据总量为: ", task["i_givenum"])
  128. if count > 0 {
  129. //当前任务结束
  130. //3.更新任务表,
  131. taskSetResult := map[string]interface{}{ //更新字段集
  132. "is_return_highdata": 1,
  133. }
  134. taskSet := map[string]interface{}{
  135. "$set": taskSetResult,
  136. }
  137. Mgo.UpdateById("f_task", taskID, taskSet)
  138. //4. 记录任务中入高质量库数据
  139. taskInsert := map[string]interface{}{
  140. "task_id": taskID, //任务ID
  141. "high_mark_count": count, // 标注入高质量数据
  142. "given_count": task["i_givenum"], //任务分配数量
  143. "createtime": time.Now().Unix(),
  144. "updatetime": time.Now().Unix(),
  145. }
  146. Mgo.Save("high_result", taskInsert)
  147. } else {
  148. util.Debug(task["s_entname"], "数据表:", s_sourceinfo, "获取的数据总数为:", count, "分配的数据总量为:", task["i_givenum"])
  149. }
  150. util.Debug(task["s_groupname"], "数据处理完毕")
  151. }
  152. util.Debug("所有任务处理完毕")
  153. }
  154. //calculateFlag 根据数据,返回被标注的字段数字
  155. func calculateFlag(marked map[string]interface{}, data []map[string]interface{}) uint64 {
  156. var result uint64
  157. for _, item := range data {
  158. name, ok := item["name"].(string)
  159. if !ok {
  160. continue
  161. }
  162. sort, ok := item["sort"].(int32)
  163. if !ok {
  164. continue
  165. }
  166. // 根据字段名称查找对应的标记值
  167. _, ok = marked[name]
  168. if !ok {
  169. continue
  170. }
  171. // 通过位运算将标记值放置到正确的位置
  172. result |= 1 << (sort - 1)
  173. }
  174. return result
  175. }