// task.go (11 KB)

  1. package main
import (
	"encoding/json"
	"log"
	"net"
	"regexp"
	"sort"
	"strings"
	"sync"
	"time"

	"github.com/robfig/cron"
	mu "mfw/util"
	"qfw/util"
)
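/*
Task entry points:
  - full (taskQl) and incremental (taskZl) project merging
  - project insert/update and in-memory cleanup
  - conversion of raw documents into Info objects
*/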
  20. //项目合并对象
  21. type ProjectTask struct {
  22. InitMinTime int64 //最小时间,小于0的处理一次
  23. name string
  24. thread int //线程数
  25. //查找锁
  26. findLock sync.Mutex
  27. wg sync.WaitGroup
  28. //map锁
  29. AllIdsMapLock sync.Mutex
  30. //对应的id
  31. AllIdsMap map[string]*ID
  32. //采购单位、项目名称、项目编号
  33. mapPb, mapPn, mapPc map[string]*Key
  34. //更新或新增通道
  35. updatePool chan []map[string]interface{}
  36. //表名
  37. coll string
  38. //当前状态是全量还是增量
  39. currentType string //当前是跑全量还是跑增量
  40. //
  41. clearContimes int
  42. //当前时间
  43. currentTime int64
  44. //保存长度
  45. saveSize int
  46. pici int64
  47. }
  48. func NewPT() *ProjectTask {
  49. return &ProjectTask{
  50. InitMinTime: int64(1325347200),
  51. name: "全/增量对象",
  52. thread: 4,
  53. updatePool: make(chan []map[string]interface{}, 2000),
  54. wg: sync.WaitGroup{},
  55. AllIdsMap: make(map[string]*ID, 10000000),
  56. mapPb: make(map[string]*Key, 3000000),
  57. mapPn: make(map[string]*Key, 10000000),
  58. mapPc: make(map[string]*Key, 10000000),
  59. saveSize: 200,
  60. coll: ProjectColl,
  61. }
  62. }
  63. var P_QL *ProjectTask
  64. //初始化全量合并对象
  65. func init() {
  66. P_QL = NewPT()
  67. go P_QL.updateQueue()
  68. go P_QL.clearMem()
  69. }
  70. //项目保存和更新通道
  71. func (p *ProjectTask) updateQueue() {
  72. arr := make([][]map[string]interface{}, p.saveSize)
  73. index := 0
  74. for {
  75. select {
  76. case v := <-p.updatePool:
  77. arr[index] = v
  78. index++
  79. if index == p.saveSize {
  80. MongoTool.UpSertBulk(p.coll, arr...)
  81. arr = make([][]map[string]interface{}, p.saveSize)
  82. index = 0
  83. }
  84. case <-time.After(2 * time.Second):
  85. if index > 0 {
  86. MongoTool.UpSertBulk(p.coll, arr[:index]...)
  87. arr = make([][]map[string]interface{}, p.saveSize)
  88. index = 0
  89. }
  90. }
  91. }
  92. }
  93. //项目合并内存更新
  94. func (p *ProjectTask) clearMem() {
  95. c := cron.New()
  96. //在内存中保留最近6个月的信息
  97. validTime := int64(6 * 30 * 86400)
  98. //跑全量时每4分钟跑一次,跑增量时400分钟跑一次
  99. c.AddFunc("50 0/4 * * * *", func() {
  100. if p.currentType == "ql" || p.clearContimes >= 100 {
  101. //跳过的次数清零
  102. p.clearContimes = 0
  103. //信息进入查找对比全局锁
  104. p.findLock.Lock()
  105. defer p.findLock.Unlock()
  106. //合并进行的任务都完成
  107. p.wg.Wait()
  108. //遍历id
  109. //所有内存中的项目信息
  110. p.AllIdsMapLock.Lock()
  111. defer p.AllIdsMapLock.Unlock()
  112. //清除计数
  113. clearNum := 0
  114. for k, v := range p.AllIdsMap {
  115. if p.currentTime-v.P.LastTime > validTime {
  116. clearNum++
  117. //删除id的map
  118. delete(p.AllIdsMap, k)
  119. //删除pb
  120. if v.P.Buyer != "" {
  121. ids := p.mapPb[v.P.Buyer]
  122. if ids != nil {
  123. ids.Lock.Lock()
  124. ids.Arr = deleteSlice(ids.Arr, k)
  125. if len(ids.Arr) == 0 {
  126. delete(p.mapPb, v.P.Buyer)
  127. }
  128. ids.Lock.Unlock()
  129. }
  130. }
  131. //删除mapPn
  132. for _, vn := range append([]string{v.P.ProjectName}, v.P.MPN...) {
  133. if vn != "" {
  134. ids := p.mapPn[vn]
  135. if ids != nil {
  136. ids.Lock.Lock()
  137. ids.Arr = deleteSlice(ids.Arr, k)
  138. if len(ids.Arr) == 0 {
  139. delete(p.mapPn, vn)
  140. }
  141. ids.Lock.Unlock()
  142. }
  143. }
  144. }
  145. //删除mapPc
  146. for _, vn := range append([]string{v.P.ProjectCode}, v.P.MPC...) {
  147. if vn != "" {
  148. ids := p.mapPc[vn]
  149. if ids != nil {
  150. ids.Lock.Lock()
  151. ids.Arr = deleteSlice(ids.Arr, k)
  152. if len(ids.Arr) == 0 {
  153. delete(p.mapPc, vn)
  154. }
  155. ids.Lock.Unlock()
  156. }
  157. }
  158. }
  159. v = nil
  160. }
  161. }
  162. log.Println("清除完成:", clearNum, len(p.AllIdsMap))
  163. } else {
  164. p.clearContimes++
  165. }
  166. })
  167. c.Start()
  168. select {}
  169. }
  170. //全量合并
  171. func (p *ProjectTask) taskQl(udpInfo map[string]interface{}) {
  172. defer util.Catch()
  173. //1、检查pubilshtime索引
  174. db, _ := udpInfo["db"].(string)
  175. if db == "" {
  176. db = MongoTool.DbName
  177. }
  178. coll, _ := udpInfo["coll"].(string)
  179. if coll == "" {
  180. coll = ExtractColl
  181. }
  182. // sess := MongoTool.GetMgoConn()
  183. // bcon := false
  184. // if sess.DB(db).C(coll).EnsureIndexKey("publishtime_1", "publishtime_-1") == nil {
  185. // bcon = true
  186. // } else {
  187. // log.Println("publishtime_1索引不存在")
  188. // }
  189. // MongoTool.DestoryMongoConn(sess)
  190. thread := util.IntAllDef(udpInfo["thread"], 4)
  191. if thread > 0 {
  192. p.thread = thread
  193. }
  194. q, _ := udpInfo["query"].(map[string]interface{})
  195. if q == nil {
  196. q = map[string]interface{}{}
  197. lteid, _ := udpInfo["lteid"].(string)
  198. var idmap map[string]interface{}
  199. if len(lteid) > 15 {
  200. idmap = map[string]interface{}{
  201. "$lte": util.StringTOBsonId(lteid),
  202. }
  203. }
  204. gtid, _ := udpInfo["gtid"].(string)
  205. if len(gtid) > 15 {
  206. if idmap == nil {
  207. idmap = map[string]interface{}{}
  208. }
  209. idmap["$gt"] = util.StringTOBsonId(gtid)
  210. }
  211. if idmap != nil {
  212. q["_id"] = idmap
  213. }
  214. }
  215. //生成查询语句执行
  216. p.enter(db, coll, q)
  217. }
  218. //增量合并
  219. func (p *ProjectTask) taskZl(udpInfo map[string]interface{}) {
  220. defer util.Catch()
  221. //1、检查pubilshtime索引
  222. db, _ := udpInfo["db"].(string)
  223. if db == "" {
  224. db = MongoTool.DbName
  225. }
  226. coll, _ := udpInfo["coll"].(string)
  227. if coll == "" {
  228. coll = ExtractColl
  229. }
  230. thread := util.IntAllDef(udpInfo["thread"], 3)
  231. if thread > 0 {
  232. p.thread = thread
  233. }
  234. //开始id和结束id
  235. q, _ := udpInfo["query"].(map[string]interface{})
  236. gtid := udpInfo["gtid"].(string)
  237. lteid := udpInfo["lteid"].(string)
  238. if q == nil {
  239. q = map[string]interface{}{
  240. "_id": map[string]interface{}{
  241. "$gt": util.StringTOBsonId(gtid), //util.StringTOBsonId(udpInfo["gtid"].(string)),
  242. "$lte": util.StringTOBsonId(lteid), //util.StringTOBsonId(udpInfo["lteid"].(string)),
  243. },
  244. }
  245. }
  246. if q != nil {
  247. //生成查询语句执行
  248. p.enter(db, coll, q)
  249. }
  250. nextNode(gtid, lteid, "project", p.pici)
  251. }
  252. //通知下个节点nextNode
  253. func nextNode(gtid, lteid, stype string, pici int64) {
  254. by, _ := json.Marshal(map[string]interface{}{
  255. "gtid": gtid,
  256. "lteid": lteid,
  257. "stype": stype,
  258. "query": map[string]interface{}{
  259. "pici": pici,
  260. },
  261. })
  262. log.Println("nextnode", string(by))
  263. for _, v := range NextNode {
  264. if node, ok := v.(map[string]interface{}); ok {
  265. udpclient.WriteUdp(by, mu.OP_TYPE_DATA, &net.UDPAddr{
  266. IP: net.ParseIP(node["addr"].(string)),
  267. Port: util.IntAll(node["port"]),
  268. })
  269. }
  270. }
  271. }
  272. func (p *ProjectTask) enter(db, coll string, q map[string]interface{}) {
  273. defer util.Catch()
  274. sess := MongoTool.GetMgoConn()
  275. defer MongoTool.DestoryMongoConn(sess)
  276. query := sess.DB(db).C(coll).Find(q).Sort("publishtime").Iter()
  277. pool := make(chan bool, p.thread)
  278. count := 0
  279. for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
  280. info := ParseInfo(tmp)
  281. if info != nil && !((info.pnbval == 1 && info.Buyer != "") || info.pnbval == 0) {
  282. pool <- true
  283. go func(info *Info, tmp map[string]interface{}) {
  284. defer func() {
  285. p.currentTime = info.Publishtime
  286. <-pool
  287. }()
  288. p.startProjectMerge(info, tmp)
  289. }(info, tmp)
  290. } else {
  291. //信息错误,进行更新
  292. }
  293. if count%1000 == 0 {
  294. log.Println("current", count)
  295. }
  296. tmp = make(map[string]interface{})
  297. }
  298. //阻塞
  299. for n := 0; n < p.thread; n++ {
  300. pool <- true
  301. }
  302. log.Println("所有线程执行完成...", count)
  303. }
var (
// titleGetPc matches a project code at the start of the title: 9 or more
// characters of ASCII letters/digits, '-', '#' or the listed CJK
// characters, ending in ASCII letters/digits, '-' or '#'. Group 1 is the code.
titleGetPc = regexp.MustCompile("^([-0-9a-zA-Z第号采招政询电审竞#]{8,}[-0-9a-zA-Z#]+)")
// titleGetPc1 matches a project code inside a bracketed part of the title.
// An optional label of up to 6 characters ending in 编号/编码/项号/包号/代码/
// 标号/标段号 plus a separator may precede the code. Group 3 is the code.
titleGetPc1 = regexp.MustCompile("[\\[【((](.{0,6}(编号|编码|项号|包号|代码|标段?号)[::为])?([-0-9a-zA-Z第号采招政询电审竞#]{5,}([\\[\\]()()][-0-9a-zA-Z第号采招审竞#]+[\\[\\]()()][-0-9a-zA-Z第号采招审竞#]+)?)[\\]】))]")
// titleGetPc2 matches a project code at the end of the title, optionally
// followed by up to 5 characters and "公告" (announcement). Group 1 is the code.
titleGetPc2 = regexp.MustCompile("([-0-9a-zA-Z第号采政招询电审竞#]{8,}[-0-9a-zA-Z#]+)(.{0,5}公告)?$")
// pcReplace is the project name/code filter: it strips a trailing bracketed
// re-tender marker (重, 第二次 ...), bracket/separator characters anywhere,
// a trailing 号/重/第N次(招标), spaces/underscores, and a trailing
// 项目 / 采购项目 / 采购 suffix.
// NOTE(review): as rendered in this copy, "﹞-;" in the second alternative's
// class forms a range whose ends are out of order, which regexp rejects at
// compile time. If the real file uses full-width characters here (e.g.
// '－', '；'), they are literals — confirm, and escape the hyphen if a
// literal '-' is intended.
pcReplace = regexp.MustCompile("([\\[【((〖〔《{﹝{](重|第?[二三四再]次.{0,4})[\\]】))〗〕》}﹞}])$|[\\[\\]【】()()〖〗〔〕《》{}﹝﹞-;{}–  ]+|(号|重|第?[二三四五再]次(招标)?)$|[ __]+|((采购)?项目|采购(项目)?)$")
// StrOrNum matches a project code of 1-4 characters that is only digits
// (plus '_'/'-') or only ASCII letters (plus '_'/'-').
StrOrNum = regexp.MustCompile("^[0-9_-]{1,4}$|^[a-zA-Z_-]{1,4}$")
// StrOrNum2 matches a string of any length that is only digits or only
// ASCII letters (each optionally with '_'/'-').
StrOrNum2 = regexp.MustCompile("^[0-9_-]+$|^[a-zA-Z_-]+$")
)
  316. func ParseInfo(tmp map[string]interface{}) (info *Info) {
  317. bys, _ := json.Marshal(tmp)
  318. var thisinfo *Info
  319. json.Unmarshal(bys, &thisinfo)
  320. if thisinfo == nil {
  321. return nil
  322. }
  323. if len(thisinfo.Topscopeclass) == 0 {
  324. thisinfo.Topscopeclass = []string{}
  325. }
  326. if len(thisinfo.Subscopeclass) == 0 {
  327. thisinfo.Subscopeclass = []string{}
  328. }
  329. //从标题中查找项目编号
  330. res := titleGetPc.FindStringSubmatch(thisinfo.Title)
  331. if len(res) > 1 && len(res[1]) > 6 && thisinfo.ProjectCode != res[1] && !numCheckPc.MatchString(res[1]) && !_zimureg1.MatchString(res[1]) {
  332. thisinfo.PTC = res[1]
  333. } else {
  334. res = titleGetPc1.FindStringSubmatch(thisinfo.Title)
  335. if len(res) > 3 && len(res[3]) > 6 && thisinfo.ProjectCode != res[3] && !numCheckPc.MatchString(res[3]) && !_zimureg1.MatchString(res[3]) {
  336. thisinfo.PTC = res[3]
  337. } else {
  338. res = titleGetPc2.FindStringSubmatch(thisinfo.Title)
  339. if len(res) > 1 && len(res[1]) > 6 && thisinfo.ProjectCode != res[1] && !numCheckPc.MatchString(res[1]) && !_zimureg1.MatchString(res[1]) {
  340. thisinfo.PTC = res[1]
  341. }
  342. }
  343. }
  344. if thisinfo.ProjectName != "" && len([]rune(thisinfo.ProjectName)) > 0 {
  345. thisinfo.ProjectName = pcReplace.ReplaceAllString(thisinfo.ProjectName, "")
  346. if thisinfo.ProjectName != "" {
  347. thisinfo.pnbval++
  348. }
  349. }
  350. if thisinfo.ProjectCode != "" || thisinfo.PTC != "" {
  351. if thisinfo.ProjectCode != "" {
  352. thisinfo.ProjectCode = pcReplace.ReplaceAllString(thisinfo.ProjectCode, "")
  353. if thisinfo.pnbval == 0 && len([]rune(thisinfo.ProjectCode)) < 5 {
  354. thisinfo.ProjectCode = StrOrNum.ReplaceAllString(thisinfo.ProjectCode, "")
  355. }
  356. } else {
  357. thisinfo.PTC = pcReplace.ReplaceAllString(thisinfo.PTC, "")
  358. if thisinfo.pnbval == 0 && len([]rune(thisinfo.PTC)) < 5 {
  359. thisinfo.PTC = StrOrNum.ReplaceAllString(thisinfo.PTC, "")
  360. }
  361. }
  362. if thisinfo.ProjectCode != "" || thisinfo.PTC != "" {
  363. thisinfo.pnbval++
  364. }
  365. }
  366. if thisinfo.ProjectCode == thisinfo.PTC || strings.Index(thisinfo.ProjectCode, thisinfo.PTC) > -1 {
  367. thisinfo.PTC = ""
  368. }
  369. if thisinfo.Buyer != "" && len([]rune(thisinfo.Buyer)) > 2 {
  370. thisinfo.pnbval++
  371. } else {
  372. thisinfo.Buyer = ""
  373. }
  374. //winners整理
  375. winner, _ := tmp["winner"].(string)
  376. m1 := map[string]bool{}
  377. winners := []string{}
  378. if winner != "" {
  379. m1[winner] = true
  380. winners = append(winners, winner)
  381. }
  382. if thisinfo.HasPackage {
  383. packageM, _ := tmp["package"].(map[string]interface{})
  384. for _, p := range packageM {
  385. pm, _ := p.(map[string]interface{})
  386. pw, _ := pm["winner"].(string)
  387. if pw != "" {
  388. m1[pw] = true
  389. winners = append(winners, pw)
  390. }
  391. }
  392. }
  393. thisinfo.Winners = winners
  394. thisinfo.LenPC = len([]rune(thisinfo.ProjectCode))
  395. thisinfo.LenPTC = len([]rune(thisinfo.PTC))
  396. thisinfo.LenPN = len([]rune(thisinfo.ProjectName))
  397. return thisinfo
  398. }
  // deleteSlice removes the first element equal to v and returns the shortened
// slice. When v is absent, arr is returned unchanged. The removal shifts
// elements in place, so the result shares arr's backing array; callers
// should use only the returned slice.
func deleteSlice(arr []string, v string) []string {
	pos := -1
	for i := range arr {
		if arr[i] == v {
			pos = i
			break
		}
	}
	if pos < 0 {
		return arr
	}
	return append(arr[:pos], arr[pos+1:]...)
}