123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205 |
- package main
- import (
- "context"
- "fmt"
- "github.com/go-redis/redis"
- "go.mongodb.org/mongo-driver/bson"
- "mongodb"
- "qfw/util"
- "reflect"
- "strconv"
- "strings"
- "sync"
- "time"
- )
- var (
- MongoTool *mongodb.MongodbSim
- rdb *redis.Client
- updatePool = make(chan []map[string]interface{}, 5000)
- updateSp = make(chan bool, 5)
- )
- func init() {
- MongoTool = &mongodb.MongodbSim{
- MongodbAddr: "172.17.145.163:27083,172.17.4.187:27082",
- Size: 10,
- DbName: "mixdata",
- UserName: "SJZY_RWESBid_Other",
- Password: "SJZY@O17t8herB3B",
- }
- MongoTool.InitPool()
- //MongoTool = &mongodb.MongodbSim{
- // MongodbAddr: "172.17.4.85:27080",
- // Size: 10,
- // DbName: "qfw",
- //}
- //MongoTool.InitPool()
- }
- func initRedis() (err error) {
- ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
- defer cancel()
- rdb = redis.NewClient(&redis.Options{
- Addr: "127.0.0.1:8379",
- PoolSize: 200,
- DB: 2,
- })
- _, err = rdb.Ping(ctx).Result()
- if err != nil {
- fmt.Println("ping redis failed err:", err)
- return err
- }
- return nil
- }
- func main() {
- go updateMethod()
- sess := MongoTool.GetMgoConn()
- defer MongoTool.DestoryMongoConn(sess)
- ch := make(chan bool, 3)
- wg := &sync.WaitGroup{}
- f := bson.M{"company_name": 1}
- query := sess.DB("mixdata").C("winner_enterprise").Find(nil).Select(f).Iter()
- count := 0
- for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
- if count%5000 == 0 {
- util.Debug("current ---", count)
- }
- ch <- true
- wg.Add(1)
- go func(tmp map[string]interface{}) {
- defer func() {
- <-ch
- wg.Done()
- }()
- }(tmp)
- tmp = make(map[string]interface{})
- }
- }
- func main1() {
- go updateMethod()
- err := initRedis()
- if err != nil {
- fmt.Println("init redis failed err :", err)
- return
- }
- ctx := context.Background()
- var cursor uint64
- var keys []string
- n := 0
- for {
- keys, cursor, err = rdb.Scan(ctx, cursor, "*", 500).Result()
- if err != nil {
- fmt.Println("scan keys failed err:", err)
- return
- }
- n += len(keys)
- for _, key := range keys {
- val, err := rdb.Get(ctx, key).Result()
- if err != nil {
- fmt.Println("get key values failed err:", err)
- return
- }
- arr := strings.Split(val, "-")
- if len(arr) > 1 {
- num, _ := strconv.Atoi(strings.ReplaceAll(arr[1], "\"", ""))
- save := []map[string]interface{}{{
- "_id": key,
- },
- {"$set": map[string]interface{}{"employee_num": num}},
- }
- updatePool <- save
- }
- //val, _ = strconv.Unquote(val) // 处理json字符串带转义符号
- //maps := make(map[string]interface{})
- //err1 := json.Unmarshal([]byte(val), &maps)
- //if err1 != nil {
- // util.Debug("-----map解析异常")
- //}
- //taskinfo(ctx, key, maps)
- }
- util.Debug("current---", n, cursor)
- if cursor == 0 {
- util.Debug("over---", n)
- break
- }
- }
- }
- func taskinfo(ctx context.Context, name string, tmp map[string]interface{}) {
- q := bson.M{"company_name": name}
- info, b := MongoTool.FindOneByField("qyxy_std", q, nil)
- if b && len(*info) > 0 {
- if types, ok := (*info)["bid_unittype"].([]interface{}); ok {
- t1 := util.ObjArrToStringArr(types)
- t2 := util.ObjArrToStringArr(tmp["bid_unittype"].([]interface{}))
- t2 = append(t2, t1...)
- tmp["bid_unittype"] = Duplicate(t2)
- }
- tmp["updatetime"] = time.Now().Unix()
- MongoTool.Update("qyxy_std", bson.M{"_id": (*info)["_id"]}, bson.M{"$set": tmp}, true, false)
- } else {
- tmp["company_name"] = name
- MongoTool.Save("qyxy_std_err", tmp)
- rdb.Del(ctx, name)
- }
- }
- func Duplicate(a interface{}) (ret []interface{}) {
- va := reflect.ValueOf(a)
- for i := 0; i < va.Len(); i++ {
- if i > 0 && reflect.DeepEqual(va.Index(i-1).Interface(), va.Index(i).Interface()) {
- continue
- }
- ret = append(ret, va.Index(i).Interface())
- }
- return ret
- }
- func updateMethod() {
- arru := make([][]map[string]interface{}, 500)
- indexu := 0
- for {
- select {
- case v := <-updatePool:
- arru[indexu] = v
- indexu++
- if indexu == 500 {
- updateSp <- true
- go func(arru [][]map[string]interface{}) {
- defer func() {
- <-updateSp
- }()
- MongoTool.UpdateBulk("qyxy_std", arru...)
- }(arru)
- arru = make([][]map[string]interface{}, 500)
- indexu = 0
- }
- case <-time.After(1000 * time.Millisecond):
- if indexu > 0 {
- updateSp <- true
- go func(arru [][]map[string]interface{}) {
- defer func() {
- <-updateSp
- }()
- MongoTool.UpdateBulk("qyxy_std", arru...)
- }(arru[:indexu])
- arru = make([][]map[string]interface{}, 500)
- indexu = 0
- }
- }
- }
- }
|