package main import ( "buyer_data/config" "database/sql" "fmt" "github.com/google/uuid" "go.mongodb.org/mongo-driver/bson" "go.uber.org/zap" util "jygit.jydev.jianyu360.cn/data_processing/common_utils" "jygit.jydev.jianyu360.cn/data_processing/common_utils/log" "jygit.jydev.jianyu360.cn/data_processing/common_utils/redis" "strings" "sync" "time" ) func taskInfo1() { sess := MongoTool.GetMgoConn() defer MongoTool.DestoryMongoConn(sess) ch := make(chan bool, config.Conf.Serve.Thread) wg := &sync.WaitGroup{} field := bson.M{"buyer_name": 1, "buyerclass": 1, "area": 1, "city": 1, "district": 1} query := sess.DB(config.Conf.DB.Mongo.Dbname).C("buyer_enterprise").Find(nil).Select(field).Iter() count := 0 for tmp := make(map[string]interface{}); query.Next(tmp); count++ { if count%2000 == 0 { log.Info(fmt.Sprintf("current --- %d", count)) } ch <- true wg.Add(1) go func(tmp map[string]interface{}) { defer func() { <-ch wg.Done() }() save := make(map[string]interface{}) name := util.ObjToString(tmp["buyer_name"]) info, _ := MongoTool.FindOne("qyxy_std", bson.M{"company_name": name}) if len(*info) > 0 { save["name"] = name save["name_id"] = util.ObjToString((*info)["_id"]) save["name_id_source"] = 1 save["type"] = util.ObjToString((*info)["company_type_old"]) save["buyerclass"] = util.ObjToString(tmp["buyerclass"]) if area := util.ObjToString(tmp["area"]); area != "" { save["area"] = area if city := util.ObjToString(tmp["city"]); city != "" { save["city"] = city } if district := util.ObjToString(tmp["district"]); district != "" { save["city"] = district } } else { if area = util.ObjToString((*info)["company_area"]); area != "" { save["area"] = area if city := util.ObjToString((*info)["company_city"]); city != "" { save["city"] = city } if district := util.ObjToString((*info)["company_district"]); district != "" { save["city"] = district } } } save["reliability"] = 1 save["legal_person"] = util.ObjToString((*info)["legal_person"]) if arr := hisNameFuc(*info); arr != nil && len(arr) > 0 { save["historyname"] = arr } save["status"] = util.ObjToString((*info)["company_status"]) save["comeintime"] = time.Now().Unix() } else { save["name"] = name save["name_id"] = strings.ReplaceAll(uuid.New().String(), "-", "") save["name_id_source"] = 4 save["comeintime"] = time.Now().Unix() } savePool <- save }(tmp) tmp = make(map[string]interface{}) } } func taskInfo2() { sess := MongoTool.GetMgoConn() defer MongoTool.DestoryMongoConn(sess) ch := make(chan bool, config.Conf.Serve.Thread) wg := &sync.WaitGroup{} query := sess.DB(config.Conf.DB.Mongo.Dbname).C("buyer_err").Find(nil).Iter() count := 0 for tmp := make(map[string]interface{}); query.Next(tmp); count++ { if count%2000 == 0 { log.Info(fmt.Sprintf("current --- %d", count)) } ch <- true wg.Add(1) go func(tmp map[string]interface{}) { defer func() { <-ch wg.Done() }() save := make(map[string]interface{}) name := util.ObjToString(tmp["name"]) info, _ := MongoTool.FindOne("qyxy_std", bson.M{"company_name": name}) if len(*info) > 0 { save["name"] = name save["name_id"] = util.ObjToString((*info)["_id"]) save["name_id_source"] = 1 save["type"] = util.ObjToString((*info)["company_type_old"]) save["buyerclass"] = util.ObjToString(tmp["buyerclass"]) if area := util.ObjToString(tmp["area"]); area != "" { save["area"] = area if city := util.ObjToString(tmp["city"]); city != "" { save["city"] = city } if district := util.ObjToString(tmp["district"]); district != "" { save["city"] = district } } else { if area = util.ObjToString((*info)["company_area"]); area != "" { save["area"] = area if city := util.ObjToString((*info)["company_city"]); city != "" { save["city"] = city } if district := util.ObjToString((*info)["company_district"]); district != "" { save["city"] = district } } } save["reliability"] = 1 save["legal_person"] = util.ObjToString((*info)["legal_person"]) if arr := hisNameFuc(*info); arr != nil && len(arr) > 0 { save["historyname"] = arr } save["status"] = util.ObjToString((*info)["company_status"]) save["comeintime"] = time.Now().Unix() } else { save["name"] = name save["name_id"] = strings.ReplaceAll(uuid.New().String(), "-", "") save["name_id_source"] = 4 save["comeintime"] = time.Now().Unix() } savePool <- save }(tmp) tmp = make(map[string]interface{}) } } func hisNameFuc(tmp map[string]interface{}) []string { var nameArr []string if names, ok := tmp["history_names"].([]interface{}); ok && len(names) > 0 { for _, n := range names { nameArr = append(nameArr, util.ObjToString(n)) } } if hisname := util.ObjToString(tmp["history_name"]); hisname != "" { for _, s := range strings.Split(hisname, ",") { if !strings.Contains(strings.Join(nameArr, ","), s) { nameArr = append(nameArr, s) } } } return nameArr } func taskMysql() { pool := make(chan bool, 2) //控制线程数 wg := &sync.WaitGroup{} finalId := 0 lastInfo := MysqlTool.SelectBySql(fmt.Sprintf("SELECT id FROM %s where identity_type = 1 ORDER BY id DESC LIMIT 1 ", "dws_f_ent_baseinfo")) if len(*lastInfo) > 0 { finalId = util.IntAll((*lastInfo)[0]["id"]) } log.Info("查询最后id---", zap.Int("finally id: ", finalId)) lastid, count := 0, 0 for { log.Info("重新查询,lastid---", zap.Int("lastid: ", lastid)) q := fmt.Sprintf("SELECT * FROM %s WHERE id > %d and identity_type = 1 ORDER BY id ASC limit 10000", "dws_f_ent_baseinfo", lastid) var stmtOut *sql.Stmt var tx *sql.Tx var err error if tx == nil { stmtOut, err = MysqlTool.DB.Prepare(q) } else { stmtOut, err = tx.Prepare(q) } rows, err := stmtOut.Query() if err != nil { log.Error("mysql query err ", zap.Error(err)) } columns, err := rows.Columns() if finalId == lastid { log.Info("----finish-----", zap.Int("count: ", count)) break } for rows.Next() { scanArgs := make([]interface{}, len(columns)) values := make([]interface{}, len(columns)) ret := make(map[string]interface{}) for k := range values { scanArgs[k] = &values[k] } err = rows.Scan(scanArgs...) if err != nil { log.Error("mysql scan err ", zap.Error(err)) break } for i, col := range values { if col == nil { ret[columns[i]] = nil } else { switch val := (*scanArgs[i].(*interface{})).(type) { case byte: ret[columns[i]] = val break case []byte: v := string(val) switch v { case "\x00": // 处理数据类型为bit的情况 ret[columns[i]] = 0 case "\x01": // 处理数据类型为bit的情况 ret[columns[i]] = 1 default: ret[columns[i]] = v break } break case time.Time: if val.IsZero() { ret[columns[i]] = nil } else { ret[columns[i]] = val.Format("2006-01-02 15:04:05") } break default: ret[columns[i]] = val } } } lastid = util.IntAll(ret["id"]) count++ if count%2000 == 0 { log.Info("current----", zap.Int("count: ", count), zap.Int("lastid: ", lastid)) } pool <- true wg.Add(1) go func(tmp map[string]interface{}) { defer func() { <-pool wg.Done() }() name := util.ObjToString(tmp["name"]) save := make(map[string]interface{}) if cid := util.ObjToString(tmp["company_id"]); cid != "" { bfo, _ := MongoTool.FindOne(config.Conf.DB.Mongo.SaveColl, bson.M{"name_id": cid}) if len(*bfo) > 0 { return } info, _ := MongoTool.FindOne("qyxy_std", bson.M{"company_name": name}) save["name"] = name save["name_id"] = cid save["name_id_source"] = 1 save["type"] = util.ObjToString((*info)["company_type_old"]) if area := util.ObjToString((*info)["company_area"]); area != "" { save["area"] = area if city := util.ObjToString((*info)["company_city"]); city != "" { save["city"] = city } if district := util.ObjToString((*info)["company_district"]); district != "" { save["city"] = district } } save["reliability"] = 1 save["legal_person"] = util.ObjToString((*info)["legal_person"]) if arr := hisNameFuc(*info); arr != nil && len(arr) > 0 { save["historyname"] = arr } save["status"] = util.ObjToString((*info)["company_status"]) updatePool <- []map[string]interface{}{ {"name_id": cid}, {"$set": save}, } } else { save["name"] = name save["name_id"] = strings.ReplaceAll(uuid.New().String(), "-", "") save["name_id_source"] = 4 save["comeintime"] = time.Now().Unix() savePool <- save } }(ret) ret = make(map[string]interface{}) } _ = rows.Close() stmtOut.Close() wg.Wait() } log.Info(fmt.Sprintf("over --- %d", count)) } func taskInfo3() { sess := MongoTool.GetMgoConn() defer MongoTool.DestoryMongoConn(sess) ch := make(chan bool, config.Conf.Serve.Thread) wg := &sync.WaitGroup{} f := bson.M{"reliability": nil} query := sess.DB(config.Conf.DB.Mongo.Dbname).C(config.Conf.DB.Mongo.SaveColl).Find(f).Iter() count := 0 for tmp := make(map[string]interface{}); query.Next(tmp); count++ { if count%2000 == 0 { log.Info(fmt.Sprintf("current --- %d", count)) } ch <- true wg.Add(1) go func(tmp map[string]interface{}) { defer func() { <-ch wg.Done() }() save := make(map[string]interface{}) name := util.ObjToString(tmp["buyer_name"]) info, _ := MongoTool.FindOne("qyxy_std", bson.M{"company_name": name}) if len(*info) > 0 { save["name"] = name save["name_id"] = util.ObjToString((*info)["_id"]) save["name_id_source"] = 1 save["type"] = util.ObjToString((*info)["company_type_old"]) save["buyerclass"] = util.ObjToString(tmp["buyerclass"]) if area := util.ObjToString(tmp["area"]); area != "" { save["area"] = area if city := util.ObjToString(tmp["city"]); city != "" { save["city"] = city } if district := util.ObjToString(tmp["district"]); district != "" { save["city"] = district } } else { if area = util.ObjToString((*info)["company_area"]); area != "" { save["area"] = area if city := util.ObjToString((*info)["company_city"]); city != "" { save["city"] = city } if district := util.ObjToString((*info)["company_district"]); district != "" { save["city"] = district } } } save["reliability"] = 1 save["legal_person"] = util.ObjToString((*info)["legal_person"]) if arr := hisNameFuc(tmp); arr != nil && len(arr) > 0 { save["historyname"] = arr } save["status"] = util.ObjToString((*info)["company_status"]) save["comeintime"] = time.Now().Unix() } else { save["name"] = name save["name_id"] = strings.ReplaceAll(uuid.New().String(), "-", "") save["name_id_source"] = 4 save["comeintime"] = time.Now().Unix() } savePool <- save }(tmp) tmp = make(map[string]interface{}) } } func taskInfo4() { sess := MongoTool.GetMgoConn() defer MongoTool.DestoryMongoConn(sess) ch := make(chan bool, config.Conf.Serve.Thread) wg := &sync.WaitGroup{} query := sess.DB(config.Conf.DB.Mongo.Dbname).C("buyer_detail_1019").Find(nil).Iter() count := 0 for tmp := make(map[string]interface{}); query.Next(tmp); count++ { if count%20000 == 0 { log.Info(fmt.Sprintf("current --- %d", count)) } ch <- true wg.Add(1) go func(tmp map[string]interface{}) { defer func() { <-ch wg.Done() }() name := util.ObjToString(tmp["accurate_entity_name"]) info, _ := MongoTool.FindOne(config.Conf.DB.Mongo.SaveColl, bson.M{"name": name}) if len(*info) > 0 { update := make(map[string]interface{}) update["reliability"] = 0 update["legal_person"] = util.ObjToString(tmp["fddbr"]) update["status"] = util.ObjToString((*info)["company_status"]) updatePool <- []map[string]interface{}{ {"_id": (*info)["_id"]}, {"$set": update}, } } else { save := make(map[string]interface{}) save["name"] = name save["name_id"] = strings.ReplaceAll(uuid.New().String(), "-", "") save["name_id_source"] = 4 save["reliability"] = 0 save["legal_person"] = util.ObjToString(tmp["fddbr"]) save["status"] = util.ObjToString((*info)["company_status"]) save["comeintime"] = time.Now().Unix() savePool <- save } }(tmp) tmp = make(map[string]interface{}) } log.Info(fmt.Sprintf("over --- %d", count)) } func taskInfo5() { sess := MongoTool.GetMgoConn() defer MongoTool.DestoryMongoConn(sess) ch := make(chan bool, config.Conf.Serve.Thread) wg := &sync.WaitGroup{} query := sess.DB(config.Conf.DB.Mongo.Dbname).C(config.Conf.DB.Mongo.SaveColl).Find(nil).Iter() count := 0 for tmp := make(map[string]interface{}); query.Next(tmp); count++ { if count%20000 == 0 { log.Info(fmt.Sprintf("current --- %d", count)) } ch <- true wg.Add(1) go func(tmp map[string]interface{}) { defer func() { <-ch wg.Done() }() name := util.ObjToString(tmp["name"]) if ok, _ := redis.Exists(config.Conf.DB.Redis.Code, name); ok { MongoTool.UpdateById(config.Conf.DB.Mongo.SaveColl, tmp["_id"], bson.M{"$set": bson.M{"del": true}}) } else { redis.PutCKV(config.Conf.DB.Redis.Code, name, 1) } }(tmp) tmp = make(map[string]interface{}) } log.Info(fmt.Sprintf("over --- %d", count)) }