main.go 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211
  1. package main
  2. /**
  3. 生成中标企业库
  4. redis去重后存入mongodb库,然后调用下一个节点生成索引
  5. **/
  6. import (
  7. "encoding/json"
  8. "fmt"
  9. "log"
  10. mu "mfw/util"
  11. "net"
  12. "qfw/util"
  13. //"qfw/util/elastic"
  14. "qfw/util/mongodb"
  15. "qfw/util/redis"
  16. "sync"
  17. "time"
  18. )
  19. var (
  20. Sysconfig map[string]interface{} //配置文件
  21. mconf map[string]interface{} //mongodb配置信息
  22. mgo *mongodb.MongodbSim //mongodb操作对象
  23. udpclient mu.UdpClient //udp对象
  24. nextNode []map[string]interface{} //下节点数组
  25. toaddr = []*net.UDPAddr{} //下节点对象
  26. collect string //保存的库名
  27. redisLock = &sync.Mutex{}
  28. )
  29. func init() {
  30. util.ReadConfig(&Sysconfig)
  31. collect, _ = Sysconfig["collect"].(string)
  32. nextNode = util.ObjArrToMapArr(Sysconfig["nextNode"].([]interface{}))
  33. for _, m := range nextNode {
  34. toaddr = append(toaddr, &net.UDPAddr{
  35. IP: net.ParseIP(m["addr"].(string)),
  36. Port: util.IntAll(m["port"]),
  37. })
  38. }
  39. mconf = Sysconfig["mongodb"].(map[string]interface{})
  40. mgo = &mongodb.MongodbSim{
  41. MongodbAddr: mconf["addr"].(string),
  42. DbName: mconf["db"].(string),
  43. Size: util.IntAllDef(mconf["pool"], 5),
  44. }
  45. mgo.InitPool()
  46. rconf := Sysconfig["redis"].(map[string]interface{})
  47. redis.InitRedisBySize(rconf["addr"].(string), util.IntAllDef(rconf["pool"], 5), 5, 240)
  48. }
  49. func main() {
  50. go checkMapJob()
  51. updport := Sysconfig["udpport"].(string)
  52. udpclient = mu.UdpClient{Local: updport, BufSize: 1024}
  53. udpclient.Listen(processUdpMsg)
  54. log.Println("Udp服务监听", updport)
  55. time.Sleep(99999 * time.Hour)
  56. }
  57. func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
  58. switch act {
  59. case mu.OP_TYPE_DATA: //上个节点的数据
  60. //从表中开始处理生成企业数据
  61. var mapInfo map[string]interface{}
  62. err := json.Unmarshal(data, &mapInfo)
  63. log.Println("err:", err, "mapInfo:", mapInfo)
  64. if err != nil {
  65. udpclient.WriteUdp([]byte("err:"+err.Error()), mu.OP_NOOP, ra)
  66. } else if mapInfo != nil {
  67. go task(data, mapInfo)
  68. key, _ := mapInfo["key"].(string)
  69. if key == "" {
  70. key = "udpok"
  71. }
  72. udpclient.WriteUdp([]byte(key), mu.OP_NOOP, ra)
  73. }
  74. case mu.OP_NOOP: //下个节点回应
  75. ok := string(data)
  76. if ok != "" {
  77. log.Println("ok:", ok)
  78. udptaskmap.Delete(ok)
  79. }
  80. }
  81. }
  82. //开始判重程序
  83. func task(data []byte, mapInfo map[string]interface{}) {
  84. defer util.Catch()
  85. //区间id
  86. sess := mgo.GetMgoConn()
  87. defer mgo.DestoryMongoConn(sess)
  88. q := map[string]interface{}{
  89. "_id": map[string]interface{}{
  90. "$gt": util.StringTOBsonId(mapInfo["gtid"].(string)),
  91. "$lte": util.StringTOBsonId(mapInfo["lteid"].(string)),
  92. },
  93. }
  94. it := sess.DB(mgo.DbName).C(mconf["collect"].(string)).Find(&q).Iter()
  95. nameArr := []map[string]interface{}{}
  96. nameLock := &sync.Mutex{}
  97. pool := make(chan bool, 8)
  98. wg := &sync.WaitGroup{}
  99. n, newN := 0, 0
  100. pici := time.Now().Unix()
  101. for tmp := make(map[string]interface{}); it.Next(&tmp); n++ {
  102. pool <- true
  103. wg.Add(1)
  104. go func(tmp map[string]interface{}) {
  105. defer func() {
  106. <-pool
  107. wg.Done()
  108. }()
  109. //names := []string{}
  110. names := []map[string]string{}
  111. winner, _ := tmp["winner"].(string)
  112. if len(winner) > 9 {
  113. //names = append(names, winner)
  114. names = append(names, map[string]string{"name": winner, "id": util.BsonIdToSId(tmp["_id"]), "type": "winner"})
  115. }
  116. winnerOrder, _ := tmp["winnerorder"].([]map[string]interface{})
  117. if len(winnerOrder) > 0 {
  118. for _, m := range winnerOrder {
  119. wm := m["entname"].(string)
  120. if len(wm) > 9 {
  121. //names = append(names, wm)
  122. names = append(names, map[string]string{"name": wm, "id": util.BsonIdToSId(tmp["_id"]), "type": "winnerorder"})
  123. }
  124. }
  125. }
  126. package1 := tmp["package"]
  127. if package1 != nil {
  128. packageM := package1.(map[string]interface{})
  129. for _, p := range packageM {
  130. pm := p.(map[string]interface{})
  131. pw, _ := pm["winner"].(string)
  132. if len(pw) > 9 {
  133. //names = append(names, pw)
  134. names = append(names, map[string]string{"name": pw, "id": util.BsonIdToSId(tmp["_id"]), "type": "package"})
  135. }
  136. pwo, _ := pm["winnerorder"].([]map[string]interface{})
  137. if len(pwo) > 0 {
  138. for _, m := range pwo {
  139. wm := m["entname"].(string)
  140. if len(wm) > 9 {
  141. //names = append(names, wm)
  142. names = append(names, map[string]string{"name": wm, "id": util.BsonIdToSId(tmp["_id"]), "type": "pwinnerorder"})
  143. }
  144. }
  145. }
  146. }
  147. }
  148. if len(names) > 0 {
  149. for _, tmp := range names {
  150. redisLock.Lock()
  151. b, _ := redis.Exists("winner", tmp["name"])
  152. if !b {
  153. go IS.Add("normal")
  154. newN++
  155. redis.PutCKV("winner", tmp["name"], 1)
  156. nameLock.Lock()
  157. nameArr = append(nameArr, map[string]interface{}{
  158. "name": tmp["name"],
  159. "sid": tmp["id"],
  160. "type": tmp["type"],
  161. "pici": pici,
  162. })
  163. nameLock.Unlock()
  164. } else {
  165. go IS.Add("repeat")
  166. }
  167. redisLock.Unlock()
  168. }
  169. }
  170. if n%1000 == 0 {
  171. log.Println("current:", n)
  172. }
  173. nameLock.Lock()
  174. if len(nameArr) >= 800 {
  175. mgo.SaveBulk(collect, nameArr...)
  176. nameArr = []map[string]interface{}{}
  177. }
  178. nameLock.Unlock()
  179. }(tmp)
  180. tmp = make(map[string]interface{})
  181. }
  182. wg.Wait()
  183. if len(nameArr) > 0 {
  184. mgo.SaveBulk(collect, nameArr...)
  185. }
  186. log.Println("this task over.", n, "newN:", newN)
  187. //任务完成,开始发送广播通知下面节点
  188. if newN > 0 && mapInfo["stop"] == nil {
  189. mapInfo["stype"] = "winner"
  190. for n, to := range toaddr {
  191. key := fmt.Sprintf("%d-%s-%d", pici, "winner", n)
  192. mapInfo["query"] = map[string]interface{}{
  193. "pici": pici,
  194. }
  195. mapInfo["key"] = key
  196. data, _ = json.Marshal(mapInfo)
  197. node := &udpNode{data, to, time.Now().Unix(), 0}
  198. udptaskmap.Store(key, node)
  199. udpclient.WriteUdp(data, mu.OP_TYPE_DATA, to)
  200. }
  201. }
  202. }