task.go 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203
  1. package main
  2. import (
  3. "fmt"
  4. mgo "mgoutil/mongodb"
  5. qu "qfw/util"
  6. "regexp"
  7. "strings"
  8. "sync"
  9. "github.com/cron"
  10. "github.com/donnie4w/go-logger/logger"
  11. )
  // Patterns used by StartTask to pull URL-like text out of bidding details
// and reduce it to a comparable domain name. The interpreted string literals
// use \u escapes for U+3000 (ideographic space), U+2003 (em space) and U+00A0
// (no-break space) so those are treated as whitespace alongside \s.
var (
	// Reg1 matches a URL-like run starting with http(s) followed by an ASCII
	// or full-width colon and "//", or with "www."/"WWW."; whitespace is
	// tolerated between the character runs.
	Reg1 = regexp.MustCompile("((http|https)[::]//(www\\.)?|www\\.|WWW\\.)([\\s\u3000\u2003\u00a0]{0,}[-A-Za-z0-9&@$??#/%=~_|.::,]+)+([\\s\u3000\u2003\u00a0]{0,}(com|cn|net))?[-A-Za-z0-9&@$??#/%=~_|.::,]+")
	// Reg2 keeps the scheme/www prefix plus the following word-character runs
	// separated by '-', '.' or whitespace, i.e. the host-name part.
	Reg2 = regexp.MustCompile("((http|https)[::]//(www\\.)?|www\\.|WWW\\.)(\\w+[-.\\s\u3000\u2003\u00a0]{0,})+")
	// Clear1 detects a known suffix (cn, com, org, ...) followed by stray
	// digits at the end, e.g. "https://cloudmeeting.189.cn6.7"; submatch 2
	// holds the stray part ("6.7").
	Clear1 = regexp.MustCompile(".*(cn|com|org|net|co|mo|vn|en)((\\d)+[.]{0,}(\\d){0,})$")
	// RegSpace matches runs of ASCII and the Unicode spaces listed above.
	RegSpace = regexp.MustCompile("[\\s\u3000\u2003\u00a0]+")
	// Replace maps full-width punctuation to the ASCII character expected
	// inside a domain name.
	Replace = map[string]string{
		":": ":",
		",": ".",
		"。": ".",
	}
)
  27. //定时任务
  28. func TimeTask() {
  29. defer qu.Catch()
  30. //StartTask()
  31. c := cron.New()
  32. cronstr := "0 */" + fmt.Sprint(TaskTime) + " * * * ?" //每TaskTime小时执行一次
  33. c.AddFunc(cronstr, func() { StartTask() })
  34. c.Start()
  35. }
  36. //开始任务
  37. func StartTask() {
  38. fmt.Println("开始任务...")
  39. defer qu.Catch()
  40. sess := Mgo.GetMgoConn()
  41. defer Mgo.DestoryMongoConn(sess)
  42. q := map[string]interface{}{
  43. "_id": map[string]interface{}{
  44. "$gt": mgo.StringTOBsonId(LatestId),
  45. },
  46. }
  47. endId, ok := GetEndId(q) //获取bidding表最后一个数据id
  48. if !ok || endId == "" {
  49. return
  50. }
  51. q = map[string]interface{}{
  52. "_id": map[string]interface{}{
  53. "$gt": mgo.StringTOBsonId(LatestId),
  54. "$lte": mgo.StringTOBsonId(endId),
  55. },
  56. }
  57. LatestId = endId //替换起始id
  58. field := map[string]interface{}{"detail": 1}
  59. logger.Debug("query:", q)
  60. it := sess.DB("qfw").C("bidding").Find(q).Select(field).Sort("_id").Iter()
  61. count := Mgo.Count("test", q)
  62. fmt.Println("共加载数据", count)
  63. sum := 0
  64. wg := &sync.WaitGroup{}
  65. //lock_bid := &sync.Mutex{}
  66. lock_dmn := &sync.Mutex{}
  67. save := []map[string]interface{}{}
  68. //arr := [][]map[string]interface{}{}
  69. ch := make(chan bool, 20)
  70. for tmp := make(map[string]interface{}); it.Next(&tmp); sum++ {
  71. ch <- true
  72. wg.Add(1)
  73. go func(d map[string]interface{}) {
  74. defer func() {
  75. <-ch
  76. wg.Done()
  77. }()
  78. id := mgo.BsonIdToSId(d["_id"])
  79. detail := qu.ObjToString(d["detail"])
  80. hrefArr := Reg1.FindAllString(detail, -1) //匹配detail
  81. for _, href := range hrefArr {
  82. if len(href) < 13 {
  83. continue
  84. }
  85. hrefTmp := RegSpace.ReplaceAllString(href, "") //去除空格
  86. hrefTmp = strings.ToLower(hrefTmp) //转小写
  87. for {
  88. if strings.HasSuffix(hrefTmp, ".") || strings.HasSuffix(hrefTmp, "-") {
  89. hrefTmp = hrefTmp[:len(hrefTmp)-1]
  90. } else {
  91. break
  92. }
  93. }
  94. dmName := Reg2.FindString(hrefTmp) //匹配域名
  95. for k, v := range Replace { //替换字符
  96. dmName = strings.ReplaceAll(dmName, k, v)
  97. }
  98. if dmName == "" {
  99. continue
  100. }
  101. //特殊情况处理 https://cloudmeeting.189.cn6.7 清理6.7
  102. text := ""
  103. apos := Clear1.FindAllStringSubmatchIndex(dmName, -1)
  104. if len(apos) > 0 {
  105. for _, pos := range apos {
  106. if len(pos) > 4 {
  107. text = dmName[pos[4]:pos[5]] //6.7
  108. }
  109. }
  110. }
  111. if text != "" {
  112. lastIndex := strings.LastIndex(dmName, text)
  113. dmName = dmName[:lastIndex] //https://cloudmeeting.189.cn
  114. }
  115. lock_dmn.Lock()
  116. if !DomainNameMap[dmName] { //不在白名单
  117. tmpMap := map[string]interface{}{"id": id, "domainame": dmName, "href": href, "detail": detail, "clear": href}
  118. if text != "" {
  119. lastIndex := strings.LastIndex(href, text)
  120. href = href[:lastIndex]
  121. tmpMap["clear"] = href
  122. }
  123. save = append(save, tmpMap)
  124. if len(save) > 500 {
  125. tmps := save
  126. Mgo.SaveBulk("domainlog", tmps...)
  127. save = []map[string]interface{}{}
  128. }
  129. //清理detail
  130. // detail = strings.ReplaceAll(detail, href, "")
  131. // query := map[string]interface{}{"_id": d["_id"]}
  132. // set := map[string]interface{}{
  133. // "$set": map[string]interface{}{
  134. // "detail": detail,
  135. // },
  136. // }
  137. // update := []map[string]interface{}{}
  138. // update = append(update, query)
  139. // update = append(update, set)
  140. // arr = append(arr, update)
  141. // if len(arr) > 500 {
  142. // tmps := arr
  143. // Mgo.UpdateBulk("test1", tmps...)
  144. // arr = [][]map[string]interface{}{}
  145. // }
  146. }
  147. lock_dmn.Unlock()
  148. }
  149. }(tmp)
  150. if sum%100 == 0 {
  151. fmt.Println("current:", sum)
  152. }
  153. tmp = map[string]interface{}{}
  154. }
  155. wg.Wait()
  156. lock_dmn.Lock()
  157. if len(save) > 0 {
  158. Mgo.SaveBulk("domainlog", save...)
  159. save = []map[string]interface{}{}
  160. }
  161. // if len(arr) > 0 {
  162. // Mgo.UpdateBulk("test1", arr...)
  163. // arr = [][]map[string]interface{}{}
  164. // }
  165. lock_dmn.Unlock()
  166. fmt.Println("本轮任务结束")
  167. }
  168. //加载域名信息
  169. func InitDomainName() {
  170. defer qu.Catch()
  171. fmt.Println("初始化域名...")
  172. if DomainNameMap == nil {
  173. DomainNameMap = make(map[string]bool)
  174. }
  175. list, _ := Mgo.Find("domainame", nil, nil, nil, false, -1, -1)
  176. for _, l := range *list {
  177. href := qu.ObjToString(l["href"])
  178. DomainNameMap[href] = true
  179. }
  180. fmt.Println("域名初始化完毕...", len(DomainNameMap))
  181. }
  182. //获取最后endId
  183. func GetEndId(query map[string]interface{}) (string, bool) {
  184. endId := ""
  185. ok := false
  186. list, _ := Mgo.Find("bidding", query, `{"_id":-1}`, `{_id:1}`, false, 0, 1)
  187. if len(*list) == 1 {
  188. endId = mgo.BsonIdToSId((*list)[0]["_id"])
  189. if endId >= LatestId {
  190. return endId, true
  191. }
  192. }
  193. return endId, ok
  194. }