task.go 9.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329
  1. package main
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "log"
  6. "mongodb"
  7. "qfw/util"
  8. "regexp"
  9. "strings"
  10. "sync"
  11. "time"
  12. )
  13. /**
  14. 任务入口
  15. 全量、增量合并
  16. 更新、插入,内存清理
  17. 转换成info对象
  18. **/
  19. var PreRegexp = map[string][]*regexp.Regexp{}
  20. var BackRegexp = map[string][]*regexp.Regexp{}
  21. var BackRepRegexp = map[string][]RegexpInfo{}
  22. var BlackRegexp = map[string][]*regexp.Regexp{}
var (
	// Extract the project code from the title.
	// titleGetPc looks for a code at the very start of the title and captures it
	// as group 1.
	titleGetPc = regexp.MustCompile("^([-0-9a-zA-Z第号采招政询电审竞#]{8,}[-0-9a-zA-Z#]+)")
	// titleGetPc1 looks for a bracketed code, optionally introduced by a label
	// such as "编号"; the code itself is capture group 3.
	titleGetPc1 = regexp.MustCompile("[\\[【((](.{0,6}(编号|编码|项号|包号|代码|标段?号)[::为])?([-0-9a-zA-Z第号采招政询电审竞#]{5,}([\\[\\]()()][-0-9a-zA-Z第号采招审竞#]+[\\[\\]()()][-0-9a-zA-Z第号采招审竞#]+)?)[\\]】))]")
	// titleGetPc2 looks for a code at the end of the title, optionally followed
	// by up to five characters and "公告"; the code is capture group 1.
	titleGetPc2 = regexp.MustCompile("([-0-9a-zA-Z第号采政招询电审竞#]{8,}[-0-9a-zA-Z#]+)(.{0,5}公告)?$")
	// Project code filtering.
	// pcReplace is only referenced from commented-out code in the visible part
	// of this file.
	pcReplace = regexp.MustCompile("([\\[【((〖〔《{﹝{](重|第?[二三四再]次.{0,4})[\\]】))〗〕》}﹞}])$|[\\[\\]【】()()〖〗〔〕《》{}﹝﹞-;{}–  ]+|(号|重|第?[二三四五再]次(招标)?)$|[ __]+|((采购)?项目|采购(项目)?)$")
	// Project code that consists only of digits or only of letters and is at most
	// four characters long ('_' and '-' are accepted in either form).
	StrOrNum = regexp.MustCompile("^[0-9_-]{1,4}$|^[a-zA-Z_-]{1,4}$")
	// Purely digits or purely letters, of any length ('_' and '-' are accepted in
	// either form).
	StrOrNum2 = regexp.MustCompile("^[0-9_-]+$|^[a-zA-Z_-]+$")
	// Contains a sub-package word ("包" or "段"); when the tender notice did not
	// identify sub-packages, merge into one project.
	KeyPackage = regexp.MustCompile("[0-9a-zA-Z一二三四五六七八九十ⅠⅡⅢⅣⅤⅥⅦⅧⅨⅩ]+.{0,2}(包|段)|(包|段)[0-9a-zA-Z一二三四五六七八九十ⅠⅡⅢⅣⅤⅥⅦⅧⅨⅩ]+.{0,2}")
)
// RegexpInfo pairs a compiled pattern with a string. It is the element type of
// the BackRepRegexp table.
type RegexpInfo struct {
	// regs is the compiled pattern.
	regs *regexp.Regexp
	// repstr is presumably the replacement text applied to matches of regs —
	// TODO confirm against the code that consumes BackRepRegexp.
	repstr string
}
  41. func NewPT() *ProjectTask {
  42. p := &ProjectTask{
  43. InitMinTime: int64(1325347200),
  44. name: "全/增量对象",
  45. thread: Thread,
  46. updatePool: make(chan []map[string]interface{}, 5000),
  47. //savePool: make(chan map[string]interface{}, 2000),
  48. wg: sync.WaitGroup{},
  49. AllIdsMap: make(map[string]*ID, 5000000),
  50. mapPb: make(map[string]*Key, 1500000),
  51. mapPn: make(map[string]*Key, 5000000),
  52. mapPc: make(map[string]*Key, 5000000),
  53. saveSize: 100,
  54. //saveSign: make(chan bool, 1),
  55. //updateSign: make(chan bool, 1),
  56. coll: ProjectColl,
  57. validTime: int64(util.IntAllDef(Sysconfig["validdays"], 150) * 86400),
  58. statusTime: int64(util.IntAllDef(Sysconfig["statusdays"], 15) * 86400),
  59. jgTime: int64(util.IntAllDef(7, 7) * 86400),
  60. }
  61. return p
  62. }
  63. var P_QL *ProjectTask
  64. var sp = make(chan bool, 5)
  65. func (p *ProjectTask) updateAllQueue() {
  66. arru := make([][]map[string]interface{}, p.saveSize)
  67. indexu := 0
  68. for {
  69. select {
  70. case v := <-p.updatePool:
  71. arru[indexu] = v
  72. indexu++
  73. if indexu == p.saveSize {
  74. sp <- true
  75. go func(arru [][]map[string]interface{}) {
  76. defer func() {
  77. <-sp
  78. }()
  79. MongoTool.UpSertBulk(p.coll, arru...)
  80. }(arru)
  81. arru = make([][]map[string]interface{}, p.saveSize)
  82. indexu = 0
  83. }
  84. case <-time.After(1000 * time.Millisecond):
  85. if indexu > 0 {
  86. sp <- true
  87. go func(arru [][]map[string]interface{}) {
  88. defer func() {
  89. <-sp
  90. }()
  91. MongoTool.UpSertBulk(p.coll, arru...)
  92. }(arru[:indexu])
  93. arru = make([][]map[string]interface{}, p.saveSize)
  94. indexu = 0
  95. }
  96. }
  97. }
  98. }
  99. //全量合并
  100. func (p *ProjectTask) taskQl() {
  101. defer util.Catch()
  102. p.thread = util.IntAllDef(Thread, 4)
  103. p.enter(MongoTool.DbName, ExtractColl, nil)
  104. }
  105. //增量合并
  106. func (p *ProjectTask) taskZl(udpInfo map[string]interface{}) {
  107. defer util.Catch()
  108. //1、检查pubilshtime索引
  109. db, _ := udpInfo["db"].(string)
  110. if db == "" {
  111. db = MongoTool.DbName
  112. }
  113. coll, _ := udpInfo["coll"].(string)
  114. if coll == "" {
  115. coll = ExtractColl
  116. }
  117. thread := util.IntAllDef(Thread, 4)
  118. if thread > 0 {
  119. p.thread = thread
  120. }
  121. //开始id和结束id
  122. q, _ := udpInfo["query"].(map[string]interface{})
  123. gtid := udpInfo["gtid"].(string)
  124. lteid := udpInfo["lteid"].(string)
  125. q = map[string]interface{}{
  126. "_id": map[string]interface{}{
  127. "$gt": mongodb.StringTOBsonId(gtid),
  128. "$lte": mongodb.StringTOBsonId(lteid),
  129. },
  130. }
  131. p.enter(db, coll, q)
  132. if udpInfo["stop"] == nil {
  133. for i := 0; i < 5; i++ {
  134. sp <- true
  135. }
  136. for i := 0; i < 5; i++ {
  137. <-sp
  138. }
  139. log.Println("保存完成", p.pici)
  140. }
  141. }
  142. func (p *ProjectTask) enter(db, coll string, q map[string]interface{}) {
  143. defer util.Catch()
  144. defer func() {
  145. p.Brun = false
  146. }()
  147. p.Brun = true
  148. count := 0
  149. //q = map[string]interface{}{"updatetime": map[string]interface{}{"$gt": 0}}
  150. util.Debug("start project", q, p.pici)
  151. sess := MongoTool.GetMgoConn()
  152. defer MongoTool.DestoryMongoConn(sess)
  153. fields := map[string]interface{}{"kvtext": 0, "repeat_reason": 0}
  154. ms := sess.DB(db).C(coll).Find(nil).Select(fields).Sort("publishtime")
  155. query := ms.Iter()
  156. var lastid interface{}
  157. for tmp := make(map[string]interface{}); query.Next(&tmp); count++ {
  158. if count%2000 == 0 {
  159. util.Debug("current ---", count, lastid)
  160. }
  161. if util.ObjToString(tmp["useable"]) == "0" {
  162. continue
  163. }
  164. lastid = tmp["_id"]
  165. info := ParseInfo(tmp)
  166. p.currentTime = info.Publishtime
  167. p.startProjectMerge(info, tmp)
  168. }
  169. log.Println("over...", count)
  170. }
  171. func ParseInfo(tmp map[string]interface{}) (info *Info) {
  172. bys, _ := json.Marshal(tmp)
  173. var thisinfo *Info
  174. _ = json.Unmarshal(bys, &thisinfo)
  175. if thisinfo == nil {
  176. return nil
  177. }
  178. if len(thisinfo.Topscopeclass) == 0 {
  179. thisinfo.Topscopeclass = []string{}
  180. }
  181. if len(thisinfo.Subscopeclass) == 0 {
  182. thisinfo.Subscopeclass = []string{}
  183. }
  184. if thisinfo.SubType == "" {
  185. thisinfo.SubType = util.ObjToString(tmp["bidstatus"])
  186. }
  187. //从标题中查找项目编号
  188. res := titleGetPc.FindStringSubmatch(thisinfo.Title)
  189. if len(res) > 1 && len(res[1]) > 6 && thisinfo.ProjectCode != res[1] && !numCheckPc.MatchString(res[1]) && !_zimureg1.MatchString(res[1]) {
  190. thisinfo.PTC = res[1]
  191. } else {
  192. res = titleGetPc1.FindStringSubmatch(thisinfo.Title)
  193. if len(res) > 3 && len(res[3]) > 6 && thisinfo.ProjectCode != res[3] && !numCheckPc.MatchString(res[3]) && !_zimureg1.MatchString(res[3]) {
  194. thisinfo.PTC = res[3]
  195. } else {
  196. res = titleGetPc2.FindStringSubmatch(thisinfo.Title)
  197. if len(res) > 1 && len(res[1]) > 6 && thisinfo.ProjectCode != res[1] && !numCheckPc.MatchString(res[1]) && !_zimureg1.MatchString(res[1]) {
  198. thisinfo.PTC = res[1]
  199. }
  200. }
  201. }
  202. if thisinfo.ProjectName != "" && len([]rune(thisinfo.ProjectName)) > 0 {
  203. //thisinfo.ProjectName = pcReplace.ReplaceAllString(thisinfo.ProjectName, "")
  204. //if thisinfo.ProjectName != "" {
  205. thisinfo.pnbval++
  206. //}
  207. }
  208. if thisinfo.ProjectCode != "" || thisinfo.PTC != "" {
  209. if thisinfo.ProjectCode != "" {
  210. //thisinfo.ProjectCode = pcReplace.ReplaceAllString(thisinfo.ProjectCode, "")
  211. if thisinfo.pnbval == 0 && len([]rune(thisinfo.ProjectCode)) < 5 {
  212. thisinfo.ProjectCode = StrOrNum.ReplaceAllString(thisinfo.ProjectCode, "")
  213. }
  214. } else {
  215. //thisinfo.PTC = pcReplace.ReplaceAllString(thisinfo.PTC, "")
  216. if thisinfo.pnbval == 0 && len([]rune(thisinfo.PTC)) < 5 {
  217. thisinfo.PTC = StrOrNum.ReplaceAllString(thisinfo.PTC, "")
  218. }
  219. }
  220. if thisinfo.ProjectCode != "" || thisinfo.PTC != "" {
  221. thisinfo.pnbval++
  222. }
  223. }
  224. if thisinfo.ProjectCode == thisinfo.PTC || strings.Index(thisinfo.ProjectCode, thisinfo.PTC) > -1 {
  225. thisinfo.PTC = ""
  226. }
  227. if thisinfo.Buyer != "" && len([]rune(thisinfo.Buyer)) > 2 {
  228. thisinfo.pnbval++
  229. } else {
  230. thisinfo.Buyer = ""
  231. }
  232. //清理评审专家名单
  233. if len(thisinfo.ReviewExperts) > 0 {
  234. thisinfo.ReviewExperts = ClearRp(thisinfo.ReviewExperts)
  235. }
  236. //winners整理
  237. thisinfo.Winners = strings.Split(util.ObjToString(tmp["s_winner"]), ",")
  238. thisinfo.LenPC = len([]rune(thisinfo.ProjectCode))
  239. thisinfo.LenPTC = len([]rune(thisinfo.PTC))
  240. thisinfo.LenPN = len([]rune(thisinfo.ProjectName))
  241. winner := util.ObjToString(tmp["winner"])
  242. winners := []string{}
  243. m1 := map[string]bool{}
  244. if winner != "" {
  245. m1[winner] = true
  246. winners = append(winners, winner)
  247. }
  248. packageM, _ := tmp["package"].(map[string]interface{})
  249. if packageM != nil {
  250. thisinfo.HasPackage = true
  251. for _, p := range packageM {
  252. pm, _ := p.(map[string]interface{})
  253. if pw := util.ObjToString(pm["winner"]); pw != "" && !m1[pw] {
  254. m1[pw] = true
  255. winners = append(winners, pw)
  256. }
  257. }
  258. }
  259. thisinfo.Winners = winners
  260. //处理分包中数据异常问题
  261. for k, tmp := range thisinfo.Package {
  262. if ps, ok := tmp.([]map[string]interface{}); ok {
  263. for i, p := range ps {
  264. name, _ := p["name"].(string)
  265. if len([]rune(name)) > 100 {
  266. p["name"] = fmt.Sprint([]rune(name[:100]))
  267. }
  268. ps[i] = p
  269. }
  270. thisinfo.Package[k] = ps
  271. }
  272. }
  273. return thisinfo
  274. }
  275. //从数组中删除元素
  276. func deleteSlice(arr []string, v, stype string) []string {
  277. for k, v1 := range arr {
  278. if v1 == v {
  279. ts := time.Now().Unix()
  280. arr = append(arr[:k], arr[k+1:]...)
  281. rt := time.Now().Unix() - ts
  282. if rt > 0 {
  283. log.Println("deleteSlice", stype, rt, v, len(arr))
  284. }
  285. return arr
  286. }
  287. }
  288. return arr
  289. }
  290. //校验评审专家
  291. func ClearRp(tmp []string) []string {
  292. arrTmp := []string{}
  293. for _, v := range tmp {
  294. // 汉字过滤(全汉字,2-4个字)
  295. if ok, _ := regexp.MatchString("^[\\p{Han}]{2,4}$", v); !ok {
  296. continue
  297. }
  298. arrTmp = append(arrTmp, v)
  299. }
  300. return arrTmp
  301. }