task.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353
  1. package main
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "log"
  6. "mongodb"
  7. "qfw/util"
  8. "regexp"
  9. "strings"
  10. "sync"
  11. "time"
  12. )
  13. /**
  14. 任务入口
  15. 全量、增量合并
  16. 更新、插入,内存清理
  17. 转换成info对象
  18. **/
  19. var PreRegexp = map[string][]*regexp.Regexp{}
  20. var BackRegexp = map[string][]*regexp.Regexp{}
  21. var BackRepRegexp = map[string][]RegexpInfo{}
  22. var BlackRegexp = map[string][]*regexp.Regexp{}
  23. var (
  24. //从标题获取项目编号
  25. titleGetPc = regexp.MustCompile("^([-0-9a-zA-Z第号采招政询电审竞#]{8,}[-0-9a-zA-Z#]+)")
  26. titleGetPc1 = regexp.MustCompile("[\\[【((](.{0,6}(编号|编码|项号|包号|代码|标段?号)[::为])?([-0-9a-zA-Z第号采招政询电审竞#]{5,}([\\[\\]()()][-0-9a-zA-Z第号采招审竞#]+[\\[\\]()()][-0-9a-zA-Z第号采招审竞#]+)?)[\\]】))]")
  27. titleGetPc2 = regexp.MustCompile("([-0-9a-zA-Z第号采政招询电审竞#]{8,}[-0-9a-zA-Z#]+)(.{0,5}公告)?$")
  28. //对标题、项目名称等中英文符号、空格等进行处理
  29. filterReg = regexp.MustCompile("[`~!@#$^&*()=|{}':;,\\[\\].<>/?!¥…()—【】‘;:”“。,、?%+_-]")
  30. //项目编号过滤
  31. pcReplace = regexp.MustCompile("([\\[【((〖〔《{﹝{](重|第?[二三四再]次.{0,4})[\\]】))〗〕》}﹞}])$|[\\[\\]【】()()〖〗〔〕《》{}﹝﹞-;{}–  ]+|(号|重|第?[二三四五再]次(招标)?)$|[ __]+|((采购)?项目|采购(项目)?)$")
  32. //项目编号只是数字或只是字母4个以下
  33. StrOrNum = regexp.MustCompile("^[0-9_-]{1,4}$|^[a-zA-Z_-]{1,4}$")
  34. //纯数字或纯字母
  35. StrOrNum2 = regexp.MustCompile("^[0-9_-]+$|^[a-zA-Z_-]+$")
  36. //含分包词,招标未识别分包 合并到一个项目
  37. KeyPackage = regexp.MustCompile("[0-9a-zA-Z一二三四五六七八九十ⅠⅡⅢⅣⅤⅥⅦⅧⅨⅩ]+.{0,2}(包|段)|(包|段)[0-9a-zA-Z一二三四五六七八九十ⅠⅡⅢⅣⅤⅥⅦⅧⅨⅩ]+.{0,2}")
  38. )
  39. type RegexpInfo struct {
  40. regs *regexp.Regexp
  41. repstr string
  42. }
  43. func NewPT() *ProjectTask {
  44. p := &ProjectTask{
  45. InitMinTime: int64(1325347200),
  46. name: "全/增量对象",
  47. thread: Thread,
  48. updatePool: make(chan []map[string]interface{}, 5000),
  49. //savePool: make(chan map[string]interface{}, 2000),
  50. wg: sync.WaitGroup{},
  51. AllIdsMap: make(map[string]*ID, 5000000),
  52. mapPb: make(map[string]*Key, 1500000),
  53. mapPn: make(map[string]*Key, 5000000),
  54. mapPc: make(map[string]*Key, 5000000),
  55. saveSize: 100,
  56. //saveSign: make(chan bool, 1),
  57. //updateSign: make(chan bool, 1),
  58. coll: ProjectColl,
  59. validTime: int64(util.IntAllDef(Sysconfig["validdays"], 150) * 86400),
  60. statusTime: int64(util.IntAllDef(Sysconfig["statusdays"], 15) * 86400),
  61. jgTime: int64(util.IntAllDef(7, 7) * 86400),
  62. }
  63. return p
  64. }
  65. var P_QL *ProjectTask
  66. var sp = make(chan bool, 5)
  67. func (p *ProjectTask) updateAllQueue() {
  68. arru := make([][]map[string]interface{}, p.saveSize)
  69. indexu := 0
  70. for {
  71. select {
  72. case v := <-p.updatePool:
  73. arru[indexu] = v
  74. indexu++
  75. if indexu == p.saveSize {
  76. sp <- true
  77. go func(arru [][]map[string]interface{}) {
  78. defer func() {
  79. <-sp
  80. }()
  81. MongoTool.UpSertBulk(p.coll, arru...)
  82. }(arru)
  83. arru = make([][]map[string]interface{}, p.saveSize)
  84. indexu = 0
  85. }
  86. case <-time.After(1000 * time.Millisecond):
  87. if indexu > 0 {
  88. sp <- true
  89. go func(arru [][]map[string]interface{}) {
  90. defer func() {
  91. <-sp
  92. }()
  93. MongoTool.UpSertBulk(p.coll, arru...)
  94. }(arru[:indexu])
  95. arru = make([][]map[string]interface{}, p.saveSize)
  96. indexu = 0
  97. }
  98. }
  99. }
  100. }
  101. //全量合并
  102. func (p *ProjectTask) taskQl() {
  103. defer util.Catch()
  104. p.thread = util.IntAllDef(Thread, 4)
  105. p.enter(MongoTool.DbName, ExtractColl, nil)
  106. }
  107. //增量合并
  108. func (p *ProjectTask) taskZl(udpInfo map[string]interface{}) {
  109. defer util.Catch()
  110. //1、检查pubilshtime索引
  111. db, _ := udpInfo["db"].(string)
  112. if db == "" {
  113. db = MongoTool.DbName
  114. }
  115. coll, _ := udpInfo["coll"].(string)
  116. if coll == "" {
  117. coll = ExtractColl
  118. }
  119. thread := util.IntAllDef(Thread, 4)
  120. if thread > 0 {
  121. p.thread = thread
  122. }
  123. //开始id和结束id
  124. q, _ := udpInfo["query"].(map[string]interface{})
  125. gtid := udpInfo["gtid"].(string)
  126. lteid := udpInfo["lteid"].(string)
  127. q = map[string]interface{}{
  128. "_id": map[string]interface{}{
  129. "$gt": mongodb.StringTOBsonId(gtid),
  130. "$lte": mongodb.StringTOBsonId(lteid),
  131. },
  132. }
  133. p.enter(db, coll, q)
  134. if udpInfo["stop"] == nil {
  135. for i := 0; i < 5; i++ {
  136. sp <- true
  137. }
  138. for i := 0; i < 5; i++ {
  139. <-sp
  140. }
  141. log.Println("保存完成", p.pici)
  142. }
  143. }
  144. func (p *ProjectTask) enter(db, coll string, q map[string]interface{}) {
  145. defer util.Catch()
  146. defer func() {
  147. p.Brun = false
  148. }()
  149. p.Brun = true
  150. count := 0
  151. //q = map[string]interface{}{"updatetime": map[string]interface{}{"$gt": 0}}
  152. util.Debug("start project", q, p.pici)
  153. sess := MongoTool.GetMgoConn()
  154. defer MongoTool.DestoryMongoConn(sess)
  155. fields := map[string]interface{}{"kvtext": 0, "repeat_reason": 0}
  156. ms := sess.DB(db).C(coll).Find(nil).Select(fields).Sort("publishtime")
  157. query := ms.Iter()
  158. var lastid interface{}
  159. for tmp := make(map[string]interface{}); query.Next(&tmp); count++ {
  160. if count%2000 == 0 {
  161. util.Debug("current ---", count, lastid)
  162. }
  163. if util.ObjToString(tmp["useable"]) == "0" {
  164. continue
  165. }
  166. lastid = tmp["_id"]
  167. info := ParseInfo(tmp)
  168. p.currentTime = info.Publishtime
  169. p.startProjectMerge(info, tmp)
  170. }
  171. log.Println("over...", count)
  172. }
  173. func ParseInfo(tmp map[string]interface{}) (info *Info) {
  174. bys, _ := json.Marshal(tmp)
  175. var thisinfo *Info
  176. _ = json.Unmarshal(bys, &thisinfo)
  177. if thisinfo == nil {
  178. return nil
  179. }
  180. thisinfo.Bidamount = util.Float64All(tmp["bidamount"])
  181. thisinfo.Budget = util.Float64All(tmp["budget"])
  182. if tmp["pt_modify"] != nil {
  183. thisinfo.Publishtime = util.Int64All(tmp["pt_modify"])
  184. tmp["publishtime"] = tmp["pt_modify"]
  185. }
  186. // 处理publishtime为空
  187. if thisinfo.Publishtime <= 0 {
  188. for _, d := range DateTimeSelect {
  189. if tmp[d] != nil {
  190. thisinfo.Publishtime = util.Int64All(tmp[d])
  191. tmp["publishtime"] = tmp[d]
  192. }
  193. }
  194. }
  195. if len(thisinfo.Topscopeclass) == 0 {
  196. thisinfo.Topscopeclass = []string{}
  197. }
  198. if len(thisinfo.Subscopeclass) == 0 {
  199. thisinfo.Subscopeclass = []string{}
  200. }
  201. if thisinfo.SubType == "" {
  202. thisinfo.SubType = util.ObjToString(tmp["bidstatus"])
  203. }
  204. // 项目名称、项目标题过滤
  205. thisinfo.ProjectName = filterReg.ReplaceAllString(thisinfo.ProjectName, "")
  206. thisinfo.ProjectCode = filterReg.ReplaceAllString(thisinfo.ProjectCode, "")
  207. //从标题中查找项目编号
  208. res := titleGetPc.FindStringSubmatch(thisinfo.Title)
  209. if len(res) > 1 && len(res[1]) > 6 && thisinfo.ProjectCode != res[1] && !numCheckPc.MatchString(res[1]) && !_zimureg1.MatchString(res[1]) {
  210. thisinfo.PTC = res[1]
  211. } else {
  212. res = titleGetPc1.FindStringSubmatch(thisinfo.Title)
  213. if len(res) > 3 && len(res[3]) > 6 && thisinfo.ProjectCode != res[3] && !numCheckPc.MatchString(res[3]) && !_zimureg1.MatchString(res[3]) {
  214. thisinfo.PTC = res[3]
  215. } else {
  216. res = titleGetPc2.FindStringSubmatch(thisinfo.Title)
  217. if len(res) > 1 && len(res[1]) > 6 && thisinfo.ProjectCode != res[1] && !numCheckPc.MatchString(res[1]) && !_zimureg1.MatchString(res[1]) {
  218. thisinfo.PTC = res[1]
  219. }
  220. }
  221. }
  222. if thisinfo.ProjectName != "" && len([]rune(thisinfo.ProjectName)) > 0 {
  223. //thisinfo.ProjectName = pcReplace.ReplaceAllString(thisinfo.ProjectName, "")
  224. //if thisinfo.ProjectName != "" {
  225. thisinfo.pnbval++
  226. //}
  227. }
  228. if thisinfo.ProjectCode != "" || thisinfo.PTC != "" {
  229. if thisinfo.ProjectCode != "" {
  230. //thisinfo.ProjectCode = pcReplace.ReplaceAllString(thisinfo.ProjectCode, "")
  231. if thisinfo.pnbval == 0 && len([]rune(thisinfo.ProjectCode)) < 5 {
  232. thisinfo.ProjectCode = StrOrNum.ReplaceAllString(thisinfo.ProjectCode, "")
  233. }
  234. } else {
  235. //thisinfo.PTC = pcReplace.ReplaceAllString(thisinfo.PTC, "")
  236. if thisinfo.pnbval == 0 && len([]rune(thisinfo.PTC)) < 5 {
  237. thisinfo.PTC = StrOrNum.ReplaceAllString(thisinfo.PTC, "")
  238. }
  239. }
  240. if thisinfo.ProjectCode != "" || thisinfo.PTC != "" {
  241. thisinfo.pnbval++
  242. }
  243. }
  244. if thisinfo.ProjectCode == thisinfo.PTC || strings.Index(thisinfo.ProjectCode, thisinfo.PTC) > -1 {
  245. thisinfo.PTC = ""
  246. }
  247. if thisinfo.Buyer != "" && len([]rune(thisinfo.Buyer)) > 2 {
  248. thisinfo.pnbval++
  249. } else {
  250. thisinfo.Buyer = ""
  251. }
  252. //清理评审专家名单
  253. if len(thisinfo.ReviewExperts) > 0 {
  254. thisinfo.ReviewExperts = ClearRp(thisinfo.ReviewExperts)
  255. }
  256. thisinfo.LenPC = len([]rune(thisinfo.ProjectCode))
  257. thisinfo.LenPTC = len([]rune(thisinfo.PTC))
  258. thisinfo.LenPN = len([]rune(thisinfo.ProjectName))
  259. //winner := util.ObjToString(tmp["winner"])
  260. //winners := []string{}
  261. //m1 := map[string]bool{}
  262. //if winner != "" {
  263. // m1[winner] = true
  264. // winners = append(winners, winner)
  265. //}
  266. //packageM, _ := tmp["package"].(map[string]interface{})
  267. //if packageM != nil {
  268. // thisinfo.HasPackage = true
  269. // for _, p := range packageM {
  270. // pm, _ := p.(map[string]interface{})
  271. // if pw := util.ObjToString(pm["winner"]); pw != "" && !m1[pw] {
  272. // m1[pw] = true
  273. // winners = append(winners, pw)
  274. // }
  275. // }
  276. //}
  277. //winners整理
  278. if util.ObjToString(tmp["s_winner"]) != "" {
  279. thisinfo.Winners = strings.Split(util.ObjToString(tmp["s_winner"]), ",")
  280. }
  281. //thisinfo.Winners = winners
  282. //处理分包中数据异常问题
  283. for k, tmp := range thisinfo.Package {
  284. if ps, ok := tmp.([]map[string]interface{}); ok {
  285. for i, p := range ps {
  286. name, _ := p["name"].(string)
  287. if len([]rune(name)) > 100 {
  288. p["name"] = fmt.Sprint([]rune(name[:100]))
  289. }
  290. ps[i] = p
  291. }
  292. thisinfo.Package[k] = ps
  293. }
  294. }
  295. return thisinfo
  296. }
  // deleteSlice removes the first element equal to v from arr and returns the
