main.go 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241
  1. package main
  2. import (
  3. "fmt"
  4. "github.com/cron"
  5. "mongodb"
  6. "os"
  7. "qfw/util"
  8. "strings"
  9. "time"
  10. )
  11. var (
  12. Mgo, MgoH, MgoB *mongodb.MongodbSim
  13. Sysconfig, bidddingConf, biddingHConf map[string]interface{}
  14. noFields string
  15. )
  16. func Init() {
  17. util.ReadConfig(&Sysconfig)
  18. s := Sysconfig
  19. fmt.Println(s)
  20. bidddingConf = Sysconfig["bidding"].(map[string]interface{})
  21. Mgo = &mongodb.MongodbSim{
  22. MongodbAddr: bidddingConf["addr"].(string),
  23. Size: util.IntAllDef(bidddingConf["size"], 5),
  24. DbName: bidddingConf["db"].(string),
  25. UserName: bidddingConf["username"].(string),
  26. Password: bidddingConf["password"].(string),
  27. //Direct: true,
  28. }
  29. Mgo.InitPool()
  30. biddingHConf = Sysconfig["bidding_high"].(map[string]interface{})
  31. //高质量库
  32. MgoH = &mongodb.MongodbSim{
  33. MongodbAddr: biddingHConf["addr"].(string),
  34. Size: util.IntAllDef(biddingHConf["size"], 5),
  35. DbName: biddingHConf["db"].(string),
  36. UserName: biddingHConf["username"].(string),
  37. Password: biddingHConf["password"].(string),
  38. //Direct: true,
  39. }
  40. MgoH.InitPool()
  41. //bidding
  42. MgoB = &mongodb.MongodbSim{
  43. MongodbAddr: biddingHConf["addr"].(string),
  44. Size: util.IntAllDef(biddingHConf["size"], 5),
  45. DbName: "qfw",
  46. UserName: biddingHConf["username"].(string),
  47. Password: biddingHConf["password"].(string),
  48. //Direct: true,
  49. }
  50. MgoB.InitPool()
  51. noFields = util.ObjToString(Sysconfig["no_fields"])
  52. }
  53. func main() {
  54. Init()
  55. c := cron.New()
  56. err := c.AddFunc(Sysconfig["spec"].(string), Mark)
  57. if err != nil {
  58. util.Debug("err", err)
  59. }
  60. c.Start()
  61. defer c.Stop()
  62. select {}
  63. }
  64. func Mark() {
  65. go highMark()
  66. }
  67. func highMark() {
  68. defer util.Catch()
  69. sess := Mgo.GetMgoConn()
  70. defer Mgo.DestoryMongoConn(sess)
  71. taskQuery := map[string]interface{}{
  72. "s_stype": "group",
  73. "s_status": "已完成",
  74. "is_return_highdata": map[string]interface{}{
  75. "$exists": 0,
  76. },
  77. }
  78. fields, _ := Mgo.Find("high_fields", nil, `{"sort":1}`, nil, false, -1, -1)
  79. if len(*fields) == 0 {
  80. util.Debug("字段顺序配置为空")
  81. os.Exit(1)
  82. }
  83. tasks, _ := Mgo.Find("f_task", taskQuery, nil, nil, false, -1, -1)
  84. util.Debug("本次处理任务总数:", len(*tasks))
  85. for _, task := range *tasks {
  86. util.Debug("开始处理任务数据:", task["s_groupname"], task["s_entname"], task["s_sourceinfo"])
  87. taskID := mongodb.BsonIdToSId(task["_id"])
  88. //任务对应的数据表
  89. s_sourceinfo := util.ObjToString(task["s_sourceinfo"])
  90. q := map[string]interface{}{
  91. "s_grouptaskid": map[string]interface{}{
  92. "$exists": 1,
  93. },
  94. "is_return_highdata": map[string]interface{}{
  95. "$exists": 0,
  96. },
  97. }
  98. query := sess.DB(bidddingConf["db"].(string)).C(s_sourceinfo).Find(&q).Select(nil).Iter()
  99. count := 0
  100. taskFinish := false
  101. for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
  102. infoID := mongodb.BsonIdToSId(tmp["_id"])
  103. if count%1000 == 0 {
  104. util.Debug(fmt.Sprintf(" %v deal current --- %d", task["s_entname"], count))
  105. }
  106. //找到标注数据结果
  107. marked, _ := Mgo.FindById("marked", infoID, nil)
  108. markedData := *marked
  109. //计算标注 结果
  110. //标注结果,十进制数字
  111. if markedData["v_taginfo"] == nil {
  112. continue
  113. }
  114. taginfo := markedData["v_taginfo"].(map[string]interface{})
  115. res := calculateFlag(taginfo, *fields) //返回标注的十进制数字
  116. if data, ok := markedData["v_baseinfo"].(map[string]interface{}); ok {
  117. delete(data, "_id")
  118. where := make(map[string]interface{})
  119. if _, ok := data["id"]; ok {
  120. bidd, _ := MgoB.FindById("bidding", util.ObjToString(data["id"]), nil)
  121. if len(*bidd) > 0 {
  122. where["_id"] = mongodb.StringTOBsonId(util.ObjToString(data["id"]))
  123. }
  124. } else {
  125. bidd, _ := MgoB.FindById("bidding", mongodb.BsonIdToSId(markedData["_id"]), nil)
  126. if len(*bidd) > 0 {
  127. where["_id"] = mongodb.StringTOBsonId(util.ObjToString(data["id"]))
  128. } else {
  129. continue
  130. }
  131. }
  132. data["field_bitvalue"] = res
  133. data["i_comeintime"] = time.Now().Unix()
  134. data["i_updatetime"] = time.Now().Unix()
  135. //删除多余无用字段
  136. noField := strings.Split(noFields, ",")
  137. if len(noField) > 0 {
  138. for _, field := range noField {
  139. delete(data, field)
  140. }
  141. }
  142. update := make(map[string]interface{})
  143. update["$set"] = data
  144. if !MgoH.Update(util.ObjToString(biddingHConf["coll"]), where, update, true, false) {
  145. taskFinish = false
  146. util.Debug("任务 ", task["s_groupname"], task["s_sourceinfo"], infoID, "入库错误,请检查")
  147. } else {
  148. taskFinish = true
  149. //1、更新数据源信息
  150. setResult := map[string]interface{}{ //更新字段集
  151. "is_return_highdata": 1,
  152. "return_highdatetime": time.Now().Unix(),
  153. }
  154. set := map[string]interface{}{
  155. "$set": setResult,
  156. }
  157. Mgo.UpdateById(s_sourceinfo, infoID, set)
  158. }
  159. }
  160. }
  161. util.Debug("任务: ", task["s_entname"], "数据表: ", s_sourceinfo, " 处理总数为: ", count, "分配的数据总量为: ", task["i_givenum"])
  162. if count > 0 && taskFinish {
  163. //当前任务结束
  164. //3.更新任务表,
  165. taskSetResult := map[string]interface{}{ //更新字段集
  166. "is_return_highdata": 1,
  167. }
  168. taskSet := map[string]interface{}{
  169. "$set": taskSetResult,
  170. }
  171. Mgo.UpdateById("f_task", taskID, taskSet)
  172. //4. 记录任务中入高质量库数据
  173. taskInsert := map[string]interface{}{
  174. "task_id": taskID, //任务ID
  175. "high_mark_count": count, // 标注入高质量数据
  176. "given_count": task["i_givenum"], //任务分配数量
  177. "createtime": time.Now().Unix(),
  178. "updatetime": time.Now().Unix(),
  179. }
  180. Mgo.Save("high_result", taskInsert)
  181. } else {
  182. util.Debug(task["s_entname"], "数据表:", s_sourceinfo, "获取的数据总数为:", count, "分配的数据总量为:", task["i_givenum"])
  183. }
  184. util.Debug(task["s_groupname"], "数据处理完毕")
  185. }
  186. util.Debug("所有任务处理完毕")
  187. }
  // calculateFlag returns a bit mask of the configured fields present in marked.
