123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438 |
- package main
- import "time"
- func RunSaveService() {
- go SaveFunc()
- go SaveExpandFunc()
- go SaveDetailFunc()
- go SaveAttrFunc()
- go SaveIntentFunc()
- go SaveBidderFunc()
- go SaveGoodsFunc()
- //go saveErrMethod()
- }
- func SaveFunc() {
- arru := make([]map[string]interface{}, saveSize)
- indexu := 0
- for {
- select {
- case v := <-saveBasePool:
- arru[indexu] = v
- indexu++
- if indexu == saveSize {
- saveBaseSp <- true
- go func(arru []map[string]interface{}) {
- defer func() {
- <-saveBaseSp
- }()
- MysqlTool.InsertBulk("dwd_f_bid_baseinfo", BaseField, arru...)
- }(arru)
- arru = make([]map[string]interface{}, saveSize)
- indexu = 0
- }
- case <-time.After(1000 * time.Millisecond):
- if indexu > 0 {
- saveBaseSp <- true
- go func(arru []map[string]interface{}) {
- defer func() {
- <-saveBaseSp
- }()
- MysqlTool.InsertBulk("dwd_f_bid_baseinfo", BaseField, arru...)
- }(arru[:indexu])
- arru = make([]map[string]interface{}, saveSize)
- indexu = 0
- }
- }
- }
- }
- func SaveExpandFunc() {
- arru := make([]map[string]interface{}, saveSize)
- indexu := 0
- for {
- select {
- case v := <-saveExpandPool:
- arru[indexu] = v
- indexu++
- if indexu == saveSize {
- saveExpandSp <- true
- go func(arru []map[string]interface{}) {
- defer func() {
- <-saveExpandSp
- }()
- MysqlTool.InsertBulk("dwd_f_bid_expand_baseinfo", ExpandField, arru...)
- }(arru)
- arru = make([]map[string]interface{}, saveSize)
- indexu = 0
- }
- case <-time.After(1000 * time.Millisecond):
- if indexu > 0 {
- saveExpandSp <- true
- go func(arru []map[string]interface{}) {
- defer func() {
- <-saveExpandSp
- }()
- MysqlTool.InsertBulk("dwd_f_bid_expand_baseinfo", ExpandField, arru...)
- }(arru[:indexu])
- arru = make([]map[string]interface{}, saveSize)
- indexu = 0
- }
- }
- }
- }
- func SaveDetailFunc() {
- arru := make([]map[string]interface{}, saveSize)
- indexu := 0
- for {
- select {
- case v := <-saveDetailPool:
- arru[indexu] = v
- indexu++
- if indexu == saveSize {
- saveDetailSp <- true
- go func(arru []map[string]interface{}) {
- defer func() {
- <-saveDetailSp
- }()
- MysqlTool.InsertBulk("dwd_f_bid_detail", DetailField, arru...)
- }(arru)
- arru = make([]map[string]interface{}, saveSize)
- indexu = 0
- }
- case <-time.After(1000 * time.Millisecond):
- if indexu > 0 {
- saveDetailSp <- true
- go func(arru []map[string]interface{}) {
- defer func() {
- <-saveDetailSp
- }()
- MysqlTool.InsertBulk("dwd_f_bid_detail", DetailField, arru...)
- }(arru[:indexu])
- arru = make([]map[string]interface{}, saveSize)
- indexu = 0
- }
- }
- }
- }
- func SaveAttrFunc() {
- arru := make([]map[string]interface{}, saveSize)
- indexu := 0
- for {
- select {
- case v := <-saveAttrPool:
- arru[indexu] = v
- indexu++
- if indexu == saveSize {
- saveAttrSp <- true
- go func(arru []map[string]interface{}) {
- defer func() {
- <-saveAttrSp
- }()
- MysqlTool.InsertBulk("dwd_f_bid_file_text", AttrField, arru...)
- }(arru)
- arru = make([]map[string]interface{}, saveSize)
- indexu = 0
- }
- case <-time.After(1000 * time.Millisecond):
- if indexu > 0 {
- saveAttrSp <- true
- go func(arru []map[string]interface{}) {
- defer func() {
- <-saveAttrSp
- }()
- MysqlTool.InsertBulk("dwd_f_bid_file_text", AttrField, arru...)
- }(arru[:indexu])
- arru = make([]map[string]interface{}, saveSize)
- indexu = 0
- }
- }
- }
- }
- func SaveIntentFunc() {
- arru := make([]map[string]interface{}, saveSize)
- indexu := 0
- for {
- select {
- case v := <-saveIntentPool:
- arru[indexu] = v
- indexu++
- if indexu == saveSize {
- saveIntentSp <- true
- go func(arru []map[string]interface{}) {
- defer func() {
- <-saveIntentSp
- }()
- MysqlTool.InsertBulk("dwd_f_bid_intention_baseinfo", IntentField, arru...)
- }(arru)
- arru = make([]map[string]interface{}, saveSize)
- indexu = 0
- }
- case <-time.After(1000 * time.Millisecond):
- if indexu > 0 {
- saveIntentSp <- true
- go func(arru []map[string]interface{}) {
- defer func() {
- <-saveIntentSp
- }()
- MysqlTool.InsertBulk("dwd_f_bid_intention_baseinfo", IntentField, arru...)
- }(arru[:indexu])
- arru = make([]map[string]interface{}, saveSize)
- indexu = 0
- }
- }
- }
- }
- func SaveBidderFunc() {
- arru := make([]map[string]interface{}, saveSize)
- indexu := 0
- for {
- select {
- case v := <-saveBidderPool:
- arru[indexu] = v
- indexu++
- if indexu == saveSize {
- saveBidderSp <- true
- go func(arru []map[string]interface{}) {
- defer func() {
- <-saveBidderSp
- }()
- MysqlTool.InsertBulk("dwd_f_bid_package_bidder_baseinfo", BidderField, arru...)
- }(arru)
- arru = make([]map[string]interface{}, saveSize)
- indexu = 0
- }
- case <-time.After(1000 * time.Millisecond):
- if indexu > 0 {
- saveBidderSp <- true
- go func(arru []map[string]interface{}) {
- defer func() {
- <-saveBidderSp
- }()
- MysqlTool.InsertBulk("dwd_f_bid_package_bidder_baseinfo", BidderField, arru...)
- }(arru[:indexu])
- arru = make([]map[string]interface{}, saveSize)
- indexu = 0
- }
- }
- }
- }
- func SaveGoodsFunc() {
- arru := make([]map[string]interface{}, saveSize)
- indexu := 0
- for {
- select {
- case v := <-saveGoodsPool:
- arru[indexu] = v
- indexu++
- if indexu == saveSize {
- saveGoodsSp <- true
- go func(arru []map[string]interface{}) {
- defer func() {
- <-saveGoodsSp
- }()
- MysqlTool.InsertBulk("dwd_f_bid_package_goods_baseinfo", GoodsField, arru...)
- }(arru)
- arru = make([]map[string]interface{}, saveSize)
- indexu = 0
- }
- case <-time.After(1000 * time.Millisecond):
- if indexu > 0 {
- saveGoodsSp <- true
- go func(arru []map[string]interface{}) {
- defer func() {
- <-saveGoodsSp
- }()
- MysqlTool.InsertBulk("dwd_f_bid_package_goods_baseinfo", GoodsField, arru...)
- }(arru[:indexu])
- arru = make([]map[string]interface{}, saveSize)
- indexu = 0
- }
- }
- }
- }
/*
The functions below handle project-related data.
*/
- func SaveProFunc() {
- arru := make([]map[string]interface{}, saveSize)
- indexu := 0
- for {
- select {
- case v := <-saveProPool:
- arru[indexu] = v
- indexu++
- if indexu == saveSize {
- saveProSp <- true
- go func(arru []map[string]interface{}) {
- defer func() {
- <-saveProSp
- }()
- MysqlTool.InsertBulk("dws_f_project_baseinfo", ProField, arru...)
- }(arru)
- arru = make([]map[string]interface{}, saveSize)
- indexu = 0
- }
- case <-time.After(1000 * time.Millisecond):
- if indexu > 0 {
- saveProSp <- true
- go func(arru []map[string]interface{}) {
- defer func() {
- <-saveProSp
- }()
- MysqlTool.InsertBulk("dws_f_project_baseinfo", ProField, arru...)
- }(arru[:indexu])
- arru = make([]map[string]interface{}, saveSize)
- indexu = 0
- }
- }
- }
- }
- func SaveProbFunc() {
- arru := make([]map[string]interface{}, saveSize)
- indexu := 0
- for {
- select {
- case v := <-saveProbPool:
- arru[indexu] = v
- indexu++
- if indexu == saveSize {
- saveProbSp <- true
- go func(arru []map[string]interface{}) {
- defer func() {
- <-saveProbSp
- }()
- MysqlTool.InsertBulk("dws_f_project_business", ProBusField, arru...)
- }(arru)
- arru = make([]map[string]interface{}, saveSize)
- indexu = 0
- }
- case <-time.After(1000 * time.Millisecond):
- if indexu > 0 {
- saveProbSp <- true
- go func(arru []map[string]interface{}) {
- defer func() {
- <-saveProbSp
- }()
- MysqlTool.InsertBulk("dws_f_project_business", ProBusField, arru...)
- }(arru[:indexu])
- arru = make([]map[string]interface{}, saveSize)
- indexu = 0
- }
- }
- }
- }
- func SaveProTagFunc() {
- arru := make([]map[string]interface{}, saveSize)
- indexu := 0
- for {
- select {
- case v := <-saveProTagPool:
- arru[indexu] = v
- indexu++
- if indexu == saveSize {
- saveProTagSp <- true
- go func(arru []map[string]interface{}) {
- defer func() {
- <-saveProTagSp
- }()
- MysqlTool.InsertBulk("dws_f_project_tags", ProTagsField, arru...)
- }(arru)
- arru = make([]map[string]interface{}, saveSize)
- indexu = 0
- }
- case <-time.After(1000 * time.Millisecond):
- if indexu > 0 {
- saveProTagSp <- true
- go func(arru []map[string]interface{}) {
- defer func() {
- <-saveProTagSp
- }()
- MysqlTool.InsertBulk("dws_f_project_tags", ProTagsField, arru...)
- }(arru[:indexu])
- arru = make([]map[string]interface{}, saveSize)
- indexu = 0
- }
- }
- }
- }
- func SaveRelationFunc() {
- arru := make([]map[string]interface{}, saveSize)
- indexu := 0
- for {
- select {
- case v := <-saveRelationPool:
- arru[indexu] = v
- indexu++
- if indexu == saveSize {
- saveRelationSp <- true
- go func(arru []map[string]interface{}) {
- defer func() {
- <-saveRelationSp
- }()
- MysqlTool.InsertBulk("dws_f_bpmc_relation", RelationField, arru...)
- }(arru)
- arru = make([]map[string]interface{}, saveSize)
- indexu = 0
- }
- case <-time.After(1000 * time.Millisecond):
- if indexu > 0 {
- saveRelationSp <- true
- go func(arru []map[string]interface{}) {
- defer func() {
- <-saveRelationSp
- }()
- MysqlTool.InsertBulk("dws_f_bpmc_relation", RelationField, arru...)
- }(arru[:indexu])
- arru = make([]map[string]interface{}, saveSize)
- indexu = 0
- }
- }
- }
- }
- // 字段错误数据
- func saveErrMethod() {
- arru := make([]map[string]interface{}, 200)
- indexu := 0
- for {
- select {
- case v := <-saveErrPool:
- arru[indexu] = v
- indexu++
- if indexu == saveSize {
- saveErrSp <- true
- go func(arru []map[string]interface{}) {
- defer func() {
- <-saveErrSp
- }()
- MongoB.SaveBulk("bidding_mgo_to_tidb_f_err", arru...)
- }(arru)
- arru = make([]map[string]interface{}, saveSize)
- indexu = 0
- }
- case <-time.After(1000 * time.Millisecond):
- if indexu > 0 {
- saveErrSp <- true
- go func(arru []map[string]interface{}) {
- defer func() {
- <-saveErrSp
- }()
- MongoB.SaveBulk("bidding_mgo_to_tidb_f_err", arru...)
- }(arru[:indexu])
- arru = make([]map[string]interface{}, saveSize)
- indexu = 0
- }
- }
- }
- }
|