main.go 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138
  1. package main
  2. /**
  3. 修复没有合并的抽取数据到bidding表中,并删除缓存和生成索引
  4. ./sendtask -ip 127.0.0.1 -p 14899 -stype bidding -gtid 5b582c12a5cb26b9b77fcce6 -lteid 5b582c12a5cb26b9b77fcd01
  5. ./sendtask -ip 127.0.0.1 -p 14899 -stype bidding -q "{'comeintime':{'\$gte':1537246800,'\$lte':1537254000}}"
  6. **/
  7. import (
  8. "encoding/json"
  9. "flag"
  10. "log"
  11. mu "mfw/util"
  12. "net"
  13. "qfw/util"
  14. elastic "qfw/util/elastic"
  15. "qfw/util/mongodb"
  16. "qfw/util/redis"
  17. "strings"
  18. "time"
  19. )
  20. var (
  21. Sysconfig map[string]interface{} //配置文件
  22. mgo *mongodb.MongodbSim //mongodb操作对象
  23. udpclient mu.UdpClient //udp对象
  24. updport string
  25. winner, bidding, biddingback, project, buyer map[string]interface{}
  26. savesizei = 500
  27. biddingIndexFields = []string{"_id", "buyerclass", "s_winner", "title", "detail", "detail_bak", "area", "areaval", "site", "type", "amount", "bidopendate", "bidopentime", "buyer", "channel", "city", "comeintime", "contenthtml", "descript", "description", "extracttype", "href", "infoformat", "keywords", "projectcode", "projectname", "publishtime", "s_sha", "spidercode", "subtype", "summary", "toptype", "urltop", "winner", "agency", "budget", "bidamount", "s_subscopeclass", "projectscope", "bidstatus"}
  28. projectinfoFields []string
  29. force = 0
  30. )
  31. func init() {
  32. util.ReadConfig(&Sysconfig)
  33. inits()
  34. updport, _ = Sysconfig["updport"].(string)
  35. bidding, _ = Sysconfig["bidding"].(map[string]interface{})
  36. index, _ = bidding["index"].(string)
  37. itype, _ = bidding["type"].(string)
  38. c, _ = bidding["collect"].(string)
  39. extractc, _ = bidding["extractcollect"].(string)
  40. db, _ = bidding["db"].(string)
  41. //extractdb, _ := bidding["extractdb"].(string)
  42. fields = strings.Split(bidding["fields"].(string), ",")
  43. biddingback, _ = Sysconfig["biddingback"].(map[string]interface{})
  44. mconf, _ := Sysconfig["mongodb"].(map[string]interface{})
  45. mgo = &mongodb.MongodbSim{
  46. MongodbAddr: mconf["addr"].(string),
  47. Size: util.IntAllDef(mconf["pool"], 5),
  48. DbName: mconf["db"].(string),
  49. }
  50. mgo.InitPool()
  51. econf := Sysconfig["elastic"].(map[string]interface{})
  52. elastic.InitElasticSize(econf["addr"].(string), util.IntAllDef(econf["pool"], 5))
  53. redisaddr, _ := Sysconfig["redisaddr"].(string)
  54. redis.InitRedisBySize(redisaddr, 10, 5, 240)
  55. if bidding["indexfields"] != nil {
  56. biddingIndexFields = util.ObjArrToStringArr(bidding["indexfields"].([]interface{}))
  57. }
  58. if bidding["projectinfo"] != nil {
  59. pf := util.ObjToString(bidding["projectinfo"])
  60. if pf != "" {
  61. projectinfoFields = strings.Split(pf, ",")
  62. }
  63. }
  64. log.Println(projectinfoFields)
  65. }
  66. var lastId = ""
  67. //func CheckMgoTask() {
  68. // if lastId != "" {
  69. // res, bres := mgo.Find(extractc, `{}`, `{"_id":-1}`, `{"_id":1}`, true, -1, -1)
  70. // if bres && res != nil && len(*res) == 1 {
  71. // id := util.BsonIdToSId(((*res)[0])["_id"])
  72. // if id > lastId {
  73. // mapInfo := map[string]interface{}{
  74. // "gtid": lastId,
  75. // "lteid": id,
  76. // }
  77. // biddingTask(nil, mapInfo)
  78. // log.Println("task over!", lastId, id)
  79. // lastId = id
  80. // }
  81. // }
  82. // }
  83. // time.AfterFunc(5*time.Minute, CheckMgoTask)
  84. //}
  85. var (
  86. index, itype, c, extractc, db string
  87. fields []string
  88. )
  89. func main() {
  90. flag.IntVar(&force, "f", 0, "是否强制执行")
  91. flag.StringVar(&lastId, "id", "", "上次执行id")
  92. flag.Parse()
  93. updport := Sysconfig["udpport"].(string)
  94. udpclient = mu.UdpClient{Local: updport, BufSize: 1024}
  95. udpclient.Listen(processUdpMsg)
  96. log.Println("Udp服务监听", updport)
  97. go delEsTask()
  98. time.Sleep(99999 * time.Hour)
  99. }
  100. var pool = make(chan bool, 20)
  101. func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
  102. switch act {
  103. case mu.OP_TYPE_DATA: //上个节点的数据
  104. //从表中开始处理生成企业数据
  105. var mapInfo map[string]interface{}
  106. err := json.Unmarshal(data, &mapInfo)
  107. log.Println("err:", err, "mapInfo:", mapInfo, string(data))
  108. if err != nil {
  109. udpclient.WriteUdp([]byte("err:"+err.Error()), mu.OP_NOOP, ra)
  110. } else if mapInfo != nil {
  111. udpclient.WriteUdp([]byte("ok,run"), mu.OP_NOOP, ra)
  112. tasktype, _ := mapInfo["stype"].(string)
  113. log.Println("tasktype:", tasktype)
  114. switch tasktype {
  115. case "bidding":
  116. pool <- true
  117. go func() {
  118. defer func() {
  119. <-pool
  120. }()
  121. biddingTask(data, mapInfo)
  122. }()
  123. }
  124. }
  125. case mu.OP_NOOP: //下个节点回应
  126. log.Println("发送成功", string(data))
  127. }
  128. }