// random.go
package timetask

import (
	"encoding/json"
	"strings"
	"sync"
	"time"

	qu "qfw/util"
	"util"
)
  10. type WarnInfo struct {
  11. Fields map[string]bool
  12. MaxLevel int
  13. Data interface{}
  14. Site interface{}
  15. Channel interface{}
  16. Title interface{}
  17. Infos map[string]bool
  18. Code interface{}
  19. Href interface{}
  20. Repeat bool
  21. }
  22. var StypeArr = []string{
  23. "Field Value Is Null",
  24. "Field Value Contains Random Code",
  25. "Field Value Not Contains Chinese",
  26. "Detail File Err",
  27. }
  28. func PushSpiderWarnErrData() {
  29. GetSpiderWarnData()
  30. GetHighlistDetailFilErrData()
  31. }
  32. func GetHighlistDetailFilErrData() {
  33. defer qu.Catch()
  34. sess := util.MgoS.GetMgoConn()
  35. defer util.MgoS.DestoryMongoConn(sess)
  36. stime := util.GetTime(-7)
  37. etime := util.GetTime(-6)
  38. query := map[string]interface{}{
  39. "comeintime": map[string]interface{}{
  40. "$gte": stime,
  41. "$lt": etime,
  42. },
  43. "detailfilerr": true,
  44. "state": -1,
  45. }
  46. fields := map[string]interface{}{
  47. "site": 1,
  48. "channel": 1,
  49. "spidercode": 1,
  50. "area": 1,
  51. "city": 1,
  52. "district": 1,
  53. "jsondata": 1,
  54. "publishtime": 1,
  55. "comeintime": 1,
  56. "href": 1,
  57. "title": 1,
  58. "dataging": 1,
  59. "_id": 0,
  60. }
  61. ch := make(chan bool, 2)
  62. wg := &sync.WaitGroup{}
  63. lock := &sync.Mutex{}
  64. arr := []map[string]interface{}{}
  65. it := sess.DB(util.MgoS.DbName).C("spider_highlistdata").Find(&query).Select(&fields).Iter()
  66. n := 0
  67. for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
  68. ch <- true
  69. wg.Add(1)
  70. go func(tmp map[string]interface{}) {
  71. defer func() {
  72. <-ch
  73. wg.Done()
  74. }()
  75. result := map[string]interface{}{}
  76. result["from"] = "list"
  77. result["level"] = 2
  78. result["info"] = "Detail File Err"
  79. result["ok"] = false
  80. result["field"] = "detail"
  81. result["site"] = tmp["site"]
  82. result["channel"] = tmp["channel"]
  83. result["title"] = tmp["title"]
  84. result["href"] = tmp["href"]
  85. result["spidercode"] = tmp["spidercode"]
  86. result["comeintime"] = time.Now().Unix()
  87. //publishtime
  88. publishtime_str := qu.ObjToString(tmp["publishtime"])
  89. publishtime_int := int64(0)
  90. if publishtime_str != "0" {
  91. if t, err := time.ParseInLocation(qu.Date_Full_Layout, publishtime_str, time.Local); err == nil {
  92. publishtime_int = t.Unix()
  93. }
  94. }
  95. result["repeat"] = RepeatData(qu.ObjToString(tmp["title"]), publishtime_int)
  96. //jsondata
  97. if jsondata := qu.ObjToString(tmp["jsondata"]); jsondata != "" {
  98. jsondataMap := map[string]interface{}{}
  99. if json.Unmarshal([]byte(jsondata), &jsondataMap) == nil {
  100. tmp["jsondata"] = jsondataMap
  101. } else {
  102. delete(tmp, "jsondata")
  103. }
  104. }
  105. iscompete := false
  106. coll := "bidding"
  107. lua, _ := util.MgoEB.FindOne("luaconfig", map[string]interface{}{"code": tmp["spidercode"]})
  108. if len(*lua) > 0 {
  109. iscompete, _ = (*lua)["spidercompete"].(bool)
  110. param_common := (*lua)["param_common"].([]interface{})
  111. if len(param_common) >= 8 {
  112. coll = qu.ObjToString(param_common[7])
  113. }
  114. }
  115. tmp["iscompete"] = iscompete
  116. tmp["publishtime"] = publishtime_int
  117. tmp["_d"] = "comeintime"
  118. tmp["T"] = coll
  119. result["data"] = tmp
  120. lock.Lock()
  121. arr = append(arr, result)
  122. if len(arr) > 500 {
  123. util.MgoS.SaveBulk("spider_warn_err", arr...)
  124. arr = []map[string]interface{}{}
  125. }
  126. lock.Unlock()
  127. }(tmp)
  128. if n%100 == 0 {
  129. qu.Debug("current:", n)
  130. }
  131. tmp = map[string]interface{}{}
  132. }
  133. wg.Wait()
  134. if len(arr) > 0 {
  135. util.MgoS.SaveBulk("spider_warn_err", arr...)
  136. arr = []map[string]interface{}{}
  137. }
  138. }
  139. func GetSpiderWarnData() {
  140. defer qu.Catch()
  141. qu.Debug("准备spider_warn_err数据")
  142. sess := util.MgoS.GetMgoConn()
  143. defer util.MgoS.DestoryMongoConn(sess)
  144. stime := util.GetTime(-1)
  145. etime := util.GetTime(0)
  146. if time.Now().Weekday().String() == "Monday" {
  147. stime = util.GetTime(-3)
  148. }
  149. query := map[string]interface{}{
  150. "comeintime": map[string]interface{}{
  151. "$gte": stime,
  152. "$lt": etime,
  153. },
  154. "info": map[string]interface{}{ //保存服务更新后这个条件可去掉2022-11-28
  155. "$in": StypeArr,
  156. },
  157. "level": 2,
  158. }
  159. ch := make(chan bool, 2)
  160. wg := &sync.WaitGroup{}
  161. lock := &sync.Mutex{}
  162. result := map[string]*WarnInfo{}
  163. it := sess.DB(util.MgoS.DbName).C("spider_warn").Find(&query).Iter()
  164. n := 0
  165. for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
  166. ch <- true
  167. wg.Add(1)
  168. go func(tmp map[string]interface{}) {
  169. defer func() {
  170. <-ch
  171. wg.Done()
  172. }()
  173. href := qu.ObjToString(tmp["href"])
  174. level := qu.IntAll(tmp["level"])
  175. field := qu.ObjToString(tmp["field"])
  176. info := qu.ObjToString(tmp["info"])
  177. title := qu.ObjToString(tmp["title"])
  178. publishtime := int64(0)
  179. data, ok := tmp["data"].(map[string]interface{})
  180. if ok {
  181. if ptime := data["publishtime"]; ptime != nil {
  182. publishtime = qu.Int64All(ptime)
  183. }
  184. }
  185. //数据验证,是否有title一致,相似publishtime的数据,视为一样的数据,不需要再修复
  186. repeat := RepeatData(title, publishtime)
  187. lock.Lock()
  188. if warnInfo := result[href]; warnInfo == nil {
  189. warnInfo = &WarnInfo{
  190. Fields: map[string]bool{field: true},
  191. MaxLevel: level,
  192. Data: data,
  193. Site: tmp["site"],
  194. Channel: tmp["channel"],
  195. Title: title,
  196. Infos: map[string]bool{info: true},
  197. Code: tmp["code"],
  198. Href: href,
  199. Repeat: repeat,
  200. }
  201. result[href] = warnInfo
  202. } else {
  203. warnInfo.Fields[field] = true
  204. warnInfo.Infos[info] = true
  205. if warnInfo.MaxLevel < level {
  206. warnInfo.MaxLevel = level
  207. }
  208. }
  209. lock.Unlock()
  210. }(tmp)
  211. if n%1000 == 0 {
  212. qu.Debug("current:", n)
  213. }
  214. tmp = map[string]interface{}{}
  215. }
  216. wg.Wait()
  217. saveArr := []map[string]interface{}{}
  218. for _, wi := range result {
  219. ch <- true
  220. wg.Add(1)
  221. go func(w *WarnInfo) {
  222. defer func() {
  223. <-ch
  224. wg.Done()
  225. }()
  226. fields := []string{}
  227. for f, _ := range w.Fields {
  228. fields = append(fields, f)
  229. }
  230. infos := []string{}
  231. for t, _ := range w.Infos {
  232. infos = append(infos, t)
  233. }
  234. lock.Lock()
  235. saveArr = append(saveArr, map[string]interface{}{
  236. "field": strings.Join(fields, ","),
  237. "level": w.MaxLevel,
  238. "site": w.Site,
  239. "channel": w.Channel,
  240. "title": w.Title,
  241. "repeat": w.Repeat,
  242. "comeintime": time.Now().Unix(),
  243. "info": strings.Join(infos, ","),
  244. "spidercode": w.Code,
  245. "href": w.Href,
  246. "data": w.Data,
  247. "ok": false,
  248. "from": "warn",
  249. })
  250. if len(saveArr) > 500 {
  251. util.MgoS.SaveBulk("spider_warn_err", saveArr...)
  252. saveArr = []map[string]interface{}{}
  253. }
  254. lock.Unlock()
  255. }(wi)
  256. }
  257. wg.Wait()
  258. if len(saveArr) > 0 {
  259. util.MgoS.SaveBulk("spider_warn_err", saveArr...)
  260. saveArr = []map[string]interface{}{}
  261. }
  262. }
  263. func RepeatData(title string, publishtime int64) bool {
  264. return util.MgoB.Count("bidding",
  265. map[string]interface{}{
  266. "title": title,
  267. "publishtime": map[string]interface{}{
  268. "$gte": publishtime + 86400*3,
  269. "$lte": publishtime - 86400*3,
  270. },
  271. }) > 0
  272. }
