main.go 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182
  1. package main
  2. import (
  3. "log"
  4. mu "mfw/util"
  5. "net"
  6. "os"
  7. "qfw/common/src/qfw/util"
  8. qu "qfw/util"
  9. "time"
  10. )
  11. var (
  12. sysconfig map[string]interface{} //配置文件
  13. mgo *MongodbSim //mongodb操作对象
  14. qy_mgo *MongodbSim
  15. udpclient mu.UdpClient //udp对象
  16. nextNode []map[string]interface{} //下节点数组
  17. siteMap map[string]map[string]interface{} //站点map
  18. coll_name,qy_coll_name string
  19. core_element,other_element []map[string]interface{} //要素
  20. deduct_element []string
  21. total_score,core_max,core_each,other_max,other_each ,deduct_each int
  22. specialaddr string
  23. )
  24. //站点配置
  25. func initSite() {
  26. mconf := sysconfig["mongodb"].(map[string]interface{})
  27. site := mconf["site"].(map[string]interface{})
  28. siteMap = make(map[string]map[string]interface{}, 0)
  29. start := int(time.Now().Unix())
  30. sess_site := mgo.GetMgoConn()
  31. defer mgo.DestoryMongoConn(sess_site)
  32. res_site := sess_site.DB(site["site_dbname"].(string)).C(site["site_coll"].(string)).Find(map[string]interface{}{}).Sort("_id").Iter()
  33. for site_dict := make(map[string]interface{}); res_site.Next(&site_dict); {
  34. data_map := map[string]interface{}{
  35. "area": util.ObjToString(site_dict["area"]),
  36. "city": util.ObjToString(site_dict["city"]),
  37. }
  38. siteMap[util.ObjToString(site_dict["site"])] = data_map
  39. }
  40. log.Printf("new站点加载用时:%d秒,%d个\n", int(time.Now().Unix())-start, len(siteMap))
  41. }
  42. //mgo-配置等
  43. func initMgo() {
  44. mconf := sysconfig["mongodb"].(map[string]interface{})
  45. log.Println(mconf)
  46. mgo = &MongodbSim{
  47. MongodbAddr: mconf["addrName"].(string),
  48. DbName: mconf["dbName"].(string),
  49. Size: qu.IntAllDef(mconf["pool"], 10),
  50. }
  51. mgo.InitPool()
  52. qy_mconf := sysconfig["qy_mongodb"].(map[string]interface{})
  53. qy_mgo = &MongodbSim{
  54. MongodbAddr: qy_mconf["qy_addrName"].(string),
  55. DbName: qy_mconf["qy_dbName"].(string),
  56. Size: qu.IntAllDef(qy_mconf["pool"], 10),
  57. UserName: qy_mconf["qy_username"].(string),
  58. Password: qy_mconf["qy_password"].(string),
  59. }
  60. qy_mgo.InitPool()
  61. coll_name = mconf["collName"].(string)
  62. qy_coll_name = qy_mconf["qy_collName"].(string)
  63. core_element = qu.ObjArrToMapArr(sysconfig["core_element"].([]interface{}))
  64. other_element = qu.ObjArrToMapArr(sysconfig["other_element"].([]interface{}))
  65. deduct_element =qu.ObjArrToStringArr(sysconfig["deduct_element"].([]interface{}))
  66. score_standard := sysconfig["score_standard"].(map[string]interface{})
  67. total_score = qu.IntAll(score_standard["total_score"])
  68. core_max = qu.IntAll(score_standard["core_max"])
  69. core_each = qu.IntAll(score_standard["core_each"])
  70. other_max = qu.IntAll(score_standard["other_max"])
  71. other_each = qu.IntAll(score_standard["other_each"])
  72. deduct_each = qu.IntAll(score_standard["deduct_each"])
  73. specialaddr = sysconfig["specialaddr"].(string)
  74. }
  75. //初始化
  76. func init() {
  77. qu.ReadConfig(&sysconfig)//加载配置文件
  78. initMgo()
  79. initSite()//加载站点
  80. }
  81. //正式流程
  82. func mainT() {
  83. go checkMapJob()
  84. updport := sysconfig["udpport"].(string)
  85. udpclient = mu.UdpClient{Local: updport, BufSize: 1024}
  86. udpclient.Listen(processUdpMsg)
  87. log.Println("Udp服务监听", updport)
  88. time.Sleep(99999 * time.Hour)
  89. }
  90. //快速测试使用
  91. func main() {
  92. sid := "1f0000000000000000000000"
  93. eid := "9f0000000000000000000000"
  94. log.Println(sid, "---", eid)
  95. mapinfo := map[string]interface{}{}
  96. if sid == "" || eid == "" {
  97. log.Println("sid,eid参数不能为空")
  98. os.Exit(0)
  99. }
  100. mapinfo["gtid"] = sid
  101. mapinfo["lteid"] = eid
  102. startScoreTask([]byte{}, mapinfo)
  103. time.Sleep(99999 * time.Hour)
  104. }
  105. //udp监听
  106. func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
  107. switch act {
  108. case mu.OP_TYPE_DATA: //上个节点的数据
  109. case mu.OP_NOOP: //下个节点回应
  110. }
  111. }
  112. //打分流程-方法
  113. func startScoreTask(data []byte, mapInfo map[string]interface{}) {
  114. log.Println("开始评分流程")
  115. defer qu.Catch()
  116. //区间id
  117. q := map[string]interface{}{
  118. "_id": map[string]interface{}{
  119. "$gt": StringTOBsonId(mapInfo["gtid"].(string)),
  120. "$lte": StringTOBsonId(mapInfo["lteid"].(string)),
  121. },
  122. }
  123. log.Println("查询条件:",q)
  124. sess := mgo.GetMgoConn()
  125. defer mgo.DestoryMongoConn(sess)
  126. it := sess.DB(mgo.DbName).C(coll_name).Find(&q).Iter()
  127. updateExtract := [][]map[string]interface{}{}//更新需要
  128. index:=0
  129. for tmp := make(map[string]interface{}); it.Next(&tmp); index++ {
  130. element_score,core_score,other_score,element_reason:=dealWithElementRate(tmp)
  131. error_score,abnormal_score,error_reason,abnormal_reason:=dealWithErrorRate(tmp)
  132. if index%1000 == 0 {
  133. log.Println("当前数量:", index, tmp["_id"],"元素分:",element_score,"错误分:",error_score,"异常分:",abnormal_score)
  134. }
  135. updateExtract = append(updateExtract, []map[string]interface{}{
  136. map[string]interface{}{
  137. "_id": tmp["_id"],
  138. },
  139. map[string]interface{}{
  140. "$set": map[string]interface{}{
  141. "element_score": element_score,
  142. "core_score": core_score,
  143. "other_score": other_score,
  144. "error_score":error_score,
  145. "abnormal_score": abnormal_score,
  146. "quality_reason":map[string]interface{}{
  147. "element_reason":element_reason,
  148. "error_reason":error_reason,
  149. "abnormal_reason":abnormal_reason,
  150. },
  151. },
  152. },
  153. })
  154. if len(updateExtract) >= 200 {
  155. mgo.UpSertBulk(coll_name, updateExtract...)
  156. updateExtract = [][]map[string]interface{}{}
  157. }
  158. tmp = make(map[string]interface{})
  159. }
  160. if len(updateExtract) >0 {
  161. mgo.UpSertBulk(coll_name, updateExtract...)
  162. }
  163. log.Println("task quality over - 总计数量",index)
  164. }