package main

import (
	"flag"
	"log"
	"time"

	"app.yhyue.com/data_processing/common_utils/redis"
	"data_sync/config"
)

var (
	// V is the value of the -v command-line flag; main selects the task to
	// run from it ("v1" or "v2").
	V string

	// Fields is a list of document field names. It is not referenced in the
	// visible part of this file; presumably consumed by the task code
	// elsewhere — confirm against callers.
	Fields = []string{"area", "city", "projectname", "projectcode", "budget", "s_winner", "bidamount", "buyer"}

	// updateEsPool queues items for Es.UpdateBulk; updateEsSp caps the
	// number of concurrent Elasticsearch bulk calls at 1.
	updateEsPool = make(chan []map[string]interface{}, 5000)
	updateEsSp   = make(chan bool, 1)

	// updatePool queues items for Mongo.UpdateBulk on "bidding"; updateSp
	// caps the number of concurrent bulk calls at 5.
	updatePool = make(chan []map[string]interface{}, 5000)
	updateSp   = make(chan bool, 5)

	// updateRcPool queues items for Mongo.UpSertBulk on
	// "bidding_modify_record"; updateRcsp caps the number of concurrent
	// bulk calls at 5.
	updateRcPool = make(chan []map[string]interface{}, 5000)
	updateRcsp   = make(chan bool, 5)
)

// init loads ./common.toml and runs the logging, Mongo, Elasticsearch and
// Redis initialisers before main starts.
func init() {
	config.Init("./common.toml")
	InitLog()
	InitMgo()
	InitEs()
	redis.InitRedis1("qyxy_id=172.17.4.189:8379", 4)
}

// main parses the -v flag, starts the three background batch writers, runs
// the task selected by the flag and then blocks forever so the writers keep
// draining their queues. With no -v value it prints usage and returns.
func main() {
	flag.StringVar(&V, "v", "", "version")
	flag.Parse()

	if V == "" {
		flag.PrintDefaults()
		log.Println("参数错误.")
		return
	}

	go updateFuc()
	go updateRcFuc()
	go updateEsFuc()

	// Any other non-empty value starts the writers but runs no task,
	// matching the original behaviour.
	switch V {
	case "v1":
		taskinfoV1()
	case "v2":
		taskinfoV2()
	}

	// Keep the process alive for the background writers.
	select {}
}

// batchLoop drains pool forever, grouping received items into batches and
// handing each batch to flush on its own goroutine.
//
// A batch is flushed when it reaches size items, or when no item has arrived
// for one second and the batch is non-empty. sem bounds the number of flushes
// in flight: a slot is taken before the goroutine starts (blocking this loop
// when all slots are busy) and released on the same channel when flush
// returns. batchLoop never returns.
func batchLoop(pool <-chan []map[string]interface{}, sem chan bool, size int, flush func(batch [][]map[string]interface{})) {
	const idle = 1000 * time.Millisecond

	batch := make([][]map[string]interface{}, 0, size)

	// dispatch hands the current batch to a new goroutine and starts a fresh
	// one, so the in-flight batch is never written to again by this loop.
	dispatch := func() {
		sem <- true
		go func(b [][]map[string]interface{}) {
			defer func() { <-sem }()
			flush(b)
		}(batch)
		batch = make([][]map[string]interface{}, 0, size)
	}

	for {
		select {
		case v := <-pool:
			batch = append(batch, v)
			if len(batch) >= size {
				dispatch()
			}
		// A new timer is created on every iteration, so this fires only
		// after a full idle period with no incoming items.
		case <-time.After(idle):
			if len(batch) > 0 {
				dispatch()
			}
		}
	}
}

// updateFuc batches items from updatePool (up to 500 per batch) and writes
// them with Mongo.UpdateBulk to the "bidding" collection, with concurrency
// limited by updateSp. It never returns.
func updateFuc() {
	batchLoop(updatePool, updateSp, 500, func(batch [][]map[string]interface{}) {
		Mongo.UpdateBulk("bidding", batch...)
	})
}

// updateRcFuc batches items from updateRcPool (up to 500 per batch) and
// writes them with Mongo.UpSertBulk to the "bidding_modify_record"
// collection, with concurrency limited by updateRcsp. It never returns.
//
// The slot taken from updateRcsp is always released back to updateRcsp;
// previously the idle-flush path released updateSp instead, which leaked
// updateRcsp slots until this loop blocked permanently.
func updateRcFuc() {
	batchLoop(updateRcPool, updateRcsp, 500, func(batch [][]map[string]interface{}) {
		Mongo.UpSertBulk("bidding_modify_record", batch...)
	})
}

// updateEsFuc batches items from updateEsPool (up to 200 per batch) and
// sends them with Es.UpdateBulk to index "bidding_v1", type "bidding", with
// concurrency limited by updateEsSp. It never returns.
func updateEsFuc() {
	batchLoop(updateEsPool, updateEsSp, 200, func(batch [][]map[string]interface{}) {
		Es.UpdateBulk("bidding_v1", "bidding", batch...)
	})
}