errdata.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480
  1. package luaerrdata
  2. import (
  3. "encoding/json"
  4. "luaweb/spider"
  5. "luaweb/udp"
  6. qu "qfw/util"
  7. mgdb "qfw/util/mongodb"
  8. mgu "qfw/util/mongodbutil"
  9. util "spiderutil"
  10. "strings"
  11. "time"
  12. "github.com/go-xweb/xweb"
  13. "gopkg.in/mgo.v2/bson"
  14. )
  // User role levels: developer, reviewer and administrator.
  const (
  	role_dev     = 1 // developer
  	role_examine = 2 // reviewer
  	role_admin   = 3 // administrator
  )
  // Text is the generic regather-failure message served by RegatherData and
  // SingleRegather.
  var Text string = "重采失败,请稍后重试"
  // LuaInfo carries the spider (luaconfig) details that are shown next to a
  // spider's error counts.
  type LuaInfo struct {
  	ModifyUser string // user who maintains the spider
  	ReState    int    // regather/repair state
  	State      int    // spider state
  	Event      int    // node
  	StateTime  int64  // timestamp belonging to the current ReState
  	AllErrNum  int    // total number of spiders across all data
  	ErrNum     int    // number of failed collections
  	ReErrNum   int    // number of failed re-collections
  	SaveErrNum int    // number of failed saves
  }
  28. type ErrorData struct {
  29. *xweb.Action
  30. errorDataIndex xweb.Mapper `xweb:"/center/errorData"` //加载错误数据
  31. findByCode xweb.Mapper `xweb:"/center/errorData/findByCode"` //根据爬虫找所有错误数据
  32. regatherData xweb.Mapper `xweb:"/center/errorData/regatherData"` //数据重采
  33. singleRegather xweb.Mapper `xweb:"/center/errorData/singleRegather"` //单个信息重采
  34. confirmLua xweb.Mapper `xweb:"/center/errorData/confirmLua"` //确认爬虫
  35. updateRestate xweb.Mapper `xweb:"/center/errorData/updateRestate"` //修改restae状态
  36. updateOnlineLua xweb.Mapper `xweb:"/center/errorData/updateonlinelua"` //重新上架
  37. confirmRepair xweb.Mapper `xweb:"/center/errorData/confirmrepair"` //确认修复
  38. }
  39. func (ed *ErrorData) ErrorDataIndex() {
  40. defer qu.Catch()
  41. auth := qu.IntAll(ed.GetSession("auth"))
  42. searchStr := ed.GetString("search[value]")
  43. search := strings.TrimSpace(searchStr)
  44. date := ed.GetString("date")
  45. user := qu.ObjToString(ed.GetSession("loginuser"))
  46. //if auth == role_admin {
  47. if ed.Method() == "POST" {
  48. startTime, endTime := GetStartAndEndTime(date)
  49. if startTime == 0 || endTime == 0 {
  50. return
  51. }
  52. query := map[string]interface{}{
  53. "state": bson.M{"$lte": 3}, //查询state==3及修复成功的数据是为了在页面展示该数据的爬虫方便“更新上架”,“确认修复”
  54. "from": "lua",
  55. "comeintime": bson.M{"$gte": startTime, "$lte": endTime},
  56. }
  57. if auth == 1 {
  58. query["modifyuser"] = user
  59. }
  60. if search != "" {
  61. query = bson.M{"spidercode": bson.M{"$regex": search}}
  62. }
  63. qu.Debug("query:", query)
  64. data := []map[string]interface{}{}
  65. list := *mgu.Find("regatherdata", "spider", "spider", query, nil, nil, false, -1, -1)
  66. if len(list) > 0 {
  67. //alltmp := map[string]int{}
  68. errtmp := map[string]int{}
  69. rerrtmp := map[string]int{}
  70. saverrtmp := map[string]int{}
  71. successtmp := map[string]int{}
  72. codeLua := map[string]*LuaInfo{}
  73. for _, l := range list {
  74. code := qu.ObjToString(l["spidercode"])
  75. state := qu.IntAll(l["state"])
  76. //alltmp[code] = alltmp[code] + 1
  77. if codeLua[code] == nil {
  78. data := *mgdb.FindOne("luaconfig", map[string]interface{}{"code": code})
  79. restate := qu.IntAll(data["restate"])
  80. info := &LuaInfo{
  81. ModifyUser: qu.ObjToString(data["modifyuser"]),
  82. ReState: restate,
  83. State: qu.IntAll(data["state"]),
  84. Event: qu.IntAll(data["event"]),
  85. StateTime: int64(0),
  86. }
  87. if restate == 4 {
  88. info.StateTime = qu.Int64All(data["updatetime"])
  89. } else if restate == 3 {
  90. info.StateTime = qu.Int64All(data["auditime"])
  91. } else if restate == 2 {
  92. info.StateTime = qu.Int64All(data["repairtime"])
  93. } else if restate == 1 {
  94. info.StateTime = qu.Int64All(data["confirmtime"])
  95. }
  96. codeLua[code] = info
  97. }
  98. if state == 0 { //错误
  99. errtmp[code] = errtmp[code] + 1
  100. } else if state == 1 { //重下失败
  101. rerrtmp[code] = rerrtmp[code] + 1
  102. } else if state == 2 { //保存服务发送失败
  103. saverrtmp[code] = saverrtmp[code] + 1
  104. } else { //已修复数据
  105. successtmp[code] = successtmp[code] + 1
  106. }
  107. }
  108. vs_err := MapValueSort(errtmp) //采集错误量排序
  109. for i := vs_err.Len() - 1; i >= 0; i-- {
  110. spidercode := vs_err.Keys[i]
  111. data = append(data, map[string]interface{}{
  112. "spidercode": spidercode,
  113. "encode": util.Se.Encode2Hex(spidercode),
  114. "errnum": vs_err.Vals[i],
  115. "rerrnum": rerrtmp[spidercode],
  116. "saverrnum": saverrtmp[spidercode],
  117. "num": vs_err.Len() - i,
  118. "modifyuser": codeLua[spidercode].ModifyUser,
  119. "restate": codeLua[spidercode].ReState,
  120. "state": codeLua[spidercode].State,
  121. "statetime": codeLua[spidercode].StateTime,
  122. "event": codeLua[spidercode].Event,
  123. })
  124. //delete(alltmp, spidercode)
  125. delete(rerrtmp, spidercode) //去除有采集失败的爬虫
  126. delete(saverrtmp, spidercode) //去除有采集失败的爬虫
  127. delete(successtmp, spidercode) //去除有采集失败的爬虫
  128. }
  129. count := vs_err.Len()
  130. vs_reerr := MapValueSort(rerrtmp) //重采错误数据量排序
  131. for j := vs_reerr.Len() - 1; j >= 0; j-- {
  132. count++
  133. spidercode := vs_reerr.Keys[j]
  134. data = append(data, map[string]interface{}{
  135. "spidercode": spidercode,
  136. "encode": util.Se.Encode2Hex(spidercode),
  137. "errnum": 0,
  138. "rerrnum": rerrtmp[spidercode],
  139. "saverrnum": saverrtmp[spidercode],
  140. "num": count,
  141. "modifyuser": codeLua[spidercode].ModifyUser,
  142. "restate": codeLua[spidercode].ReState,
  143. "state": codeLua[spidercode].State,
  144. "statetime": codeLua[spidercode].StateTime,
  145. "event": codeLua[spidercode].Event,
  146. })
  147. delete(saverrtmp, spidercode) //去除有采集失败的爬虫
  148. delete(successtmp, spidercode) //去除有采集失败的爬虫
  149. }
  150. for spidercode, _ := range saverrtmp {
  151. count++
  152. data = append(data, map[string]interface{}{
  153. "spidercode": spidercode,
  154. "encode": util.Se.Encode2Hex(spidercode),
  155. "errnum": 0,
  156. "rerrnum": 0,
  157. "saverrnum": saverrtmp[spidercode],
  158. "num": count,
  159. "modifyuser": codeLua[spidercode].ModifyUser,
  160. "restate": codeLua[spidercode].ReState,
  161. "state": codeLua[spidercode].State,
  162. "statetime": codeLua[spidercode].StateTime,
  163. "event": codeLua[spidercode].Event,
  164. })
  165. delete(successtmp, spidercode) //去除有采集失败的爬虫
  166. }
  167. for spidercode, _ := range successtmp {
  168. count++
  169. data = append(data, map[string]interface{}{
  170. "spidercode": spidercode,
  171. "encode": util.Se.Encode2Hex(spidercode),
  172. "errnum": 0,
  173. "rerrnum": 0,
  174. "saverrnum": 0,
  175. "num": count,
  176. "modifyuser": codeLua[spidercode].ModifyUser,
  177. "restate": codeLua[spidercode].ReState,
  178. "state": codeLua[spidercode].State,
  179. "statetime": codeLua[spidercode].StateTime,
  180. "event": codeLua[spidercode].Event,
  181. })
  182. }
  183. // for code, _ := range alltmp {
  184. // i++
  185. // data = append(data, map[string]interface{}{
  186. // "spidercode": code,
  187. // "encode": util.Se.Encode2Hex(code),
  188. // "errnum": 0,
  189. // "rerrnum": rerrtmp[code],
  190. // "saverrnum": saverrtmp[code],
  191. // "num": i,
  192. // "modifyuser": codeLua[code].ModifyUser,
  193. // "restate": codeLua[code].ReState,
  194. // "state": codeLua[code].State,
  195. // "statetime": codeLua[code].StateTime,
  196. // "event": codeLua[code].Event,
  197. // })
  198. // }
  199. //alltmp = map[string]int{}
  200. errtmp = map[string]int{}
  201. rerrtmp = map[string]int{}
  202. saverrtmp = map[string]int{}
  203. codeLua = map[string]*LuaInfo{}
  204. }
  205. ed.ServeJson(map[string]interface{}{"data": data})
  206. } else {
  207. now := time.Now().AddDate(0, 0, -1)
  208. ed.T["date"] = qu.FormatDate(&now, qu.Date_Short_Layout)
  209. ed.Render("errdata.html", &ed.T)
  210. }
  211. // } else {
  212. // ed.Write("您没有权限")
  213. // }
  214. //ed.Write("数据错误")
  215. }
  216. func (ed *ErrorData) FindByCode() {
  217. defer qu.Catch()
  218. code := ed.GetString("code")
  219. date := ed.GetString("date")
  220. if ed.Method() == "GET" {
  221. restate, _ := ed.GetInteger("restate")
  222. ed.T["date"] = date
  223. ed.T["code"] = ed.GetString("code")
  224. ed.T["restate"] = restate
  225. ed.Render("errhreflist.html", &ed.T)
  226. } else if ed.Method() == "POST" {
  227. startTime, endTime := GetStartAndEndTime(date)
  228. if startTime == 0 || endTime == 0 {
  229. return
  230. }
  231. start, _ := ed.GetInteger("start")
  232. limit, _ := ed.GetInteger("length")
  233. draw, _ := ed.GetInteger("draw")
  234. state, _ := ed.GetInteger("state")
  235. q_state := bson.M{}
  236. if state == -1 {
  237. q_state = bson.M{"$lt": 3}
  238. } else {
  239. q_state = bson.M{"$eq": state}
  240. }
  241. query := bson.M{
  242. "from": "lua",
  243. "spidercode": code,
  244. "state": q_state,
  245. "comeintime": bson.M{"$gte": startTime, "$lte": endTime},
  246. }
  247. qu.Debug("query:", query)
  248. page := start / 10
  249. data := *mgu.Find("regatherdata", "spider", "spider", query, `{"state":1}`, `{"href":1,"spidercode":1,"state":1,comeintime:1}`, false, start, limit)
  250. count := mgu.Count("regatherdata", "spider", "spider", query)
  251. if data != nil {
  252. for k, d := range data {
  253. d["num"] = k + 1 + page*10
  254. state := qu.IntAll(d["state"])
  255. if state == 0 {
  256. d["state"] = "采集失败"
  257. } else if state == 1 {
  258. d["state"] = "重采失败"
  259. } else if state == 2 {
  260. d["state"] = "保存失败"
  261. }
  262. comeintime := qu.Int64All(d["comeintime"])
  263. d["comeintime"] = qu.FormatDateByInt64(&comeintime, qu.Date_Full_Layout)
  264. }
  265. }
  266. ed.ServeJson(map[string]interface{}{"draw": draw, "data": data, "recordsFiltered": count, "recordsTotal": count})
  267. }
  268. return
  269. }
  270. //重采
  271. func (ed *ErrorData) RegatherData() {
  272. defer qu.Catch()
  273. codes := ed.GetString("codes")
  274. date := ed.GetString("date")
  275. state, _ := ed.GetInteger("state")
  276. if len(codes) > 0 {
  277. start, end := GetStartAndEndTime(date)
  278. save := map[string]interface{}{
  279. "start": start,
  280. "end": end,
  281. "state": state,
  282. "codeorid": strings.Split(codes, ","),
  283. "isrun": false,
  284. "type": "code",
  285. }
  286. if id := mgu.Save("regatherudp", "spider", "spider", save); id != "" {
  287. qu.Debug("发送udp", id)
  288. udpMap := map[string]interface{}{
  289. "udpid": id,
  290. }
  291. by, err := json.Marshal(udpMap)
  292. if err == nil && !udp.IsSendUdp {
  293. udp.SendUdpLock.Lock()
  294. udp.SendUdp(by) //发送udp
  295. select {
  296. case info := <-udp.Ch:
  297. udp.IsSendUdp = false
  298. ed.ServeJson(info)
  299. case <-time.After(time.Second * 10):
  300. go func() {
  301. q := map[string]interface{}{"_id": qu.StringTOBsonId(id)}
  302. s := map[string]interface{}{"$set": map[string]interface{}{"isrun": true, "remark": "Udp发送失败"}}
  303. mgu.Update("regatherudp", "spider", "spider", q, s, false, false)
  304. }()
  305. udp.IsSendUdp = false
  306. ed.ServeJson("重新采集失败,请稍后重试")
  307. }
  308. udp.SendUdpLock.Unlock()
  309. }
  310. ed.ServeJson(Text)
  311. } else {
  312. ed.ServeJson("Udp条件保存失败")
  313. }
  314. } else {
  315. ed.ServeJson("爬虫代码选择出错")
  316. }
  317. }
  318. func (ed *ErrorData) SingleRegather() {
  319. defer qu.Catch()
  320. code := ed.GetString("code")
  321. date := ed.GetString("date")
  322. state, _ := ed.GetInteger("state")
  323. id := ed.GetString("id")
  324. start, end := GetStartAndEndTime(date)
  325. save := map[string]interface{}{
  326. "start": start,
  327. "end": end,
  328. "state": state,
  329. "isrun": false,
  330. }
  331. if id != "" {
  332. save["codeorid"] = []string{id}
  333. save["type"] = "id"
  334. } else if id == "" && state == -1 { //全部重采
  335. save["codeorid"] = []string{code}
  336. save["type"] = "code"
  337. }
  338. if id := mgu.Save("regatherudp", "spider", "spider", save); id != "" {
  339. qu.Debug("发送udp", id)
  340. udpMap := map[string]interface{}{
  341. "udpid": id,
  342. }
  343. by, err := json.Marshal(udpMap)
  344. if err == nil && !udp.IsSendUdp {
  345. udp.SendUdpLock.Lock()
  346. udp.SendUdp(by) //发送udp
  347. select {
  348. case info := <-udp.Ch:
  349. udp.IsSendUdp = false
  350. ed.ServeJson(info)
  351. case <-time.After(time.Second * 10):
  352. go func() {
  353. q := map[string]interface{}{"_id": qu.StringTOBsonId(id)}
  354. s := map[string]interface{}{"$set": map[string]interface{}{"isrun": true, "remark": "Udp发送失败"}}
  355. mgu.Update("regatherudp", "spider", "spider", q, s, false, false)
  356. }()
  357. udp.IsSendUdp = false
  358. ed.ServeJson("重新采集失败,请稍后重试")
  359. }
  360. udp.SendUdpLock.Unlock()
  361. }
  362. ed.ServeJson(Text)
  363. } else {
  364. ed.ServeJson("Udp条件保存失败")
  365. }
  366. }
  367. func (ed *ErrorData) ConfirmLua() {
  368. defer qu.Catch()
  369. codes := ed.GetString("codes")
  370. state := true
  371. for _, code := range strings.Split(codes, ",") {
  372. lua := *mgdb.FindOne("luaconfig", map[string]interface{}{"code": code})
  373. if restate := qu.IntAll(lua["restate"]); restate == 0 || restate == 4 || restate == 3 { //0是未修复过的爬虫;4是已经修复过的爬虫
  374. b := mgdb.Update("luaconfig", map[string]interface{}{"code": code}, map[string]interface{}{"$set": map[string]interface{}{"restate": 1, "confirmtime": time.Now().Unix()}}, false, false)
  375. if !b {
  376. state = false
  377. }
  378. }
  379. }
  380. ed.ServeJson(map[string]interface{}{"state": state})
  381. }
  382. func (ed *ErrorData) UpdateRestate() {
  383. defer qu.Catch()
  384. code := ed.GetString("code")
  385. restateTmp, _ := ed.GetInteger("restate")
  386. lua := *mgdb.FindOne("luaconfig", map[string]interface{}{"code": code})
  387. restate := qu.IntAll(lua["restate"])
  388. if restate == 0 {
  389. ed.ServeJson("该爬虫未确认")
  390. return
  391. }
  392. set := map[string]interface{}{}
  393. if restateTmp == 1 { //提交审核
  394. if restate > 1 {
  395. ed.ServeJson("该爬虫已修复")
  396. return
  397. }
  398. set["restate"] = 2
  399. set["repairtime"] = time.Now().Unix()
  400. } else if restateTmp == 2 { //审核通过
  401. if restate < 2 {
  402. ed.ServeJson("该爬虫未修复")
  403. return
  404. }
  405. set["restate"] = 3
  406. set["auditime"] = time.Now().Unix()
  407. }
  408. b := mgdb.Update("luaconfig", map[string]interface{}{"code": code}, map[string]interface{}{"$set": set}, false, false)
  409. text := ""
  410. if restateTmp == 1 {
  411. if b {
  412. text = "提交修复成功"
  413. } else {
  414. text = "提交修复失败"
  415. }
  416. } else if restateTmp == 2 {
  417. if b {
  418. text = "审核成功"
  419. } else {
  420. text = "审核失败"
  421. }
  422. }
  423. ed.ServeJson(text)
  424. }
  425. func (ed *ErrorData) UpdateOnlineLua() {
  426. defer qu.Catch()
  427. code := ed.GetString("code")
  428. lua := *mgdb.FindOne("luaconfig", map[string]interface{}{"code": code})
  429. if qu.IntAll(lua["state"]) != 5 { //非上架爬虫,不能更新
  430. ed.ServeJson(map[string]interface{}{"state": false})
  431. return
  432. }
  433. event := qu.IntAll(lua["event"])
  434. b, err := spider.UpdateSpiderByCodeState(code, "-1", event)
  435. if b && err == nil {
  436. mgdb.Update("luaconfig", map[string]interface{}{"code": code}, map[string]interface{}{"$set": map[string]interface{}{"restate": 4, "updatetime": time.Now().Unix()}}, false, false)
  437. ed.ServeJson(map[string]interface{}{"state": true})
  438. } else {
  439. ed.ServeJson(map[string]interface{}{"state": false})
  440. }
  441. }
  442. func (ed *ErrorData) ConfirmRepair() {
  443. defer qu.Catch()
  444. codes := ed.GetString("codes")
  445. data := []string{}
  446. for _, code := range strings.Split(codes, ",") {
  447. if code == "" {
  448. continue
  449. }
  450. if !mgdb.Update("luaconfig", map[string]interface{}{"code": code}, map[string]interface{}{"$set": map[string]interface{}{"restate": 4, "updatetime": time.Now().Unix()}}, false, false) {
  451. data = append(data, code)
  452. }
  453. }
  454. ed.ServeJson(map[string]interface{}{"data": data})
  455. }
  456. func GetStartAndEndTime(date string) (startTime, endTime int64) {
  457. t, err := time.ParseInLocation(qu.Date_Short_Layout, date, time.Local)
  458. if err != nil {
  459. qu.Debug("Time Error:", err)
  460. return
  461. }
  462. startTime = t.Unix()
  463. endTime = t.AddDate(0, 0, 1).Unix()
  464. return
  465. }