main.go 8.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317
  1. package main
  2. import (
  3. "go.mongodb.org/mongo-driver/bson"
  4. "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  5. es "jygit.jydev.jianyu360.cn/data_processing/common_utils/elastic"
  6. "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  7. "log"
  8. "strings"
  9. "time"
  10. )
  11. var (
  12. MgoB *mongodb.MongodbSim
  13. MgoQy *mongodb.MongodbSim
  14. MgoP *mongodb.MongodbSim
  15. Es *es.Elastic
  16. updatePool = make(chan []map[string]interface{}, 5000)
  17. updateEsPool = make(chan []map[string]interface{}, 5000)
  18. updateEsSp = make(chan bool, 5) //保存协程
  19. )
  20. func InitMgo() {
  21. //MgoB = &mongodb.MongodbSim{
  22. // MongodbAddr: "172.31.31.202:27081,172.20.45.128:27080",
  23. // //MongodbAddr: "127.0.0.1:27083",
  24. // Size: 10,
  25. // DbName: "qfw",
  26. // UserName: "SJZY_RWbid_ES",a
  27. // Password: "SJZY@B4i4D5e6S",
  28. // //Direct: true,
  29. //}
  30. //MgoB.InitPool()
  31. MgoQy = &mongodb.MongodbSim{
  32. MongodbAddr: "172.31.31.202:27081,172.20.45.128:27080",
  33. //MongodbAddr: "127.0.0.1:27083",
  34. Size: 10,
  35. DbName: "mixdata",
  36. UserName: "SJZY_RWbid_ES",
  37. Password: "SJZY@B4i4D5e6S",
  38. Direct: true,
  39. }
  40. MgoQy.InitPool()
  41. MgoP = &mongodb.MongodbSim{
  42. //MongodbAddr: "127.0.0.1:27080",
  43. MongodbAddr: "172.17.4.85:27080",
  44. DbName: "qfw",
  45. Size: 10,
  46. //Direct: true,
  47. }
  48. MgoP.InitPool()
  49. // 本地数据库
  50. //MgoB = &mongodb.MongodbSim{
  51. // //MongodbAddr: "172.17.189.140:27080",
  52. // MongodbAddr: "127.0.0.1:27017",
  53. // Size: 10,
  54. // DbName: "wcc",
  55. // //UserName: "SJZY_RWbid_ES",
  56. // //Password: "SJZY@B4i4D5e6S",
  57. // //Direct: true,
  58. //}
  59. //MgoB.InitPool()
  60. // 测试环境
  61. //MgoB = &mongodb.MongodbSim{
  62. // MongodbAddr: "192.168.3.206:27002",
  63. // //MongodbAddr: "127.0.0.1:27017",
  64. // Size: 10,
  65. // DbName: "qfw_data",
  66. // UserName: "root",
  67. // Password: "root",
  68. // //Direct: true,
  69. //}
  70. //MgoB.InitPool()
  71. }
  72. func InitEs() {
  73. Es = &es.Elastic{
  74. //S_esurl: "http://127.0.0.1:19908",
  75. S_esurl: "http://172.17.4.184:19908",
  76. I_size: 5,
  77. Username: "jybid",
  78. Password: "Top2023_JEB01i@31",
  79. }
  80. Es.InitElasticSize()
  81. }
  82. func main() {
  83. exportBidding()
  84. log.Println("数据处理完毕")
  85. return
  86. //InitMgo()
  87. //InitEs()
  88. //
  89. //go updateEsMethod()
  90. //fixQyxy()
  91. //select {}
  92. //getBidding0311()
  93. //log.Println("数据删除完成")
  94. //------------------//
  95. //InitMgo()
  96. //getBidding2()
  97. //callAi()
  98. //getCount()
  99. //updateTop()
  100. //findData()
  101. //getCompanyName()
  102. //log.Println("over")
  103. //select {}
  104. //SpecialData()
  105. //StdData()
  106. // ==============================================
  107. ////1.调用分类大模型
  108. //title := "松江南站大型居住社区C18-26-01号地块动迁安置房(智能化工程)"
  109. //detail := "<br><ul ><li>招标项目编号:</li><li >e3100000151006395007</li></ul><ul ><li>相关标段(包)编号:</li><li >e3100000151006395007001</li></ul><ul ><li>公示标题:</li><li >松江南站大型居住社区C18-26-01号地块动迁安置房(智能化工程)</li></ul><ul ><li>公示内容:</li><li >第一中标候选人:上海浩臣机电科技有限公司,投标价格:1228.1990,得分/投票:合格;第二中标候选人:上海格瑞特机电系统工程有限公司,投标价格:1230.1015,得分/投票:合格;</li></ul><ul ><li>公示发布时间:</li><li >2024-01-03</li></ul><ul ><li>公示发布媒介:</li><li >http://zjw.sh.gov.cn</li></ul><ul ><li>公示源URL:</li><li ><br>https://ciac.zjw.sh.gov.cn/XMJYPTInterWeb/Tender/PrinttoPdf?zbgcid=65693</li></ul><ul ><li>公示类型:</li><li >正常</li></ul><ul ><li>公示开始时间:</li><li >2024-01-03</li></ul><ul ><li>公示结束时间:</li><li >2024-01-05</li></ul>"
  110. //
  111. //data := map[string]interface{}{
  112. // "title": title,
  113. // "detail": detail,
  114. //}
  115. //reqData := map[string]interface{}{
  116. // "texts": []interface{}{data},
  117. //}
  118. //
  119. //url := "http://172.17.162.35:24401"
  120. //now := time.Now()
  121. //res := sendAi(reqData, url)
  122. ////log.Println("res", res)
  123. //log.Println("time seconds", time.Since(now).Seconds())
  124. //log.Println("lens", len(res))
  125. //if len(res) > 0 {
  126. // resa := res["result"]
  127. // log.Println("resa", resa)
  128. // if dataa, ok := resa.([]interface{}); ok {
  129. // log.Println(222)
  130. // da := dataa[0]
  131. // if len(util.ObjToString(da)) > 0 {
  132. // cs := strings.Split(util.ObjToString(da), "-")
  133. // log.Println("toptype", cs[0])
  134. // log.Println("subtype", cs[1])
  135. // }
  136. // }
  137. //}
  138. // ==============================================
  139. }
  140. func dealNavColumn() {
  141. columns := make([]map[string]interface{}, 0)
  142. column := []string{"招标公告", "招标预告", "招标结果", "招标信用", "采购意向", "项目分包", "企业直采",
  143. "政府采购", "拟在建项目", "审批项目", "推荐项目", "业主委托项目", "热门项目", "新兴项目",
  144. "国家级项目", "省级项目"}
  145. for k, v := range column {
  146. column := map[string]interface{}{
  147. "name": v,
  148. "sort": k + 1,
  149. }
  150. columns = append(columns, column)
  151. }
  152. now := time.Now()
  153. where := map[string]interface{}{
  154. "comeintime": map[string]interface{}{
  155. "$gte": now.AddDate(-1, 0, 0).Unix(),
  156. },
  157. }
  158. sess := MgoB.GetMgoConn()
  159. defer MgoB.DestoryMongoConn(sess)
  160. //
  161. query := sess.DB("qfw").C("bidding").Find(&where).Select(map[string]interface{}{
  162. "contenthtml": 0}).Iter()
  163. count := 0
  164. for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
  165. if count%1000 == 0 {
  166. log.Println("current", count)
  167. }
  168. //针对产权数据,暂时不入es 索引库
  169. if util.IntAll(tmp["infoformat"]) == 3 {
  170. continue
  171. }
  172. if sensitive := util.ObjToString(tmp["sensitive"]); sensitive == "测试" { //bidding中有敏感词,不生索引
  173. tmp = make(map[string]interface{})
  174. continue
  175. }
  176. if util.IntAll(tmp["extracttype"]) != 1 {
  177. continue
  178. }
  179. title := util.ObjToString(tmp["title"])
  180. if !strings.Contains(title, "省级财政资金") {
  181. continue
  182. } else {
  183. rea := TagBidding(tmp)
  184. reb := calculateFlag(rea, columns) //拿到十进制数字,标签栏目结果
  185. insert := map[string]interface{}{
  186. "bidding_id": mongodb.BsonIdToSId(tmp["_id"]),
  187. }
  188. MgoB.Save("wcc_bidding_id", insert)
  189. log.Println("bidding_id", mongodb.BsonIdToSId(tmp["_id"]))
  190. updatePool <- []map[string]interface{}{
  191. {"_id": tmp["_id"]},
  192. {"$set": bson.M{
  193. "nav_column": reb,
  194. }},
  195. }
  196. }
  197. }
  198. }
  199. // updateMethod 更新MongoDB
  200. func updateMethod() {
  201. updateSp := make(chan bool, 2)
  202. arru := make([][]map[string]interface{}, 200)
  203. indexu := 0
  204. for {
  205. select {
  206. case v := <-updatePool:
  207. arru[indexu] = v
  208. indexu++
  209. if indexu == 200 {
  210. updateSp <- true
  211. go func(arru [][]map[string]interface{}) {
  212. defer func() {
  213. <-updateSp
  214. }()
  215. MgoB.UpdateBulk("bidding", arru...)
  216. }(arru)
  217. arru = make([][]map[string]interface{}, 200)
  218. indexu = 0
  219. }
  220. case <-time.After(1000 * time.Millisecond):
  221. if indexu > 0 {
  222. updateSp <- true
  223. go func(arru [][]map[string]interface{}) {
  224. defer func() {
  225. <-updateSp
  226. }()
  227. MgoB.UpdateBulk("bidding", arru...)
  228. }(arru[:indexu])
  229. arru = make([][]map[string]interface{}, 200)
  230. indexu = 0
  231. }
  232. }
  233. }
  234. }
  235. func hots() {
  236. exists := make(map[string]bool)
  237. res, _ := MgoB.Find("bidding_hots", nil, map[string]interface{}{"createtime": -1}, nil, false, -1, -1)
  238. for _, v := range *res {
  239. biddingID := util.ObjToString(v["bidding_id"])
  240. if !exists[biddingID] {
  241. exists[biddingID] = true
  242. }
  243. }
  244. data := make([]map[string]interface{}, 0)
  245. for _, v := range *res {
  246. biddingID := util.ObjToString(v["bidding_id"])
  247. if exists[biddingID] {
  248. data = append(data, v)
  249. exists[biddingID] = false
  250. }
  251. }
  252. MgoB.SaveBulk("wcc_hots", data...)
  253. }
  254. // updateEsMethod 更新es
  255. func updateEsMethod() {
  256. arru := make([][]map[string]interface{}, 200)
  257. indexu := 0
  258. for {
  259. select {
  260. case v := <-updateEsPool:
  261. arru[indexu] = v
  262. indexu++
  263. if indexu == 200 {
  264. updateEsSp <- true
  265. go func(arru [][]map[string]interface{}) {
  266. defer func() {
  267. <-updateEsSp
  268. }()
  269. Es.UpdateBulk("projectset", arru...)
  270. }(arru)
  271. arru = make([][]map[string]interface{}, 200)
  272. indexu = 0
  273. }
  274. case <-time.After(1000 * time.Millisecond):
  275. if indexu > 0 {
  276. updateEsSp <- true
  277. go func(arru [][]map[string]interface{}) {
  278. defer func() {
  279. <-updateEsSp
  280. }()
  281. Es.UpdateBulk("projectset", arru...)
  282. }(arru[:indexu])
  283. arru = make([][]map[string]interface{}, 200)
  284. indexu = 0
  285. }
  286. }
  287. }
  288. }