// task.go (7.5 KB)

  1. package main
  2. import (
  3. util "app.yhyue.com/data_processing/common_utils"
  4. "app.yhyue.com/data_processing/common_utils/log"
  5. "app.yhyue.com/data_processing/common_utils/mongodb"
  6. "encoding/json"
  7. "fmt"
  8. "go.uber.org/zap"
  9. "medical_project/config"
  10. "strings"
  11. "time"
  12. )
  // queryClose is polled by Enter between two documents; receiving a value makes
  // Enter stop reading from the cursor.
  var queryClose = make(chan bool)

  // queryCloseOver receives one value from Enter right after Enter has observed
  // queryClose, before it leaves its scan loop.
  var queryCloseOver = make(chan bool)

  // pool holds one token per worker goroutine started by Enter. Its capacity (2)
  // is the number of documents Enter processes at the same time.
  var pool = make(chan bool, 2)
  18. func updateAllQueue() {
  19. arru := make([][]map[string]interface{}, saveSize)
  20. indexu := 0
  21. for {
  22. select {
  23. case v := <-updatePool:
  24. arru[indexu] = v
  25. indexu++
  26. if indexu == saveSize {
  27. updateSp <- true
  28. go func(arru [][]map[string]interface{}) {
  29. defer func() {
  30. <-updateSp
  31. }()
  32. MongoPro.UpSertBulk(config.Conf.DB.MongoP.Coll, arru...)
  33. }(arru)
  34. arru = make([][]map[string]interface{}, saveSize)
  35. indexu = 0
  36. }
  37. case <-time.After(1 * time.Second):
  38. if indexu > 0 {
  39. updateSp <- true
  40. go func(arru [][]map[string]interface{}) {
  41. defer func() {
  42. <-updateSp
  43. }()
  44. MongoPro.UpSertBulk(config.Conf.DB.MongoP.Coll, arru...)
  45. }(arru[:indexu])
  46. arru = make([][]map[string]interface{}, saveSize)
  47. indexu = 0
  48. }
  49. }
  50. }
  51. }
  52. //全量合并
  53. func taskQl(udpInfo map[string]interface{}) {
  54. defer util.Catch()
  55. q := make(map[string]interface{})
  56. gtid, _ := udpInfo["gtid"].(string)
  57. lteid, _ := udpInfo["lteid"].(string)
  58. if mongodb.IsObjectIdHex(gtid) && mongodb.IsObjectIdHex(lteid) {
  59. q["_id"] = map[string]interface{}{
  60. "$lte": mongodb.StringTOBsonId(lteid),
  61. "$gte": mongodb.StringTOBsonId(gtid),
  62. }
  63. }
  64. //生成查询语句执行
  65. log.Info("查询语句:", zap.Any("q", q))
  66. Enter(q)
  67. }
  68. //增量合并
  69. func taskZl(udpInfo map[string]interface{}) {
  70. defer util.Catch()
  71. //开始id和结束id
  72. q, _ := udpInfo["query"].(map[string]interface{})
  73. gtid := udpInfo["gtid"].(string)
  74. lteid := udpInfo["lteid"].(string)
  75. q = map[string]interface{}{
  76. "_id": map[string]interface{}{
  77. "$gt": mongodb.StringTOBsonId(gtid),
  78. "$lte": mongodb.StringTOBsonId(lteid),
  79. },
  80. }
  81. Enter(q)
  82. //if udpInfo["stop"] == nil {
  83. // for i := 0; i < 1; i++ {
  84. // sp <- true
  85. // }
  86. // for i := 0; i < 1; i++ {
  87. // <-sp
  88. // }
  89. //}
  90. }
  91. func Enter(q map[string]interface{}) {
  92. defer util.Catch()
  93. count := 0
  94. sess := MongoBid.GetMgoConn()
  95. defer MongoBid.DestoryMongoConn(sess)
  96. infoPool := make(chan map[string]interface{}, 2000)
  97. over := make(chan bool)
  98. go func() {
  99. L:
  100. for {
  101. select {
  102. case tmp := <-infoPool:
  103. pool <- true
  104. go func(tmp map[string]interface{}) {
  105. defer func() {
  106. <-pool
  107. }()
  108. info := ParseInfo(tmp)
  109. currentTime = info.Publishtime
  110. startProjectMerge(info, tmp)
  111. }(tmp)
  112. default:
  113. select {
  114. case tmp := <-infoPool:
  115. pool <- true
  116. go func(tmp map[string]interface{}) {
  117. defer func() {
  118. <-pool
  119. }()
  120. info := ParseInfo(tmp)
  121. currentTime = info.Publishtime
  122. startProjectMerge(info, tmp)
  123. }(tmp)
  124. case <-over:
  125. break L
  126. }
  127. }
  128. }
  129. }()
  130. fields := map[string]interface{}{"kvtext": 0, "repeat_reason": 0, "field_source": 0, "detail": 0, "contenthtml": 0, "jsondata": 0}
  131. ms := sess.DB(config.Conf.DB.MongoB.Dbname).C(config.Conf.DB.MongoB.Coll).Find(q).Select(fields).Sort("publishtime")
  132. query := ms.Iter()
  133. var lastid interface{}
  134. L:
  135. for {
  136. select {
  137. case <-queryClose:
  138. log.Error("receive interrupt sign")
  139. queryCloseOver <- true
  140. break L
  141. default:
  142. tmp := make(map[string]interface{})
  143. if query.Next(&tmp) {
  144. lastid = tmp["_id"]
  145. if currentType == "ql" {
  146. if count%50000 == 0 {
  147. util.Debug("current", count, lastid)
  148. }
  149. } else {
  150. if count%2000 == 0 {
  151. util.Debug("current", count, lastid)
  152. }
  153. }
  154. //if util.ObjToString(tmp["bid_field"]) == "0101" {
  155. infoPool <- tmp
  156. //}
  157. count++
  158. } else {
  159. break L
  160. }
  161. }
  162. }
  163. time.Sleep(5 * time.Second)
  164. over <- true
  165. //阻塞
  166. for n := 0; n < config.Conf.Server.Thread; n++ {
  167. pool <- true
  168. }
  169. log.Info("所有线程执行完成...", zap.Int("count:", count))
  170. }
  171. func ParseInfo(tmp map[string]interface{}) (info *Info) {
  172. bys, _ := json.Marshal(tmp)
  173. var thisinfo *Info
  174. _ = json.Unmarshal(bys, &thisinfo)
  175. if thisinfo == nil {
  176. return nil
  177. }
  178. // 处理publishtime为空
  179. if thisinfo.Publishtime <= 0 {
  180. for _, d := range DateTimeSelect {
  181. if tmp[d] != nil {
  182. thisinfo.Publishtime = util.Int64All(tmp[d])
  183. tmp["publishtime"] = tmp[d]
  184. break
  185. }
  186. }
  187. }
  188. if len(thisinfo.Topscopeclass) == 0 {
  189. thisinfo.Topscopeclass = []string{}
  190. }
  191. if len(thisinfo.Subscopeclass) == 0 {
  192. thisinfo.Subscopeclass = []string{}
  193. }
  194. if thisinfo.SubType == "" {
  195. thisinfo.SubType = util.ObjToString(tmp["bidstatus"])
  196. }
  197. //从标题中查找项目编号
  198. res := titleGetPc.FindStringSubmatch(thisinfo.Title)
  199. if len(res) > 1 && len(res[1]) > 6 && thisinfo.ProjectCode != res[1] && !numCheckPc.MatchString(res[1]) && !_zimureg1.MatchString(res[1]) {
  200. thisinfo.PTC = res[1]
  201. } else {
  202. res = titleGetPc1.FindStringSubmatch(thisinfo.Title)
  203. if len(res) > 3 && len(res[3]) > 6 && thisinfo.ProjectCode != res[3] && !numCheckPc.MatchString(res[3]) && !_zimureg1.MatchString(res[3]) {
  204. thisinfo.PTC = res[3]
  205. } else {
  206. res = titleGetPc2.FindStringSubmatch(thisinfo.Title)
  207. if len(res) > 1 && len(res[1]) > 6 && thisinfo.ProjectCode != res[1] && !numCheckPc.MatchString(res[1]) && !_zimureg1.MatchString(res[1]) {
  208. thisinfo.PTC = res[1]
  209. }
  210. }
  211. }
  212. if thisinfo.ProjectName != "" && len([]rune(thisinfo.ProjectName)) > 0 {
  213. //thisinfo.ProjectName = pcReplace.ReplaceAllString(thisinfo.ProjectName, "")
  214. //if thisinfo.ProjectName != "" {
  215. thisinfo.pnbval++
  216. //}
  217. }
  218. if thisinfo.ProjectCode != "" || thisinfo.PTC != "" {
  219. if thisinfo.ProjectCode != "" {
  220. //thisinfo.ProjectCode = pcReplace.ReplaceAllString(thisinfo.ProjectCode, "")
  221. if thisinfo.pnbval == 0 && len([]rune(thisinfo.ProjectCode)) < 5 {
  222. thisinfo.ProjectCode = StrOrNum.ReplaceAllString(thisinfo.ProjectCode, "")
  223. }
  224. } else {
  225. //thisinfo.PTC = pcReplace.ReplaceAllString(thisinfo.PTC, "")
  226. if thisinfo.pnbval == 0 && len([]rune(thisinfo.PTC)) < 5 {
  227. thisinfo.PTC = StrOrNum.ReplaceAllString(thisinfo.PTC, "")
  228. }
  229. }
  230. thisinfo.pnbval++
  231. }
  232. if thisinfo.ProjectCode == thisinfo.PTC || strings.Index(thisinfo.ProjectCode, thisinfo.PTC) > -1 {
  233. thisinfo.PTC = ""
  234. }
  235. if thisinfo.Buyer != "" && len([]rune(thisinfo.Buyer)) > 2 {
  236. thisinfo.pnbval++
  237. } else {
  238. thisinfo.Buyer = ""
  239. }
  240. //winners整理、清理
  241. winner := util.ObjToString(tmp["winner"])
  242. m1 := map[string]bool{}
  243. winners := []string{}
  244. if winner != "" {
  245. m1[winner] = true
  246. winners = append(winners, winner)
  247. }
  248. packageM, _ := tmp["package"].(map[string]interface{})
  249. if packageM != nil {
  250. thisinfo.HasPackage = true
  251. for _, p := range packageM {
  252. pm, _ := p.(map[string]interface{})
  253. pw := util.ObjToString(pm["winner"])
  254. if pw != "" && !m1[pw] {
  255. m1[pw] = true
  256. winners = append(winners, pw)
  257. }
  258. }
  259. }
  260. thisinfo.Winners = winners
  261. //清理winnerorder
  262. var wins []map[string]interface{}
  263. for _, v := range thisinfo.WinnerOrder {
  264. w := util.ObjToString(v["entname"])
  265. if w != "" {
  266. v["entname"] = w
  267. wins = append(wins, v)
  268. }
  269. }
  270. thisinfo.WinnerOrder = wins
  271. thisinfo.LenPC = len([]rune(thisinfo.ProjectCode))
  272. thisinfo.LenPTC = len([]rune(thisinfo.PTC))
  273. thisinfo.LenPN = len([]rune(thisinfo.ProjectName))
  274. //处理分包中数据异常问题
  275. for k, tmp := range thisinfo.Package {
  276. if ps, ok := tmp.([]map[string]interface{}); ok {
  277. for i, p := range ps {
  278. name, _ := p["name"].(string)
  279. if len([]rune(name)) > 100 {
  280. p["name"] = fmt.Sprint([]rune(name[:100]))
  281. }
  282. ps[i] = p
  283. }
  284. thisinfo.Package[k] = ps
  285. }
  286. }
  287. return thisinfo
  288. }