123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164 |
- package main
- import (
- "mongodb"
- "qfw/util"
- "strings"
- "sync"
- "time"
- )
- var (
- Sysconfig map[string]interface{}
- Mgo, Mgo1 *mongodb.MongodbSim
- Dbname, Dbcoll string
- collSave, qyxyColl string
- savePool chan map[string]interface{}
- saveSp chan bool
- updatePool chan []map[string]interface{}
- updateSp chan bool
- filePath string
- tagArr []string
- operators []string
- TagMatchRule = map[string][]TagMatching{}
- )
- func init() {
- util.ReadConfig(&Sysconfig)
- collSave = util.ObjToString(Sysconfig["dbcoll"])
- qyxyColl = util.ObjToString(Sysconfig["qyxyColl"])
- Mgo = &mongodb.MongodbSim{
- MongodbAddr: Sysconfig["mgodb"].(string),
- Size: util.IntAllDef(Sysconfig["dbsize"], 5),
- DbName: Sysconfig["dbname"].(string),
- //UserName: Sysconfig["uname"].(string),
- //Password: Sysconfig["upwd"].(string),
- }
- Mgo.InitPool()
- checkDb, _ := Sysconfig["checkDb"].(map[string]interface{})
- Dbname = checkDb["dbname"].(string)
- Dbcoll = checkDb["dbcoll"].(string)
- Mgo1 = &mongodb.MongodbSim{
- MongodbAddr: checkDb["addr"].(string),
- Size: util.IntAll(checkDb["dbsize"]),
- DbName: checkDb["dbname"].(string),
- }
- Mgo1.InitPool()
- filePath = util.ObjToString(Sysconfig["tagFile"])
- savePool = make(chan map[string]interface{}, 5000)
- saveSp = make(chan bool, 5)
- updatePool = make(chan []map[string]interface{}, 5000)
- updateSp = make(chan bool, 5)
- operators = strings.Split(util.ObjToString(Sysconfig["operators"]), ",")
- initExcel(filePath)
- util.Debug(len(TagMatchRule))
- }
- func main() {
- //go saveMethod()
- go updateMethod()
- sess := Mgo1.GetMgoConn()
- defer Mgo1.DestoryMongoConn(sess)
- ch := make(chan bool, 3)
- wg := &sync.WaitGroup{}
- //q := map[string]interface{}{"id": "61547b9f1a75b8f4469b7f90"}
- query := sess.DB(Dbname).C(Dbcoll).Find(nil).Select(nil).Iter()
- count := 0
- for tmp := make(map[string]interface{}); query.Next(&tmp); count++ {
- if count%500 == 0 {
- util.Debug("current ---", count)
- }
- ch <- true
- wg.Add(1)
- go func(tmp map[string]interface{}) {
- defer func() {
- <-ch
- wg.Done()
- }()
- taskinfo(tmp)
- }(tmp)
- tmp = make(map[string]interface{})
- }
- wg.Wait()
- util.Debug("over ---", count)
- c := make(chan bool, 1)
- <-c
- }
- func saveMethod() {
- arru := make([]map[string]interface{}, 200)
- indexu := 0
- for {
- select {
- case v := <-savePool:
- arru[indexu] = v
- indexu++
- if indexu == 200 {
- saveSp <- true
- go func(arru []map[string]interface{}) {
- defer func() {
- <-saveSp
- }()
- Mgo.SaveBulk(collSave, arru...)
- }(arru)
- arru = make([]map[string]interface{}, 200)
- indexu = 0
- }
- case <-time.After(1000 * time.Millisecond):
- if indexu > 0 {
- saveSp <- true
- go func(arru []map[string]interface{}) {
- defer func() {
- <-saveSp
- }()
- Mgo.SaveBulk(collSave, arru...)
- }(arru[:indexu])
- arru = make([]map[string]interface{}, 200)
- indexu = 0
- }
- }
- }
- }
- func updateMethod() {
- arru := make([][]map[string]interface{}, 200)
- indexu := 0
- for {
- select {
- case v := <-updatePool:
- arru[indexu] = v
- indexu++
- if indexu == 200 {
- updateSp <- true
- go func(arru [][]map[string]interface{}) {
- defer func() {
- <-updateSp
- }()
- Mgo.UpSertBulk(collSave, arru...)
- }(arru)
- arru = make([][]map[string]interface{}, 200)
- indexu = 0
- }
- case <-time.After(1000 * time.Millisecond):
- if indexu > 0 {
- updateSp <- true
- go func(arru [][]map[string]interface{}) {
- defer func() {
- <-updateSp
- }()
- Mgo.UpSertBulk(collSave, arru...)
- }(arru[:indexu])
- arru = make([][]map[string]interface{}, 200)
- indexu = 0
- }
- }
- }
- }
|