tagTask.go 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272
  1. package main
  2. import (
  3. util "app.yhyue.com/data_processing/common_utils"
  4. "app.yhyue.com/data_processing/common_utils/log"
  5. "fmt"
  6. "go.uber.org/zap"
  7. "proposed_project/config"
  8. "strings"
  9. "sync"
  10. "time"
  11. )
  // TagField associates each tag type with an attribute name. Both category
  // levels point at the single "category" attribute.
  // NOTE(review): nothing in this file reads TagField; presumably the values
  // name the document attributes that receive the "<value>_code" fields —
  // confirm against the callers.
  var TagField = map[string]string{
  	"nature":        "nature",
  	"owner":         "ownerclass",
  	"project_stage": "project_stage",
  	// "projecttype": "",
  	"sub_category": "category",
  	"top_category": "category",
  }
  20. var (
  21. MatchArr []TagMatching
  22. SelectF = make(map[string]int)
  23. )
  24. func InitRule() {
  25. info, _ := MgoBid.Find(config.Conf.Serve.TagRule, nil, `{"_id": 1}`, nil, false, -1, -1)
  26. for _, m := range *info {
  27. tag := TagMatching{}
  28. tag.tagName = util.ObjToString(m["label_name"])
  29. tag.tagCode = util.ObjToString(m["code"])
  30. // 关键词
  31. if f := util.ObjToString(m["match_keyword"]); f != "" {
  32. tag.matchField = strings.Split(f, ",")
  33. for _, s := range tag.matchField {
  34. SelectF[s] = 1
  35. }
  36. if v := util.ObjToString(m["keyword"]); v != "" {
  37. tag.matchKey = util.ObjToString(m["keyword"])
  38. tag.matchKeyReg = GetRegex(util.ObjToString(m["keyword"]))
  39. }
  40. }
  41. // 附件词
  42. if f := util.ObjToString(m["match_fjword"]); f != "" {
  43. tag.addField = strings.Split(f, ",")
  44. for _, s := range tag.addField {
  45. SelectF[s] = 1
  46. }
  47. if v := util.ObjToString(m["fjword"]); v != "" {
  48. tag.addKey = util.ObjToString(m["fjword"])
  49. tag.addKeyReg = GetRegex(util.ObjToString(m["fjword"]))
  50. }
  51. }
  52. // 排除词
  53. if f := util.ObjToString(m["match_pcword"]); f != "" {
  54. tag.excludeField = strings.Split(f, ",")
  55. for _, s := range tag.excludeField {
  56. SelectF[s] = 1
  57. }
  58. if v := util.ObjToString(m["pcword"]); v != "" {
  59. tag.excludeKey = util.ObjToString(m["pcword"])
  60. tag.excludeKeyReg = GetRegex(util.ObjToString(m["pcword"]))
  61. }
  62. }
  63. // 清理词
  64. if v := util.ObjToString(m["qlword"]); v != "" {
  65. tag.clearKey = strings.Split(util.ObjToString(m["qlword"]), ",")
  66. }
  67. MatchArr = append(MatchArr, tag)
  68. }
  69. log.Info("InitRule", zap.Int("MatchArr", len(MatchArr)))
  70. }
  71. func taskRun() {
  72. sess := MgoPro.GetMgoConn()
  73. defer MgoPro.DestoryMongoConn(sess)
  74. ch := make(chan bool, config.Conf.Serve.Thread)
  75. wg := &sync.WaitGroup{}
  76. query := sess.DB(config.Conf.DB.MongoP.Dbname).C(config.Conf.DB.MongoP.Coll).Find(nil).Select(SelectF).Iter()
  77. count := 0
  78. for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
  79. if count%20000 == 0 {
  80. log.Info(fmt.Sprintf("current --- %d", count))
  81. }
  82. ch <- true
  83. wg.Add(1)
  84. go func(tmp map[string]interface{}) {
  85. defer func() {
  86. <-ch
  87. wg.Done()
  88. }()
  89. tag := taskFuc(tmp)
  90. update := make(map[string]interface{})
  91. if tag["nature"] != "" {
  92. update["nature_code"] = tag["nature"]
  93. tmp["nature_code"] = tag["nature"]
  94. } else {
  95. update["nature_code"] = "00"
  96. tmp["nature_code"] = "00"
  97. }
  98. if tag["project_stage"] != "" {
  99. update["project_stage_code"] = tag["project_stage"]
  100. tmp["project_stage_code"] = tag["project_stage"]
  101. } else {
  102. update["project_stage_code"] = "00"
  103. tmp["project_stage_code"] = "00"
  104. }
  105. if tag["owner"] != "" {
  106. update["ownerclass_code"] = tag["owner"]
  107. tmp["ownerclass_code"] = tag["owner"]
  108. } else {
  109. update["ownerclass_code"] = "00"
  110. tmp["ownerclass_code"] = "00"
  111. }
  112. if tag["sub_category"] != "" {
  113. update["category_code"] = tag["sub_category"]
  114. tmp["category_code"] = tag["sub_category"]
  115. } else {
  116. if tag["top_category"] != "" {
  117. update["category_code"] = tag["top_category"]
  118. tmp["category_code"] = tag["top_category"]
  119. }
  120. }
  121. if util.ObjToString(update["category_code"]) == "" {
  122. update["category_code"] = "04"
  123. tmp["category_code"] = "04"
  124. }
  125. //updatePool <- []map[string]interface{}{
  126. // {"_id": tmp["_id"]},
  127. // {"$set": update},
  128. //}
  129. savePool <- tmp
  130. }(tmp)
  131. tmp = make(map[string]interface{})
  132. }
  133. wg.Wait()
  134. log.Info(fmt.Sprintf("over --- %d", count))
  135. }
  136. func taskFuc(tmp map[string]interface{}) map[string]string {
  137. tag := make(map[string]string) // 打上的标签
  138. L:
  139. for _, v := range MatchArr {
  140. // 同个类型的标签如果存在,就不需要再打
  141. if tag[v.tagName] != "" {
  142. continue
  143. }
  144. // 排除词
  145. if len(v.excludeField) > 0 && len(v.excludeKeyReg) > 0 {
  146. for _, f := range v.excludeField {
  147. if val := util.ObjToString(tmp[f]); val != "" {
  148. for _, e1 := range v.excludeKeyReg {
  149. if e1.regs != nil && e1.regs.MatchString(val) {
  150. break L
  151. } else {
  152. // && 特殊处理
  153. if strings.Contains(e1.keyStr, "&&") {
  154. flag := true
  155. for _, s := range strings.Split(e1.keyStr, "&&") {
  156. if !strings.Contains(val, s) {
  157. flag = false
  158. break
  159. }
  160. }
  161. if flag {
  162. break L
  163. }
  164. }
  165. }
  166. }
  167. }
  168. }
  169. }
  170. // 清理词
  171. if len(v.clearKey) > 0 && len(v.matchField) > 0 {
  172. for _, s := range v.clearKey {
  173. for _, f := range v.matchField {
  174. if val := util.ObjToString(tmp[f]); val != "" {
  175. tmp[f] = strings.ReplaceAll(val, s, "")
  176. }
  177. }
  178. }
  179. }
  180. // 关键词
  181. if len(v.matchField) > 0 && len(v.matchKeyReg) > 0 {
  182. for _, f := range v.matchField {
  183. if val := util.ObjToString(tmp[f]); val != "" {
  184. for _, r1 := range v.matchKeyReg {
  185. if r1.regs.MatchString(val) {
  186. if len(v.addField) > 0 && len(v.addKeyReg) > 0 {
  187. // 匹配附加词
  188. isCt := false
  189. for _, f1 := range v.addField {
  190. if v1 := util.ObjToString(tmp[f1]); v1 != "" {
  191. for _, r2 := range v.addKeyReg {
  192. if r2.regs != nil && r2.regs.MatchString(v1) {
  193. isCt = true
  194. } else {
  195. // && 特殊处理
  196. if strings.Contains(r2.keyStr, "&&") {
  197. flag := true
  198. for _, s := range strings.Split(r2.keyStr, "&&") {
  199. if !strings.Contains(v1, s) {
  200. flag = false
  201. break
  202. }
  203. }
  204. if flag {
  205. isCt = true
  206. }
  207. }
  208. }
  209. }
  210. }
  211. }
  212. if isCt {
  213. tag[v.tagName] = v.tagCode
  214. }
  215. } else {
  216. tag[v.tagName] = v.tagCode
  217. }
  218. }
  219. }
  220. }
  221. }
  222. }
  223. }
  224. return tag
  225. }
  226. func UpdateMethod() {
  227. arru := make([][]map[string]interface{}, saveSize)
  228. indexu := 0
  229. for {
  230. select {
  231. case v := <-updatePool:
  232. arru[indexu] = v
  233. indexu++
  234. if indexu == saveSize {
  235. updateSp <- true
  236. go func(arru [][]map[string]interface{}) {
  237. defer func() {
  238. <-updateSp
  239. }()
  240. MgoPro.UpdateBulk(config.Conf.DB.MongoP.Coll, arru...)
  241. }(arru)
  242. arru = make([][]map[string]interface{}, saveSize)
  243. indexu = 0
  244. }
  245. case <-time.After(1000 * time.Millisecond):
  246. if indexu > 0 {
  247. updateSp <- true
  248. go func(arru [][]map[string]interface{}) {
  249. defer func() {
  250. <-updateSp
  251. }()
  252. MgoPro.UpdateBulk(config.Conf.DB.MongoP.Coll, arru...)
  253. }(arru[:indexu])
  254. arru = make([][]map[string]interface{}, saveSize)
  255. indexu = 0
  256. }
  257. }
  258. }
  259. }