task.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412
  1. package main
  2. import (
  3. "encoding/json"
  4. "log"
  5. mu "mfw/util"
  6. "net"
  7. "qfw/util"
  8. "regexp"
  9. "strings"
  10. "sync"
  11. "time"
  12. "github.com/robfig/cron"
  13. )
  14. /**
  15. 任务入口
  16. 全量、增量合并
  17. 更新、插入,内存清理
  18. 转换成info对象
  19. **/
  20. //项目合并对象
  21. type ProjectTask struct {
  22. InitMinTime int64 //最小时间,小于0的处理一次
  23. name string
  24. thread int //线程数
  25. //查找锁
  26. findLock sync.Mutex
  27. wg sync.WaitGroup
  28. //map锁
  29. AllIdsMapLock sync.Mutex
  30. //对应的id
  31. AllIdsMap map[string]*ID
  32. //采购单位、项目名称、项目编号
  33. mapPb, mapPn, mapPc map[string]*Key
  34. //更新或新增通道
  35. updatePool chan []map[string]interface{}
  36. //表名
  37. coll string
  38. //当前状态是全量还是增量
  39. currentType string //当前是跑全量还是跑增量
  40. //
  41. clearContimes int
  42. //当前时间
  43. currentTime int64
  44. //保存长度
  45. saveSize int
  46. }
  47. func NewPT() *ProjectTask {
  48. return &ProjectTask{
  49. InitMinTime: int64(1325347200),
  50. name: "全/增量对象",
  51. thread: 3,
  52. updatePool: make(chan []map[string]interface{}, 2000),
  53. wg: sync.WaitGroup{},
  54. AllIdsMap: make(map[string]*ID, 5000000),
  55. mapPb: make(map[string]*Key, 5000000),
  56. mapPn: make(map[string]*Key, 5000000),
  57. mapPc: make(map[string]*Key, 5000000),
  58. saveSize: 200,
  59. coll: ProjectColl,
  60. }
  61. }
  62. var P_QL *ProjectTask
  63. //初始化全量合并对象
  64. func init() {
  65. P_QL = NewPT()
  66. go P_QL.updateQueue()
  67. go P_QL.clearMem()
  68. }
  69. //项目保存和更新通道
  70. func (p *ProjectTask) updateQueue() {
  71. arr := make([][]map[string]interface{}, p.saveSize)
  72. index := 0
  73. for {
  74. select {
  75. case v := <-p.updatePool:
  76. arr[index] = v
  77. index++
  78. if index == p.saveSize {
  79. MongoTool.UpSertBulk(p.coll, arr...)
  80. arr = make([][]map[string]interface{}, p.saveSize)
  81. index = 0
  82. }
  83. case <-time.After(2 * time.Second):
  84. if index > 0 {
  85. MongoTool.UpSertBulk(p.coll, arr[:index]...)
  86. arr = make([][]map[string]interface{}, p.saveSize)
  87. index = 0
  88. }
  89. }
  90. }
  91. }
  92. //项目合并内存更新
  93. func (p *ProjectTask) clearMem() {
  94. c := cron.New()
  95. //在内存中保留最近6个月的信息
  96. validTime := int64(6 * 30 * 86400)
  97. //跑全量时每4分钟跑一次,跑增量时400分钟跑一次
  98. c.AddFunc("50 0/4 * * * *", func() {
  99. if p.currentType == "ql" || p.clearContimes >= 100 {
  100. //跳过的次数清零
  101. p.clearContimes = 0
  102. //信息进入查找对比全局锁
  103. p.findLock.Lock()
  104. defer p.findLock.Unlock()
  105. //合并进行的任务都完成
  106. p.wg.Wait()
  107. //遍历id
  108. //所有内存中的项目信息
  109. p.AllIdsMapLock.Lock()
  110. defer p.AllIdsMapLock.Unlock()
  111. //清除计数
  112. clearNum := 0
  113. for k, v := range p.AllIdsMap {
  114. if p.currentTime-v.P.LastTime > validTime {
  115. clearNum++
  116. //删除id的map
  117. delete(p.AllIdsMap, k)
  118. //删除pb
  119. if v.P.Buyer != "" {
  120. ids := p.mapPb[v.P.Buyer]
  121. if ids != nil {
  122. ids.Lock.Lock()
  123. ids.Arr = deleteSlice(ids.Arr, k)
  124. if len(ids.Arr) == 0 {
  125. delete(p.mapPb, v.P.Buyer)
  126. }
  127. ids.Lock.Unlock()
  128. }
  129. }
  130. //删除mapPn
  131. for _, vn := range append([]string{v.P.ProjectName}, v.P.MPN...) {
  132. if vn != "" {
  133. ids := p.mapPn[vn]
  134. if ids != nil {
  135. ids.Lock.Lock()
  136. ids.Arr = deleteSlice(ids.Arr, k)
  137. if len(ids.Arr) == 0 {
  138. delete(p.mapPn, vn)
  139. }
  140. ids.Lock.Unlock()
  141. }
  142. }
  143. }
  144. //删除mapPc
  145. for _, vn := range append([]string{v.P.ProjectCode}, v.P.MPC...) {
  146. if vn != "" {
  147. ids := p.mapPc[vn]
  148. if ids != nil {
  149. ids.Lock.Lock()
  150. ids.Arr = deleteSlice(ids.Arr, k)
  151. if len(ids.Arr) == 0 {
  152. delete(p.mapPc, vn)
  153. }
  154. ids.Lock.Unlock()
  155. }
  156. }
  157. }
  158. v = nil
  159. }
  160. }
  161. log.Println("清除完成:", clearNum, len(p.AllIdsMap))
  162. } else {
  163. p.clearContimes++
  164. }
  165. })
  166. c.Start()
  167. select {}
  168. }
  169. //全量合并
  170. func (p *ProjectTask) taskQl(udpInfo map[string]interface{}) {
  171. defer util.Catch()
  172. //1、检查pubilshtime索引
  173. db, _ := udpInfo["db"].(string)
  174. if db == "" {
  175. db = MongoTool.DbName
  176. }
  177. coll, _ := udpInfo["coll"].(string)
  178. if coll == "" {
  179. coll = ExtractColl
  180. }
  181. // sess := MongoTool.GetMgoConn()
  182. // bcon := false
  183. // if sess.DB(db).C(coll).EnsureIndexKey("publishtime_1", "publishtime_-1") == nil {
  184. // bcon = true
  185. // } else {
  186. // log.Println("publishtime_1索引不存在")
  187. // }
  188. // MongoTool.DestoryMongoConn(sess)
  189. thread := util.IntAllDef(udpInfo["thread"], 3)
  190. if thread > 0 {
  191. p.thread = thread
  192. }
  193. bcon := true
  194. if bcon {
  195. //生成查询语句执行
  196. p.enter(db, coll, map[string]interface{}{})
  197. }
  198. }
  199. //增量合并
  200. func (p *ProjectTask) taskZl(udpInfo map[string]interface{}) {
  201. defer util.Catch()
  202. //1、检查pubilshtime索引
  203. db, _ := udpInfo["db"].(string)
  204. if db == "" {
  205. db = MongoTool.DbName
  206. }
  207. coll, _ := udpInfo["coll"].(string)
  208. if coll == "" {
  209. coll = ExtractColl
  210. }
  211. thread := util.IntAllDef(udpInfo["thread"], 3)
  212. if thread > 0 {
  213. p.thread = thread
  214. }
  215. //开始id和结束id
  216. q, _ := udpInfo["query"].(map[string]interface{})
  217. gtid := udpInfo["gtid"].(string)
  218. lteid := udpInfo["lteid"].(string)
  219. if q == nil {
  220. q = map[string]interface{}{
  221. "_id": map[string]interface{}{
  222. "$gt": util.StringTOBsonId(gtid), //util.StringTOBsonId(udpInfo["gtid"].(string)),
  223. "$lte": util.StringTOBsonId(lteid), //util.StringTOBsonId(udpInfo["lteid"].(string)),
  224. },
  225. }
  226. }
  227. pici := time.Now().Unix()
  228. if q != nil {
  229. //生成查询语句执行
  230. p.enter(db, coll, q)
  231. }
  232. nextNode(gtid, lteid, "project", pici)
  233. }
  234. //通知下个节点nextNode
  235. func nextNode(gtid, lteid, stype string, pici int64) {
  236. by, _ := json.Marshal(map[string]interface{}{
  237. "gtid": gtid,
  238. "lteid": lteid,
  239. "stype": stype,
  240. "query": map[string]interface{}{
  241. "pici": pici,
  242. },
  243. })
  244. log.Println("nextnode", string(by))
  245. for _, v := range NextNode {
  246. if node, ok := v.(map[string]interface{}); ok {
  247. udpclient.WriteUdp(by, mu.OP_TYPE_DATA, &net.UDPAddr{
  248. IP: net.ParseIP(node["addr"].(string)),
  249. Port: util.IntAll(node["port"]),
  250. })
  251. }
  252. }
  253. }
  254. func (p *ProjectTask) enter(db, coll string, q map[string]interface{}) {
  255. defer util.Catch()
  256. sess := MongoTool.GetMgoConn()
  257. defer MongoTool.DestoryMongoConn(sess)
  258. query := sess.DB(db).C(coll).Find(q).Sort("publishtime").Iter()
  259. pool := make(chan bool, p.thread)
  260. count := 0
  261. for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
  262. info := ParseInfo(tmp)
  263. if info != nil && !((info.pnbval == 1 && info.Buyer != "") || info.pnbval == 0) {
  264. pool <- true
  265. go func(info *Info, tmp map[string]interface{}) {
  266. defer func() {
  267. p.currentTime = info.Publishtime
  268. <-pool
  269. }()
  270. p.startProjectMerge(info, tmp)
  271. }(info, tmp)
  272. } else {
  273. //信息错误,进行更新
  274. }
  275. if count%1000 == 0 {
  276. log.Println("current", count)
  277. }
  278. tmp = make(map[string]interface{})
  279. }
  280. //阻塞
  281. for n := 0; n < p.thread; n++ {
  282. pool <- true
  283. }
  284. log.Println("所有线程执行完成...", count)
  285. }
  // Patterns used by ParseInfo to find project codes in titles and to clean