/*
每天定时推送含乱码数据
*/
// var (
// 	RandomDataPushCron string
// 	Gmail              *gm.GmailAuth
// 	To                 string
// )
// type FileWrite struct {
// 	Byte *bytes.Buffer
// }
// func (fw *FileWrite) Write(p []byte) (n int, err error) {
// 	n, err = fw.Byte.Write(p)
// 	return
// }
// PushRandomData 推送乱码数据
// func PushRandomData() {
// 	defer qu.Catch()
// 	query := map[string]interface{}{
// 		//"comeintime": map[string]interface{}{
// 		//	"$gte": GetTime(-1),
// 		//	"$lt":  GetTime(0),
// 		//},
// 		"info": map[string]interface{}{
// 			"$in": []string{"Field Value Not Contains Chinese"},
// 		},
// 	}
// 	list, _ := MgoS.Find("spider_warn", query, nil, nil, false, -1, -1)
// 	if len(*list) > 0 {
// 		file := xlsx.NewFile()
// 		sheet, _ := file.AddSheet("乱码数据")
// 		row := sheet.AddRow()
// 		row.AddCell().SetValue("站点")
// 		row.AddCell().SetValue("栏目")
// 		row.AddCell().SetValue("爬虫")
// 		row.AddCell().SetValue("字段")
// 		row.AddCell().SetValue("异常等级")
// 		row.AddCell().SetValue("标题")
// 		row.AddCell().SetValue("链接")
// 		for _, l := range *list {
// 			textRow := sheet.AddRow()
// 			textRow.AddCell().SetValue(qu.ObjToString(l["site"]))
// 			textRow.AddCell().SetValue(qu.ObjToString(l["channel"]))
// 			textRow.AddCell().SetValue(qu.ObjToString(l["code"]))
// 			textRow.AddCell().SetValue(qu.ObjToString(l["field"]))
// 			level := qu.IntAll(l["level"])
// 			if level == 1 {
// 				textRow.AddCell().SetValue("警告")
// 			} else if level == 2 {
// 				textRow.AddCell().SetValue("错误")
// 			}
// 			textRow.AddCell().SetValue(qu.ObjToString(l["title"]))
// 			textRow.AddCell().SetValue(qu.ObjToString(l["href"]))
// 		}
// 		fw := &FileWrite{
// 			Byte: &bytes.Buffer{},
// 		}
// 		file.Write(fw)
// 		bt := fw.Byte.Bytes()
// 		gm.GSendMail_Bq("jy@jianyu360.cn", To, "", "", "乱码数据统计", "", "统计报表.xlsx", bt, Gmail)
// 	}
// }