123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282 |
- package main
- import (
- "context"
- "esindex/config"
- "fmt"
- "go.uber.org/zap"
- util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
- "jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
- "time"
- )
- //buyerOnce 处理增量数据
- func buyerOnce() {
- if len(specialNames) < 1 {
- initSpecialNames()
- }
- rowsPerPage := 1000
- now := time.Now()
- tarTime := time.Date(now.Year(), now.Month(), now.Day()-1, 00, 00, 00, 00, time.Local)
- curTime := tarTime.Format("2006-01-02")
- countSql := fmt.Sprintf(`
- SELECT
- count(t.id)
- FROM
- dws_f_ent_tags AS t
- LEFT JOIN dws_f_ent_baseinfo AS b ON b.name_id = t.name_id
- WHERE t.createtime >= '%v' OR t.updatetime >= '%v'
- ORDER BY t.id DESC LIMIT 1
- `, curTime, curTime)
- dataCounts := Mysql.CountBySql(countSql)
- if dataCounts > 0 {
- log.Info("buyerOnce", zap.Any(fmt.Sprintf("总数:%s", curTime), dataCounts))
- } else {
- log.Info("buyerOnce", zap.String(curTime, "没有更新数据"))
- return
- }
- lastid, total := 0, 0
- realCount := 0
- for {
- query := fmt.Sprintf(`
- SELECT
- b.name,
- t.id,
- t.name_id,
- b.company_id,
- t.createtime,
- t.updatetime,
- c.area,
- c.city,
- class.name AS buyerclass
-
- FROM
- dws_f_ent_tags AS t
- LEFT JOIN code_buyerclass AS class ON class.code = t.labelvalues
- LEFT JOIN dws_f_ent_baseinfo AS b ON b.name_id = t.name_id
- LEFT JOIN code_area AS c ON b.city_code = c.code
- WHERE (t.createtime >= '%v' OR t.updatetime >= '%v') and ( t.id > %d )
- ORDER BY t.id ASC
- LIMIT %d;
- `, curTime, curTime, lastid, rowsPerPage)
- ctx := context.Background()
- rows, err := Mysql.DB.QueryContext(ctx, query)
- if err != nil {
- log.Info("dealData", zap.Any("QueryContext err", err))
- }
- if total >= int(dataCounts) {
- log.Info("buyerOnce over", zap.Any("total", total), zap.Any("lastid", lastid))
- break
- }
- columns, err := rows.Columns()
- if err != nil {
- log.Info("buyerOnce", zap.Any("rows.Columns", err))
- }
- for rows.Next() {
- scanArgs := make([]interface{}, len(columns))
- values := make([]interface{}, len(columns))
- ret := make(map[string]interface{})
- //MySQL 更新
- update := map[string]interface{}{}
- for k := range values {
- scanArgs[k] = &values[k]
- }
- err = rows.Scan(scanArgs...)
- if err != nil {
- log.Info("buyerOnce", zap.Any("rows.Scan", err))
- break
- }
- for i, col := range values {
- if v, ok := col.([]uint8); ok {
- ret[columns[i]] = string(v)
- } else {
- ret[columns[i]] = col
- }
- }
- total++
- if total%100 == 0 {
- log.Info("buyerOnce", zap.Int("current total", total))
- }
- lastid = util.IntAll(ret["id"])
- name := util.ObjToString(ret["name"])
- //company_id != ‘’ 或者在凭安 特殊企业或者在爬虫采购单位里
- if util.ObjToString(ret["company_id"]) != "" || specialNames[name] {
- realCount++
- update["status"] = 1
- data := make(map[string]interface{}, 0)
- data["name"] = name
- data["name_id"] = ret["name_id"]
- data["id"] = ret["name_id"]
- data["buyer_name"] = name
- data["province"] = ret["area"]
- data["city"] = ret["city"]
- data["buyerclass"] = ret["buyerclass"]
- if ret["createtime"] != nil {
- if createtime, ok := ret["createtime"].(time.Time); ok {
- data["createtime"] = createtime.Unix()
- if ret["updatetime"] != nil {
- if updatetime, ok := ret["updatetime"].(time.Time); ok {
- data["updatetime"] = updatetime.Unix()
- }
- } else {
- data["updatetime"] = createtime.Unix()
- }
- }
- }
- sql := fmt.Sprintf(`select count(id) from dws_f_ent_contact where name_id = '%v'`, ret["name_id"])
- counts := Mysql.CountBySql(sql)
- if counts > 0 {
- data["is_contact"] = true
- } else {
- data["is_contact"] = false
- }
- //生索引
- arrEs := make([]map[string]interface{}, 0) //最终生索引数据
- arrEs = append(arrEs, data)
- err = Es.InsertOrUpdate(config.Conf.DB.Es.IndexBuyer, arrEs)
- if err != nil {
- log.Info("buyerOnce", zap.Any("InsertOrUpdate err", err))
- }
- }
- //更新MySQL
- where := map[string]interface{}{
- "name_id": ret["name_id"],
- }
- if len(update) > 0 {
- Mysql.Update("dws_f_ent_tags", where, update)
- }
- }
- rows.Close()
- if err := rows.Err(); err != nil {
- log.Info("buyerOnce", zap.Any("err", err))
- }
- }
- log.Info("buyerOnce", zap.Int("结束,总数是:", total), zap.Int("realCount", realCount))
- }
- //dealSpecialNames 拿到凭安 特企数据 以及爬虫采购单位数据
- func initSpecialNames() {
- sess := MgoS.GetMgoConn()
- defer MgoS.DestoryMongoConn(sess)
- query := sess.DB(MgoS.DbName).C("special_enterprise").Find(nil).Iter()
- count := 0
- log.Info("dealSpecialNames", zap.String("special_enterprise", "special_enterprise"))
- for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
- if tmp["company_name"] != nil && tmp["company_name"] != "" {
- specialNames[tmp["company_name"].(string)] = true
- }
- }
- //2.
- time.Sleep(time.Second)
- log.Info("dealSpecialNames", zap.String("special_foundation", "special_foundation"))
- query2 := sess.DB("mixdata").C("special_foundation").Find(nil).Iter()
- count = 0
- for tmp := make(map[string]interface{}); query2.Next(tmp); count++ {
- if tmp["company_name"] != nil && tmp["company_name"] != "" {
- specialNames[tmp["company_name"].(string)] = true
- }
- }
- //3.
- time.Sleep(time.Second)
- log.Info("dealSpecialNames", zap.String("special_gov_unit", "special_gov_unit"))
- query3 := sess.DB("mixdata").C("special_gov_unit").Find(nil).Iter()
- count = 0
- for tmp := make(map[string]interface{}); query3.Next(tmp); count++ {
- if tmp["company_name"] != nil && tmp["company_name"] != "" {
- specialNames[tmp["company_name"].(string)] = true
- }
- }
- //4.
- time.Sleep(time.Second)
- log.Info("dealSpecialNames", zap.String("special_hongkong_company", "special_hongkong_company"))
- query4 := sess.DB("mixdata").C("special_hongkong_company").Find(nil).Iter()
- count = 0
- for tmp := make(map[string]interface{}); query4.Next(tmp); count++ {
- if tmp["company_name"] != nil && tmp["company_name"] != "" {
- specialNames[tmp["company_name"].(string)] = true
- }
- }
- //5.
- time.Sleep(time.Second)
- log.Info("dealSpecialNames", zap.String("special_hongkong_company_history", "special_hongkong_company_history"))
- query5 := sess.DB("mixdata").C("special_hongkong_company_history").Find(nil).Iter()
- count = 0
- for tmp := make(map[string]interface{}); query5.Next(tmp); count++ {
- if tmp["use_name"] != nil && tmp["use_name"] != "" {
- specialNames[tmp["use_name"].(string)] = true
- }
- }
- //6.
- time.Sleep(time.Second)
- log.Info("dealSpecialNames", zap.String("special_law_office", "special_law_office"))
- query6 := sess.DB("mixdata").C("special_law_office").Find(nil).Iter()
- count = 0
- for tmp := make(map[string]interface{}); query6.Next(tmp); count++ {
- if tmp["company_name"] != nil && tmp["company_name"] != "" {
- specialNames[tmp["company_name"].(string)] = true
- }
- }
- //7.
- log.Info("dealSpecialNames", zap.String("special_social_organ", "special_social_organ"))
- query7 := sess.DB("mixdata").C("special_social_organ").Find(nil).Iter()
- count = 0
- for tmp := make(map[string]interface{}); query7.Next(tmp); count++ {
- if tmp["company_name"] != nil && tmp["company_name"] != "" {
- specialNames[tmp["company_name"].(string)] = true
- }
- }
- log.Info("dealSpecialNames", zap.String("special_trade_union", "special_trade_union"))
- //8.
- query8 := sess.DB("mixdata").C("special_trade_union").Find(nil).Iter()
- count = 0
- for tmp := make(map[string]interface{}); query8.Next(tmp); count++ {
- if tmp["company_name"] != nil && tmp["company_name"] != "" {
- specialNames[tmp["company_name"].(string)] = true
- }
- }
- //9. 爬虫采集的采购单位
- query9 := sess.DB("mixdata").C("special_buyer_name").Find(nil).Iter()
- count = 0
- for tmp := make(map[string]interface{}); query9.Next(tmp); count++ {
- if tmp["company_name"] != nil && tmp["company_name"] != "" {
- specialNames[tmp["company_name"].(string)] = true
- }
- }
- }
|