luamove.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303
  1. package front
  2. import (
  3. "fmt"
  4. "io/ioutil"
  5. "net/http"
  6. qu "qfw/util"
  7. "sort"
  8. "spider"
  9. util "spiderutil"
  10. "strings"
  11. "time"
  12. u "util"
  13. "github.com/go-xweb/xweb"
  14. )
  15. type LuaMove struct {
  16. *xweb.Action
  17. luaMove xweb.Mapper `xweb:"/center/luamove"` //站点列表
  18. luaMoveByCode xweb.Mapper `xweb:"/center/luamove/luamovebycode"` //
  19. closeAll xweb.Mapper `xweb:"/center/luamove/closeall"` //关闭所有爬虫
  20. updateToEvent xweb.Mapper `xweb:"/center/luamove/updatetoevent"` //更新目标节点
  21. }
  22. func (lm *LuaMove) LuaMove() {
  23. defer qu.Catch()
  24. auth := qu.IntAll(lm.GetSession("auth"))
  25. if auth == u.Role_Admin {
  26. if lm.Method() == "GET" {
  27. events := []string{}
  28. for k, _ := range util.Config.Uploadevents {
  29. events = append(events, k)
  30. }
  31. sort.Strings(events)
  32. lm.T["events"] = events
  33. lm.Render("luamovelist.html")
  34. } else {
  35. state, _ := lm.GetInteger("state")
  36. fromevent, _ := lm.GetInteger("fromevent")
  37. toevent, _ := lm.GetInteger("toevent")
  38. ismove, _ := lm.GetInteger("ismove")
  39. start, _ := lm.GetInteger("start")
  40. limit, _ := lm.GetInteger("length")
  41. draw, _ := lm.GetInteger("draw")
  42. searchStr := lm.GetString("search[value]")
  43. search := strings.TrimSpace(searchStr)
  44. query := map[string]interface{}{}
  45. if state != -1 { //是否已处理筛选
  46. query["state"] = state
  47. }
  48. if fromevent != -1 { //历史节点筛选
  49. query["fromevent"] = fromevent
  50. }
  51. if toevent != -1 { //目标节点筛选
  52. query["toevent"] = toevent
  53. }
  54. if ismove != -1 { //是否转移筛选
  55. query["ismove"] = ismove == 1
  56. }
  57. if search != "" {
  58. query["$or"] = []interface{}{
  59. map[string]interface{}{"code": map[string]interface{}{"$regex": search}},
  60. map[string]interface{}{"site": map[string]interface{}{"$regex": search}},
  61. }
  62. }
  63. year, month, _ := time.Now().Date()
  64. //本月1日时间戳
  65. firstOfMonth := time.Date(year, month, 1, 0, 0, 0, 0, time.Local).Unix()
  66. query["comeintime"] = map[string]interface{}{ //本月的信息
  67. "$gte": firstOfMonth,
  68. }
  69. sort := `{"%s":%d}`
  70. orderIndex := lm.GetString("order[0][column]")
  71. orderName := lm.GetString(fmt.Sprintf("columns[%s][data]", orderIndex))
  72. orderType := 1
  73. if lm.GetString("order[0][dir]") != "asc" {
  74. orderType = -1
  75. }
  76. sort = fmt.Sprintf(sort, orderName, orderType)
  77. count := u.MgoEB.Count("luamovevent", query)
  78. qu.Debug("query:", query, sort, count)
  79. data, _ := u.MgoEB.Find("luamovevent", query, sort, nil, false, start, limit)
  80. for _, d := range *data {
  81. lua, _ := u.MgoEB.FindOneByField("luaconfig", map[string]interface{}{"code": d["code"]}, `{"modifytime":1,"event":1}`)
  82. d["modifytime"] = (*lua)["modifytime"]
  83. d["nowevent"] = (*lua)["event"]
  84. d["encode"] = util.Se.Encode2Hex(qu.ObjToString(d["code"]))
  85. }
  86. lm.ServeJson(map[string]interface{}{
  87. "draw": draw,
  88. "data": data,
  89. "recordsFiltered": count,
  90. "recordsTotal": count,
  91. })
  92. }
  93. } else {
  94. lm.Write("您没有权限")
  95. }
  96. }
  97. func (lm *LuaMove) LuaMoveByCode() {
  98. defer qu.Catch()
  99. stype := lm.GetString("stype")
  100. code := lm.GetString("code")
  101. movevent := lm.GetString("movevent")
  102. events := strings.Split(movevent, ",")
  103. codes := strings.Split(code, ",")
  104. ok := false
  105. if stype == "move" { //迁移
  106. if len(codes) != len(events) {
  107. qu.Debug("爬虫个数和节点个数不匹配:", codes, events)
  108. } else {
  109. ok = SpiderMoveLua(codes, events) //节点转移
  110. }
  111. } else if stype == "close" { //关闭,更新数据状态
  112. arr := [][]map[string]interface{}{}
  113. for _, code := range codes {
  114. arr = append(arr, []map[string]interface{}{
  115. map[string]interface{}{"code": code},
  116. map[string]interface{}{
  117. "$set": map[string]interface{}{
  118. "state": 1,
  119. "updatetime": time.Now().Unix(),
  120. },
  121. },
  122. })
  123. }
  124. ok = u.MgoEB.UpdateBulk("luamovevent", arr...)
  125. arr = [][]map[string]interface{}{}
  126. }
  127. lm.ServeJson(map[string]interface{}{"ok": ok})
  128. }
  129. func (lm *LuaMove) CloseAll() {
  130. defer qu.Catch()
  131. //ok := u.MgoEB.Update("luamovevent", map[string]interface{}{"state": 0}, map[string]interface{}{"$set": map[string]interface{}{"state": 1, "updatetime": time.Now().Unix()}}, false, true)
  132. ok := u.MgoEB.Del("luamovevent", nil)
  133. lm.ServeJson(map[string]interface{}{"ok": ok})
  134. }
  135. func (lm *LuaMove) UpdateToEvent() {
  136. defer qu.Catch()
  137. id := lm.GetString("id")
  138. event, _ := lm.GetInteger("event")
  139. qu.Debug(id, event)
  140. ok := u.MgoEB.UpdateById("luamovevent", id, map[string]interface{}{"$set": map[string]interface{}{"toevent": event, "updatetime": time.Now().Unix()}})
  141. lm.ServeJson(map[string]interface{}{"ok": ok})
  142. }
  143. // 爬虫迁移
  144. func SpiderMoveLua(codes []string, events []string) bool {
  145. defer qu.Catch()
  146. msg := []string{}
  147. for index, code := range codes {
  148. resultEvent := events[index]
  149. qu.Debug("爬虫节点转移:", code, resultEvent)
  150. lua, _ := u.MgoEB.FindOne("luaconfig", map[string]interface{}{"code": code})
  151. if len(*lua) > 0 {
  152. state := qu.IntAll((*lua)["state"])
  153. event := qu.IntAll((*lua)["event"])
  154. if state == 5 { //查询该爬虫是否是已上架状态,是则更新节点后上架,否则只更新
  155. upresult := true
  156. var err interface{}
  157. upresult, err = spider.UpdateSpiderByCodeState(code, "6", event, false) //脚本下架
  158. if upresult && err == nil { //下架成功,更新节点
  159. re := qu.IntAll(resultEvent)
  160. u.MgoEB.Update("luaconfig", map[string]interface{}{"code": code}, map[string]interface{}{"$set": map[string]interface{}{"event": re, "incrementevent": re}}, false, false)
  161. //上架
  162. upresult, err = spider.UpdateSpiderByCodeState(code, "5", re, false) //脚本上架
  163. if !upresult || err != nil {
  164. qu.Debug("爬虫节点转移", code, "上架失败")
  165. msg = append(msg, "爬虫节点转移"+code+"上架失败")
  166. }
  167. } else {
  168. qu.Debug("爬虫节点转移", code, "下架失败")
  169. msg = append(msg, "爬虫节点转移"+code+"下架失败")
  170. }
  171. }
  172. u.MgoEB.Update("luamovevent", map[string]interface{}{"code": code}, map[string]interface{}{"$set": map[string]interface{}{"state": 1, "updatetime": time.Now().Unix()}}, false, false)
  173. } else {
  174. msg = append(msg, "爬虫节点转移未找到爬虫"+code)
  175. }
  176. }
  177. if len(msg) > 0 {
  178. SendMail(strings.Join(msg, ";"))
  179. }
  180. return true
  181. }
  182. func SendMail(text string) {
  183. defer qu.Catch()
  184. for i := 1; i <= 3; i++ {
  185. res, err := http.Get(fmt.Sprintf("%s?to=%s&title=%s&body=%s", util.Config.JkMail["api"], util.Config.JkMail["to"], "lua-timeluamove-err", text))
  186. if err == nil {
  187. res.Body.Close()
  188. read, err := ioutil.ReadAll(res.Body)
  189. qu.Debug("邮件发送:", string(read), err)
  190. break
  191. }
  192. }
  193. }
  194. /*func SpiderMoveEvent(data string) {
  195. //解析爬虫代码
  196. data = util.Se.DecodeString(data)
  197. infos := []interface{}{}
  198. err := json.Unmarshal([]byte(data), &infos)
  199. if err != nil {
  200. qu.Debug("历史迁移到增量节点失败:", data)
  201. return
  202. }
  203. code := qu.ObjToString(infos[0])
  204. //迁移节点并上架
  205. //lua := *mgdb.FindOne("luaconfig", map[string]interface{}{"code": code})
  206. lua, _ := u.MgoEB.FindOne("luaconfig", map[string]interface{}{"code": code})
  207. spidertype := qu.ObjToString((*lua)["spidertype"])
  208. event := qu.IntAll((*lua)["event"])
  209. var upresult bool
  210. set := map[string]interface{}{}
  211. qu.Debug("lua move:", code, event)
  212. if spidertype == "history" {
  213. newevent := GetEvent(code, (*lua))
  214. qu.Debug("new event:", newevent)
  215. set["event"] = newevent
  216. set["spidertype"] = "increment"
  217. type_content, _ := (*lua)["type_content"].(int)
  218. iscopycontent, _ := (*lua)["iscopycontent"].(bool)
  219. str_content := qu.ObjToString((*lua)["str_content"])
  220. str_recontent := qu.ObjToString((*lua)["str_recontent"])
  221. if type_content == 1 && iscopycontent && str_recontent != "" { //三级页是专家模式且有复制三级页代码
  222. set["iscopycontent"] = false
  223. set["str_content"] = str_recontent
  224. set["str_recontent"] = str_content
  225. }
  226. //if mgdb.Update("luaconfig", map[string]interface{}{"code": code}, map[string]interface{}{"$set": set}, false, false) {
  227. // upresult, err = spider.UpdateSpiderByCodeState(code, "5", newevent) //脚本上架
  228. //}
  229. if u.MgoEB.Update("luaconfig", map[string]interface{}{"code": code}, map[string]interface{}{"$set": set}, false, false) {
  230. upresult, err = spider.UpdateSpiderByCodeState(code, "5", newevent) //脚本上架
  231. }
  232. }
  233. ok := false
  234. if upresult && err == nil { //上架成功
  235. ok = true
  236. qu.Debug("Code:", code, "历史迁移到增量节点成功")
  237. } else { //上架失败
  238. //mgdb.Update("luaconfig", map[string]interface{}{"code": code}, map[string]interface{}{"$set": map[string]interface{}{"event": event, "state": 6}}, false, false)
  239. u.MgoEB.Update("luaconfig", map[string]interface{}{"code": code}, map[string]interface{}{"$set": map[string]interface{}{"event": event, "state": 6}}, false, false)
  240. qu.Debug("Code:", code, "历史迁移到增量节点失败")
  241. }
  242. u.MgoEB.Save("luamovelog", map[string]interface{}{
  243. "code": code,
  244. "comeintime": time.Now().Unix(),
  245. "type": "movevent",
  246. "ok": ok,
  247. })
  248. }
  249. //
  250. func GetEvent(code string, lua map[string]interface{}) int {
  251. defer qu.Catch()
  252. //1、历史节点
  253. if lua["incrementevent"] != nil {
  254. return qu.IntAll(lua["incrementevent"])
  255. }
  256. //2、根据站点找节点
  257. param_common := lua["param_common"].([]interface{})
  258. site := qu.ObjToString(param_common[1])
  259. query := map[string]interface{}{
  260. "code": map[string]interface{}{
  261. "$ne": code,
  262. },
  263. "param_common.1": site,
  264. "state": 5,
  265. }
  266. //tmp := *mgdb.FindOne("luaconfig", query)
  267. tmp, _ := u.MgoEB.FindOne("luaconfig", query)
  268. if tmp != nil && len(*tmp) > 0 {
  269. return qu.IntAll((*tmp)["event"])
  270. }
  271. //3、7700
  272. spidermovevent := qu.ObjToString(lua["spidermovevent"])
  273. if spidermovevent == "7700" {
  274. return 7700
  275. }
  276. //4、根据数量分配节点
  277. num := 0
  278. result := 7700
  279. for k, t := range util.Config.Uploadevents {
  280. if qu.ObjToString(t) == spidermovevent { //bid、comm
  281. event := qu.IntAll(k)
  282. //count := mgdb.Count("luaconfig", map[string]interface{}{"state": 5, "event": event})
  283. count := u.MgoEB.Count("luaconfig", map[string]interface{}{"state": 5, "event": event})
  284. if num == 0 || count < num {
  285. result = event
  286. num = count
  287. }
  288. }
  289. }
  290. return result
  291. }*/