// msgservice package spider import ( "encoding/json" "fmt" "log" "math/rand" mu "mfw/util" qu "qfw/util" "strings" util "spiderutil" "time" "github.com/donnie4w/go-logger/logger" ) type DynamicIPMap struct { Code string InvalidTime int64 } var Msclient *mu.Client var MsclientFile *mu.Client var MsclientChromedp *mu.Client var Alldownloader map[string]DynamicIPMap = make(map[string]DynamicIPMap) var AlldownloaderFile map[string]DynamicIPMap = make(map[string]DynamicIPMap) var AlldownloaderChromedp map[string]DynamicIPMap = make(map[string]DynamicIPMap) //初始化,启动消息客户端 func InitMsgClient(serveraddr, name string) { Msclient, _ = mu.NewClient(&mu.ClientConfig{ClientName: name, MsgServerAddr: serveraddr, EventHandler: processevent, OnRequestConnect: func() { log.Println("重连", serveraddr, name) }, OnConnectSuccess: func() { log.Println("重连成功") }, CanHandleEvents: []int{mu.SERVICE_DOWNLOAD_APPEND_NODE, mu.SERVICE_DOWNLOAD_DELETE_NODE, util.Config.Uploadevent}, ReadBufferSize: 500, WriteBufferSize: 500, }) go gc4Alldownloader() } //初始化,启动消息客户端File func InitMsgClientFile(serveraddr, name string) { MsclientFile, _ = mu.NewClient(&mu.ClientConfig{ClientName: name, MsgServerAddr: serveraddr, EventHandler: processeventFile, CanHandleEvents: []int{mu.SERVICE_DOWNLOAD_APPEND_NODE, mu.SERVICE_DOWNLOAD_DELETE_NODE}, ReadBufferSize: 500, WriteBufferSize: 500, }) go gc4AlldownloaderFile() } //初始化,启动消息客户端chromedp func InitMsgClientChromedp(serveraddr, name string) { MsclientChromedp, _ = mu.NewClient(&mu.ClientConfig{ClientName: name, MsgServerAddr: serveraddr, EventHandler: processeventChromedp, CanHandleEvents: []int{mu.SERVICE_DOWNLOAD_APPEND_NODE, mu.SERVICE_DOWNLOAD_DELETE_NODE}, ReadBufferSize: 200, WriteBufferSize: 200, }) go gc4AlldownloaderChromedp() } // func processevent(p *mu.Packet) { defer qu.Catch() var data []byte switch p.Event { case mu.SERVICE_DOWNLOAD_APPEND_NODE: data = p.GetBusinessData() //log.Println("获取动态地址:", len(data), string(data)) for i := 0; i < len(data)/8; i++ { code := string(data[i*8 : (i+1)*8]) Alldownloader[code] = DynamicIPMap{ Code: code, InvalidTime: time.Now().Unix() + 60*10, } } case mu.SERVICE_DOWNLOAD_DELETE_NODE: data = p.GetBusinessData() //log.Println("删除动态地址:", len(data), string(data)) for i := 0; i < len(data)/8; i++ { code := string(data[i*8 : (i+1)*8]) delete(Alldownloader, code) } case int32(util.Config.Uploadevent): param := map[string]interface{}{} json.Unmarshal(p.GetBusinessData(), ¶m) ret := map[string]interface{}{} if param["code"] != nil { b, err := UpdateSpiderByCodeState(param["code"].(string), param["state"].(string)) ret["b"] = b ret["err"] = err } else { ret["b"] = false ret["err"] = "code或state值不存在" } Msclient.WriteObj(p.From, p.Msgid, mu.EVENT_RECIVE_CALLBACK, mu.SENDTO_TYPE_P2P, ret) } } // func processeventFile(p *mu.Packet) { defer qu.Catch() var data []byte switch p.Event { case mu.SERVICE_DOWNLOAD_APPEND_NODE: data = p.GetBusinessData() //log.Println("获取动态地址:", len(data), string(data)) for i := 0; i < len(data)/8; i++ { code := string(data[i*8 : (i+1)*8]) AlldownloaderFile[code] = DynamicIPMap{ Code: code, InvalidTime: time.Now().Unix() + 60*10, } } case mu.SERVICE_DOWNLOAD_DELETE_NODE: data = p.GetBusinessData() //log.Println("删除动态地址:", len(data), string(data)) for i := 0; i < len(data)/8; i++ { code := string(data[i*8 : (i+1)*8]) delete(AlldownloaderFile, code) } case int32(util.Config.Uploadevent): param := map[string]interface{}{} json.Unmarshal(p.GetBusinessData(), ¶m) ret := map[string]interface{}{} if param["code"] != nil { b, err := UpdateSpiderByCodeState(param["code"].(string), param["state"].(string)) ret["b"] = b ret["err"] = err } else { ret["b"] = false ret["err"] = "code或state值不存在" } MsclientFile.WriteObj(p.From, p.Msgid, mu.EVENT_RECIVE_CALLBACK, mu.SENDTO_TYPE_P2P, ret) } } func processeventChromedp(p *mu.Packet) { defer qu.Catch() var data []byte switch p.Event { case mu.SERVICE_DOWNLOAD_APPEND_NODE: data = p.GetBusinessData() //log.Println("获取动态地址:", len(data), string(data)) for i := 0; i < len(data)/8; i++ { code := string(data[i*8 : (i+1)*8]) AlldownloaderChromedp[code] = DynamicIPMap{ Code: code, InvalidTime: time.Now().Unix() + 60*10, } } case mu.SERVICE_DOWNLOAD_DELETE_NODE: data = p.GetBusinessData() //log.Println("删除动态地址:", len(data), string(data)) for i := 0; i < len(data)/8; i++ { code := string(data[i*8 : (i+1)*8]) delete(AlldownloaderChromedp, code) } } } // func gc4Alldownloader() { n := time.Now().Unix() for _, v := range Alldownloader { if v.InvalidTime < n { delete(Alldownloader, v.Code) } } util.TimeAfterFunc(1*time.Minute, gc4Alldownloader, TimeChan) } // func gc4AlldownloaderFile() { n := time.Now().Unix() for _, v := range AlldownloaderFile { if v.InvalidTime < n { delete(AlldownloaderFile, v.Code) } } util.TimeAfterFunc(1*time.Minute, gc4AlldownloaderFile, TimeChan) } func gc4AlldownloaderChromedp() { n := time.Now().Unix() for _, v := range AlldownloaderChromedp { if v.InvalidTime < n { delete(AlldownloaderChromedp, v.Code) } } util.TimeAfterFunc(1*time.Minute, gc4AlldownloaderChromedp, TimeChan) } //获取一个下载点 func GetOneDownloader() string { if len(Alldownloader) < 1 { return "" } r := rand.New(rand.NewSource(time.Now().UnixNano())) pos := r.Intn(len(Alldownloader)) index := 0 retcode := "" for k, _ := range Alldownloader { if index == pos { retcode = k break } index++ } //log.Printf("Alldownloader-len:%d,currentdownloader:%s\n", len(Alldownloader), retcode) return retcode } //获取一个下载点 func GetOneDownloaderFile() string { if len(AlldownloaderFile) < 1 { return "" } r := rand.New(rand.NewSource(time.Now().UnixNano())) pos := r.Intn(len(AlldownloaderFile)) index := 0 retcode := "" for k, _ := range AlldownloaderFile { if index == pos { retcode = k break } index++ } return retcode } //完成消息通知 func SendMsgService(event int, data []map[string]interface{}) { switch event { case mu.SERVICE_YCML_SAVE: //通知异常名录下载完成 Msclient.WriteObj("", "", mu.SERVICE_YCML_NOTICE, mu.SENDTO_TYPE_ALL_RECIVER, qu.ObjToString(data[0]["area"])) default: } } //调用消息批量保存 func SaveObjBlak(event int, checkAtrr string, c string, data []map[string]interface{}) { defer qu.Catch() tmp, _ := json.Marshal([]interface{}{checkAtrr, data}) switch event { case mu.SERVICE_YCML_SAVE: //异常名录 Msclient.WriteObj("", "", mu.SERVICE_YCML_SAVE, mu.SENDTO_TYPE_ALL_RECIVER, tmp) case mu.SERVICE_INVNAME_ANALYSIS: //存入企业名录(公示) names := []string{} area := "" for _, v := range data { if area == "" { area = qu.ObjToString(v["area"]) } names = append(names, qu.ObjToString(v["title"])) } if area != "" && len(names) > 0 { rep := map[string]interface{}{"names": names, "area": area} logger.Debug(rep) Msclient.WriteObj("", "", mu.SERVICE_INVNAME_ANALYSIS, mu.SENDTO_TYPE_ALL_RECIVER, rep) } default: flag := true for i := 1; i < 6; i++ { bs, err := Msclient.Call("", mu.UUID(8), event, mu.SENDTO_TYPE_ALL_RECIVER, tmp, 120) if string(bs) != "" && err == nil { flag = false break } util.TimeSleepFunc(time.Duration(5*i)*time.Second, TimeSleepChan) } if flag { for k, info := range data { info["sendflag"] = "false" data[k] = info } logger.Error("未成功传送信息-批量", event, len(data), data[0]["spidercode"]) } MgoS.SaveBulk("data_bak", data...) } } //调用消息保存 func SaveObj(event int, checkAtrr string, data map[string]interface{}, saveredis bool) { bs, _ := json.Marshal(data) size := len(bs) / (1024 * 1024) if size > 10 { //超大数据过滤 href := fmt.Sprint(data["href"]) util.AddBloomRedis("href", href) data["detail"] = "" //字段太大 data["contenthtml"] = "" //字段太大 MgoS.Save("spider_filterdata", data) //log.Println(event, checkAtrr, data["href"], data["title"], len(bs)) return } defer qu.Catch() tmp, _ := json.Marshal([]interface{}{checkAtrr, []interface{}{data}}) switch event { case mu.SERVICE_SPIDER_ECPS: //著作权等服务 Msclient.WriteObj("", "", mu.SERVICE_SPIDER_ECPS, mu.SENDTO_TYPE_ALL_RECIVER, data) default: flag := true idAndColl := "" for i := 1; i < 6; i++ { bs, err := Msclient.Call("", mu.UUID(8), event, mu.SENDTO_TYPE_ALL_RECIVER, tmp, 30) idAndColl = string(bs) if idAndColl != "" && err == nil { flag = false break } util.TimeSleepFunc(time.Duration(5*i)*time.Second, TimeSleepChan) } //qu.Debug("----------save-------") if flag { data["sendflag"] = "false" logger.Error("未成功传送信息", event, data["title"]) } else { data["sendflag"] = "true" } href := fmt.Sprint(data["href"]) if len(href) > 5 && saveredis { //有效数据 if arr := strings.Split(idAndColl, "+"); len(arr) == 2 { //保存服务未成功推送的信息(异常、重复等),返回值不是id data["biddingid"] = arr[0] data["biddingcoll"] = arr[1] } MgoS.Save("data_bak", data) //DataBakSaveCache <- data } } } //从微信端获取验证码 func GetCodeByWx(img []byte) (string, error) { msgid := mu.UUID(8) ret, err := GetMsgFromWx(msgid, img, true, 300) if err != nil { GetMsgFromWx(msgid, nil, false, 20) } tmp := make(map[string]interface{}) json.Unmarshal(ret, &tmp) return qu.ObjToString(tmp["content"]), err } //从微信获取验证码消息 func GetMsgFromWx(msgid string, img []byte, falg bool, timeout int64) ([]byte, error) { ret, err := Msclient.Call("", msgid, mu.SERVICE_DISTINGUISH, mu.SENDTO_TYPE_ALL_RECIVER, map[string]interface{}{ "img": img, "flag": falg, }, timeout) return ret, err }