package main

import (
	"bytes"
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"log"
	"net/http"
	"strings"
	"sync"
	"time"

	util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
	"jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
)

// getBidding2 reads the documents in qfw.bidding whose comeintime lies in the
// 2024-01-03 window and that already have a "subtype" field, adds a "jyhref"
// link to each one and stores it in "wcc_bidding_20240103_subtype_exists".
// Documents whose extracttype is -1 are skipped.
func getBidding2() {
	// Taken as Unix seconds, the bounds are 2024-01-03 00:00:00 and
	// 2024-01-04 00:00:00 in UTC+8; both ends are inclusive.
	where := map[string]interface{}{
		"comeintime": map[string]interface{}{
			"$gte": 1704211200,
			"$lte": 1704297600,
		},
		"subtype": map[string]interface{}{
			"$exists": 1,
		},
	}
	fields := map[string]interface{}{
		"title":   1,
		"toptype": 1,
		"subtype": 1,
		"href":    1,
		"detail":  1,
		"channel": 1,
		// Needed by the skip check below; without it the field is never
		// returned and the check cannot match.
		"extracttype": 1,
	}

	sess := MgoB.GetMgoConn()
	defer MgoB.DestoryMongoConn(sess)

	// NOTE(review): the iterator's error state is never inspected, so a cursor
	// failure looks like a normal end of data — confirm whether that matters.
	iter := sess.DB("qfw").C("bidding").Find(where).Select(fields).Iter()
	count := 0
	for tmp := make(map[string]interface{}); iter.Next(tmp); count++ {
		if count%1000 == 0 {
			log.Println("current:", count)
		}
		if util.IntAll(tmp["extracttype"]) == -1 {
			continue
		}
		// extracttype is only a filter input; keep it out of the stored copy.
		delete(tmp, "extracttype")

		id := mongodb.BsonIdToSId(tmp["_id"])
		tmp["jyhref"] = GetJyURLByID(id)
		MgoB.SaveByOriID("wcc_bidding_20240103_subtype_exists", tmp)

		// Start the next iteration with a fresh map.
		tmp = make(map[string]interface{})
	}
	log.Println("over")
}

// callAi walks every document in qfw_data.wcc_bidding_20240103_subtype_exists,
// sends its title and detail to the classification service one at a time and,
// when the answer has the form "<toptype>-<subtype>", writes toptype_ai,
// subtype_ai and data_type back onto the document.
func callAi() {
	sess := MgoB.GetMgoConn()
	defer MgoB.DestoryMongoConn(sess)

	iter := sess.DB("qfw_data").C("wcc_bidding_20240103_subtype_exists").Find(nil).Select(nil).Iter()
	count := 0
	for tmp := make(map[string]interface{}); iter.Next(tmp); count++ {
		if count%1000 == 0 {
			log.Println("-------- current:", count, tmp["_id"], " ---------")
		}

		id := mongodb.BsonIdToSId(tmp["_id"])
		detail := util.ObjToString(tmp["detail"])
		reqData := map[string]interface{}{
			"texts": []interface{}{
				map[string]interface{}{
					"title":  util.ObjToString(tmp["title"]),
					"detail": detail,
				},
			},
		}

		start := time.Now()
		res := send(reqData)
		log.Println(time.Since(start).Seconds(), tmp["_id"])

		// A failed request or an unexpected payload must not abort the run;
		// the document is simply left without AI fields.
		if toptype, subtype, ok := splitAiResult(res); ok {
			update := map[string]interface{}{
				"toptype_ai": toptype,
				"subtype_ai": subtype,
				"data_type":  0,
			}
			// data_type 1 marks documents that had no detail text.
			if detail == "" {
				update["data_type"] = 1
			}
			MgoB.UpdateById("wcc_bidding_20240103_subtype_exists", id, map[string]interface{}{"$set": update})
		} else {
			log.Println("no usable classification for", id)
		}

		tmp = make(map[string]interface{})
	}
	log.Println("over")
}

