bidd.go 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288
  1. package main
  2. import (
  3. "bytes"
  4. "context"
  5. "encoding/json"
  6. "errors"
  7. "fmt"
  8. util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  9. "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  10. "log"
  11. "net/http"
  12. "strings"
  13. "sync"
  14. "time"
  15. )
  16. // getBidding2 获取bidding 1.3日无二级分类数据
  17. func getBidding2() {
  18. //2024-1-3日数据
  19. where := map[string]interface{}{
  20. "comeintime": map[string]interface{}{
  21. "$gte": 1704211200,
  22. "$lte": 1704297600,
  23. },
  24. "subtype": map[string]interface{}{
  25. "$exists": 1,
  26. },
  27. }
  28. sess := MgoB.GetMgoConn()
  29. defer MgoB.DestoryMongoConn(sess)
  30. //texts := make([]string, 0)
  31. query := sess.DB("qfw").C("bidding").Find(where).Select(map[string]interface{}{"title": 1, "toptype": 1, "subtype": 1, "href": 1, "detail": 1, "channel": 1}).Iter()
  32. count := 0
  33. for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
  34. if count%1000 == 0 {
  35. log.Println("current:", count)
  36. }
  37. if util.IntAll(tmp["extracttype"]) == -1 {
  38. continue
  39. }
  40. id := mongodb.BsonIdToSId(tmp["_id"])
  41. tmp["jyhref"] = GetJyURLByID(id)
  42. MgoB.SaveByOriID("wcc_bidding_20240103_subtype_exists", tmp)
  43. tmp = make(map[string]interface{})
  44. }
  45. log.Println("over")
  46. }
  47. // callAi 调用大模型
  48. func callAi() {
  49. sess := MgoB.GetMgoConn()
  50. defer MgoB.DestoryMongoConn(sess)
  51. //where := map[string]interface{}{
  52. // "subtype_a": map[string]interface{}{
  53. // "$exists": 0,
  54. // },
  55. //}
  56. query := sess.DB("qfw_data").C("wcc_bidding_20240103_subtype_exists").Find(nil).Select(nil).Iter()
  57. count := 0
  58. //ch := make(chan bool, 1)
  59. //wg := &sync.WaitGroup{}
  60. for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
  61. if count%1000 == 0 {
  62. log.Println("-------- current:", count, tmp["_id"], " ---------")
  63. }
  64. //ch <- true
  65. //wg.Add(1)
  66. //go func(tmp map[string]interface{}) {
  67. // defer func() {
  68. // <-ch
  69. // wg.Done()
  70. // }()
  71. id := mongodb.BsonIdToSId(tmp["_id"])
  72. title := util.ObjToString(tmp["title"])
  73. detail := util.ObjToString(tmp["detail"])
  74. data := map[string]interface{}{
  75. "title": title,
  76. "detail": detail,
  77. }
  78. reqData := map[string]interface{}{
  79. "texts": []interface{}{data},
  80. }
  81. now := time.Now()
  82. res := send(reqData)
  83. log.Println(time.Since(now).Seconds(), tmp["_id"])
  84. subtype := res["result"].([]interface{})
  85. result := subtype[0]
  86. types := strings.Split(util.ObjToString(result), "-")
  87. update := make(map[string]interface{})
  88. if len(types) == 2 {
  89. update["toptype_ai"] = types[0]
  90. update["subtype_ai"] = types[1]
  91. //没有内容
  92. if detail == "" {
  93. update["data_type"] = 1
  94. } else {
  95. update["data_type"] = 0
  96. }
  97. MgoB.UpdateById("wcc_bidding_20240103_subtype_exists", id, map[string]interface{}{"$set": update})
  98. }
  99. //}(tmp)
  100. tmp = make(map[string]interface{})
  101. }
  102. //wg.Wait()
  103. log.Println("over")
  104. }
  105. // getBidding 调用分类大模型
  106. func getBidding() {
  107. //2024-1-3日数据
  108. where := map[string]interface{}{
  109. "comeintime": map[string]interface{}{
  110. "$gte": 1704211200,
  111. "$lte": 1704297600,
  112. },
  113. "subtype": map[string]interface{}{
  114. "$exists": 0,
  115. },
  116. }
  117. sess := MgoB.GetMgoConn()
  118. defer MgoB.DestoryMongoConn(sess)
  119. //texts := make([]string, 0)
  120. query := sess.DB("qfw").C("bidding").Find(where).Select(map[string]interface{}{"title": 1, "toptype": 1, "href": 1, "detail": 1}).Iter()
  121. count := 0
  122. ch := make(chan bool, 10)
  123. wg := &sync.WaitGroup{}
  124. for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
  125. if count%1000 == 0 {
  126. log.Println("current:", count)
  127. }
  128. if util.IntAll(tmp["extracttype"]) == -1 {
  129. continue
  130. }
  131. ch <- true
  132. wg.Add(1)
  133. go func(tmp map[string]interface{}) {
  134. defer func() {
  135. <-ch
  136. wg.Done()
  137. }()
  138. id := mongodb.BsonIdToSId(tmp["_id"])
  139. title := util.ObjToString(tmp["title"])
  140. detail := util.ObjToString(tmp["detail"])
  141. tmp["bidding_id"] = id
  142. data := map[string]interface{}{
  143. "title": title,
  144. "detail": detail,
  145. }
  146. reqData := map[string]interface{}{
  147. "texts": []interface{}{data},
  148. }
  149. res := SendAi(reqData)
  150. subtype := res["result"].([]interface{})
  151. result := subtype[0]
  152. types := strings.Split(util.ObjToString(result), "-")
  153. if len(types) == 2 {
  154. tmp["new_toptype"] = types[0]
  155. tmp["new_subtype"] = types[1]
  156. }
  157. tmp["jyhref"] = GetJyURLByID(id)
  158. //没有内容
  159. if detail == "" {
  160. tmp["data_type"] = 1
  161. } else {
  162. tmp["data_type"] = 0
  163. }
  164. MgoB.Save("wcc_20240103-2", tmp)
  165. }(tmp)
  166. tmp = make(map[string]interface{})
  167. }
  168. wg.Wait()
  169. log.Println("over")
  170. }
  171. func send(data map[string]interface{}) (res map[string]interface{}) {
  172. url := "http://192.168.3.109:16688"
  173. jsonData, err := json.Marshal(data)
  174. if err != nil {
  175. fmt.Println("JSON marshal error:", err)
  176. return
  177. }
  178. req, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonData))
  179. if err != nil {
  180. fmt.Println("Request error:", err)
  181. return
  182. }
  183. req.Header.Set("Content-Type", "application/json")
  184. client := &http.Client{}
  185. resp, err := client.Do(req)
  186. if err != nil {
  187. fmt.Println("Request error:", err)
  188. return
  189. }
  190. defer resp.Body.Close()
  191. err = json.NewDecoder(resp.Body).Decode(&res)
  192. if err != nil {
  193. fmt.Println("Response decoding error:", err)
  194. return
  195. }
  196. return
  197. }
  198. // SendAi 调用大模型招标分类
  199. func SendAi(data map[string]interface{}) (res map[string]interface{}) {
  200. // 设置 2 秒的超时
  201. ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
  202. defer cancel()
  203. url := "http://192.168.3.109:16688"
  204. jsonData, err := json.Marshal(data)
  205. if err != nil {
  206. fmt.Println("JSON marshal error:", err)
  207. return
  208. }
  209. req, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonData))
  210. if err != nil {
  211. fmt.Println("Request error:", err)
  212. return
  213. }
  214. req.Header.Set("Content-Type", "application/json")
  215. // 将请求与上下文关联
  216. req = req.WithContext(ctx)
  217. client := &http.Client{}
  218. resp, err := client.Do(req)
  219. if err != nil {
  220. // 使用 errors.Is 检查错误是否是超时错误
  221. if errors.Is(err, context.DeadlineExceeded) {
  222. fmt.Println("Request timed out")
  223. return
  224. }
  225. fmt.Println("Request error:", err)
  226. return
  227. }
  228. defer resp.Body.Close()
  229. err = json.NewDecoder(resp.Body).Decode(&res)
  230. if err != nil {
  231. fmt.Println("Response decoding error:", err)
  232. return
  233. }
  234. return
  235. }