main.go 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287
  1. package main
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. du "jy/util"
  6. "log"
  7. mu "mfw/util"
  8. "net"
  9. "qfw/util"
  10. "qfw/util/mongodb"
  11. "qfw/util/redis"
  12. "sync"
  13. "time"
  14. "gopkg.in/mgo.v2/bson"
  15. )
  16. const (
  17. REDISIDS = "ids"
  18. REDISKEYS = "keys"
  19. INFOID = "info"
  20. INFOTIMEOUT = 86400 * 30
  21. )
  22. var (
  23. Sysconfig map[string]interface{}
  24. MQFW mongodb.MongodbSim
  25. extractColl, projectColl string
  26. lenprojectname int
  27. udpclient mu.UdpClient //udp对象
  28. nextNode []map[string]interface{} //下节点数组
  29. toaddr = []*net.UDPAddr{} //下节点对象
  30. MultiThread chan bool
  31. SingleThread = make(chan bool, 1) //udp调用
  32. IdLock = &sync.Mutex{}
  33. PncbMayLock = &sync.Mutex{}
  34. MegerFieldsLen *MegerFields
  35. //三组lock,对应的(PNKey)key为项目名称,值对应的是此项目名称对应的项目id数组
  36. PNKey, PCKey, PBKey = NewKeyMap(), NewKeyMap(), NewKeyMap()
  37. PNKeyMap, PCKeyMap, PBKeyMap = sync.Map{}, sync.Map{}, sync.Map{}
  38. )
  39. type MegerFields struct {
  40. ProjectNamelen int
  41. ProjectCodelen int
  42. }
  43. type KeyMap struct {
  44. Lock sync.Mutex
  45. Map map[string]*Key
  46. }
  47. type Key struct {
  48. Arr *[]string
  49. Lock *sync.Mutex
  50. }
  51. func NewKeyMap() *KeyMap {
  52. return &KeyMap{
  53. Map: map[string]*Key{},
  54. }
  55. }
  56. func init() {
  57. initarea()
  58. du.SetConsole(false)
  59. du.SetRollingDaily("./", "project.log")
  60. du.SetLevel(du.DEBUG)
  61. util.ReadConfig(&Sysconfig)
  62. MultiThread = make(chan bool, util.IntAllDef(Sysconfig["thread"], 200))
  63. lenprojectname = util.IntAllDef(Sysconfig["lenprojectname"], 20) - 1
  64. megerfields, _ := Sysconfig["megerfields"].(map[string]interface{})
  65. MegerFieldsLen = &MegerFields{
  66. ProjectNamelen: util.IntAllDef(megerfields["projectlen"], 5),
  67. ProjectCodelen: util.IntAllDef(megerfields["projectcodelen"], 8),
  68. }
  69. redis.InitRedisBySize(Sysconfig["redisaddrs"].(string), util.IntAllDef(Sysconfig["redisPoolSize"], 100), 30, 300)
  70. MQFW = mongodb.MongodbSim{
  71. MongodbAddr: Sysconfig["mongodbServers"].(string),
  72. Size: util.IntAll(Sysconfig["mongodbPoolSize"]),
  73. DbName: Sysconfig["mongodbName"].(string),
  74. }
  75. MQFW.InitPool()
  76. extractColl = Sysconfig["extractColl"].(string)
  77. projectColl = Sysconfig["projectColl"].(string)
  78. nextNode = util.ObjArrToMapArr(Sysconfig["nextNode"].([]interface{}))
  79. for _, m := range nextNode {
  80. toaddr = append(toaddr, &net.UDPAddr{
  81. IP: net.ParseIP(m["addr"].(string)),
  82. Port: util.IntAll(m["port"]),
  83. })
  84. }
  85. }
  86. func main() {
  87. go checkMapJob()
  88. //先加载所有锁
  89. log.Println("loading data from redis...")
  90. n := 0
  91. km := []*KeyMap{PNKey, PCKey, PBKey}
  92. for pos, key := range []string{"pn_*", "pc_*", "pb_*"} {
  93. res := redis.GetKeysByPattern(REDISKEYS, key)
  94. pk := km[pos]
  95. if res != nil { //一次500条
  96. num := 0
  97. arr := []string{}
  98. for _, v := range res {
  99. n++
  100. num++
  101. k := string(v.([]uint8))
  102. arr = append(arr, k) //根据正则找到key
  103. if num == 500 {
  104. num = 0
  105. ret := redis.Mget(REDISKEYS, arr) //根据key批量取内容
  106. if len(ret) > 0 {
  107. for k1, v1 := range ret {
  108. if v1 != nil {
  109. var a1 []string
  110. json.Unmarshal(v1.([]uint8), &a1)
  111. pk.Map[arr[k1]] = &Key{&a1, &sync.Mutex{}} //pn_项目名称 id数组
  112. }
  113. }
  114. }
  115. arr = []string{}
  116. }
  117. }
  118. if num > 0 {
  119. ret := redis.Mget(REDISKEYS, arr)
  120. if len(ret) > 0 {
  121. for k1, v1 := range ret {
  122. if v1 != nil {
  123. var a1 []string
  124. json.Unmarshal(v1.([]uint8), &a1)
  125. pk.Map[arr[k1]] = &Key{&a1, &sync.Mutex{}}
  126. }
  127. }
  128. }
  129. arr = []string{}
  130. }
  131. }
  132. }
  133. log.Println("load data from redis finished.", n)
  134. //清理redis
  135. //clearedis()
  136. updport := Sysconfig["udpport"].(string)
  137. udpclient = mu.UdpClient{Local: updport, BufSize: 1024}
  138. udpclient.Listen(processUdpMsg)
  139. log.Println("Udp服务监听", updport)
  140. time.Sleep(99999 * time.Hour)
  141. }
  142. func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
  143. switch act {
  144. case mu.OP_TYPE_DATA: //上个节点的数据
  145. var mapInfo map[string]interface{}
  146. err := json.Unmarshal(data, &mapInfo)
  147. log.Println("err:", err, "mapInfo:", mapInfo)
  148. if err != nil {
  149. udpclient.WriteUdp([]byte("err:"+err.Error()), mu.OP_NOOP, ra)
  150. } else if mapInfo != nil {
  151. key, _ := mapInfo["key"].(string)
  152. if key == "" {
  153. key = "udpok"
  154. }
  155. go udpclient.WriteUdp([]byte(key), mu.OP_NOOP, ra)
  156. SingleThread <- true
  157. go task(data, mapInfo)
  158. }
  159. case mu.OP_NOOP: //下个节点回应
  160. ok := string(data)
  161. if ok != "" {
  162. log.Println("ok:", ok)
  163. udptaskmap.Delete(ok)
  164. }
  165. }
  166. }
  167. func task(data []byte, mapInfo map[string]interface{}) {
  168. defer func() {
  169. <-SingleThread
  170. }()
  171. defer util.Catch()
  172. q, _ := mapInfo["query"].(map[string]interface{})
  173. if q == nil {
  174. q = map[string]interface{}{
  175. "_id": map[string]interface{}{
  176. "$gt": util.StringTOBsonId(mapInfo["gtid"].(string)),
  177. "$lte": util.StringTOBsonId(mapInfo["lteid"].(string)),
  178. },
  179. }
  180. }
  181. sess := MQFW.GetMgoConn()
  182. defer MQFW.DestoryMongoConn(sess)
  183. //数据正序处理
  184. it := sess.DB(MQFW.DbName).C(extractColl).Find(&q).Iter() //.Sort("publishtime")
  185. count, index := 0, 0
  186. pici := time.Now().Unix()
  187. wg := &sync.WaitGroup{}
  188. idmap := &sync.Map{}
  189. for tmp := make(map[string]interface{}); it.Next(tmp); {
  190. if index%10000 == 0 {
  191. log.Println(index, tmp["_id"])
  192. }
  193. index++
  194. if util.IntAll(tmp["repeat"]) == 1 {
  195. tmp = make(map[string]interface{})
  196. continue
  197. }
  198. count++
  199. thisid := util.BsonIdToSId(tmp["_id"])
  200. b, err := redis.Exists(INFOID, thisid)
  201. if err != nil {
  202. log.Println("checkid err", err.Error())
  203. }
  204. if !b {
  205. wg.Add(1)
  206. idmap.Store(tmp["_id"], true) //增加判重逻辑,重复id不再生成
  207. MultiThread <- true
  208. go func(tmp map[string]interface{}, thisid string) {
  209. defer func() {
  210. <-MultiThread
  211. wg.Done()
  212. idmap.Delete(tmp["_id"])
  213. }()
  214. info := PreThisInfo(tmp)
  215. if info != nil {
  216. lockPNCBMap(info)
  217. startProjectMerge(info, tmp)
  218. redis.Put(INFOID, thisid, 1, INFOTIMEOUT)
  219. unlockPNCBMap(info)
  220. }
  221. }(tmp, thisid)
  222. }
  223. if count%500 == 0 {
  224. log.Println("count:", count)
  225. }
  226. tmp = make(map[string]interface{})
  227. }
  228. for {
  229. time.Sleep(5 * time.Second)
  230. n := 0
  231. idmap.Range(func(key interface{}, v interface{}) bool {
  232. n++
  233. log.Println(key, v)
  234. return true
  235. })
  236. if n < 1 {
  237. break
  238. }
  239. }
  240. wg.Wait()
  241. log.Println("task over...", index, count)
  242. //发送udp,调用生成项目索引
  243. mapInfo["stype"] = "project"
  244. if mapInfo["stop"] == nil && len(toaddr) > 0 {
  245. for n, to := range toaddr {
  246. key := fmt.Sprintf("%d-%s-%d", pici, "project", n)
  247. mapInfo["query"] = map[string]interface{}{
  248. "pici": pici,
  249. }
  250. mapInfo["key"] = key
  251. datas, _ := json.Marshal(mapInfo)
  252. node := &udpNode{datas, to, time.Now().Unix(), 0}
  253. udptaskmap.Store(key, node)
  254. udpclient.WriteUdp(datas, mu.OP_TYPE_DATA, to)
  255. }
  256. }
  257. }
  258. func NewPushInfo(tmp map[string]interface{}) bson.M {
  259. return bson.M{
  260. "comeintime": tmp["comeintime"],
  261. "publishtime": tmp["publishtime"],
  262. "title": tmp["title"],
  263. "toptype": tmp["toptype"],
  264. "subtype": tmp["subtype"],
  265. "infoformat": tmp["infoformat"],
  266. "infoid": util.BsonIdToSId(tmp["_id"]),
  267. "href": tmp["href"],
  268. "area": tmp["area"],
  269. "city": tmp["city"],
  270. "cresult": tmp["cresult"],
  271. "score": tmp["score"],
  272. }
  273. }