// getBidding reads the documents in qfw.bidding whose comeintime lies in the
// 2024-01-03 window and that have no "subtype" field, classifies each one
// through SendAi with at most 10 requests in flight, and saves the enriched
// copy into "wcc_20240103-2". Documents whose extracttype is -1 are skipped.
func getBidding() {
	// Same window as getBidding2: 2024-01-03 00:00:00 to 2024-01-04 00:00:00
	// in UTC+8 when read as Unix seconds, both ends inclusive.
	where := map[string]interface{}{
		"comeintime": map[string]interface{}{
			"$gte": 1704211200,
			"$lte": 1704297600,
		},
		"subtype": map[string]interface{}{
			"$exists": 0,
		},
	}
	fields := map[string]interface{}{
		"title":   1,
		"toptype": 1,
		"href":    1,
		"detail":  1,
		// Needed by the skip check below; without it the field is never
		// returned and the check cannot match.
		"extracttype": 1,
	}

	sess := MgoB.GetMgoConn()
	defer MgoB.DestoryMongoConn(sess)

	iter := sess.DB("qfw").C("bidding").Find(where).Select(fields).Iter()
	count := 0
	slots := make(chan struct{}, 10) // bounds the number of concurrent AI calls
	wg := &sync.WaitGroup{}
	for tmp := make(map[string]interface{}); iter.Next(tmp); count++ {
		if count%1000 == 0 {
			log.Println("current:", count)
		}
		if util.IntAll(tmp["extracttype"]) == -1 {
			continue
		}

		slots <- struct{}{}
		wg.Add(1)
		go func(doc map[string]interface{}) {
			defer func() {
				<-slots
				wg.Done()
			}()
			classifyAndSaveBidding(doc)
		}(tmp)

		// The goroutine now owns the previous map; decode into a new one.
		tmp = make(map[string]interface{})
	}
	wg.Wait()
	log.Println("over")
}

// classifyAndSaveBidding asks the classification service for the type of one
// bidding document and saves the document to "wcc_20240103-2" with
// bidding_id, jyhref and data_type set. new_toptype and new_subtype are added
// only when the service returned a "<toptype>-<subtype>" answer; a failed or
// malformed answer still saves the document, without those two fields.
func classifyAndSaveBidding(doc map[string]interface{}) {
	// extracttype is only a filter input; keep it out of the stored copy.
	delete(doc, "extracttype")

	id := mongodb.BsonIdToSId(doc["_id"])
	detail := util.ObjToString(doc["detail"])
	doc["bidding_id"] = id

	reqData := map[string]interface{}{
		"texts": []interface{}{
			map[string]interface{}{
				"title":  util.ObjToString(doc["title"]),
				"detail": detail,
			},
		},
	}

	if toptype, subtype, ok := splitAiResult(SendAi(reqData)); ok {
		doc["new_toptype"] = toptype
		doc["new_subtype"] = subtype
	} else {
		log.Println("no usable classification for", id)
	}

	doc["jyhref"] = GetJyURLByID(id)
	// data_type 1 marks documents that had no detail text.
	if detail == "" {
		doc["data_type"] = 1
	} else {
		doc["data_type"] = 0
	}
	MgoB.Save("wcc_20240103-2", doc)
}

// splitAiResult extracts the first entry of res["result"] and splits it on
// "-". It reports ok only when "result" is a non-empty list whose first entry
// splits into exactly two parts. A nil res (failed request) yields ok == false
// instead of a panic.
func splitAiResult(res map[string]interface{}) (toptype, subtype string, ok bool) {
	results, isList := res["result"].([]interface{})
	if !isList || len(results) == 0 {
		return "", "", false
	}
	parts := strings.Split(util.ObjToString(results[0]), "-")
	if len(parts) != 2 {
		return "", "", false
	}
	return parts[0], parts[1], true
}

// postAiRequest POSTs data as JSON to the classification service and returns
// the decoded JSON object. Every failure is reported on stdout and results in
// a nil map, so callers must treat nil as "no answer". ctx bounds the whole
// exchange, including reading the response body.
func postAiRequest(ctx context.Context, data map[string]interface{}) map[string]interface{} {
	const endpoint = "http://192.168.3.109:16688"

	jsonData, err := json.Marshal(data)
	if err != nil {
		fmt.Println("JSON marshal error:", err)
		return nil
	}

	req, err := http.NewRequestWithContext(ctx, http.MethodPost, endpoint, bytes.NewReader(jsonData))
	if err != nil {
		fmt.Println("Request error:", err)
		return nil
	}
	req.Header.Set("Content-Type", "application/json")

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		// Distinguish an expired context deadline from other transport errors.
		if errors.Is(err, context.DeadlineExceeded) {
			fmt.Println("Request timed out")
			return nil
		}
		fmt.Println("Request error:", err)
		return nil
	}
	defer resp.Body.Close()

	// An error page is not a classification result; do not try to decode it.
	if resp.StatusCode < 200 || resp.StatusCode > 299 {
		fmt.Println("Request error:", "unexpected status "+resp.Status)
		return nil
	}

	var res map[string]interface{}
	if err := json.NewDecoder(resp.Body).Decode(&res); err != nil {
		fmt.Println("Response decoding error:", err)
		return nil
	}
	return res
}

// send posts data to the classification service and returns the decoded
// response, or nil on any failure. It sets no deadline, so a stalled server
// blocks the caller for as long as the connection stays open.
func send(data map[string]interface{}) (res map[string]interface{}) {
	return postAiRequest(context.Background(), data)
}

// SendAi posts data to the bidding classification service with a 2-second
// overall timeout and returns the decoded response, or nil on any failure
// (including the timeout).
func SendAi(data map[string]interface{}) (res map[string]interface{}) {
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()
	return postAiRequest(ctx, data)
}