standarbuyer.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361
  1. // standarbuyer
  2. package main
import (
	"encoding/json"
	"sync"
	"time"
	"unicode/utf8"

	"dbutil/mongo"
	"dbutil/redis"
	qu "qfw/util"

	log "github.com/donnie4w/go-logger/logger"
	"go.mongodb.org/mongo-driver/bson/primitive"
	"gopkg.in/mgo.v2/bson"
)
  14. //增量处理
  15. func buyerStandarData(db string, query map[string]interface{}) {
  16. defer qu.Catch()
  17. sess := MongoFrom.GetMgoConn()
  18. defer MongoFrom.DestoryMongoConn(sess)
  19. it := sess.DB(db).C(extractcoll).Find(query).Select(bson.M{"repeat": 1, "buyer": 1, "buyertel": 1,
  20. "buyerperson": 1, "buyerclass": 1, "topscopeclass": 1}).Sort("_id").Iter()
  21. index := 0
  22. for tmp := make(map[string]interface{}); it.Next(&tmp); index++ {
  23. if qu.IntAll(tmp["repeat"]) > 0 { //重复数据跳过
  24. continue
  25. }
  26. buyer := qu.ObjToString(tmp["buyer"])
  27. if utf8.RuneCountInString(buyer) < 5 {
  28. if LenReg.MatchString(buyer)&&utf8.RuneCountInString(buyer)==4 {
  29. }else {
  30. continue
  31. }
  32. }
  33. infoid := mongo.BsonTOStringId(tmp["_id"])
  34. buyerclass := qu.ObjToString(tmp["buyerclass"])
  35. topscopeclass, _ := tmp["topscopeclass"].(primitive.A)
  36. entid, _ := redis.GetRedisStr("buyer", buyerbd, buyer)
  37. ps := []map[string]interface{}{}
  38. buyerperson := qu.ObjToString(tmp["buyerperson"])
  39. buyertel := qu.ObjToString(tmp["buyertel"])
  40. if entid == "" {
  41. savetoerr := true
  42. if buyerperson != "" || buyertel != "" {
  43. v := map[string]interface{}{
  44. "contact_person": buyerperson,
  45. "phone": buyertel,
  46. "buyerclass": buyerclass,
  47. "topscopeclass": comRepTopscopeclass(topscopeclass),
  48. "infoid": infoid,
  49. }
  50. ps = append(ps, v)
  51. data := comHisMegerNewData(buyer, "buyer", ps)
  52. if data != nil {
  53. province, city, district := "", "", ""
  54. province = qu.ObjToString(data["province"])
  55. if province == "" { //省份为空,buyer优先提取区域信息再company_address
  56. province, city, district = GetProvinceCityDistrict([]string{buyer}) //先buyer
  57. if province == "" { //再address
  58. if address := qu.ObjToString(data["company_address"]); address != "" {
  59. province, city, district = GetProvinceCityDistrict([]string{address})
  60. }
  61. }
  62. data["province"] = province
  63. data["city"] = city
  64. data["district"] = district
  65. }
  66. //提取固话和手机号
  67. contactArr := []interface{}{}
  68. contactArr = append(contactArr, v)
  69. latestFixedPhone, latestMobilePhone, timesFixedPhone, timesMobilePhone := getPhone(contactArr)
  70. data["latestfixedphone"] = latestFixedPhone
  71. data["latestmobilephone"] = latestMobilePhone
  72. data["fixedphone"] = timesFixedPhone
  73. data["mobilephone"] = timesMobilePhone
  74. data["institute_type"] = "企业"
  75. _id := MongoTo.Save(buyerent, data)
  76. if _id!=nil {
  77. redis.PutRedis("buyer", buyerbd, buyer, mongo.BsonTOStringId(_id), -1)
  78. }
  79. savetoerr = false
  80. }
  81. }
  82. if savetoerr {
  83. t := MongoTo.FindOne(buyererr, map[string]interface{}{"name": buyer})
  84. if len(t) < 1 {
  85. province, city, district := GetProvinceCityDistrict([]string{buyer})
  86. MongoTo.Save(buyererr, map[string]interface{}{
  87. "name": buyer,
  88. "buyerclass": buyerclass,
  89. "check": comMarkdata(buyer, "buyer"),
  90. "updatetime": time.Now().Unix(),
  91. "province": province,
  92. "city": city,
  93. "district": district,
  94. })
  95. }
  96. }
  97. } else {
  98. if buyerperson != "" && buyertel != "" {
  99. v := map[string]interface{}{
  100. "contact_person": buyerperson,
  101. "phone": buyertel,
  102. "buyerclass": buyerclass,
  103. "topscopeclass": comRepTopscopeclass(topscopeclass),
  104. "infoid": infoid,
  105. }
  106. data := buyerMegerBuyerclass(entid, v)
  107. MongoTo.UpdateById(buyerent, entid,
  108. map[string]interface{}{
  109. "$set": data,
  110. //"$push": map[string]interface{}{"contact": v},
  111. },
  112. )
  113. }
  114. }
  115. tmp = map[string]interface{}{}
  116. if index%10000 == 0 {
  117. log.Debug("buyer index ", index)
  118. }
  119. }
  120. log.Debug("buyer ok index", index)
  121. }
  122. //历史数据处理
  123. func historybuyer(db, fromcoll string) {
  124. defer qu.Catch()
  125. log.Debug("history start")
  126. sess := MongoFrom.GetMgoConn()
  127. defer MongoFrom.Close()
  128. it := sess.DB(db).C(fromcoll).Find(map[string]interface{}{}).Select(bson.M{"repeat": 1, "buyer": 1, "buyertel": 1, "buyerperson": 1, "buyerclass": 1, "topscopeclass": 1}).Sort("_id").Iter()
  129. index := 0
  130. for tmp := make(map[string]interface{}); it.Next(&tmp); index++ {
  131. if qu.IntAll(tmp["repeat"]) > 0 { //重复数据跳过
  132. continue
  133. }
  134. _id := mongo.BsonTOStringId(tmp["_id"])
  135. buyerchanbool <- true
  136. go func(tmp map[string]interface{}) {
  137. defer func() {
  138. <-buyerchanbool
  139. }()
  140. buyer := qu.ObjToString(tmp["buyer"])
  141. buyerclass :=""
  142. if tb ,ok := tmp["buyerclass"].(primitive.A);ok{
  143. tbn := len(tb)
  144. if tbn>0{
  145. buyerclass = qu.ObjToString(tb[tbn-1])
  146. }
  147. }else if tbs ,ok :=tmp["buyerclass"].(string);ok{
  148. buyerclass = tbs
  149. }
  150. topscopeclass, _ := tmp["topscopeclass"].(primitive.A)
  151. if buyer != "" && (utf8.RuneCountInString(buyer) > 4 || (LenReg.MatchString(buyer)&&utf8.RuneCountInString(buyer)==4)) {
  152. buyerperson := qu.ObjToString(tmp["buyerperson"])
  153. buyertel := qu.ObjToString(tmp["buyertel"])
  154. b, _ := redis.ExistRedis("buyer", buyerbd, buyer)
  155. if b {
  156. if buyerperson != "" || buyertel != "" {
  157. strs, _ := redis.GetRedisStr("buyer", buyerbd, buyer)
  158. ps := []interface{}{}
  159. err := json.Unmarshal([]byte(strs), &ps)
  160. if err == nil {
  161. v := map[string]interface{}{
  162. "contact_person": buyerperson,
  163. "phone": buyertel,
  164. "buyerclass": buyerclass,
  165. "topscopeclass": comRepTopscopeclass(topscopeclass),
  166. "infoid": _id,
  167. }
  168. ps = append(ps, v)
  169. bs, _ := json.Marshal(ps)
  170. redis.PutRedis("buyer", buyerbd, buyer, bs, -1)
  171. }
  172. }
  173. return
  174. }
  175. val := []map[string]interface{}{}
  176. if buyerperson != "" || buyertel != "" {
  177. tmp := map[string]interface{}{
  178. "contact_person": buyerperson,
  179. "phone": buyertel,
  180. "buyerclass": buyerclass,
  181. "topscopeclass": comRepTopscopeclass(topscopeclass),
  182. "infoid": _id,
  183. }
  184. val = append(val, tmp)
  185. }
  186. bs, _ := json.Marshal(val)
  187. redis.PutRedis("buyer", buyerbd, buyer, bs, -1)
  188. MongoTo.Save(buyererr, map[string]interface{}{
  189. "name": buyer,
  190. "buyerclass": buyerclass,
  191. "updatetime": time.Now().Unix(),
  192. })
  193. }
  194. }(tmp)
  195. tmp = map[string]interface{}{}
  196. if index%10000 == 0 {
  197. log.Debug("index", index, _id)
  198. }
  199. }
  200. log.Debug("history ok index", index)
  201. buyerStandarHistory(qu.ObjToString(sysconfig["mgotodb"]))
  202. }
  203. //查询buyererr标准化历史数据
  204. func buyerStandarHistory(db string) {
  205. defer qu.Catch()
  206. log.Debug("开始标准化数据--buyer", db)
  207. sessto := MongoTo.GetMgoConn()
  208. defer MongoTo.Close()
  209. it := sessto.DB(db).C(buyererr).Find(map[string]interface{}{}).Iter()
  210. index := 0
  211. entnum := 0
  212. for tmp := make(map[string]interface{}); it.Next(&tmp); index++ {
  213. err_id := mongo.BsonTOStringId(tmp["_id"])
  214. name := qu.ObjToString(tmp["name"])
  215. buyerchanbool <- true
  216. go func(tmp map[string]interface{}) {
  217. defer func() {
  218. <-buyerchanbool
  219. }()
  220. strs, err := redis.GetRedisStr("buyer", buyerbd, name)
  221. if err != nil {
  222. return
  223. }
  224. ps := []map[string]interface{}{}
  225. err = json.Unmarshal([]byte(strs), &ps)
  226. if err == nil {
  227. data := comHisMegerNewData(name, "buyer", ps)
  228. if data != nil {
  229. MongoTo.Save(buyerent, data)
  230. MongoTo.DeleteById(buyererr, err_id)
  231. entnum++
  232. } else { //未查询到企业,打标记并存表
  233. num := comMarkdata(name, "buyer")
  234. tmp["check"] = num
  235. MongoTo.UpdateById(buyererr, err_id, map[string]interface{}{"$set": map[string]interface{}{"check": num}})
  236. }
  237. } else {
  238. log.Debug("jsonErr", name, err)
  239. }
  240. }(tmp)
  241. if index%1000 == 0 {
  242. log.Debug("标准化历史数据--buyer", index, err_id, entnum)
  243. }
  244. tmp = map[string]interface{}{}
  245. }
  246. log.Debug("标准化数据完成--buyer", index, entnum)
  247. }
  248. //企业数据整合(已有标注信息)
  249. func buyerMegerBuyerclass(id string, ps map[string]interface{}) map[string]interface{} {
  250. tmp := MongoEnt.FindById(buyerent, id, bson.M{"buyerclass": 1, "contact": 1})
  251. if len(tmp) < 1 {
  252. return nil
  253. }
  254. data := map[string]interface{}{}
  255. if buyerclass, ok := tmp["buyerclass"].(primitive.A); ok {
  256. bn := len(buyerclass)
  257. if bn > 0 {
  258. data["buyerclass"] = qu.ObjToString(buyerclass[bn-1])
  259. }
  260. } else if sb, ok := tmp["buyerclass"].(string); ok {
  261. data["buyerclass"] = sb
  262. }
  263. data["updatetime"] = time.Now().Unix()
  264. //contact
  265. contact :=make(primitive.A,0)
  266. if tmp["contact"]!=nil {
  267. contact = tmp["contact"].(primitive.A)
  268. }
  269. contact = append(contact, ps)
  270. //bid_contact
  271. bid_contacts, contacts := bid_contact(contact)
  272. if len(bid_contacts) > 0 {
  273. data["bid_contact"] = bid_contacts
  274. }
  275. //sort 200
  276. contact = contacts
  277. data["contact"] = contact
  278. //提取固话和手机号
  279. latestFixedPhone, latestMobilePhone, timesFixedPhone, timesMobilePhone := getPhone(contact)
  280. data["latestfixedphone"] = latestFixedPhone
  281. data["latestmobilephone"] = latestMobilePhone
  282. data["fixedphone"] = timesFixedPhone
  283. data["mobilephone"] = timesMobilePhone
  284. return data
  285. }
  286. func putbuyerreids() {
  287. sess := MongoTo.GetMgoConn()
  288. defer MongoTo.Close()
  289. it := sess.DB("mixdata").C("buyer_enterprise").Find(map[string]interface{}{}).Select(bson.M{"buyer_name": 1}).Iter()
  290. index := 0
  291. for tmp := make(map[string]interface{}); it.Next(&tmp); index++ {
  292. id := mongo.BsonTOStringId(tmp["_id"])
  293. buyer_name, _ := tmp["buyer_name"].(string)
  294. redis.PutRedis("buyer", 2, buyer_name, id, -1)
  295. tmp = map[string]interface{}{}
  296. if index%1000 == 0 {
  297. log.Debug(id, index)
  298. }
  299. }
  300. }
  301. func putbuyerreids__s() {
  302. sess := MongoTo.GetMgoConn()
  303. defer MongoTo.Close()
  304. it := sess.DB("mixdata").C("buyer_enterprise").Find(map[string]interface{}{}).Select(bson.M{"buyer_name": 1}).Iter()
  305. index := 0
  306. for tmp := make(map[string]interface{}); it.Next(&tmp); index++ {
  307. buyer_name, _ := tmp["buyer_name"].(string)
  308. ts, _ := MongoTo.Find("buyer_enterprise", map[string]interface{}{"buyer_name": buyer_name}, nil, bson.M{"buyer_name": 1})
  309. if len(ts) > 1 {
  310. id := mongo.BsonTOStringId(ts[0]["_id"])
  311. log.Debug(buyer_name, id)
  312. MongoTo.DeleteById("buyer_enterprise", id)
  313. }
  314. if index%10000 == 0 {
  315. log.Debug(index)
  316. }
  317. tmp = map[string]interface{}{}
  318. }
  319. }
  320. func modifybuyer() {
  321. sess := MongoTo.GetMgoConn()
  322. defer MongoTo.Close()
  323. it := sess.DB("mixdata").C("buyer_enterprise").Find(map[string]interface{}{"institute_type": "事业单位"}).Iter()
  324. index := 0
  325. for tmp := make(map[string]interface{}); it.Next(&tmp); index++ {
  326. id := mongo.BsonTOStringId(tmp["_id"])
  327. buyer_name, _ := tmp["buyer_name"].(string)
  328. bys, _ := MongoTo.Find(
  329. "buyer_enterprise",
  330. map[string]interface{}{"buyer_name": buyer_name}, nil,
  331. bson.M{"institute_type": 1, "buyer_name": 1},
  332. )
  333. for _, by := range bys {
  334. byid := mongo.BsonTOStringId(by["_id"])
  335. institute_type, _ := by["institute_type"].(string)
  336. if institute_type == "企业" {
  337. MongoTo.Save("buyer_tmp", tmp)
  338. delete(tmp, "_id")
  339. MongoTo.UpdateById("buyer_enterprise", byid, map[string]interface{}{
  340. "$set": tmp,
  341. })
  342. MongoTo.DeleteById("buyer_enterprise", id)
  343. log.Debug(id)
  344. }
  345. }
  346. tmp = map[string]interface{}{}
  347. }
  348. }