123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133 |
- package main
- import (
- es "jygit.jydev.jianyu360.cn/data_processing/common_utils/elastic"
- "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
- "log"
- "time"
- )
- var (
- MgoB *mongodb.MongodbSim
- MgoQy *mongodb.MongodbSim
- MgoP *mongodb.MongodbSim
- Es *es.Elastic
- updatePool = make(chan []map[string]interface{}, 5000)
- updateEsPool = make(chan []map[string]interface{}, 5000)
- updateEsSp = make(chan bool, 5) //保存协程
- )
- func InitMgo() {
- //MgoB = &mongodb.MongodbSim{
- // MongodbAddr: "172.31.31.202:27081,172.20.45.128:27080",
- // //MongodbAddr: "127.0.0.1:27083",
- // Size: 10,
- // DbName: "qfw",
- // UserName: "SJZY_RWbid_ES",a
- // Password: "SJZY@B4i4D5e6S",
- // //Direct: true,
- //}
- //MgoB.InitPool()
- MgoQy = &mongodb.MongodbSim{
- MongodbAddr: "172.31.31.202:27081,172.20.45.128:27080",
- //MongodbAddr: "127.0.0.1:27083",
- Size: 10,
- DbName: "mixdata",
- UserName: "SJZY_RWbid_ES",
- Password: "SJZY@B4i4D5e6S",
- //Direct: true,
- }
- MgoQy.InitPool()
- MgoP = &mongodb.MongodbSim{
- //MongodbAddr: "127.0.0.1:27080",
- MongodbAddr: "172.17.4.85:27080",
- DbName: "qfw",
- Size: 10,
- //Direct: true,
- }
- MgoP.InitPool()
- // 本地数据库
- //MgoB = &mongodb.MongodbSim{
- // //MongodbAddr: "172.17.189.140:27080",
- // MongodbAddr: "127.0.0.1:27017",
- // Size: 10,
- // DbName: "wcc",
- // //UserName: "SJZY_RWbid_ES",
- // //Password: "SJZY@B4i4D5e6S",
- // //Direct: true,
- //}
- //MgoB.InitPool()
- // 测试环境
- //MgoB = &mongodb.MongodbSim{
- // MongodbAddr: "192.168.3.206:27002",
- // //MongodbAddr: "127.0.0.1:27017",
- // Size: 10,
- // DbName: "qfw_data",
- // UserName: "root",
- // Password: "root",
- // //Direct: true,
- //}
- //MgoB.InitPool()
- }
- func InitEs() {
- Es = &es.Elastic{
- //S_esurl: "http://127.0.0.1:19908",
- S_esurl: "http://172.17.4.184:19908",
- I_size: 5,
- Username: "jybid",
- Password: "Top2023_JEB01i@31",
- }
- Es.InitElasticSize()
- }
- func main() {
- InitMgo()
- InitEs()
- go updateEsMethod()
- fixQyxy()
- log.Println("11111111111")
- select {}
- }
- // updateEsMethod 更新es
- func updateEsMethod() {
- arru := make([][]map[string]interface{}, 200)
- indexu := 0
- for {
- select {
- case v := <-updateEsPool:
- arru[indexu] = v
- indexu++
- if indexu == 200 {
- updateEsSp <- true
- go func(arru [][]map[string]interface{}) {
- defer func() {
- <-updateEsSp
- }()
- Es.UpdateBulk("projectset", arru...)
- }(arru)
- arru = make([][]map[string]interface{}, 200)
- indexu = 0
- }
- case <-time.After(1000 * time.Millisecond):
- if indexu > 0 {
- updateEsSp <- true
- go func(arru [][]map[string]interface{}) {
- defer func() {
- <-updateEsSp
- }()
- Es.UpdateBulk("projectset", arru...)
- }(arru[:indexu])
- arru = make([][]map[string]interface{}, 200)
- indexu = 0
- }
- }
- }
- }
|