|
- package main
- import (
- "fmt"
- "log"
- "sync"
- qu "app.yhyue.com/moapp/jybase/common"
- "app.yhyue.com/moapp/jybase/mongodb"
- "app.yhyue.com/moapp/jybase/mysql"
- "github.com/gogf/gf/v2/util/gconv"
- )
- var config map[string]interface{}
- var Mgo *mongodb.MongodbSim
- var TidbCall *mysql.Mysql
- var TidbBi *mysql.Mysql
- func InitMongo(mgs map[string]interface{}) {
- Mgo = &mongodb.MongodbSim{
- MongodbAddr: qu.ObjToString(mgs["address"]),
- Size: qu.IntAll(mgs["size"]),
- DbName: qu.ObjToString(mgs["dbName"]),
- ReplSet: qu.ObjToString(mgs["replSet"]),
- UserName: qu.ObjToString(mgs["userName"]),
- Password: qu.ObjToString(mgs["password"]),
- }
- Mgo.InitPool()
- log.Println("初始化 mongodb")
- }
- func InitMysql(mys map[string]interface{}) {
- TidbBi = &mysql.Mysql{
- Address: qu.ObjToString(mys["address"]),
- UserName: qu.ObjToString(mys["userName"]),
- PassWord: qu.ObjToString(mys["passWord"]),
- DBName: qu.ObjToString(mys["dbName"]),
- MaxOpenConns: qu.IntAll(mys["maxOpenConns"]),
- MaxIdleConns: qu.IntAll(mys["maxIdleConns"]),
- }
- TidbBi.Init()
- log.Println("初始化 mysql")
- }
- func InitTidb(mys map[string]interface{}) {
- TidbCall = &mysql.Mysql{
- Address: qu.ObjToString(mys["address"]),
- UserName: qu.ObjToString(mys["userName"]),
- PassWord: qu.ObjToString(mys["passWord"]),
- DBName: qu.ObjToString(mys["dbName"]),
- MaxOpenConns: qu.IntAll(mys["maxOpenConns"]),
- MaxIdleConns: qu.IntAll(mys["maxIdleConns"]),
- }
- TidbCall.Init()
- log.Println("初始化 Tidb")
- }
- func init() {
- qu.ReadConfig(&config)
- //
- mgs, _ := config["mongodb"].(map[string]interface{})
- InitMongo(mgs)
- mys, _ := config["tidb1"].(map[string]interface{})
- InitMysql(mys)
- tbs, _ := config["tidb2"].(map[string]interface{})
- InitTidb(tbs)
- }
- func main() {
- //
- log.Println("电销 开始")
- // do1()
- log.Println("电销 结束")
- //
- log.Println("合力易捷 开始")
- // do2()
- log.Println("合力易捷 结束")
- }
- //电销线索刷库
- func do1() {
- var (
- pool = make(chan bool, 5)
- wait = &sync.WaitGroup{}
- )
- i := 0
- for {
- count := TidbBi.CountBySql(`SELECT count(1) FROM dwd_f_crm_clue_info where company_nature is null`)
- if count == 0 {
- log.Println("find no data end")
- return
- }
- TidbBi.SelectByBath(10, func(l *[]map[string]interface{}) bool {
- for _, v := range *l {
- pool <- true
- wait.Add(1)
- i++
- go func(thisData map[string]interface{}) {
- defer func() {
- <-pool
- wait.Done()
- }()
- id := gconv.Int64(thisData["id"])
- query := map[string]interface{}{
- "id": id,
- }
- cluename := gconv.String(thisData["cluename"])
- update := getCompanyType(cluename)
- ok := TidbBi.Update("dwd_f_crm_clue_info", query, update)
- if !ok {
- log.Println("crm clue info update err", query, update)
- }
- }(v)
- }
- if i%5000 == 0 {
- log.Println(fmt.Sprintf("current --- %d ", i))
- }
- return true
- }, `SELECT * FROM dwd_f_crm_clue_info where company_nature is null limit 500`)
- }
- wait.Wait()
- }
- //电销线索刷库 无并发
- func do1_bak() {
- i := 0
- for {
- count := TidbBi.CountBySql(`SELECT count(1) FROM dwd_f_crm_clue_info where company_nature is null`)
- if count == 0 {
- log.Println("find no data end")
- return
- }
- TidbBi.SelectByBath(10, func(l *[]map[string]interface{}) bool {
- for _, v := range *l {
- i++
- id := gconv.Int64(v["id"])
- query := map[string]interface{}{
- "id": id,
- }
- cluename := gconv.String(v["cluename"])
- update := getCompanyType(cluename)
- ok := TidbBi.Update("dwd_f_crm_clue_info", query, update)
- if !ok {
- log.Println("crm clue info update err", query, update)
- }
- }
- if i%5000 == 0 {
- log.Println(fmt.Sprintf("current --- %d ", i))
- }
- return true
- }, `SELECT * FROM dwd_f_crm_clue_info where company_nature is null limit 500`)
- }
- }
- //合力易捷刷库
- func do2() {
- i := 0
- var (
- pool = make(chan bool, 5)
- wait = &sync.WaitGroup{}
- )
- for {
- count := TidbCall.CountBySql(`SELECT count(1) FROM customer where company_nature is null`)
- if count == 0 {
- log.Println("find no data end")
- return
- }
- TidbCall.SelectByBath(10, func(l *[]map[string]interface{}) bool {
- for _, v := range *l {
- pool <- true
- wait.Add(1)
- i++
- go func(thisData map[string]interface{}) {
- defer func() {
- <-pool
- wait.Done()
- }()
- id := gconv.String(thisData["unique_id"])
- query := map[string]interface{}{
- "unique_id": id,
- }
- company := gconv.String(thisData["company"])
- update := getCompanyType(company)
- ok := TidbCall.Update("customer", query, update)
- if !ok {
- log.Println("customer info update err", query, update)
- }
- }(v)
- }
- if i%5000 == 0 {
- log.Println(fmt.Sprintf("current --- %d ", i))
- }
- return true
- }, `SELECT * FROM customer where company_nature is null limit 1000`)
- }
- wait.Wait()
- }
- //合力易捷刷库 无并发
- func do2_bak() {
- i := 0
- for {
- count := TidbCall.CountBySql(`SELECT count(1) FROM customer where company_nature is null`)
- if count == 0 {
- log.Println("find no data end")
- return
- }
- TidbCall.SelectByBath(10, func(l *[]map[string]interface{}) bool {
- for _, v := range *l {
- i++
- id := gconv.String(v["unique_id"])
- query := map[string]interface{}{
- "unique_id": id,
- }
- company := gconv.String(v["company"])
- update := getCompanyType(company)
- ok := TidbCall.Update("customer", query, update)
- if !ok {
- log.Println("customer info update err", query, update)
- }
- }
- if i%5000 == 0 {
- log.Println(fmt.Sprintf("current --- %d ", i))
- }
- return true
- }, `SELECT * FROM customer where company_nature is null limit 1000`)
- }
- }
- //公司性质、公司核验
- func getCompanyType(name string) map[string]interface{} {
- data := map[string]interface{}{
- "company_nature": 0,
- "company_verification": 0,
- }
- if c := TidbBi.CountBySql(`select count(1) from group_company_name where company_name=?`, name); c > 0 {
- data["company_nature"] = 1
- }
- if c := Mgo.Count("qyxy_std", map[string]interface{}{"company_name": name}); c > 0 {
- data["company_verification"] = 1
- }
- return data
- }
|