// Source file: buyertask.go

package main

import (
	"context"
	"database/sql"
	"fmt"
	"sync"
	"time"

	"esindex/config"
	"go.uber.org/zap"
	util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
	"jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
)
  13. func buyerTest() {
  14. names := []string{"海南东方新丝路职业学院", "海口旅游职业学院", "海南卫生健康职业学院", "海南科技职业大学", "海南省旅游学校"}
  15. areaMap, err := loadCodeRegionMap(Mysql.DB)
  16. if err != nil {
  17. fmt.Println(err, areaMap)
  18. }
  19. for _, na := range names {
  20. query := fmt.Sprintf(`
  21. SELECT
  22. b.name,
  23. b.area_code,
  24. b.city_code,
  25. b.seo_id,
  26. t.id,
  27. t.name_id,
  28. b.company_id,
  29. t.createtime,
  30. t.updatetime,
  31. class.name AS buyerclass
  32. FROM
  33. dws_f_ent_tags AS t
  34. LEFT JOIN code_buyerclass AS class ON class.code = t.labelvalues
  35. LEFT JOIN dws_f_ent_baseinfo AS b ON b.name_id = t.name_id
  36. WHERE b.name = '%v'
  37. ;
  38. `, na)
  39. ctx := context.Background()
  40. rows, err := Mysql.DB.QueryContext(ctx, query)
  41. if err != nil {
  42. log.Info("dealData", zap.Any("QueryContext err", err))
  43. }
  44. columns, err := rows.Columns()
  45. if err != nil {
  46. log.Info("buyerOnce", zap.Any("rows.Columns", err))
  47. }
  48. for rows.Next() {
  49. scanArgs := make([]interface{}, len(columns))
  50. values := make([]interface{}, len(columns))
  51. ret := make(map[string]interface{})
  52. //MySQL 更新
  53. update := map[string]interface{}{}
  54. for k := range values {
  55. scanArgs[k] = &values[k]
  56. }
  57. err = rows.Scan(scanArgs...)
  58. if err != nil {
  59. log.Info("buyerOnce", zap.Any("rows.Scan", err))
  60. break
  61. }
  62. for i, col := range values {
  63. if v, ok := col.([]uint8); ok {
  64. ret[columns[i]] = string(v)
  65. } else {
  66. ret[columns[i]] = col
  67. }
  68. }
  69. name := util.ObjToString(ret["name"])
  70. //company_id != ‘’ 或者在凭安 特殊企业或者在爬虫采购单位里
  71. if util.ObjToString(ret["company_id"]) != "" || specialNames[name] {
  72. update["status"] = 1
  73. } else if ruleBuyer(name) { //不符合条件,排除
  74. update["status"] = -1
  75. } else { //默认2,认为可信
  76. update["status"] = 2
  77. }
  78. //1.更新MySQL
  79. where := map[string]interface{}{
  80. "name_id": ret["name_id"],
  81. }
  82. if len(update) > 0 {
  83. Mysql.Update("dws_f_ent_tags", where, update)
  84. }
  85. //2.生索引,status = 1或者2 才生索引
  86. if util.IntAll(update["status"]) == 1 || util.IntAll(update["status"]) == 2 {
  87. data := make(map[string]interface{}, 0)
  88. data["name"] = name
  89. data["name_id"] = ret["name_id"]
  90. if ret["seo_id"] != nil {
  91. data["seo_id"] = ret["seo_id"]
  92. }
  93. data["id"] = ret["name_id"]
  94. data["buyer_name"] = name
  95. areaCode := util.ObjToString(ret["area_code"])
  96. data["province"] = areaMap[areaCode].Province
  97. cityCode := util.ObjToString(ret["city_code"])
  98. data["city"] = areaMap[cityCode].City
  99. data["buyerclass"] = ret["buyerclass"]
  100. if ret["createtime"] != nil {
  101. if createtime, ok := ret["createtime"].(time.Time); ok {
  102. data["createtime"] = createtime.Unix()
  103. if ret["updatetime"] != nil {
  104. if updatetime, ok := ret["updatetime"].(time.Time); ok {
  105. data["updatetime"] = updatetime.Unix()
  106. }
  107. } else {
  108. data["updatetime"] = createtime.Unix()
  109. }
  110. }
  111. }
  112. sql := fmt.Sprintf(`select count(id) from dws_f_ent_contact where name_id = '%v'`, ret["name_id"])
  113. counts := Mysql.CountBySql(sql)
  114. if counts > 0 {
  115. data["is_contact"] = true
  116. } else {
  117. data["is_contact"] = false
  118. }
  119. arrEs := make([]map[string]interface{}, 0) //最终生索引数据
  120. arrEs = append(arrEs, data)
  121. err = Es.InsertOrUpdate(config.Conf.DB.Es.IndexBuyer, arrEs)
  122. if err != nil {
  123. log.Info("buyerOnce", zap.Any("InsertOrUpdate err", err))
  124. }
  125. // 华为云新集群,存储标讯、项目、凭安数据
  126. if config.Conf.DB.Es.Addr3 != "" {
  127. err = Es3.InsertOrUpdate(config.Conf.DB.Es.IndexBuyer, arrEs)
  128. if err != nil {
  129. log.Info("buyerOnce", zap.Any("Addr3 InsertOrUpdate err", err))
  130. }
  131. }
  132. }
  133. }
  134. rows.Close()
  135. if err := rows.Err(); err != nil {
  136. log.Info("buyerOnce", zap.Any("err", err))
  137. }
  138. }
  139. }
  140. // buyerOnce 处理增量数据
  141. func buyerOnce() {
  142. if len(specialNames) < 1 {
  143. initSpecialNames()
  144. }
  145. areaMap, err := loadCodeRegionMap(Mysql.DB)
  146. if err != nil {
  147. log.Info("buyerOnce", zap.Error(err), zap.Any("areaMap", areaMap))
  148. }
  149. rowsPerPage := 1000
  150. now := time.Now()
  151. tarTime := time.Date(now.Year(), now.Month(), now.Day()-1, 00, 00, 00, 00, time.Local)
  152. curTime := tarTime.Format("2006-01-02")
  153. countSql := fmt.Sprintf(`
  154. SELECT
  155. count(t.id)
  156. FROM
  157. dws_f_ent_tags AS t
  158. LEFT JOIN dws_f_ent_baseinfo AS b ON b.name_id = t.name_id
  159. WHERE t.createtime >= '%v' OR t.updatetime >= '%v'
  160. ORDER BY t.id DESC LIMIT 1
  161. `, curTime, curTime)
  162. dataCounts := Mysql.CountBySql(countSql)
  163. if dataCounts > 0 {
  164. log.Info("buyerOnce", zap.Any(fmt.Sprintf("总数:%s", curTime), dataCounts))
  165. } else {
  166. log.Info("buyerOnce", zap.String(curTime, "没有更新数据"))
  167. return
  168. }
  169. lastid, total := 0, 0
  170. realCount := 0
  171. for {
  172. query := fmt.Sprintf(`
  173. SELECT
  174. b.name,
  175. b.area_code,
  176. b.city_code,
  177. b.seo_id,
  178. t.id,
  179. t.name_id,
  180. b.company_id,
  181. t.createtime,
  182. t.updatetime,
  183. c.area,
  184. c.city,
  185. class.name AS buyerclass
  186. FROM
  187. dws_f_ent_tags AS t
  188. LEFT JOIN code_buyerclass AS class ON class.code = t.labelvalues
  189. LEFT JOIN dws_f_ent_baseinfo AS b ON b.name_id = t.name_id
  190. WHERE (t.createtime >= '%v' OR t.updatetime >= '%v') and ( t.id > %d )
  191. ORDER BY t.id ASC
  192. LIMIT %d;
  193. `, curTime, curTime, lastid, rowsPerPage)
  194. ctx := context.Background()
  195. rows, err := Mysql.DB.QueryContext(ctx, query)
  196. if err != nil {
  197. log.Info("dealData", zap.Any("QueryContext err", err))
  198. }
  199. if total >= int(dataCounts) {
  200. log.Info("buyerOnce over", zap.Any("total", total), zap.Any("lastid", lastid))
  201. break
  202. }
  203. columns, err := rows.Columns()
  204. if err != nil {
  205. log.Info("buyerOnce", zap.Any("rows.Columns", err))
  206. }
  207. for rows.Next() {
  208. scanArgs := make([]interface{}, len(columns))
  209. values := make([]interface{}, len(columns))
  210. ret := make(map[string]interface{})
  211. //MySQL 更新
  212. update := map[string]interface{}{}
  213. for k := range values {
  214. scanArgs[k] = &values[k]
  215. }
  216. err = rows.Scan(scanArgs...)
  217. if err != nil {
  218. log.Info("buyerOnce", zap.Any("rows.Scan", err))
  219. break
  220. }
  221. for i, col := range values {
  222. if v, ok := col.([]uint8); ok {
  223. ret[columns[i]] = string(v)
  224. } else {
  225. ret[columns[i]] = col
  226. }
  227. }
  228. total++
  229. if total%100 == 0 {
  230. log.Info("buyerOnce", zap.Int("current total", total))
  231. }
  232. lastid = util.IntAll(ret["id"])
  233. name := util.ObjToString(ret["name"])
  234. //company_id != ‘’ 或者在凭安 特殊企业或者在爬虫采购单位里
  235. if util.ObjToString(ret["company_id"]) != "" || specialNames[name] {
  236. realCount++
  237. update["status"] = 1
  238. } else if ruleBuyer(name) { //不符合条件,排除
  239. update["status"] = -1
  240. } else { //默认2,认为可信
  241. realCount++
  242. update["status"] = 2
  243. }
  244. //1.更新MySQL
  245. where := map[string]interface{}{
  246. "name_id": ret["name_id"],
  247. }
  248. if len(update) > 0 {
  249. Mysql.Update("dws_f_ent_tags", where, update)
  250. }
  251. //2.生索引,status = 1或者2 才生索引
  252. if util.IntAll(update["status"]) == 1 || util.IntAll(update["status"]) == 2 {
  253. data := make(map[string]interface{}, 0)
  254. data["name"] = name
  255. data["name_id"] = ret["name_id"]
  256. if ret["seo_id"] != nil {
  257. data["seo_id"] = ret["seo_id"]
  258. }
  259. data["id"] = ret["name_id"]
  260. data["buyer_name"] = name
  261. //data["province"] = ret["area"]
  262. //data["city"] = ret["city"]
  263. areaCode := util.ObjToString(ret["area_code"])
  264. data["province"] = areaMap[areaCode].Province
  265. cityCode := util.ObjToString(ret["city_code"])
  266. data["city"] = areaMap[cityCode].City
  267. data["buyerclass"] = ret["buyerclass"]
  268. if ret["createtime"] != nil {
  269. if createtime, ok := ret["createtime"].(time.Time); ok {
  270. data["createtime"] = createtime.Unix()
  271. if ret["updatetime"] != nil {
  272. if updatetime, ok := ret["updatetime"].(time.Time); ok {
  273. data["updatetime"] = updatetime.Unix()
  274. }
  275. } else {
  276. data["updatetime"] = createtime.Unix()
  277. }
  278. }
  279. }
  280. sql := fmt.Sprintf(`select count(id) from dws_f_ent_contact where name_id = '%v'`, ret["name_id"])
  281. counts := Mysql.CountBySql(sql)
  282. if counts > 0 {
  283. data["is_contact"] = true
  284. } else {
  285. data["is_contact"] = false
  286. }
  287. arrEs := make([]map[string]interface{}, 0) //最终生索引数据
  288. arrEs = append(arrEs, data)
  289. err = Es.InsertOrUpdate(config.Conf.DB.Es.IndexBuyer, arrEs)
  290. if err != nil {
  291. log.Info("buyerOnce", zap.Any("InsertOrUpdate err", err))
  292. }
  293. // 华为云新集群,存储标讯、项目、凭安数据
  294. if config.Conf.DB.Es.Addr3 != "" {
  295. err = Es3.InsertOrUpdate(config.Conf.DB.Es.IndexBuyer, arrEs)
  296. if err != nil {
  297. log.Info("buyerOnce", zap.Any("Addr3 InsertOrUpdate err", err))
  298. }
  299. }
  300. }
  301. }
  302. rows.Close()
  303. if err := rows.Err(); err != nil {
  304. log.Info("buyerOnce", zap.Any("err", err))
  305. }
  306. }
  307. log.Info("buyerOnce", zap.Int("结束,总数是:", total), zap.Int("realCount", realCount))
  308. }
  309. // dealSpecialNames 拿到凭安 特企数据 以及爬虫采购单位数据
  310. func initSpecialNames() {
  311. sess := MgoS.GetMgoConn()
  312. defer MgoS.DestoryMongoConn(sess)
  313. query := sess.DB(MgoS.DbName).C("special_enterprise").Find(nil).Iter()
  314. count := 0
  315. log.Info("dealSpecialNames", zap.String("special_enterprise", "special_enterprise"))
  316. for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
  317. if tmp["company_name"] != nil && tmp["company_name"] != "" {
  318. if specialNames[tmp["company_name"].(string)] {
  319. continue
  320. } else {
  321. specialNames[tmp["company_name"].(string)] = true
  322. }
  323. }
  324. }
  325. //2.
  326. time.Sleep(time.Second)
  327. log.Info("dealSpecialNames", zap.String("special_foundation", "special_foundation"))
  328. query2 := sess.DB("mixdata").C("special_foundation").Find(nil).Iter()
  329. count = 0
  330. for tmp := make(map[string]interface{}); query2.Next(tmp); count++ {
  331. if tmp["company_name"] != nil && tmp["company_name"] != "" {
  332. if specialNames[tmp["company_name"].(string)] {
  333. continue
  334. } else {
  335. specialNames[tmp["company_name"].(string)] = true
  336. }
  337. }
  338. }
  339. //3.
  340. time.Sleep(time.Second)
  341. log.Info("dealSpecialNames", zap.String("special_gov_unit", "special_gov_unit"))
  342. query3 := sess.DB("mixdata").C("special_gov_unit").Find(nil).Iter()
  343. count = 0
  344. for tmp := make(map[string]interface{}); query3.Next(tmp); count++ {
  345. if tmp["company_name"] != nil && tmp["company_name"] != "" {
  346. if specialNames[tmp["company_name"].(string)] {
  347. continue
  348. } else {
  349. specialNames[tmp["company_name"].(string)] = true
  350. }
  351. }
  352. }
  353. //4.
  354. time.Sleep(time.Second)
  355. log.Info("dealSpecialNames", zap.String("special_hongkong_company", "special_hongkong_company"))
  356. query4 := sess.DB("mixdata").C("special_hongkong_company").Find(nil).Iter()
  357. count = 0
  358. for tmp := make(map[string]interface{}); query4.Next(tmp); count++ {
  359. if tmp["company_name"] != nil && tmp["company_name"] != "" {
  360. if specialNames[tmp["company_name"].(string)] {
  361. continue
  362. } else {
  363. specialNames[tmp["company_name"].(string)] = true
  364. }
  365. }
  366. }
  367. //5.
  368. time.Sleep(time.Second)
  369. log.Info("dealSpecialNames", zap.String("special_hongkong_company_history", "special_hongkong_company_history"))
  370. query5 := sess.DB("mixdata").C("special_hongkong_company_history").Find(nil).Iter()
  371. count = 0
  372. for tmp := make(map[string]interface{}); query5.Next(tmp); count++ {
  373. if tmp["use_name"] != nil && tmp["use_name"] != "" {
  374. if specialNames[tmp["use_name"].(string)] {
  375. continue
  376. }
  377. specialNames[tmp["use_name"].(string)] = true
  378. }
  379. }
  380. //6.
  381. time.Sleep(time.Second)
  382. log.Info("dealSpecialNames", zap.String("special_law_office", "special_law_office"))
  383. query6 := sess.DB("mixdata").C("special_law_office").Find(nil).Iter()
  384. count = 0
  385. for tmp := make(map[string]interface{}); query6.Next(tmp); count++ {
  386. if tmp["company_name"] != nil && tmp["company_name"] != "" {
  387. if specialNames[tmp["company_name"].(string)] {
  388. continue
  389. }
  390. specialNames[tmp["company_name"].(string)] = true
  391. }
  392. }
  393. //7.
  394. log.Info("dealSpecialNames", zap.String("special_social_organ", "special_social_organ"))
  395. query7 := sess.DB("mixdata").C("special_social_organ").Find(nil).Iter()
  396. count = 0
  397. for tmp := make(map[string]interface{}); query7.Next(tmp); count++ {
  398. if tmp["company_name"] != nil && tmp["company_name"] != "" {
  399. if specialNames[tmp["company_name"].(string)] {
  400. continue
  401. }
  402. specialNames[tmp["company_name"].(string)] = true
  403. }
  404. }
  405. log.Info("dealSpecialNames", zap.String("special_trade_union", "special_trade_union"))
  406. //8.
  407. query8 := sess.DB("mixdata").C("special_trade_union").Find(nil).Iter()
  408. count = 0
  409. for tmp := make(map[string]interface{}); query8.Next(tmp); count++ {
  410. if tmp["company_name"] != nil && tmp["company_name"] != "" {
  411. if specialNames[tmp["company_name"].(string)] {
  412. continue
  413. }
  414. specialNames[tmp["company_name"].(string)] = true
  415. }
  416. }
  417. //9. 爬虫采集的采购单位
  418. query9 := sess.DB("mixdata").C("special_buyer_name").Find(nil).Iter()
  419. count = 0
  420. for tmp := make(map[string]interface{}); query9.Next(tmp); count++ {
  421. if tmp["company_name"] != nil && tmp["company_name"] != "" {
  422. if specialNames[tmp["company_name"].(string)] {
  423. continue
  424. }
  425. specialNames[tmp["company_name"].(string)] = true
  426. }
  427. }
  428. }
  429. // buyerAll 采购单位全量数据
  430. func buyerAll() {
  431. arrEs := make([]map[string]interface{}, 0) //最终生索引数据
  432. winerEsLock := &sync.Mutex{}
  433. //if len(specialNames) < 1 {
  434. // initSpecialNames()
  435. //}
  436. countSql := fmt.Sprintf(`SELECT count(id) FROM dws_f_ent_tags `)
  437. dataCounts := Mysql.CountBySql(countSql)
  438. if dataCounts > 0 {
  439. log.Info("buyerAll", zap.Any("采购单位全量:", dataCounts))
  440. } else {
  441. log.Info("buyerAll", zap.String("采购单位全量是0", "没有更新数据"))
  442. return
  443. }
  444. finalId := 0
  445. lastSql := fmt.Sprintf(`SELECT id FROM dws_f_ent_tags ORDER BY id DESC LIMIT 1`)
  446. lastInfo := Mysql.SelectBySql(lastSql)
  447. if len(*lastInfo) > 0 {
  448. finalId = util.IntAll((*lastInfo)[0]["id"])
  449. } else {
  450. log.Info("buyerAll", zap.String("获取最大ID失败", "没有数据"))
  451. return
  452. }
  453. log.Info("buyerAll", zap.Int("finalId", finalId))
  454. buyerPool := make(chan bool, 10) //控制线程数
  455. wg := &sync.WaitGroup{}
  456. lastid, total := 0, 0
  457. realCount := 0
  458. for {
  459. query := fmt.Sprintf(`
  460. SELECT
  461. b.name,
  462. b.seo_id,
  463. t.id,
  464. t.status,
  465. t.name_id,
  466. b.company_id,
  467. t.createtime,
  468. t.updatetime,
  469. c.area,
  470. c.city,
  471. class.name AS buyerclass
  472. FROM
  473. dws_f_ent_tags AS t
  474. LEFT JOIN code_buyerclass AS class ON class.code = t.labelvalues
  475. LEFT JOIN dws_f_ent_baseinfo AS b ON b.name_id = t.name_id
  476. LEFT JOIN code_area AS c ON b.city_code = c.code
  477. WHERE t.id > %d and t.status > 0
  478. ORDER BY t.id ASC
  479. LIMIT %d;
  480. `, lastid, 1000)
  481. ctx := context.Background()
  482. rows, err := Mysql.DB.QueryContext(ctx, query)
  483. if err != nil {
  484. log.Info("buyerAll", zap.Any("QueryContext err", err))
  485. }
  486. if finalId == lastid {
  487. log.Info("buyerAll over", zap.Any("total", total), zap.Any("lastid", lastid))
  488. break
  489. }
  490. columns, err := rows.Columns()
  491. if err != nil {
  492. log.Info("buyerAll", zap.Any("rows.Columns", err))
  493. }
  494. for rows.Next() {
  495. scanArgs := make([]interface{}, len(columns))
  496. values := make([]interface{}, len(columns))
  497. ret := make(map[string]interface{})
  498. for k := range values {
  499. scanArgs[k] = &values[k]
  500. }
  501. err = rows.Scan(scanArgs...)
  502. if err != nil {
  503. log.Info("buyerAll", zap.Any("rows.Scan", err))
  504. break
  505. }
  506. for i, col := range values {
  507. if v, ok := col.([]uint8); ok {
  508. ret[columns[i]] = string(v)
  509. } else {
  510. ret[columns[i]] = col
  511. }
  512. }
  513. lastid = util.IntAll(ret["id"])
  514. total++
  515. if total%2000 == 0 {
  516. log.Info("buyerAll", zap.Int("current total", total), zap.Int("lastid", lastid))
  517. }
  518. buyerPool <- true
  519. wg.Add(1)
  520. go func(tmp map[string]interface{}) {
  521. defer func() {
  522. <-buyerPool
  523. wg.Done()
  524. }()
  525. name := util.ObjToString(tmp["name"])
  526. //MySQL 更新
  527. //update := map[string]interface{}{}
  528. //company_id != ‘’ 或者在凭安 特殊企业或者在爬虫采购单位里
  529. //if util.ObjToString(tmp["company_id"]) != "" || specialNames[name] {
  530. // realCount++
  531. // update["status"] = 1
  532. //} else if ruleBuyer(name) { //不符合条件,排除
  533. // update["status"] = -1
  534. //} else { //默认2,认为可信
  535. // realCount++
  536. // update["status"] = 2
  537. //}
  538. ////1.更新MySQL
  539. //where := map[string]interface{}{
  540. // "name_id": tmp["name_id"],
  541. //}
  542. //if len(update) > 0 {
  543. // Mysql.Update("dws_f_ent_tags", where, update)
  544. //}
  545. //2.生索引,status = 1或者2 才生索引
  546. if util.IntAll(tmp["status"]) == 1 || util.IntAll(tmp["status"]) == 2 {
  547. data := make(map[string]interface{}, 0)
  548. data["name"] = name
  549. data["name_id"] = tmp["name_id"]
  550. if ret["seo_id"] != nil {
  551. data["seo_id"] = tmp["seo_id"]
  552. }
  553. data["id"] = tmp["name_id"]
  554. data["buyer_name"] = name
  555. data["province"] = tmp["area"]
  556. data["city"] = tmp["city"]
  557. data["buyerclass"] = tmp["buyerclass"]
  558. if ret["createtime"] != nil {
  559. if createtime, ok := tmp["createtime"].(time.Time); ok {
  560. data["createtime"] = createtime.Unix()
  561. if ret["updatetime"] != nil {
  562. if updatetime, ok := tmp["updatetime"].(time.Time); ok {
  563. data["updatetime"] = updatetime.Unix()
  564. }
  565. } else {
  566. data["updatetime"] = createtime.Unix()
  567. }
  568. }
  569. }
  570. sql := fmt.Sprintf(`select count(id) from dws_f_ent_contact where name_id = '%v'`, tmp["name_id"])
  571. counts := Mysql.CountBySql(sql)
  572. if counts > 0 {
  573. data["is_contact"] = true
  574. } else {
  575. data["is_contact"] = false
  576. }
  577. //写入es
  578. winerEsLock.Lock()
  579. arrEs = append(arrEs, data)
  580. if len(arrEs) >= EsBulkSize {
  581. tmps := arrEs
  582. Es.BulkSave(config.Conf.DB.Es.IndexBuyer, tmps)
  583. arrEs = []map[string]interface{}{}
  584. }
  585. winerEsLock.Unlock()
  586. }
  587. }(ret)
  588. }
  589. if err := rows.Err(); err != nil {
  590. log.Info("buyerAll", zap.Any("rows.Err()", err))
  591. }
  592. err = rows.Close()
  593. if err != nil {
  594. log.Info("buyerAll", zap.Any("rows.Close() err", err))
  595. }
  596. wg.Wait()
  597. }
  598. if len(arrEs) > 0 {
  599. tmps := arrEs
  600. Es.BulkSave(config.Conf.DB.Es.IndexBuyer, tmps)
  601. arrEs = []map[string]interface{}{}
  602. }
  603. log.Info("buyerAll", zap.Int("结束,总数是:", total), zap.Int("realCount", realCount))
  604. }
  605. type RegionInfo struct {
  606. Province string
  607. City string
  608. District string
  609. }
  610. func loadCodeRegionMap(db *sql.DB) (map[string]RegionInfo, error) {
  611. query := `SELECT code, area, city, district FROM code_area`
  612. rows, err := db.Query(query)
  613. if err != nil {
  614. return nil, err
  615. }
  616. defer rows.Close()
  617. codeMap := make(map[string]RegionInfo)
  618. for rows.Next() {
  619. var code, area, city, district sql.NullString
  620. if err := rows.Scan(&code, &area, &city, &district); err != nil {
  621. return nil, err
  622. }
  623. codeStr := code.String
  624. codeMap[codeStr] = RegionInfo{
  625. Province: area.String,
  626. City: city.String,
  627. District: district.String,
  628. }
  629. }
  630. if err := rows.Err(); err != nil {
  631. return nil, err
  632. }
  633. return codeMap, nil
  634. }