// Command main is a batch job that fills in the company_nature and
// company_verification columns for rows whose company_nature is still NULL,
// in the dwd_f_crm_clue_info table (via TidbBi) and the customer table
// (via TidbCall).
package main

import (
	"fmt"
	"log"
	"sync"

	qu "app.yhyue.com/moapp/jybase/common"
	"app.yhyue.com/moapp/jybase/mongodb"
	"app.yhyue.com/moapp/jybase/mysql"
	"github.com/gogf/gf/v2/util/gconv"
)

var (
	// config receives the settings loaded by qu.ReadConfig in init.
	config map[string]interface{}

	// Mgo is the MongoDB handle created by InitMongo.
	Mgo *mongodb.MongodbSim

	// TidbCall is the SQL handle created by InitTidb (config key "tidb2").
	// do2 and do2_bak use it to read and update the customer table.
	TidbCall *mysql.Mysql

	// TidbBi is the SQL handle created by InitMysql (config key "tidb1").
	// do1, do1_bak and getCompanyType use it.
	TidbBi *mysql.Mysql
)

// InitMongo builds the package-level Mgo handle from the "address", "size",
// "dbName", "replSet", "userName" and "password" entries of mgs and
// initialises its pool.
func InitMongo(mgs map[string]interface{}) {
	Mgo = &mongodb.MongodbSim{
		MongodbAddr: qu.ObjToString(mgs["address"]),
		Size:        qu.IntAll(mgs["size"]),
		DbName:      qu.ObjToString(mgs["dbName"]),
		ReplSet:     qu.ObjToString(mgs["replSet"]),
		UserName:    qu.ObjToString(mgs["userName"]),
		Password:    qu.ObjToString(mgs["password"]),
	}
	Mgo.InitPool()
	log.Println("初始化 mongodb")
}

// InitMysql builds the package-level TidbBi handle from the "address",
// "userName", "passWord", "dbName", "maxOpenConns" and "maxIdleConns"
// entries of mys and initialises it.
func InitMysql(mys map[string]interface{}) {
	TidbBi = &mysql.Mysql{
		Address:      qu.ObjToString(mys["address"]),
		UserName:     qu.ObjToString(mys["userName"]),
		PassWord:     qu.ObjToString(mys["passWord"]),
		DBName:       qu.ObjToString(mys["dbName"]),
		MaxOpenConns: qu.IntAll(mys["maxOpenConns"]),
		MaxIdleConns: qu.IntAll(mys["maxIdleConns"]),
	}
	TidbBi.Init()
	log.Println("初始化 mysql")
}

// InitTidb builds the package-level TidbCall handle from the same set of keys
// that InitMysql reads and initialises it.
func InitTidb(mys map[string]interface{}) {
	TidbCall = &mysql.Mysql{
		Address:      qu.ObjToString(mys["address"]),
		UserName:     qu.ObjToString(mys["userName"]),
		PassWord:     qu.ObjToString(mys["passWord"]),
		DBName:       qu.ObjToString(mys["dbName"]),
		MaxOpenConns: qu.IntAll(mys["maxOpenConns"]),
		MaxIdleConns: qu.IntAll(mys["maxIdleConns"]),
	}
	TidbCall.Init()
	log.Println("初始化 Tidb")
}

// init loads the configuration and opens the three database handles:
// "mongodb" -> Mgo, "tidb1" -> TidbBi, "tidb2" -> TidbCall.
func init() {
	qu.ReadConfig(&config)

	// A failed type assertion yields a nil map; reading from a nil map is
	// safe and simply produces empty settings.
	mgs, _ := config["mongodb"].(map[string]interface{})
	InitMongo(mgs)
	mys, _ := config["tidb1"].(map[string]interface{})
	InitMysql(mys)
	tbs, _ := config["tidb2"].(map[string]interface{})
	InitTidb(tbs)
}

// main runs the two backfill jobs one after the other.
//
// NOTE(review): the source this was reconstructed from had lost its line
// breaks, so it is not certain whether the do1()/do2() calls were active or
// commented out. They are restored as active calls here — confirm before
// running against production data.
func main() {
	log.Println("电销 开始")
	do1()
	log.Println("电销 结束")

	log.Println("合力易捷 开始")
	do2()
	log.Println("合力易捷 结束")
}

// do1 backfills telemarketing leads: every dwd_f_crm_clue_info row whose
// company_nature is NULL is updated with the result of getCompanyType for its
// cluename. Up to 5 rows are processed concurrently.
//
// The function keeps running passes of at most 500 rows until the NULL count
// reaches zero.
// NOTE(review): a row whose Update keeps failing presumably stays NULL and
// would keep this loop running — confirm whether a retry cap is wanted.
func do1() {
	var (
		pool = make(chan bool, 5) // caps the number of in-flight workers
		wait = &sync.WaitGroup{}
	)
	i := 0
	for {
		count := TidbBi.CountBySql(`SELECT count(1) FROM dwd_f_crm_clue_info where company_nature is null`)
		if count == 0 {
			log.Println("find no data end")
			return
		}
		TidbBi.SelectByBath(10, func(l *[]map[string]interface{}) bool {
			for _, v := range *l {
				pool <- true
				wait.Add(1)
				i++
				go func(thisData map[string]interface{}) {
					defer func() {
						<-pool
						wait.Done()
					}()
					id := gconv.Int64(thisData["id"])
					query := map[string]interface{}{
						"id": id,
					}
					cluename := gconv.String(thisData["cluename"])
					update := getCompanyType(cluename)
					ok := TidbBi.Update("dwd_f_crm_clue_info", query, update)
					if !ok {
						log.Println("crm clue info update err", query, update)
					}
				}(v)
			}
			if i%5000 == 0 {
				log.Println(fmt.Sprintf("current --- %d ", i))
			}
			return true
		}, `SELECT * FROM dwd_f_crm_clue_info where company_nature is null limit 500`)

		// Drain this pass before counting again. Previously the only Wait sat
		// after the endless loop and was unreachable, so the function could
		// return (and the process exit) with updates still in flight, and the
		// next pass could re-select rows that workers had not finished yet.
		wait.Wait()
	}
}

