task.go 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222
  1. package main
  2. import (
  3. "encoding/json"
  4. "log"
  5. "qfw/util"
  6. "regexp"
  7. "strings"
  8. "sync"
  9. //"strings"
  10. "time"
  11. "gopkg.in/mgo.v2/bson"
  12. )
  13. const (
  14. InitMinTime = int64(1325347200) //最小时间位置2012
  15. )
  16. //全量合并
  17. func taskQl(udpInfo map[string]interface{}) {
  18. defer util.Catch()
  19. //1、检查pubilshtime索引
  20. db, _ := udpInfo["db"].(string)
  21. if db == "" {
  22. db = MongoTool.DbName
  23. }
  24. coll, _ := udpInfo["coll"].(string)
  25. if coll == "" {
  26. coll = ExtractColl
  27. }
  28. sess := MongoTool.GetMgoConn()
  29. bcon := false
  30. if sess.DB(db).C(coll).EnsureIndexKey("publishtime_1", "publishtime_-1") == nil {
  31. bcon = true
  32. } else {
  33. log.Println("publishtime_1索引不存在")
  34. }
  35. MongoTool.DestoryMongoConn(sess)
  36. thread := util.IntAllDef(udpInfo["thread"], 1)
  37. if bcon {
  38. //go SaveQueue()
  39. go updateQueue()
  40. go clearMem()
  41. //获取起始时间
  42. startTime, END := int64(0), int64(0)
  43. sts, bres := MongoTool.Find(ExtractColl, `{}`, "publishtime", `{"publishtime":1}`, true, 0, 1)
  44. if bres && sts != nil && len(*sts) == 1 {
  45. startTime = util.Int64All((*sts)[0]["publishtime"])
  46. sts, bres = MongoTool.Find(ExtractColl, `{}`, "-publishtime", `{"publishtime":1}`, true, 0, 1)
  47. if bres && sts != nil && len(*sts) == 1 {
  48. END = util.Int64All((*sts)[0]["publishtime"])
  49. }
  50. log.Println("查询到的起始时间", startTime, END)
  51. } else {
  52. return
  53. }
  54. startTime -= 1
  55. sum := 0
  56. if startTime < InitMinTime {
  57. q := map[string]interface{}{
  58. "publishtime": map[string]interface{}{
  59. "$gt": startTime,
  60. "$lte": InitMinTime,
  61. },
  62. }
  63. sum = Mql(q, thread, db, coll, sum)
  64. startTime = InitMinTime
  65. }
  66. for {
  67. if startTime >= END {
  68. break
  69. }
  70. et := startTime + 50*86400
  71. if et >= END {
  72. et = END
  73. }
  74. q := map[string]interface{}{
  75. "publishtime": map[string]interface{}{
  76. "$gt": startTime,
  77. "$lte": et,
  78. },
  79. }
  80. sum = Mql(q, thread, db, coll, sum)
  81. startTime = et
  82. time.Sleep(1 * time.Second)
  83. }
  84. }
  85. log.Println("task over!!!")
  86. }
  87. var wg = sync.WaitGroup{}
  88. func Ids(udpInfo map[string]interface{}) {
  89. oid := []interface{}{}
  90. n1, _ := udpInfo["ids"].(string)
  91. idArr := strings.Split(n1, ",")
  92. for _, v := range idArr {
  93. oid = append(oid, util.StringTOBsonId(v))
  94. }
  95. thread := util.IntAllDef(udpInfo["thread"], 1)
  96. q := bson.M{"_id": bson.M{"$in": oid}}
  97. go updateQueue()
  98. go clearMem()
  99. Mql(q, thread, MongoTool.DbName, ExtractColl, 0)
  100. }
  101. func Mql(q map[string]interface{}, thread int, db, coll string, sum int) int {
  102. defer util.Catch()
  103. sess := MongoTool.GetMgoConn()
  104. defer MongoTool.DestoryMongoConn(sess)
  105. query := sess.DB(db).C(coll).Find(q).Sort("publishtime").Iter()
  106. pool := make(chan bool, thread)
  107. count := 0
  108. for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
  109. info := ParseInfo(tmp)
  110. if info != nil && !((info.pnbval == 1 && info.Buyer != "") || info.pnbval == 0) {
  111. pool <- true
  112. go func(info *Info, tmp map[string]interface{}) {
  113. defer func() {
  114. currentTime = info.Publishtime
  115. <-pool
  116. }()
  117. startProjectMerge(info, tmp)
  118. }(info, tmp)
  119. } else {
  120. //log.Println("info err:", tmp["_id"], tmp["title"], tmp["buyer"])
  121. }
  122. if sum%1000 == 0 {
  123. log.Println("current", sum)
  124. }
  125. sum++
  126. tmp = make(map[string]interface{})
  127. }
  128. //阻塞
  129. for n := 0; n < thread; n++ {
  130. pool <- true
  131. }
  132. //完成
  133. log.Println("sontask over:", count, sum, q)
  134. return sum
  135. }
