main.go 6.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304
  1. package main
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "github.com/tealeg/xlsx"
  6. "log"
  7. "os"
  8. qu "qfw/util"
  9. "qfw/util/elastic"
  10. "strings"
  11. "sync"
  12. "unicode/utf8"
  13. "go.mongodb.org/mongo-driver/bson/primitive"
  14. )
// Package-level state shared by the functions in this file.
var (
	// sysconfig holds the configuration file contents. It is not assigned
	// anywhere in the visible part of this file.
	sysconfig map[string]interface{} // configuration file
	// save_mgo is the MongoDB handle built in init; the functions below use
	// it to obtain and release connections and to save documents.
	save_mgo *MongodbSim
)
  19. func init() {
  20. save_mgo = &MongodbSim{
  21. MongodbAddr: "192.168.3.207:27092",
  22. DbName: "zhengkun",
  23. Size: 5,
  24. }
  25. save_mgo.InitPool()
  26. elastic.InitElasticSize("http://192.168.3.11:9800",20)
  27. }
  28. func dealWithDataXlsx() {
  29. q := map[string]interface{}{}
  30. sess := save_mgo.GetMgoConn()
  31. defer save_mgo.DestoryMongoConn(sess)
  32. it := sess.DB(save_mgo.DbName).C("zk_test_words").Find(&q).Iter()
  33. total:=0
  34. saveArr := make([]map[string]string,0)
  35. for tmp := make(map[string]interface{}); it.Next(&tmp); total++ {
  36. if total % 10000 == 0 {
  37. log.Println("current index",total,tmp["_id"])
  38. }
  39. if total % 30 ==0 {
  40. name:=qu.ObjToString(tmp["name"])
  41. dict := make(map[string]string)
  42. dict["name"] = name
  43. for i:=0; i<5;i++ {
  44. value,total,hit :="","",""
  45. key := "word_"+fmt.Sprintf("%d",i)
  46. if tmp[key]!=nil {
  47. if arr,ok := tmp[key].(primitive.A);ok {
  48. dataArr :=qu.ObjArrToMapArr(arr)
  49. value =qu.ObjToString(dataArr[0]["name"])
  50. if i!=0 {
  51. total = fmt.Sprintf("%d",dataArr[0]["all_words"])
  52. hit = fmt.Sprintf("%d",dataArr[0]["hit_words"])
  53. }
  54. }
  55. }
  56. key1,key2:="total"+fmt.Sprintf("%d",i),"hit"+fmt.Sprintf("%d",i)
  57. dict[key] = value
  58. dict[key1] = total
  59. dict[key2] = hit
  60. }
  61. saveArr= append(saveArr,dict)
  62. }
  63. tmp = make(map[string]interface{})
  64. }
  65. os.Remove("words.xlsx") //写excle
  66. f :=xlsx.NewFile()
  67. for i:=0; i<5;i++ {
  68. key := "word_"+fmt.Sprintf("%d",i)
  69. sheet, _ := f.AddSheet("统计"+key)
  70. row := sheet.AddRow()
  71. row.AddCell().Value = "name"
  72. row.AddCell().Value = key
  73. if i!=0 {
  74. row.AddCell().Value = "total"
  75. row.AddCell().Value = "hit"
  76. }
  77. key1,key2:="total"+fmt.Sprintf("%d",i),"hit"+fmt.Sprintf("%d",i)
  78. for _,tmp := range saveArr {
  79. row = sheet.AddRow()
  80. row.AddCell().SetString(tmp["name"])
  81. row.AddCell().SetString(tmp[key])
  82. row.AddCell().SetString(fmt.Sprintf("%s",tmp[key1]))
  83. row.AddCell().SetString(fmt.Sprintf("%s",tmp[key2]))
  84. }
  85. }
  86. err := f.Save("words.xlsx")
  87. if err != nil {
  88. log.Println("保存xlsx失败:", err)
  89. }else {
  90. log.Println("保存xlsx成功:", err)
  91. }
  92. }
  93. func main() {
  94. //导出xlsx
  95. dealWithDataXlsx()
  96. return
  97. defer qu.Catch()
  98. log.Println("处理 ... 指定企业名称 ...")
  99. //分析错误数据
  100. //
  101. q := map[string]interface{}{}
  102. sess := save_mgo.GetMgoConn()
  103. defer save_mgo.DestoryMongoConn(sess)
  104. //细节才需要遍历
  105. it := sess.DB(save_mgo.DbName).C("zk_company_test").Find(&q).Iter()
  106. total:=0
  107. for tmp := make(map[string]interface{}); it.Next(&tmp); total++ {
  108. if total % 10000 == 0 {
  109. log.Println("current index",total,tmp["_id"])
  110. }
  111. name:=qu.ObjToString(tmp["name"])
  112. save_dict := make(map[string]interface{},0)
  113. for i:=0; i<5;i++ {
  114. key := "word_"+fmt.Sprintf("%d",i)
  115. dataArr :=dealWithScoreRules(name,i)
  116. if dataArr ==nil || len(dataArr)<1 {
  117. //无数据
  118. }else {
  119. save_dict[key] = dealWithWordsRules(name,dataArr,i)
  120. }
  121. }
  122. if len(save_dict)>0 {
  123. save_dict["name"] = name
  124. save_mgo.Save("zk_test_words",save_dict)
  125. }
  126. tmp = make(map[string]interface{})
  127. }
  128. }
  129. //分数维度
  130. func dealWithScoreRules(name string,space int) []map[string]interface{} {
  131. key := ""
  132. if space>0&&space<5{
  133. key = fmt.Sprintf("%d",space)
  134. }
  135. query:= `{"query":{"bool":{"must":[{"query_string":{"default_field":"azktest.name_`+key+`","query":"`+name+`"}}],"must_not":[],"should":[]}},"from":0,"size":3,"sort":[],"facets":{}}`
  136. if key=="" {
  137. query = `{"query":{"bool":{"must":[{"query_string":{"default_field":"azktest.name","query":"`+name+`"}}],"must_not":[],"should":[]}},"from":0,"size":3,"sort":[],"facets":{}}`
  138. }
  139. client := elastic.GetEsConn()
  140. defer elastic.DestoryEsConn(client)
  141. searchResult, err := client.Search().Index("azktest").Type("azktest").Source(query).Do()
  142. if err != nil {
  143. log.Println("从ES查询出错", err.Error())
  144. return nil
  145. }
  146. resNum := len(searchResult.Hits.Hits)
  147. res := make([]map[string]interface{}, resNum)
  148. if searchResult.Hits != nil {
  149. if resNum < 5000 {
  150. for i, hit := range searchResult.Hits.Hits {
  151. data := make(map[string]interface{},0)
  152. json.Unmarshal(*hit.Source, &data)
  153. res[i] = map[string]interface{}{
  154. "name":data["name"],
  155. "score":*hit.Score,
  156. }
  157. }
  158. } else {
  159. log.Println("查询结果太多,查询到:", resNum, "条")
  160. }
  161. }
  162. return res
  163. }
  164. //击中数量以及比例
  165. func dealWithWordsRules(name string ,source []map[string]interface{},space int) []map[string]interface{} {
  166. nameArr,_ := calculateWordCount(name,space)
  167. newArr := make([]map[string]interface{},0)
  168. for _,v := range source {
  169. total,hit :=0,0
  170. source_name :=qu.ObjToString(v["name"])
  171. _,total = calculateWordCount(source_name,space)
  172. for _,v1 := range nameArr {
  173. if strings.Contains(source_name,v1) {
  174. hit++
  175. }
  176. }
  177. if space==0 {
  178. newArr = append(newArr, map[string]interface{}{
  179. "name":source_name,
  180. "score":qu.Float64All(v["score"]),
  181. })
  182. }else {
  183. newArr = append(newArr, map[string]interface{}{
  184. "name":source_name,
  185. "score":qu.Float64All(v["score"]),
  186. "all_words" : total,
  187. "hit_words" : hit,
  188. })
  189. }
  190. }
  191. return newArr
  192. }
  193. //分词结果
  194. func calculateWordCount(name string,space int) ([]string,int) {
  195. arr := make([]string,0)
  196. total := utf8.RuneCountInString(name)-(space-1)
  197. if name == "" || space<=0 || total<=0 {
  198. return arr,0
  199. }
  200. nameRune := []rune(name)
  201. for i:=0;i<total ;i++ {
  202. new_str := string(nameRune[i:space+i])
  203. arr = append(arr,new_str)
  204. }
  205. return arr,len(arr)
  206. }
  207. func readyDataEs() {
  208. q := map[string]interface{}{}
  209. sess := save_mgo.GetMgoConn()
  210. defer save_mgo.DestoryMongoConn(sess)
  211. //多线程升索引
  212. pool_es := make(chan bool, 10)
  213. wg_es := &sync.WaitGroup{}
  214. //细节才需要遍历
  215. it := sess.DB(save_mgo.DbName).C("zk_company_name").Find(&q).Iter()
  216. total:=0
  217. for tmp := make(map[string]interface{}); it.Next(&tmp); total++ {
  218. if total % 10000 == 0 {
  219. log.Println("current index",total,tmp["_id"])
  220. }
  221. savetmp := make(map[string]interface{}, 0)
  222. savetmp["_id"] = tmp["_id"]
  223. savetmp["name"] = qu.ObjToString(tmp["company_name"])
  224. savetmp["name_1"] = qu.ObjToString(tmp["company_name"])
  225. savetmp["name_2"] = qu.ObjToString(tmp["company_name"])
  226. savetmp["name_3"] = qu.ObjToString(tmp["company_name"])
  227. savetmp["name_4"] = qu.ObjToString(tmp["company_name"])
  228. pool_es <- true
  229. wg_es.Add(1)
  230. go func(savetmp map[string]interface{}) {
  231. defer func() {
  232. <-pool_es
  233. wg_es.Done()
  234. }()
  235. elastic.Save("azktest","azktest", savetmp)
  236. }(savetmp)
  237. tmp = make(map[string]interface{})
  238. }
  239. wg_es.Wait()
  240. log.Println("is over",total)
  241. }