package main

import (
	"time"

	"jygit.jydev.jianyu360.cn/data_processing/common_utils/elastic"
	"jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
)
- var (
- MongoTool *mongodb.MongodbSim
- Es *elastic.Elastic
- updatePool chan []map[string]interface{}
- updateSp chan bool
- updatePool1 chan []map[string]interface{}
- updateSp1 chan bool
- saveSize int
- savePool chan map[string]interface{}
- saveSp chan bool
- )
- func init() {
- MongoTool = &mongodb.MongodbSim{
- MongodbAddr: "172.17.189.140:27080,172.17.189.141:27081",
- Size: 10,
- DbName: "mixdata",
- UserName: "SJZY_RWESBid_Other",
- Password: "SJZY@O17t8herB3B",
- }
- MongoTool.InitPool()
- Es = &elastic.Elastic{
- S_esurl: "http://172.17.162.27:19908", //http://172.17.4.184:19800
- I_size: 10,
- Username: "dataGr_appli",
- Password: "L2ds90Ha4e5#",
- }
- Es.InitElasticSize()
- saveSize = 200
- savePool = make(chan map[string]interface{}, 5000)
- saveSp = make(chan bool, 5)
- }
- func main() {
- go saveMethod()
- go updateMethod()
- //go findEs()
- go TimeTask()
- ch := make(chan bool, 1)
- <-ch
- }
- func saveMethod() {
- arru := make([]map[string]interface{}, saveSize)
- indexu := 0
- for {
- select {
- case v := <-savePool:
- arru[indexu] = v
- indexu++
- if indexu == saveSize {
- saveSp <- true
- go func(arru []map[string]interface{}) {
- defer func() {
- <-saveSp
- }()
- MongoTool.SaveBulk("project_forecast", arru...)
- }(arru)
- arru = make([]map[string]interface{}, saveSize)
- indexu = 0
- }
- case <-time.After(1000 * time.Millisecond):
- if indexu > 0 {
- saveSp <- true
- go func(arru []map[string]interface{}) {
- defer func() {
- <-saveSp
- }()
- MongoTool.SaveBulk("project_forecast", arru...)
- }(arru[:indexu])
- arru = make([]map[string]interface{}, saveSize)
- indexu = 0
- }
- }
- }
- }
- func updateMethod() {
- arru := make([][]map[string]interface{}, saveSize)
- indexu := 0
- for {
- select {
- case v := <-updatePool1:
- arru[indexu] = v
- indexu++
- if indexu == saveSize {
- updateSp1 <- true
- go func(arru [][]map[string]interface{}) {
- defer func() {
- <-updateSp1
- }()
- MongoTool.UpSertBulk("project_forecast", arru...)
- }(arru)
- arru = make([][]map[string]interface{}, saveSize)
- indexu = 0
- }
- case <-time.After(1000 * time.Millisecond):
- if indexu > 0 {
- updateSp1 <- true
- go func(arru [][]map[string]interface{}) {
- defer func() {
- <-updateSp1
- }()
- MongoTool.UpSertBulk("project_forecast", arru...)
- }(arru[:indexu])
- arru = make([][]map[string]interface{}, saveSize)
- indexu = 0
- }
- }
- }
- }
|