123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135 |
- package main
- import (
- "go.mongodb.org/mongo-driver/bson/primitive"
- "log"
- "qfw/util"
- "regexp"
- )
- var (
- Sysconfig map[string]interface{}
- MongoTool *MongodbSim
- Dbname string
- MgoColl1, MgoColl2 string
- ChangeMap []map[string]interface{}
- queryClose chan bool
- )
- func init() {
- util.ReadConfig(&Sysconfig)
- Dbname = Sysconfig["mongodbName"].(string)
- MongoTool = &MongodbSim{
- MongodbAddr: Sysconfig["mongodbServers"].(string),
- Size: util.IntAll(Sysconfig["mongodbPoolSize"]),
- DbName: Dbname,
- }
- MongoTool.InitPool()
- MgoColl1 = Sysconfig["mongoColl1"].(string)
- MgoColl2 = Sysconfig["mongoColl2"].(string)
- queryClose = make(chan bool)
- ChangeMap = util.ObjArrToMapArr(Sysconfig["changeType"].([]interface{}))
- initChangeMap()
- }
- func main() {
- count, taskcount := 0, 0
- lastid := ""
- sess := MongoTool.GetMgoConn()
- defer MongoTool.DestoryMongoConn(sess)
- pool := make(chan bool, 1)
- infoPool := make(chan map[string]interface{}, 2000)
- over := make(chan bool)
- go func() {
- M:
- for {
- select {
- case tmp := <-infoPool:
- if taskcount > 1234 {
- break
- }
- pool <- true
- go func(tmp map[string]interface{}) {
- defer func() {
- <-pool
- }()
- if tmp["changes"] != nil {
- infoList := []interface{}(tmp["changes"].(primitive.A))
- for _, item := range infoList {
- item1 := item.(map[string]interface{})
- setMark(item1)
- }
- //tmp["_id"] = util.StringTOBsonId(util.ObjToString(tmp["company_id"]))
- MongoTool.Save(MgoColl2, tmp)
- taskcount ++
- }
- }(tmp)
- case <-over:
- break M
- }
- }
- }()
- fields := map[string]interface{}{"changes": 1, "company_id": 1, "company_name": 1}
- query := sess.DB(Dbname).C(MgoColl1).Find(nil).Select(fields).Iter()
- L:
- for {
- select {
- case <-queryClose:
- log.Println("receive interrupt sign")
- log.Println("close iter..", lastid, query.Cursor.Close(nil))
- break L
- default:
- tmp := make(map[string]interface{})
- if query.Next(&tmp) {
- lastid = tmp["company_id"].(string)
- if count%10000 == 0 {
- util.Debug("current", count, lastid)
- }
- if tmp["changes"] != nil && len([]interface{}(tmp["changes"].(primitive.A))) > 0 {
- infoPool <- tmp
- count++
- }
- } else {
- break L
- }
- }
- }
- }
- func initChangeMap() {
- for _, v := range ChangeMap{
- list := v["change_keyword"].([]interface {})
- var regList []string
- if len(list) > 0 {
- for _, v1 := range list{
- reg := ".*" + util.ObjToString(v1) + ".*"
- regList = append(regList, reg)
- }
- v["change_key_reg"] = regList
- }else {
- v["change_key_reg"] = []string{".*"}
- }
- }
- }
- func setMark(tmp map[string]interface{}) {
- for _, v := range ChangeMap{
- str := util.ObjToString(tmp["change_field"])
- regArr := v["change_key_reg"].([]string)
- for _, v1 := range regArr{
- matched, _ := regexp.MatchString(v1, str)
- if matched {
- tmp["change_name_new"] = v["change_name"]
- return
- }
- }
- }
- }
|