package main import ( "context" "esindex/config" "fmt" "go.mongodb.org/mongo-driver/bson/primitive" "go.uber.org/zap" util "jygit.jydev.jianyu360.cn/data_processing/common_utils" "jygit.jydev.jianyu360.cn/data_processing/common_utils/log" "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb" "sync" "time" ) // winnerEsTaskOnce 中标单位每天新增数据 func winnerEsTaskOnce() { defer util.Catch() arrEs := []map[string]interface{}{} winerEsLock := &sync.Mutex{} defer util.Catch() now := time.Now() tarTime := time.Date(now.Year(), now.Month(), now.Day()-1, 00, 00, 00, 00, time.Local) curTime := tarTime.Format("2006-01-02") pool := make(chan bool, 15) //控制线程数 wg := &sync.WaitGroup{} rowsPerPage := 10000 finalId := 0 lastSql := fmt.Sprintf(` SELECT id FROM dws_f_ent_baseinfo WHERE ( createtime >= '%v' OR updatetime >= '%v') && (identity_type&(1<<1))>0 && company_id !='' ORDER BY id DESC LIMIT 1 `, curTime, curTime) lastInfo := Mysql.SelectBySql(lastSql) if len(*lastInfo) > 0 { finalId = util.IntAll((*lastInfo)[0]["id"]) } log.Info("winnerEsTaskOnce", zap.Int("finalId", finalId)) lastid, total := 0, 0 for { query := fmt.Sprintf(` SELECT b.name, b.name_id, b.id, b.company_id, b.seo_id, c.area, c.city, b.createtime, b.updatetime FROM dws_f_ent_baseinfo AS b LEFT JOIN code_area AS c ON b.city_code = c.code WHERE b.id > %d && (identity_type&(1<<1))>0 && b.company_id !='' && ( b.createtime >= '%v' OR b.updatetime >= '%v') ORDER BY b.id ASC LIMIT %d; `, lastid, curTime, curTime, rowsPerPage) ctx := context.Background() rows, err := Mysql.DB.QueryContext(ctx, query) if err != nil { log.Info("winnerEsTaskOnce", zap.Any("QueryContext err", err)) } if finalId == lastid { log.Info("winnerEsTaskOnce over", zap.Any("total", total), zap.Any("lastid", lastid)) break } columns, err := rows.Columns() if err != nil { log.Info("winnerEsTaskOnce", zap.Any("rows.Columns", err)) } for rows.Next() { scanArgs := make([]interface{}, len(columns)) values := make([]interface{}, len(columns)) ret := make(map[string]interface{}) for k := range values { scanArgs[k] = &values[k] } err = rows.Scan(scanArgs...) if err != nil { log.Info("winnerEsTaskOnce", zap.Any("rows.Scan", err)) break } for i, col := range values { if v, ok := col.([]uint8); ok { ret[columns[i]] = string(v) } else { ret[columns[i]] = col } } total++ if total%1000 == 0 { log.Info("winnerEsTaskOnce", zap.Int("current total", total), zap.Any("lastid", lastid)) } lastid = util.IntAll(ret["id"]) pool <- true wg.Add(1) go func(tmp map[string]interface{}) { defer func() { <-pool wg.Done() }() savetmp := map[string]interface{}{} tmp_id := tmp["name_id"] savetmp["_id"] = tmp_id savetmp["id"] = tmp_id savetmp["name"] = tmp["name"] savetmp["winner_name"] = tmp["name"] if tmp["seo_id"] != nil { savetmp["seo_id"] = tmp["seo_id"] } createtime := tmp["createtime"].(time.Time) savetmp["l_createtime"] = createtime.Unix() savetmp["pici"] = createtime.Unix() if tmp["updatetime"] != nil { updatetime := tmp["updatetime"].(time.Time) savetmp["pici"] = updatetime.Unix() } if province := util.ObjToString(tmp["area"]); province != "" { savetmp["province"] = province } if city := util.ObjToString(tmp["city"]); city != "" { savetmp["city"] = city } winerEsLock.Lock() arrEs = append(arrEs, savetmp) if len(arrEs) >= EsBulkSize { tmps := arrEs Es.BulkSave(config.Conf.DB.Es.IndexWinner, tmps) // 华为云新集群,存储标讯、项目、凭安数据 if config.Conf.DB.Es.Addr3 != "" { Es3.BulkSave(config.Conf.DB.Es.IndexWinner, tmps) } arrEs = []map[string]interface{}{} } winerEsLock.Unlock() }(ret) ret = make(map[string]interface{}) } rows.Close() wg.Wait() if err := rows.Err(); err != nil { log.Info("winnerEsTaskOnce", zap.Any("err", err)) } } if len(arrEs) > 0 { tmps := arrEs Es.BulkSave(config.Conf.DB.Es.IndexWinner, tmps) // 华为云新集群,存储标讯、项目、凭安数据 if config.Conf.DB.Es.Addr3 != "" { Es3.BulkSave(config.Conf.DB.Es.IndexWinner, tmps) } arrEs = []map[string]interface{}{} } log.Info("winnerEsTaskOnce", zap.Int("结束,total:", total)) } // winnerEsAll 存量中标单位 func winnerEsAll() { arrEs := []map[string]interface{}{} winerEsLock := &sync.Mutex{} defer util.Catch() winnerPool := make(chan bool, 15) //控制线程数 wg := &sync.WaitGroup{} rowsPerPage := 10000 finalId := 0 lastSql := fmt.Sprintf(` SELECT id FROM dws_f_ent_baseinfo WHERE (identity_type&(1<<1))>0 && company_id !='' ORDER BY id DESC LIMIT 1 `) lastInfo := Mysql.SelectBySql(lastSql) if len(*lastInfo) > 0 { finalId = util.IntAll((*lastInfo)[0]["id"]) } log.Info("winnerEsAll", zap.Int("finalId", finalId)) lastid, total := 0, 0 for { query := fmt.Sprintf(` SELECT b.name, b.name_id, b.id, b.company_id, b.seo_id, c.area, c.city, b.createtime, b.updatetime FROM dws_f_ent_baseinfo AS b LEFT JOIN code_area AS c ON b.city_code = c.code WHERE b.id > %d && (identity_type&(1<<1))>0 && b.company_id !='' ORDER BY b.id ASC LIMIT %d; `, lastid, rowsPerPage) ctx := context.Background() rows, err := Mysql.DB.QueryContext(ctx, query) if err != nil { log.Info("winnerEsAll", zap.Any("QueryContext err", err)) } if finalId == lastid { log.Info("winnerEsAll over", zap.Any("total", total), zap.Any("lastid", lastid)) break } columns, err := rows.Columns() if err != nil { log.Info("winnerEsAll", zap.Any("rows.Columns", err)) } for rows.Next() { scanArgs := make([]interface{}, len(columns)) values := make([]interface{}, len(columns)) ret := make(map[string]interface{}) for k := range values { scanArgs[k] = &values[k] } err = rows.Scan(scanArgs...) if err != nil { log.Info("winnerEsAll", zap.Any("rows.Scan", err)) break } for i, col := range values { if v, ok := col.([]uint8); ok { ret[columns[i]] = string(v) } else { ret[columns[i]] = col } } total++ if total%1000 == 0 { log.Info("winnerEsAll", zap.Int("current total", total), zap.Any("lastid", lastid)) } lastid = util.IntAll(ret["id"]) winnerPool <- true wg.Add(1) go func(tmp map[string]interface{}) { defer func() { <-winnerPool wg.Done() }() savetmp := map[string]interface{}{} tmp_id := tmp["name_id"] savetmp["_id"] = tmp_id savetmp["id"] = tmp_id savetmp["name"] = tmp["name"] savetmp["winner_name"] = tmp["name"] if tmp["seo_id"] != nil { savetmp["seo_id"] = tmp["seo_id"] } createtime := tmp["createtime"].(time.Time) savetmp["l_createtime"] = createtime.Unix() savetmp["pici"] = createtime.Unix() if tmp["updatetime"] != nil { updatetime := tmp["updatetime"].(time.Time) savetmp["pici"] = updatetime.Unix() } if province := util.ObjToString(tmp["area"]); province != "" { savetmp["province"] = province } if city := util.ObjToString(tmp["city"]); city != "" { savetmp["city"] = city } winerEsLock.Lock() arrEs = append(arrEs, savetmp) if len(arrEs) >= EsBulkSize { tmps := arrEs Es.BulkSave(config.Conf.DB.Es.IndexWinner, tmps) arrEs = []map[string]interface{}{} } winerEsLock.Unlock() }(ret) ret = make(map[string]interface{}) } rows.Close() wg.Wait() if err := rows.Err(); err != nil { log.Info("winnerEsAll", zap.Any("err", err)) } } if len(arrEs) > 0 { tmps := arrEs Es.BulkSave(config.Conf.DB.Es.IndexWinner, tmps) arrEs = []map[string]interface{}{} } log.Info("winnerEsAll", zap.Int("结束,total:", total)) } func winnerEsTaskOnceOld() { defer util.Catch() arrEs := []map[string]interface{}{} winerEsLock := &sync.Mutex{} pool := make(chan bool, 3) wg := &sync.WaitGroup{} now := time.Now() preTime := time.Date(now.Year(), now.Month(), now.Day()-1, now.Hour(), 0, 0, 0, time.Local) curTime := time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), 0, 0, 0, time.Local) task_sid := mongodb.BsonIdToSId(primitive.NewObjectIDFromTimestamp(preTime)) task_eid := mongodb.BsonIdToSId(primitive.NewObjectIDFromTimestamp(curTime)) log.Info("winner 区间id", zap.String("sid", task_sid), zap.String("eid", task_eid)) //区间id q := map[string]interface{}{ "_id": map[string]interface{}{ "$gte": mongodb.StringTOBsonId(task_sid), "$lt": mongodb.StringTOBsonId(task_eid), }, } //mongo sess := MgoQ.GetMgoConn() defer MgoQ.DestoryMongoConn(sess) it_1 := sess.DB(MgoQ.DbName).C("winner_enterprise").Find(&q).Sort("_id").Iter() num_1 := 0 for tmp := make(map[string]interface{}); it_1.Next(&tmp); num_1++ { if num_1%2000 == 0 && num_1 > 0 { log.Info("winnerEsTaskOnce current", zap.Int("count", num_1)) } pool <- true wg.Add(1) go func(tmp map[string]interface{}) { defer func() { <-pool wg.Done() }() savetmp := map[string]interface{}{} tmp_id := mongodb.BsonIdToSId(tmp["_id"]) savetmp["_id"] = tmp_id savetmp["name"] = tmp["company_name"] savetmp["winner_name"] = tmp["company_name"] savetmp["pici"] = tmp["updatetime"] if province := util.ObjToString(tmp["province"]); province != "" { savetmp["province"] = province } if city := util.ObjToString(tmp["city"]); city != "" { savetmp["city"] = city } if text := util.ObjToString(tmp["tag_business"]); text != "" { savetmp["tag_business"] = text } winerEsLock.Lock() arrEs = append(arrEs, savetmp) if len(arrEs) >= EsBulkSize { tmps := arrEs Es.BulkSave(config.Conf.DB.Es.IndexWinner, tmps) arrEs = []map[string]interface{}{} } winerEsLock.Unlock() }(tmp) tmp = make(map[string]interface{}) } wg.Wait() winerEsLock.Lock() if len(arrEs) > 0 { tmps := arrEs Es.BulkSave(config.Conf.DB.Es.IndexWinner, tmps) arrEs = []map[string]interface{}{} } winerEsLock.Unlock() log.Info("winner over!", zap.Int("总计", num_1)) }