biddingindex.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502
  1. package main
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "regexp"
  6. "strings"
  7. "time"
  8. util "utils"
  9. "utils/mfw"
  10. "utils/mongodb"
  11. "utils/udp"
  12. )
  13. //招标数据表和抽取表一一对应开始更新
  14. func (t *TaskInfo) biddingTask(data []byte, mapInfo map[string]interface{}) {
  15. defer util.Catch()
  16. q, _ := mapInfo["query"].(map[string]interface{})
  17. bkey, _ := mapInfo["bkey"].(string)
  18. if q == nil {
  19. q = map[string]interface{}{
  20. "_id": map[string]interface{}{
  21. "$gt": mongodb.StringTOBsonId(mapInfo["gtid"].(string)),
  22. "$lte": mongodb.StringTOBsonId(mapInfo["lteid"].(string)),
  23. },
  24. }
  25. }
  26. //extract库
  27. extractConn := extractMgo.GetMgoConn()
  28. defer extractMgo.DestoryMongoConn(extractConn)
  29. extractc, _ := extract["collect"].(string)
  30. extractResult := extractConn.DB(extractMgo.DbName).C(extractc).Find(q).Sort("_id").Iter()
  31. eMap := map[string]map[string]interface{}{}
  32. extCount, repeatCount := 0, 0
  33. for tmp := make(map[string]interface{}); extractResult.Next(tmp); extCount++ {
  34. if util.IntAll(tmp["repeat"]) == 1 {
  35. repeatCount++
  36. }
  37. tid := mongodb.BsonIdToSId(tmp["_id"])
  38. eMap[tid] = tmp
  39. tmp = make(map[string]interface{})
  40. }
  41. util.Debug("抽取表 重复数据量:", extCount, repeatCount)
  42. //bidding库
  43. biddingConn := biddingMgo.GetMgoConn()
  44. c, _ := bidding["collect"].(string)
  45. count, _ := biddingConn.DB(biddingMgo.DbName).C(c).Find(&q).Count()
  46. util.Debug("查询语句:", q, "同步总数:", count)
  47. n1, n2 := 0, 0
  48. if count < 200000 {
  49. var res []map[string]interface{}
  50. result := biddingConn.DB(biddingMgo.DbName).C(c).Find(q).Select(map[string]interface{}{
  51. "contenthtml": 0,
  52. }).Iter()
  53. for tmp := make(map[string]interface{}); result.Next(tmp); {
  54. res = append(res, tmp)
  55. tmp = make(map[string]interface{})
  56. }
  57. biddingMgo.DestoryMongoConn(biddingConn)
  58. util.Debug("查询结果", "bidding:", count, "抽取:", extCount)
  59. //if int64(len(res)) != count {
  60. // time.Sleep(20 * time.Second)
  61. // toadd := &net.UDPAddr{
  62. // IP: net.ParseIP("127.0.0.1"),
  63. // Port: util.IntAll(Sysconfig["udpport"]),
  64. // }
  65. // udpclient.WriteUdp(data, mu.OP_TYPE_DATA, toadd)
  66. //}
  67. n1, n2 = t.doIndex(res, eMap, bkey)
  68. } else {
  69. util.Debug("数据量太大,放弃!", count)
  70. biddingMgo.DestoryMongoConn(biddingConn)
  71. }
  72. util.Debug(mapInfo, "create bidding index...over", "all:", count, "bidding size:", n1, ",es size:", n2)
  73. if t.stype == "bidding_history" {
  74. // 历史判重id段结束之后 生全量数据索引
  75. t.stype = "biddingdata"
  76. t.thread = 30
  77. t.biddingDataTask(data, mapInfo)
  78. }
  79. }
  80. func (t *TaskInfo) doIndex(infos []map[string]interface{}, eMap map[string]map[string]interface{}, bkey string) (int, int) {
  81. n1, n2 := 0, 0 //bidding数量,索引数量
  82. //对比两张表数据,减少查询次数
  83. var compare map[string]interface{}
  84. util.Debug("start ...")
  85. for n, tmp := range infos {
  86. n1++
  87. if sensitive := util.ObjToString(tmp["sensitive"]); sensitive == "测试" { //bidding中有敏感词,不生索引
  88. tmp = make(map[string]interface{})
  89. continue
  90. }
  91. tid := mongodb.BsonIdToSId(tmp["_id"])
  92. update := map[string]interface{}{} //要更新的mongo数据
  93. //对比方法----------------
  94. if eMap[tid] != nil {
  95. compare = eMap[tid]
  96. if t.stype == "bidding" {
  97. // 增量id段 正常数据
  98. if num := util.IntAll(compare["dataging"]); num == 1 { //extract中dataging=1跳过
  99. tmp = make(map[string]interface{})
  100. compare = nil
  101. continue
  102. }
  103. delete(eMap, tid)
  104. }
  105. if t.stype == "bidding_history" {
  106. //增量id段 历史数据
  107. if compare["history_updatetime"] == nil { //extract中history_updatetime不存在跳过
  108. tmp = make(map[string]interface{})
  109. compare = nil
  110. continue
  111. }
  112. delete(eMap, tid)
  113. }
  114. //更新bidding表,生成索引;bidding表modifyinfo中的字段不更新
  115. modifyinfo := make(map[string]bool)
  116. if tmpmodifyinfo, ok := tmp["modifyinfo"].(map[string]interface{}); ok && tmpmodifyinfo != nil {
  117. for k, _ := range tmpmodifyinfo {
  118. modifyinfo[k] = true
  119. }
  120. }
  121. //更新bidding表,生成索引
  122. for _, k := range biddingMgoFields {
  123. v1 := compare[k] //extract
  124. v2 := tmp[k] //bidding
  125. if v2 == nil && v1 != nil && !modifyinfo[k] {
  126. update[k] = v1
  127. } else if v2 != nil && v1 != nil && !modifyinfo[k] {
  128. //update[k+"_b"] = v2
  129. update[k] = v1
  130. } else if v2 != nil && v1 == nil {
  131. //update[k+"_b"] = v2
  132. if k == "area" || k == "city" || k == "district" {
  133. update[k] = ""
  134. }
  135. }
  136. }
  137. if util.IntAll(compare["repeat"]) == 1 {
  138. update["extracttype"] = -1
  139. } else {
  140. update["extracttype"] = 1
  141. }
  142. } else {
  143. compare = nil
  144. if util.IntAll(tmp["dataging"]) == 1 { //修改未抽取的bidding数据的dataging
  145. update["dataging"] = 0
  146. }
  147. }
  148. //下面可以多线程跑的--->
  149. //处理分类
  150. if compare != nil { //extract
  151. FieldMethod(compare, update)
  152. compare = nil
  153. } else {
  154. area := util.ObjToString(tmp["area"])
  155. city := util.ObjToString(tmp["city"])
  156. district := util.ObjToString(tmp["district"])
  157. rdata := standardCheckCity(area, city, district)
  158. if len(rdata) > 0 {
  159. for k, v := range rdata {
  160. update[k] = v
  161. }
  162. }
  163. }
  164. //------------------对比结束
  165. //处理key descript
  166. if bkey == "" {
  167. DealInfo(&tmp, &update)
  168. }
  169. //同时保存到elastic
  170. for tk, tv := range update {
  171. tmp[tk] = tv
  172. }
  173. extractMap := make(map[string]interface{})
  174. if tmp["s_winner"] != "" {
  175. cid := FieldFun(tmp)
  176. if len(cid) > 0 {
  177. tmp["entidlist"] = cid
  178. update["entidlist"] = cid
  179. extractMap["entidlist"] = cid
  180. }
  181. }
  182. // 6.10 剑鱼发布信息分类处理, 写在这里是为了修改抽取表
  183. TypeMethod(tmp, update, extractMap)
  184. if len(extractMap) > 0 {
  185. if extractMap["toptype"] != nil && extractMap["subtype"] == nil {
  186. updateExtractPool <- []map[string]interface{}{
  187. {"_id": tmp["_id"]},
  188. {"$set": extractMap, "$unset": map[string]interface{}{"subtype": ""}},
  189. }
  190. } else {
  191. updateExtractPool <- []map[string]interface{}{
  192. {"_id": tmp["_id"]},
  193. {"$set": extractMap},
  194. }
  195. }
  196. }
  197. // 附件有效字段
  198. if i := validFile(tmp); i != 0 {
  199. if i == -1 {
  200. tmp["isValidFile"] = false
  201. update["isValidFile"] = false
  202. } else {
  203. tmp["isValidFile"] = true
  204. update["isValidFile"] = true
  205. }
  206. }
  207. clearMap(tmp)
  208. //go IS.Add("bidding")
  209. if util.IntAll(update["extracttype"]) != -1 {
  210. n2++
  211. newTmp := GetEsField(tmp, update, t.stype)
  212. newTmp["dataweight"] = 0 //索引数据新增 jy置顶字段
  213. if util.ObjToString(newTmp["spidercode"]) == "a_jyxxfbpt_gg" {
  214. // 剑鱼信息发布数据 通过udp通知信息发布程序
  215. go UdpMethod(mongodb.BsonIdToSId(newTmp["_id"]))
  216. }
  217. saveEsPool <- newTmp
  218. }
  219. if len(update) > 0 {
  220. delete(update, "winnerorder") //winnerorder不需要更新到bindding表,删除
  221. updateBiddingPool <- []map[string]interface{}{{
  222. "_id": tmp["_id"],
  223. },
  224. {"$set": update},
  225. }
  226. }
  227. if n%200 == 0 {
  228. util.Debug("current:", n)
  229. }
  230. tmp = make(map[string]interface{})
  231. }
  232. return n1, n2
  233. }
  234. var client *mfw.Client
  235. var reg = regexp.MustCompile("^[0-9a-zA-Z-.]+$")
  236. var reg_space = regexp.MustCompile("(?ism)(<style.*?>.*?</style>)|([.#]?\\w{1,20}\\{.*?\\})|(<.*?>)|(\\\\t)+|\\t|( +)|( +)|(" + string(rune(160)) + "+)")
  237. var reg_row = regexp.MustCompile("(?i)<(tr|div|p)[^>]*?>|(\\n)+")
  238. var reg_dh = regexp.MustCompile("[,]+")
  239. var reg_newdb = regexp.MustCompile("([:,、:,。.;])[,]")
  240. var reg_no = regexp.MustCompile("^[0-9]*$")
  241. var reg_letter = regexp.MustCompile("[a-z]*")
  242. var MSG_SERVER = "123.56.236.148:7070"
  243. var DesLen = 120
  244. func inits() {
  245. ser := util.ObjToString(Sysconfig["msg_server"])
  246. if ser != "" {
  247. MSG_SERVER = ser
  248. }
  249. cf := &mfw.ClientConfig{
  250. ClientName: "剑鱼抽关键词",
  251. EventHandler: func(p *mfw.Packet) {},
  252. MsgServerAddr: MSG_SERVER,
  253. CanHandleEvents: []int{},
  254. OnConnectSuccess: func() {
  255. util.Debug("剑鱼关键词 client")
  256. },
  257. ReadBufferSize: 10,
  258. WriteBufferSize: 10,
  259. }
  260. client, _ = mfw.NewClient(cf)
  261. }
  262. var keypool = make(chan bool, 1)
  263. func DealInfo(obj, update *map[string]interface{}) {
  264. defer util.Catch()
  265. if (*obj)["keywords"] != nil && (*obj)["description"] != nil {
  266. return
  267. } else {
  268. (*update)["keywords"] = ""
  269. (*update)["description"] = ""
  270. }
  271. title := util.ObjToString((*obj)["title"])
  272. var m [][]string
  273. select {
  274. case <-func() <-chan bool {
  275. ch := make(chan bool, 1)
  276. go func(chan bool) {
  277. select {
  278. case keypool <- true:
  279. defer func() {
  280. <-keypool
  281. }()
  282. ret, _ := client.Call("", mfw.UUID(8), 4010, mfw.SENDTO_TYPE_RAND_RECIVER, title, 1)
  283. json.Unmarshal(ret, &m)
  284. case <-time.After(10 * time.Millisecond):
  285. }
  286. ch <- true
  287. }(ch)
  288. return ch
  289. }():
  290. case <-time.After(40 * time.Millisecond):
  291. }
  292. arr := []string{}
  293. keyword := []string{}
  294. keywordnew := []string{}
  295. for _, tmp := range m {
  296. if reg.MatchString(tmp[0]) {
  297. arr = append(arr, tmp[0])
  298. } else {
  299. if len(arr) > 0 {
  300. str := strings.Join(arr, "")
  301. keyword = append(keyword, str)
  302. arr = []string{}
  303. }
  304. if len(tmp[0]) > 3 && (strings.HasPrefix(tmp[1], "n") || tmp[1] == "v" || tmp[1] == "vn" || strings.HasPrefix(tmp[1], "g")) {
  305. keyword = append(keyword, tmp[0])
  306. }
  307. }
  308. }
  309. for _, v := range keyword {
  310. v = reg_no.ReplaceAllString(v, "")
  311. if len(v) > 0 {
  312. keywordnew = append(keywordnew, v)
  313. }
  314. }
  315. keywords := strings.Join(keywordnew, ",")
  316. (*update)["keywords"] = keywords
  317. content := ""
  318. if (*obj)["detail_bak"] != nil {
  319. content = util.ObjToString((*obj)["detail_bak"])
  320. } else {
  321. content = util.ObjToString((*obj)["detail"])
  322. }
  323. //内容替换
  324. content = strings.Replace(content, " ", "", -1)
  325. content = reg_space.ReplaceAllString(content, "")
  326. content = reg_row.ReplaceAllString(content, ",")
  327. content = reg_dh.ReplaceAllString(content, ",")
  328. content = reg_newdb.ReplaceAllString(content, "$1")
  329. if strings.HasPrefix(content, ",") {
  330. content = content[1:]
  331. }
  332. tc := []rune(content)
  333. ltc := len(tc)
  334. description := content
  335. if ltc > DesLen {
  336. description = string(tc[:DesLen])
  337. }
  338. (*update)["description"] = description
  339. //保存到数据库
  340. return
  341. }
  342. // @Description tmp修改索引,update 修改bidding表,extractM修改抽取表
  343. // @Author J 2022/6/10 10:29 AM
  344. func TypeMethod(tmp, update, extractM map[string]interface{}) {
  345. if jyData, ok := tmp["jyfb_data"].(map[string]interface{}); ok {
  346. if t := util.ObjToString(jyData["type"]); t != "" {
  347. switch t {
  348. //case "采购信息":
  349. case "招标公告":
  350. if util.ObjToString(tmp["toptype"]) != "招标" {
  351. tmp["toptype"] = "招标"
  352. update["toptype"] = "招标"
  353. extractM["toptype"] = "招标"
  354. delete(tmp, "subtype")
  355. delete(update, "subtype")
  356. }
  357. case "采购意向":
  358. if util.ObjToString(tmp["toptype"]) != "采购意向" {
  359. tmp["toptype"] = "采购意向"
  360. tmp["subtype"] = "采购意向"
  361. update["toptype"] = "采购意向"
  362. update["subtype"] = "采购意向"
  363. extractM["toptype"] = "采购意向"
  364. extractM["subtype"] = "采购意向"
  365. }
  366. case "招标预告":
  367. if util.ObjToString(tmp["toptype"]) != "预告" {
  368. tmp["toptype"] = "预告"
  369. update["toptype"] = "预告"
  370. extractM["toptype"] = "预告"
  371. delete(tmp, "subtype")
  372. delete(update, "subtype")
  373. }
  374. case "招标结果":
  375. if util.ObjToString(tmp["toptype"]) != "结果" {
  376. tmp["toptype"] = "结果"
  377. update["toptype"] = "结果"
  378. extractM["toptype"] = "结果"
  379. delete(tmp, "subtype")
  380. delete(update, "subtype")
  381. }
  382. }
  383. }
  384. }
  385. }
  // @Description Notify the info-publishing program over UDP:
