package main import ( "flag" "fmt" "log" "qfw/util" "sync" "time" ) func main() { go updateMethod() //go SaveMethod() go TaskFun() c := make(chan bool, 1) <-c } var coll string func main1() { flag.StringVar(&coll, "coll", "", "表名") flag.Parse() if coll == "" { flag.PrintDefaults() log.Println("参数错误.") return } //go updateMethod() go SaveMethod() task() ch := make(chan bool, 1) <-ch } func task() { pool := make(chan bool, 10) //控制线程数 wg := &sync.WaitGroup{} finalId := 0 lastInfo := MysqlTool.SelectBySql(fmt.Sprintf("SELECT id FROM %s ORDER BY id DESC LIMIT 1", coll)) if len(*lastInfo) > 0 { finalId = util.IntAll((*lastInfo)[0]["id"]) } util.Debug("finally id---", finalId) lastid, count := 0, 0 for { util.Debug("重新查询,lastid---", lastid) q := fmt.Sprintf("SELECT * FROM %s WHERE id > %d ORDER BY id ASC limit 1000000", coll, lastid) //q := "SELECT * FROM company_base WHERE company_id='282a0fd3080deffd317b686081df4e66'" rows, err := MysqlTool.DB.Query(q) if err != nil{ log.Println(err) } columns, err := rows.Columns() if finalId == lastid { util.Debug("----finish----------", count) break } //if lastid >= 186177635 { // util.Debug("--finish--lastid--", lastid) // break //} for rows.Next(){ scanArgs := make([]interface{}, len(columns)) values := make([]interface{}, len(columns)) ret := make(map[string]interface{}) for k := range values { scanArgs[k] = &values[k] } err = rows.Scan(scanArgs...) if err != nil { log.Println(err) break } for i, col := range values { if v, ok := col.([]uint8); ok { ret[columns[i]] = string(v) } else { ret[columns[i]] = col } } lastid = util.IntAll(ret["id"]) count++ if count % 20000 == 0 { util.Debug("current-------", count, lastid) } //if strings.Contains(util.ObjToString(ret["company_type"]), "个体") { // continue //} pool <- true wg.Add(1) go func(tmp map[string]interface{}) { defer func() { <-pool wg.Done() }() //saveInfo := []map[string]interface{}{ // {"_id": util.IntAll(tmp["id"])}, // {"$set": tmp}, //} //updatePool <- saveInfo tmp["_id"] = util.IntAll(tmp["id"]) savePool <- tmp }(ret) ret = make(map[string]interface{}) } _ = rows.Close() wg.Wait() } } func updateMethod() { arru := make([][]map[string]interface{}, saveSize) indexu := 0 for { select { case v := <-updatePool: arru[indexu] = v indexu++ if indexu == saveSize { updateSp <- true go func(arru [][]map[string]interface{}) { defer func() { <-updateSp }() MongoTool.UpSertBulk(DbSave, arru...) }(arru) arru = make([][]map[string]interface{}, saveSize) indexu = 0 } case <-time.After(1000 * time.Millisecond): if indexu > 0 { updateSp <- true go func(arru [][]map[string]interface{}) { defer func() { <-updateSp }() MongoTool.UpSertBulk(DbSave, arru...) }(arru[:indexu]) arru = make([][]map[string]interface{}, saveSize) indexu = 0 } } } } func SaveMethod() { log.Println("Mgo Save...") arru := make([]map[string]interface{}, saveSize) indexu := 0 for { select { case v := <-savePool: arru[indexu] = v indexu++ if indexu == saveSize { saveSp <- true go func(arru []map[string]interface{}) { defer func() { <-saveSp }() MongoTool1.SaveBulk(coll, arru...) }(arru) arru = make([]map[string]interface{}, saveSize) indexu = 0 } case <-time.After(1000 * time.Millisecond): if indexu > 0 { saveSp <- true go func(arru []map[string]interface{}) { defer func() { <-saveSp }() MongoTool1.SaveBulk(coll, arru...) }(arru[:indexu]) arru = make([]map[string]interface{}, saveSize) indexu = 0 } } } }