main.go 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247
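  // Package main loads incremental Wuhu (芜湖) bidding and enterprise data, either on a cron schedule or on request via UDP.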
  2. package main
  import (
	"encoding/json"
	"fmt"
	"net"
	"reflect"
	"sync"
	"time"

	"github.com/robfig/cron/v3"
	"go.uber.org/zap"
	util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
	"jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
	"jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
	"jygit.jydev.jianyu360.cn/data_processing/common_utils/udp"
)
  16. var (
  17. Mgo *mongodb.MongodbSim
  18. tree = NewMultiTree() //全局的数据匹配
  19. Mb float64 = 1024 * 1024
  20. UdpClient udp.UdpClient
  21. )
  22. func main() {
  23. local, _ := time.LoadLocation("Asia/Shanghai")
  24. c := cron.New(cron.WithLocation(local), cron.WithSeconds())
  25. if GF.Cron.Spec != "" {
  26. eid, err := c.AddFunc(GF.Cron.Spec, loadIncBidding) //定时执行芜湖数据
  27. if err != nil {
  28. log.Error("main", zap.Any("AddFunc err", err))
  29. }
  30. log.Info("main", zap.Any("eid", eid))
  31. }
  32. if GF.Cron.Specq != "" {
  33. _, err := c.AddFunc(GF.Cron.Specq, loadIncQyxy)
  34. if err != nil {
  35. log.Error("main", zap.Any("AddFunc loadIncQyxy err", err))
  36. }
  37. }
  38. c.Start()
  39. defer c.Stop()
  40. select {}
  41. }
  42. //processUdpMsg 处理udp
  43. func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
  44. switch act {
  45. case udp.OP_TYPE_DATA:
  46. var mapInfo map[string]interface{}
  47. err := json.Unmarshal(data, &mapInfo)
  48. log.Info("processUdpMsg", zap.Any("mapInfo", mapInfo))
  49. if err != nil {
  50. log.Error("processUdpMsg", zap.Error(err))
  51. } else {
  52. if mapInfo != nil {
  53. key, _ := mapInfo["key"].(string)
  54. if key == "" {
  55. key = "udpok"
  56. }
  57. go UdpClient.WriteUdp([]byte(key), udp.OP_NOOP, ra)
  58. }
  59. tasktype, _ := mapInfo["stype"].(string)
  60. switch tasktype {
  61. case "inc_data": //每天增量bidding数据
  62. go loadIncBidding()
  63. case "inc_qyxy": //每周企业数据
  64. go loadIncQyxy()
  65. case "alldata": // 芜湖存量bidding数据
  66. go biddingAllData()
  67. default:
  68. log.Info("processUdpMsg", zap.String("tasktype", tasktype))
  69. }
  70. }
  71. default:
  72. fmt.Println("processUdpMsg : processUdpMsg =====")
  73. }
  74. }
  75. //loadIncData 加载芜湖每周增量企业数据
  76. func loadIncQyxy() {
  77. log.Info("loadIncQyxy", zap.String("loadIncQyxy", "loadIncQyxy"))
  78. sess := Mgo.GetMgoConn()
  79. defer Mgo.DestoryMongoConn(sess)
  80. // 获取当前时间
  81. now := time.Now()
  82. // 获取当前时间的星期几(0表示周一,1表示周二,以此类推)
  83. weekday := now.Weekday()
  84. // 计算从周日到现在的天数(因为一周从周日开始,所以需要减一天)
  85. daysSinceSunday := int(weekday) - 1
  86. // 计算周一的时间
  87. monday := now.AddDate(0, 0, -daysSinceSunday)
  88. // 将时间格式化为凌晨(00:00:00)
  89. mondayAtMidnight := time.Date(monday.Year(), monday.Month(), monday.Day(), 0, 0, 0, 0, monday.Location())
  90. q := map[string]interface{}{
  91. "updatetime": map[string]interface{}{
  92. "$gt": mondayAtMidnight.Unix(),
  93. //"$lte": todayTime.Unix(),
  94. },
  95. "company_type": map[string]interface{}{
  96. "$ne": "个体工商户",
  97. },
  98. "company_city": "芜湖市",
  99. }
  100. log.Info("loadIncQyxy", zap.Any("q", q))
  101. query := sess.DB("mixdata").C("qyxy_std").Find(q).Select(nil).Iter()
  102. count := 0
  103. ch := make(chan bool, 15)
  104. wg := &sync.WaitGroup{}
  105. for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
  106. if count%1000 == 0 {
  107. log.Info("loadIncQyxy", zap.Int("current", count))
  108. }
  109. ch <- true
  110. wg.Add(1)
  111. go func(tmp map[string]interface{}) {
  112. defer func() {
  113. <-ch
  114. wg.Done()
  115. }()
  116. saveQyxy(tmp)
  117. }(tmp)
  118. tmp = map[string]interface{}{}
  119. }
  120. wg.Wait()
  121. log.Info("loadIncQyxy", zap.Int("over ", count))
  122. }
  123. //saveQyxy 处理增量企业信息
  124. func saveQyxy(tmp map[string]interface{}) {
  125. if util.ObjToString(tmp["company_status"]) != "存续" {
  126. return
  127. }
  128. tree.Add(util.ObjToString(tmp["company_name"]))
  129. update := make(map[string]interface{})
  130. update["$set"] = tmp
  131. updataInfo := []map[string]interface{}{
  132. {"_id": tmp["_id"]},
  133. update,
  134. }
  135. Mgo.UpSertBulk(GF.Env.QyxyColl, updataInfo)
  136. }
  137. //loadIncBidding 每天标讯数据
  138. func loadIncBidding() {
  139. sess := Mgo.GetMgoConn()
  140. defer Mgo.DestoryMongoConn(sess)
  141. log.Info("loadIncBidding", zap.String("loadIncBidding", "开始处理增量标讯数据"))
  142. // 获取当前时间
  143. now := time.Now()
  144. targetTime := time.Date(now.Year(), now.Month(), now.Day()+GF.Cron.Start, 0, 0, 0, 0, now.Location())
  145. todayTime := time.Date(now.Year(), now.Month(), now.Day()+GF.Cron.End, 0, 0, 0, 0, now.Location())
  146. q := map[string]interface{}{
  147. "comeintime": map[string]interface{}{
  148. "$gt": targetTime.Unix(),
  149. "$lte": todayTime.Unix(),
  150. },
  151. }
  152. log.Info("loadIncBidding", zap.Any("q", q))
  153. query := sess.DB("qfw").C("bidding").Find(q).Select(nil).Iter()
  154. count := 0
  155. ch := make(chan bool, 15)
  156. wg := &sync.WaitGroup{}
  157. for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
  158. if count%1000 == 0 {
  159. log.Info("loadIncBidding", zap.Int("current", count))
  160. }
  161. ch <- true
  162. wg.Add(1)
  163. go func(tmp map[string]interface{}) {
  164. defer func() {
  165. <-ch
  166. wg.Done()
  167. }()
  168. saveBidding(tmp)
  169. }(tmp)
  170. tmp = map[string]interface{}{}
  171. }
  172. wg.Wait()
  173. log.Info("loadIncBidding", zap.Int("over ", count))
  174. }
  175. //saveBidding 保存芜湖bidding数据
  176. func saveBidding(tmp map[string]interface{}) {
  177. if sensitive := util.ObjToString(tmp["sensitive"]); sensitive == "测试" { //bidding中有敏感词,不生索引
  178. tmp = make(map[string]interface{})
  179. return
  180. }
  181. // 针对存量数据,重复数据不进索引
  182. if util.IntAll(tmp["extracttype"]) == -1 {
  183. return
  184. }
  185. if util.IntAll(tmp["dataprocess"]) != 8 {
  186. return
  187. }
  188. //1.采购单位
  189. buyer := util.ObjToString(tmp["buyer"])
  190. rests := tree.Match(buyer, true)
  191. if len(rests) > 0 {
  192. Mgo.SaveByOriID(GF.Env.BiddingColl, tmp)
  193. return
  194. }
  195. //2.中标单位
  196. winner := util.ObjToString(tmp["winner"])
  197. rests = tree.Match(winner, true)
  198. if len(rests) > 0 {
  199. Mgo.SaveByOriID(GF.Env.BiddingColl, tmp)
  200. return
  201. }
  202. //3.中标候选人
  203. winnerorder, ok := tmp["winnerorder"].([]map[string]interface{})
  204. if ok {
  205. for _, v := range winnerorder {
  206. res := tree.Match(util.ObjToString(v["entname"]), true)
  207. if len(res) > 0 {
  208. Mgo.SaveByOriID(GF.Env.BiddingColl, tmp)
  209. return
  210. }
  211. }
  212. }
  213. }