main.go 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158
  1. package main
  2. import (
  3. "fmt"
  4. "gorm.io/driver/clickhouse"
  5. "gorm.io/gorm"
  6. "gorm.io/gorm/logger"
  7. util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  8. es "jygit.jydev.jianyu360.cn/data_processing/common_utils/elastic"
  9. "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  10. "log"
  11. "net/url"
  12. "regexp"
  13. )
  14. var (
  15. Mgo *mongodb.MongodbSim
  16. GF GlobalConf
  17. Esa *es.Elastic
  18. Esb *es.Elastic
  19. //Labels []LabelData //全局所有标签规则
  20. )
  // DataRes is one matched label in the form dealLabels writes back to
  // MongoDB: the label's name paired with its accumulated weight.
  type DataRes struct {
  	Name   string  // label name, copied from LabelData.Name
  	Weight float64 // accumulated weight, copied from LabelData.TotalWeight
  }
  25. func main() {
  26. esLabels()
  27. }
  28. // esLabels 组织机构标签生索引
  29. func esLabels() {
  30. InitConfig()
  31. InitMgo()
  32. InitEs()
  33. defer util.Catch()
  34. sess := Mgo.GetMgoConn()
  35. defer Mgo.DestoryMongoConn(sess)
  36. it := sess.DB(GF.Mongo.DB).C(GF.Mongo.Coll).Find(nil).Select(nil).Iter()
  37. fmt.Println("taskRun 开始")
  38. username := GF.Clickhouse.Username
  39. password := GF.Clickhouse.Password
  40. host := GF.Clickhouse.Host
  41. encodedPassword := url.QueryEscape(password)
  42. dn := fmt.Sprintf("clickhouse://%s:%s@%s/information?dial_timeout=10s&read_timeout=20s", username, encodedPassword, host)
  43. db, err := gorm.Open(clickhouse.Open(dn), &gorm.Config{
  44. Logger: logger.Default.LogMode(logger.Silent),
  45. })
  46. if err != nil {
  47. log.Fatal("打开数据库失败:", err)
  48. } else {
  49. log.Println("连接数据库成功", db.Name())
  50. }
  51. count := 0
  52. for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
  53. if count%1000 == 0 {
  54. log.Println("current:", count, tmp["company_name"])
  55. }
  56. // 存在组织机构标签
  57. if _, ok := tmp["company_labels"]; ok {
  58. companyName := util.ObjToString(tmp["company_name"])
  59. AInfo := EntInfo{}
  60. db.Model(&EntInfo{}).Where("company_name = ? ", companyName).Select("company_name", "id").First(&AInfo)
  61. if AInfo.ID != "" {
  62. update := map[string]interface{}{
  63. "tag_labels": tmp["company_labels"],
  64. }
  65. err = Esa.UpdateDocument(GF.Env.Esindex, AInfo.ID, update)
  66. if err != nil {
  67. log.Println(AInfo.ID, "更新", update, "错误")
  68. }
  69. }
  70. }
  71. }
  72. log.Println("over")
  73. }
  74. // dealLabels 处理组织机构标签
  75. func dealLabels() {
  76. REG, _ = regexp.Compile(`\(.*?\)\d*`)
  77. InitConfig()
  78. InitMgo()
  79. //data := map[string]interface{}{
  80. // "company_name": "发展和改革投资",
  81. // "cc_name": "ccdd",
  82. // "name": "发展治理委员会",
  83. //}
  84. defer util.Catch()
  85. sess := Mgo.GetMgoConn()
  86. defer Mgo.DestoryMongoConn(sess)
  87. it := sess.DB(GF.Mongo.DB).C(GF.Mongo.Coll).Find(nil).Select(nil).Iter()
  88. fmt.Println("taskRun 开始")
  89. count := 0
  90. for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
  91. ResMap := make([]LabelData, 0)
  92. //id := mongodb.BsonIdToSId(tmp["_id"])
  93. if count%1000 == 0 {
  94. log.Println("current:", count, tmp["company_name"])
  95. }
  96. for _, v := range GF.Labels {
  97. v.RegRule = DealRules(v.Rule)
  98. //匹配规则,返回规则内匹配的关键词
  99. //根据识别字段,开始各个字段匹配
  100. for kk, vv := range v.Field {
  101. text := util.ObjToString(tmp[vv])
  102. if text == "" {
  103. continue
  104. }
  105. rs, _ := DFAAnalyRules(text, v.RegRule)
  106. if rs {
  107. //多个不同识别字段,多个权重值
  108. v.TotalWeight = round(v.TotalWeight+v.Weight[kk], 2)
  109. ResMap = append(ResMap, v)
  110. }
  111. }
  112. v.TotalWeight = 0
  113. }
  114. if len(ResMap) > 0 {
  115. // 处理有重复的标签
  116. mergedMap := MergeLabelData(ResMap)
  117. for key, values := range mergedMap {
  118. //fmt.Printf("Sfield: %s\n", key)
  119. datas := make([]DataRes, 0)
  120. resTypes := make([]string, 0)
  121. for _, value := range values {
  122. //fmt.Printf(" Name: %s, Weight: %f\n", value.Name, value.TotalWeight)
  123. dar := DataRes{
  124. Name: value.Name,
  125. Weight: value.TotalWeight,
  126. }
  127. datas = append(datas, dar)
  128. resTypes = append(resTypes, value.Name)
  129. }
  130. // 更新对应字段
  131. update := map[string]interface{}{
  132. key: datas,
  133. "company_labels": resTypes,
  134. }
  135. where := map[string]interface{}{
  136. "company_name": tmp["company_name"],
  137. }
  138. Mgo.Update(GF.Mongo.Coll, where, map[string]interface{}{"$set": update}, true, false)
  139. }
  140. }
  141. }
  142. log.Println("over")
  143. }