main.go 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195
  1. package main
  2. import (
  3. "encoding/json"
  4. "log"
  5. mu "mfw/util"
  6. "net"
  7. "os"
  8. qu "qfw/util"
  9. "sync"
  10. "time"
  11. )
  12. var (
  13. sysconfig map[string]interface{} //配置文件
  14. mgo, qy_mgo *MongodbSim //mongodb操作对象
  15. udpclient mu.UdpClient //udp对象
  16. udplock, dataLock sync.Mutex //udp锁
  17. coll_name, qy_coll_name string //表名
  18. isTest bool //是否测试
  19. Ext_Type, Ext_From map[string]interface{} //抽取来源,方式分
  20. buyer_score, s_winner_score map[string]interface{}
  21. budget_score, bidamount_score map[string]interface{}
  22. projectname_score, projectcode_score map[string]interface{}
  23. )
  24. //mgo-配置等
  25. func initMgo() {
  26. isTest = sysconfig["isTest"].(bool)
  27. mconf := sysconfig["mongodb"].(map[string]interface{})
  28. log.Println(mconf)
  29. mgo = &MongodbSim{
  30. MongodbAddr: mconf["addrName"].(string),
  31. DbName: mconf["dbName"].(string),
  32. Size: qu.IntAllDef(mconf["pool"], 10),
  33. }
  34. mgo.InitPool()
  35. qy_mconf := sysconfig["qy_mongodb"].(map[string]interface{})
  36. qy_mgo = &MongodbSim{
  37. MongodbAddr: qy_mconf["qy_addrName"].(string),
  38. DbName: qy_mconf["qy_dbName"].(string),
  39. Size: qu.IntAllDef(qy_mconf["pool"], 10),
  40. UserName: qy_mconf["qy_username"].(string),
  41. Password: qy_mconf["qy_password"].(string),
  42. }
  43. qy_mgo.InitPool()
  44. coll_name = mconf["collName"].(string)
  45. qy_coll_name = qy_mconf["qy_collName"].(string)
  46. }
  47. //初始化打分
  48. func initScore() {
  49. Ext_Type = sysconfig["ext_type"].(map[string]interface{})
  50. Ext_From = sysconfig["ext_from"].(map[string]interface{})
  51. buyer_score = sysconfig["buyer_score"].(map[string]interface{})
  52. s_winner_score = sysconfig["s_winner_score"].(map[string]interface{})
  53. budget_score = sysconfig["budget_score"].(map[string]interface{})
  54. bidamount_score = sysconfig["bidamount_score"].(map[string]interface{})
  55. projectname_score = sysconfig["projectname_score"].(map[string]interface{})
  56. projectcode_score = sysconfig["projectcode_score"].(map[string]interface{})
  57. }
  58. //初始化
  59. func init() {
  60. qu.ReadConfig(&sysconfig) //加载配置文件
  61. initMgo()
  62. initScore()
  63. }
  64. func mainT() {
  65. updport := sysconfig["udpport"].(string)
  66. udpclient = mu.UdpClient{Local: updport, BufSize: 1024}
  67. udpclient.Listen(processUdpMsg)
  68. log.Println("Udp服务监听", updport)
  69. time.Sleep(99999 * time.Hour)
  70. }
  71. //调试流程
  72. func main() {
  73. sid := "1f0000000000000000000000"
  74. eid := "9f0000000000000000000000"
  75. log.Println(sid, "---", eid)
  76. mapinfo := map[string]interface{}{}
  77. if sid == "" || eid == "" {
  78. log.Println("sid,eid参数不能为空")
  79. os.Exit(0)
  80. }
  81. mapinfo["gtid"] = sid
  82. mapinfo["lteid"] = eid
  83. startFieldScoreTask(mapinfo)
  84. }
  85. //打分流程-方法
  86. func startFieldScoreTask(mapInfo map[string]interface{}) {
  87. log.Println("开始字段规则链...评分流程")
  88. defer qu.Catch()
  89. q := map[string]interface{}{
  90. "_id": map[string]interface{}{
  91. "$gt": StringTOBsonId(mapInfo["gtid"].(string)),
  92. "$lte": StringTOBsonId(mapInfo["lteid"].(string)),
  93. },
  94. }
  95. log.Println("查询条件:", q)
  96. sess := mgo.GetMgoConn()
  97. defer mgo.DestoryMongoConn(sess)
  98. it := sess.DB(mgo.DbName).C(coll_name).Find(&q).Iter()
  99. updateFieldScore, total := [][]map[string]interface{}{}, 0
  100. for tmp := make(map[string]interface{}); it.Next(&tmp); total++ {
  101. if total%1000 == 0 {
  102. log.Println("当前数量:", total)
  103. }
  104. //验证初始分字段
  105. source := *qu.ObjToMap(tmp["field_source"])
  106. f_s := dealWithFieldSourceScore(source)
  107. //更新集合
  108. update_dict := make(map[string]interface{}, 0)
  109. subtype := qu.ObjToString(tmp["subtype"])
  110. buyer_s := buyerFieldScore(tmp, f_s["buyer"])
  111. update_dict["buyer"] = buyer_s
  112. budget_s := budgetFieldScore(tmp, f_s["budget"])
  113. update_dict["budget"] = budget_s
  114. projectname_s := projectnameFieldScore(tmp, f_s["projectname"])
  115. update_dict["projectname"] = projectname_s
  116. projectcode_s := projectcodeFieldScore(tmp, f_s["projectcode"])
  117. update_dict["projectcode"] = projectcode_s
  118. if subtype == "中标" || subtype == "成交" || subtype == "合同" {
  119. s_winner_s := winnerFieldScore(tmp, f_s["s_winner"])
  120. update_dict["s_winner"] = s_winner_s
  121. bidamount_s := bidamountFieldScore(tmp, f_s["bidamount"])
  122. update_dict["bidamount"] = bidamount_s
  123. }
  124. updateFieldScore = append(updateFieldScore, []map[string]interface{}{
  125. map[string]interface{}{
  126. "_id": tmp["_id"],
  127. },
  128. map[string]interface{}{
  129. "$set": map[string]interface{}{
  130. "field_score": update_dict,
  131. },
  132. },
  133. })
  134. if len(updateFieldScore) >= 200 {
  135. mgo.UpSertBulk(coll_name, updateFieldScore...)
  136. updateFieldScore = [][]map[string]interface{}{}
  137. }
  138. tmp = make(map[string]interface{})
  139. }
  140. if len(updateFieldScore) > 0 {
  141. mgo.UpSertBulk(coll_name, updateFieldScore...)
  142. }
  143. log.Println("field score is over - 总计数量", total)
  144. }
  145. //udp监听
  146. func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
  147. switch act {
  148. case mu.OP_TYPE_DATA:
  149. var mapInfo map[string]interface{}
  150. err := json.Unmarshal(data, &mapInfo)
  151. if err != nil {
  152. udpclient.WriteUdp([]byte("err:"+err.Error()), mu.OP_NOOP, ra)
  153. } else if mapInfo != nil {
  154. sid, _ := mapInfo["gtid"].(string)
  155. eid, _ := mapInfo["lteid"].(string)
  156. if sid == "" || eid == "" {
  157. log.Println("接收id段异常-err ", "sid=", sid, ",eid=", eid)
  158. } else {
  159. udpinfo, _ := mapInfo["key"].(string)
  160. go udpclient.WriteUdp([]byte(udpinfo), mu.OP_NOOP, ra)
  161. udplock.Lock()
  162. startFieldScoreTask(mapInfo)
  163. udplock.Unlock()
  164. }
  165. }
  166. case mu.OP_NOOP: //下个节点回应
  167. str := string(data)
  168. log.Println("节点回应:", str)
  169. }
  170. }