// project names and codes.
var (
	// titleGetPc matches a project code at the very start of a title.
	titleGetPc = regexp.MustCompile("^([-0-9a-zA-Z第号采招政询电审竞#]{8,}[-0-9a-zA-Z#]+)")
	// titleGetPc1 matches a project code enclosed in brackets, optionally
	// preceded by a short label such as 编号 or 编码; the code is capture
	// group 3.
	titleGetPc1 = regexp.MustCompile("[\\[【((](.{0,6}(编号|编码|项号|包号|代码|标段?号)[::为])?([-0-9a-zA-Z第号采招政询电审竞#]{5,}([\\[\\]()()][-0-9a-zA-Z第号采招审竞#]+[\\[\\]()()][-0-9a-zA-Z第号采招审竞#]+)?)[\\]】))]")
	// titleGetPc2 matches a project code at the end of a title, optionally
	// followed by up to five characters and 公告.
	titleGetPc2 = regexp.MustCompile("([-0-9a-zA-Z第号采政招询电审竞#]{8,}[-0-9a-zA-Z#]+)(.{0,5}公告)?$")
	// pcReplace matches the fragments that are stripped from project names
	// and codes: bracketed re-tender markers at the end, runs of brackets and
	// separators, trailing 号/重/"nth time" markers, runs of spaces and
	// underscores, and a trailing 项目/采购 suffix.
	// NOTE(review): as written here the second character class contains the
	// span ﹞-; whose end point sorts before its start point; Go's regexp
	// rejects descending ranges, so confirm the exact bytes of this pattern
	// against the real file (full-width punctuation may have been altered
	// in transcription).
	pcReplace = regexp.MustCompile("([\\[【((〖〔《{﹝{](重|第?[二三四再]次.{0,4})[\\]】))〗〕》}﹞}])$|[\\[\\]【】()()〖〗〔〕《》{}﹝﹞-;{}–  ]+|(号|重|第?[二三四五再]次(招标)?)$|[ __]+|((采购)?项目|采购(项目)?)$")
	// StrOrNum matches a whole string of one to four characters made up only
	// of digits, or only of letters (underscore and hyphen allowed in both).
	StrOrNum = regexp.MustCompile("^[0-9_-]{1,4}$|^[a-zA-Z_-]{1,4}$")
	// StrOrNum2 matches a whole string of any length made up only of digits,
	// or only of letters (underscore and hyphen allowed in both).
	StrOrNum2 = regexp.MustCompile("^[0-9_-]+$|^[a-zA-Z_-]+$")
)
  298. func ParseInfo(tmp map[string]interface{}) (info *Info) {
  299. bys, _ := json.Marshal(tmp)
  300. var thisinfo *Info
  301. json.Unmarshal(bys, &thisinfo)
  302. if thisinfo == nil {
  303. return nil
  304. }
  305. if len(thisinfo.Topscopeclass) == 0 {
  306. thisinfo.Topscopeclass = []string{}
  307. }
  308. if len(thisinfo.Subscopeclass) == 0 {
  309. thisinfo.Subscopeclass = []string{}
  310. }
  311. //从标题中查找项目编号
  312. res := titleGetPc.FindStringSubmatch(thisinfo.Title)
  313. if len(res) > 1 && len(res[1]) > 6 && thisinfo.ProjectCode != res[1] && !numCheckPc.MatchString(res[1]) && !_zimureg1.MatchString(res[1]) {
  314. thisinfo.PTC = res[1]
  315. } else {
  316. res = titleGetPc1.FindStringSubmatch(thisinfo.Title)
  317. if len(res) > 3 && len(res[3]) > 6 && thisinfo.ProjectCode != res[3] && !numCheckPc.MatchString(res[3]) && !_zimureg1.MatchString(res[3]) {
  318. thisinfo.PTC = res[3]
  319. } else {
  320. res = titleGetPc2.FindStringSubmatch(thisinfo.Title)
  321. if len(res) > 1 && len(res[1]) > 6 && thisinfo.ProjectCode != res[1] && !numCheckPc.MatchString(res[1]) && !_zimureg1.MatchString(res[1]) {
  322. thisinfo.PTC = res[1]
  323. }
  324. }
  325. }
  326. if thisinfo.ProjectName != "" && len([]rune(thisinfo.ProjectName)) > 0 {
  327. thisinfo.ProjectName = pcReplace.ReplaceAllString(thisinfo.ProjectName, "")
  328. if thisinfo.ProjectName != "" {
  329. thisinfo.pnbval++
  330. }
  331. }
  332. if thisinfo.ProjectCode != "" || thisinfo.PTC != "" {
  333. if thisinfo.ProjectCode != "" {
  334. thisinfo.ProjectCode = pcReplace.ReplaceAllString(thisinfo.ProjectCode, "")
  335. if thisinfo.pnbval == 0 && len([]rune(thisinfo.ProjectCode)) < 5 {
  336. thisinfo.ProjectCode = StrOrNum.ReplaceAllString(thisinfo.ProjectCode, "")
  337. }
  338. } else {
  339. thisinfo.PTC = pcReplace.ReplaceAllString(thisinfo.PTC, "")
  340. if thisinfo.pnbval == 0 && len([]rune(thisinfo.PTC)) < 5 {
  341. thisinfo.PTC = StrOrNum.ReplaceAllString(thisinfo.PTC, "")
  342. }
  343. }
  344. if thisinfo.ProjectCode != "" || thisinfo.PTC != "" {
  345. thisinfo.pnbval++
  346. }
  347. }
  348. if thisinfo.ProjectCode == thisinfo.PTC || strings.Index(thisinfo.ProjectCode, thisinfo.PTC) > -1 {
  349. thisinfo.PTC = ""
  350. }
  351. if thisinfo.Buyer != "" && len([]rune(thisinfo.Buyer)) > 2 {
  352. thisinfo.pnbval++
  353. } else {
  354. thisinfo.Buyer = ""
  355. }
  356. //winners整理
  357. winner, _ := tmp["winner"].(string)
  358. m1 := map[string]bool{}
  359. winners := []string{}
  360. if winner != "" {
  361. m1[winner] = true
  362. winners = append(winners, winner)
  363. }
  364. if thisinfo.HasPackage {
  365. packageM, _ := tmp["package"].(map[string]interface{})
  366. for _, p := range packageM {
  367. pm, _ := p.(map[string]interface{})
  368. pw, _ := pm["winner"].(string)
  369. if pw != "" {
  370. m1[pw] = true
  371. winners = append(winners, pw)
  372. }
  373. }
  374. }
  375. thisinfo.Winners = winners
  376. thisinfo.LenPC = len([]rune(thisinfo.ProjectCode))
  377. thisinfo.LenPTC = len([]rune(thisinfo.PTC))
  378. thisinfo.LenPN = len([]rune(thisinfo.ProjectName))
  379. return thisinfo
  380. }
  // deleteSlice removes the first element of arr that equals v and returns the
// shortened slice. The removal is done in place: the elements after the match
// are shifted left inside arr's backing array. arr is returned unchanged when
// v is not present.
func deleteSlice(arr []string, v string) []string {
	pos := -1
	for i := range arr {
		if arr[i] == v {
			pos = i
			break
		}
	}
	if pos < 0 {
		return arr
	}
	copy(arr[pos:], arr[pos+1:])
	return arr[:len(arr)-1]
}