package main import ( "fmt" "github.com/spf13/viper" "go.uber.org/zap" "jygit.jydev.jianyu360.cn/data_processing/common_utils/log" "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb" "sync" "time" ) var ( GF GlobalConf MgoS *mongodb.MongodbSim //读取公司名录 MongoDB,也是更新的链接地址 MgoD *mongodb.MongodbSim //获取联系方式的数据库 MgoB *mongodb.MongodbSim //获取163 bidding 数据 updatePool = make(chan []map[string]interface{}, 5000) updateSp = make(chan bool, 5) saveSize = 100 NotFind = make(map[string][]interface{}, 0) mutex sync.Mutex ) func InitConfig() (err error) { viper.SetConfigFile("config.toml") // 指定配置文件路径 viper.SetConfigName("config") // 配置文件名称(无扩展名) viper.SetConfigType("toml") // 如果配置文件的名称中没有扩展名,则需要配置此项 viper.AddConfigPath("./") viper.AddConfigPath("./conf/") // 还可以在工作目录中查找配置 viper.AddConfigPath("../conf/") // 还可以在工作目录中查找配置 err = viper.ReadInConfig() // 查找并读取配置文件 if err != nil { // 处理读取配置文件的错误 return } err = viper.Unmarshal(&GF) return err } func InitLog() { err := log.InitLog( //log.Path("./logs/log.out"), log.Path(""), log.Level("info"), log.Compress(true), log.MaxSize(10), log.MaxBackups(10), log.MaxAge(7), log.Format("json"), ) if err != nil { fmt.Printf("InitLog failed: %v\n", err) } } func Init() { InitLog() err := InitConfig() if err != nil { log.Info("Init", zap.Any("InitConfig err", err)) } // MgoS = &mongodb.MongodbSim{ MongodbAddr: GF.MongoS.Host, DbName: GF.MongoS.DB, Size: GF.MongoS.Size, UserName: GF.MongoS.Username, Password: GF.MongoS.Password, } MgoS.InitPool() MgoD = &mongodb.MongodbSim{ MongodbAddr: GF.MongoD.Host, DbName: GF.MongoD.DB, Size: GF.MongoD.Size, UserName: GF.MongoD.Username, Password: GF.MongoD.Password, } if GF.Env.Direct > 0 { MgoD.Direct = true } else { MgoD.Direct = false } MgoD.InitPool() MgoB = &mongodb.MongodbSim{ MongodbAddr: GF.MongoD.Host, DbName: "qfw", Size: GF.MongoD.Size, UserName: GF.MongoD.Username, Password: GF.MongoD.Password, } if GF.Env.Direct > 0 { MgoB.Direct = true } else { MgoB.Direct = false } MgoB.InitPool() } func main() { Init() go updateMethod() fmt.Println("开始---") task() fmt.Println("结束~~~") if len(NotFind["buyer"]) > 0 { fmt.Println("采购单位 - 没有找到的企业数量为:", len(NotFind["buyer"])) fmt.Println("具体的企业信息是", NotFind["buyer"]) } if len(NotFind["winner"]) > 0 { fmt.Println("中标单位 - 没有找到的企业数量为:", len(NotFind["winner"])) fmt.Println("具体的企业信息是", NotFind["winner"]) } if len(NotFind["agency"]) > 0 { fmt.Println("代理机构 - 没有找到的企业数量为:", len(NotFind["agency"])) fmt.Println("具体的企业信息是", NotFind["agency"]) } select {} } func task() { sess := MgoS.GetMgoConn() defer MgoS.DestoryMongoConn(sess) query := sess.DB(GF.MongoS.DB).C(GF.MongoS.Coll).Find(nil).Select(nil).Iter() count := 0 ch := make(chan bool, 20) wg := &sync.WaitGroup{} for tmp := make(map[string]interface{}); query.Next(tmp); count++ { if count%100 == 0 { log.Info("dealBidding", zap.Int("current", count)) } ch <- true wg.Add(1) go func(tmp map[string]interface{}) { defer func() { <-ch wg.Done() }() dealData(tmp) }(tmp) tmp = map[string]interface{}{} } wg.Wait() log.Info("task", zap.Int("over ", count)) } func dealData(da map[string]interface{}) { if name, ok := da[GF.MongoS.Name]; ok { name = dealName(name.(string)) //采购单位联系人 if GF.Env.Buyer > 0 { log.Info("dealData", zap.Any("开始采购单位", name)) var newContacts = make([]map[string]interface{}, 0) buyerWhere := map[string]interface{}{ "buyer_name": name, } info, _ := MgoD.FindOne(GF.Env.BuyerColl, buyerWhere) data := *info if len(data) == 0 { mutex.Lock() NotFind["buyer"] = append(NotFind["buyer"], name) mutex.Unlock() } if len(data) > 0 { if contacts, ok := data["contact"].([]interface{}); ok { for _, v := range contacts { if contact, ok := v.(map[string]interface{}); ok { con := make(map[string]interface{}) con["phone"] = contact["phone"] con["contact_person"] = contact["contact_person"] con["infoid"] = contact["infoid"] bidd, _ := MgoB.FindById("bidding", contact["infoid"].(string), map[string]interface{}{"publishtime": 1}) bidding := *bidd if len(bidding) > 0 { con["publishtime"] = bidding["publishtime"] } else { bidold, _ := MgoB.FindById("bidding_back", contact["infoid"].(string), map[string]interface{}{"publishtime": 1}) biddingold := *bidold con["publishtime"] = biddingold["publishtime"] } newContacts = append(newContacts, con) } } } log.Info("dealData", zap.Any("结束采购单位", name)) } else { log.Info("dealData", zap.Any("没有找到数据", name)) } if len(newContacts) > 0 { update := map[string]interface{}{ "buyer_contacts": newContacts, } //更新MongoDB updatePool <- []map[string]interface{}{ {"_id": da["_id"]}, {"$set": update}, } } } //中标单位 if GF.Env.Winner > 0 { log.Info("dealData", zap.Any("开始中标单位", name)) var newContacts = make([]map[string]interface{}, 0) buyerWhere := map[string]interface{}{ "company_name": name, } info, _ := MgoD.FindOne(GF.Env.WinnerColl, buyerWhere) data := *info if len(data) == 0 { mutex.Lock() NotFind["winner"] = append(NotFind["winner"], name) mutex.Unlock() } if len(data) > 0 { if contacts, ok := data["contact"].([]interface{}); ok { for _, v := range contacts { if contact, ok := v.(map[string]interface{}); ok { con := make(map[string]interface{}) con["phone"] = contact["phone"] con["contact_person"] = contact["contact_person"] con["infoid"] = contact["infoid"] bidd, _ := MgoB.FindById("bidding", contact["infoid"].(string), map[string]interface{}{"publishtime": 1}) bidding := *bidd if len(bidding) > 0 { con["publishtime"] = bidding["publishtime"] } else { bidold, _ := MgoB.FindById("bidding_back", contact["infoid"].(string), map[string]interface{}{"publishtime": 1}) biddingold := *bidold con["publishtime"] = biddingold["publishtime"] } newContacts = append(newContacts, con) } } } } log.Info("dealData", zap.Any("结束中标单位", name)) if len(newContacts) > 0 { update := map[string]interface{}{ "winner_contacts": newContacts, } //更新MongoDB updatePool <- []map[string]interface{}{ {"_id": da["_id"]}, {"$set": update}, } } } //代理机构 if GF.Env.Agency > 0 { log.Info("dealData", zap.Any("开始代理机构", name)) var newContacts = make([]map[string]interface{}, 0) buyerWhere := map[string]interface{}{ "agency_name": name, } info, _ := MgoD.FindOne(GF.Env.AgencyColl, buyerWhere) data := *info if len(data) == 0 { mutex.Lock() NotFind["agency"] = append(NotFind["agency"], name) mutex.Unlock() } if len(data) > 0 { if contacts, ok := data["contact"].([]interface{}); ok { for _, v := range contacts { if contact, ok := v.(map[string]interface{}); ok { con := make(map[string]interface{}) con["phone"] = contact["phone"] con["contact_person"] = contact["contact_person"] con["infoid"] = contact["infoid"] bidd, _ := MgoB.FindById("bidding", contact["infoid"].(string), map[string]interface{}{"publishtime": 1}) bidding := *bidd if len(bidding) > 0 { con["publishtime"] = bidding["publishtime"] } else { bidold, _ := MgoB.FindById("bidding_back", contact["infoid"].(string), map[string]interface{}{"publishtime": 1}) biddingold := *bidold con["publishtime"] = biddingold["publishtime"] } newContacts = append(newContacts, con) } } } } log.Info("dealData", zap.Any("结束代理机构", name)) if len(newContacts) > 0 { update := map[string]interface{}{ "agency_contacts": newContacts, } //更新MongoDB updatePool <- []map[string]interface{}{ {"_id": da["_id"]}, {"$set": update}, } } } } } //updateMethod 更新MongoDB func updateMethod() { arru := make([][]map[string]interface{}, saveSize) indexu := 0 for { select { case v := <-updatePool: arru[indexu] = v indexu++ if indexu == saveSize { updateSp <- true go func(arru [][]map[string]interface{}) { defer func() { <-updateSp }() MgoS.UpdateBulk(GF.MongoS.Coll, arru...) }(arru) arru = make([][]map[string]interface{}, saveSize) indexu = 0 } case <-time.After(1000 * time.Millisecond): if indexu > 0 { updateSp <- true go func(arru [][]map[string]interface{}) { defer func() { <-updateSp }() MgoS.UpdateBulk(GF.MongoS.Coll, arru...) }(arru[:indexu]) arru = make([][]map[string]interface{}, saveSize) indexu = 0 } } } }