task.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387
  1. package main
  2. import (
  3. "encoding/json"
  4. "log"
  5. "qfw/util"
  6. "regexp"
  7. "strings"
  8. "sync"
  9. //"strings"
  10. "time"
  11. "github.com/robfig/cron"
  12. )
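/*
Task entry points for project merging:
  - full and incremental merge runs
  - updates, inserts, and in-memory cache cleanup
  - conversion of raw records into Info objects
*/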
  19. //项目合并对象
  20. type ProjectTask struct {
  21. InitMinTime int64 //最小时间,小于0的处理一次
  22. name string
  23. thread int //线程数
  24. //查找锁
  25. findLock sync.Mutex
  26. wg sync.WaitGroup
  27. //map锁
  28. AllIdsMapLock sync.Mutex
  29. //对应的id
  30. AllIdsMap map[string]*ID
  31. //采购单位、项目名称、项目编号
  32. mapPb, mapPn, mapPc map[string]*Key
  33. //更新或新增通道
  34. updatePool chan []map[string]interface{}
  35. //表名
  36. coll string
  37. //当前状态是全量还是增量
  38. currentType string //当前是跑全量还是跑增量
  39. //
  40. clearContimes int
  41. //当前时间
  42. currentTime int64
  43. //保存长度
  44. saveSize int
  45. }
  46. func NewPT() *ProjectTask {
  47. return &ProjectTask{
  48. InitMinTime: int64(1325347200),
  49. name: "全/增量对象",
  50. thread: 3,
  51. updatePool: make(chan []map[string]interface{}, 2000),
  52. wg: sync.WaitGroup{},
  53. AllIdsMap: make(map[string]*ID, 5000000),
  54. mapPb: make(map[string]*Key, 5000000),
  55. mapPn: make(map[string]*Key, 5000000),
  56. mapPc: make(map[string]*Key, 5000000),
  57. saveSize: 200,
  58. coll: ProjectColl,
  59. }
  60. }
  61. var P_QL *ProjectTask
  62. //初始化全量合并对象
  63. func init() {
  64. P_QL = NewPT()
  65. go P_QL.updateQueue()
  66. go P_QL.clearMem()
  67. }
  68. //项目保存和更新通道
  69. func (p *ProjectTask) updateQueue() {
  70. arr := make([][]map[string]interface{}, p.saveSize)
  71. index := 0
  72. for {
  73. select {
  74. case v := <-p.updatePool:
  75. arr[index] = v
  76. index++
  77. if index == p.saveSize {
  78. MongoTool.UpSertBulk(p.coll, arr...)
  79. arr = make([][]map[string]interface{}, p.saveSize)
  80. index = 0
  81. }
  82. case <-time.After(2 * time.Second):
  83. if index > 0 {
  84. MongoTool.UpSertBulk(p.coll, arr[:index]...)
  85. arr = make([][]map[string]interface{}, p.saveSize)
  86. index = 0
  87. }
  88. }
  89. }
  90. }
  91. //项目合并内存更新
  92. func (p *ProjectTask) clearMem() {
  93. c := cron.New()
  94. //在内存中保留最近6个月的信息
  95. validTime := int64(6 * 30 * 86400)
  96. //跑全量时每4分钟跑一次,跑增量时400分钟跑一次
  97. c.AddFunc("50 0/4 * * * *", func() {
  98. if p.currentType == "ql" || p.clearContimes >= 100 {
  99. //跳过的次数清零
  100. p.clearContimes = 0
  101. //信息进入查找对比全局锁
  102. p.findLock.Lock()
  103. defer p.findLock.Unlock()
  104. //合并进行的任务都完成
  105. p.wg.Wait()
  106. //遍历id
  107. //所有内存中的项目信息
  108. p.AllIdsMapLock.Lock()
  109. defer p.AllIdsMapLock.Unlock()
  110. //清除计数
  111. clearNum := 0
  112. for k, v := range p.AllIdsMap {
  113. if p.currentTime-v.P.LastTime > validTime {
  114. clearNum++
  115. //删除id的map
  116. delete(p.AllIdsMap, k)
  117. //删除pb
  118. if v.P.Buyer != "" {
  119. ids := p.mapPb[v.P.Buyer]
  120. if ids != nil {
  121. ids.Lock.Lock()
  122. ids.Arr = deleteSlice(ids.Arr, k)
  123. if len(ids.Arr) == 0 {
  124. delete(p.mapPb, v.P.Buyer)
  125. }
  126. ids.Lock.Unlock()
  127. }
  128. }
  129. //删除mapPn
  130. for _, vn := range append([]string{v.P.ProjectName}, v.P.MPN...) {
  131. if vn != "" {
  132. ids := p.mapPn[vn]
  133. if ids != nil {
  134. ids.Lock.Lock()
  135. ids.Arr = deleteSlice(ids.Arr, k)
  136. if len(ids.Arr) == 0 {
  137. delete(p.mapPn, vn)
  138. }
  139. ids.Lock.Unlock()
  140. }
  141. }
  142. }
  143. //删除mapPc
  144. for _, vn := range append([]string{v.P.ProjectCode}, v.P.MPC...) {
  145. if vn != "" {
  146. ids := p.mapPc[vn]
  147. if ids != nil {
  148. ids.Lock.Lock()
  149. ids.Arr = deleteSlice(ids.Arr, k)
  150. if len(ids.Arr) == 0 {
  151. delete(p.mapPc, vn)
  152. }
  153. ids.Lock.Unlock()
  154. }
  155. }
  156. }
  157. v = nil
  158. }
  159. }
  160. log.Println("清除完成:", clearNum, len(p.AllIdsMap))
  161. } else {
  162. p.clearContimes++
  163. }
  164. })
  165. c.Start()
  166. select {}
  167. }
  168. //全量合并
  169. func (p *ProjectTask) taskQl(udpInfo map[string]interface{}) {
  170. defer util.Catch()
  171. //1、检查pubilshtime索引
  172. db, _ := udpInfo["db"].(string)
  173. if db == "" {
  174. db = MongoTool.DbName
  175. }
  176. coll, _ := udpInfo["coll"].(string)
  177. if coll == "" {
  178. coll = ExtractColl
  179. }
  180. sess := MongoTool.GetMgoConn()
  181. bcon := false
  182. if sess.DB(db).C(coll).EnsureIndexKey("publishtime_1", "publishtime_-1") == nil {
  183. bcon = true
  184. } else {
  185. log.Println("publishtime_1索引不存在")
  186. }
  187. MongoTool.DestoryMongoConn(sess)
  188. thread := util.IntAllDef(udpInfo["thread"], 3)
  189. if thread > 0 {
  190. p.thread = thread
  191. }
  192. if bcon {
  193. //生成查询语句执行
  194. p.enter(db, coll, map[string]interface{}{})
  195. }
  196. }
  197. //增量合并
  198. func (p *ProjectTask) taskZl(udpInfo map[string]interface{}) {
  199. defer util.Catch()
  200. //1、检查pubilshtime索引
  201. db, _ := udpInfo["db"].(string)
  202. if db == "" {
  203. db = MongoTool.DbName
  204. }
  205. coll, _ := udpInfo["coll"].(string)
  206. if coll == "" {
  207. coll = ExtractColl
  208. }
  209. thread := util.IntAllDef(udpInfo["thread"], 3)
  210. if thread > 0 {
  211. p.thread = thread
  212. }
  213. //开始id和结束id
  214. q, _ := udpInfo["query"].(map[string]interface{})
  215. if q == nil {
  216. q = map[string]interface{}{
  217. "_id": map[string]interface{}{
  218. "$gt": util.StringTOBsonId(udpInfo["gtid"].(string)),
  219. "$lte": util.StringTOBsonId(udpInfo["lteid"].(string)),
  220. },
  221. }
  222. }
  223. if q != nil {
  224. //生成查询语句执行
  225. p.enter(db, coll, q)
  226. }
  227. }
  228. func (p *ProjectTask) enter(db, coll string, q map[string]interface{}) {
  229. defer util.Catch()
  230. sess := MongoTool.GetMgoConn()
  231. defer MongoTool.DestoryMongoConn(sess)
  232. query := sess.DB(db).C(coll).Find(q).Sort("publishtime").Iter()
  233. pool := make(chan bool, p.thread)
  234. count := 0
  235. for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
  236. info := ParseInfo(tmp)
  237. if info != nil && !((info.pnbval == 1 && info.Buyer != "") || info.pnbval == 0) {
  238. pool <- true
  239. go func(info *Info, tmp map[string]interface{}) {
  240. defer func() {
  241. p.currentTime = info.Publishtime
  242. <-pool
  243. }()
  244. p.startProjectMerge(info, tmp)
  245. }(info, tmp)
  246. } else {
  247. //信息错误,进行更新
  248. }
  249. if count%1000 == 0 {
  250. log.Println("current", count)
  251. }
  252. tmp = make(map[string]interface{})
  253. }
  254. //阻塞
  255. for n := 0; n < p.thread; n++ {
  256. pool <- true
  257. }
  258. log.Println("所有线程执行完成...", count)
  259. }
  260. var (
  261. //从标题获取项目编号
  262. titleGetPc = regexp.MustCompile("^([-0-9a-zA-Z第号采招政询电审竞#]{8,}[-0-9a-zA-Z#]+)")
  263. titleGetPc1 = regexp.MustCompile("[\\[【((](.{0,6}(编号|编码|项号|包号|代码|标段?号)[::为])?([-0-9a-zA-Z第号采招政询电审竞#]{5,}([\\[\\]()()][-0-9a-zA-Z第号采招审竞#]+[\\[\\]()()][-0-9a-zA-Z第号采招审竞#]+)?)[\\]】))]")
  264. titleGetPc2 = regexp.MustCompile("([-0-9a-zA-Z第号采政招询电审竞#]{8,}[-0-9a-zA-Z#]+)(.{0,5}公告)?$")
  265. //项目编号过滤
  266. pcReplace = regexp.MustCompile("([\\[【((〖〔《{﹝{](重|第?[二三四再]次.{0,4})[\\]】))〗〕》}﹞}])$|[\\[\\]【】()()〖〗〔〕《》{}﹝﹞-;{}–  ]+|(号|重|第?[二三四五再]次(招标)?)$|[ __]+|((采购)?项目|采购(项目)?)$")
  267. //项目编号只是数字或只是字母4个以下
  268. StrOrNum = regexp.MustCompile("^[0-9_-]{1,4}$|^[a-zA-Z_-]{1,4}$")
  269. //纯数字或纯字母
  270. StrOrNum2 = regexp.MustCompile("^[0-9_-]+$|^[a-zA-Z_-]+$")
  271. )
  272. func ParseInfo(tmp map[string]interface{}) (info *Info) {
  273. bys, _ := json.Marshal(tmp)
  274. var thisinfo *Info
  275. json.Unmarshal(bys, &thisinfo)
  276. if thisinfo == nil {
  277. return nil
  278. }
  279. if len(thisinfo.Topscopeclass) == 0 {
  280. thisinfo.Topscopeclass = []string{}
  281. }
  282. if len(thisinfo.Subscopeclass) == 0 {
  283. thisinfo.Subscopeclass = []string{}
  284. }
  285. //从标题中查找项目编号
  286. res := titleGetPc.FindStringSubmatch(thisinfo.Title)
  287. if len(res) > 1 && len(res[1]) > 6 && thisinfo.ProjectCode != res[1] && !numCheckPc.MatchString(res[1]) && !_zimureg1.MatchString(res[1]) {
  288. thisinfo.PTC = res[1]
  289. } else {
  290. res = titleGetPc1.FindStringSubmatch(thisinfo.Title)
  291. if len(res) > 3 && len(res[3]) > 6 && thisinfo.ProjectCode != res[3] && !numCheckPc.MatchString(res[3]) && !_zimureg1.MatchString(res[3]) {
  292. thisinfo.PTC = res[3]
  293. } else {
  294. res = titleGetPc2.FindStringSubmatch(thisinfo.Title)
  295. if len(res) > 1 && len(res[1]) > 6 && thisinfo.ProjectCode != res[1] && !numCheckPc.MatchString(res[1]) && !_zimureg1.MatchString(res[1]) {
  296. thisinfo.PTC = res[1]
  297. }
  298. }
  299. }
  300. if thisinfo.ProjectName != "" && len([]rune(thisinfo.ProjectName)) > 0 {
  301. thisinfo.ProjectName = pcReplace.ReplaceAllString(thisinfo.ProjectName, "")
  302. if thisinfo.ProjectName != "" {
  303. thisinfo.pnbval++
  304. }
  305. }
  306. if thisinfo.ProjectCode != "" || thisinfo.PTC != "" {
  307. if thisinfo.ProjectCode != "" {
  308. thisinfo.ProjectCode = pcReplace.ReplaceAllString(thisinfo.ProjectCode, "")
  309. if thisinfo.pnbval == 0 && len([]rune(thisinfo.ProjectCode)) < 5 {
  310. thisinfo.ProjectCode = StrOrNum.ReplaceAllString(thisinfo.ProjectCode, "")
  311. }
  312. } else {
  313. thisinfo.PTC = pcReplace.ReplaceAllString(thisinfo.PTC, "")
  314. if thisinfo.pnbval == 0 && len([]rune(thisinfo.PTC)) < 5 {
  315. thisinfo.PTC = StrOrNum.ReplaceAllString(thisinfo.PTC, "")
  316. }
  317. }
  318. if thisinfo.ProjectCode != "" || thisinfo.PTC != "" {
  319. thisinfo.pnbval++
  320. }
  321. }
  322. if thisinfo.ProjectCode == thisinfo.PTC || strings.Index(thisinfo.ProjectCode, thisinfo.PTC) > -1 {
  323. thisinfo.PTC = ""
  324. }
  325. if thisinfo.Buyer != "" && len([]rune(thisinfo.Buyer)) > 2 {
  326. thisinfo.pnbval++
  327. } else {
  328. thisinfo.Buyer = ""
  329. }
  330. //winners整理
  331. winner, _ := tmp["winner"].(string)
  332. m1 := map[string]bool{}
  333. winners := []string{}
  334. if winner != "" {
  335. m1[winner] = true
  336. winners = append(winners, winner)
  337. }
  338. if thisinfo.HasPackage {
  339. packageM, _ := tmp["package"].(map[string]interface{})
  340. for _, p := range packageM {
  341. pm, _ := p.(map[string]interface{})
  342. pw, _ := pm["winner"].(string)
  343. if pw != "" {
  344. m1[pw] = true
  345. winners = append(winners, pw)
  346. }
  347. }
  348. }
  349. thisinfo.Winners = winners
  350. thisinfo.LenPC = len([]rune(thisinfo.ProjectCode))
  351. thisinfo.LenPTC = len([]rune(thisinfo.PTC))
  352. thisinfo.LenPN = len([]rune(thisinfo.ProjectName))
  353. return thisinfo
  354. }
  355. //从数组中删除元素
  356. func deleteSlice(arr []string, v string) []string {
  357. for k, v1 := range arr {
  358. if v1 == v {
  359. return append(arr[:k], arr[k+1:]...)
  360. }
  361. }
  362. return arr
  363. }