main.go 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196
  // Package main is the bidding-information deduplication service (招标信息判重).
  //
  // It listens for UDP messages that describe an _id range of the extract
  // collection, flags records in that range that duplicate earlier data
  // (setting their repeat/repeatid fields), and then forwards the range to the
  // configured next nodes.
  package main
  5. import (
  6. "encoding/json"
  7. "flag"
  8. "log"
  9. mu "mfw/util"
  10. "net"
  11. "qfw/util"
  12. "qfw/util/mongodb"
  13. "regexp"
  14. "sync"
  15. "time"
  16. )
  17. var (
  18. Sysconfig map[string]interface{} //配置文件
  19. mconf map[string]interface{} //mongodb配置信息
  20. mgo *mongodb.MongodbSim //mongodb操作对象
  21. extract string
  22. bidding string
  23. udpclient mu.UdpClient //udp对象
  24. nextNode []map[string]interface{} //下节点数组
  25. dupdays = 3 //初始化判重范围
  26. DM *datamap //判重数据
  27. FilterRegexp = regexp.MustCompile("^_$")
  28. lastid = ""
  29. )
  30. func init() {
  31. //。。。
  32. flag.StringVar(&lastid, "id", "", "最后加载id") //以小于等于此id开始加载最近几天的数据
  33. flag.Parse()
  34. util.ReadConfig(&Sysconfig)
  35. nextNode = util.ObjArrToMapArr(Sysconfig["nextNode"].([]interface{}))
  36. mconf = Sysconfig["mongodb"].(map[string]interface{})
  37. mgo = &mongodb.MongodbSim{
  38. MongodbAddr: mconf["addr"].(string),
  39. DbName: mconf["db"].(string),
  40. Size: util.IntAllDef(mconf["pool"], 10),
  41. }
  42. extract = mconf["extract"].(string)
  43. //bidding = mconf["bidding"].(string)
  44. mgo.InitPool()
  45. dupdays = util.IntAllDef(Sysconfig["dupdays"], 3)
  46. //加载数据
  47. DM = NewDatamap(dupdays, lastid)
  48. sw := util.ObjToString(Sysconfig["specialwords"])
  49. if sw != "" {
  50. FilterRegexp = regexp.MustCompile(sw)
  51. }
  52. }
  53. func main() {
  54. go checkMapJob()
  55. updport := Sysconfig["udpport"].(string)
  56. udpclient = mu.UdpClient{Local: updport, BufSize: 1024}
  57. udpclient.Listen(processUdpMsg)
  58. log.Println("Udp服务监听", updport)
  59. time.Sleep(99999 * time.Hour)
  60. }
  61. func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
  62. switch act {
  63. case mu.OP_TYPE_DATA: //上个节点的数据
  64. //从表中开始处理
  65. var mapInfo map[string]interface{}
  66. err := json.Unmarshal(data, &mapInfo)
  67. log.Println("err:", err, "mapInfo:", mapInfo)
  68. if err != nil {
  69. udpclient.WriteUdp([]byte("err:"+err.Error()), mu.OP_NOOP, ra)
  70. } else if mapInfo != nil {
  71. go task(data, mapInfo)
  72. key, _ := mapInfo["key"].(string)
  73. if key == "" {
  74. key = "udpok"
  75. }
  76. udpclient.WriteUdp([]byte(key), mu.OP_NOOP, ra)
  77. }
  78. case mu.OP_NOOP: //下个节点回应
  79. ok := string(data)
  80. if ok != "" {
  81. log.Println("ok:", ok)
  82. udptaskmap.Delete(ok)
  83. }
  84. }
  85. }
  86. //开始判重程序
  87. func task(data []byte, mapInfo map[string]interface{}) {
  88. defer util.Catch()
  89. //区间id
  90. sess := mgo.GetMgoConn()
  91. defer mgo.DestoryMongoConn(sess)
  92. q := map[string]interface{}{
  93. "_id": map[string]interface{}{
  94. "$gt": util.StringTOBsonId(mapInfo["gtid"].(string)),
  95. "$lte": util.StringTOBsonId(mapInfo["lteid"].(string)),
  96. },
  97. }
  98. it := sess.DB(mgo.DbName).C(extract).Find(&q).Iter()
  99. updateExtract := [][]map[string]interface{}{}
  100. pool := make(chan bool, 16)
  101. wg := &sync.WaitGroup{}
  102. mapLock := &sync.Mutex{}
  103. n, repeateN := 0, 0
  104. for tmp := make(map[string]interface{}); it.Next(&tmp); n++ {
  105. if util.ObjToString(tmp["subtype"]) == "变更" {
  106. //go IS.Add("update")
  107. continue
  108. }
  109. if n%10000 == 0 {
  110. log.Println("current:", n, tmp["_id"])
  111. }
  112. pool <- true
  113. wg.Add(1)
  114. go func(tmp map[string]interface{}) {
  115. defer func() {
  116. <-pool
  117. wg.Done()
  118. }()
  119. info := NewInfo(tmp)
  120. b, id := DM.check(info)
  121. if b { //有重复,生成更新语句,更新抽取和更新招标
  122. //IS.Add("repeat")
  123. repeateN++
  124. mapLock.Lock()
  125. updateExtract = append(updateExtract, []map[string]interface{}{
  126. map[string]interface{}{
  127. "_id": tmp["_id"],
  128. },
  129. map[string]interface{}{
  130. "$set": map[string]interface{}{
  131. "repeat": 1,
  132. "repeatid": id,
  133. },
  134. },
  135. })
  136. // updateBidding = append(updateBidding, []map[string]interface{}{
  137. // map[string]interface{}{
  138. // "_id": tmp["_id"],
  139. // },
  140. // map[string]interface{}{
  141. // "$set": map[string]interface{}{
  142. // "extracttype": -1,
  143. // "repeatid": id,
  144. // },
  145. // },
  146. // })
  147. if len(updateExtract) > 500 {
  148. mgo.UpdateBulk(extract, updateExtract...)
  149. //mgo.UpdateBulk(bidding, updateBidding...)
  150. //updateExtract, updateBidding = [][]map[string]interface{}{}, [][]map[string]interface{}{}
  151. updateExtract = [][]map[string]interface{}{}
  152. }
  153. mapLock.Unlock()
  154. } else {
  155. //IS.Add("new")
  156. }
  157. }(tmp)
  158. tmp = make(map[string]interface{})
  159. }
  160. wg.Wait()
  161. if len(updateExtract) > 0 {
  162. mgo.UpdateBulk(extract, updateExtract...)
  163. //mgo.UpdateBulk(bidding, updateBidding...)
  164. }
  165. log.Println("this task over.", n, "repeateN:", repeateN, mapInfo["stop"])
  166. //任务完成,开始发送广播通知下面节点
  167. if n > repeateN && mapInfo["stop"] == nil {
  168. for _, to := range nextNode {
  169. sid, _ := mapInfo["gtid"].(string)
  170. eid, _ := mapInfo["lteid"].(string)
  171. key := sid + "-" + eid + "-" + util.ObjToString(to["stype"])
  172. by, _ := json.Marshal(map[string]interface{}{
  173. "gtid": sid,
  174. "lteid": eid,
  175. "stype": util.ObjToString(to["stype"]),
  176. "key": key,
  177. })
  178. addr := &net.UDPAddr{
  179. IP: net.ParseIP(to["addr"].(string)),
  180. Port: util.IntAll(to["port"]),
  181. }
  182. node := &udpNode{by, addr, time.Now().Unix(), 0}
  183. udptaskmap.Store(key, node)
  184. udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr)
  185. }
  186. }
  187. }