// shortened slice; if v is absent, arr is returned unchanged. The removal is
// done in place (later elements shift left), so arr's backing array is
// modified and callers must use the returned slice. stype only labels the
// log line emitted when a removal is slow.
func deleteSlice(arr []string, v, stype string) []string {
	for k, v1 := range arr {
		if v1 != v {
			continue
		}
		start := time.Now()
		arr = append(arr[:k], arr[k+1:]...)
		// Measure real elapsed time. The previous whole-second Unix() diff
		// reported 1s for any copy that merely straddled a second boundary.
		if elapsed := time.Since(start); elapsed >= time.Second {
			log.Println("deleteSlice", stype, elapsed, v, len(arr))
		}
		return arr
	}
	return arr
}
  // reviewExpertNameReg accepts a string made of 2 to 4 Han characters only.
// It is compiled once here instead of on every regexp.MatchString call.
var reviewExpertNameReg = regexp.MustCompile("^[\\p{Han}]{2,4}$")

// ClearRp filters review-expert names, keeping only entries that consist
// entirely of 2-4 Han (Chinese) characters, in their original order. The
// result is never nil: an empty input yields an empty, non-nil slice.
func ClearRp(tmp []string) []string {
	arrTmp := make([]string, 0, len(tmp))
	for _, v := range tmp {
		if reviewExpertNameReg.MatchString(v) {
			arrTmp = append(arrTmp, v)
		}
	}
	return arrTmp
}