main.go 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193
  1. package main
/*
Duplicate detection for bidding (tender) information.
*/
  5. import (
  6. "encoding/json"
  7. "flag"
  8. "log"
  9. mu "mfw/util"
  10. "net"
  11. "qfw/util"
  12. "qfw/util/mongodb"
  13. "regexp"
  14. "sync"
  15. "time"
  16. )
  17. var (
  18. Sysconfig map[string]interface{} //配置文件
  19. mconf map[string]interface{} //mongodb配置信息
  20. mgo *mongodb.MongodbSim //mongodb操作对象
  21. extract string
  22. bidding string
  23. udpclient mu.UdpClient //udp对象
  24. nextNode []map[string]interface{} //下节点数组
  25. dupdays = 3 //初始化判重范围
  26. DM *datamap //判重数据
  27. FilterRegexp = regexp.MustCompile("^_$")
  28. lastid = ""
  29. )
  30. func init() {
  31. flag.StringVar(&lastid, "id", "", "最后加载id") //以小于等于此id开始加载最近几天的数据
  32. flag.Parse()
  33. util.ReadConfig(&Sysconfig)
  34. nextNode = util.ObjArrToMapArr(Sysconfig["nextNode"].([]interface{}))
  35. mconf = Sysconfig["mongodb"].(map[string]interface{})
  36. mgo = &mongodb.MongodbSim{
  37. MongodbAddr: mconf["addr"].(string),
  38. DbName: mconf["db"].(string),
  39. Size: util.IntAllDef(mconf["pool"], 10),
  40. }
  41. extract = mconf["extract"].(string)
  42. //bidding = mconf["bidding"].(string)
  43. mgo.InitPool()
  44. dupdays = util.IntAllDef(Sysconfig["dupdays"], 3)
  45. //加载数据
  46. DM = NewDatamap(dupdays, lastid)
  47. sw := util.ObjToString(Sysconfig["specialwords"])
  48. if sw != "" {
  49. FilterRegexp = regexp.MustCompile(sw)
  50. }
  51. }
  52. func main() {
  53. go checkMapJob()
  54. updport := Sysconfig["udpport"].(string)
  55. udpclient = mu.UdpClient{Local: updport, BufSize: 1024}
  56. udpclient.Listen(processUdpMsg)
  57. log.Println("Udp服务监听", updport)
  58. time.Sleep(99999 * time.Hour)
  59. }
  60. func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
  61. switch act {
  62. case mu.OP_TYPE_DATA: //上个节点的数据
  63. //从表中开始处理
  64. var mapInfo map[string]interface{}
  65. err := json.Unmarshal(data, &mapInfo)
  66. log.Println("err:", err, "mapInfo:", mapInfo)
  67. if err != nil {
  68. udpclient.WriteUdp([]byte("err:"+err.Error()), mu.OP_NOOP, ra)
  69. } else if mapInfo != nil {
  70. go task(data, mapInfo)
  71. key, _ := mapInfo["key"].(string)
  72. if key == "" {
  73. key = "udpok"
  74. }
  75. udpclient.WriteUdp([]byte(key), mu.OP_NOOP, ra)
  76. }
  77. case mu.OP_NOOP: //下个节点回应
  78. ok := string(data)
  79. if ok != "" {
  80. log.Println("ok:", ok)
  81. udptaskmap.Delete(ok)
  82. }
  83. }
  84. }
  85. //开始判重程序
  86. func task(data []byte, mapInfo map[string]interface{}) {
  87. defer util.Catch()
  88. //区间id
  89. sess := mgo.GetMgoConn()
  90. defer mgo.DestoryMongoConn(sess)
  91. q := map[string]interface{}{
  92. "_id": map[string]interface{}{
  93. "$gt": util.StringTOBsonId(mapInfo["gtid"].(string)),
  94. "$lte": util.StringTOBsonId(mapInfo["lteid"].(string)),
  95. },
  96. }
  97. it := sess.DB(mgo.DbName).C(extract).Find(&q).Iter()
  98. updateExtract := [][]map[string]interface{}{}
  99. pool := make(chan bool, 16)
  100. wg := &sync.WaitGroup{}
  101. mapLock := &sync.Mutex{}
  102. n, repeateN := 0, 0
  103. for tmp := make(map[string]interface{}); it.Next(&tmp); n++ {
  104. if util.ObjToString(tmp["subtype"]) == "变更" {
  105. //go IS.Add("update")
  106. continue
  107. }
  108. if n%10000 == 0 {
  109. log.Println("current:", n, tmp["_id"])
  110. }
  111. pool <- true
  112. wg.Add(1)
  113. go func(tmp map[string]interface{}) {
  114. defer func() {
  115. <-pool
  116. wg.Done()
  117. }()
  118. info := NewInfo(tmp)
  119. b, id := DM.check(info)
  120. if b { //有重复,生成更新语句,更新抽取和更新招标
  121. //IS.Add("repeat")
  122. repeateN++
  123. mapLock.Lock()
  124. updateExtract = append(updateExtract, []map[string]interface{}{
  125. map[string]interface{}{
  126. "_id": tmp["_id"],
  127. },
  128. map[string]interface{}{
  129. "$set": map[string]interface{}{
  130. "repeat": 1,
  131. "repeatid": id,
  132. },
  133. },
  134. })
  135. // updateBidding = append(updateBidding, []map[string]interface{}{
  136. // map[string]interface{}{
  137. // "_id": tmp["_id"],
  138. // },
  139. // map[string]interface{}{
  140. // "$set": map[string]interface{}{
  141. // "extracttype": -1,
  142. // "repeatid": id,
  143. // },
  144. // },
  145. // })
  146. if len(updateExtract) > 500 {
  147. mgo.UpdateBulk(extract, updateExtract...)
  148. //mgo.UpdateBulk(bidding, updateBidding...)
  149. //updateExtract, updateBidding = [][]map[string]interface{}{}, [][]map[string]interface{}{}
  150. updateExtract = [][]map[string]interface{}{}
  151. }
  152. mapLock.Unlock()
  153. } else {
  154. //IS.Add("new")
  155. }
  156. }(tmp)
  157. tmp = make(map[string]interface{})
  158. }
  159. wg.Wait()
  160. if len(updateExtract) > 0 {
  161. mgo.UpdateBulk(extract, updateExtract...)
  162. //mgo.UpdateBulk(bidding, updateBidding...)
  163. }
  164. log.Println("this task over.", n, "repeateN:", repeateN, mapInfo["stop"])
  165. //任务完成,开始发送广播通知下面节点
  166. if n > repeateN && mapInfo["stop"] == nil {
  167. for _, to := range nextNode {
  168. sid, _ := mapInfo["gtid"].(string)
  169. eid, _ := mapInfo["lteid"].(string)
  170. key := sid + "-" + eid + "-" + util.ObjToString(to["stype"])
  171. by, _ := json.Marshal(map[string]interface{}{
  172. "gtid": sid,
  173. "lteid": eid,
  174. "stype": util.ObjToString(to["stype"]),
  175. "key": key,
  176. })
  177. addr := &net.UDPAddr{
  178. IP: net.ParseIP(to["addr"].(string)),
  179. Port: util.IntAll(to["port"]),
  180. }
  181. node := &udpNode{by, addr, time.Now().Unix(), 0}
  182. udptaskmap.Store(key, node)
  183. udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr)
  184. }
  185. }
  186. }