// do1_bak is the sequential (no concurrency) variant of do1.
func do1_bak() {
	i := 0
	for {
		count := TidbBi.CountBySql(`SELECT count(1) FROM dwd_f_crm_clue_info where company_nature is null`)
		if count == 0 {
			log.Println("find no data end")
			return
		}
		TidbBi.SelectByBath(10, func(l *[]map[string]interface{}) bool {
			for _, v := range *l {
				i++
				id := gconv.Int64(v["id"])
				query := map[string]interface{}{
					"id": id,
				}
				cluename := gconv.String(v["cluename"])
				update := getCompanyType(cluename)
				ok := TidbBi.Update("dwd_f_crm_clue_info", query, update)
				if !ok {
					log.Println("crm clue info update err", query, update)
				}
			}
			if i%5000 == 0 {
				log.Println(fmt.Sprintf("current --- %d ", i))
			}
			return true
		}, `SELECT * FROM dwd_f_crm_clue_info where company_nature is null limit 500`)
	}
}

// do2 backfills the 合力易捷 customer table: every customer row whose
// company_nature is NULL is updated, keyed by unique_id, with the result of
// getCompanyType for its company. Up to 5 rows are processed concurrently.
//
// The function keeps running passes of at most 1000 rows until the NULL count
// reaches zero.
// NOTE(review): a row whose Update keeps failing presumably stays NULL and
// would keep this loop running — confirm whether a retry cap is wanted.
func do2() {
	i := 0
	var (
		pool = make(chan bool, 5) // caps the number of in-flight workers
		wait = &sync.WaitGroup{}
	)
	for {
		count := TidbCall.CountBySql(`SELECT count(1) FROM customer where company_nature is null`)
		if count == 0 {
			log.Println("find no data end")
			return
		}
		TidbCall.SelectByBath(10, func(l *[]map[string]interface{}) bool {
			for _, v := range *l {
				pool <- true
				wait.Add(1)
				i++
				go func(thisData map[string]interface{}) {
					defer func() {
						<-pool
						wait.Done()
					}()
					id := gconv.String(thisData["unique_id"])
					query := map[string]interface{}{
						"unique_id": id,
					}
					company := gconv.String(thisData["company"])
					update := getCompanyType(company)
					ok := TidbCall.Update("customer", query, update)
					if !ok {
						log.Println("customer info update err", query, update)
					}
				}(v)
			}
			if i%5000 == 0 {
				log.Println(fmt.Sprintf("current --- %d ", i))
			}
			return true
		}, `SELECT * FROM customer where company_nature is null limit 1000`)

		// Drain this pass before counting again. Previously the only Wait sat
		// after the endless loop and was unreachable, so the function could
		// return (and the process exit) with updates still in flight, and the
		// next pass could re-select rows that workers had not finished yet.
		wait.Wait()
	}
}

// do2_bak is the sequential (no concurrency) variant of do2.
func do2_bak() {
	i := 0
	for {
		count := TidbCall.CountBySql(`SELECT count(1) FROM customer where company_nature is null`)
		if count == 0 {
			log.Println("find no data end")
			return
		}
		TidbCall.SelectByBath(10, func(l *[]map[string]interface{}) bool {
			for _, v := range *l {
				i++
				id := gconv.String(v["unique_id"])
				query := map[string]interface{}{
					"unique_id": id,
				}
				company := gconv.String(v["company"])
				update := getCompanyType(company)
				ok := TidbCall.Update("customer", query, update)
				if !ok {
					log.Println("customer info update err", query, update)
				}
			}
			if i%5000 == 0 {
				log.Println(fmt.Sprintf("current --- %d ", i))
			}
			return true
		}, `SELECT * FROM customer where company_nature is null limit 1000`)
	}
}

// getCompanyType computes the company-nature and company-verification flags
// for a company name.
//
// The returned map always contains both keys:
//   - "company_nature": 1 when group_company_name (queried through TidbBi)
//     has at least one row with company_name = name, otherwise 0.
//   - "company_verification": 1 when the qyxy_std MongoDB collection has at
//     least one document with company_name = name, otherwise 0.
//
// Both values are never NULL, so an updated row no longer matches the
// "company_nature is null" filters used by the backfill loops. The
// group_company_name lookup always goes through TidbBi, including when the
// caller (do2) writes its update through TidbCall.
func getCompanyType(name string) map[string]interface{} {
	data := map[string]interface{}{
		"company_nature":       0,
		"company_verification": 0,
	}
	if c := TidbBi.CountBySql(`select count(1) from group_company_name where company_name=?`, name); c > 0 {
		data["company_nature"] = 1
	}
	if c := Mgo.Count("qyxy_std", map[string]interface{}{"company_name": name}); c > 0 {
		data["company_verification"] = 1
	}
	return data
}