buyertask.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535
  1. package main
  2. import (
  3. "context"
  4. "esindex/config"
  5. "fmt"
  6. "go.uber.org/zap"
  7. util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  8. "jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
  9. "sync"
  10. "time"
  11. )
  12. // buyerOnce 处理增量数据
  13. func buyerOnce() {
  14. if len(specialNames) < 1 {
  15. initSpecialNames()
  16. }
  17. rowsPerPage := 1000
  18. now := time.Now()
  19. tarTime := time.Date(now.Year(), now.Month(), now.Day()-1, 00, 00, 00, 00, time.Local)
  20. curTime := tarTime.Format("2006-01-02")
  21. countSql := fmt.Sprintf(`
  22. SELECT
  23. count(t.id)
  24. FROM
  25. dws_f_ent_tags AS t
  26. LEFT JOIN dws_f_ent_baseinfo AS b ON b.name_id = t.name_id
  27. WHERE t.createtime >= '%v' OR t.updatetime >= '%v'
  28. ORDER BY t.id DESC LIMIT 1
  29. `, curTime, curTime)
  30. dataCounts := Mysql.CountBySql(countSql)
  31. if dataCounts > 0 {
  32. log.Info("buyerOnce", zap.Any(fmt.Sprintf("总数:%s", curTime), dataCounts))
  33. } else {
  34. log.Info("buyerOnce", zap.String(curTime, "没有更新数据"))
  35. return
  36. }
  37. lastid, total := 0, 0
  38. realCount := 0
  39. for {
  40. query := fmt.Sprintf(`
  41. SELECT
  42. b.name,
  43. b.seo_id,
  44. t.id,
  45. t.name_id,
  46. b.company_id,
  47. t.createtime,
  48. t.updatetime,
  49. c.area,
  50. c.city,
  51. class.name AS buyerclass
  52. FROM
  53. dws_f_ent_tags AS t
  54. LEFT JOIN code_buyerclass AS class ON class.code = t.labelvalues
  55. LEFT JOIN dws_f_ent_baseinfo AS b ON b.name_id = t.name_id
  56. LEFT JOIN code_area AS c ON b.city_code = c.code
  57. WHERE (t.createtime >= '%v' OR t.updatetime >= '%v') and ( t.id > %d )
  58. ORDER BY t.id ASC
  59. LIMIT %d;
  60. `, curTime, curTime, lastid, rowsPerPage)
  61. ctx := context.Background()
  62. rows, err := Mysql.DB.QueryContext(ctx, query)
  63. if err != nil {
  64. log.Info("dealData", zap.Any("QueryContext err", err))
  65. }
  66. if total >= int(dataCounts) {
  67. log.Info("buyerOnce over", zap.Any("total", total), zap.Any("lastid", lastid))
  68. break
  69. }
  70. columns, err := rows.Columns()
  71. if err != nil {
  72. log.Info("buyerOnce", zap.Any("rows.Columns", err))
  73. }
  74. for rows.Next() {
  75. scanArgs := make([]interface{}, len(columns))
  76. values := make([]interface{}, len(columns))
  77. ret := make(map[string]interface{})
  78. //MySQL 更新
  79. update := map[string]interface{}{}
  80. for k := range values {
  81. scanArgs[k] = &values[k]
  82. }
  83. err = rows.Scan(scanArgs...)
  84. if err != nil {
  85. log.Info("buyerOnce", zap.Any("rows.Scan", err))
  86. break
  87. }
  88. for i, col := range values {
  89. if v, ok := col.([]uint8); ok {
  90. ret[columns[i]] = string(v)
  91. } else {
  92. ret[columns[i]] = col
  93. }
  94. }
  95. total++
  96. if total%100 == 0 {
  97. log.Info("buyerOnce", zap.Int("current total", total))
  98. }
  99. lastid = util.IntAll(ret["id"])
  100. name := util.ObjToString(ret["name"])
  101. //company_id != ‘’ 或者在凭安 特殊企业或者在爬虫采购单位里
  102. if util.ObjToString(ret["company_id"]) != "" || specialNames[name] {
  103. realCount++
  104. update["status"] = 1
  105. } else if ruleBuyer(name) { //不符合条件,排除
  106. update["status"] = -1
  107. } else { //默认2,认为可信
  108. realCount++
  109. update["status"] = 2
  110. }
  111. //1.更新MySQL
  112. where := map[string]interface{}{
  113. "name_id": ret["name_id"],
  114. }
  115. if len(update) > 0 {
  116. Mysql.Update("dws_f_ent_tags", where, update)
  117. }
  118. //2.生索引,status = 1或者2 才生索引
  119. if util.IntAll(update["status"]) == 1 || util.IntAll(update["status"]) == 2 {
  120. data := make(map[string]interface{}, 0)
  121. data["name"] = name
  122. data["name_id"] = ret["name_id"]
  123. if ret["seo_id"] != nil {
  124. data["seo_id"] = ret["seo_id"]
  125. }
  126. data["id"] = ret["name_id"]
  127. data["buyer_name"] = name
  128. data["province"] = ret["area"]
  129. data["city"] = ret["city"]
  130. data["buyerclass"] = ret["buyerclass"]
  131. if ret["createtime"] != nil {
  132. if createtime, ok := ret["createtime"].(time.Time); ok {
  133. data["createtime"] = createtime.Unix()
  134. if ret["updatetime"] != nil {
  135. if updatetime, ok := ret["updatetime"].(time.Time); ok {
  136. data["updatetime"] = updatetime.Unix()
  137. }
  138. } else {
  139. data["updatetime"] = createtime.Unix()
  140. }
  141. }
  142. }
  143. sql := fmt.Sprintf(`select count(id) from dws_f_ent_contact where name_id = '%v'`, ret["name_id"])
  144. counts := Mysql.CountBySql(sql)
  145. if counts > 0 {
  146. data["is_contact"] = true
  147. } else {
  148. data["is_contact"] = false
  149. }
  150. arrEs := make([]map[string]interface{}, 0) //最终生索引数据
  151. arrEs = append(arrEs, data)
  152. err = Es.InsertOrUpdate(config.Conf.DB.Es.IndexBuyer, arrEs)
  153. if err != nil {
  154. log.Info("buyerOnce", zap.Any("InsertOrUpdate err", err))
  155. }
  156. // 华为云新集群,存储标讯、项目、凭安数据
  157. if config.Conf.DB.Es.Addr3 != "" {
  158. err = Es3.InsertOrUpdate(config.Conf.DB.Es.IndexBuyer, arrEs)
  159. if err != nil {
  160. log.Info("buyerOnce", zap.Any("Addr3 InsertOrUpdate err", err))
  161. }
  162. }
  163. }
  164. }
  165. rows.Close()
  166. if err := rows.Err(); err != nil {
  167. log.Info("buyerOnce", zap.Any("err", err))
  168. }
  169. }
  170. log.Info("buyerOnce", zap.Int("结束,总数是:", total), zap.Int("realCount", realCount))
  171. }
  172. // dealSpecialNames 拿到凭安 特企数据 以及爬虫采购单位数据
  173. func initSpecialNames() {
  174. sess := MgoS.GetMgoConn()
  175. defer MgoS.DestoryMongoConn(sess)
  176. query := sess.DB(MgoS.DbName).C("special_enterprise").Find(nil).Iter()
  177. count := 0
  178. log.Info("dealSpecialNames", zap.String("special_enterprise", "special_enterprise"))
  179. for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
  180. if tmp["company_name"] != nil && tmp["company_name"] != "" {
  181. if specialNames[tmp["company_name"].(string)] {
  182. continue
  183. } else {
  184. specialNames[tmp["company_name"].(string)] = true
  185. }
  186. }
  187. }
  188. //2.
  189. time.Sleep(time.Second)
  190. log.Info("dealSpecialNames", zap.String("special_foundation", "special_foundation"))
  191. query2 := sess.DB("mixdata").C("special_foundation").Find(nil).Iter()
  192. count = 0
  193. for tmp := make(map[string]interface{}); query2.Next(tmp); count++ {
  194. if tmp["company_name"] != nil && tmp["company_name"] != "" {
  195. if specialNames[tmp["company_name"].(string)] {
  196. continue
  197. } else {
  198. specialNames[tmp["company_name"].(string)] = true
  199. }
  200. }
  201. }
  202. //3.
  203. time.Sleep(time.Second)
  204. log.Info("dealSpecialNames", zap.String("special_gov_unit", "special_gov_unit"))
  205. query3 := sess.DB("mixdata").C("special_gov_unit").Find(nil).Iter()
  206. count = 0
  207. for tmp := make(map[string]interface{}); query3.Next(tmp); count++ {
  208. if tmp["company_name"] != nil && tmp["company_name"] != "" {
  209. if specialNames[tmp["company_name"].(string)] {
  210. continue
  211. } else {
  212. specialNames[tmp["company_name"].(string)] = true
  213. }
  214. }
  215. }
  216. //4.
  217. time.Sleep(time.Second)
  218. log.Info("dealSpecialNames", zap.String("special_hongkong_company", "special_hongkong_company"))
  219. query4 := sess.DB("mixdata").C("special_hongkong_company").Find(nil).Iter()
  220. count = 0
  221. for tmp := make(map[string]interface{}); query4.Next(tmp); count++ {
  222. if tmp["company_name"] != nil && tmp["company_name"] != "" {
  223. if specialNames[tmp["company_name"].(string)] {
  224. continue
  225. } else {
  226. specialNames[tmp["company_name"].(string)] = true
  227. }
  228. }
  229. }
  230. //5.
  231. time.Sleep(time.Second)
  232. log.Info("dealSpecialNames", zap.String("special_hongkong_company_history", "special_hongkong_company_history"))
  233. query5 := sess.DB("mixdata").C("special_hongkong_company_history").Find(nil).Iter()
  234. count = 0
  235. for tmp := make(map[string]interface{}); query5.Next(tmp); count++ {
  236. if tmp["use_name"] != nil && tmp["use_name"] != "" {
  237. if specialNames[tmp["use_name"].(string)] {
  238. continue
  239. }
  240. specialNames[tmp["use_name"].(string)] = true
  241. }
  242. }
  243. //6.
  244. time.Sleep(time.Second)
  245. log.Info("dealSpecialNames", zap.String("special_law_office", "special_law_office"))
  246. query6 := sess.DB("mixdata").C("special_law_office").Find(nil).Iter()
  247. count = 0
  248. for tmp := make(map[string]interface{}); query6.Next(tmp); count++ {
  249. if tmp["company_name"] != nil && tmp["company_name"] != "" {
  250. if specialNames[tmp["company_name"].(string)] {
  251. continue
  252. }
  253. specialNames[tmp["company_name"].(string)] = true
  254. }
  255. }
  256. //7.
  257. log.Info("dealSpecialNames", zap.String("special_social_organ", "special_social_organ"))
  258. query7 := sess.DB("mixdata").C("special_social_organ").Find(nil).Iter()
  259. count = 0
  260. for tmp := make(map[string]interface{}); query7.Next(tmp); count++ {
  261. if tmp["company_name"] != nil && tmp["company_name"] != "" {
  262. if specialNames[tmp["company_name"].(string)] {
  263. continue
  264. }
  265. specialNames[tmp["company_name"].(string)] = true
  266. }
  267. }
  268. log.Info("dealSpecialNames", zap.String("special_trade_union", "special_trade_union"))
  269. //8.
  270. query8 := sess.DB("mixdata").C("special_trade_union").Find(nil).Iter()
  271. count = 0
  272. for tmp := make(map[string]interface{}); query8.Next(tmp); count++ {
  273. if tmp["company_name"] != nil && tmp["company_name"] != "" {
  274. if specialNames[tmp["company_name"].(string)] {
  275. continue
  276. }
  277. specialNames[tmp["company_name"].(string)] = true
  278. }
  279. }
  280. //9. 爬虫采集的采购单位
  281. query9 := sess.DB("mixdata").C("special_buyer_name").Find(nil).Iter()
  282. count = 0
  283. for tmp := make(map[string]interface{}); query9.Next(tmp); count++ {
  284. if tmp["company_name"] != nil && tmp["company_name"] != "" {
  285. if specialNames[tmp["company_name"].(string)] {
  286. continue
  287. }
  288. specialNames[tmp["company_name"].(string)] = true
  289. }
  290. }
  291. }
  292. // buyerAll 采购单位全量数据
  293. func buyerAll() {
  294. arrEs := make([]map[string]interface{}, 0) //最终生索引数据
  295. winerEsLock := &sync.Mutex{}
  296. //if len(specialNames) < 1 {
  297. // initSpecialNames()
  298. //}
  299. countSql := fmt.Sprintf(`SELECT count(id) FROM dws_f_ent_tags `)
  300. dataCounts := Mysql.CountBySql(countSql)
  301. if dataCounts > 0 {
  302. log.Info("buyerAll", zap.Any("采购单位全量:", dataCounts))
  303. } else {
  304. log.Info("buyerAll", zap.String("采购单位全量是0", "没有更新数据"))
  305. return
  306. }
  307. finalId := 0
  308. lastSql := fmt.Sprintf(`SELECT id FROM dws_f_ent_tags ORDER BY id DESC LIMIT 1`)
  309. lastInfo := Mysql.SelectBySql(lastSql)
  310. if len(*lastInfo) > 0 {
  311. finalId = util.IntAll((*lastInfo)[0]["id"])
  312. } else {
  313. log.Info("buyerAll", zap.String("获取最大ID失败", "没有数据"))
  314. return
  315. }
  316. log.Info("buyerAll", zap.Int("finalId", finalId))
  317. buyerPool := make(chan bool, 10) //控制线程数
  318. wg := &sync.WaitGroup{}
  319. lastid, total := 0, 0
  320. realCount := 0
  321. for {
  322. query := fmt.Sprintf(`
  323. SELECT
  324. b.name,
  325. b.seo_id,
  326. t.id,
  327. t.status,
  328. t.name_id,
  329. b.company_id,
  330. t.createtime,
  331. t.updatetime,
  332. c.area,
  333. c.city,
  334. class.name AS buyerclass
  335. FROM
  336. dws_f_ent_tags AS t
  337. LEFT JOIN code_buyerclass AS class ON class.code = t.labelvalues
  338. LEFT JOIN dws_f_ent_baseinfo AS b ON b.name_id = t.name_id
  339. LEFT JOIN code_area AS c ON b.city_code = c.code
  340. WHERE t.id > %d and t.status > 0
  341. ORDER BY t.id ASC
  342. LIMIT %d;
  343. `, lastid, 1000)
  344. ctx := context.Background()
  345. rows, err := Mysql.DB.QueryContext(ctx, query)
  346. if err != nil {
  347. log.Info("buyerAll", zap.Any("QueryContext err", err))
  348. }
  349. if finalId == lastid {
  350. log.Info("buyerAll over", zap.Any("total", total), zap.Any("lastid", lastid))
  351. break
  352. }
  353. columns, err := rows.Columns()
  354. if err != nil {
  355. log.Info("buyerAll", zap.Any("rows.Columns", err))
  356. }
  357. for rows.Next() {
  358. scanArgs := make([]interface{}, len(columns))
  359. values := make([]interface{}, len(columns))
  360. ret := make(map[string]interface{})
  361. for k := range values {
  362. scanArgs[k] = &values[k]
  363. }
  364. err = rows.Scan(scanArgs...)
  365. if err != nil {
  366. log.Info("buyerAll", zap.Any("rows.Scan", err))
  367. break
  368. }
  369. for i, col := range values {
  370. if v, ok := col.([]uint8); ok {
  371. ret[columns[i]] = string(v)
  372. } else {
  373. ret[columns[i]] = col
  374. }
  375. }
  376. lastid = util.IntAll(ret["id"])
  377. total++
  378. if total%2000 == 0 {
  379. log.Info("buyerAll", zap.Int("current total", total), zap.Int("lastid", lastid))
  380. }
  381. buyerPool <- true
  382. wg.Add(1)
  383. go func(tmp map[string]interface{}) {
  384. defer func() {
  385. <-buyerPool
  386. wg.Done()
  387. }()
  388. name := util.ObjToString(tmp["name"])
  389. //MySQL 更新
  390. //update := map[string]interface{}{}
  391. //company_id != ‘’ 或者在凭安 特殊企业或者在爬虫采购单位里
  392. //if util.ObjToString(tmp["company_id"]) != "" || specialNames[name] {
  393. // realCount++
  394. // update["status"] = 1
  395. //} else if ruleBuyer(name) { //不符合条件,排除
  396. // update["status"] = -1
  397. //} else { //默认2,认为可信
  398. // realCount++
  399. // update["status"] = 2
  400. //}
  401. ////1.更新MySQL
  402. //where := map[string]interface{}{
  403. // "name_id": tmp["name_id"],
  404. //}
  405. //if len(update) > 0 {
  406. // Mysql.Update("dws_f_ent_tags", where, update)
  407. //}
  408. //2.生索引,status = 1或者2 才生索引
  409. if util.IntAll(tmp["status"]) == 1 || util.IntAll(tmp["status"]) == 2 {
  410. data := make(map[string]interface{}, 0)
  411. data["name"] = name
  412. data["name_id"] = tmp["name_id"]
  413. if ret["seo_id"] != nil {
  414. data["seo_id"] = tmp["seo_id"]
  415. }
  416. data["id"] = tmp["name_id"]
  417. data["buyer_name"] = name
  418. data["province"] = tmp["area"]
  419. data["city"] = tmp["city"]
  420. data["buyerclass"] = tmp["buyerclass"]
  421. if ret["createtime"] != nil {
  422. if createtime, ok := tmp["createtime"].(time.Time); ok {
  423. data["createtime"] = createtime.Unix()
  424. if ret["updatetime"] != nil {
  425. if updatetime, ok := tmp["updatetime"].(time.Time); ok {
  426. data["updatetime"] = updatetime.Unix()
  427. }
  428. } else {
  429. data["updatetime"] = createtime.Unix()
  430. }
  431. }
  432. }
  433. sql := fmt.Sprintf(`select count(id) from dws_f_ent_contact where name_id = '%v'`, tmp["name_id"])
  434. counts := Mysql.CountBySql(sql)
  435. if counts > 0 {
  436. data["is_contact"] = true
  437. } else {
  438. data["is_contact"] = false
  439. }
  440. //写入es
  441. winerEsLock.Lock()
  442. arrEs = append(arrEs, data)
  443. if len(arrEs) >= EsBulkSize {
  444. tmps := arrEs
  445. Es.BulkSave(config.Conf.DB.Es.IndexBuyer, tmps)
  446. arrEs = []map[string]interface{}{}
  447. }
  448. winerEsLock.Unlock()
  449. }
  450. }(ret)
  451. }
  452. if err := rows.Err(); err != nil {
  453. log.Info("buyerAll", zap.Any("rows.Err()", err))
  454. }
  455. err = rows.Close()
  456. if err != nil {
  457. log.Info("buyerAll", zap.Any("rows.Close() err", err))
  458. }
  459. wg.Wait()
  460. }
  461. if len(arrEs) > 0 {
  462. tmps := arrEs
  463. Es.BulkSave(config.Conf.DB.Es.IndexBuyer, tmps)
  464. arrEs = []map[string]interface{}{}
  465. }
  466. log.Info("buyerAll", zap.Int("结束,总数是:", total), zap.Int("realCount", realCount))
  467. }