task.go 13 KB


  1. package main
  2. import (
  3. "buyer_data/config"
  4. "database/sql"
  5. "fmt"
  6. "github.com/google/uuid"
  7. "go.mongodb.org/mongo-driver/bson"
  8. "go.uber.org/zap"
  9. util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  10. "jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
  11. "jygit.jydev.jianyu360.cn/data_processing/common_utils/redis"
  12. "strings"
  13. "sync"
  14. "time"
  15. )
  16. func taskInfo1() {
  17. sess := MongoTool.GetMgoConn()
  18. defer MongoTool.DestoryMongoConn(sess)
  19. ch := make(chan bool, config.Conf.Serve.Thread)
  20. wg := &sync.WaitGroup{}
  21. field := bson.M{"buyer_name": 1, "buyerclass": 1, "area": 1, "city": 1, "district": 1}
  22. query := sess.DB(config.Conf.DB.Mongo.Dbname).C("buyer_enterprise").Find(nil).Select(field).Iter()
  23. count := 0
  24. for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
  25. if count%2000 == 0 {
  26. log.Info(fmt.Sprintf("current --- %d", count))
  27. }
  28. ch <- true
  29. wg.Add(1)
  30. go func(tmp map[string]interface{}) {
  31. defer func() {
  32. <-ch
  33. wg.Done()
  34. }()
  35. save := make(map[string]interface{})
  36. name := util.ObjToString(tmp["buyer_name"])
  37. info, _ := MongoTool.FindOne("qyxy_std", bson.M{"company_name": name})
  38. if len(*info) > 0 {
  39. save["name"] = name
  40. save["name_id"] = util.ObjToString((*info)["_id"])
  41. save["name_id_source"] = 1
  42. save["type"] = util.ObjToString((*info)["company_type_old"])
  43. save["buyerclass"] = util.ObjToString(tmp["buyerclass"])
  44. if area := util.ObjToString(tmp["area"]); area != "" {
  45. save["area"] = area
  46. if city := util.ObjToString(tmp["city"]); city != "" {
  47. save["city"] = city
  48. }
  49. if district := util.ObjToString(tmp["district"]); district != "" {
  50. save["city"] = district
  51. }
  52. } else {
  53. if area = util.ObjToString((*info)["company_area"]); area != "" {
  54. save["area"] = area
  55. if city := util.ObjToString((*info)["company_city"]); city != "" {
  56. save["city"] = city
  57. }
  58. if district := util.ObjToString((*info)["company_district"]); district != "" {
  59. save["city"] = district
  60. }
  61. }
  62. }
  63. save["reliability"] = 1
  64. save["legal_person"] = util.ObjToString((*info)["legal_person"])
  65. if arr := hisNameFuc(*info); arr != nil && len(arr) > 0 {
  66. save["historyname"] = arr
  67. }
  68. save["status"] = util.ObjToString((*info)["company_status"])
  69. save["comeintime"] = time.Now().Unix()
  70. } else {
  71. save["name"] = name
  72. save["name_id"] = strings.ReplaceAll(uuid.New().String(), "-", "")
  73. save["name_id_source"] = 4
  74. save["comeintime"] = time.Now().Unix()
  75. }
  76. savePool <- save
  77. }(tmp)
  78. tmp = make(map[string]interface{})
  79. }
  80. }
  81. func taskInfo2() {
  82. sess := MongoTool.GetMgoConn()
  83. defer MongoTool.DestoryMongoConn(sess)
  84. ch := make(chan bool, config.Conf.Serve.Thread)
  85. wg := &sync.WaitGroup{}
  86. query := sess.DB(config.Conf.DB.Mongo.Dbname).C("buyer_err").Find(nil).Iter()
  87. count := 0
  88. for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
  89. if count%2000 == 0 {
  90. log.Info(fmt.Sprintf("current --- %d", count))
  91. }
  92. ch <- true
  93. wg.Add(1)
  94. go func(tmp map[string]interface{}) {
  95. defer func() {
  96. <-ch
  97. wg.Done()
  98. }()
  99. save := make(map[string]interface{})
  100. name := util.ObjToString(tmp["name"])
  101. info, _ := MongoTool.FindOne("qyxy_std", bson.M{"company_name": name})
  102. if len(*info) > 0 {
  103. save["name"] = name
  104. save["name_id"] = util.ObjToString((*info)["_id"])
  105. save["name_id_source"] = 1
  106. save["type"] = util.ObjToString((*info)["company_type_old"])
  107. save["buyerclass"] = util.ObjToString(tmp["buyerclass"])
  108. if area := util.ObjToString(tmp["area"]); area != "" {
  109. save["area"] = area
  110. if city := util.ObjToString(tmp["city"]); city != "" {
  111. save["city"] = city
  112. }
  113. if district := util.ObjToString(tmp["district"]); district != "" {
  114. save["city"] = district
  115. }
  116. } else {
  117. if area = util.ObjToString((*info)["company_area"]); area != "" {
  118. save["area"] = area
  119. if city := util.ObjToString((*info)["company_city"]); city != "" {
  120. save["city"] = city
  121. }
  122. if district := util.ObjToString((*info)["company_district"]); district != "" {
  123. save["city"] = district
  124. }
  125. }
  126. }
  127. save["reliability"] = 1
  128. save["legal_person"] = util.ObjToString((*info)["legal_person"])
  129. if arr := hisNameFuc(*info); arr != nil && len(arr) > 0 {
  130. save["historyname"] = arr
  131. }
  132. save["status"] = util.ObjToString((*info)["company_status"])
  133. save["comeintime"] = time.Now().Unix()
  134. } else {
  135. save["name"] = name
  136. save["name_id"] = strings.ReplaceAll(uuid.New().String(), "-", "")
  137. save["name_id_source"] = 4
  138. save["comeintime"] = time.Now().Unix()
  139. }
  140. savePool <- save
  141. }(tmp)
  142. tmp = make(map[string]interface{})
  143. }
  144. }
  145. func hisNameFuc(tmp map[string]interface{}) []string {
  146. var nameArr []string
  147. if names, ok := tmp["history_names"].([]interface{}); ok && len(names) > 0 {
  148. for _, n := range names {
  149. nameArr = append(nameArr, util.ObjToString(n))
  150. }
  151. }
  152. if hisname := util.ObjToString(tmp["history_name"]); hisname != "" {
  153. for _, s := range strings.Split(hisname, ",") {
  154. if !strings.Contains(strings.Join(nameArr, ","), s) {
  155. nameArr = append(nameArr, s)
  156. }
  157. }
  158. }
  159. return nameArr
  160. }
  161. func taskMysql() {
  162. pool := make(chan bool, 2) //控制线程数
  163. wg := &sync.WaitGroup{}
  164. finalId := 0
  165. lastInfo := MysqlTool.SelectBySql(fmt.Sprintf("SELECT id FROM %s where identity_type = 1 ORDER BY id DESC LIMIT 1 ", "dws_f_ent_baseinfo"))
  166. if len(*lastInfo) > 0 {
  167. finalId = util.IntAll((*lastInfo)[0]["id"])
  168. }
  169. log.Info("查询最后id---", zap.Int("finally id: ", finalId))
  170. lastid, count := 0, 0
  171. for {
  172. log.Info("重新查询,lastid---", zap.Int("lastid: ", lastid))
  173. q := fmt.Sprintf("SELECT * FROM %s WHERE id > %d and identity_type = 1 ORDER BY id ASC limit 10000", "dws_f_ent_baseinfo", lastid)
  174. var stmtOut *sql.Stmt
  175. var tx *sql.Tx
  176. var err error
  177. if tx == nil {
  178. stmtOut, err = MysqlTool.DB.Prepare(q)
  179. } else {
  180. stmtOut, err = tx.Prepare(q)
  181. }
  182. rows, err := stmtOut.Query()
  183. if err != nil {
  184. log.Error("mysql query err ", zap.Error(err))
  185. }
  186. columns, err := rows.Columns()
  187. if finalId == lastid {
  188. log.Info("----finish-----", zap.Int("count: ", count))
  189. break
  190. }
  191. for rows.Next() {
  192. scanArgs := make([]interface{}, len(columns))
  193. values := make([]interface{}, len(columns))
  194. ret := make(map[string]interface{})
  195. for k := range values {
  196. scanArgs[k] = &values[k]
  197. }
  198. err = rows.Scan(scanArgs...)
  199. if err != nil {
  200. log.Error("mysql scan err ", zap.Error(err))
  201. break
  202. }
  203. for i, col := range values {
  204. if col == nil {
  205. ret[columns[i]] = nil
  206. } else {
  207. switch val := (*scanArgs[i].(*interface{})).(type) {
  208. case byte:
  209. ret[columns[i]] = val
  210. break
  211. case []byte:
  212. v := string(val)
  213. switch v {
  214. case "\x00": // 处理数据类型为bit的情况
  215. ret[columns[i]] = 0
  216. case "\x01": // 处理数据类型为bit的情况
  217. ret[columns[i]] = 1
  218. default:
  219. ret[columns[i]] = v
  220. break
  221. }
  222. break
  223. case time.Time:
  224. if val.IsZero() {
  225. ret[columns[i]] = nil
  226. } else {
  227. ret[columns[i]] = val.Format("2006-01-02 15:04:05")
  228. }
  229. break
  230. default:
  231. ret[columns[i]] = val
  232. }
  233. }
  234. }
  235. lastid = util.IntAll(ret["id"])
  236. count++
  237. if count%2000 == 0 {
  238. log.Info("current----", zap.Int("count: ", count), zap.Int("lastid: ", lastid))
  239. }
  240. pool <- true
  241. wg.Add(1)
  242. go func(tmp map[string]interface{}) {
  243. defer func() {
  244. <-pool
  245. wg.Done()
  246. }()
  247. name := util.ObjToString(tmp["name"])
  248. save := make(map[string]interface{})
  249. if cid := util.ObjToString(tmp["company_id"]); cid != "" {
  250. bfo, _ := MongoTool.FindOne(config.Conf.DB.Mongo.SaveColl, bson.M{"name_id": cid})
  251. if len(*bfo) > 0 {
  252. return
  253. }
  254. info, _ := MongoTool.FindOne("qyxy_std", bson.M{"company_name": name})
  255. save["name"] = name
  256. save["name_id"] = cid
  257. save["name_id_source"] = 1
  258. save["type"] = util.ObjToString((*info)["company_type_old"])
  259. if area := util.ObjToString((*info)["company_area"]); area != "" {
  260. save["area"] = area
  261. if city := util.ObjToString((*info)["company_city"]); city != "" {
  262. save["city"] = city
  263. }
  264. if district := util.ObjToString((*info)["company_district"]); district != "" {
  265. save["city"] = district
  266. }
  267. }
  268. save["reliability"] = 1
  269. save["legal_person"] = util.ObjToString((*info)["legal_person"])
  270. if arr := hisNameFuc(*info); arr != nil && len(arr) > 0 {
  271. save["historyname"] = arr
  272. }
  273. save["status"] = util.ObjToString((*info)["company_status"])
  274. updatePool <- []map[string]interface{}{
  275. {"name_id": cid},
  276. {"$set": save},
  277. }
  278. } else {
  279. save["name"] = name
  280. save["name_id"] = strings.ReplaceAll(uuid.New().String(), "-", "")
  281. save["name_id_source"] = 4
  282. save["comeintime"] = time.Now().Unix()
  283. savePool <- save
  284. }
  285. }(ret)
  286. ret = make(map[string]interface{})
  287. }
  288. _ = rows.Close()
  289. stmtOut.Close()
  290. wg.Wait()
  291. }
  292. log.Info(fmt.Sprintf("over --- %d", count))
  293. }
  294. func taskInfo3() {
  295. sess := MongoTool.GetMgoConn()
  296. defer MongoTool.DestoryMongoConn(sess)
  297. ch := make(chan bool, config.Conf.Serve.Thread)
  298. wg := &sync.WaitGroup{}
  299. f := bson.M{"reliability": nil}
  300. query := sess.DB(config.Conf.DB.Mongo.Dbname).C(config.Conf.DB.Mongo.SaveColl).Find(f).Iter()
  301. count := 0
  302. for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
  303. if count%2000 == 0 {
  304. log.Info(fmt.Sprintf("current --- %d", count))
  305. }
  306. ch <- true
  307. wg.Add(1)
  308. go func(tmp map[string]interface{}) {
  309. defer func() {
  310. <-ch
  311. wg.Done()
  312. }()
  313. save := make(map[string]interface{})
  314. name := util.ObjToString(tmp["buyer_name"])
  315. info, _ := MongoTool.FindOne("qyxy_std", bson.M{"company_name": name})
  316. if len(*info) > 0 {
  317. save["name"] = name
  318. save["name_id"] = util.ObjToString((*info)["_id"])
  319. save["name_id_source"] = 1
  320. save["type"] = util.ObjToString((*info)["company_type_old"])
  321. save["buyerclass"] = util.ObjToString(tmp["buyerclass"])
  322. if area := util.ObjToString(tmp["area"]); area != "" {
  323. save["area"] = area
  324. if city := util.ObjToString(tmp["city"]); city != "" {
  325. save["city"] = city
  326. }
  327. if district := util.ObjToString(tmp["district"]); district != "" {
  328. save["city"] = district
  329. }
  330. } else {
  331. if area = util.ObjToString((*info)["company_area"]); area != "" {
  332. save["area"] = area
  333. if city := util.ObjToString((*info)["company_city"]); city != "" {
  334. save["city"] = city
  335. }
  336. if district := util.ObjToString((*info)["company_district"]); district != "" {
  337. save["city"] = district
  338. }
  339. }
  340. }
  341. save["reliability"] = 1
  342. save["legal_person"] = util.ObjToString((*info)["legal_person"])
  343. if arr := hisNameFuc(tmp); arr != nil && len(arr) > 0 {
  344. save["historyname"] = arr
  345. }
  346. save["status"] = util.ObjToString((*info)["company_status"])
  347. save["comeintime"] = time.Now().Unix()
  348. } else {
  349. save["name"] = name
  350. save["name_id"] = strings.ReplaceAll(uuid.New().String(), "-", "")
  351. save["name_id_source"] = 4
  352. save["comeintime"] = time.Now().Unix()
  353. }
  354. savePool <- save
  355. }(tmp)
  356. tmp = make(map[string]interface{})
  357. }
  358. }
  359. func taskInfo4() {
  360. sess := MongoTool.GetMgoConn()
  361. defer MongoTool.DestoryMongoConn(sess)
  362. ch := make(chan bool, config.Conf.Serve.Thread)
  363. wg := &sync.WaitGroup{}
  364. query := sess.DB(config.Conf.DB.Mongo.Dbname).C("buyer_detail_1019").Find(nil).Iter()
  365. count := 0
  366. for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
  367. if count%20000 == 0 {
  368. log.Info(fmt.Sprintf("current --- %d", count))
  369. }
  370. ch <- true
  371. wg.Add(1)
  372. go func(tmp map[string]interface{}) {
  373. defer func() {
  374. <-ch
  375. wg.Done()
  376. }()
  377. name := util.ObjToString(tmp["accurate_entity_name"])
  378. info, _ := MongoTool.FindOne(config.Conf.DB.Mongo.SaveColl, bson.M{"name": name})
  379. if len(*info) > 0 {
  380. update := make(map[string]interface{})
  381. update["reliability"] = 0
  382. update["legal_person"] = util.ObjToString(tmp["fddbr"])
  383. update["status"] = util.ObjToString((*info)["company_status"])
  384. updatePool <- []map[string]interface{}{
  385. {"_id": (*info)["_id"]},
  386. {"$set": update},
  387. }
  388. } else {
  389. save := make(map[string]interface{})
  390. save["name"] = name
  391. save["name_id"] = strings.ReplaceAll(uuid.New().String(), "-", "")
  392. save["name_id_source"] = 4
  393. save["reliability"] = 0
  394. save["legal_person"] = util.ObjToString(tmp["fddbr"])
  395. save["status"] = util.ObjToString((*info)["company_status"])
  396. save["comeintime"] = time.Now().Unix()
  397. savePool <- save
  398. }
  399. }(tmp)
  400. tmp = make(map[string]interface{})
  401. }
  402. log.Info(fmt.Sprintf("over --- %d", count))
  403. }
  404. func taskInfo5() {
  405. sess := MongoTool.GetMgoConn()
  406. defer MongoTool.DestoryMongoConn(sess)
  407. ch := make(chan bool, config.Conf.Serve.Thread)
  408. wg := &sync.WaitGroup{}
  409. query := sess.DB(config.Conf.DB.Mongo.Dbname).C(config.Conf.DB.Mongo.SaveColl).Find(nil).Iter()
  410. count := 0
  411. for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
  412. if count%20000 == 0 {
  413. log.Info(fmt.Sprintf("current --- %d", count))
  414. }
  415. ch <- true
  416. wg.Add(1)
  417. go func(tmp map[string]interface{}) {
  418. defer func() {
  419. <-ch
  420. wg.Done()
  421. }()
  422. name := util.ObjToString(tmp["name"])
  423. if ok, _ := redis.Exists(config.Conf.DB.Redis.Code, name); ok {
  424. MongoTool.UpdateById(config.Conf.DB.Mongo.SaveColl, tmp["_id"], bson.M{"$set": bson.M{"del": true}})
  425. } else {
  426. redis.PutCKV(config.Conf.DB.Redis.Code, name, 1)
  427. }
  428. }(tmp)
  429. tmp = make(map[string]interface{})
  430. }
  431. log.Info(fmt.Sprintf("over --- %d", count))
  432. }