main.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414
  1. package main
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. du "jy/util"
  6. "log"
  7. mu "mfw/util"
  8. "net"
  9. "qfw/util"
  10. "qfw/util/mongodb"
  11. "qfw/util/redis"
  12. "sync"
  13. "time"
  14. "gopkg.in/mgo.v2/bson"
  15. )
  // Names and retention used with the redis helper package in this file.
  const (
  	// REDISIDS is the name "ids"; it is not referenced in the visible part of the file.
  	REDISIDS = "ids"
  	// REDISKEYS is the first argument of the redis calls that preload the
  	// pn_*/pc_*/pb_* keys in main.
  	REDISKEYS = "keys"
  	// INFOID is the first argument of the redis calls that record which
  	// record ids were already merged.
  	INFOID = "info"
  	// INFOTIMEOUT is the last argument given to redis.Put for INFOID entries.
  	// The value equals 30 days expressed in seconds (presumably the expiry;
  	// confirm against the redis helper).
  	INFOTIMEOUT = 30 * 24 * 60 * 60
  )
  22. var (
  23. Sysconfig map[string]interface{}
  24. MQFW mongodb.MongodbSim
  25. extractColl, projectColl string
  26. lenprojectname int
  27. udpclient mu.UdpClient //udp对象
  28. nextNode []map[string]interface{} //下节点数组
  29. toaddr = []*net.UDPAddr{} //下节点对象
  30. MultiThread chan bool
  31. SingleThread = make(chan bool, 1) //udp调用
  32. IdLock = &sync.Mutex{}
  33. PncbMayLock = &sync.Mutex{}
  34. MegerFieldsLen *MegerFields
  35. //三组lock,对应的(PNKey)key为项目名称,值对应的是此项目名称对应的项目id数组
  36. PNKey, PCKey, PBKey = NewKeyMap(), NewKeyMap(), NewKeyMap()
  37. PNKeyMap, PCKeyMap, PBKeyMap = sync.Map{}, sync.Map{}, sync.Map{}
  38. currentMegerTime int64 //合并项目的时间位置,用来清理几个月之前的项目
  39. currentMegerCount int //合并项目的计数,用来定时清理
  40. )
  // MegerFields carries two integer length settings. In this file they are
  // filled from the "projectlen" and "projectcodelen" entries of the
  // "megerfields" configuration section.
  type MegerFields struct {
  	ProjectNamelen, ProjectCodelen int
  }
  // KeyMap pairs a string-keyed table of *Key entries with a mutex.
  type KeyMap struct {
  	// Lock is available to callers that need to guard Map; nothing in this
  	// block takes it.
  	Lock sync.Mutex
  	// Map is keyed by the full redis key (for example "pn_<project name>").
  	Map map[string]*Key
  }

  // Key is one entry of a KeyMap. The field order matters: entries are built
  // with positional literals elsewhere in the file.
  type Key struct {
  	// Arr points at the id array stored under the key.
  	Arr *[]string
  	// Lock is the per-entry mutex.
  	Lock *sync.Mutex
  }

  // NewKeyMap returns a KeyMap whose Map is allocated and empty.
  func NewKeyMap() *KeyMap {
  	km := new(KeyMap)
  	km.Map = make(map[string]*Key)
  	return km
  }
  58. func init() {
  59. initarea()
  60. du.SetConsole(false)
  61. du.SetRollingDaily("./", "project.log")
  62. du.SetLevel(du.DEBUG)
  63. util.ReadConfig(&Sysconfig)
  64. MultiThread = make(chan bool, util.IntAllDef(Sysconfig["thread"], 200))
  65. lenprojectname = util.IntAllDef(Sysconfig["lenprojectname"], 20) - 1
  66. megerfields, _ := Sysconfig["megerfields"].(map[string]interface{})
  67. MegerFieldsLen = &MegerFields{
  68. ProjectNamelen: util.IntAllDef(megerfields["projectlen"], 5),
  69. ProjectCodelen: util.IntAllDef(megerfields["projectcodelen"], 8),
  70. }
  71. redis.InitRedisBySize(Sysconfig["redisaddrs"].(string), util.IntAllDef(Sysconfig["redisPoolSize"], 100), 30, 300)
  72. MQFW = mongodb.MongodbSim{
  73. MongodbAddr: Sysconfig["mongodbServers"].(string),
  74. Size: util.IntAll(Sysconfig["mongodbPoolSize"]),
  75. DbName: Sysconfig["mongodbName"].(string),
  76. }
  77. MQFW.InitPool()
  78. extractColl = Sysconfig["extractColl"].(string)
  79. projectColl = Sysconfig["projectColl"].(string)
  80. nextNode = util.ObjArrToMapArr(Sysconfig["nextNode"].([]interface{}))
  81. for _, m := range nextNode {
  82. toaddr = append(toaddr, &net.UDPAddr{
  83. IP: net.ParseIP(m["addr"].(string)),
  84. Port: util.IntAll(m["port"]),
  85. })
  86. }
  87. }
  88. func main() {
  89. go checkMapJob()
  90. //先加载所有锁
  91. log.Println("loading data from redis...")
  92. n := 0
  93. km := []*KeyMap{PNKey, PCKey, PBKey}
  94. for pos, key := range []string{"pn_*", "pc_*", "pb_*"} {
  95. res := redis.GetKeysByPattern(REDISKEYS, key)
  96. pk := km[pos]
  97. if res != nil { //一次500条
  98. num := 0
  99. arr := []string{}
  100. for _, v := range res {
  101. n++
  102. num++
  103. k := string(v.([]uint8))
  104. arr = append(arr, k) //根据正则找到key
  105. if num == 500 {
  106. num = 0
  107. ret := redis.Mget(REDISKEYS, arr) //根据key批量取内容
  108. if len(ret) > 0 {
  109. for k1, v1 := range ret {
  110. if v1 != nil {
  111. var a1 []string
  112. json.Unmarshal(v1.([]uint8), &a1)
  113. pk.Map[arr[k1]] = &Key{&a1, &sync.Mutex{}} //pn_项目名称 id数组
  114. }
  115. }
  116. }
  117. arr = []string{}
  118. }
  119. }
  120. if num > 0 {
  121. ret := redis.Mget(REDISKEYS, arr)
  122. if len(ret) > 0 {
  123. for k1, v1 := range ret {
  124. if v1 != nil {
  125. var a1 []string
  126. json.Unmarshal(v1.([]uint8), &a1)
  127. pk.Map[arr[k1]] = &Key{&a1, &sync.Mutex{}}
  128. }
  129. }
  130. }
  131. arr = []string{}
  132. }
  133. }
  134. }
  135. log.Println("load data from redis finished.", n)
  136. //清理redis
  137. //clearedis()
  138. if taskstock, ok := Sysconfig["taskstock"].(map[string]interface{}); ok { //跑存量数据
  139. if b, _ := taskstock["open"].(bool); b {
  140. startdate, _ := taskstock["startdate"].(string)
  141. endate, _ := taskstock["endate"].(string)
  142. taskStock(startdate, endate)
  143. }
  144. }
  145. updport := Sysconfig["udpport"].(string)
  146. udpclient = mu.UdpClient{Local: updport, BufSize: 1024}
  147. udpclient.Listen(processUdpMsg)
  148. log.Println("Udp服务监听", updport)
  149. time.Sleep(99999 * time.Hour)
  150. }
  151. func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
  152. switch act {
  153. case mu.OP_TYPE_DATA: //上个节点的数据
  154. var mapInfo map[string]interface{}
  155. err := json.Unmarshal(data, &mapInfo)
  156. log.Println("err:", err, "mapInfo:", mapInfo)
  157. if err != nil {
  158. udpclient.WriteUdp([]byte("err:"+err.Error()), mu.OP_NOOP, ra)
  159. } else if mapInfo != nil {
  160. key, _ := mapInfo["key"].(string)
  161. if key == "" {
  162. key = "udpok"
  163. }
  164. go udpclient.WriteUdp([]byte(key), mu.OP_NOOP, ra)
  165. SingleThread <- true
  166. go taskInc(mapInfo)
  167. }
  168. case mu.OP_NOOP: //下个节点回应
  169. ok := string(data)
  170. if ok != "" {
  171. log.Println("ok:", ok)
  172. udptaskmap.Delete(ok)
  173. }
  174. }
  175. }
  176. func taskInc(mapInfo map[string]interface{}) {
  177. defer func() {
  178. <-SingleThread
  179. }()
  180. defer util.Catch()
  181. q, _ := mapInfo["query"].(map[string]interface{})
  182. if q == nil {
  183. q = map[string]interface{}{
  184. "_id": map[string]interface{}{
  185. "$gt": util.StringTOBsonId(mapInfo["gtid"].(string)),
  186. "$lte": util.StringTOBsonId(mapInfo["lteid"].(string)),
  187. },
  188. }
  189. }
  190. sess := MQFW.GetMgoConn()
  191. defer MQFW.DestoryMongoConn(sess)
  192. //数据正序处理
  193. it := sess.DB(MQFW.DbName).C(extractColl).Find(map[string]interface{}{}).Sort("publishtime").Iter()
  194. count, index := 0, 0
  195. pici := time.Now().Unix()
  196. wg := &sync.WaitGroup{}
  197. //idmap := &sync.Map{}
  198. for tmp := make(map[string]interface{}); it.Next(tmp); {
  199. if index%10000 == 0 {
  200. log.Println(index, tmp["_id"])
  201. }
  202. index++
  203. if util.IntAll(tmp["repeat"]) == 1 {
  204. tmp = make(map[string]interface{})
  205. continue
  206. }
  207. pt := util.Int64All(tmp["publishtime"])
  208. if pt > currentMegerTime {
  209. currentMegerTime = pt
  210. }
  211. count++
  212. currentMegerCount++
  213. if currentMegerCount > 300000 {
  214. time.Sleep(100 * time.Millisecond)
  215. clearPKey()
  216. }
  217. thisid := util.BsonIdToSId(tmp["_id"])
  218. b, err := redis.Exists(INFOID, thisid)
  219. if err != nil {
  220. log.Println("checkid err", err.Error())
  221. }
  222. if !b {
  223. wg.Add(1)
  224. //idmap.Store(tmp["_id"], true) //增加判重逻辑,重复id不再生成
  225. MultiThread <- true
  226. go func(tmp map[string]interface{}, thisid string) {
  227. defer func() {
  228. <-MultiThread
  229. wg.Done()
  230. //idmap.Delete(tmp["_id"])
  231. }()
  232. info := PreThisInfo(tmp)
  233. if info != nil {
  234. lockPNCBMap(info)
  235. startProjectMerge(info, tmp)
  236. redis.Put(INFOID, thisid, 1, INFOTIMEOUT)
  237. currentMegerTime = info.Publishtime
  238. unlockPNCBMap(info)
  239. }
  240. }(tmp, thisid)
  241. }
  242. if count%500 == 0 {
  243. log.Println("count:", count)
  244. }
  245. tmp = make(map[string]interface{})
  246. }
  247. // for {
  248. // time.Sleep(5 * time.Second)
  249. // n := 0
  250. // idmap.Range(func(key interface{}, v interface{}) bool {
  251. // n++
  252. // log.Println(key, v)
  253. // return true
  254. // })
  255. // if n < 1 {
  256. // break
  257. // }
  258. // }
  259. wg.Wait()
  260. log.Println("task over...", index, count)
  261. //发送udp,调用生成项目索引
  262. mapInfo["stype"] = "project"
  263. if mapInfo["stop"] == nil && len(toaddr) > 0 {
  264. for n, to := range toaddr {
  265. key := fmt.Sprintf("%d-%s-%d", pici, "project", n)
  266. mapInfo["query"] = map[string]interface{}{
  267. "pici": pici,
  268. }
  269. mapInfo["key"] = key
  270. datas, _ := json.Marshal(mapInfo)
  271. node := &udpNode{datas, to, time.Now().Unix(), 0}
  272. udptaskmap.Store(key, node)
  273. udpclient.WriteUdp(datas, mu.OP_TYPE_DATA, to)
  274. }
  275. }
  276. }
  277. func taskStock(startDate, endDate string) {
  278. defer func() {
  279. <-SingleThread
  280. }()
  281. defer util.Catch()
  282. publishtimes := []map[string]interface{}{}
  283. start, _ := time.ParseInLocation(util.Date_Short_Layout, startDate, time.Local)
  284. end, _ := time.ParseInLocation(util.Date_Short_Layout, endDate, time.Local)
  285. for {
  286. publishtime := map[string]interface{}{
  287. "date": start.Format(util.Date_Short_Layout),
  288. "stime": start.Unix(),
  289. "etime": start.Add(24 * time.Hour).Unix(),
  290. }
  291. publishtimes = append(publishtimes, publishtime)
  292. start = start.Add(24 * time.Hour)
  293. if start.Unix() > end.Unix() {
  294. break
  295. }
  296. }
  297. sess := MQFW.GetMgoConn()
  298. defer MQFW.DestoryMongoConn(sess)
  299. wg := &sync.WaitGroup{}
  300. idmap := &sync.Map{}
  301. count, index := 0, 0
  302. for _, v := range publishtimes {
  303. q := map[string]interface{}{
  304. "publishtime": map[string]interface{}{
  305. "$gt": util.Int64All(v["stime"]),
  306. "$lte": util.Int64All(v["etime"]),
  307. },
  308. }
  309. log.Println(q)
  310. //数据正序处理
  311. it := sess.DB(MQFW.DbName).C(extractColl).Find(&q).Sort("publishtime").Iter()
  312. datenum := 0
  313. for tmp := make(map[string]interface{}); it.Next(tmp); {
  314. if index%10000 == 0 {
  315. log.Println(index, tmp["_id"])
  316. }
  317. index++
  318. datenum++
  319. if util.IntAll(tmp["repeat"]) == 1 {
  320. tmp = make(map[string]interface{})
  321. continue
  322. }
  323. pt := util.Int64All(tmp["publishtime"])
  324. if pt > currentMegerTime {
  325. currentMegerTime = pt
  326. }
  327. count++
  328. currentMegerCount++
  329. if currentMegerCount > 300000 {
  330. log.Println("执行清理", currentMegerTime)
  331. time.Sleep(1 * time.Second)
  332. clearPKey()
  333. currentMegerCount = 0
  334. }
  335. thisid := util.BsonIdToSId(tmp["_id"])
  336. b, err := redis.Exists(INFOID, thisid)
  337. if err != nil {
  338. log.Println("checkid err", err.Error())
  339. }
  340. if !b {
  341. wg.Add(1)
  342. idmap.Store(tmp["_id"], true) //增加判重逻辑,重复id不再生成
  343. MultiThread <- true
  344. go func(tmp map[string]interface{}, thisid string) {
  345. defer func() {
  346. <-MultiThread
  347. wg.Done()
  348. idmap.Delete(tmp["_id"])
  349. }()
  350. info := PreThisInfo(tmp)
  351. if info != nil {
  352. lockPNCBMap(info)
  353. startProjectMerge(info, tmp)
  354. redis.Put(INFOID, thisid, 1, INFOTIMEOUT)
  355. currentMegerTime = info.Publishtime
  356. unlockPNCBMap(info)
  357. }
  358. }(tmp, thisid)
  359. }
  360. if count%1000 == 0 {
  361. log.Println("count:", count)
  362. }
  363. tmp = make(map[string]interface{})
  364. }
  365. log.Println(v["date"], datenum)
  366. }
  367. for {
  368. time.Sleep(5 * time.Second)
  369. n := 0
  370. idmap.Range(func(key interface{}, v interface{}) bool {
  371. n++
  372. log.Println(key, v)
  373. return true
  374. })
  375. if n < 1 {
  376. break
  377. }
  378. }
  379. wg.Wait()
  380. log.Println("taskStock over...", index, count)
  381. }
  382. func NewPushInfo(tmp map[string]interface{}) bson.M {
  383. return bson.M{
  384. "comeintime": tmp["comeintime"],
  385. "publishtime": tmp["publishtime"],
  386. "title": tmp["title"],
  387. "toptype": tmp["toptype"],
  388. "subtype": tmp["subtype"],
  389. "infoformat": tmp["infoformat"],
  390. "infoid": util.BsonIdToSId(tmp["_id"]),
  391. "href": tmp["href"],
  392. "area": tmp["area"],
  393. "city": tmp["city"],
  394. "cresult": tmp["cresult"],
  395. "score": tmp["score"],
  396. }
  397. }