extractudp.go 10 KB


  1. // extractudp
  2. package extract
  3. import (
  4. "encoding/json"
  5. "fmt"
  6. "io/ioutil"
  7. "jy/cluster"
  8. db "jy/mongodbutil"
  9. ju "jy/util"
  10. log2 "log"
  11. mu "mfw/util"
  12. "net"
  13. "net/http"
  14. qu "qfw/util"
  15. "strings"
  16. "sync"
  17. log "github.com/donnie4w/go-logger/logger"
  18. "gopkg.in/mgo.v2/bson"
  19. )
  20. var Udpclient mu.UdpClient //udp对象
  21. var nextNodes []map[string]interface{}
  22. //udp通知抽取
  23. func ExtractUdp() {
  24. nextNodes = qu.ObjArrToMapArr(ju.Config["nextNode"].([]interface{}))
  25. Udpclient = mu.UdpClient{Local: ":" + qu.ObjToString(ju.Config["udpport"]), BufSize: 1024}
  26. Udpclient.Listen(processUdpMsg)
  27. }
  28. func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
  29. switch act {
  30. case mu.OP_TYPE_DATA:
  31. var rep map[string]interface{}
  32. err := json.Unmarshal(data, &rep)
  33. if err != nil {
  34. log.Debug(err)
  35. } else {
  36. sid, _ := rep["gtid"].(string)
  37. eid, _ := rep["lteid"].(string)
  38. stype, _ := rep["stype"].(string)
  39. if sid == "" || eid == "" {
  40. log.Debug("err", "sid=", sid, ",eid=", eid)
  41. } else {
  42. if stype == "distributed" { //分布式抽取分支
  43. go Udpclient.WriteUdp([]byte("发送分布式抽取分支"+qu.ObjToString(rep["ip"])+"udpok"), mu.OP_NOOP, ra)
  44. log.Debug("分布式抽取id段", sid, " ", eid)
  45. InstanceId := qu.ObjToString(rep["InstanceId"])
  46. db.Mgo.Update("ecs", `{"InstanceId":"`+InstanceId+`"}`,
  47. map[string]interface{}{
  48. "$set": map[string]interface{}{
  49. "extstatus": "running",
  50. },
  51. }, true, false)
  52. ExtractByUdp(sid, eid, ra, qu.ObjToString(rep["InstanceId"]), qu.ObjToString(rep["ip"]))
  53. db.Mgo.Update("ecs", `{"InstanceId":"`+InstanceId+`"}`,
  54. map[string]interface{}{
  55. "$set": map[string]interface{}{
  56. "extstatus": "ok",
  57. },
  58. }, true, false)
  59. //<-time.NewTimer(time.Minute * time.Duration(qu.IntAll(ju.Config["DeleteInstanceTimeMinute"]))).C
  60. //cluster.DeleteInstance("instanceId[0]")
  61. log.Debug("分布式抽取完成", sid, " ", eid, "释放esc实例", qu.ObjToString(rep["ip"]))
  62. } else {
  63. udpinfo, _ := rep["key"].(string)
  64. if udpinfo == "" {
  65. udpinfo = "udpok"
  66. }
  67. go Udpclient.WriteUdp([]byte(udpinfo), mu.OP_NOOP, ra)
  68. log.Debug("udp通知抽取id段", sid, " ", eid)
  69. ExtractByUdp(sid, eid, ra)
  70. log.Debug("udp通知抽取完成,eid=", eid)
  71. for _, m := range nextNodes {
  72. by, _ := json.Marshal(map[string]interface{}{
  73. "gtid": sid,
  74. "lteid": eid,
  75. "stype": qu.ObjToString(m["stype"]),
  76. })
  77. err := Udpclient.WriteUdp(by, mu.OP_TYPE_DATA, &net.UDPAddr{
  78. IP: net.ParseIP(m["addr"].(string)),
  79. Port: qu.IntAll(m["port"]),
  80. })
  81. if err != nil {
  82. log.Debug(err)
  83. }
  84. }
  85. }
  86. }
  87. }
  88. case mu.OP_NOOP: //下个节点回应
  89. log.Debug(string(data))
  90. log2.Println(string(data))
  91. case mu.OP_SEND_EMAIL:
  92. log.Debug("实例抽取完成,发送邮件:", string(data), ra.IP)
  93. log2.Println("实例抽取完成,发送邮件:", string(data), ra.IP)
  94. rep := make(map[string]interface{})
  95. err := json.Unmarshal(data, &rep)
  96. if err != nil {
  97. log.Debug(err)
  98. log2.Println(string(data), ra.IP)
  99. } else {
  100. tmpstr := ""
  101. for k, v := range rep {
  102. switch k {
  103. case "desc":
  104. tmpstr += fmt.Sprint(v) + ","
  105. case "count":
  106. tmpstr += "实际抽取数据量" + fmt.Sprint(v) + ","
  107. case "index":
  108. tmpstr += "区间数据量为" + fmt.Sprint(v) + ","
  109. case "instanceId":
  110. tmpstr += "实例" + fmt.Sprint(v) + ","
  111. }
  112. }
  113. tmpstr = strings.TrimRight(tmpstr, ",")
  114. sendMail(tmpstr)
  115. cluster.ModifyInstanceAutoReleaseTime(qu.ObjToString(rep["instanceId"]), qu.IntAll(ju.Config["deleteInstanceTimeHour"]))
  116. }
  117. }
  118. }
  119. func sendMail(content string) {
  120. log2.Println(ju.Config["api"], ju.Config["tomail"], "jy-data-extract_3.2", "抽取完成:"+content)
  121. res, err := http.Get(fmt.Sprintf("%s?to=%s&title=%s&body=%s", ju.Config["api"], ju.Config["tomail"], "jy-data-extract_3.2", "抽取完成:"+content))
  122. defer res.Body.Close()
  123. if err == nil {
  124. read, err := ioutil.ReadAll(res.Body)
  125. log2.Println("邮件发送:", string(read), err)
  126. }
  127. log2.Println("api email:", err)
  128. }
  129. var ext *ExtractTask
  130. //根据id区间抽取
  131. func ExtractByUdp(sid, eid string, ra *net.UDPAddr, instanceId ...string) {
  132. defer qu.Catch()
  133. if ext == nil {
  134. ext = &ExtractTask{}
  135. ext.Id = qu.ObjToString(ju.Config["udptaskid"])
  136. ext.InitTaskInfo()
  137. ext.TaskInfo.FDB = db.MgoFactory(3, 5, 600, ext.TaskInfo.FromDbAddr, ext.TaskInfo.FromDB)
  138. ext.TaskInfo.TDB = db.MgoFactory(3, 5, 600, ext.TaskInfo.ToDbAddr, ext.TaskInfo.ToDB)
  139. ext.InitRulePres()
  140. ext.InitRuleBacks()
  141. ext.InitRuleCore()
  142. ext.InitBlockRule()
  143. ext.InitTag()
  144. ext.InitClearFn()
  145. if ext.IsExtractCity { //版本上控制是否开始城市抽取
  146. //初始化城市DFA信息
  147. //ext.InitCityDFA()
  148. ext.InitCityInfo()
  149. ext.InitAreaCode()
  150. ext.InitPostCode()
  151. }
  152. //质量审核
  153. ext.InitAuditFields()
  154. ext.InitAuditRule()
  155. ext.InitAuditClass()
  156. ext.InitAuditRecogField()
  157. //品牌抽取是否开启
  158. ju.IsBrandGoods, _ = ju.Config["brandgoods"].(bool)
  159. ext.ResultSave(true)
  160. ext.BidSave(true)
  161. ext.IsRun = true
  162. } else {
  163. ext.BidTotal = 0
  164. }
  165. index := 0
  166. if len(instanceId) > 0 { //分布式抽取进度
  167. query := bson.M{"_id": bson.M{"$gte": bson.ObjectIdHex(sid), "$lte": bson.ObjectIdHex(eid)}}
  168. count1 := ext.TaskInfo.FDB.Count(ext.TaskInfo.FromColl, query)
  169. count2 := ext.TaskInfo.FDB.Count(ext.TaskInfo.FromColl+"_back", query)
  170. count := count1 + count2
  171. pageNum := (count + PageSize - 1) / PageSize
  172. limit := PageSize
  173. if count < PageSize {
  174. limit = count
  175. }
  176. fmt.Printf("count=%d,pageNum=%d,query=%v\n", count, pageNum, query)
  177. startI := 0 //接着上次任务执行
  178. sidback := sid
  179. esc, _ := db.Mgo.FindOne("ecs", `{"InstanceId":"`+instanceId[0]+`"}`)
  180. startI = qu.IntAll((*esc)["pagecurrent"])
  181. if qu.ObjToString((*esc)["lastId"]) != "" {
  182. sid = qu.ObjToString((*esc)["lastId"])
  183. }
  184. if qu.ObjToString((*esc)["lastIdback"]) != "" {
  185. sidback = qu.ObjToString((*esc)["lastIdback"])
  186. }
  187. go Udpclient.WriteUdp([]byte(fmt.Sprintf("IP=%s,数据接收成功,count=%d,pageNum=%d,query=%v\n", instanceId[1], count, pageNum, query)), mu.OP_NOOP, ra)
  188. for i := startI; i < pageNum; i++ {
  189. query = bson.M{"_id": bson.M{"$gte": bson.ObjectIdHex(sid), "$lte": bson.ObjectIdHex(eid)}}
  190. fmt.Printf("page=%d,query=%v\n", i+1, query)
  191. if ext.TaskInfo.FDB.Count(ext.TaskInfo.FromColl, query) > 0 {
  192. list, _ := ext.TaskInfo.FDB.Find(ext.TaskInfo.FromColl, query, nil, Fields, false, 0, limit)
  193. for _, v := range *list {
  194. if qu.ObjToString(v["sensitive"]) != "" { //去除含敏感词数据
  195. continue
  196. }
  197. _id := qu.BsonIdToSId(v["_id"])
  198. var j, jf *ju.Job
  199. if ext.IsFileField && v["projectinfo"] != nil {
  200. v["isextFile"] = true
  201. j, jf = ext.PreInfo(v)
  202. } else {
  203. j, _ = ext.PreInfo(v)
  204. }
  205. ext.TaskInfo.ProcessPool <- true
  206. go ext.ExtractProcess(j, jf)
  207. sid = _id
  208. index++
  209. }
  210. db.Mgo.Update("ecs", `{"InstanceId":"`+instanceId[0]+`"}`,
  211. map[string]interface{}{"$set": map[string]interface{}{
  212. "lastId": sid,
  213. }}, true, false)
  214. }
  215. queryback := bson.M{"_id": bson.M{"$gte": bson.ObjectIdHex(sidback)}}
  216. fmt.Printf("page=%d,queryback=%v\n", i+1, queryback)
  217. if ext.TaskInfo.FDB.Count(ext.TaskInfo.FromColl+"_back", queryback) > 0 {
  218. list2, _ := ext.TaskInfo.FDB.Find(ext.TaskInfo.FromColl+"_back", queryback, nil, Fields, false, 0, limit)
  219. for _, v := range *list2 {
  220. if qu.ObjToString(v["sensitive"]) != "" { //去除含敏感词数据
  221. continue
  222. }
  223. _id := qu.BsonIdToSId(v["_id"])
  224. var j, jf *ju.Job
  225. if ext.IsFileField && v["projectinfo"] != nil {
  226. v["isextFile"] = true
  227. j, jf = ext.PreInfo(v)
  228. } else {
  229. j, _ = ext.PreInfo(v)
  230. }
  231. ext.TaskInfo.ProcessPool <- true
  232. go ext.ExtractProcess(j, jf)
  233. sidback = _id
  234. index++
  235. }
  236. db.Mgo.Update("ecs", `{"InstanceId":"`+instanceId[0]+`"}`,
  237. map[string]interface{}{"$set": map[string]interface{}{
  238. "lastIdback": sidback,
  239. }}, true, false)
  240. }
  241. db.Mgo.Update("ecs", `{"InstanceId":"`+instanceId[0]+`"}`,
  242. map[string]interface{}{"$set": map[string]interface{}{
  243. "pagetotal": pageNum,
  244. "pagecurrent": i + 1,
  245. }}, true, false)
  246. }
  247. des := make(map[string]interface{})
  248. des["desc"] = "分布式抽取完成,一小时后释放"
  249. des["count"] = count
  250. des["index"] = index
  251. des["instanceId"] = instanceId[0]
  252. des["instanceIP"] = instanceId[1]
  253. udpbytes, _ := json.Marshal(des)
  254. go Udpclient.WriteUdp(udpbytes, mu.OP_SEND_EMAIL, ra)
  255. log.Debug("抽取完成", "count:", count, "index:", index, "bidtotal:", ext.BidTotal)
  256. } else { //普通抽取
  257. query := bson.M{"_id": bson.M{"$gte": bson.ObjectIdHex(sid), "$lte": bson.ObjectIdHex(eid)}}
  258. count := ext.TaskInfo.FDB.Count(ext.TaskInfo.FromColl, query)
  259. log.Debug("查询条件为:", query, "查询条数:", count)
  260. pageNum := (count + PageSize - 1) / PageSize
  261. limit := PageSize
  262. if count < PageSize {
  263. limit = count
  264. }
  265. wg := sync.WaitGroup{}
  266. for i := 0; i < pageNum; i++ {
  267. query = bson.M{"_id": bson.M{"$gte": bson.ObjectIdHex(sid)}}
  268. fmt.Printf("page=%d,query=%v\n", i+1, query)
  269. list, _ := ext.TaskInfo.FDB.Find(ext.TaskInfo.FromColl, query, nil, Fields, false, 0, limit)
  270. for _, v := range *list {
  271. if qu.ObjToString(v["sensitive"]) != "" { //去除含敏感词数据
  272. log.Debug(index, qu.BsonIdToSId(v["_id"]), "//去除含敏感词数据")
  273. continue
  274. }
  275. _id := qu.BsonIdToSId(v["_id"])
  276. var j, jf *ju.Job
  277. if ext.IsFileField && v["projectinfo"] != nil {
  278. v["isextFile"] = true
  279. j, jf = ext.PreInfo(v)
  280. } else {
  281. j, _ = ext.PreInfo(v)
  282. }
  283. ext.TaskInfo.ProcessPool <- true
  284. wg.Add(1)
  285. go func(wg *sync.WaitGroup, j, jf *ju.Job) {
  286. defer wg.Done()
  287. //log.Debug(index,j.SourceMid,)
  288. ext.ExtractProcess(j, jf)
  289. }(&wg, j, jf)
  290. index++
  291. if index%1000 == 0 {
  292. log.Debug("index:", index, ",页码:", i+1, ",_id:", _id)
  293. }
  294. sid = _id
  295. if sid >= eid {
  296. break
  297. }
  298. }
  299. }
  300. wg.Wait()
  301. ext.BidSave(false)
  302. log.Debug("抽取完成,", "count:", count, ",index:", index, ",bidtotal:", ext.BidTotal, ",eid:", eid)
  303. }
  304. }