standarbuyer.go 7.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237
  1. // standarbuyer
  2. package main
  3. import (
  4. "dbutil/mongo"
  5. "dbutil/redis"
  6. "encoding/json"
  7. "log"
  8. qu "qfw/util"
  9. "time"
  10. "unicode/utf8"
  11. "go.mongodb.org/mongo-driver/bson/primitive"
  12. "gopkg.in/mgo.v2/bson"
  13. )
  14. //增量处理
  15. func buyerStandarData(db string, query map[string]interface{}) {
  16. defer qu.Catch()
  17. sess := MongoFrom.GetMgoConn()
  18. defer MongoFrom.Close()
  19. it := sess.DB(db).C(extractcoll).Find(query).Select(bson.M{"repeat": 1, "buyer": 1, "buyertel": 1, "buyerperson": 1, "buyerclass": 1, "topscopeclass": 1}).Sort("_id").Iter()
  20. index := 0
  21. for tmp := make(map[string]interface{}); it.Next(&tmp); index++ {
  22. if qu.IntAll(tmp["repeat"]) > 0 { //重复数据跳过
  23. continue
  24. }
  25. buyer := qu.ObjToString(tmp["buyer"])
  26. if utf8.RuneCountInString(buyer) < 5 {
  27. continue
  28. }
  29. infoid := mongo.BsonTOStringId(tmp["_id"])
  30. buyerclass := qu.ObjToString(tmp["buyerclass"])
  31. topscopeclass, _ := tmp["topscopeclass"].(primitive.A)
  32. entid, _ := redis.GetRedisStr("buyer", buyerbd, buyer)
  33. ps := []map[string]interface{}{}
  34. buyerperson := qu.ObjToString(tmp["buyerperson"])
  35. buyertel := qu.ObjToString(tmp["buyertel"])
  36. if entid == "" {
  37. savetoerr := true
  38. if buyerperson != "" || buyertel != "" {
  39. v := map[string]interface{}{
  40. "contact_person": buyerperson,
  41. "phone": buyertel,
  42. "buyerclass": buyerclass,
  43. "topscopeclass": comRepTopscopeclass(topscopeclass),
  44. "infoid": infoid,
  45. }
  46. ps = append(ps, v)
  47. data := comHisMegerNewData(buyer, "buyer", ps)
  48. if data != nil {
  49. _id := MongoTo.Save(buyerent, data)
  50. redis.PutRedis("buyer", buyerbd, buyer, _id.(primitive.ObjectID).Hex(), -1)
  51. savetoerr = false
  52. }
  53. }
  54. if savetoerr {
  55. t := MongoTo.FindOne(buyererr, map[string]interface{}{"name": buyer})
  56. if len(t) < 1 {
  57. MongoTo.Save(buyererr, map[string]interface{}{
  58. "name": buyer,
  59. "buyerlass": buyerclass,
  60. "check": comMarkdata(buyer, "buyer"),
  61. "updatetime": time.Now().Unix(),
  62. })
  63. }
  64. }
  65. } else {
  66. if buyerperson != "" || buyertel != "" {
  67. v := map[string]interface{}{
  68. "contact_person": buyerperson,
  69. "phone": buyertel,
  70. "buyerclass": buyerclass,
  71. "topscopeclass": comRepTopscopeclass(topscopeclass),
  72. "infoid": infoid,
  73. }
  74. data := buyerMegerBuyerclass(entid, v)
  75. MongoTo.UpdateById(buyerent, entid,
  76. map[string]interface{}{
  77. "$set": data,
  78. "$push": map[string]interface{}{"contact": v},
  79. },
  80. )
  81. }
  82. }
  83. tmp = map[string]interface{}{}
  84. if index%100 == 0 {
  85. log.Println("buyer index", index)
  86. }
  87. }
  88. log.Println("buyer ok index", index)
  89. }
  90. //历史数据处理
  91. func historybuyer(db, fromcoll string) {
  92. defer qu.Catch()
  93. log.Println("history start")
  94. sess := MongoFrom.GetMgoConn()
  95. defer MongoFrom.Close()
  96. it := sess.DB(db).C(fromcoll).Find(map[string]interface{}{}).Select(bson.M{"repeat": 1, "buyer": 1, "buyertel": 1, "buyerperson": 1, "buyerclass": 1, "topscopeclass": 1}).Sort("_id").Iter()
  97. index := 0
  98. for tmp := make(map[string]interface{}); it.Next(&tmp); index++ {
  99. if qu.IntAll(tmp["repeat"]) > 0 { //重复数据跳过
  100. continue
  101. }
  102. _id := mongo.BsonTOStringId(tmp["_id"])
  103. buyerchanbool <- true
  104. go func(tmp map[string]interface{}) {
  105. defer func() {
  106. <-buyerchanbool
  107. }()
  108. buyer := qu.ObjToString(tmp["buyer"])
  109. buyerclass := qu.ObjToString(tmp["buyerclass"])
  110. topscopeclass, _ := tmp["topscopeclass"].(primitive.A)
  111. if buyer != "" && utf8.RuneCountInString(buyer) > 4 {
  112. buyerperson := qu.ObjToString(tmp["buyerperson"])
  113. buyertel := qu.ObjToString(tmp["buyertel"])
  114. b, _ := redis.ExistRedis("buyer", buyerbd, buyer)
  115. if b {
  116. if buyerperson != "" || buyertel != "" {
  117. strs, _ := redis.GetRedisStr("buyer", buyerbd, buyer)
  118. ps := []interface{}{}
  119. err := json.Unmarshal([]byte(strs), &ps)
  120. if err == nil {
  121. v := map[string]interface{}{
  122. "contact_person": buyerperson,
  123. "phone": buyertel,
  124. "buyerclass": buyerclass,
  125. "topscopeclass": comRepTopscopeclass(topscopeclass),
  126. "infoid": _id,
  127. }
  128. ps = append(ps, v)
  129. bs, _ := json.Marshal(ps)
  130. redis.PutRedis("buyer", buyerbd, buyer, bs, -1)
  131. } else {
  132. log.Println("jsonErr", err)
  133. }
  134. }
  135. } else {
  136. val := []map[string]interface{}{}
  137. if buyerperson != "" || buyertel != "" {
  138. tmp := map[string]interface{}{
  139. "contact_person": buyerperson,
  140. "phone": buyertel,
  141. "buyerclass": buyerclass,
  142. "topscopeclass": comRepTopscopeclass(topscopeclass),
  143. "infoid": _id,
  144. }
  145. val = append(val, tmp)
  146. }
  147. bs, _ := json.Marshal(val)
  148. redis.PutRedis("buyer", buyerbd, buyer, bs, -1)
  149. MongoTo.Save(buyererr, map[string]interface{}{
  150. "name": buyer,
  151. "buyerclass": buyerclass,
  152. "updatetime": time.Now().Unix(),
  153. })
  154. }
  155. }
  156. }(tmp)
  157. tmp = map[string]interface{}{}
  158. if index%10000 == 0 {
  159. log.Println("index", index, _id)
  160. }
  161. }
  162. log.Println("history ok index", index)
  163. buyerStandarHistory(qu.ObjToString(sysconfig["mgotodb"]))
  164. }
  165. //查询buyererr标准化历史数据
  166. func buyerStandarHistory(db string) {
  167. defer qu.Catch()
  168. log.Println("开始标准化数据--buyer", db)
  169. sessto := MongoTo.GetMgoConn()
  170. defer MongoTo.Close()
  171. it := sessto.DB(db).C(buyererr).Find(map[string]interface{}{}).Iter()
  172. index := 0
  173. entnum := 0
  174. for tmp := make(map[string]interface{}); it.Next(&tmp); index++ {
  175. err_id := mongo.BsonTOStringId(tmp["_id"])
  176. name := qu.ObjToString(tmp["name"])
  177. buyerchanbool <- true
  178. go func(tmp map[string]interface{}) {
  179. defer func() {
  180. <-buyerchanbool
  181. }()
  182. strs, err := redis.GetRedisStr("buyer", buyerbd, name)
  183. if err != nil {
  184. return
  185. }
  186. ps := []map[string]interface{}{}
  187. err = json.Unmarshal([]byte(strs), &ps)
  188. if err == nil {
  189. data := comHisMegerNewData(name, "buyer", ps)
  190. if data != nil {
  191. MongoTo.Save(buyerent, data)
  192. MongoTo.DeleteById(buyererr, err_id)
  193. entnum++
  194. } else { //未查询到企业,打标记并存表
  195. num := comMarkdata(name, "buyer")
  196. tmp["check"] = num
  197. MongoTo.UpdateById(buyererr, err_id, map[string]interface{}{"$set": map[string]interface{}{"check": num}})
  198. }
  199. } else {
  200. log.Println("jsonErr", name, err)
  201. }
  202. }(tmp)
  203. if index%1000 == 0 {
  204. log.Println("标准化历史数据--buyer", index, err_id, entnum)
  205. }
  206. tmp = map[string]interface{}{}
  207. }
  208. log.Println("标准化数据完成--buyer", index, entnum)
  209. }
  210. //企业数据整合(已有标注信息)
  211. func buyerMegerBuyerclass(id string, ps map[string]interface{}) map[string]interface{} {
  212. tmp := MongoEnt.FindById(buyerent, id, bson.M{"buyerclass": 1})
  213. if len(tmp) < 1 {
  214. return nil
  215. }
  216. data := map[string]interface{}{}
  217. buyerclass := tmp["buyerclass"].(primitive.A)
  218. tmpbuyerclass := map[string]bool{}
  219. for _, v := range buyerclass {
  220. tt := qu.ObjToString(v)
  221. tmpbuyerclass[tt] = true
  222. }
  223. tmpbuyerclass[qu.ObjToString(ps["buyerclass"])] = true
  224. newbuyerclass := []interface{}{}
  225. for k, _ := range tmpbuyerclass {
  226. newbuyerclass = append(newbuyerclass, k)
  227. }
  228. data["buyerclass"] = newbuyerclass
  229. data["updatetime"] = time.Now().Unix()
  230. return data
  231. }