package main import ( "log" "mongodb" _ "net/http/pprof" "qfw/util" "strings" "time" ) var ( Sysconfig map[string]interface{} //配置文件 mgo *mongodb.MongodbSim //mongodb操作对象 biddingIndexFields []string biddingIndexFieldsMap = map[string]string{} projectinfoFields []string projectinfoFieldsMap = map[string]string{} multiIndex []string purchasinglistFields []string winnerorderlistFields []string purchasinglistFieldsMap = map[string]string{} winnerorderlistFieldsMap = map[string]string{} BulkSize = 400 detailLength = 50000 fileLength = 50000 savesize = 500 Es *Elastic EsBulkSize int saveProjectEsPool chan map[string]interface{} saveProjectSp chan bool bidding, project map[string]interface{} ) var UpdataMgoCache = make(chan []map[string]interface{}, 1000) var SP = make(chan bool, 5) func init() { util.ReadConfig(&Sysconfig) detailLength = util.IntAllDef(Sysconfig["detaillength"], 50000) fileLength = util.IntAllDef(Sysconfig["filelength"], 50000) bidding, _ = Sysconfig["bidding"].(map[string]interface{}) project, _ = Sysconfig["project"].(map[string]interface{}) mconf, _ := Sysconfig["mongodb"].(map[string]interface{}) mgo = &mongodb.MongodbSim{ //mongodb为binding连接 MongodbAddr: mconf["addr"].(string), Size: util.IntAllDef(mconf["pool"], 5), DbName: mconf["db"].(string), UserName: Sysconfig["uname"].(string), Password: Sysconfig["upwd"].(string), } mgo.InitPool() //初始化es econf := Sysconfig["elastic"].(map[string]interface{}) Es = &Elastic{ S_esurl: util.ObjToString(econf["addr"]), //http://172.17.4.184:19800 I_size: util.IntAll(econf["size"]), Username: util.ObjToString(econf["user"]), Password: util.ObjToString(econf["password"]), } Es.InitElasticSize() // if bidding["indexfields"] != nil { biddingIndexFields = util.ObjArrToStringArr(bidding["indexfields"].([]interface{})) } if bidding["projectinfo"] != nil { pf := util.ObjToString(bidding["projectinfo"]) if pf != "" { projectinfoFields = strings.Split(pf, ",") } } if bidding["purchasinglist"] != nil { pcl := util.ObjToString(bidding["purchasinglist"]) if pcl != "" { purchasinglistFields = strings.Split(pcl, ",") } } if bidding["winnerorder"] != nil { winnerorder := util.ObjToString(bidding["winnerorder"]) if winnerorder != "" { winnerorderlistFields = strings.Split(winnerorder, ",") } } // if bidding["indexfieldsmap"] != nil { for k, v := range bidding["indexfieldsmap"].(map[string]interface{}) { biddingIndexFieldsMap[k] = util.ObjToString(v) } log.Println(biddingIndexFieldsMap) } if bidding["projectinfomap"] != nil { for k, v := range bidding["projectinfomap"].(map[string]interface{}) { projectinfoFieldsMap[k] = util.ObjToString(v) } log.Println(projectinfoFieldsMap) } if bidding["purchasinglistmap"] != nil { for k, v := range bidding["purchasinglistmap"].(map[string]interface{}) { purchasinglistFieldsMap[k] = util.ObjToString(v) } log.Println(purchasinglistFieldsMap) } if bidding["winnerordermap"] != nil { for k, v := range bidding["winnerordermap"].(map[string]interface{}) { winnerorderlistFieldsMap[k] = util.ObjToString(v) } log.Println(winnerorderlistFieldsMap) } log.Println(projectinfoFields) log.Println(purchasinglistFields) EsBulkSize = 200 saveProjectEsPool = make(chan map[string]interface{}, 5000) saveProjectSp = make(chan bool, 5) //初始化oss InitOss() } func main() { go SaveProjectEs() //biddingTask() projectTask() ch := make(chan bool, 1) <-ch } func SaveProjectEs() { arru := make([]map[string]interface{}, EsBulkSize) indexu := 0 for { select { case v := <-saveProjectEsPool: arru[indexu] = v indexu++ if indexu == EsBulkSize { saveProjectSp <- true go func(arru []map[string]interface{}) { defer func() { <-saveProjectSp }() Es.BulkSave("oprd_projectset_v1", arru) }(arru) arru = make([]map[string]interface{}, EsBulkSize) indexu = 0 } case <-time.After(1000 * time.Millisecond): if indexu > 0 { saveProjectSp <- true go func(arru []map[string]interface{}) { defer func() { <-saveProjectSp }() Es.BulkSave("oprd_projectset_v1", arru) }(arru[:indexu]) arru = make([]map[string]interface{}, EsBulkSize) indexu = 0 } } } }