main.go 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245
  1. // main
  2. package main
  3. import (
  4. "encoding/json"
  5. "fmt"
  6. "github.com/robfig/cron/v3"
  7. "go.uber.org/zap"
  8. util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  9. "jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
  10. "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  11. "jygit.jydev.jianyu360.cn/data_processing/common_utils/udp"
  12. "net"
  13. "sync"
  14. "time"
  15. )
  16. var (
  17. Mgo *mongodb.MongodbSim
  18. tree = NewMultiTree() //全局的数据匹配
  19. Mb float64 = 1024 * 1024
  20. UdpClient udp.UdpClient
  21. )
  22. func main() {
  23. local, _ := time.LoadLocation("Asia/Shanghai")
  24. c := cron.New(cron.WithLocation(local), cron.WithSeconds())
  25. if GF.Cron.Spec != "" {
  26. eid, err := c.AddFunc(GF.Cron.Spec, loadIncBidding) //定时执行芜湖数据
  27. if err != nil {
  28. log.Error("main", zap.Any("AddFunc err", err))
  29. }
  30. log.Info("main", zap.Any("eid", eid))
  31. }
  32. if GF.Cron.Specq != "" {
  33. _, err := c.AddFunc(GF.Cron.Specq, loadIncQyxy)
  34. if err != nil {
  35. log.Error("main", zap.Any("AddFunc loadIncQyxy err", err))
  36. }
  37. }
  38. c.Start()
  39. defer c.Stop()
  40. select {}
  41. }
  42. //processUdpMsg 处理udp
  43. func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
  44. switch act {
  45. case udp.OP_TYPE_DATA:
  46. var mapInfo map[string]interface{}
  47. err := json.Unmarshal(data, &mapInfo)
  48. log.Info("processUdpMsg", zap.Any("mapInfo", mapInfo))
  49. if err != nil {
  50. log.Error("processUdpMsg", zap.Error(err))
  51. } else {
  52. if mapInfo != nil {
  53. key, _ := mapInfo["key"].(string)
  54. if key == "" {
  55. key = "udpok"
  56. }
  57. go UdpClient.WriteUdp([]byte(key), udp.OP_NOOP, ra)
  58. }
  59. tasktype, _ := mapInfo["stype"].(string)
  60. switch tasktype {
  61. case "inc_data": //每天增量bidding数据
  62. loadIncBidding()
  63. case "inc_qyxy": //每周企业数据
  64. loadIncQyxy()
  65. case "alldata": // 芜湖存量bidding数据
  66. biddingAllData()
  67. }
  68. }
  69. default:
  70. fmt.Println("processUdpMsg : processUdpMsg =====")
  71. }
  72. }
  73. //loadIncData 加载芜湖每周增量企业数据
  74. func loadIncQyxy() {
  75. log.Info("loadIncQyxy", zap.String("loadIncQyxy", "loadIncQyxy"))
  76. sess := Mgo.GetMgoConn()
  77. defer Mgo.DestoryMongoConn(sess)
  78. // 获取当前时间
  79. now := time.Now()
  80. // 获取当前时间的星期几(0表示周一,1表示周二,以此类推)
  81. weekday := now.Weekday()
  82. // 计算从周日到现在的天数(因为一周从周日开始,所以需要减一天)
  83. daysSinceSunday := int(weekday) - 1
  84. // 计算周一的时间
  85. monday := now.AddDate(0, 0, -daysSinceSunday)
  86. // 将时间格式化为凌晨(00:00:00)
  87. mondayAtMidnight := time.Date(monday.Year(), monday.Month(), monday.Day(), 0, 0, 0, 0, monday.Location())
  88. q := map[string]interface{}{
  89. "updatetime": map[string]interface{}{
  90. "$gt": mondayAtMidnight.Unix(),
  91. //"$lte": todayTime.Unix(),
  92. },
  93. "company_type": map[string]interface{}{
  94. "$ne": "个体工商户",
  95. },
  96. "company_city": "芜湖市",
  97. }
  98. log.Info("loadIncQyxy", zap.Any("q", q))
  99. query := sess.DB("mixdata").C("qyxy_std").Find(q).Select(nil).Iter()
  100. count := 0
  101. ch := make(chan bool, 15)
  102. wg := &sync.WaitGroup{}
  103. for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
  104. if count%1000 == 0 {
  105. log.Info("loadIncQyxy", zap.Int("current", count))
  106. }
  107. ch <- true
  108. wg.Add(1)
  109. go func(tmp map[string]interface{}) {
  110. defer func() {
  111. <-ch
  112. wg.Done()
  113. }()
  114. saveQyxy(tmp)
  115. }(tmp)
  116. tmp = map[string]interface{}{}
  117. }
  118. wg.Wait()
  119. log.Info("loadIncQyxy", zap.Int("over ", count))
  120. }
  121. //saveQyxy 处理增量企业信息
  122. func saveQyxy(tmp map[string]interface{}) {
  123. if util.ObjToString(tmp["company_status"]) != "存续" {
  124. return
  125. }
  126. tree.Add(util.ObjToString(tmp["company_name"]))
  127. update := make(map[string]interface{})
  128. update["$set"] = tmp
  129. updataInfo := []map[string]interface{}{
  130. {"_id": tmp["_id"]},
  131. update,
  132. }
  133. Mgo.UpSertBulk(GF.Env.QyxyColl, updataInfo)
  134. }
  135. //loadIncBidding 每天标讯数据
  136. func loadIncBidding() {
  137. sess := Mgo.GetMgoConn()
  138. defer Mgo.DestoryMongoConn(sess)
  139. log.Info("loadIncBidding", zap.String("loadIncBidding", "开始处理增量标讯数据"))
  140. // 获取当前时间
  141. now := time.Now()
  142. targetTime := time.Date(now.Year(), now.Month(), now.Day()+GF.Cron.Start, 0, 0, 0, 0, now.Location())
  143. todayTime := time.Date(now.Year(), now.Month(), now.Day()+GF.Cron.End, 0, 0, 0, 0, now.Location())
  144. q := map[string]interface{}{
  145. "comeintime": map[string]interface{}{
  146. "$gt": targetTime.Unix(),
  147. "$lte": todayTime.Unix(),
  148. },
  149. }
  150. log.Info("loadIncBidding", zap.Any("q", q))
  151. query := sess.DB("qfw").C("bidding").Find(q).Select(nil).Iter()
  152. count := 0
  153. ch := make(chan bool, 15)
  154. wg := &sync.WaitGroup{}
  155. for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
  156. if count%1000 == 0 {
  157. log.Info("loadIncBidding", zap.Int("current", count))
  158. }
  159. ch <- true
  160. wg.Add(1)
  161. go func(tmp map[string]interface{}) {
  162. defer func() {
  163. <-ch
  164. wg.Done()
  165. }()
  166. saveBidding(tmp)
  167. }(tmp)
  168. tmp = map[string]interface{}{}
  169. }
  170. wg.Wait()
  171. log.Info("loadIncBidding", zap.Int("over ", count))
  172. }
  173. //saveBidding 保存芜湖bidding数据
  174. func saveBidding(tmp map[string]interface{}) {
  175. if sensitive := util.ObjToString(tmp["sensitive"]); sensitive == "测试" { //bidding中有敏感词,不生索引
  176. tmp = make(map[string]interface{})
  177. return
  178. }
  179. // 针对存量数据,重复数据不进索引
  180. if util.IntAll(tmp["extracttype"]) == -1 {
  181. return
  182. }
  183. if util.IntAll(tmp["dataprocess"]) != 8 {
  184. return
  185. }
  186. //1.采购单位
  187. buyer := util.ObjToString(tmp["buyer"])
  188. rests := tree.Match(buyer, true)
  189. if len(rests) > 0 {
  190. Mgo.SaveByOriID(GF.Env.BiddingColl, tmp)
  191. return
  192. }
  193. //2.中标单位
  194. winner := util.ObjToString(tmp["winner"])
  195. rests = tree.Match(winner, true)
  196. if len(rests) > 0 {
  197. Mgo.SaveByOriID(GF.Env.BiddingColl, tmp)
  198. return
  199. }
  200. //3.中标候选人
  201. winnerorder, ok := tmp["winnerorder"].([]map[string]interface{})
  202. if ok {
  203. for _, v := range winnerorder {
  204. res := tree.Match(util.ObjToString(v["entname"]), true)
  205. if len(res) > 0 {
  206. Mgo.SaveByOriID(GF.Env.BiddingColl, tmp)
  207. return
  208. }
  209. }
  210. }
  211. }