123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189 |
- package main
- import (
- "flag"
- "fmt"
- "log"
- "qfw/util"
- "sync"
- "time"
- )
- func main() {
- go updateMethod()
- //go SaveMethod()
- go TaskFun()
- c := make(chan bool, 1)
- <-c
- }
- var coll string
- func main1() {
- flag.StringVar(&coll, "coll", "", "表名")
- flag.Parse()
- if coll == "" {
- flag.PrintDefaults()
- log.Println("参数错误.")
- return
- }
- //go updateMethod()
- go SaveMethod()
- task()
- ch := make(chan bool, 1)
- <-ch
- }
- func task() {
- pool := make(chan bool, 10) //控制线程数
- wg := &sync.WaitGroup{}
- finalId := 0
- lastInfo := MysqlTool.SelectBySql(fmt.Sprintf("SELECT id FROM %s ORDER BY id DESC LIMIT 1", coll))
- if len(*lastInfo) > 0 {
- finalId = util.IntAll((*lastInfo)[0]["id"])
- }
- util.Debug("finally id---", finalId)
- lastid, count := 0, 0
- for {
- util.Debug("重新查询,lastid---", lastid)
- q := fmt.Sprintf("SELECT * FROM %s WHERE id > %d ORDER BY id ASC limit 1000000", coll, lastid)
- //q := "SELECT * FROM company_base WHERE company_id='282a0fd3080deffd317b686081df4e66'"
- rows, err := MysqlTool.DB.Query(q)
- if err != nil{
- log.Println(err)
- }
- columns, err := rows.Columns()
- if finalId == lastid {
- util.Debug("----finish----------", count)
- break
- }
- //if lastid >= 186177635 {
- // util.Debug("--finish--lastid--", lastid)
- // break
- //}
- for rows.Next(){
- scanArgs := make([]interface{}, len(columns))
- values := make([]interface{}, len(columns))
- ret := make(map[string]interface{})
- for k := range values {
- scanArgs[k] = &values[k]
- }
- err = rows.Scan(scanArgs...)
- if err != nil {
- log.Println(err)
- break
- }
- for i, col := range values {
- if v, ok := col.([]uint8); ok {
- ret[columns[i]] = string(v)
- } else {
- ret[columns[i]] = col
- }
- }
- lastid = util.IntAll(ret["id"])
- count++
- if count % 20000 == 0 {
- util.Debug("current-------", count, lastid)
- }
- //if strings.Contains(util.ObjToString(ret["company_type"]), "个体") {
- // continue
- //}
- pool <- true
- wg.Add(1)
- go func(tmp map[string]interface{}) {
- defer func() {
- <-pool
- wg.Done()
- }()
- //saveInfo := []map[string]interface{}{
- // {"_id": util.IntAll(tmp["id"])},
- // {"$set": tmp},
- //}
- //updatePool <- saveInfo
- tmp["_id"] = util.IntAll(tmp["id"])
- savePool <- tmp
- }(ret)
- ret = make(map[string]interface{})
- }
- _ = rows.Close()
- wg.Wait()
- }
- }
- func updateMethod() {
- arru := make([][]map[string]interface{}, saveSize)
- indexu := 0
- for {
- select {
- case v := <-updatePool:
- arru[indexu] = v
- indexu++
- if indexu == saveSize {
- updateSp <- true
- go func(arru [][]map[string]interface{}) {
- defer func() {
- <-updateSp
- }()
- MongoTool.UpSertBulk(DbSave, arru...)
- }(arru)
- arru = make([][]map[string]interface{}, saveSize)
- indexu = 0
- }
- case <-time.After(1000 * time.Millisecond):
- if indexu > 0 {
- updateSp <- true
- go func(arru [][]map[string]interface{}) {
- defer func() {
- <-updateSp
- }()
- MongoTool.UpSertBulk(DbSave, arru...)
- }(arru[:indexu])
- arru = make([][]map[string]interface{}, saveSize)
- indexu = 0
- }
- }
- }
- }
- func SaveMethod() {
- log.Println("Mgo Save...")
- arru := make([]map[string]interface{}, saveSize)
- indexu := 0
- for {
- select {
- case v := <-savePool:
- arru[indexu] = v
- indexu++
- if indexu == saveSize {
- saveSp <- true
- go func(arru []map[string]interface{}) {
- defer func() {
- <-saveSp
- }()
- MongoTool1.SaveBulk(coll, arru...)
- }(arru)
- arru = make([]map[string]interface{}, saveSize)
- indexu = 0
- }
- case <-time.After(1000 * time.Millisecond):
- if indexu > 0 {
- saveSp <- true
- go func(arru []map[string]interface{}) {
- defer func() {
- <-saveSp
- }()
- MongoTool1.SaveBulk(coll, arru...)
- }(arru[:indexu])
- arru = make([]map[string]interface{}, saveSize)
- indexu = 0
- }
- }
- }
- }
|