123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205 |
- package main
- import (
- "mongodb"
- es "qfw/util/elastic"
- "time"
- )
- var (
- MongoTool *mongodb.MongodbSim
- Es *es.Elastic
- updatePool chan []map[string]interface{}
- updateSp chan bool
- updatePool1 chan []map[string]interface{}
- updateSp1 chan bool
- saveSize int
- savePool chan map[string]interface{}
- saveSp chan bool
- savePool1 chan map[string]interface{}
- saveSp1 chan bool
- )
- func init() {
- MongoTool = &mongodb.MongodbSim{
- MongodbAddr: "172.17.4.187:27082,172.17.145.163:27083", // 172.17.4.187:27082,172.17.145.163:27083
- Size: 10,
- DbName: "mixdata",
- UserName: "SJZY_RWESBid_Other",
- Password: "SJZY@O17t8herB3B",
- }
- MongoTool.InitPool()
- Es = &es.Elastic{
- S_esurl: "http://172.17.145.170:9800", //http://172.17.145.170:9800
- I_size: 10,
- }
- Es.InitElasticSize()
- saveSize = 200
- updatePool = make(chan []map[string]interface{}, 5000)
- updateSp = make(chan bool, 5)
- updatePool1 = make(chan []map[string]interface{}, 5000)
- updateSp1 = make(chan bool, 5)
- savePool = make(chan map[string]interface{}, 5000)
- saveSp = make(chan bool, 5)
- savePool1 = make(chan map[string]interface{}, 5000)
- saveSp1 = make(chan bool, 5)
- }
- func main() {
- go saveMethod()
- go saveMethod1()
- go updateMethod()
- //go updateMethod1()
- //go findEs()
- //go fcResult()
- go TimeTask()
- ch := make(chan bool, 1)
- <-ch
- }
- func saveMethod() {
- arru := make([]map[string]interface{}, saveSize)
- indexu := 0
- for {
- select {
- case v := <-savePool:
- arru[indexu] = v
- indexu++
- if indexu == saveSize {
- saveSp <- true
- go func(arru []map[string]interface{}) {
- defer func() {
- <-saveSp
- }()
- MongoTool.SaveBulk("project_forecast_yece_tmp", arru...)
- }(arru)
- arru = make([]map[string]interface{}, saveSize)
- indexu = 0
- }
- case <-time.After(1000 * time.Millisecond):
- if indexu > 0 {
- saveSp <- true
- go func(arru []map[string]interface{}) {
- defer func() {
- <-saveSp
- }()
- MongoTool.SaveBulk("project_forecast_yece_tmp", arru...)
- }(arru[:indexu])
- arru = make([]map[string]interface{}, saveSize)
- indexu = 0
- }
- }
- }
- }
- func saveMethod1() {
- arru := make([]map[string]interface{}, saveSize)
- indexu := 0
- for {
- select {
- case v := <-savePool1:
- arru[indexu] = v
- indexu++
- if indexu == saveSize {
- saveSp1 <- true
- go func(arru []map[string]interface{}) {
- defer func() {
- <-saveSp1
- }()
- MongoTool.SaveBulk("project_forecast", arru...)
- }(arru)
- arru = make([]map[string]interface{}, saveSize)
- indexu = 0
- }
- case <-time.After(1000 * time.Millisecond):
- if indexu > 0 {
- saveSp1 <- true
- go func(arru []map[string]interface{}) {
- defer func() {
- <-saveSp1
- }()
- MongoTool.SaveBulk("project_forecast", arru...)
- }(arru[:indexu])
- arru = make([]map[string]interface{}, saveSize)
- indexu = 0
- }
- }
- }
- }
- func updateMethod() {
- arru := make([][]map[string]interface{}, saveSize)
- indexu := 0
- for {
- select {
- case v := <-updatePool:
- arru[indexu] = v
- indexu++
- if indexu == saveSize {
- updateSp <- true
- go func(arru [][]map[string]interface{}) {
- defer func() {
- <-updateSp
- }()
- MongoTool.UpSertBulk("project_forecast_yece_tmp", arru...)
- }(arru)
- arru = make([][]map[string]interface{}, saveSize)
- indexu = 0
- }
- case <-time.After(1000 * time.Millisecond):
- if indexu > 0 {
- updateSp <- true
- go func(arru [][]map[string]interface{}) {
- defer func() {
- <-updateSp
- }()
- MongoTool.UpSertBulk("project_forecast_yece_tmp", arru...)
- }(arru[:indexu])
- arru = make([][]map[string]interface{}, saveSize)
- indexu = 0
- }
- }
- }
- }
- func updateMethod1() {
- arru := make([][]map[string]interface{}, saveSize)
- indexu := 0
- for {
- select {
- case v := <-updatePool1:
- arru[indexu] = v
- indexu++
- if indexu == saveSize {
- updateSp1 <- true
- go func(arru [][]map[string]interface{}) {
- defer func() {
- <-updateSp1
- }()
- MongoTool.UpSertBulk("project_forecast", arru...)
- }(arru)
- arru = make([][]map[string]interface{}, saveSize)
- indexu = 0
- }
- case <-time.After(1000 * time.Millisecond):
- if indexu > 0 {
- updateSp1 <- true
- go func(arru [][]map[string]interface{}) {
- defer func() {
- <-updateSp1
- }()
- MongoTool.UpSertBulk("project_forecast", arru...)
- }(arru[:indexu])
- arru = make([][]map[string]interface{}, saveSize)
- indexu = 0
- }
- }
- }
- }
|