123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433 |
- package main
- import (
- "context"
- "esindex/config"
- "fmt"
- "go.mongodb.org/mongo-driver/bson/primitive"
- "go.uber.org/zap"
- util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
- "jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
- "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
- "sync"
- "time"
- )
- // winnerEsTaskOnce 中标单位每天新增数据
- func winnerEsTaskOnce() {
- defer util.Catch()
- arrEs := []map[string]interface{}{}
- winerEsLock := &sync.Mutex{}
- defer util.Catch()
- now := time.Now()
- tarTime := time.Date(now.Year(), now.Month(), now.Day()-1, 00, 00, 00, 00, time.Local)
- curTime := tarTime.Format("2006-01-02")
- pool := make(chan bool, 15) //控制线程数
- wg := &sync.WaitGroup{}
- rowsPerPage := 10000
- finalId := 0
- lastSql := fmt.Sprintf(`
- SELECT
- id
- FROM
-
- dws_f_ent_baseinfo
- WHERE ( createtime >= '%v' OR updatetime >= '%v') && (identity_type&(1<<1))>0 && company_id !=''
- ORDER BY id DESC LIMIT 1
- `, curTime, curTime)
- lastInfo := Mysql.SelectBySql(lastSql)
- if len(*lastInfo) > 0 {
- finalId = util.IntAll((*lastInfo)[0]["id"])
- }
- log.Info("winnerEsTaskOnce", zap.Int("finalId", finalId))
- lastid, total := 0, 0
- for {
- query := fmt.Sprintf(`
- SELECT
- b.name,
- b.name_id,
- b.id,
- b.company_id,
- b.seo_id,
- c.area,
- c.city,
- b.createtime,
- b.updatetime
- FROM
- dws_f_ent_baseinfo AS b
- LEFT JOIN code_area AS c ON b.city_code = c.code
- WHERE b.id > %d && (identity_type&(1<<1))>0 && b.company_id !='' && ( b.createtime >= '%v' OR b.updatetime >= '%v')
- ORDER BY b.id ASC
- LIMIT %d;
- `, lastid, curTime, curTime, rowsPerPage)
- ctx := context.Background()
- rows, err := Mysql.DB.QueryContext(ctx, query)
- if err != nil {
- log.Info("winnerEsTaskOnce", zap.Any("QueryContext err", err))
- }
- if finalId == lastid {
- log.Info("winnerEsTaskOnce over", zap.Any("total", total), zap.Any("lastid", lastid))
- break
- }
- columns, err := rows.Columns()
- if err != nil {
- log.Info("winnerEsTaskOnce", zap.Any("rows.Columns", err))
- }
- for rows.Next() {
- scanArgs := make([]interface{}, len(columns))
- values := make([]interface{}, len(columns))
- ret := make(map[string]interface{})
- for k := range values {
- scanArgs[k] = &values[k]
- }
- err = rows.Scan(scanArgs...)
- if err != nil {
- log.Info("winnerEsTaskOnce", zap.Any("rows.Scan", err))
- break
- }
- for i, col := range values {
- if v, ok := col.([]uint8); ok {
- ret[columns[i]] = string(v)
- } else {
- ret[columns[i]] = col
- }
- }
- total++
- if total%1000 == 0 {
- log.Info("winnerEsTaskOnce", zap.Int("current total", total), zap.Any("lastid", lastid))
- }
- lastid = util.IntAll(ret["id"])
- pool <- true
- wg.Add(1)
- go func(tmp map[string]interface{}) {
- defer func() {
- <-pool
- wg.Done()
- }()
- savetmp := map[string]interface{}{}
- tmp_id := tmp["name_id"]
- savetmp["_id"] = tmp_id
- savetmp["id"] = tmp_id
- savetmp["name"] = tmp["name"]
- savetmp["winner_name"] = tmp["name"]
- if tmp["seo_id"] != nil {
- savetmp["seo_id"] = tmp["seo_id"]
- }
- createtime := tmp["createtime"].(time.Time)
- savetmp["l_createtime"] = createtime.Unix()
- savetmp["pici"] = createtime.Unix()
- if tmp["updatetime"] != nil {
- updatetime := tmp["updatetime"].(time.Time)
- savetmp["pici"] = updatetime.Unix()
- }
- if province := util.ObjToString(tmp["area"]); province != "" {
- savetmp["province"] = province
- }
- if city := util.ObjToString(tmp["city"]); city != "" {
- savetmp["city"] = city
- }
- winerEsLock.Lock()
- arrEs = append(arrEs, savetmp)
- if len(arrEs) >= EsBulkSize {
- tmps := arrEs
- Es.BulkSave(config.Conf.DB.Es.IndexWinner, tmps)
- arrEs = []map[string]interface{}{}
- }
- winerEsLock.Unlock()
- }(ret)
- ret = make(map[string]interface{})
- }
- rows.Close()
- wg.Wait()
- if err := rows.Err(); err != nil {
- log.Info("winnerEsTaskOnce", zap.Any("err", err))
- }
- }
- if len(arrEs) > 0 {
- tmps := arrEs
- Es.BulkSave(config.Conf.DB.Es.IndexWinner, tmps)
- arrEs = []map[string]interface{}{}
- }
- log.Info("winnerEsTaskOnce", zap.Int("结束,total:", total))
- }
- // winnerEsAll 存量中标单位
- func winnerEsAll() {
- arrEs := []map[string]interface{}{}
- winerEsLock := &sync.Mutex{}
- defer util.Catch()
- winnerPool := make(chan bool, 15) //控制线程数
- wg := &sync.WaitGroup{}
- rowsPerPage := 10000
- finalId := 0
- lastSql := fmt.Sprintf(`
- SELECT
- id
- FROM
-
- dws_f_ent_baseinfo
- WHERE (identity_type&(1<<1))>0 && company_id !=''
- ORDER BY id DESC LIMIT 1
- `)
- lastInfo := Mysql.SelectBySql(lastSql)
- if len(*lastInfo) > 0 {
- finalId = util.IntAll((*lastInfo)[0]["id"])
- }
- log.Info("winnerEsAll", zap.Int("finalId", finalId))
- lastid, total := 0, 0
- for {
- query := fmt.Sprintf(`
- SELECT
- b.name,
- b.name_id,
- b.id,
- b.company_id,
- b.seo_id,
- c.area,
- c.city,
- b.createtime,
- b.updatetime
- FROM
- dws_f_ent_baseinfo AS b
- LEFT JOIN code_area AS c ON b.city_code = c.code
- WHERE b.id > %d && (identity_type&(1<<1))>0 && b.company_id !=''
- ORDER BY b.id ASC
- LIMIT %d;
- `, lastid, rowsPerPage)
- ctx := context.Background()
- rows, err := Mysql.DB.QueryContext(ctx, query)
- if err != nil {
- log.Info("winnerEsAll", zap.Any("QueryContext err", err))
- }
- if finalId == lastid {
- log.Info("winnerEsAll over", zap.Any("total", total), zap.Any("lastid", lastid))
- break
- }
- columns, err := rows.Columns()
- if err != nil {
- log.Info("winnerEsAll", zap.Any("rows.Columns", err))
- }
- for rows.Next() {
- scanArgs := make([]interface{}, len(columns))
- values := make([]interface{}, len(columns))
- ret := make(map[string]interface{})
- for k := range values {
- scanArgs[k] = &values[k]
- }
- err = rows.Scan(scanArgs...)
- if err != nil {
- log.Info("winnerEsAll", zap.Any("rows.Scan", err))
- break
- }
- for i, col := range values {
- if v, ok := col.([]uint8); ok {
- ret[columns[i]] = string(v)
- } else {
- ret[columns[i]] = col
- }
- }
- total++
- if total%1000 == 0 {
- log.Info("winnerEsAll", zap.Int("current total", total), zap.Any("lastid", lastid))
- }
- lastid = util.IntAll(ret["id"])
- winnerPool <- true
- wg.Add(1)
- go func(tmp map[string]interface{}) {
- defer func() {
- <-winnerPool
- wg.Done()
- }()
- savetmp := map[string]interface{}{}
- tmp_id := tmp["name_id"]
- savetmp["_id"] = tmp_id
- savetmp["id"] = tmp_id
- savetmp["name"] = tmp["name"]
- savetmp["winner_name"] = tmp["name"]
- if tmp["seo_id"] != nil {
- savetmp["seo_id"] = tmp["seo_id"]
- }
- createtime := tmp["createtime"].(time.Time)
- savetmp["l_createtime"] = createtime.Unix()
- savetmp["pici"] = createtime.Unix()
- if tmp["updatetime"] != nil {
- updatetime := tmp["updatetime"].(time.Time)
- savetmp["pici"] = updatetime.Unix()
- }
- if province := util.ObjToString(tmp["area"]); province != "" {
- savetmp["province"] = province
- }
- if city := util.ObjToString(tmp["city"]); city != "" {
- savetmp["city"] = city
- }
- winerEsLock.Lock()
- arrEs = append(arrEs, savetmp)
- if len(arrEs) >= EsBulkSize {
- tmps := arrEs
- Es.BulkSave(config.Conf.DB.Es.IndexWinner, tmps)
- arrEs = []map[string]interface{}{}
- }
- winerEsLock.Unlock()
- }(ret)
- ret = make(map[string]interface{})
- }
- rows.Close()
- wg.Wait()
- if err := rows.Err(); err != nil {
- log.Info("winnerEsAll", zap.Any("err", err))
- }
- }
- if len(arrEs) > 0 {
- tmps := arrEs
- Es.BulkSave(config.Conf.DB.Es.IndexWinner, tmps)
- arrEs = []map[string]interface{}{}
- }
- log.Info("winnerEsAll", zap.Int("结束,total:", total))
- }
- func winnerEsTaskOnceOld() {
- defer util.Catch()
- arrEs := []map[string]interface{}{}
- winerEsLock := &sync.Mutex{}
- pool := make(chan bool, 3)
- wg := &sync.WaitGroup{}
- now := time.Now()
- preTime := time.Date(now.Year(), now.Month(), now.Day()-1, now.Hour(), 0, 0, 0, time.Local)
- curTime := time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), 0, 0, 0, time.Local)
- task_sid := mongodb.BsonIdToSId(primitive.NewObjectIDFromTimestamp(preTime))
- task_eid := mongodb.BsonIdToSId(primitive.NewObjectIDFromTimestamp(curTime))
- log.Info("winner 区间id", zap.String("sid", task_sid), zap.String("eid", task_eid))
- //区间id
- q := map[string]interface{}{
- "_id": map[string]interface{}{
- "$gte": mongodb.StringTOBsonId(task_sid),
- "$lt": mongodb.StringTOBsonId(task_eid),
- },
- }
- //mongo
- sess := MgoQ.GetMgoConn()
- defer MgoQ.DestoryMongoConn(sess)
- it_1 := sess.DB(MgoQ.DbName).C("winner_enterprise").Find(&q).Sort("_id").Iter()
- num_1 := 0
- for tmp := make(map[string]interface{}); it_1.Next(&tmp); num_1++ {
- if num_1%2000 == 0 && num_1 > 0 {
- log.Info("winnerEsTaskOnce current", zap.Int("count", num_1))
- }
- pool <- true
- wg.Add(1)
- go func(tmp map[string]interface{}) {
- defer func() {
- <-pool
- wg.Done()
- }()
- savetmp := map[string]interface{}{}
- tmp_id := mongodb.BsonIdToSId(tmp["_id"])
- savetmp["_id"] = tmp_id
- savetmp["name"] = tmp["company_name"]
- savetmp["winner_name"] = tmp["company_name"]
- savetmp["pici"] = tmp["updatetime"]
- if province := util.ObjToString(tmp["province"]); province != "" {
- savetmp["province"] = province
- }
- if city := util.ObjToString(tmp["city"]); city != "" {
- savetmp["city"] = city
- }
- if text := util.ObjToString(tmp["tag_business"]); text != "" {
- savetmp["tag_business"] = text
- }
- winerEsLock.Lock()
- arrEs = append(arrEs, savetmp)
- if len(arrEs) >= EsBulkSize {
- tmps := arrEs
- Es.BulkSave(config.Conf.DB.Es.IndexWinner, tmps)
- arrEs = []map[string]interface{}{}
- }
- winerEsLock.Unlock()
- }(tmp)
- tmp = make(map[string]interface{})
- }
- wg.Wait()
- winerEsLock.Lock()
- if len(arrEs) > 0 {
- tmps := arrEs
- Es.BulkSave(config.Conf.DB.Es.IndexWinner, tmps)
- arrEs = []map[string]interface{}{}
- }
- winerEsLock.Unlock()
- log.Info("winner over!", zap.Int("总计", num_1))
- }
|