main.go 8.5 KB


  1. package main
  2. import (
  3. "go.mongodb.org/mongo-driver/bson"
  4. "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  5. es "jygit.jydev.jianyu360.cn/data_processing/common_utils/elastic"
  6. "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  7. "jygit.jydev.jianyu360.cn/data_processing/common_utils/udp"
  8. "log"
  9. "net"
  10. "strings"
  11. "time"
  12. )
  13. var (
  14. MgoB *mongodb.MongodbSim
  15. MgoQy *mongodb.MongodbSim
  16. MgoP *mongodb.MongodbSim
  17. Es *es.Elastic
  18. updatePool = make(chan []map[string]interface{}, 5000)
  19. updateEsPool = make(chan []map[string]interface{}, 5000)
  20. updateEsSp = make(chan bool, 5) //保存协程
  21. UdpClient udp.UdpClient
  22. biddingDataAddr *net.UDPAddr //bidding 地址
  23. )
  24. func InitMgo() {
  25. //MgoB = &mongodb.MongodbSim{
  26. // MongodbAddr: "172.31.31.202:27081,172.20.45.128:27080",
  27. // //MongodbAddr: "127.0.0.1:27083",
  28. // Size: 10,
  29. // DbName: "qfw",
  30. // UserName: "SJZY_RWbid_ES",a
  31. // Password: "SJZY@B4i4D5e6S",
  32. // //Direct: true,
  33. //}
  34. //MgoB.InitPool()
  35. MgoQy = &mongodb.MongodbSim{
  36. MongodbAddr: "172.31.31.202:27081,172.20.45.128:27080",
  37. //MongodbAddr: "127.0.0.1:27083",
  38. Size: 10,
  39. DbName: "mixdata",
  40. UserName: "SJZY_RWbid_ES",
  41. Password: "SJZY@B4i4D5e6S",
  42. Direct: true,
  43. }
  44. MgoQy.InitPool()
  45. MgoP = &mongodb.MongodbSim{
  46. //MongodbAddr: "127.0.0.1:27080",
  47. MongodbAddr: "172.17.4.85:27080",
  48. DbName: "qfw",
  49. Size: 10,
  50. //Direct: true,
  51. }
  52. MgoP.InitPool()
  53. // 本地数据库
  54. //MgoB = &mongodb.MongodbSim{
  55. // //MongodbAddr: "172.17.189.140:27080",
  56. // MongodbAddr: "127.0.0.1:27017",
  57. // Size: 10,
  58. // DbName: "wcc",
  59. // //UserName: "SJZY_RWbid_ES",
  60. // //Password: "SJZY@B4i4D5e6S",
  61. // //Direct: true,
  62. //}
  63. //MgoB.InitPool()
  64. // 测试环境
  65. //MgoB = &mongodb.MongodbSim{
  66. // MongodbAddr: "192.168.3.206:27002",
  67. // //MongodbAddr: "127.0.0.1:27017",
  68. // Size: 10,
  69. // DbName: "qfw_data",
  70. // UserName: "root",
  71. // Password: "root",
  72. // //Direct: true,
  73. //}
  74. //MgoB.InitPool()
  75. }
  76. func InitEs() {
  77. Es = &es.Elastic{
  78. //S_esurl: "http://127.0.0.1:19908",
  79. S_esurl: "http://172.17.4.184:19908",
  80. I_size: 5,
  81. Username: "jybid",
  82. Password: "Top2023_JEB01i@31",
  83. }
  84. Es.InitElasticSize()
  85. }
  86. func main() {
  87. updatePici() //更新bidding pici 字段
  88. //getBidding2233()
  89. //exportBidding()
  90. log.Println("数据处理完毕")
  91. return
  92. //InitMgo()
  93. //InitEs()
  94. //
  95. //go updateEsMethod()
  96. //fixQyxy()
  97. //select {}
  98. //getBidding0311()
  99. //log.Println("数据删除完成")
  100. //------------------//
  101. //InitMgo()
  102. //getBidding2()
  103. //callAi()
  104. //getCount()
  105. //updateTop()
  106. //findData()
  107. //getCompanyName()
  108. //log.Println("over")
  109. //select {}
  110. //SpecialData()
  111. //StdData()
  112. // ==============================================
  113. ////1.调用分类大模型
  114. //title := "松江南站大型居住社区C18-26-01号地块动迁安置房(智能化工程)"
  115. //detail := "<br><ul ><li>招标项目编号:</li><li >e3100000151006395007</li></ul><ul ><li>相关标段(包)编号:</li><li >e3100000151006395007001</li></ul><ul ><li>公示标题:</li><li >松江南站大型居住社区C18-26-01号地块动迁安置房(智能化工程)</li></ul><ul ><li>公示内容:</li><li >第一中标候选人:上海浩臣机电科技有限公司,投标价格:1228.1990,得分/投票:合格;第二中标候选人:上海格瑞特机电系统工程有限公司,投标价格:1230.1015,得分/投票:合格;</li></ul><ul ><li>公示发布时间:</li><li >2024-01-03</li></ul><ul ><li>公示发布媒介:</li><li >http://zjw.sh.gov.cn</li></ul><ul ><li>公示源URL:</li><li ><br>https://ciac.zjw.sh.gov.cn/XMJYPTInterWeb/Tender/PrinttoPdf?zbgcid=65693</li></ul><ul ><li>公示类型:</li><li >正常</li></ul><ul ><li>公示开始时间:</li><li >2024-01-03</li></ul><ul ><li>公示结束时间:</li><li >2024-01-05</li></ul>"
  116. //
  117. //data := map[string]interface{}{
  118. // "title": title,
  119. // "detail": detail,
  120. //}
  121. //reqData := map[string]interface{}{
  122. // "texts": []interface{}{data},
  123. //}
  124. //
  125. //url := "http://172.17.162.35:24401"
  126. //now := time.Now()
  127. //res := sendAi(reqData, url)
  128. ////log.Println("res", res)
  129. //log.Println("time seconds", time.Since(now).Seconds())
  130. //log.Println("lens", len(res))
  131. //if len(res) > 0 {
  132. // resa := res["result"]
  133. // log.Println("resa", resa)
  134. // if dataa, ok := resa.([]interface{}); ok {
  135. // log.Println(222)
  136. // da := dataa[0]
  137. // if len(util.ObjToString(da)) > 0 {
  138. // cs := strings.Split(util.ObjToString(da), "-")
  139. // log.Println("toptype", cs[0])
  140. // log.Println("subtype", cs[1])
  141. // }
  142. // }
  143. //}
  144. // ==============================================
  145. }
  146. func dealNavColumn() {
  147. columns := make([]map[string]interface{}, 0)
  148. column := []string{"招标公告", "招标预告", "招标结果", "招标信用", "采购意向", "项目分包", "企业直采",
  149. "政府采购", "拟在建项目", "审批项目", "推荐项目", "业主委托项目", "热门项目", "新兴项目",
  150. "国家级项目", "省级项目"}
  151. for k, v := range column {
  152. column := map[string]interface{}{
  153. "name": v,
  154. "sort": k + 1,
  155. }
  156. columns = append(columns, column)
  157. }
  158. now := time.Now()
  159. where := map[string]interface{}{
  160. "comeintime": map[string]interface{}{
  161. "$gte": now.AddDate(-1, 0, 0).Unix(),
  162. },
  163. }
  164. sess := MgoB.GetMgoConn()
  165. defer MgoB.DestoryMongoConn(sess)
  166. //
  167. query := sess.DB("qfw").C("bidding").Find(&where).Select(map[string]interface{}{
  168. "contenthtml": 0}).Iter()
  169. count := 0
  170. for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
  171. if count%1000 == 0 {
  172. log.Println("current", count)
  173. }
  174. //针对产权数据,暂时不入es 索引库
  175. if util.IntAll(tmp["infoformat"]) == 3 {
  176. continue
  177. }
  178. if sensitive := util.ObjToString(tmp["sensitive"]); sensitive == "测试" { //bidding中有敏感词,不生索引
  179. tmp = make(map[string]interface{})
  180. continue
  181. }
  182. if util.IntAll(tmp["extracttype"]) != 1 {
  183. continue
  184. }
  185. title := util.ObjToString(tmp["title"])
  186. if !strings.Contains(title, "省级财政资金") {
  187. continue
  188. } else {
  189. rea := TagBidding(tmp)
  190. reb := calculateFlag(rea, columns) //拿到十进制数字,标签栏目结果
  191. insert := map[string]interface{}{
  192. "bidding_id": mongodb.BsonIdToSId(tmp["_id"]),
  193. }
  194. MgoB.Save("wcc_bidding_id", insert)
  195. log.Println("bidding_id", mongodb.BsonIdToSId(tmp["_id"]))
  196. updatePool <- []map[string]interface{}{
  197. {"_id": tmp["_id"]},
  198. {"$set": bson.M{
  199. "nav_column": reb,
  200. }},
  201. }
  202. }
  203. }
  204. }
  205. // updateMethod 更新MongoDB
  206. func updateMethod() {
  207. updateSp := make(chan bool, 2)
  208. arru := make([][]map[string]interface{}, 200)
  209. indexu := 0
  210. for {
  211. select {
  212. case v := <-updatePool:
  213. arru[indexu] = v
  214. indexu++
  215. if indexu == 200 {
  216. updateSp <- true
  217. go func(arru [][]map[string]interface{}) {
  218. defer func() {
  219. <-updateSp
  220. }()
  221. MgoB.UpdateBulk("bidding", arru...)
  222. }(arru)
  223. arru = make([][]map[string]interface{}, 200)
  224. indexu = 0
  225. }
  226. case <-time.After(1000 * time.Millisecond):
  227. if indexu > 0 {
  228. updateSp <- true
  229. go func(arru [][]map[string]interface{}) {
  230. defer func() {
  231. <-updateSp
  232. }()
  233. MgoB.UpdateBulk("bidding", arru...)
  234. }(arru[:indexu])
  235. arru = make([][]map[string]interface{}, 200)
  236. indexu = 0
  237. }
  238. }
  239. }
  240. }
  241. func hots() {
  242. exists := make(map[string]bool)
  243. res, _ := MgoB.Find("bidding_hots", nil, map[string]interface{}{"createtime": -1}, nil, false, -1, -1)
  244. for _, v := range *res {
  245. biddingID := util.ObjToString(v["bidding_id"])
  246. if !exists[biddingID] {
  247. exists[biddingID] = true
  248. }
  249. }
  250. data := make([]map[string]interface{}, 0)
  251. for _, v := range *res {
  252. biddingID := util.ObjToString(v["bidding_id"])
  253. if exists[biddingID] {
  254. data = append(data, v)
  255. exists[biddingID] = false
  256. }
  257. }
  258. MgoB.SaveBulk("wcc_hots", data...)
  259. }
  260. // updateEsMethod 更新es
  261. func updateEsMethod() {
  262. arru := make([][]map[string]interface{}, 200)
  263. indexu := 0
  264. for {
  265. select {
  266. case v := <-updateEsPool:
  267. arru[indexu] = v
  268. indexu++
  269. if indexu == 200 {
  270. updateEsSp <- true
  271. go func(arru [][]map[string]interface{}) {
  272. defer func() {
  273. <-updateEsSp
  274. }()
  275. Es.UpdateBulk("projectset", arru...)
  276. }(arru)
  277. arru = make([][]map[string]interface{}, 200)
  278. indexu = 0
  279. }
  280. case <-time.After(1000 * time.Millisecond):
  281. if indexu > 0 {
  282. updateEsSp <- true
  283. go func(arru [][]map[string]interface{}) {
  284. defer func() {
  285. <-updateEsSp
  286. }()
  287. Es.UpdateBulk("projectset", arru...)
  288. }(arru[:indexu])
  289. arru = make([][]map[string]interface{}, 200)
  290. indexu = 0
  291. }
  292. }
  293. }
  294. }