package main import ( "go.mongodb.org/mongo-driver/bson" "jygit.jydev.jianyu360.cn/data_processing/common_utils" es "jygit.jydev.jianyu360.cn/data_processing/common_utils/elastic" "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb" "jygit.jydev.jianyu360.cn/data_processing/common_utils/udp" "log" "net" "strings" "time" ) var ( MgoB *mongodb.MongodbSim MgoQy *mongodb.MongodbSim MgoP *mongodb.MongodbSim Es *es.Elastic updatePool = make(chan []map[string]interface{}, 5000) updateEsPool = make(chan []map[string]interface{}, 5000) updateEsSp = make(chan bool, 5) //保存协程 UdpClient udp.UdpClient biddingDataAddr *net.UDPAddr //bidding 地址 ) func InitMgo() { //MgoB = &mongodb.MongodbSim{ // MongodbAddr: "172.31.31.202:27081,172.20.45.128:27080", // //MongodbAddr: "127.0.0.1:27083", // Size: 10, // DbName: "qfw", // UserName: "SJZY_RWbid_ES",a // Password: "SJZY@B4i4D5e6S", // //Direct: true, //} //MgoB.InitPool() MgoQy = &mongodb.MongodbSim{ MongodbAddr: "172.31.31.202:27081,172.20.45.128:27080", //MongodbAddr: "127.0.0.1:27083", Size: 10, DbName: "mixdata", UserName: "SJZY_RWbid_ES", Password: "SJZY@B4i4D5e6S", Direct: true, } MgoQy.InitPool() MgoP = &mongodb.MongodbSim{ //MongodbAddr: "127.0.0.1:27080", MongodbAddr: "172.17.4.85:27080", DbName: "qfw", Size: 10, //Direct: true, } MgoP.InitPool() // 本地数据库 //MgoB = &mongodb.MongodbSim{ // //MongodbAddr: "172.17.189.140:27080", // MongodbAddr: "127.0.0.1:27017", // Size: 10, // DbName: "wcc", // //UserName: "SJZY_RWbid_ES", // //Password: "SJZY@B4i4D5e6S", // //Direct: true, //} //MgoB.InitPool() // 测试环境 //MgoB = &mongodb.MongodbSim{ // MongodbAddr: "192.168.3.206:27002", // //MongodbAddr: "127.0.0.1:27017", // Size: 10, // DbName: "qfw_data", // UserName: "root", // Password: "root", // //Direct: true, //} //MgoB.InitPool() } func InitEs() { Es = &es.Elastic{ //S_esurl: "http://127.0.0.1:19908", S_esurl: "http://172.17.4.184:19908", I_size: 5, Username: "jybid", Password: "Top2023_JEB01i@31", } Es.InitElasticSize() } func main() { updatePici() //更新bidding pici 字段 //getBidding2233() //exportBidding() log.Println("数据处理完毕") return //InitMgo() //InitEs() // //go updateEsMethod() //fixQyxy() //select {} //getBidding0311() //log.Println("数据删除完成") //------------------// //InitMgo() //getBidding2() //callAi() //getCount() //updateTop() //findData() //getCompanyName() //log.Println("over") //select {} //SpecialData() //StdData() // ============================================== ////1.调用分类大模型 //title := "松江南站大型居住社区C18-26-01号地块动迁安置房(智能化工程)" //detail := "
" // //data := map[string]interface{}{ // "title": title, // "detail": detail, //} //reqData := map[string]interface{}{ // "texts": []interface{}{data}, //} // //url := "http://172.17.162.35:24401" //now := time.Now() //res := sendAi(reqData, url) ////log.Println("res", res) //log.Println("time seconds", time.Since(now).Seconds()) //log.Println("lens", len(res)) //if len(res) > 0 { // resa := res["result"] // log.Println("resa", resa) // if dataa, ok := resa.([]interface{}); ok { // log.Println(222) // da := dataa[0] // if len(util.ObjToString(da)) > 0 { // cs := strings.Split(util.ObjToString(da), "-") // log.Println("toptype", cs[0]) // log.Println("subtype", cs[1]) // } // } //} // ============================================== } func dealNavColumn() { columns := make([]map[string]interface{}, 0) column := []string{"招标公告", "招标预告", "招标结果", "招标信用", "采购意向", "项目分包", "企业直采", "政府采购", "拟在建项目", "审批项目", "推荐项目", "业主委托项目", "热门项目", "新兴项目", "国家级项目", "省级项目"} for k, v := range column { column := map[string]interface{}{ "name": v, "sort": k + 1, } columns = append(columns, column) } now := time.Now() where := map[string]interface{}{ "comeintime": map[string]interface{}{ "$gte": now.AddDate(-1, 0, 0).Unix(), }, } sess := MgoB.GetMgoConn() defer MgoB.DestoryMongoConn(sess) // query := sess.DB("qfw").C("bidding").Find(&where).Select(map[string]interface{}{ "contenthtml": 0}).Iter() count := 0 for tmp := make(map[string]interface{}); query.Next(tmp); count++ { if count%1000 == 0 { log.Println("current", count) } //针对产权数据,暂时不入es 索引库 if util.IntAll(tmp["infoformat"]) == 3 { continue } if sensitive := util.ObjToString(tmp["sensitive"]); sensitive == "测试" { //bidding中有敏感词,不生索引 tmp = make(map[string]interface{}) continue } if util.IntAll(tmp["extracttype"]) != 1 { continue } title := util.ObjToString(tmp["title"]) if !strings.Contains(title, "省级财政资金") { continue } else { rea := TagBidding(tmp) reb := calculateFlag(rea, columns) //拿到十进制数字,标签栏目结果 insert := map[string]interface{}{ "bidding_id": mongodb.BsonIdToSId(tmp["_id"]), } MgoB.Save("wcc_bidding_id", insert) log.Println("bidding_id", mongodb.BsonIdToSId(tmp["_id"])) updatePool <- []map[string]interface{}{ {"_id": tmp["_id"]}, {"$set": bson.M{ "nav_column": reb, }}, } } } } // updateMethod 更新MongoDB func updateMethod() { updateSp := make(chan bool, 2) arru := make([][]map[string]interface{}, 200) indexu := 0 for { select { case v := <-updatePool: arru[indexu] = v indexu++ if indexu == 200 { updateSp <- true go func(arru [][]map[string]interface{}) { defer func() { <-updateSp }() MgoB.UpdateBulk("bidding", arru...) }(arru) arru = make([][]map[string]interface{}, 200) indexu = 0 } case <-time.After(1000 * time.Millisecond): if indexu > 0 { updateSp <- true go func(arru [][]map[string]interface{}) { defer func() { <-updateSp }() MgoB.UpdateBulk("bidding", arru...) }(arru[:indexu]) arru = make([][]map[string]interface{}, 200) indexu = 0 } } } } func hots() { exists := make(map[string]bool) res, _ := MgoB.Find("bidding_hots", nil, map[string]interface{}{"createtime": -1}, nil, false, -1, -1) for _, v := range *res { biddingID := util.ObjToString(v["bidding_id"]) if !exists[biddingID] { exists[biddingID] = true } } data := make([]map[string]interface{}, 0) for _, v := range *res { biddingID := util.ObjToString(v["bidding_id"]) if exists[biddingID] { data = append(data, v) exists[biddingID] = false } } MgoB.SaveBulk("wcc_hots", data...) } // updateEsMethod 更新es func updateEsMethod() { arru := make([][]map[string]interface{}, 200) indexu := 0 for { select { case v := <-updateEsPool: arru[indexu] = v indexu++ if indexu == 200 { updateEsSp <- true go func(arru [][]map[string]interface{}) { defer func() { <-updateEsSp }() Es.UpdateBulk("projectset", arru...) }(arru) arru = make([][]map[string]interface{}, 200) indexu = 0 } case <-time.After(1000 * time.Millisecond): if indexu > 0 { updateEsSp <- true go func(arru [][]map[string]interface{}) { defer func() { <-updateEsSp }() Es.UpdateBulk("projectset", arru...) }(arru[:indexu]) arru = make([][]map[string]interface{}, 200) indexu = 0 } } } }