package main

import (
	"log"
	"strings"
	"time"

	"go.mongodb.org/mongo-driver/bson"
	"jygit.jydev.jianyu360.cn/data_processing/common_utils"
	"jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
)

var (
	// MgoB is the shared MongoDB handle; it is set by InitMgo.
	MgoB *mongodb.MongodbSim

	// updatePool queues pending updates for updateMethod. Each element is a
	// two-item slice: the selector document followed by the update document.
	updatePool = make(chan []map[string]interface{}, 5000)
)

// InitMgo builds the MongoDB handle for the "qfw" database and initialises
// its connection pool.
//
// NOTE(review): the address and credentials are hard-coded in source. They
// are left untouched here so behaviour does not change, but they should be
// moved to configuration or the environment and rotated.
func InitMgo() {
	MgoB = &mongodb.MongodbSim{
		//MongodbAddr: "172.17.189.140:27080",
		MongodbAddr: "127.0.0.1:27083",
		Size:        10,
		DbName:      "qfw",
		UserName:    "SJZY_RWbid_ES",
		Password:    "SJZY@B4i4D5e6S",
		Direct:      true,
	}
	MgoB.InitPool()
}

// main initialises MongoDB and runs getBidding (defined elsewhere in the
// package). The nav-column pipeline below is currently disabled.
func main() {
	InitMgo()
	getBidding()
	//go updateMethod()
	//dealNavColumn()
	log.Println("over")
	//select {}
}

// dealNavColumn scans "bidding" documents whose comeintime falls within the
// last year, keeps those whose title contains "省级财政资金", records their id
// in "wcc_bidding_id", and queues a "nav_column" update on updatePool.
//
// The sends on updatePool block once its buffer is full, so updateMethod must
// be running as a consumer for large scans to complete.
func dealNavColumn() {
	names := []string{
		"招标公告", "招标预告", "招标结果", "招标信用",
		"采购意向", "项目分包", "企业直采", "政府采购",
		"拟在建项目", "审批项目", "推荐项目", "业主委托项目",
		"热门项目", "新兴项目", "国家级项目", "省级项目",
	}
	// Each column carries its 1-based position as "sort".
	columns := make([]map[string]interface{}, 0, len(names))
	for i, name := range names {
		columns = append(columns, map[string]interface{}{
			"name": name,
			"sort": i + 1,
		})
	}

	where := map[string]interface{}{
		"comeintime": map[string]interface{}{
			"$gte": time.Now().AddDate(-1, 0, 0).Unix(),
		},
	}

	sess := MgoB.GetMgoConn()
	defer MgoB.DestoryMongoConn(sess)

	query := sess.DB("qfw").C("bidding").Find(&where).Select(map[string]interface{}{"contenthtml": 0}).Iter()
	for count := 0; ; count++ {
		// Decode every document into a fresh map so that a field missing from
		// this document can never be inherited from the previous one, whether
		// or not the iterator clears its target before decoding.
		tmp := make(map[string]interface{})
		if !query.Next(tmp) {
			break
		}
		if count%1000 == 0 {
			log.Println("current", count)
		}

		// Property-rights data is not put into the ES index for now.
		if util.IntAll(tmp["infoformat"]) == 3 {
			continue
		}
		// Documents flagged with the sensitive marker are not indexed.
		if util.ObjToString(tmp["sensitive"]) == "测试" {
			continue
		}
		if util.IntAll(tmp["extracttype"]) != 1 {
			continue
		}
		if !strings.Contains(util.ObjToString(tmp["title"]), "省级财政资金") {
			continue
		}

		tags := TagBidding(tmp)
		// Decimal number encoding the nav-column tagging result.
		flag := calculateFlag(tags, columns)

		id := mongodb.BsonIdToSId(tmp["_id"])
		MgoB.Save("wcc_bidding_id", map[string]interface{}{
			"bidding_id": id,
		})
		log.Println("bidding_id", id)

		updatePool <- []map[string]interface{}{
			{"_id": tmp["_id"]},
			{"$set": bson.M{
				"nav_column": flag,
			}},
		}
	}
}

// updateMethod drains updatePool and writes the queued updates to the
// "bidding" collection in bulk. A batch is flushed when it reaches 200
// entries, or when no new entry has arrived for one second and the batch is
// non-empty. At most two bulk writes are in flight at a time.
//
// It never returns; run it in its own goroutine.
func updateMethod() {
	const batchSize = 200

	// inFlight is a semaphore bounding concurrent bulk writes.
	inFlight := make(chan bool, 2)
	batch := make([][]map[string]interface{}, 0, batchSize)

	// flush hands the current batch to a background writer and starts a new
	// one. It blocks while two writers are already running.
	flush := func() {
		inFlight <- true
		go func(docs [][]map[string]interface{}) {
			defer func() { <-inFlight }()
			MgoB.UpdateBulk("bidding", docs...)
		}(batch)
		batch = make([][]map[string]interface{}, 0, batchSize)
	}

	for {
		select {
		case v := <-updatePool:
			batch = append(batch, v)
			if len(batch) == batchSize {
				flush()
			}
		case <-time.After(1000 * time.Millisecond):
			// Idle timeout: the timer is recreated on every pass, so it only
			// fires after a full second without a new entry.
			if len(batch) > 0 {
				flush()
			}
		}
	}
}

// hots copies "bidding_hots" into "wcc_hots", keeping only the first row
// returned for each bidding_id (rows are requested ordered by createtime -1).
func hots() {
	// The second return value is discarded, as in the original code, so a
	// failed lookup is only detectable here through a nil result.
	res, _ := MgoB.Find("bidding_hots", nil, map[string]interface{}{"createtime": -1}, nil, false, -1, -1)
	if res == nil {
		log.Println("hots: bidding_hots query returned no result")
		return
	}

	seen := make(map[string]bool, len(*res))
	data := make([]map[string]interface{}, 0, len(*res))
	for _, row := range *res {
		biddingID := util.ObjToString(row["bidding_id"])
		if seen[biddingID] {
			continue
		}
		seen[biddingID] = true
		data = append(data, row)
	}
	MgoB.SaveBulk("wcc_hots", data...)
}