main.go 8.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317
  1. package main
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. du "jy/util"
  6. "log"
  7. mu "mfw/util"
  8. "net"
  9. "qfw/util"
  10. "qfw/util/mongodb"
  11. "qfw/util/redis"
  12. "sync"
  13. "time"
  14. "gopkg.in/mgo.v2/bson"
  15. )
  // Redis namespaces and cache settings shared across the service.
  const (
  	// REDISIDS is not referenced in this part of the file.
  	REDISIDS = "ids"
  	// REDISKEYS is the redis namespace that main scans for pn_*/pc_*/pb_* keys.
  	REDISKEYS = "keys"
  	// INFOID is the redis namespace in which taskInc records handled info ids.
  	INFOID = "info"
  	// INFOTIMEOUT is passed to redis.Put for handled info ids
  	// (presumably a TTL in seconds, i.e. 30 days — confirm against redis.Put).
  	INFOTIMEOUT = 30 * 24 * 60 * 60
  )
  22. var (
  23. Sysconfig map[string]interface{}
  24. MQFW mongodb.MongodbSim
  25. extractColl, projectColl string
  26. lenprojectname int
  27. udpclient mu.UdpClient //udp对象
  28. nextNode []map[string]interface{} //下节点数组
  29. toaddr = []*net.UDPAddr{} //下节点对象
  30. MultiThread chan bool
  31. SingleThread = make(chan bool, 1) //udp调用
  32. IdLock = &sync.Mutex{}
  33. PncbMayLock = &sync.Mutex{}
  34. MegerFieldsLen *MegerFields
  35. //三组lock,对应的(PNKey)key为项目名称,值对应的是此项目名称对应的项目id数组
  36. PNKey, PCKey, PBKey = NewKeyMap(), NewKeyMap(), NewKeyMap()
  37. PNKeyMap, PCKeyMap, PBKeyMap = sync.Map{}, sync.Map{}, sync.Map{}
  38. currentMegerTime int64 //合并项目的时间位置,用来清理几个月之前的项目
  39. currentMegerCount int //合并项目的计数,用来定时清理
  40. )
  // MegerFields carries the two length settings that init reads from the
  // "megerfields" config section ("projectlen" and "projectcodelen").
  type MegerFields struct {
  	ProjectNamelen, ProjectCodelen int
  }
  45. type KeyMap struct {
  46. Lock sync.Mutex
  47. Map map[string]*Key
  48. }
  // Key is one KeyMap entry: a pointer to an id array plus the mutex that
  // belongs to that entry. The field order matters because callers build
  // Key values positionally.
  type Key struct {
  	// Arr points at the id array stored for this key.
  	Arr *[]string
  	// Lock is the per-entry mutex.
  	Lock *sync.Mutex
  }
  53. func NewKeyMap() *KeyMap {
  54. return &KeyMap{
  55. Map: map[string]*Key{},
  56. }
  57. }
  58. func init() {
  59. initarea()
  60. du.SetConsole(false)
  61. du.SetRollingDaily("./", "project.log")
  62. du.SetLevel(du.DEBUG)
  63. util.ReadConfig(&Sysconfig)
  64. MultiThread = make(chan bool, util.IntAllDef(Sysconfig["thread"], 200))
  65. lenprojectname = util.IntAllDef(Sysconfig["lenprojectname"], 20) - 1
  66. megerfields, _ := Sysconfig["megerfields"].(map[string]interface{})
  67. MegerFieldsLen = &MegerFields{
  68. ProjectNamelen: util.IntAllDef(megerfields["projectlen"], 5),
  69. ProjectCodelen: util.IntAllDef(megerfields["projectcodelen"], 8),
  70. }
  71. //插入合并参数
  72. if insertmeger, ok := Sysconfig["insertmeger"].(map[string]interface{}); ok {
  73. OmitNumMax = util.Int64All(insertmeger["omitmax"])
  74. DeviationDay = util.Int64All(insertmeger["deviationday"])
  75. HourInterval = util.Int64All(insertmeger["hourinterval"])
  76. }
  77. redis.InitRedisBySize(Sysconfig["redisaddrs"].(string), util.IntAllDef(Sysconfig["redisPoolSize"], 100), 30, 300)
  78. MQFW = mongodb.MongodbSim{
  79. MongodbAddr: Sysconfig["mongodbServers"].(string),
  80. Size: util.IntAll(Sysconfig["mongodbPoolSize"]),
  81. DbName: Sysconfig["mongodbName"].(string),
  82. }
  83. MQFW.InitPool()
  84. extractColl = Sysconfig["extractColl"].(string)
  85. projectColl = Sysconfig["projectColl"].(string)
  86. nextNode = util.ObjArrToMapArr(Sysconfig["nextNode"].([]interface{}))
  87. for _, m := range nextNode {
  88. toaddr = append(toaddr, &net.UDPAddr{
  89. IP: net.ParseIP(m["addr"].(string)),
  90. Port: util.IntAll(m["port"]),
  91. })
  92. }
  93. }
  94. func main() {
  95. go checkMapJob()
  96. //先加载所有锁
  97. log.Println("loading data from redis...")
  98. n := 0
  99. km := []*KeyMap{PNKey, PCKey, PBKey}
  100. for pos, key := range []string{"pn_*", "pc_*", "pb_*"} {
  101. res := redis.GetKeysByPattern(REDISKEYS, key)
  102. pk := km[pos]
  103. if res != nil { //一次500条
  104. num := 0
  105. arr := []string{}
  106. for _, v := range res {
  107. n++
  108. num++
  109. k := string(v.([]uint8))
  110. arr = append(arr, k) //根据正则找到key
  111. if num == 500 {
  112. num = 0
  113. ret := redis.Mget(REDISKEYS, arr) //根据key批量取内容
  114. if len(ret) > 0 {
  115. for k1, v1 := range ret {
  116. if v1 != nil {
  117. var a1 []string
  118. json.Unmarshal(v1.([]uint8), &a1)
  119. pk.Map[arr[k1]] = &Key{&a1, &sync.Mutex{}} //pn_项目名称 id数组
  120. }
  121. }
  122. }
  123. arr = []string{}
  124. }
  125. }
  126. if num > 0 {
  127. ret := redis.Mget(REDISKEYS, arr)
  128. if len(ret) > 0 {
  129. for k1, v1 := range ret {
  130. if v1 != nil {
  131. var a1 []string
  132. json.Unmarshal(v1.([]uint8), &a1)
  133. pk.Map[arr[k1]] = &Key{&a1, &sync.Mutex{}}
  134. }
  135. }
  136. }
  137. arr = []string{}
  138. }
  139. }
  140. }
  141. log.Println("load data from redis finished.", n)
  142. if taskstock, ok := Sysconfig["taskstock"].(map[string]interface{}); ok { //跑存量数据
  143. if b, _ := taskstock["open"].(bool); b {
  144. RunFullData(util.Int64All(taskstock["startTime"]))
  145. }
  146. }
  147. updport := Sysconfig["udpport"].(string)
  148. udpclient = mu.UdpClient{Local: updport, BufSize: 1024}
  149. udpclient.Listen(processUdpMsg)
  150. log.Println("Udp服务监听", updport)
  151. time.Sleep(99999 * time.Hour)
  152. }
  153. func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
  154. switch act {
  155. case mu.OP_TYPE_DATA: //上个节点的数据
  156. var mapInfo map[string]interface{}
  157. err := json.Unmarshal(data, &mapInfo)
  158. log.Println("err:", err, "mapInfo:", mapInfo)
  159. if err != nil {
  160. udpclient.WriteUdp([]byte("err:"+err.Error()), mu.OP_NOOP, ra)
  161. } else if mapInfo != nil {
  162. key, _ := mapInfo["key"].(string)
  163. if key == "" {
  164. key = "udpok"
  165. }
  166. go udpclient.WriteUdp([]byte(key), mu.OP_NOOP, ra)
  167. SingleThread <- true
  168. go taskInc(mapInfo)
  169. }
  170. case mu.OP_NOOP: //下个节点回应
  171. ok := string(data)
  172. if ok != "" {
  173. log.Println("ok:", ok)
  174. udptaskmap.Delete(ok)
  175. }
  176. }
  177. }
  178. func taskInc(mapInfo map[string]interface{}) {
  179. defer func() {
  180. <-SingleThread
  181. }()
  182. defer util.Catch()
  183. q, _ := mapInfo["query"].(map[string]interface{})
  184. if q == nil {
  185. q = map[string]interface{}{
  186. "_id": map[string]interface{}{
  187. "$gt": util.StringTOBsonId(mapInfo["gtid"].(string)),
  188. "$lte": util.StringTOBsonId(mapInfo["lteid"].(string)),
  189. },
  190. }
  191. }
  192. sess := MQFW.GetMgoConn()
  193. defer MQFW.DestoryMongoConn(sess)
  194. //数据正序处理
  195. it := sess.DB(MQFW.DbName).C(extractColl).Find(q).Sort("publishtime").Iter()
  196. count, index := 0, 0
  197. pici := time.Now().Unix()
  198. wg := &sync.WaitGroup{}
  199. //idmap := &sync.Map{}
  200. for tmp := make(map[string]interface{}); it.Next(tmp); {
  201. if index%10000 == 0 {
  202. log.Println(index, tmp["_id"])
  203. }
  204. index++
  205. if util.IntAll(tmp["repeat"]) == 1 {
  206. tmp = make(map[string]interface{})
  207. continue
  208. }
  209. pt := util.Int64All(tmp["publishtime"])
  210. if time.Now().Unix()-DeviationDay*86400 > pt { //DeviationDay前的数据不处理,走插入何必
  211. continue
  212. }
  213. if pt > currentMegerTime {
  214. currentMegerTime = pt
  215. }
  216. count++
  217. currentMegerCount++
  218. if currentMegerCount > 300000 {
  219. time.Sleep(100 * time.Millisecond)
  220. clearPKey()
  221. }
  222. thisid := util.BsonIdToSId(tmp["_id"])
  223. b, err := redis.Exists(INFOID, thisid)
  224. if err != nil {
  225. log.Println("checkid err", err.Error())
  226. }
  227. if !b {
  228. wg.Add(1)
  229. //idmap.Store(tmp["_id"], true) //增加判重逻辑,重复id不再生成
  230. MultiThread <- true
  231. go func(tmp map[string]interface{}, thisid string) {
  232. defer func() {
  233. <-MultiThread
  234. wg.Done()
  235. //idmap.Delete(tmp["_id"])
  236. }()
  237. info := PreThisInfo(tmp)
  238. if info != nil {
  239. lockPNCBMap(info)
  240. storeLock(info)
  241. startProjectMerge(info, tmp)
  242. redis.Put(INFOID, thisid, 1, INFOTIMEOUT)
  243. currentMegerTime = info.Publishtime
  244. unlockPNCBMap(info)
  245. }
  246. }(tmp, thisid)
  247. }
  248. if count%500 == 0 {
  249. log.Println("count:", count)
  250. }
  251. tmp = make(map[string]interface{})
  252. }
  253. // for {
  254. // time.Sleep(5 * time.Second)
  255. // n := 0
  256. // idmap.Range(func(key interface{}, v interface{}) bool {
  257. // n++
  258. // log.Println(key, v)
  259. // return true
  260. // })
  261. // if n < 1 {
  262. // break
  263. // }
  264. // }
  265. wg.Wait()
  266. log.Println("task over...", index, count)
  267. //发送udp,调用生成项目索引
  268. mapInfo["stype"] = "project"
  269. if mapInfo["stop"] == nil && len(toaddr) > 0 {
  270. for n, to := range toaddr {
  271. key := fmt.Sprintf("%d-%s-%d", pici, "project", n)
  272. mapInfo["query"] = map[string]interface{}{
  273. "pici": pici,
  274. }
  275. mapInfo["key"] = key
  276. datas, _ := json.Marshal(mapInfo)
  277. node := &udpNode{datas, to, time.Now().Unix(), 0}
  278. udptaskmap.Store(key, node)
  279. udpclient.WriteUdp(datas, mu.OP_TYPE_DATA, to)
  280. }
  281. }
  282. }
  283. func NewPushInfo(tmp map[string]interface{}) bson.M {
  284. return bson.M{
  285. "comeintime": tmp["comeintime"],
  286. "publishtime": tmp["publishtime"],
  287. "title": tmp["title"],
  288. "toptype": tmp["toptype"],
  289. "subtype": tmp["subtype"],
  290. "infoformat": tmp["infoformat"],
  291. "infoid": util.BsonIdToSId(tmp["_id"]),
  292. "href": tmp["href"],
  293. "area": tmp["area"],
  294. "city": tmp["city"],
  295. "cresult": tmp["cresult"],
  296. "score": tmp["score"],
  297. }
  298. }