data.go 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243
  1. package main
  2. import (
  3. "encoding/json"
  4. "github.com/importcjj/sensitive"
  5. "go.mongodb.org/mongo-driver/bson/primitive"
  6. "go.mongodb.org/mongo-driver/mongo/options"
  7. "gopkg.in/mgo.v2/bson"
  8. "log"
  9. "runtime"
  10. "strings"
  11. "sync"
  12. "time"
  13. )
  14. //部署-历史-敏感词库
  15. func initSensitiveWordsData() {
  16. log.Println("初始化敏感词-源数据...")
  17. gteid, err := primitive.ObjectIDFromHex(YamlConfig.TaskGteId)
  18. if err != nil {
  19. log.Fatalln(err)
  20. }
  21. lteid, err := primitive.ObjectIDFromHex(YamlConfig.TaskLteId)
  22. if err != nil {
  23. log.Fatalln(err)
  24. }
  25. log.Println("id段落:",BsonTOStringId(gteid),BsonTOStringId(lteid))
  26. iter := MixDataMgo.GetMgoConn().C("unique_qyxy").Find(map[string]interface{}{
  27. "_id": map[string]interface{}{
  28. "$gte": gteid,
  29. "$lte": lteid,
  30. },
  31. }).Sort("_id").Iter()
  32. Filter = sensitive.New()
  33. var initnum uint
  34. for tmp := map[string]interface{}{}; iter.Next(&tmp); tmp = map[string]interface{}{} {
  35. initnum++
  36. if initnum%10000==0 {
  37. log.Println("current index ", initnum,tmp["qy_name"])
  38. }
  39. Filter.AddWord(tmp["qy_name"].(string))
  40. }
  41. log.Println("init ok", initnum)
  42. }
  43. //定时增量数据处理---冯
  44. func addTaskSensitiveWordsData() {
  45. mmmgo, err := InitMgoEn("mongodb://172.17.4.187:27082,172.17.145.163:27083", 20, "fengweiqiang", "fwq@123123")
  46. if err != nil {
  47. log.Fatalln(err)
  48. }
  49. con := mmmgo.GetCon()
  50. if con == nil {
  51. log.Fatalln("mgo con err")
  52. }
  53. Filter = sensitive.New()
  54. tick := time.Tick(time.Hour * 24 * 7)//查询七天前
  55. for {//定时任务
  56. ctime := <-tick
  57. cronData := time.Date(ctime.Year(), ctime.Month(), ctime.Day()-7, ctime.Hour(), ctime.Minute(), ctime.Second(), 0, time.Local)
  58. findByupdate, err := con.Database("mixdata").Collection("qyxy_std").Find(nil, bson.M{
  59. "updatetime": bson.M{"$gte": cronData.Unix()},
  60. }, options.Find().SetProjection(bson.M{"company_name": 1, "updatetime": 1,"company_type": 1,"company_type_old": 1}).SetSort(bson.M{"_id": 1}))
  61. if err != nil {
  62. log.Println("tick err", cronData)
  63. continue
  64. }
  65. defer findByupdate.Close(nil)
  66. for tmp := make(map[string]interface{}); findByupdate.Next(nil); tmp = map[string]interface{}{} {
  67. err := findByupdate.Decode(&tmp)
  68. if err == nil {
  69. if company_name, ok := tmp["company_name"].(string); ok {
  70. if reglen.MatchString(company_name) || strReg.MatchString(company_name) ||
  71. !uncon_strReg.MatchString(company_name)|| !unstart_strReg.MatchString(company_name)||
  72. start_strReg.MatchString(company_name)|| end_strReg.MatchString(company_name)||
  73. con_strReg.MatchString(company_name) {
  74. continue
  75. }
  76. if strings.Contains(ObjToString(tmp["company_type"]),"个人")||
  77. strings.Contains(ObjToString(tmp["company_type"]),"个体")||
  78. strings.Contains(ObjToString(tmp["company_type_old"]),"个人")||
  79. strings.Contains(ObjToString(tmp["company_type_old"]),"个体") {
  80. continue
  81. }
  82. //存mgo
  83. con.Database("mixdata").Collection("unique_qyxy").InsertOne(nil, bson.M{
  84. "qy_name": company_name,
  85. })
  86. //存敏感词
  87. Filter.AddWord(company_name)
  88. //存es=判断+新增
  89. dealWithEsData(company_name,BsonTOStringId(tmp["_id"]))
  90. }
  91. }
  92. }
  93. log.Println("tick ok", cronData)
  94. }
  95. }
  96. //处理是否新增es
  97. func dealWithEsData(name string,tmpid string) {
  98. query:= `{"query":{"bool":{"must":[{"term":{"`+es_index+`.name":"`+name+`"}}],"must_not":[],"should":[]}},"from":0,"size":10,"sort":[],"facets":{}}`
  99. tmp := make(map[string]interface{})
  100. json.Unmarshal([]byte(query),&tmp)
  101. searchResult, err := Client_Es.Search().Index(es_index).Type(es_type).Source(tmp).Do()
  102. if err != nil {
  103. log.Println("从ES查询出错", err.Error())
  104. }else {
  105. data := make(map[string]interface{},0)
  106. if searchResult.Hits != nil {
  107. for _, hit := range searchResult.Hits.Hits {
  108. json.Unmarshal(*hit.Source, &data)
  109. }
  110. }
  111. if len(data)==0{
  112. //log.Println("无数据-新增")
  113. _, err := Client_Es.Index().Index(es_index).Type(es_type).Id(tmpid).BodyJson(map[string]interface{}{
  114. "name":name,
  115. "name_word":name,
  116. }).Do()
  117. if err != nil {
  118. //log.Println("新增失败:",name,tmpid)
  119. }
  120. }
  121. }
  122. }
  123. //处理内存分段
  124. func dealWithDataMemory() {
  125. iter := MixDataMgo.GetMgoConn().C("unique_qyxy").Find(map[string]interface{}{
  126. "_id": map[string]interface{}{
  127. "$gte": BsonTOStringId("1fffffffffffffffffffffff"),
  128. "$lte": BsonTOStringId("9fffffffffffffffffffffff"),
  129. },
  130. }).Sort("_id").Iter()
  131. Filter = sensitive.New()
  132. var initnum uint
  133. saveIdArr ,start_id:= make([]map[string]string,0),""
  134. var m runtime.MemStats
  135. for tmp := map[string]interface{}{}; iter.Next(&tmp); tmp = map[string]interface{}{} {
  136. if start_id=="" {
  137. start_id = BsonTOStringId(tmp["_id"])
  138. }
  139. Filter.AddWord(tmp["qy_name"].(string))
  140. initnum++
  141. if initnum%100000==0 {
  142. runtime.ReadMemStats(&m)
  143. men :=toMegaBytes(m.HeapAlloc)
  144. log.Printf("current index %d\tos %.2f M",initnum, men)
  145. if men>7.5*1024 { //7.5G
  146. saveIdArr = append(saveIdArr, map[string]string{
  147. "start":start_id,
  148. "end":BsonTOStringId(tmp["_id"]),
  149. })
  150. runtime.GC()
  151. Filter = sensitive.New()
  152. start_id = ""
  153. time.Sleep(time.Second*5)
  154. }
  155. }
  156. break
  157. }
  158. saveIdArr = append(saveIdArr, map[string]string{
  159. "start":start_id,
  160. "end":"",
  161. })
  162. for k,v:=range saveIdArr{
  163. log.Println("第",k,"段",v["start"],v["end"])
  164. }
  165. log.Println("memory is ok", initnum)
  166. }
  167. func temporaryTest() {
  168. log.Println("测试......导出数据")
  169. q := map[string]interface{}{}
  170. sess := data_mgo.GetMgoConn()
  171. defer data_mgo.DestoryMongoConn(sess)
  172. //多线程升索引
  173. pool_es := make(chan bool, 20)
  174. wg_es := &sync.WaitGroup{}
  175. it := sess.DB(data_mgo.DbName).C("zk_company_test").Find(&q).Iter()
  176. total,isOK:=0,0
  177. for tmp := make(map[string]interface{}); it.Next(&tmp); total++ {
  178. if total % 1000 == 0 {
  179. log.Println("current index",total,isOK)
  180. }
  181. name:=ObjToString(tmp["name"])
  182. pool_es <- true
  183. wg_es.Add(1)
  184. go func(name string) {
  185. defer func() {
  186. <-pool_es
  187. wg_es.Done()
  188. }()
  189. new_name,b :=dealWithNameScoreRules(name)
  190. if new_name!="" && b {
  191. isOK++
  192. data_mgo.Save("zk_words_test_test", map[string]interface{}{
  193. "name":name,
  194. "new_name":new_name,
  195. })
  196. }
  197. }(name)
  198. tmp = make(map[string]interface{})
  199. }
  200. wg_es.Wait()
  201. log.Println("is over",total)
  202. }