task.go 7.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307
  1. package main
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "go.uber.org/zap"
  6. util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  7. "jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
  8. "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  9. "jygit.jydev.jianyu360.cn/data_processing/common_utils/udp"
  10. "medical_project/config"
  11. "net"
  12. "strings"
  13. "time"
  14. )
  // queryClose asks a running Enter scan to stop reading early. Enter
// acknowledges the request by sending on queryCloseOver.
var queryClose = make(chan bool)

// queryCloseOver carries Enter's acknowledgement of a queryClose request.
var queryCloseOver = make(chan bool)

// pool is the counting semaphore Enter uses for its merge workers: each
// worker holds one slot, so the buffer size (1) is the concurrency limit.
var pool = make(chan bool, 1)
  20. func updateAllQueue() {
  21. arru := make([][]map[string]interface{}, saveSize)
  22. indexu := 0
  23. for {
  24. select {
  25. case v := <-updatePool:
  26. arru[indexu] = v
  27. indexu++
  28. if indexu == saveSize {
  29. updateSp <- true
  30. go func(arru [][]map[string]interface{}) {
  31. defer func() {
  32. <-updateSp
  33. }()
  34. MongoPro.UpSertBulk(config.Conf.DB.MongoP.Coll, arru...)
  35. }(arru)
  36. arru = make([][]map[string]interface{}, saveSize)
  37. indexu = 0
  38. }
  39. case <-time.After(1 * time.Second):
  40. if indexu > 0 {
  41. updateSp <- true
  42. go func(arru [][]map[string]interface{}) {
  43. defer func() {
  44. <-updateSp
  45. }()
  46. MongoPro.UpSertBulk(config.Conf.DB.MongoP.Coll, arru...)
  47. }(arru[:indexu])
  48. arru = make([][]map[string]interface{}, saveSize)
  49. indexu = 0
  50. }
  51. }
  52. }
  53. }
  54. // 全量合并
  55. func taskQl(udpInfo map[string]interface{}) {
  56. defer util.Catch()
  57. q := make(map[string]interface{})
  58. gtid, _ := udpInfo["gtid"].(string)
  59. lteid, _ := udpInfo["lteid"].(string)
  60. if mongodb.IsObjectIdHex(gtid) && mongodb.IsObjectIdHex(lteid) {
  61. q["_id"] = map[string]interface{}{
  62. "$lte": mongodb.StringTOBsonId(lteid),
  63. "$gte": mongodb.StringTOBsonId(gtid),
  64. }
  65. }
  66. //生成查询语句执行
  67. log.Info("查询语句:", zap.Any("q", q))
  68. Enter(q)
  69. nextNode(udpInfo)
  70. }
  71. // 增量合并
  72. func taskZl(udpInfo map[string]interface{}) {
  73. defer util.Catch()
  74. //开始id和结束id
  75. q, _ := udpInfo["query"].(map[string]interface{})
  76. gtid := udpInfo["gtid"].(string)
  77. lteid := udpInfo["lteid"].(string)
  78. q = map[string]interface{}{
  79. "_id": map[string]interface{}{
  80. "$gt": mongodb.StringTOBsonId(gtid),
  81. "$lte": mongodb.StringTOBsonId(lteid),
  82. },
  83. }
  84. Enter(q)
  85. }
  86. func Enter(q map[string]interface{}) {
  87. defer util.Catch()
  88. count, index := 0, 0
  89. sess := MongoBid.GetMgoConn()
  90. defer MongoBid.DestoryMongoConn(sess)
  91. infoPool := make(chan map[string]interface{}, 2000)
  92. over := make(chan bool)
  93. go func() {
  94. L:
  95. for {
  96. select {
  97. case tmp := <-infoPool:
  98. pool <- true
  99. go func(tmp map[string]interface{}) {
  100. defer func() {
  101. <-pool
  102. }()
  103. info := ParseInfo(tmp)
  104. currentTime = info.Publishtime
  105. startProjectMerge(info, tmp)
  106. }(tmp)
  107. default:
  108. select {
  109. case tmp := <-infoPool:
  110. pool <- true
  111. go func(tmp map[string]interface{}) {
  112. defer func() {
  113. <-pool
  114. }()
  115. info := ParseInfo(tmp)
  116. currentTime = info.Publishtime
  117. startProjectMerge(info, tmp)
  118. }(tmp)
  119. case <-over:
  120. break L
  121. }
  122. }
  123. }
  124. }()
  125. fields := map[string]interface{}{"kvtext": 0, "repeat_reason": 0, "field_source": 0, "detail": 0, "contenthtml": 0, "jsondata": 0}
  126. ms := sess.DB(config.Conf.DB.MongoB.Dbname).C(config.Conf.DB.MongoB.Coll).Find(q).Select(fields)
  127. query := ms.Iter()
  128. var lastid interface{}
  129. L:
  130. for {
  131. select {
  132. case <-queryClose:
  133. log.Error("receive interrupt sign")
  134. queryCloseOver <- true
  135. break L
  136. default:
  137. tmp := make(map[string]interface{})
  138. if query.Next(&tmp) {
  139. lastid = tmp["_id"]
  140. if count%2000 == 0 {
  141. log.Info("current", zap.Int("count", count), zap.Any("lastid", lastid))
  142. }
  143. if util.ObjToString(tmp["bid_field"]) == "0101" {
  144. index++
  145. infoPool <- tmp
  146. }
  147. count++
  148. } else {
  149. break L
  150. }
  151. }
  152. }
  153. time.Sleep(5 * time.Second)
  154. over <- true
  155. ////阻塞
  156. for n := 0; n < 1; n++ {
  157. pool <- true
  158. }
  159. log.Info("所有线程执行完成...", zap.Int("count:", count), zap.Int("index", index))
  160. }
  161. func ParseInfo(tmp map[string]interface{}) (info *Info) {
  162. bys, _ := json.Marshal(tmp)
  163. var thisinfo *Info
  164. _ = json.Unmarshal(bys, &thisinfo)
  165. if thisinfo == nil {
  166. return nil
  167. }
  168. // 处理publishtime为空
  169. if thisinfo.Publishtime <= 0 {
  170. for _, d := range DateTimeSelect {
  171. if tmp[d] != nil {
  172. thisinfo.Publishtime = util.Int64All(tmp[d])
  173. tmp["publishtime"] = tmp[d]
  174. break
  175. }
  176. }
  177. }
  178. if len(thisinfo.Topscopeclass) == 0 {
  179. thisinfo.Topscopeclass = []string{}
  180. }
  181. if len(thisinfo.Subscopeclass) == 0 {
  182. thisinfo.Subscopeclass = []string{}
  183. }
  184. if thisinfo.SubType == "" {
  185. thisinfo.SubType = util.ObjToString(tmp["bidstatus"])
  186. }
  187. //从标题中查找项目编号
  188. res := titleGetPc.FindStringSubmatch(thisinfo.Title)
  189. if len(res) > 1 && len(res[1]) > 6 && thisinfo.ProjectCode != res[1] && !numCheckPc.MatchString(res[1]) && !_zimureg1.MatchString(res[1]) {
  190. thisinfo.PTC = res[1]
  191. } else {
  192. res = titleGetPc1.FindStringSubmatch(thisinfo.Title)
  193. if len(res) > 3 && len(res[3]) > 6 && thisinfo.ProjectCode != res[3] && !numCheckPc.MatchString(res[3]) && !_zimureg1.MatchString(res[3]) {
  194. thisinfo.PTC = res[3]
  195. } else {
  196. res = titleGetPc2.FindStringSubmatch(thisinfo.Title)
  197. if len(res) > 1 && len(res[1]) > 6 && thisinfo.ProjectCode != res[1] && !numCheckPc.MatchString(res[1]) && !_zimureg1.MatchString(res[1]) {
  198. thisinfo.PTC = res[1]
  199. }
  200. }
  201. }
  202. if thisinfo.ProjectName != "" && len([]rune(thisinfo.ProjectName)) > 0 {
  203. //thisinfo.ProjectName = pcReplace.ReplaceAllString(thisinfo.ProjectName, "")
  204. //if thisinfo.ProjectName != "" {
  205. thisinfo.pnbval++
  206. //}
  207. }
  208. if thisinfo.ProjectCode != "" || thisinfo.PTC != "" {
  209. if thisinfo.ProjectCode != "" {
  210. //thisinfo.ProjectCode = pcReplace.ReplaceAllString(thisinfo.ProjectCode, "")
  211. if thisinfo.pnbval == 0 && len([]rune(thisinfo.ProjectCode)) < 5 {
  212. thisinfo.ProjectCode = StrOrNum.ReplaceAllString(thisinfo.ProjectCode, "")
  213. }
  214. } else {
  215. //thisinfo.PTC = pcReplace.ReplaceAllString(thisinfo.PTC, "")
  216. if thisinfo.pnbval == 0 && len([]rune(thisinfo.PTC)) < 5 {
  217. thisinfo.PTC = StrOrNum.ReplaceAllString(thisinfo.PTC, "")
  218. }
  219. }
  220. thisinfo.pnbval++
  221. }
  222. if thisinfo.ProjectCode == thisinfo.PTC || strings.Index(thisinfo.ProjectCode, thisinfo.PTC) > -1 {
  223. thisinfo.PTC = ""
  224. }
  225. if thisinfo.Buyer != "" && len([]rune(thisinfo.Buyer)) > 2 {
  226. thisinfo.pnbval++
  227. } else {
  228. thisinfo.Buyer = ""
  229. }
  230. //winners整理、清理
  231. winner := util.ObjToString(tmp["winner"])
  232. m1 := map[string]bool{}
  233. winners := []string{}
  234. if winner != "" {
  235. m1[winner] = true
  236. winners = append(winners, winner)
  237. }
  238. packageM, _ := tmp["package"].(map[string]interface{})
  239. if packageM != nil {
  240. thisinfo.HasPackage = true
  241. for _, p := range packageM {
  242. pm, _ := p.(map[string]interface{})
  243. pw := util.ObjToString(pm["winner"])
  244. if pw != "" && !m1[pw] {
  245. m1[pw] = true
  246. winners = append(winners, pw)
  247. }
  248. }
  249. }
  250. thisinfo.Winners = winners
  251. //清理winnerorder
  252. var wins []map[string]interface{}
  253. for _, v := range thisinfo.WinnerOrder {
  254. w := util.ObjToString(v["entname"])
  255. if w != "" {
  256. v["entname"] = w
  257. wins = append(wins, v)
  258. }
  259. }
  260. thisinfo.WinnerOrder = wins
  261. thisinfo.LenPC = len([]rune(thisinfo.ProjectCode))
  262. thisinfo.LenPTC = len([]rune(thisinfo.PTC))
  263. thisinfo.LenPN = len([]rune(thisinfo.ProjectName))
  264. //处理分包中数据异常问题
  265. for k, tmp := range thisinfo.Package {
  266. if ps, ok := tmp.([]map[string]interface{}); ok {
  267. for i, p := range ps {
  268. name, _ := p["name"].(string)
  269. if len([]rune(name)) > 100 {
  270. p["name"] = fmt.Sprint([]rune(name[:100]))
  271. }
  272. ps[i] = p
  273. }
  274. thisinfo.Package[k] = ps
  275. }
  276. }
  277. return thisinfo
  278. }
  279. func nextNode(mapInfo map[string]interface{}) {
  280. mapInfo["stype"] = config.Conf.Udp.Next.Stype
  281. mapInfo["pici"] = pici
  282. datas, _ := json.Marshal(mapInfo)
  283. node := &net.UDPAddr{
  284. IP: net.ParseIP(config.Conf.Udp.Next.Addr),
  285. Port: util.IntAll(config.Conf.Udp.Next.Port),
  286. }
  287. log.Info("udp next...", zap.Any("msg", mapInfo), zap.Any("to", node.String()))
  288. _ = udpClient.WriteUdp(datas, udp.OP_TYPE_DATA, node)
  289. }