123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169 |
- package main
- import (
- "log"
- "mongodb"
- _ "net/http/pprof"
- "qfw/util"
- "strings"
- "time"
- )
- var (
- Sysconfig map[string]interface{} //配置文件
- mgo *mongodb.MongodbSim //mongodb操作对象
- biddingIndexFields []string
- biddingIndexFieldsMap = map[string]string{}
- projectinfoFields []string
- projectinfoFieldsMap = map[string]string{}
- multiIndex []string
- purchasinglistFields []string
- winnerorderlistFields []string
- purchasinglistFieldsMap = map[string]string{}
- winnerorderlistFieldsMap = map[string]string{}
- BulkSize = 400
- detailLength = 50000
- fileLength = 50000
- savesize = 500
- Es *Elastic
- EsBulkSize int
- saveProjectEsPool chan map[string]interface{}
- saveProjectSp chan bool
- bidding, project map[string]interface{}
- )
var (
	// UpdataMgoCache is a buffered channel (capacity 1000) carrying batches
	// of documents. It is not read or written in this part of the file.
	UpdataMgoCache = make(chan []map[string]interface{}, 1000)
	// SP is a buffered bool channel with capacity 5.
	// NOTE(review): presumably a concurrency limiter for the consumers of
	// UpdataMgoCache; confirm against the code that uses it.
	SP = make(chan bool, 5)
)
- func init() {
- util.ReadConfig(&Sysconfig)
- detailLength = util.IntAllDef(Sysconfig["detaillength"], 50000)
- fileLength = util.IntAllDef(Sysconfig["filelength"], 50000)
- bidding, _ = Sysconfig["bidding"].(map[string]interface{})
- project, _ = Sysconfig["project"].(map[string]interface{})
- mconf, _ := Sysconfig["mongodb"].(map[string]interface{})
- mgo = &mongodb.MongodbSim{ //mongodb为binding连接
- MongodbAddr: mconf["addr"].(string),
- Size: util.IntAllDef(mconf["pool"], 5),
- DbName: mconf["db"].(string),
- UserName: Sysconfig["uname"].(string),
- Password: Sysconfig["upwd"].(string),
- }
- mgo.InitPool()
- //初始化es
- econf := Sysconfig["elastic"].(map[string]interface{})
- Es = &Elastic{
- S_esurl: util.ObjToString(econf["addr"]), //http://172.17.4.184:19800
- I_size: util.IntAll(econf["size"]),
- Username: util.ObjToString(econf["user"]),
- Password: util.ObjToString(econf["password"]),
- }
- Es.InitElasticSize()
- //
- if bidding["indexfields"] != nil {
- biddingIndexFields = util.ObjArrToStringArr(bidding["indexfields"].([]interface{}))
- }
- if bidding["projectinfo"] != nil {
- pf := util.ObjToString(bidding["projectinfo"])
- if pf != "" {
- projectinfoFields = strings.Split(pf, ",")
- }
- }
- if bidding["purchasinglist"] != nil {
- pcl := util.ObjToString(bidding["purchasinglist"])
- if pcl != "" {
- purchasinglistFields = strings.Split(pcl, ",")
- }
- }
- if bidding["winnerorder"] != nil {
- winnerorder := util.ObjToString(bidding["winnerorder"])
- if winnerorder != "" {
- winnerorderlistFields = strings.Split(winnerorder, ",")
- }
- }
- //
- if bidding["indexfieldsmap"] != nil {
- for k, v := range bidding["indexfieldsmap"].(map[string]interface{}) {
- biddingIndexFieldsMap[k] = util.ObjToString(v)
- }
- log.Println(biddingIndexFieldsMap)
- }
- if bidding["projectinfomap"] != nil {
- for k, v := range bidding["projectinfomap"].(map[string]interface{}) {
- projectinfoFieldsMap[k] = util.ObjToString(v)
- }
- log.Println(projectinfoFieldsMap)
- }
- if bidding["purchasinglistmap"] != nil {
- for k, v := range bidding["purchasinglistmap"].(map[string]interface{}) {
- purchasinglistFieldsMap[k] = util.ObjToString(v)
- }
- log.Println(purchasinglistFieldsMap)
- }
- if bidding["winnerordermap"] != nil {
- for k, v := range bidding["winnerordermap"].(map[string]interface{}) {
- winnerorderlistFieldsMap[k] = util.ObjToString(v)
- }
- log.Println(winnerorderlistFieldsMap)
- }
- log.Println(projectinfoFields)
- log.Println(purchasinglistFields)
- EsBulkSize = 200
- saveProjectEsPool = make(chan map[string]interface{}, 5000)
- saveProjectSp = make(chan bool, 5)
- //初始化oss
- InitOss()
- }
- func main() {
- go SaveProjectEs()
- //biddingTask()
- projectTask()
- ch := make(chan bool, 1)
- <-ch
- }
- func SaveProjectEs() {
- arru := make([]map[string]interface{}, EsBulkSize)
- indexu := 0
- for {
- select {
- case v := <-saveProjectEsPool:
- arru[indexu] = v
- indexu++
- if indexu == EsBulkSize {
- saveProjectSp <- true
- go func(arru []map[string]interface{}) {
- defer func() {
- <-saveProjectSp
- }()
- Es.BulkSave("oprd_projectset_v1", arru)
- }(arru)
- arru = make([]map[string]interface{}, EsBulkSize)
- indexu = 0
- }
- case <-time.After(1000 * time.Millisecond):
- if indexu > 0 {
- saveProjectSp <- true
- go func(arru []map[string]interface{}) {
- defer func() {
- <-saveProjectSp
- }()
- Es.BulkSave("oprd_projectset_v1", arru)
- }(arru[:indexu])
- arru = make([]map[string]interface{}, EsBulkSize)
- indexu = 0
- }
- }
- }
- }
|