//
// Each entry of data describes one field: item["name"] is the field name and
// item["sort"] its 1-based position. When marked has a key equal to the name,
// bit (sort-1) of the result is set, so sort 1 maps to the value 1.
//
// Entries are ignored when they lack a string name, or when sort is not a whole
// number in 1..64. A sort below 1 would be a negative shift, which panics at run
// time, and a sort above 64 does not fit in a uint64. sort is accepted as any of
// int, int32, int64 or float64, since a MongoDB driver may decode a stored
// number into any of these; the legacy mongo shell, for instance, writes plain
// numbers as doubles.
func calculateFlag(marked map[string]interface{}, data []map[string]interface{}) uint64 {
	var result uint64
	for _, item := range data {
		name, ok := item["name"].(string)
		if !ok {
			continue
		}
		pos, ok := fieldPosition(item["sort"])
		if !ok {
			continue
		}
		if _, present := marked[name]; present {
			result |= uint64(1) << uint(pos-1)
		}
	}
	return result
}

// fieldPosition converts a decoded "sort" value into a bit position in 1..64.
func fieldPosition(v interface{}) (int, bool) {
	var n int64
	switch x := v.(type) {
	case int:
		n = int64(x)
	case int32:
		n = int64(x)
	case int64:
		n = x
	case float64:
		// Range-check before converting so the conversion is well defined;
		// also rejects fractional values and NaN.
		if x < 1 || x > 64 || x != float64(int64(x)) {
			return 0, false
		}
		n = int64(x)
	default:
		return 0, false
	}
	if n < 1 || n > 64 {
		return 0, false
	}
	return int(n), true
}