task.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398
  1. package main
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "github.com/robfig/cron"
  6. "log"
  7. "mongodb"
  8. "qfw/util"
  9. "regexp"
  10. "strings"
  11. "sync"
  12. "time"
  13. )
/**
Task entry point for project merging:
full ("全量") and incremental ("增量") merges,
updates/inserts and in-memory cleanup,
and conversion of raw documents into Info objects.
**/

// PreRegexp, BackRegexp, BackRepRegexp and BlackRegexp are keyed regexp
// tables. Nothing in this section fills or reads them.
// NOTE(review): confirm where they are loaded and consumed.
var PreRegexp = map[string][]*regexp.Regexp{}
var BackRegexp = map[string][]*regexp.Regexp{}
var BackRepRegexp = map[string][]RegexpInfo{}
var BlackRegexp = map[string][]*regexp.Regexp{}
var (
	// Extract a project code from the title.
	// titleGetPc: a code at the very start of the title, at least 8 characters
	// from the code set followed by one or more of [-0-9a-zA-Z#].
	titleGetPc = regexp.MustCompile("^([-0-9a-zA-Z第号采招政询电审竞#]{8,}[-0-9a-zA-Z#]+)")

	// titleGetPc1: a bracketed code, optionally preceded (inside the bracket) by
	// a label such as 编号/编码/项号/包号/代码/标号/标段号 and ':' / ':' / 为.
	// Capture group 3 holds the code itself.
	titleGetPc1 = regexp.MustCompile("[\\[【((](.{0,6}(编号|编码|项号|包号|代码|标段?号)[::为])?([-0-9a-zA-Z第号采招政询电审竞#]{5,}([\\[\\]()()][-0-9a-zA-Z第号采招审竞#]+[\\[\\]()()][-0-9a-zA-Z第号采招审竞#]+)?)[\\]】))]")

	// titleGetPc2: a code at the end of the title, optionally followed by up to
	// 5 characters and 公告.
	titleGetPc2 = regexp.MustCompile("([-0-9a-zA-Z第号采政招询电审竞#]{8,}[-0-9a-zA-Z#]+)(.{0,5}公告)?$")

	// Project code cleanup. pcReplace matches, for removal:
	//   - a trailing bracketed re-tender marker (重 / 第?[二三四再]次...);
	//   - runs of bracket and separator characters anywhere;
	//   - a trailing 号 / 重 / 第?N次(招标) suffix;
	//   - runs of spaces/underscores;
	//   - a trailing 项目 / 采购项目 / 采购 suffix.
	// NOTE(review): in the second character class the sequence `﹞-;` is parsed
	// as a rune range (U+FE5E up to the following ';'), not as a literal '-'.
	// MustCompile only accepts it if that ';' is a code point >= U+FE5E (e.g.
	// full-width U+FF1B). In that case the class also matches everything in
	// between, such as the full-width digits U+FF10-U+FF19. Confirm this is
	// intended.
	pcReplace = regexp.MustCompile("([\\[【((〖〔《{﹝{](重|第?[二三四再]次.{0,4})[\\]】))〗〕》}﹞}])$|[\\[\\]【】()()〖〗〔〕《》{}﹝﹞-;{}–  ]+|(号|重|第?[二三四五再]次(招标)?)$|[ __]+|((采购)?项目|采购(项目)?)$")

	// The whole project code is 1-4 characters and only digits, or only ASCII
	// letters ('_' and '-' allowed in either case).
	StrOrNum = regexp.MustCompile("^[0-9_-]{1,4}$|^[a-zA-Z_-]{1,4}$")

	// Only digits or only ASCII letters ('_' and '-' allowed), any length.
	StrOrNum2 = regexp.MustCompile("^[0-9_-]+$|^[a-zA-Z_-]+$")

	// Contains a lot/section word (包 or 段) next to a number, letter or numeral.
	// Used so that tenders whose lots were not recognized are merged into a
	// single project.
	KeyPackage = regexp.MustCompile("[0-9a-zA-Z一二三四五六七八九十ⅠⅡⅢⅣⅤⅥⅦⅧⅨⅩ]+.{0,2}(包|段)|(包|段)[0-9a-zA-Z一二三四五六七八九十ⅠⅡⅢⅣⅤⅥⅦⅧⅨⅩ]+.{0,2}")
)
  38. type RegexpInfo struct {
  39. regs *regexp.Regexp
  40. repstr string
  41. }
  42. func NewPT() *ProjectTask {
  43. p := &ProjectTask{
  44. InitMinTime: int64(1325347200),
  45. name: "全/增量对象",
  46. thread: Thread,
  47. updatePool: make(chan []map[string]interface{}, 5000),
  48. //savePool: make(chan map[string]interface{}, 2000),
  49. wg: sync.WaitGroup{},
  50. AllIdsMap: make(map[string]*ID, 5000000),
  51. mapPb: make(map[string]*Key, 1500000),
  52. mapPn: make(map[string]*Key, 5000000),
  53. mapPc: make(map[string]*Key, 5000000),
  54. saveSize: 100,
  55. //saveSign: make(chan bool, 1),
  56. //updateSign: make(chan bool, 1),
  57. coll: ProjectColl,
  58. validTime: int64(util.IntAllDef(Sysconfig["validdays"], 150) * 86400),
  59. statusTime: int64(util.IntAllDef(Sysconfig["statusdays"], 15) * 86400),
  60. jgTime: int64(util.IntAllDef(7, 7) * 86400),
  61. }
  62. return p
  63. }
  64. var P_QL *ProjectTask
  65. var sp = make(chan bool, 5)
  66. //初始化全量合并对象
  67. func init() {
  68. P_QL = NewPT()
  69. go P_QL.updateAllQueue()
  70. //go P_QL.clearMem()
  71. }
  72. func (p *ProjectTask) updateAllQueue() {
  73. arru := make([][]map[string]interface{}, p.saveSize)
  74. indexu := 0
  75. for {
  76. select {
  77. case v := <-p.updatePool:
  78. arru[indexu] = v
  79. indexu++
  80. if indexu == p.saveSize {
  81. sp <- true
  82. go func(arru [][]map[string]interface{}) {
  83. defer func() {
  84. <-sp
  85. }()
  86. MongoTool.UpSertBulk(p.coll, arru...)
  87. }(arru)
  88. arru = make([][]map[string]interface{}, p.saveSize)
  89. indexu = 0
  90. }
  91. case <-time.After(1000 * time.Millisecond):
  92. if indexu > 0 {
  93. sp <- true
  94. go func(arru [][]map[string]interface{}) {
  95. defer func() {
  96. <-sp
  97. }()
  98. MongoTool.UpSertBulk(p.coll, arru...)
  99. }(arru[:indexu])
  100. arru = make([][]map[string]interface{}, p.saveSize)
  101. indexu = 0
  102. }
  103. }
  104. }
  105. }
  106. //项目合并内存更新
  107. func (p *ProjectTask) clearMem() {
  108. c := cron.New()
  109. //在内存中保留最近6个月的信息
  110. //跑全量时每5分钟跑一次,跑增量时400分钟跑一次
  111. _ = c.AddFunc("50 0/5 * * * *", func() {
  112. if (p.currentType == "ql" && SingleClear == 0) || p.clearContimes >= 80 {
  113. SingleClear = 1
  114. //跳过的次数清零
  115. p.clearContimes = 0
  116. //信息进入查找对比全局锁
  117. p.findLock.Lock()
  118. //defer p.findLock.Unlock()
  119. //合并进行的任务都完成
  120. p.wg.Wait()
  121. //遍历id
  122. //所有内存中的项目信息
  123. p.AllIdsMapLock.Lock()
  124. log.Println("清除开始")
  125. //清除计数
  126. clearNum := 0
  127. for k, v := range p.AllIdsMap {
  128. if p.currentTime-v.P.LastTime > p.validTime {
  129. clearNum++
  130. //删除id的map
  131. delete(p.AllIdsMap, k)
  132. //删除pb
  133. if v.P.Buyer != "" {
  134. ids := p.mapPb[v.P.Buyer]
  135. if ids != nil {
  136. ids.Lock.Lock()
  137. ids.Arr = deleteSlice(ids.Arr, k, "pb")
  138. if len(ids.Arr) == 0 {
  139. delete(p.mapPb, v.P.Buyer)
  140. }
  141. ids.Lock.Unlock()
  142. }
  143. }
  144. //删除mapPn
  145. for _, vn := range append([]string{v.P.ProjectName}, v.P.MPN...) {
  146. if vn != "" {
  147. ids := p.mapPn[vn]
  148. if ids != nil {
  149. ids.Lock.Lock()
  150. ids.Arr = deleteSlice(ids.Arr, k, "pn")
  151. if len(ids.Arr) == 0 {
  152. delete(p.mapPn, vn)
  153. }
  154. ids.Lock.Unlock()
  155. }
  156. }
  157. }
  158. //删除mapPc
  159. for _, vn := range append([]string{v.P.ProjectCode}, v.P.MPC...) {
  160. if vn != "" {
  161. ids := p.mapPc[vn]
  162. if ids != nil {
  163. ids.Lock.Lock()
  164. ids.Arr = deleteSlice(ids.Arr, k, "pc")
  165. if len(ids.Arr) == 0 {
  166. delete(p.mapPc, vn)
  167. }
  168. ids.Lock.Unlock()
  169. }
  170. }
  171. }
  172. v = nil
  173. }
  174. }
  175. p.AllIdsMapLock.Unlock()
  176. p.findLock.Unlock()
  177. SingleClear = 0
  178. log.Println("清除完成:", clearNum, len(p.AllIdsMap), len(p.mapPn), len(p.mapPc), len(p.mapPb))
  179. } else {
  180. p.clearContimes++
  181. }
  182. })
  183. c.Start()
  184. }
  185. //全量合并
  186. func (p *ProjectTask) taskQl() {
  187. defer util.Catch()
  188. p.thread = util.IntAllDef(Thread, 4)
  189. p.enter(MongoTool.DbName, ExtractColl, nil)
  190. }
  191. //增量合并
  192. func (p *ProjectTask) taskZl(udpInfo map[string]interface{}) {
  193. defer util.Catch()
  194. //1、检查pubilshtime索引
  195. db, _ := udpInfo["db"].(string)
  196. if db == "" {
  197. db = MongoTool.DbName
  198. }
  199. coll, _ := udpInfo["coll"].(string)
  200. if coll == "" {
  201. coll = ExtractColl
  202. }
  203. thread := util.IntAllDef(Thread, 4)
  204. if thread > 0 {
  205. p.thread = thread
  206. }
  207. //开始id和结束id
  208. q, _ := udpInfo["query"].(map[string]interface{})
  209. gtid := udpInfo["gtid"].(string)
  210. lteid := udpInfo["lteid"].(string)
  211. q = map[string]interface{}{
  212. "_id": map[string]interface{}{
  213. "$gt": mongodb.StringTOBsonId(gtid),
  214. "$lte": mongodb.StringTOBsonId(lteid),
  215. },
  216. }
  217. p.enter(db, coll, q)
  218. if udpInfo["stop"] == nil {
  219. for i := 0; i < 5; i++ {
  220. sp <- true
  221. }
  222. for i := 0; i < 5; i++ {
  223. <-sp
  224. }
  225. log.Println("保存完成", p.pici)
  226. }
  227. }
  228. func (p *ProjectTask) enter(db, coll string, q map[string]interface{}) {
  229. defer util.Catch()
  230. defer func() {
  231. p.Brun = false
  232. }()
  233. p.Brun = true
  234. count := 0
  235. q = map[string]interface{}{"updatetime": map[string]interface{}{"$gt": 1643262000}}
  236. util.Debug("start project", q, p.pici)
  237. sess := MongoTool.GetMgoConn()
  238. defer MongoTool.DestoryMongoConn(sess)
  239. fields := map[string]interface{}{"kvtext": 0, "repeat_reason": 0}
  240. ms := sess.DB(db).C(coll).Find(q).Select(fields).Sort("publishtime")
  241. query := ms.Iter()
  242. var lastid interface{}
  243. for tmp := make(map[string]interface{}); query.Next(&tmp); count++ {
  244. if count%2000 == 0 {
  245. util.Debug("current ---", count, lastid)
  246. }
  247. if util.ObjToString(tmp["useable"]) == "0" {
  248. continue
  249. }
  250. lastid = tmp["_id"]
  251. info := ParseInfo(tmp)
  252. p.currentTime = info.Publishtime
  253. p.startProjectMerge(info, tmp)
  254. }
  255. log.Println("over...", count)
  256. }
  257. func ParseInfo(tmp map[string]interface{}) (info *Info) {
  258. bys, _ := json.Marshal(tmp)
  259. var thisinfo *Info
  260. _ = json.Unmarshal(bys, &thisinfo)
  261. if thisinfo == nil {
  262. return nil
  263. }
  264. if len(thisinfo.Topscopeclass) == 0 {
  265. thisinfo.Topscopeclass = []string{}
  266. }
  267. if len(thisinfo.Subscopeclass) == 0 {
  268. thisinfo.Subscopeclass = []string{}
  269. }
  270. if thisinfo.SubType == "" {
  271. thisinfo.SubType = util.ObjToString(tmp["bidstatus"])
  272. }
  273. //从标题中查找项目编号
  274. res := titleGetPc.FindStringSubmatch(thisinfo.Title)
  275. if len(res) > 1 && len(res[1]) > 6 && thisinfo.ProjectCode != res[1] && !numCheckPc.MatchString(res[1]) && !_zimureg1.MatchString(res[1]) {
  276. thisinfo.PTC = res[1]
  277. } else {
  278. res = titleGetPc1.FindStringSubmatch(thisinfo.Title)
  279. if len(res) > 3 && len(res[3]) > 6 && thisinfo.ProjectCode != res[3] && !numCheckPc.MatchString(res[3]) && !_zimureg1.MatchString(res[3]) {
  280. thisinfo.PTC = res[3]
  281. } else {
  282. res = titleGetPc2.FindStringSubmatch(thisinfo.Title)
  283. if len(res) > 1 && len(res[1]) > 6 && thisinfo.ProjectCode != res[1] && !numCheckPc.MatchString(res[1]) && !_zimureg1.MatchString(res[1]) {
  284. thisinfo.PTC = res[1]
  285. }
  286. }
  287. }
  288. if thisinfo.ProjectName != "" && len([]rune(thisinfo.ProjectName)) > 0 {
  289. //thisinfo.ProjectName = pcReplace.ReplaceAllString(thisinfo.ProjectName, "")
  290. //if thisinfo.ProjectName != "" {
  291. thisinfo.pnbval++
  292. //}
  293. }
  294. if thisinfo.ProjectCode != "" || thisinfo.PTC != "" {
  295. if thisinfo.ProjectCode != "" {
  296. //thisinfo.ProjectCode = pcReplace.ReplaceAllString(thisinfo.ProjectCode, "")
  297. if thisinfo.pnbval == 0 && len([]rune(thisinfo.ProjectCode)) < 5 {
  298. thisinfo.ProjectCode = StrOrNum.ReplaceAllString(thisinfo.ProjectCode, "")
  299. }
  300. } else {
  301. //thisinfo.PTC = pcReplace.ReplaceAllString(thisinfo.PTC, "")
  302. if thisinfo.pnbval == 0 && len([]rune(thisinfo.PTC)) < 5 {
  303. thisinfo.PTC = StrOrNum.ReplaceAllString(thisinfo.PTC, "")
  304. }
  305. }
  306. if thisinfo.ProjectCode != "" || thisinfo.PTC != "" {
  307. thisinfo.pnbval++
  308. }
  309. }
  310. if thisinfo.ProjectCode == thisinfo.PTC || strings.Index(thisinfo.ProjectCode, thisinfo.PTC) > -1 {
  311. thisinfo.PTC = ""
  312. }
  313. if thisinfo.Buyer != "" && len([]rune(thisinfo.Buyer)) > 2 {
  314. thisinfo.pnbval++
  315. } else {
  316. thisinfo.Buyer = ""
  317. }
  318. //清理评审专家名单
  319. if len(thisinfo.ReviewExperts) > 0 {
  320. thisinfo.ReviewExperts = ClearRp(thisinfo.ReviewExperts)
  321. }
  322. //winners整理
  323. thisinfo.Winners = strings.Split(util.ObjToString(tmp["s_winner"]), ",")
  324. thisinfo.LenPC = len([]rune(thisinfo.ProjectCode))
  325. thisinfo.LenPTC = len([]rune(thisinfo.PTC))
  326. thisinfo.LenPN = len([]rune(thisinfo.ProjectName))
  327. //处理分包中数据异常问题
  328. for k, tmp := range thisinfo.Package {
  329. if ps, ok := tmp.([]map[string]interface{}); ok {
  330. for i, p := range ps {
  331. name, _ := p["name"].(string)
  332. if len([]rune(name)) > 100 {
  333. p["name"] = fmt.Sprint([]rune(name[:100]))
  334. }
  335. ps[i] = p
  336. }
  337. thisinfo.Package[k] = ps
  338. }
  339. }
  340. return thisinfo
  341. }
  342. //从数组中删除元素
  343. func deleteSlice(arr []string, v, stype string) []string {
  344. for k, v1 := range arr {
  345. if v1 == v {
  346. ts := time.Now().Unix()
  347. arr = append(arr[:k], arr[k+1:]...)
  348. rt := time.Now().Unix() - ts
  349. if rt > 0 {
  350. log.Println("deleteSlice", stype, rt, v, len(arr))
  351. }
  352. return arr
  353. }
  354. }
  355. return arr
  356. }
  357. //校验评审专家
  358. func ClearRp(tmp []string) []string {
  359. arrTmp := []string{}
  360. for _, v := range tmp {
  361. // 汉字过滤(全汉字,2-4个字)
  362. if ok, _ := regexp.MatchString("^[\\p{Han}]{2,4}$", v); !ok {
  363. continue
  364. }
  365. arrTmp = append(arrTmp, v)
  366. }
  367. return arrTmp
  368. }