msgservice.go 10.0 KB


  1. // msgservice
  2. package spider
  3. import (
  4. "encoding/json"
  5. "fmt"
  6. "log"
  7. "math/rand"
  8. mu "mfw/util"
  9. qu "qfw/util"
  10. "strings"
  11. util "spiderutil"
  12. "time"
  13. "github.com/donnie4w/go-logger/logger"
  14. )
  // DynamicIPMap is one entry of a downloader registry: the code that
  // identifies a download node plus the moment at which the entry expires.
  type DynamicIPMap struct {
  	// Code is the identifier of the download node.
  	Code string
  	// InvalidTime is the expiry instant as a Unix timestamp in seconds.
  	InvalidTime int64
  }
  19. var Msclient *mu.Client
  20. var MsclientFile *mu.Client
  21. var MsclientChromedp *mu.Client
  22. var Alldownloader map[string]DynamicIPMap = make(map[string]DynamicIPMap)
  23. var AlldownloaderFile map[string]DynamicIPMap = make(map[string]DynamicIPMap)
  24. var AlldownloaderChromedp map[string]DynamicIPMap = make(map[string]DynamicIPMap)
  25. //初始化,启动消息客户端
  26. func InitMsgClient(serveraddr, name string) {
  27. Msclient, _ = mu.NewClient(&mu.ClientConfig{ClientName: name,
  28. MsgServerAddr: serveraddr,
  29. EventHandler: processevent,
  30. OnRequestConnect: func() {
  31. log.Println("重连", serveraddr, name)
  32. },
  33. OnConnectSuccess: func() {
  34. log.Println("重连成功")
  35. },
  36. CanHandleEvents: []int{mu.SERVICE_DOWNLOAD_APPEND_NODE, mu.SERVICE_DOWNLOAD_DELETE_NODE, util.Config.Uploadevent},
  37. ReadBufferSize: 500,
  38. WriteBufferSize: 500,
  39. })
  40. go gc4Alldownloader()
  41. }
  42. //初始化,启动消息客户端File
  43. func InitMsgClientFile(serveraddr, name string) {
  44. MsclientFile, _ = mu.NewClient(&mu.ClientConfig{ClientName: name,
  45. MsgServerAddr: serveraddr,
  46. EventHandler: processeventFile,
  47. CanHandleEvents: []int{mu.SERVICE_DOWNLOAD_APPEND_NODE, mu.SERVICE_DOWNLOAD_DELETE_NODE},
  48. ReadBufferSize: 500,
  49. WriteBufferSize: 500,
  50. })
  51. go gc4AlldownloaderFile()
  52. }
  53. //初始化,启动消息客户端chromedp
  54. func InitMsgClientChromedp(serveraddr, name string) {
  55. MsclientChromedp, _ = mu.NewClient(&mu.ClientConfig{ClientName: name,
  56. MsgServerAddr: serveraddr,
  57. EventHandler: processeventChromedp,
  58. CanHandleEvents: []int{mu.SERVICE_DOWNLOAD_APPEND_NODE, mu.SERVICE_DOWNLOAD_DELETE_NODE},
  59. ReadBufferSize: 200,
  60. WriteBufferSize: 200,
  61. })
  62. go gc4AlldownloaderChromedp()
  63. }
  64. //
  65. func processevent(p *mu.Packet) {
  66. defer qu.Catch()
  67. var data []byte
  68. switch p.Event {
  69. case mu.SERVICE_DOWNLOAD_APPEND_NODE:
  70. data = p.GetBusinessData()
  71. //log.Println("获取动态地址:", len(data), string(data))
  72. for i := 0; i < len(data)/8; i++ {
  73. code := string(data[i*8 : (i+1)*8])
  74. Alldownloader[code] = DynamicIPMap{
  75. Code: code,
  76. InvalidTime: time.Now().Unix() + 60*10,
  77. }
  78. }
  79. case mu.SERVICE_DOWNLOAD_DELETE_NODE:
  80. data = p.GetBusinessData()
  81. //log.Println("删除动态地址:", len(data), string(data))
  82. for i := 0; i < len(data)/8; i++ {
  83. code := string(data[i*8 : (i+1)*8])
  84. delete(Alldownloader, code)
  85. }
  86. case int32(util.Config.Uploadevent):
  87. param := map[string]interface{}{}
  88. json.Unmarshal(p.GetBusinessData(), &param)
  89. ret := map[string]interface{}{}
  90. if param["code"] != nil {
  91. b, err := UpdateSpiderByCodeState(param["code"].(string), param["state"].(string))
  92. ret["b"] = b
  93. ret["err"] = err
  94. } else {
  95. ret["b"] = false
  96. ret["err"] = "code或state值不存在"
  97. }
  98. Msclient.WriteObj(p.From, p.Msgid, mu.EVENT_RECIVE_CALLBACK, mu.SENDTO_TYPE_P2P, ret)
  99. }
  100. }
  101. //
  102. func processeventFile(p *mu.Packet) {
  103. defer qu.Catch()
  104. var data []byte
  105. switch p.Event {
  106. case mu.SERVICE_DOWNLOAD_APPEND_NODE:
  107. data = p.GetBusinessData()
  108. //log.Println("获取动态地址:", len(data), string(data))
  109. for i := 0; i < len(data)/8; i++ {
  110. code := string(data[i*8 : (i+1)*8])
  111. AlldownloaderFile[code] = DynamicIPMap{
  112. Code: code,
  113. InvalidTime: time.Now().Unix() + 60*10,
  114. }
  115. }
  116. case mu.SERVICE_DOWNLOAD_DELETE_NODE:
  117. data = p.GetBusinessData()
  118. //log.Println("删除动态地址:", len(data), string(data))
  119. for i := 0; i < len(data)/8; i++ {
  120. code := string(data[i*8 : (i+1)*8])
  121. delete(AlldownloaderFile, code)
  122. }
  123. case int32(util.Config.Uploadevent):
  124. param := map[string]interface{}{}
  125. json.Unmarshal(p.GetBusinessData(), &param)
  126. ret := map[string]interface{}{}
  127. if param["code"] != nil {
  128. b, err := UpdateSpiderByCodeState(param["code"].(string), param["state"].(string))
  129. ret["b"] = b
  130. ret["err"] = err
  131. } else {
  132. ret["b"] = false
  133. ret["err"] = "code或state值不存在"
  134. }
  135. MsclientFile.WriteObj(p.From, p.Msgid, mu.EVENT_RECIVE_CALLBACK, mu.SENDTO_TYPE_P2P, ret)
  136. }
  137. }
  138. func processeventChromedp(p *mu.Packet) {
  139. defer qu.Catch()
  140. var data []byte
  141. switch p.Event {
  142. case mu.SERVICE_DOWNLOAD_APPEND_NODE:
  143. data = p.GetBusinessData()
  144. //log.Println("获取动态地址:", len(data), string(data))
  145. for i := 0; i < len(data)/8; i++ {
  146. code := string(data[i*8 : (i+1)*8])
  147. AlldownloaderChromedp[code] = DynamicIPMap{
  148. Code: code,
  149. InvalidTime: time.Now().Unix() + 60*10,
  150. }
  151. }
  152. case mu.SERVICE_DOWNLOAD_DELETE_NODE:
  153. data = p.GetBusinessData()
  154. //log.Println("删除动态地址:", len(data), string(data))
  155. for i := 0; i < len(data)/8; i++ {
  156. code := string(data[i*8 : (i+1)*8])
  157. delete(AlldownloaderChromedp, code)
  158. }
  159. }
  160. }
  161. //
  162. func gc4Alldownloader() {
  163. n := time.Now().Unix()
  164. for _, v := range Alldownloader {
  165. if v.InvalidTime < n {
  166. delete(Alldownloader, v.Code)
  167. }
  168. }
  169. util.TimeAfterFunc(1*time.Minute, gc4Alldownloader, TimeChan)
  170. }
  171. //
  172. func gc4AlldownloaderFile() {
  173. n := time.Now().Unix()
  174. for _, v := range AlldownloaderFile {
  175. if v.InvalidTime < n {
  176. delete(AlldownloaderFile, v.Code)
  177. }
  178. }
  179. util.TimeAfterFunc(1*time.Minute, gc4AlldownloaderFile, TimeChan)
  180. }
  181. func gc4AlldownloaderChromedp() {
  182. n := time.Now().Unix()
  183. for _, v := range AlldownloaderChromedp {
  184. if v.InvalidTime < n {
  185. delete(AlldownloaderChromedp, v.Code)
  186. }
  187. }
  188. util.TimeAfterFunc(1*time.Minute, gc4AlldownloaderChromedp, TimeChan)
  189. }
  190. //获取一个下载点
  191. func GetOneDownloader() string {
  192. if len(Alldownloader) < 1 {
  193. return ""
  194. }
  195. r := rand.New(rand.NewSource(time.Now().UnixNano()))
  196. pos := r.Intn(len(Alldownloader))
  197. index := 0
  198. retcode := ""
  199. for k, _ := range Alldownloader {
  200. if index == pos {
  201. retcode = k
  202. break
  203. }
  204. index++
  205. }
  206. //log.Printf("Alldownloader-len:%d,currentdownloader:%s\n", len(Alldownloader), retcode)
  207. return retcode
  208. }
  209. //获取一个下载点
  210. func GetOneDownloaderFile() string {
  211. if len(AlldownloaderFile) < 1 {
  212. return ""
  213. }
  214. r := rand.New(rand.NewSource(time.Now().UnixNano()))
  215. pos := r.Intn(len(AlldownloaderFile))
  216. index := 0
  217. retcode := ""
  218. for k, _ := range AlldownloaderFile {
  219. if index == pos {
  220. retcode = k
  221. break
  222. }
  223. index++
  224. }
  225. return retcode
  226. }
  227. //完成消息通知
  228. func SendMsgService(event int, data []map[string]interface{}) {
  229. switch event {
  230. case mu.SERVICE_YCML_SAVE: //通知异常名录下载完成
  231. Msclient.WriteObj("", "", mu.SERVICE_YCML_NOTICE, mu.SENDTO_TYPE_ALL_RECIVER, qu.ObjToString(data[0]["area"]))
  232. default:
  233. }
  234. }
  235. //调用消息批量保存
  236. func SaveObjBlak(event int, checkAtrr string, c string, data []map[string]interface{}) {
  237. defer qu.Catch()
  238. tmp, _ := json.Marshal([]interface{}{checkAtrr, data})
  239. switch event {
  240. case mu.SERVICE_YCML_SAVE: //异常名录
  241. Msclient.WriteObj("", "", mu.SERVICE_YCML_SAVE, mu.SENDTO_TYPE_ALL_RECIVER, tmp)
  242. case mu.SERVICE_INVNAME_ANALYSIS: //存入企业名录(公示)
  243. names := []string{}
  244. area := ""
  245. for _, v := range data {
  246. if area == "" {
  247. area = qu.ObjToString(v["area"])
  248. }
  249. names = append(names, qu.ObjToString(v["title"]))
  250. }
  251. if area != "" && len(names) > 0 {
  252. rep := map[string]interface{}{"names": names, "area": area}
  253. logger.Debug(rep)
  254. Msclient.WriteObj("", "", mu.SERVICE_INVNAME_ANALYSIS, mu.SENDTO_TYPE_ALL_RECIVER, rep)
  255. }
  256. default:
  257. flag := true
  258. for i := 1; i < 6; i++ {
  259. bs, err := Msclient.Call("", mu.UUID(8), event, mu.SENDTO_TYPE_ALL_RECIVER, tmp, 120)
  260. if string(bs) != "" && err == nil {
  261. flag = false
  262. break
  263. }
  264. util.TimeSleepFunc(time.Duration(5*i)*time.Second, TimeSleepChan)
  265. }
  266. if flag {
  267. for k, info := range data {
  268. info["sendflag"] = "false"
  269. data[k] = info
  270. }
  271. logger.Error("未成功传送信息-批量", event, len(data), data[0]["spidercode"])
  272. }
  273. MgoS.SaveBulk("data_bak", data...)
  274. }
  275. }
  276. //调用消息保存
  277. func SaveObj(event int, checkAtrr string, data map[string]interface{}, saveredis bool) {
  278. bs, _ := json.Marshal(data)
  279. size := len(bs) / (1024 * 1024)
  280. if size > 10 { //超大数据过滤
  281. href := fmt.Sprint(data["href"])
  282. util.AddBloomRedis("href", href)
  283. data["detail"] = "" //字段太大
  284. data["contenthtml"] = "" //字段太大
  285. MgoS.Save("spider_filterdata", data)
  286. //log.Println(event, checkAtrr, data["href"], data["title"], len(bs))
  287. return
  288. }
  289. defer qu.Catch()
  290. tmp, _ := json.Marshal([]interface{}{checkAtrr, []interface{}{data}})
  291. switch event {
  292. case mu.SERVICE_SPIDER_ECPS: //著作权等服务
  293. Msclient.WriteObj("", "", mu.SERVICE_SPIDER_ECPS, mu.SENDTO_TYPE_ALL_RECIVER, data)
  294. default:
  295. flag := true
  296. idAndColl := ""
  297. for i := 1; i < 6; i++ {
  298. bs, err := Msclient.Call("", mu.UUID(8), event, mu.SENDTO_TYPE_ALL_RECIVER, tmp, 30)
  299. idAndColl = string(bs)
  300. if idAndColl != "" && err == nil {
  301. flag = false
  302. break
  303. }
  304. util.TimeSleepFunc(time.Duration(5*i)*time.Second, TimeSleepChan)
  305. }
  306. //qu.Debug("----------save-------")
  307. if flag {
  308. data["sendflag"] = "false"
  309. logger.Error("未成功传送信息", event, data["title"])
  310. } else {
  311. data["sendflag"] = "true"
  312. }
  313. href := fmt.Sprint(data["href"])
  314. if len(href) > 5 && saveredis { //有效数据
  315. if arr := strings.Split(idAndColl, "+"); len(arr) == 2 { //保存服务未成功推送的信息(异常、重复等),返回值不是id
  316. data["biddingid"] = arr[0]
  317. data["biddingcoll"] = arr[1]
  318. }
  319. MgoS.Save("data_bak", data)
  320. //DataBakSaveCache <- data
  321. }
  322. }
  323. }
  324. //从微信端获取验证码
  325. func GetCodeByWx(img []byte) (string, error) {
  326. msgid := mu.UUID(8)
  327. ret, err := GetMsgFromWx(msgid, img, true, 300)
  328. if err != nil {
  329. GetMsgFromWx(msgid, nil, false, 20)
  330. }
  331. tmp := make(map[string]interface{})
  332. json.Unmarshal(ret, &tmp)
  333. return qu.ObjToString(tmp["content"]), err
  334. }
  335. //从微信获取验证码消息
  336. func GetMsgFromWx(msgid string, img []byte, falg bool, timeout int64) ([]byte, error) {
  337. ret, err := Msclient.Call("", msgid, mu.SERVICE_DISTINGUISH, mu.SENDTO_TYPE_ALL_RECIVER,
  338. map[string]interface{}{
  339. "img": img,
  340. "flag": falg,
  341. }, timeout)
  342. return ret, err
  343. }