luamove.go 10.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358
  1. package front
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "io/ioutil"
  6. "luaweb/spider"
  7. "net/http"
  8. qu "qfw/util"
  9. mgdb "qfw/util/mongodb"
  10. "sort"
  11. util "spiderutil"
  12. "strings"
  13. "time"
  14. "github.com/go-xweb/xweb"
  15. "gopkg.in/mgo.v2/bson"
  16. )
// LuaMove is the xweb action behind the spider migration pages.
//
// Every xweb.Mapper field carries a route in its `xweb` tag; presumably xweb
// dispatches that route to the method with the matching (capitalised) name —
// TODO confirm against the xweb version in use.
type LuaMove struct {
	*xweb.Action
	luaMoveManager     xweb.Mapper `xweb:"/center/luamove"`                 // site list
	luaMoveSite        xweb.Mapper `xweb:"/center/luamove/site/(.*)"`       // page for a single site; the site is the captured group
	luaMoveCodeInfo    xweb.Mapper `xweb:"/center/luamove/codeinfo"`        // spider list
	luaMoveBySite      xweb.Mapper `xweb:"/center/luamove/luamovebysite"`   // migrate / close by site
	luaMoveByCode      xweb.Mapper `xweb:"/center/luamove/luamovebycode"`   // migrate / close by spider
	updateEventBySite  xweb.Mapper `xweb:"/center/spider/updateventbysite"` // update the node of the spiders under a site
}
// OtherBase groups additional per-spider settings.
type OtherBase struct {
	IsFlow               int    // flag: whether the data collected by the spider takes part in the data flow
	SpiderType           string // spider type: "increment" (incremental) or "history" (historical)
	SpiderHistoryMaxPage int    // maximum page to crawl when collecting historical data
	SpiderMoveEvent      string // node to move to once the history crawl is done: comm = queue mode, bid = high-performance mode, 7700
}
  32. func (lm *LuaMove) LuaMoveManager() {
  33. auth := qu.IntAll(lm.GetSession("auth"))
  34. if auth == role_admin {
  35. if lm.Method() == "GET" {
  36. events := []string{}
  37. for k, _ := range util.Config.Uploadevents {
  38. events = append(events, k)
  39. }
  40. sort.Strings(events)
  41. lm.T["events"] = events
  42. lm.Render("luamovesite.html")
  43. } else {
  44. query := bson.M{"lm_ismove": true}
  45. lualist := *mgdb.Find("luaconfig", query, nil, nil, false, -1, -1)
  46. data := []map[string]interface{}{}
  47. siteMap := map[string][]map[string]interface{}{}
  48. for _, l := range lualist {
  49. pc := l["param_common"].([]interface{})
  50. site := qu.ObjToString(pc[1])
  51. href := ""
  52. if len(pc) >= 12 {
  53. href = qu.ObjToString(pc[11])
  54. }
  55. lm_movevent := l["lm_movevent"]
  56. event := l["event"]
  57. if tmp := siteMap[site]; tmp == nil {
  58. data = append(data, map[string]interface{}{"site": site, "lm_movevent": lm_movevent, "event": event})
  59. siteMap[site] = []map[string]interface{}{
  60. map[string]interface{}{
  61. "code": l["code"],
  62. "channel": pc[2],
  63. "event": event,
  64. "lm_movevent": lm_movevent,
  65. "href": href,
  66. "lm_download": l["lm_download"],
  67. "state": l["state"],
  68. },
  69. }
  70. } else {
  71. tmp = append(tmp, map[string]interface{}{
  72. "code": l["code"],
  73. "channel": pc[2],
  74. "event": event,
  75. "lm_movevent": lm_movevent,
  76. "href": href,
  77. "lm_download": l["lm_download"],
  78. "state": l["state"],
  79. })
  80. siteMap[site] = tmp
  81. }
  82. }
  83. lm.SetSession("sitemap", siteMap)
  84. lm.ServeJson(map[string]interface{}{"data": data})
  85. }
  86. } else {
  87. lm.Write("您没有权限")
  88. }
  89. }
  90. func (lm *LuaMove) LuaMoveSite(site string) {
  91. defer qu.Catch()
  92. if lm.Method() == "GET" {
  93. events := []string{}
  94. for k, _ := range util.Config.Uploadevents {
  95. events = append(events, k)
  96. }
  97. sort.Strings(events)
  98. lm.T["events"] = events
  99. lm.T["site"] = site
  100. lm.Render("luamovecode.html", &lm.T)
  101. }
  102. }
  103. func (lm *LuaMove) LuaMoveCodeInfo() {
  104. site := lm.GetString("site")
  105. sitemap := lm.GetSession("sitemap").(map[string][]map[string]interface{})
  106. lm.ServeJson(map[string]interface{}{"data": sitemap[site]})
  107. }
  108. func (lm *LuaMove) LuaMoveBySite() {
  109. defer qu.Catch()
  110. site := lm.GetString("site")
  111. stype := lm.GetString("stype")
  112. //movevent, _ := lm.GetInteger("movevent")
  113. sitemap := lm.GetSession("sitemap").(map[string][]map[string]interface{})
  114. codesinfo := sitemap[site]
  115. codes := []string{}
  116. for _, info := range codesinfo {
  117. code := qu.ObjToString(info["code"])
  118. codes = append(codes, code)
  119. }
  120. ok := false
  121. if stype == "move" { //迁移
  122. ok = SpiderMoveLua(codes)
  123. } else if stype == "close" { //关闭
  124. ok, _ = lm.SpiderCloseMoveLua(site, codes, "site")
  125. }
  126. lm.ServeJson(map[string]interface{}{"ok": ok})
  127. }
  128. func (lm *LuaMove) LuaMoveByCode() {
  129. defer qu.Catch()
  130. site := lm.GetString("site")
  131. code := lm.GetString("code")
  132. stype := lm.GetString("stype")
  133. //movevent, _ := lm.GetInteger("movevent")
  134. codes := strings.Split(code, ",")
  135. ok := false
  136. flush := false
  137. if stype == "move" { //迁移
  138. ok = SpiderMoveLua(codes)
  139. } else if stype == "close" { //关闭
  140. ok, flush = lm.SpiderCloseMoveLua(site, codes, "code")
  141. }
  142. qu.Debug(ok)
  143. lm.ServeJson(map[string]interface{}{"ok": ok, "flush": flush})
  144. }
  145. func (lm *LuaMove) UpdateEventBySite() {
  146. defer qu.Catch()
  147. site := lm.GetString("site")
  148. movevent, _ := lm.GetInteger("movevent")
  149. sitemap := lm.GetSession("sitemap").(map[string][]map[string]interface{})
  150. codes := []string{}
  151. for _, info := range sitemap[site] {
  152. code := qu.ObjToString(info["code"])
  153. codes = append(codes, code)
  154. info["lm_movevent"] = movevent
  155. }
  156. lm.SetSession("sitemap", sitemap)
  157. query := bson.M{
  158. "code": bson.M{
  159. "$in": codes,
  160. },
  161. }
  162. set := bson.M{
  163. "$set": bson.M{
  164. "lm_movevent": movevent,
  165. },
  166. }
  167. mgdb.Update("luaconfig", query, set, false, true)
  168. }
  169. //爬虫迁移
  170. func SpiderMoveLua(codes []string) bool {
  171. defer qu.Catch()
  172. for _, code := range codes {
  173. lua := *mgdb.FindOne("luaconfig", map[string]interface{}{"code": code})
  174. movevent := qu.IntAll(lua["lm_movevent"])
  175. event := qu.IntAll(lua["event"])
  176. state := qu.IntAll(lua["state"])
  177. upresult := true
  178. var err interface{}
  179. if state < 7 || state == 4 { //无发布、需登录、无法处理、已删除状态无需下架
  180. //下架
  181. upresult, err = spider.UpdateSpiderByCodeState(code, "6", event) //脚本下架
  182. }
  183. if upresult && err == nil { //下架成功,更新节点
  184. //更新节点
  185. query := bson.M{
  186. "code": code,
  187. }
  188. set := bson.M{
  189. "$set": bson.M{
  190. "event": movevent,
  191. },
  192. }
  193. mgdb.Update("luaconfig", query, set, false, false)
  194. //上架
  195. if state == 5 { //只有是已上架状态的爬虫上架
  196. upresult, err = spider.UpdateSpiderByCodeState(code, "5", movevent) //脚本上架
  197. if !upresult || err != nil {
  198. SendMail("爬虫定期节点迁移" + code + "上架失败")
  199. qu.Debug("定期节点迁移", code, "上架失败")
  200. return false
  201. }
  202. }
  203. } else {
  204. SendMail("爬虫定期节点迁移" + code + "下架失败")
  205. qu.Debug("定期节点迁移", code, "下架失败")
  206. return false
  207. }
  208. }
  209. return true
  210. }
  211. //更新爬虫是否要迁移的状态
  212. func (lm *LuaMove) SpiderCloseMoveLua(site string, codes []string, by string) (bool, bool) {
  213. defer qu.Catch()
  214. flush := false
  215. query := bson.M{
  216. "code": bson.M{
  217. "$in": codes,
  218. },
  219. }
  220. set := bson.M{
  221. "$set": bson.M{
  222. "lm_ismove": false,
  223. },
  224. }
  225. sitemap := lm.GetSession("sitemap").(map[string][]map[string]interface{})
  226. //清除session
  227. if by == "code" {
  228. tmpArr := []map[string]interface{}{}
  229. infoArr := sitemap[site]
  230. for _, info := range infoArr {
  231. flag := false
  232. for _, code := range codes {
  233. if qu.ObjToString(info["code"]) == code {
  234. flag = true
  235. break
  236. }
  237. }
  238. if !flag {
  239. tmpArr = append(tmpArr, info)
  240. }
  241. }
  242. if len(tmpArr) == 0 {
  243. delete(sitemap, site)
  244. flush = true
  245. } else {
  246. sitemap[site] = tmpArr
  247. }
  248. lm.SetSession("sitemap", sitemap)
  249. }
  250. return mgdb.Update("luaconfig", query, set, false, true), flush
  251. }
  252. //
  253. func SpiderMoveEvent(data string) {
  254. //解析爬虫代码
  255. data = util.Se.DecodeString(data)
  256. infos := []interface{}{}
  257. err := json.Unmarshal([]byte(data), &infos)
  258. if err != nil {
  259. qu.Debug("历史迁移到增量节点失败:", data)
  260. return
  261. }
  262. code := qu.ObjToString(infos[0])
  263. //迁移节点并上架
  264. lua := *mgdb.FindOne("luaconfig", map[string]interface{}{"code": code})
  265. spidertype := qu.ObjToString(lua["spidertype"])
  266. event := qu.IntAll(lua["event"])
  267. var upresult bool
  268. qu.Debug("lua move:", code, event)
  269. if spidertype == "history" {
  270. newevent := GetEvent(code, lua)
  271. qu.Debug("new event:", newevent)
  272. mgdb.Update("luaconfig", map[string]interface{}{"code": code}, map[string]interface{}{"$set": map[string]interface{}{"event": newevent, "spidertype": "increment"}}, false, false)
  273. upresult, err = spider.UpdateSpiderByCodeState(code, "5", newevent) //脚本上架
  274. }
  275. ok := false
  276. if upresult && err == nil { //上架成功
  277. ok = true
  278. qu.Debug("Code:", code, "历史迁移到增量节点成功")
  279. } else { //上架失败
  280. mgdb.Update("luaconfig", map[string]interface{}{"code": code}, map[string]interface{}{"$set": map[string]interface{}{"event": event, "state": 6}}, false, false)
  281. qu.Debug("Code:", code, "历史迁移到增量节点失败")
  282. }
  283. mgdb.Save("luamovelog", map[string]interface{}{
  284. "code": code,
  285. "comeintime": time.Now().Unix(),
  286. "type": "movevent",
  287. "ok": ok,
  288. })
  289. }
  290. //
  291. func GetEvent(code string, lua map[string]interface{}) int {
  292. defer qu.Catch()
  293. //1、历史节点
  294. if lua["historyevent"] != nil {
  295. return qu.IntAll(lua["historyevent"])
  296. }
  297. //2、根据站点找节点
  298. param_common := lua["param_common"].([]interface{})
  299. site := qu.ObjToString(param_common[1])
  300. query := map[string]interface{}{
  301. "code": map[string]interface{}{
  302. "$ne": code,
  303. },
  304. "param_common.1": site,
  305. "state": 5,
  306. }
  307. tmp := *mgdb.FindOne("luaconfig", query)
  308. if tmp != nil && len(tmp) > 0 {
  309. return qu.IntAll(tmp["event"])
  310. }
  311. //3、7700
  312. spidermovevent := qu.ObjToString(lua["spidermovevent"])
  313. if spidermovevent == "7700" {
  314. return 7700
  315. }
  316. //4、根据数量分配节点
  317. num := 0
  318. result := 7700
  319. for k, t := range util.Config.Uploadevents {
  320. if qu.ObjToString(t) == spidermovevent { //bid、comm
  321. event := qu.IntAll(k)
  322. count := mgdb.Count("luaconfig", map[string]interface{}{"state": 5, "event": event})
  323. if num == 0 || count < num {
  324. result = event
  325. num = count
  326. }
  327. }
  328. }
  329. return result
  330. }
  331. func SendMail(text string) {
  332. defer qu.Catch()
  333. for i := 1; i <= 3; i++ {
  334. res, err := http.Get(fmt.Sprintf("%s?to=%s&title=%s&body=%s", util.Config.JkMail["api"], util.Config.JkMail["to"], "lua-timeluamove-err", text))
  335. if err == nil {
  336. res.Body.Close()
  337. read, err := ioutil.ReadAll(res.Body)
  338. qu.Debug("邮件发送:", string(read), err)
  339. break
  340. }
  341. }
  342. }