msgservice.go 8.6 KB


  1. // msgservice
  2. package spider
  3. import (
  4. "encoding/json"
  5. "fmt"
  6. "log"
  7. "math/rand"
  8. mu "mfw/util"
  9. qu "qfw/util"
  10. //"qfw/util/redis"
  11. util "spiderutil"
  12. "time"
  13. "github.com/donnie4w/go-logger/logger"
  14. )
  15. type DynamicIPMap struct {
  16. Code string
  17. InvalidTime int64
  18. }
  19. var Msclient *mu.Client
  20. var MsclientFile *mu.Client
  21. var Alldownloader map[string]DynamicIPMap = make(map[string]DynamicIPMap)
  22. var AlldownloaderFile map[string]DynamicIPMap = make(map[string]DynamicIPMap)
  23. //初始化,启动消息客户端
  24. func InitMsgClient(serveraddr, name string) {
  25. Msclient, _ = mu.NewClient(&mu.ClientConfig{ClientName: name,
  26. MsgServerAddr: serveraddr,
  27. EventHandler: processevent,
  28. OnRequestConnect: func() {
  29. log.Println("重连", serveraddr, name)
  30. },
  31. OnConnectSuccess: func() {
  32. log.Println("重连成功")
  33. },
  34. CanHandleEvents: []int{mu.SERVICE_DOWNLOAD_APPEND_NODE, mu.SERVICE_DOWNLOAD_DELETE_NODE, util.Config.Uploadevent},
  35. ReadBufferSize: 500,
  36. WriteBufferSize: 500,
  37. })
  38. go gc4Alldownloader()
  39. }
  40. //初始化,启动消息客户端File
  41. func InitMsgClientFile(serveraddr, name string) {
  42. MsclientFile, _ = mu.NewClient(&mu.ClientConfig{ClientName: name,
  43. MsgServerAddr: serveraddr,
  44. EventHandler: processeventFile,
  45. CanHandleEvents: []int{mu.SERVICE_DOWNLOAD_APPEND_NODE, mu.SERVICE_DOWNLOAD_DELETE_NODE},
  46. ReadBufferSize: 200,
  47. WriteBufferSize: 200,
  48. })
  49. go gc4AlldownloaderFile()
  50. }
  51. //
  52. func processevent(p *mu.Packet) {
  53. defer mu.Catch()
  54. var data []byte
  55. switch p.Event {
  56. case mu.SERVICE_DOWNLOAD_APPEND_NODE:
  57. data = p.GetBusinessData()
  58. //log.Println("获取动态地址:", len(data), string(data))
  59. for i := 0; i < len(data)/8; i++ {
  60. code := string(data[i*8 : (i+1)*8])
  61. Alldownloader[code] = DynamicIPMap{
  62. Code: code,
  63. InvalidTime: time.Now().Unix() + 60*10,
  64. }
  65. }
  66. case mu.SERVICE_DOWNLOAD_DELETE_NODE:
  67. data = p.GetBusinessData()
  68. //log.Println("删除动态地址:", len(data), string(data))
  69. for i := 0; i < len(data)/8; i++ {
  70. code := string(data[i*8 : (i+1)*8])
  71. delete(Alldownloader, code)
  72. }
  73. // case int32(util.Config.Uploadevent):
  74. // param := map[string]interface{}{}
  75. // json.Unmarshal(p.GetBusinessData(), &param)
  76. // ret := map[string]interface{}{}
  77. // if param["code"] != nil {
  78. // b, err := UpdateSpiderByCodeState(param["code"].(string), param["state"].(string))
  79. // ret["b"] = b
  80. // ret["err"] = err
  81. // } else {
  82. // ret["b"] = false
  83. // ret["err"] = "code或state值不存在"
  84. // }
  85. // Msclient.WriteObj(p.From, p.Msgid, mu.EVENT_RECIVE_CALLBACK, mu.SENDTO_TYPE_P2P, ret)
  86. }
  87. }
  88. //
  89. func processeventFile(p *mu.Packet) {
  90. defer mu.Catch()
  91. var data []byte
  92. switch p.Event {
  93. case mu.SERVICE_DOWNLOAD_APPEND_NODE:
  94. data = p.GetBusinessData()
  95. //log.Println("获取动态地址:", len(data), string(data))
  96. for i := 0; i < len(data)/8; i++ {
  97. code := string(data[i*8 : (i+1)*8])
  98. AlldownloaderFile[code] = DynamicIPMap{
  99. Code: code,
  100. InvalidTime: time.Now().Unix() + 60*10,
  101. }
  102. }
  103. case mu.SERVICE_DOWNLOAD_DELETE_NODE:
  104. data = p.GetBusinessData()
  105. //log.Println("删除动态地址:", len(data), string(data))
  106. for i := 0; i < len(data)/8; i++ {
  107. code := string(data[i*8 : (i+1)*8])
  108. delete(AlldownloaderFile, code)
  109. }
  110. // case int32(util.Config.Uploadevent):
  111. // param := map[string]interface{}{}
  112. // json.Unmarshal(p.GetBusinessData(), &param)
  113. // ret := map[string]interface{}{}
  114. // if param["code"] != nil {
  115. // b, err := UpdateSpiderByCodeState(param["code"].(string), param["state"].(string))
  116. // ret["b"] = b
  117. // ret["err"] = err
  118. // } else {
  119. // ret["b"] = false
  120. // ret["err"] = "code或state值不存在"
  121. // }
  122. // MsclientFile.WriteObj(p.From, p.Msgid, mu.EVENT_RECIVE_CALLBACK, mu.SENDTO_TYPE_P2P, ret)
  123. //
  124. }
  125. }
  126. //
  127. func gc4Alldownloader() {
  128. n := time.Now().Unix()
  129. for _, v := range Alldownloader {
  130. if v.InvalidTime < n {
  131. delete(Alldownloader, v.Code)
  132. }
  133. }
  134. util.TimeAfterFunc(1*time.Minute, gc4Alldownloader, TimeChan)
  135. }
  136. //
  137. func gc4AlldownloaderFile() {
  138. n := time.Now().Unix()
  139. for _, v := range AlldownloaderFile {
  140. if v.InvalidTime < n {
  141. delete(AlldownloaderFile, v.Code)
  142. }
  143. }
  144. util.TimeAfterFunc(1*time.Minute, gc4AlldownloaderFile, TimeChan)
  145. }
  146. //获取一个下载点
  147. func GetOneDownloader() string {
  148. if len(Alldownloader) < 1 {
  149. return ""
  150. }
  151. r := rand.New(rand.NewSource(time.Now().UnixNano()))
  152. pos := r.Intn(len(Alldownloader))
  153. index := 0
  154. retcode := ""
  155. for k, _ := range Alldownloader {
  156. if index == pos {
  157. retcode = k
  158. break
  159. }
  160. index++
  161. }
  162. //log.Printf("Alldownloader-len:%d,currentdownloader:%s\n", len(Alldownloader), retcode)
  163. return retcode
  164. }
  165. //获取一个下载点
  166. func GetOneDownloaderFile() string {
  167. if len(AlldownloaderFile) < 1 {
  168. return ""
  169. }
  170. r := rand.New(rand.NewSource(time.Now().UnixNano()))
  171. pos := r.Intn(len(AlldownloaderFile))
  172. index := 0
  173. retcode := ""
  174. for k, _ := range AlldownloaderFile {
  175. if index == pos {
  176. retcode = k
  177. break
  178. }
  179. index++
  180. }
  181. return retcode
  182. }
  183. //完成消息通知
  184. func SendMsgService(event int, data []map[string]interface{}) {
  185. switch event {
  186. case mu.SERVICE_YCML_SAVE: //通知异常名录下载完成
  187. Msclient.WriteObj("", "", mu.SERVICE_YCML_NOTICE, mu.SENDTO_TYPE_ALL_RECIVER, qu.ObjToString(data[0]["area"]))
  188. default:
  189. }
  190. }
  191. //调用消息批量保存
  192. func SaveObjBlak(event int, checkAtrr string, c string, data []map[string]interface{}) {
  193. defer mu.Catch()
  194. tmp, _ := json.Marshal([]interface{}{checkAtrr, data})
  195. switch event {
  196. case mu.SERVICE_YCML_SAVE: //异常名录
  197. Msclient.WriteObj("", "", mu.SERVICE_YCML_SAVE, mu.SENDTO_TYPE_ALL_RECIVER, tmp)
  198. case mu.SERVICE_INVNAME_ANALYSIS: //存入企业名录(公示)
  199. names := []string{}
  200. area := ""
  201. for _, v := range data {
  202. if area == "" {
  203. area = qu.ObjToString(v["area"])
  204. }
  205. names = append(names, qu.ObjToString(v["title"]))
  206. }
  207. if area != "" && len(names) > 0 {
  208. rep := map[string]interface{}{"names": names, "area": area}
  209. logger.Debug(rep)
  210. Msclient.WriteObj("", "", mu.SERVICE_INVNAME_ANALYSIS, mu.SENDTO_TYPE_ALL_RECIVER, rep)
  211. }
  212. default:
  213. flag := true
  214. for i := 1; i < 6; i++ {
  215. bs, err := Msclient.Call("", mu.UUID(8), event, mu.SENDTO_TYPE_ALL_RECIVER, tmp, 120)
  216. if string(bs) != "" && err == nil {
  217. flag = false
  218. break
  219. }
  220. util.TimeSleepFunc(time.Duration(5*i)*time.Second, TimeSleepChan)
  221. }
  222. if flag {
  223. for k, info := range data {
  224. info["sendflag"] = "false"
  225. data[k] = info
  226. }
  227. logger.Error("未成功传送信息-批量", event, len(data), data[0]["spidercode"])
  228. }
  229. MgoS.SaveBulk("data_bak", data...)
  230. }
  231. }
  232. //调用消息保存
  233. func SaveObj(event int, checkAtrr string, data map[string]interface{}, saveredis bool) {
  234. bs, _ := json.Marshal(data)
  235. size := len(bs) / (1024 * 1024)
  236. if size > 10 {
  237. log.Println(event, checkAtrr, data["href"], data["title"], len(bs))
  238. return
  239. }
  240. defer mu.Catch()
  241. tmp, _ := json.Marshal([]interface{}{checkAtrr, []interface{}{data}})
  242. switch event {
  243. case mu.SERVICE_SPIDER_ECPS: //著作权等服务
  244. Msclient.WriteObj("", "", mu.SERVICE_SPIDER_ECPS, mu.SENDTO_TYPE_ALL_RECIVER, data)
  245. default:
  246. flag := true
  247. for i := 1; i < 6; i++ {
  248. bs, err := Msclient.Call("", mu.UUID(8), event, mu.SENDTO_TYPE_ALL_RECIVER, tmp, 30)
  249. if string(bs) != "" && err == nil {
  250. flag = false
  251. break
  252. }
  253. util.TimeSleepFunc(time.Duration(5*i)*time.Second, TimeSleepChan)
  254. }
  255. //qu.Debug("----------save-------")
  256. if flag {
  257. data["sendflag"] = "false"
  258. logger.Error("未成功传送信息", event, data["title"])
  259. } else {
  260. data["sendflag"] = "true"
  261. }
  262. id := MgoS.Save("data_bak", data)
  263. if !flag && id != "" {
  264. href := fmt.Sprint(data["href"])
  265. if len(href) > 5 && saveredis { //有效数据
  266. db := HexToBigIntMod(href) //根据href的哈希值选择Redis的db
  267. //增量
  268. util.PutRedis("title_repeat_judgement", 0, "url_repeat_"+href, href, 3600*24*30)
  269. //全量(判断是否已存在防止覆盖id)
  270. isExist, _ := util.ExistRedis("title_repeat_fulljudgement", db, "url_repeat_"+href)
  271. if !isExist {
  272. util.PutRedis("title_repeat_fulljudgement", db, "url_repeat_"+href, "", -1)
  273. }
  274. }
  275. }
  276. }
  277. }
  278. //从微信端获取验证码
  279. func GetCodeByWx(img []byte) (string, error) {
  280. msgid := mu.UUID(8)
  281. ret, err := GetMsgFromWx(msgid, img, true, 300)
  282. if err != nil {
  283. GetMsgFromWx(msgid, nil, false, 20)
  284. }
  285. tmp := make(map[string]interface{})
  286. json.Unmarshal(ret, &tmp)
  287. return qu.ObjToString(tmp["content"]), err
  288. }
  289. //从微信获取验证码消息
  290. func GetMsgFromWx(msgid string, img []byte, falg bool, timeout int64) ([]byte, error) {
  291. ret, err := Msclient.Call("", msgid, mu.SERVICE_DISTINGUISH, mu.SENDTO_TYPE_ALL_RECIVER,
  292. map[string]interface{}{
  293. "img": img,
  294. "flag": falg,
  295. }, timeout)
  296. return ret, err
  297. }