data.go 11 KB


  1. package main
  2. import (
  3. "encoding/json"
  4. "github.com/importcjj/sensitive"
  5. "go.mongodb.org/mongo-driver/bson/primitive"
  6. "go.mongodb.org/mongo-driver/mongo/options"
  7. "gopkg.in/mgo.v2/bson"
  8. "log"
  9. "regexp"
  10. "runtime"
  11. "sensitiveWords.udp/util"
  12. "strings"
  13. "time"
  14. )
  15. var reg_alias = regexp.MustCompile("(税务局|工商行政管理局|文化广播电视新闻出版局|外国专家局|" +
  16. "中医药管理局|市场监督管理局|广播电视局|医疗保障局|机关事务管理局|粮食和物资储备局|" +
  17. "监狱管理局|畜牧兽医局|食品药品监督管理局|城市管理行政执法局|城市管理局|国家保密局|密码管理局|" +
  18. "地方金融监督管理局|住房保障和房屋管理局|质量技术监督局|人力资源与社会保障局|公路管理局|国土资源局|" +
  19. "卫生和计划生育局|民事政务局|公众安全局|交通管理局|人力资源和社会保障局|劳动和社会保障局|" +
  20. "住房和城乡建设局|就业服务局|文物管理局|环境保护局|粮食和物资储备局|教育体育局|" +
  21. "体育局|教育局|招商局|农业局|农机局|水务局|林业局|财政局|审计局|统计局|商务局)$")
  22. var reglen *regexp.Regexp = regexp.MustCompile("^(.{1,5}|.{40,})$")
  23. var strReg *regexp.Regexp = regexp.MustCompile("^(.{0,3}工程队|.{0,3}总公司|_+|.{0,2}设备安装公司|.{0,2}装[饰修潢]公司|.{0,2}开发公司|.{0,4}有限公司|.{0,4}有限责任公司|.{0,4}设计院|建筑设计研?究?院|省文物考古研究所|经济开发区|省.*|镇人民政府|.{0,2}服务公司|" +
  24. ".{0,2}工程质量监督站|.{0,3}经[营销]部|.{0,3}事务所|.{0,4}工程公司|.{0,4}责任公司|.*勘测|.{0,4}研究院|.*能源建|.{0,2}安装工程|.*[市省]{1}|.{0,4}中心|.*区.?|" +
  25. ".{0,3}税务局|.{0,3}财政局|.{0,3}商行|.{0,2}公安处|.{0,2}测绘院|.{0,3}开发|.{0,2}建设局|.{0,2}经销部|.{0,3}委员会|.{0,2}分公司|.{0,2}管理站|.{0,2}事务管理局|" +
  26. ".*资料|.{0,2}办公用品.{1,2}|.*唯亭|.*设备|.+安装|.{0,2}技术服务|市.+[台院社局司]|城?区.+[府局室院]|县.+[院台局]|.{0,2}发展公司|经济技术开发|" +
  27. "发展和改革局|贵州有色地质|铝塑门窗加工|生产力促进中心|特殊普通合伙|工业集团公司|人民调解协会|人民政府办公厅|机电设备公司|房地产开发有限公司|.{0,4}商店|中等专业学校|" +
  28. "农村信用联社|.{0,4}经营部|.{0,4}销售部|驾驶员培训学校|.{2}县.{2}镇|保安服务总公司|住房和城乡建设局|地产评估事务所|生产资料门市部|×+|.{0,3}[0-9]{15}|.*[0-9]+|.*路|.*无字号名称.*|.*车|.*[,,]{1}.*|.*个体工商户|.*运输户)$")
  29. //非中文开头...
  30. var unstart_strReg *regexp.Regexp = regexp.MustCompile("^([\u4e00-\u9fa5])")
  31. //开头
  32. var start_strReg *regexp.Regexp = regexp.MustCompile("^([a-zA-Z]{1,2}[\u4e00-\u9fa5]{6,}|省|市|县|区|业绩|资格|中标|项目|预算单位)")
  33. //结尾
  34. var end_strReg *regexp.Regexp = regexp.MustCompile("(\\.|\\.\\.|餐馆|店|腻子|肉庄|画社|美发屋|发廊|网吧|网咖|零售点|新街|包子铺|奶茶铺|(株)|先生|女士|小姐|" +
  35. "资格|业绩|中标|项目|预算单位|摊位号|号|厅|室|部|点|馆|场|厂|床|所|处|站|行|中心|合作社|ATMS|" +
  36. "吧|楼|摊|摊位|廊|茶社|坊|圃|汤锅|园|民宿|美容院|房|排挡|府|庄|栈|队|批发|苑|养殖户|棋牌|农家乐|货运|" +
  37. "城|社|基地|会|服务|娱乐|种植|百货|汽修|农家菜|亭|小吃|快餐|粮库|卫生院|书画院|面|门窗|鸡排|屋|橱|堂|肉铺|服务|服饰|/*)$")
  38. //包含
  39. var con_strReg *regexp.Regexp = regexp.MustCompile("(\\?|?|%|代码标识|删除|错误|吊销|注销|发起人|待清理|&#|护照号|身份证号|" +
  40. "法人|&nbsp|国家拨入|借款|积累资金|单位自有|认股人|--|、|&|`|美元|[\u4e00-\u9fa5]{2,6}·[\u4e00-\u9fa5]{2,6})|" +
  41. "[a-zA-Z]{5,}")
  42. var uncon_strReg *regexp.Regexp = regexp.MustCompile("(园|政府|集团|公司|有限|合伙|企|院|学|局|处)")
  43. //部署-历史-敏感词库
  44. func initSensitiveWordsData() {
  45. log.Println("初始化敏感词-源数据...")
  46. gteid, err := primitive.ObjectIDFromHex(YamlConfig.TaskGteId)
  47. if err != nil {
  48. log.Fatalln(err)
  49. }
  50. lteid, err := primitive.ObjectIDFromHex(YamlConfig.TaskLteId)
  51. if err != nil {
  52. log.Fatalln(err)
  53. }
  54. log.Println("id段落:",util.BsonTOStringId(gteid),util.BsonTOStringId(lteid))
  55. sess := MixDataMgo.GetMgoConn()
  56. defer MixDataMgo.DestoryMongoConn(sess)
  57. iter := sess.DB(MixDataMgo.DbName).C("unique_qyxy").Find(map[string]interface{}{
  58. "_id": map[string]interface{}{
  59. "$gte": gteid,
  60. "$lte": lteid,
  61. },
  62. }).Iter()
  63. Filter = sensitive.New()
  64. var initnum uint
  65. for tmp := map[string]interface{}{}; iter.Next(&tmp); tmp = map[string]interface{}{} {
  66. initnum++
  67. if initnum%10000==0 {
  68. log.Println("current index ", initnum,tmp["qy_name"])
  69. }
  70. Filter.AddWord(tmp["qy_name"].(string))
  71. }
  72. log.Println("init ok", initnum)
  73. }
  74. //定时增量数据处理---冯
  75. func addTaskSensitiveWordsData() {
  76. defer func() {
  77. if err := recover(); err != nil {
  78. log.Println("func() addTaskSensitiveWordsData",err)
  79. }
  80. }()
  81. mmmgo, err := util.InitMgoEn("mongodb://172.17.4.187:27082,172.17.145.163:27083", 20, "fengweiqiang", "fwq@123123")
  82. if err != nil {
  83. log.Fatalln(err)
  84. }
  85. con := mmmgo.GetCon()
  86. if con == nil {
  87. log.Fatalln("mgo con err")
  88. }
  89. Filter = sensitive.New()
  90. tick := time.Tick(time.Hour * 24 * 7)//查询七天前
  91. for {//定时任务
  92. ctime := <-tick
  93. cronData := time.Date(ctime.Year(), ctime.Month(), ctime.Day()-7, ctime.Hour(), ctime.Minute(), ctime.Second(), 0, time.Local)
  94. findByupdate, err := con.Database("mixdata").Collection("qyxy_std").Find(nil, bson.M{
  95. "updatetime": bson.M{"$gte": cronData.Unix()},
  96. }, options.Find().SetProjection(bson.M{"company_name": 1, "updatetime": 1,"company_type": 1,"company_type_old": 1}))
  97. if err != nil {
  98. log.Println("tick err", cronData)
  99. continue
  100. }
  101. defer findByupdate.Close(nil)
  102. for tmp := make(map[string]interface{}); findByupdate.Next(nil); tmp = map[string]interface{}{} {
  103. err := findByupdate.Decode(&tmp)
  104. if err == nil {
  105. if company_name, ok := tmp["company_name"].(string); ok {
  106. if reglen.MatchString(company_name) || strReg.MatchString(company_name) ||
  107. !uncon_strReg.MatchString(company_name)|| !unstart_strReg.MatchString(company_name)||
  108. start_strReg.MatchString(company_name)|| end_strReg.MatchString(company_name)||
  109. con_strReg.MatchString(company_name) {
  110. continue
  111. }
  112. if strings.Contains(util.ObjToString(tmp["company_type"]),"个人")||
  113. strings.Contains(util.ObjToString(tmp["company_type"]),"个体")||
  114. strings.Contains(util.ObjToString(tmp["company_type_old"]),"个人")||
  115. strings.Contains(util.ObjToString(tmp["company_type_old"]),"个体") {
  116. continue
  117. }
  118. //存mgo
  119. con.Database("mixdata").Collection("unique_qyxy").InsertOne(nil, bson.M{
  120. "qy_name": company_name,
  121. })
  122. //存敏感词
  123. Filter.AddWord(company_name)
  124. //存es=判断+新增
  125. dealWithEsData(company_name,util.BsonTOStringId(tmp["_id"]))
  126. }
  127. }
  128. }
  129. log.Println("tick ok", cronData)
  130. }
  131. }
  132. //处理是否新增es
  133. func dealWithEsData(name string,tmpid string) {
  134. query:= `{"query":{"bool":{"must":[{"term":{"`+es_index+`.name":"`+name+`"}}],"must_not":[],"should":[]}},"from":0,"size":10,"sort":[],"facets":{}}`
  135. tmp := make(map[string]interface{})
  136. json.Unmarshal([]byte(query),&tmp)
  137. searchResult, err := Client_Es.Search().Index(es_index).Type(es_type).Source(tmp).Do()
  138. if err != nil {
  139. log.Println("从ES查询出错", err.Error())
  140. }else {
  141. data := make(map[string]interface{},0)
  142. if searchResult.Hits != nil {
  143. for _, hit := range searchResult.Hits.Hits {
  144. json.Unmarshal(*hit.Source, &data)
  145. }
  146. }
  147. if len(data)==0{
  148. //log.Println("无数据-新增")
  149. _, err := Client_Es.Index().Index(es_index).Type(es_type).Id(tmpid).BodyJson(map[string]interface{}{
  150. "name":name,
  151. "name_word":name,
  152. }).Do()
  153. if err != nil {
  154. //log.Println("新增失败:",name,tmpid)
  155. }
  156. }
  157. }
  158. }
  159. //处理内存分段
  160. func dealWithDataMemory() {
  161. //临时测试
  162. MixDataMgo = &util.MongodbSim{
  163. MongodbAddr: "172.17.4.187:27082,172.17.145.163:27083",
  164. Size: 20,
  165. DbName: "mixdata",
  166. UserName: "fengweiqiang",
  167. PassWord: "fwq@123123",
  168. }
  169. MixDataMgo.InitPool()
  170. sess := MixDataMgo.GetMgoConn()
  171. defer MixDataMgo.DestoryMongoConn(sess)
  172. iter := sess.DB(MixDataMgo.DbName).C("unique_qyxy").Find(map[string]interface{}{
  173. "_id": map[string]interface{}{
  174. "$gte": util.StringTOBsonId("1fffffffffffffffffffffff"),
  175. "$lte": util.StringTOBsonId("9fffffffffffffffffffffff"),
  176. },
  177. }).Sort("_id").Iter()
  178. Filter = sensitive.New()
  179. var initnum uint
  180. saveIdArr ,start_id:= make([]map[string]string,0),""
  181. var m runtime.MemStats
  182. for tmp := map[string]interface{}{}; iter.Next(&tmp); tmp = map[string]interface{}{} {
  183. if start_id=="" {
  184. start_id = util.BsonTOStringId(tmp["_id"])
  185. }
  186. Filter.AddWord(tmp["qy_name"].(string))
  187. initnum++
  188. if initnum%50000==0 {
  189. runtime.ReadMemStats(&m)
  190. men :=util.ToMegaBytes(m.HeapAlloc)
  191. log.Printf("current index %d\tos %.2f M",initnum, men)
  192. if men>7.5*1024 { //7.5G
  193. saveIdArr = append(saveIdArr, map[string]string{
  194. "start":start_id,
  195. "end":util.BsonTOStringId(tmp["_id"]),
  196. })
  197. log.Println("分段:",start_id,util.BsonTOStringId(tmp["_id"]),men)
  198. Filter = sensitive.New()
  199. runtime.GC()
  200. start_id = ""
  201. time.Sleep(time.Second*30)
  202. }
  203. }
  204. }
  205. saveIdArr = append(saveIdArr, map[string]string{
  206. "start":start_id,
  207. "end":"",
  208. })
  209. for k,v:=range saveIdArr{
  210. log.Println("第",k,"段",v["start"],v["end"])
  211. }
  212. log.Println("memory is ok", initnum)
  213. }
  214. func temporaryTest() {
  215. log.Println("测试......导出数据")
  216. q := map[string]interface{}{
  217. "check_history":map[string]interface{}{
  218. "$exists":0,
  219. },
  220. }
  221. sess := MixDataMgo.GetMgoConn()
  222. defer MixDataMgo.DestoryMongoConn(sess)
  223. it := sess.DB(MixDataMgo.DbName).C("winner_err_new").Find(&q).Iter()
  224. total,isOK:=0,0
  225. for tmp := make(map[string]interface{}); it.Next(&tmp); total++ {
  226. if total % 100 == 0 {
  227. log.Println("current index",total,isOK)
  228. }
  229. name:=util.ObjToString(tmp["name"])
  230. tmpid := util.BsonTOStringId(tmp["_id"])
  231. new_name,b :=dealWithNameScoreRules(name)
  232. if new_name!="" && b {
  233. isOK++
  234. MixDataMgo.UpdateById("winner_err_new",tmpid,map[string]interface{}{
  235. "$set": map[string]interface{}{
  236. "is_word": 1,
  237. "name_word" : new_name,
  238. },
  239. })
  240. }else {
  241. MixDataMgo.UpdateById("winner_err_new",tmpid,map[string]interface{}{
  242. "$set": map[string]interface{}{
  243. "is_word": -1,
  244. "name_word" : new_name,
  245. },
  246. })
  247. }
  248. tmp = make(map[string]interface{})
  249. }
  250. log.Println("is over",total,isOK)
  251. }