// sends {"infoid": id, "stype": "jyfb_data_over"} to JyUdpAddr.
// @Author J 2022/4/13 9:13 AM
//
// Delivery is best effort; a failed write is now logged instead of being
// discarded silently.
func UdpMethod(id string) {
	// A map of two strings always marshals, so the error cannot occur.
	datas, _ := json.Marshal(map[string]interface{}{
		"infoid": id,
		"stype":  "jyfb_data_over",
	})
	util.Debug(JyUdpAddr, string(datas))
	if err := udpclient.WriteUdp(datas, udp.OP_TYPE_DATA, JyUdpAddr); err != nil {
		util.Debug("UdpMethod: udp notify failed:", id, err)
	}
}
  // @Description Re-index a record whose id is unchanged but whose content changed
// @Author J 2022/8/10 13:29
//
// taskinfo reloads bidding document id and its extract document. The extract
// document comes from extract["collect"], falling back to extract["collect1"].
// It merges the extracted fields into the bidding record and queues the
// resulting writes to the bidding collection, the extract collection and
// Elasticsearch. It returns early (with a log line) when either document is
// missing.
//
// NOTE(review): unlike doIndex, fields listed in the bidding record's
// modifyinfo are overwritten here, TypeMethod is not applied, and the bidding
// collection name is hard-coded. Confirm this is intended.
func taskinfo(id string) {
	tmp, _ := biddingMgo.FindById("bidding", id, nil)
	if tmp == nil || len(*tmp) == 0 {
		util.Debug(fmt.Sprintf("taskinfo bidding id=%s 未查询到数据", id))
		return
	}
	extractM, _ := extractMgo.FindById(util.ObjToString(extract["collect"]), id, nil)
	if extractM == nil || len(*extractM) == 0 {
		extractM, _ = extractMgo.FindById(util.ObjToString(extract["collect1"]), id, nil)
		if extractM == nil || len(*extractM) == 0 {
			util.Debug(fmt.Sprintf("taskinfo extract id=%s 未查询到数据", id))
			return
		}
	}
	oid := mongodb.StringTOBsonId(id)

	update := map[string]interface{}{} // fields to $set on the bidding collection
	for _, k := range biddingMgoFields {
		extVal, bidVal := (*extractM)[k], (*tmp)[k]
		switch {
		case extVal != nil:
			update[k] = extVal
		case bidVal != nil && (k == "area" || k == "city" || k == "district"):
			// No extracted region value: clear the bidding one.
			update[k] = ""
		}
	}
	if util.IntAll((*extractM)["repeat"]) == 1 {
		update["extracttype"] = -1 // duplicate: not sent to ES below
	} else {
		update["extracttype"] = 1
	}
	if util.IntAll((*tmp)["dataging"]) == 1 {
		update["dataging"] = 0
	}
	FieldMethod(*extractM, update) // classification fields from the extract record
	// The ES document is the bidding record with all updates applied.
	for k, v := range update {
		(*tmp)[k] = v
	}
	if util.ObjToString((*tmp)["s_winner"]) != "" {
		if cid := FieldFun(*tmp); len(cid) > 0 {
			(*tmp)["entidlist"] = cid
			update["entidlist"] = cid
			// Queue the extract update only when there is something to set.
			// An empty $set is rejected by MongoDB before 5.0 and is a
			// wasted write otherwise.
			updateExtractPool <- []map[string]interface{}{
				{"_id": oid},
				{"$set": map[string]interface{}{"entidlist": cid}},
			}
		}
	}
	// Attachment validity: 0 leaves the field untouched, -1 marks it
	// invalid, any other value marks it valid.
	if v := validFile(*tmp); v != 0 {
		valid := v != -1
		(*tmp)["isValidFile"] = valid
		update["isValidFile"] = valid
	}
	clearMap(*tmp)
	if util.IntAll(update["extracttype"]) != -1 {
		newTmp := GetEsField(*tmp, update, "")
		newTmp["dataweight"] = 0 // new index field: Jianyu pin-to-top weight
		saveEsPool <- newTmp
	}
	// winnerorder is not written back to the bidding collection; delete it
	// before the emptiness check.
	delete(update, "winnerorder")
	if len(update) > 0 {
		updateBiddingPool <- []map[string]interface{}{{"_id": oid}, {"$set": update}}
	}
}