standaragency.go 7.1 KB


  1. // standaragency
  2. package main
  import (
  	"encoding/json"
  	"log"
  	"sync"
  	"time"
  	"unicode/utf8"

  	"dbutil/mongo"
  	"dbutil/redis"
  	qu "qfw/util"

  	"go.mongodb.org/mongo-driver/bson/primitive"
  	"gopkg.in/mgo.v2/bson"
  )
  14. //增量处理
  15. func agencyStandarData(db string, query map[string]interface{}) {
  16. defer qu.Catch()
  17. sess := MongoFrom.GetMgoConn()
  18. defer MongoFrom.Close()
  19. it := sess.DB(db).C(extractcoll).Find(query).Select(bson.M{"repeat": 1, "agency": 1, "agencytel": 1, "agencyperson": 1, "topscopeclass": 1,
  20. "agencyaddr": 1}).Sort("_id").Iter()
  21. index := 0
  22. for tmp := make(map[string]interface{}); it.Next(&tmp); index++ {
  23. if qu.IntAll(tmp["repeat"]) > 0 { //重复数据跳过
  24. continue
  25. }
  26. agency := qu.ObjToString(tmp["agency"])
  27. if utf8.RuneCountInString(agency) < 5 {
  28. continue
  29. }
  30. infoid := mongo.BsonTOStringId(tmp["_id"])
  31. topscopeclass, _ := tmp["topscopeclass"].(primitive.A)
  32. entid, _ := redis.GetRedisStr("agency", agencybd, agency)
  33. ps := []map[string]interface{}{}
  34. agencyperson := qu.ObjToString(tmp["agencyperson"])
  35. agencytel := qu.ObjToString(tmp["agencytel"])
  36. if entid == "" { //redis 未存
  37. savetoerr := true
  38. if agencytel != "" {
  39. v := map[string]interface{}{
  40. "contact_person": agencyperson,
  41. "phone": agencytel,
  42. "topscopeclass": comRepTopscopeclass(topscopeclass),
  43. "infoid": infoid,
  44. }
  45. ps = append(ps, v)
  46. data := comHisMegerNewData(agency, "agency", ps)
  47. if data != nil {
  48. _id := MongoTo.Save(agencyent, data)
  49. redis.PutRedis("agency", agencybd, agency, _id.(primitive.ObjectID).Hex(), -1)
  50. savetoerr = false
  51. }
  52. }
  53. if savetoerr {
  54. t := MongoTo.FindOne(agencyerr, map[string]interface{}{"name": agency})
  55. if len(t) < 1 {
  56. MongoTo.Save(agencyerr, map[string]interface{}{
  57. "name": agency,
  58. "check": comMarkdata(agency, "agency"),
  59. "updatetime": time.Now().Unix(),
  60. })
  61. }
  62. }
  63. } else {
  64. if agencytel != "" {
  65. is_exist := false //电话是否存在
  66. for _, v := range ps {
  67. if v["phone"] == agencytel {
  68. is_exist = true
  69. if agencyperson != "" && v["contact_person"] != agencyperson {
  70. v["contact_person"] = agencyperson
  71. v["infoid"] = infoid
  72. bs, _ := json.Marshal(ps) //替换数据,更新
  73. redis.PutRedis("agency", agencybd, agency, bs, -1)
  74. }
  75. continue
  76. }
  77. }
  78. if !is_exist {
  79. v := map[string]interface{}{
  80. "contact_person": agencyperson,
  81. "phone": agencytel,
  82. "topscopeclass": comRepTopscopeclass(topscopeclass),
  83. "infoid": infoid,
  84. }
  85. MongoTo.UpdateById(agencyent, entid,
  86. map[string]interface{}{
  87. "$set": map[string]interface{}{"updatetime": time.Now().Unix()},
  88. "$push": map[string]interface{}{"contact": v},
  89. },
  90. )
  91. }
  92. }
  93. }
  94. tmp = map[string]interface{}{}
  95. if index%100 == 0 {
  96. log.Println("agency index", index)
  97. }
  98. }
  99. log.Println("agency ok index", index)
  100. }
  101. //历史数据处理
  102. func historyagency(db, fromcoll string) {
  103. defer qu.Catch()
  104. log.Println("history start")
  105. sess := MongoFrom.GetMgoConn()
  106. defer MongoFrom.Close()
  107. it := sess.DB(db).C(fromcoll).Find(map[string]interface{}{}).Select(bson.M{"repeat": 1, "agency": 1, "agencytel": 1, "agencyperson": 1, "topscopeclass": 1,
  108. "agencyaddr": 1}).Sort("_id").Iter()
  109. index := 0
  110. for tmp := make(map[string]interface{}); it.Next(&tmp); index++ {
  111. if qu.IntAll(tmp["repeat"]) > 0 { //重复数据跳过
  112. continue
  113. }
  114. _id := mongo.BsonTOStringId(tmp["_id"])
  115. agencychanbool <- true
  116. go func(tmp map[string]interface{}) {
  117. defer func() {
  118. <-agencychanbool
  119. }()
  120. agency := qu.ObjToString(tmp["agency"])
  121. topscopeclass, _ := tmp["topscopeclass"].(primitive.A)
  122. if agency != "" && utf8.RuneCountInString(agency) > 4 {
  123. agencyperson := qu.ObjToString(tmp["agencyperson"])
  124. agencytel := qu.ObjToString(tmp["agencytel"])
  125. b, _ := redis.ExistRedis("agency", agencybd, agency)
  126. if b { //redis 存在
  127. if agencytel != "" {
  128. strs, _ := redis.GetRedisStr("agency", agencybd, agency)
  129. ps := []map[string]interface{}{}
  130. err := json.Unmarshal([]byte(strs), &ps)
  131. if err == nil {
  132. is_exist := false //电话是否存在
  133. for _, v := range ps {
  134. if v["phone"] == agencytel {
  135. is_exist = true
  136. if agencyperson != "" && v["contact_person"] != agencyperson {
  137. v["contact_person"] = agencyperson
  138. v["infoid"] = _id
  139. bs, _ := json.Marshal(ps) //替换数据,更新
  140. redis.PutRedis("agency", agencybd, agency, bs, -1)
  141. }
  142. continue
  143. }
  144. }
  145. if !is_exist {
  146. v := map[string]interface{}{
  147. "contact_person": agencyperson,
  148. "phone": agencytel,
  149. "topscopeclass": comRepTopscopeclass(topscopeclass),
  150. "infoid": _id,
  151. }
  152. ps = append(ps, v)
  153. bs, _ := json.Marshal(ps)
  154. redis.PutRedis("agency", agencybd, agency, bs, -1)
  155. }
  156. } else {
  157. log.Println("jsonErr", err)
  158. }
  159. }
  160. } else {
  161. val := []map[string]interface{}{}
  162. if agencytel != "" {
  163. tmp := map[string]interface{}{
  164. "contact_person": agencyperson,
  165. "phone": agencytel,
  166. "topscopeclass": comRepTopscopeclass(topscopeclass),
  167. "infoid": _id,
  168. }
  169. val = append(val, tmp)
  170. }
  171. bs, _ := json.Marshal(val)
  172. redis.PutRedis("agency", agencybd, agency, bs, -1)
  173. MongoTo.Save(agencyerr, map[string]interface{}{
  174. "name": agency,
  175. "updatetime": time.Now().Unix(),
  176. })
  177. }
  178. }
  179. }(tmp)
  180. tmp = map[string]interface{}{}
  181. if index%10000 == 0 {
  182. log.Println("index", index, _id)
  183. }
  184. }
  185. log.Println("history ok index", index)
  186. agencyStandarHistory(qu.ObjToString(sysconfig["mgotodb"]))
  187. }
  188. //查询agencyerr标准化历史数据
  189. func agencyStandarHistory(db string) {
  190. defer qu.Catch()
  191. log.Println("开始标准化数据--agency", db)
  192. sessto := MongoTo.GetMgoConn()
  193. defer MongoTo.Close()
  194. it := sessto.DB(db).C(agencyerr).Find(map[string]interface{}{}).Iter()
  195. index := 0
  196. entnum := 0
  197. for tmp := make(map[string]interface{}); it.Next(&tmp); index++ {
  198. err_id := mongo.BsonTOStringId(tmp["_id"])
  199. name := qu.ObjToString(tmp["name"])
  200. agencychanbool <- true
  201. go func(tmp map[string]interface{}) {
  202. defer func() {
  203. <-agencychanbool
  204. }()
  205. strs, err := redis.GetRedisStr("agency", agencybd, name)
  206. if err != nil {
  207. return
  208. }
  209. ps := []map[string]interface{}{}
  210. err = json.Unmarshal([]byte(strs), &ps)
  211. if err == nil {
  212. data := comHisMegerNewData(name, "agency", ps)
  213. if data != nil {
  214. MongoTo.Save(agencyent, data)
  215. MongoTo.DeleteById(agencyerr, err_id)
  216. entnum++
  217. } else { //未查询到企业,打标记并存表
  218. num := comMarkdata(name, "agency")
  219. tmp["check"] = num
  220. MongoTo.UpdateById(agencyerr, err_id, map[string]interface{}{"$set": map[string]interface{}{"check": num}})
  221. }
  222. } else {
  223. log.Println("jsonErr", name, err)
  224. }
  225. }(tmp)
  226. if index%1000 == 0 {
  227. log.Println("标准化历史数据--agency", index, err_id, entnum)
  228. }
  229. tmp = map[string]interface{}{}
  230. }
  231. log.Println("标准化数据完成--agency", index, entnum)
  232. }