package main import ( "mongodb" "qfw/util" "strings" "sync" "time" ) var ( Sysconfig map[string]interface{} Mgo, Mgo1 *mongodb.MongodbSim Dbname, Dbcoll string collSave, qyxyColl string savePool chan map[string]interface{} saveSp chan bool updatePool chan []map[string]interface{} updateSp chan bool filePath string tagArr []string operators []string TagMatchRule = map[string][]TagMatching{} ) func init() { util.ReadConfig(&Sysconfig) collSave = util.ObjToString(Sysconfig["dbcoll"]) qyxyColl = util.ObjToString(Sysconfig["qyxyColl"]) Mgo = &mongodb.MongodbSim{ MongodbAddr: Sysconfig["mgodb"].(string), Size: util.IntAllDef(Sysconfig["dbsize"], 5), DbName: Sysconfig["dbname"].(string), //UserName: Sysconfig["uname"].(string), //Password: Sysconfig["upwd"].(string), } Mgo.InitPool() checkDb, _ := Sysconfig["checkDb"].(map[string]interface{}) Dbname = checkDb["dbname"].(string) Dbcoll = checkDb["dbcoll"].(string) Mgo1 = &mongodb.MongodbSim{ MongodbAddr: checkDb["addr"].(string), Size: util.IntAll(checkDb["dbsize"]), DbName: checkDb["dbname"].(string), } Mgo1.InitPool() filePath = util.ObjToString(Sysconfig["tagFile"]) savePool = make(chan map[string]interface{}, 5000) saveSp = make(chan bool, 5) updatePool = make(chan []map[string]interface{}, 5000) updateSp = make(chan bool, 5) operators = strings.Split(util.ObjToString(Sysconfig["operators"]), ",") initExcel(filePath) util.Debug(len(TagMatchRule)) } func main() { //go saveMethod() go updateMethod() sess := Mgo1.GetMgoConn() defer Mgo1.DestoryMongoConn(sess) ch := make(chan bool, 3) wg := &sync.WaitGroup{} //q := map[string]interface{}{"id": "61547b9f1a75b8f4469b7f90"} query := sess.DB(Dbname).C(Dbcoll).Find(nil).Select(nil).Iter() count := 0 for tmp := make(map[string]interface{}); query.Next(&tmp); count++ { if count%500 == 0 { util.Debug("current ---", count) } ch <- true wg.Add(1) go func(tmp map[string]interface{}) { defer func() { <-ch wg.Done() }() taskinfo(tmp) }(tmp) tmp = make(map[string]interface{}) } wg.Wait() util.Debug("over ---", count) c := make(chan bool, 1) <-c } func saveMethod() { arru := make([]map[string]interface{}, 200) indexu := 0 for { select { case v := <-savePool: arru[indexu] = v indexu++ if indexu == 200 { saveSp <- true go func(arru []map[string]interface{}) { defer func() { <-saveSp }() Mgo.SaveBulk(collSave, arru...) }(arru) arru = make([]map[string]interface{}, 200) indexu = 0 } case <-time.After(1000 * time.Millisecond): if indexu > 0 { saveSp <- true go func(arru []map[string]interface{}) { defer func() { <-saveSp }() Mgo.SaveBulk(collSave, arru...) }(arru[:indexu]) arru = make([]map[string]interface{}, 200) indexu = 0 } } } } func updateMethod() { arru := make([][]map[string]interface{}, 200) indexu := 0 for { select { case v := <-updatePool: arru[indexu] = v indexu++ if indexu == 200 { updateSp <- true go func(arru [][]map[string]interface{}) { defer func() { <-updateSp }() Mgo.UpSertBulk(collSave, arru...) }(arru) arru = make([][]map[string]interface{}, 200) indexu = 0 } case <-time.After(1000 * time.Millisecond): if indexu > 0 { updateSp <- true go func(arru [][]map[string]interface{}) { defer func() { <-updateSp }() Mgo.UpSertBulk(collSave, arru...) }(arru[:indexu]) arru = make([][]map[string]interface{}, 200) indexu = 0 } } } }