// Patterns used by ParseInfo to pull project codes out of titles and to
// normalize project names and codes. Several character classes pair ASCII
// punctuation with (apparently) full-width CJK equivalents.
var (
	// titleGetPc matches a code at the very start of a title. The code is at
	// least 8 characters drawn from ASCII letters/digits, '-', '#' and the CJK
	// characters 第 号 采 招 政 询 电 审 竞, followed by one or more ASCII
	// letters/digits, '-' or '#'. Group 1 is the code.
	titleGetPc = regexp.MustCompile("^([-0-9a-zA-Z第号采招政询电审竞#]{8,}[-0-9a-zA-Z#]+)")
	// titleGetPc1 matches a code enclosed in brackets. Inside the brackets the
	// code may be preceded by up to 6 characters, a label and a separator (a
	// colon or 为 "is"). The labels are 编号/编码 "number/code", 项号 "item
	// number", 包号 "lot number", 代码 "code", and 标段号/标号 "section number".
	// Group 3 is the code: 5+ code characters, optionally followed by a bracket,
	// more code characters, another bracket and more code characters.
	titleGetPc1 = regexp.MustCompile("[\\[【((](.{0,6}(编号|编码|项号|包号|代码|标段?号)[::为])?([-0-9a-zA-Z第号采招政询电审竞#]{5,}([\\[\\]()()][-0-9a-zA-Z第号采招审竞#]+[\\[\\]()()][-0-9a-zA-Z第号采招审竞#]+)?)[\\]】))]")
	// titleGetPc2 matches a code (same character set as titleGetPc) at the end
	// of a title. The code may be followed by up to 5 characters plus 公告
	// "announcement". Group 1 is the code.
	titleGetPc2 = regexp.MustCompile("([-0-9a-zA-Z第号采政招询电审竞#]{8,}[-0-9a-zA-Z#]+)(.{0,5}公告)?$")
	// pcReplace is used with ReplaceAllString(..., "") to strip noise from
	// names and codes. Its alternatives are:
	//   1. a trailing bracketed re-tender marker such as 【重】 ("re-") or
	//      (第二次…) ("second time …");
	//   2. runs of bracket, dash and space characters anywhere in the string;
	//   3. a trailing 号, 重, or [第]二/三/四/五/再次 optionally followed by
	//      招标 ("tender");
	//   4. runs of spaces/underscores.
	// NOTE(review): in alternative 2, "﹞-{" inside the character class is
	// parsed as a range rather than a literal '-'. Depending on the exact code
	// points it may match far more characters than intended — confirm.
	pcReplace = regexp.MustCompile("([\\[【((〖〔《{﹝{](重|第?[二三四再]次.{0,4})[\\]】))〗〕》}﹞}])$|[\\[\\]【】()()〖〗〔〕《》{}﹝﹞-{}–  ]+|(号|重|第?[二三四五再]次(招标)?)$|[ __]+")
	// StrOrNum matches a whole string of 1–4 characters that is only digits,
	// '_' and '-', or only ASCII letters, '_' and '-'. ParseInfo uses it to
	// blank out very short codes.
	StrOrNum = regexp.MustCompile("^[0-9_-]{1,4}$|^[a-zA-Z_-]{1,4}$")
	// StrOrNum2 is StrOrNum without the length limit. It is not referenced in
	// this section of the file.
	StrOrNum2 = regexp.MustCompile("^[0-9_-]+$|^[a-zA-Z_-]+$")
)
  144. func ParseInfo(tmp map[string]interface{}) (info *Info) {
  145. bys, _ := json.Marshal(tmp)
  146. var thisinfo *Info
  147. json.Unmarshal(bys, &thisinfo)
  148. if thisinfo == nil {
  149. return nil
  150. }
  151. if len(thisinfo.Topscopeclass) == 0 {
  152. thisinfo.Topscopeclass = []string{}
  153. }
  154. if len(thisinfo.Subscopeclass) == 0 {
  155. thisinfo.Subscopeclass = []string{}
  156. }
  157. //从标题中查找项目编号
  158. res := titleGetPc.FindStringSubmatch(thisinfo.Title)
  159. if len(res) > 1 && len(res[1]) > 6 && thisinfo.ProjectCode != res[1] && !numCheckPc.MatchString(res[1]) && !_zimureg1.MatchString(res[1]) {
  160. thisinfo.PTC = res[1]
  161. } else {
  162. res = titleGetPc1.FindStringSubmatch(thisinfo.Title)
  163. if len(res) > 3 && len(res[3]) > 6 && thisinfo.ProjectCode != res[3] && !numCheckPc.MatchString(res[3]) && !_zimureg1.MatchString(res[3]) {
  164. thisinfo.PTC = res[3]
  165. } else {
  166. res = titleGetPc2.FindStringSubmatch(thisinfo.Title)
  167. if len(res) > 1 && len(res[1]) > 6 && thisinfo.ProjectCode != res[1] && !numCheckPc.MatchString(res[1]) && !_zimureg1.MatchString(res[1]) {
  168. thisinfo.PTC = res[1]
  169. }
  170. }
  171. }
  172. if thisinfo.ProjectName != "" && len([]rune(thisinfo.ProjectName)) > 0 {
  173. thisinfo.ProjectName = pcReplace.ReplaceAllString(thisinfo.ProjectName, "")
  174. if thisinfo.ProjectName != "" {
  175. thisinfo.pnbval++
  176. }
  177. }
  178. if thisinfo.ProjectCode != "" || thisinfo.PTC != "" {
  179. if thisinfo.ProjectCode != "" {
  180. thisinfo.ProjectCode = pcReplace.ReplaceAllString(thisinfo.ProjectCode, "")
  181. if thisinfo.pnbval == 0 && len([]rune(thisinfo.ProjectCode)) < 5 {
  182. thisinfo.ProjectCode = StrOrNum.ReplaceAllString(thisinfo.ProjectCode, "")
  183. }
  184. } else {
  185. thisinfo.PTC = pcReplace.ReplaceAllString(thisinfo.PTC, "")
  186. if thisinfo.pnbval == 0 && len([]rune(thisinfo.PTC)) < 5 {
  187. thisinfo.PTC = StrOrNum.ReplaceAllString(thisinfo.PTC, "")
  188. }
  189. }
  190. if thisinfo.ProjectCode != "" || thisinfo.PTC != "" {
  191. thisinfo.pnbval++
  192. }
  193. }
  194. if thisinfo.ProjectCode == thisinfo.PTC || strings.Index(thisinfo.ProjectCode, thisinfo.PTC) > -1 {
  195. thisinfo.PTC = ""
  196. }
  197. if thisinfo.Buyer != "" && len([]rune(thisinfo.Buyer)) > 2 {
  198. thisinfo.pnbval++
  199. } else {
  200. thisinfo.Buyer = ""
  201. }
  202. thisinfo.LenPC = len([]rune(thisinfo.ProjectCode))
  203. thisinfo.LenPTC = len([]rune(thisinfo.PTC))
  204. thisinfo.LenPN = len([]rune(thisinfo.ProjectName))
  205. return thisinfo
  206. }