123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159 |
- package main
- import (
- "data_clear_sync/config"
- "flag"
- "jygit.jydev.jianyu360.cn/data_processing/common_utils/redis"
- "log"
- "time"
- )
// Command-line state.
var (
	// V holds the value of the -v flag; main uses it to choose between the
	// v1 and v2 task.
	V string
)

// Fields is a fixed list of bidding field names.
// NOTE(review): not referenced in this part of the file; presumably consumed
// by the task code — confirm before changing the order or contents.
var Fields = []string{
	"area",
	"city",
	"projectname",
	"projectcode",
	"budget",
	"s_winner",
	"bidamount",
	"buyer",
}

// Queues feeding the background bulk writers. Each element is one update
// document group that the corresponding worker adds to its pending batch.
var (
	// updatePool is drained by updateFuc (Mongo "bidding" collection).
	updatePool = make(chan []map[string]interface{}, 5000)
	// updateRcPool is drained by updateRcFuc (Mongo "bidding_modify_record").
	updateRcPool = make(chan []map[string]interface{}, 5000)
	// updateEsPool is drained by updateEsFuc (Es "bidding" index).
	updateEsPool = make(chan []map[string]interface{}, 5000)
)

// Buffered channels used as counting semaphores: a worker sends before it
// starts a bulk write and the write goroutine receives when it finishes, so
// the capacity bounds the number of in-flight bulk writes.
var (
	updateSp   = make(chan bool, 5)
	updateRcsp = make(chan bool, 5)
	updateEsSp = make(chan bool, 1)
)
- func init() {
- config.Init("./common.toml")
- InitLog()
- InitMgo()
- InitEs()
- redis.InitRedis1("qyxy_id=172.17.4.189:8379", 4)
- }
- func main() {
- flag.StringVar(&V, "v", "", "version")
- flag.Parse()
- if V != "" {
- go updateFuc()
- go updateRcFuc()
- go updateEsFuc()
- if V == "v1" {
- taskinfoV1()
- } else if V == "v2" {
- taskinfoV2()
- }
- ch := make(chan bool, 1)
- <-ch
- } else {
- flag.PrintDefaults()
- log.Println("参数错误.")
- }
- }
- func updateFuc() {
- arru := make([][]map[string]interface{}, 500)
- indexu := 0
- for {
- select {
- case v := <-updatePool:
- arru[indexu] = v
- indexu++
- if indexu == 500 {
- updateSp <- true
- go func(arru [][]map[string]interface{}) {
- defer func() {
- <-updateSp
- }()
- Mongo.UpdateBulk("bidding", arru...)
- }(arru)
- arru = make([][]map[string]interface{}, 500)
- indexu = 0
- }
- case <-time.After(1000 * time.Millisecond):
- if indexu > 0 {
- updateSp <- true
- go func(arru [][]map[string]interface{}) {
- defer func() {
- <-updateSp
- }()
- Mongo.UpdateBulk("bidding", arru...)
- }(arru[:indexu])
- arru = make([][]map[string]interface{}, 500)
- indexu = 0
- }
- }
- }
- }
- func updateRcFuc() {
- arru := make([][]map[string]interface{}, 500)
- indexu := 0
- for {
- select {
- case v := <-updateRcPool:
- arru[indexu] = v
- indexu++
- if indexu == 500 {
- updateRcsp <- true
- go func(arru [][]map[string]interface{}) {
- defer func() {
- <-updateRcsp
- }()
- Mongo.UpSertBulk("bidding_modify_record", arru...)
- }(arru)
- arru = make([][]map[string]interface{}, 500)
- indexu = 0
- }
- case <-time.After(1000 * time.Millisecond):
- if indexu > 0 {
- updateRcsp <- true
- go func(arru [][]map[string]interface{}) {
- defer func() {
- <-updateSp
- }()
- Mongo.UpSertBulk("bidding_modify_record", arru...)
- }(arru[:indexu])
- arru = make([][]map[string]interface{}, 500)
- indexu = 0
- }
- }
- }
- }
- func updateEsFuc() {
- arru := make([][]map[string]interface{}, 200)
- indexu := 0
- for {
- select {
- case v := <-updateEsPool:
- arru[indexu] = v
- indexu++
- if indexu == 200 {
- updateEsSp <- true
- go func(arru [][]map[string]interface{}) {
- defer func() {
- <-updateEsSp
- }()
- Es.UpdateBulk("bidding", arru...)
- }(arru)
- arru = make([][]map[string]interface{}, 200)
- indexu = 0
- }
- case <-time.After(1000 * time.Millisecond):
- if indexu > 0 {
- updateEsSp <- true
- go func(arru [][]map[string]interface{}) {
- defer func() {
- <-updateEsSp
- }()
- Es.UpdateBulk("bidding", arru...)
- }(arru[:indexu])
- arru = make([][]map[string]interface{}, 200)
- indexu = 0
- }
- }
- }
- }
|