biddingindex.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416
  1. package main
  2. import (
  3. "encoding/json"
  4. "log"
  5. mu "mfw/util"
  6. "net"
  7. qutil "qfw/util"
  8. elastic "qfw/util/elastic"
  9. "regexp"
  10. "strings"
  11. "sync"
  12. "time"
  13. "gopkg.in/mgo.v2/bson"
  14. )
  15. //对字段处理 bidamount budget
  16. //招标数据表和抽取表一一对应开始更新
  17. func biddingTask(data []byte, mapInfo map[string]interface{}) {
  18. defer qutil.Catch()
  19. q, _ := mapInfo["query"].(map[string]interface{})
  20. bkey, _ := mapInfo["bkey"].(string)
  21. if q == nil {
  22. q = map[string]interface{}{
  23. "_id": bson.M{
  24. "$gt": qutil.StringTOBsonId(mapInfo["gtid"].(string)),
  25. "$lte": qutil.StringTOBsonId(mapInfo["lteid"].(string)),
  26. },
  27. }
  28. }
  29. //连接信息
  30. c, _ := bidding["collect"].(string)
  31. extractc, _ := bidding["extractcollect"].(string)
  32. db, _ := bidding["db"].(string)
  33. extractdb, _ := bidding["extractdb"].(string)
  34. index, _ := bidding["index"].(string)
  35. itype, _ := bidding["type"].(string)
  36. //extract库
  37. extractsession := extractmgo.GetMgoConn(86400)
  38. defer extractmgo.DestoryMongoConn(extractsession)
  39. extractquery := extractsession.DB(extractdb).C(extractc).Find(q).Sort("_id").Iter()
  40. eMap := map[string]map[string]interface{}{}
  41. for tmp := make(map[string]interface{}); extractquery.Next(tmp); {
  42. tid := qutil.BsonIdToSId(tmp["_id"])
  43. eMap[tid] = tmp
  44. tmp = make(map[string]interface{})
  45. }
  46. //bidding库
  47. session := mgo.GetMgoConn(86400)
  48. count, _ := session.DB(db).C(c).Find(&q).Count()
  49. log.Println("查询语句:", q, "同步总数:", count, "elastic库:", index)
  50. n1, n2 := 0, 0
  51. if count < 200000 {
  52. res := make([]map[string]interface{}, 1)
  53. session.DB(db).C(c).Find(q).Select(bson.M{
  54. "projectinfo.attachment": 0,
  55. "contenthtml": 0,
  56. }).All(&res)
  57. mgo.DestoryMongoConn(session)
  58. if len(res) != count {
  59. log.Println("查询结果不一致", "count:", count, "res:", len(res))
  60. time.Sleep(20 * time.Second)
  61. toadd := &net.UDPAddr{
  62. IP: net.ParseIP("127.0.0.1"),
  63. Port: qutil.IntAll(Sysconfig["udpport"]),
  64. }
  65. udpclient.WriteUdp(data, mu.OP_TYPE_DATA, toadd)
  66. } else {
  67. n1, n2 = doIndex(res, eMap, index, itype, db, c, bkey)
  68. if (n1 + n2) != count {
  69. log.Println("任务错误,结果不一致")
  70. }
  71. }
  72. } else {
  73. log.Println("数据量太大,放弃!", count)
  74. mgo.DestoryMongoConn(session)
  75. }
  76. log.Println(mapInfo, "create bidding index...over", "all:", count, "n1:", n1, "n2:", n2)
  77. }
  78. func doIndex(infos []map[string]interface{}, eMap map[string]map[string]interface{}, index, itype, db, c, bkey string) (int, int) {
  79. n1, n2 := 0, 0
  80. //线程池
  81. UpdatesLock := sync.Mutex{}
  82. fields := strings.Split(bidding["fields"].(string), ",")
  83. //更新数组
  84. arr := [][]map[string]interface{}{}
  85. arrEs := []map[string]interface{}{}
  86. //对比两张表数据,减少查询次数
  87. var compare bson.M
  88. log.Println("开始迭代..")
  89. for n, tmp := range infos {
  90. n1++
  91. if qutil.IntAll(tmp["dataging"]) == 1 { //dataging=1不生索引
  92. tmp = make(map[string]interface{})
  93. continue
  94. }
  95. update := map[string]interface{}{} //要更新的mongo数据
  96. //对比方法----------------
  97. tid := qutil.BsonIdToSId(tmp["_id"])
  98. if eMap[tid] != nil {
  99. compare = eMap[tid]
  100. delete(eMap, tid)
  101. //更新bidding表,生成索引
  102. for _, k := range fields {
  103. v1 := compare[k] //extract
  104. v2 := tmp[k] //bidding
  105. if v2 == nil && v1 != nil {
  106. update[k] = v1
  107. } else if v2 != nil && v1 != nil {
  108. //update[k+"_b"] = v2
  109. update[k] = v1
  110. } else if v2 != nil && v1 == nil {
  111. //update[k+"_b"] = v2
  112. }
  113. }
  114. if qutil.IntAll(compare["repeat"]) == 1 {
  115. update["extracttype"] = -1
  116. } else {
  117. update["extracttype"] = 1
  118. }
  119. } else {
  120. compare = nil
  121. }
  122. //下面可以多线程跑的--->
  123. //处理分类
  124. if compare != nil { //extract
  125. subscopeclass, _ := compare["subscopeclass"].([]interface{})
  126. if subscopeclass != nil {
  127. //str := ","
  128. m1 := map[string]bool{}
  129. newclass := []string{}
  130. for _, sc := range subscopeclass {
  131. sclass, _ := sc.(string)
  132. if !m1[sclass] {
  133. m1[sclass] = true
  134. //str += sclass + ","
  135. newclass = append(newclass, sclass)
  136. }
  137. }
  138. update["s_subscopeclass"] = strings.Join(newclass, ",")
  139. update["subscopeclass"] = newclass
  140. }
  141. //处理中标企业
  142. // winner, _ := compare["winner"].(string)
  143. // m1 := map[string]bool{}
  144. // if winner != "" {
  145. // m1[winner] = true
  146. // }
  147. // package1 := compare["package"]
  148. // if package1 != nil {
  149. // packageM, _ := package1.(map[string]interface{})
  150. // for _, p := range packageM {
  151. // pm, _ := p.(map[string]interface{})
  152. // pw, _ := pm["winner"].(string)
  153. // if pw != "" {
  154. // m1[pw] = true
  155. // }
  156. // }
  157. // }
  158. compare = nil
  159. // if len(m1) > 0 {
  160. // //str := ","
  161. // winnerarr := []string{}
  162. // for k, _ := range m1 {
  163. // //str += k + ","
  164. // winnerarr = append(winnerarr, k)
  165. // }
  166. // update["s_winner"] = strings.Join(winnerarr, ",")
  167. // }
  168. }
  169. //------------------对比结束
  170. //处理key descript
  171. if bkey == "" {
  172. DealInfo(&tmp, &update)
  173. }
  174. //同时保存到elastic
  175. for tk, tv := range update {
  176. tmp[tk] = tv
  177. }
  178. if tmp["supervisorrate"] != nil { //临时处理supervisorrate抽取类型为string不生索引
  179. if _, ok := tmp["supervisorrate"].(string); ok { //supervisorrate数据为string类型
  180. delete(tmp, "supervisorrate")
  181. }
  182. }
  183. //对projectscope字段的索引处理
  184. ps, _ := tmp["projectscope"].(string)
  185. // if ps == "" {
  186. // tmp["projectscope"] = "" //= tmp["detail"]
  187. // }
  188. if len(ps) > ESLEN {
  189. tmp["projectscope"] = string(([]rune(ps))[:4000])
  190. }
  191. // if s_budget := fmt.Sprint(tmp["budget"]); s_budget == "" || s_budget == "<nil>" || s_budget == "null" {
  192. // tmp["budget"] = nil
  193. // } else if sbd, ok := tmp["budget"].(string); ok {
  194. // tmp["budget"] = ObjToMoney([]interface{}{sbd, sbd})[0]
  195. // }
  196. // if s_bidamount := fmt.Sprint(tmp["bidamount"]); s_bidamount == "" || s_bidamount == "<nil>" || s_bidamount == "null" {
  197. // tmp["bidamount"] = nil
  198. // } else if sbd, ok := tmp["bidamount"].(string); ok {
  199. // tmp["bidamount"] = ObjToMoney([]interface{}{sbd, sbd})[0]
  200. // }
  201. UpdatesLock.Lock()
  202. // for k1, _ := range tmp {
  203. // if strings.HasSuffix(k1, "_b") || k1 == "contenthtml" {
  204. // delete(tmp, k1)
  205. // }
  206. // }
  207. go IS.Add("bidding")
  208. if qutil.IntAll(update["extracttype"]) != -1 {
  209. newTmp := map[string]interface{}{} //最终生索引的数据
  210. for _, v := range biddingIndexFields { //索引字段
  211. if tmp[v] != nil {
  212. if "projectinfo" == v {
  213. mp, _ := tmp[v].(map[string]interface{})
  214. if mp != nil {
  215. newmap := map[string]interface{}{}
  216. for _, v1 := range projectinfoFields {
  217. if mp[v1] != nil {
  218. newmap[v1] = mp[v1]
  219. }
  220. }
  221. newTmp[v] = newmap
  222. // attachments := mp["attachments"]
  223. // con := ""
  224. // if attachments != nil {
  225. // am, _ := attachments.(map[string]interface{})
  226. // if am != nil {
  227. // for _, v1 := range am {
  228. // vm, _ := v1.(map[string]interface{})
  229. // if vm != nil {
  230. // c, _ := vm["content"].(string)
  231. // con += c
  232. // }
  233. // }
  234. // }
  235. // }
  236. // con = FilterDetailSpace(con)
  237. // if con != "" {
  238. // newTmp["attachments"] = con
  239. // }
  240. }
  241. } else {
  242. if v == "detail" {
  243. detail, _ := tmp[v].(string)
  244. newTmp[v] = FilterDetail(detail)
  245. } else {
  246. newTmp[v] = tmp[v]
  247. }
  248. }
  249. }
  250. }
  251. arrEs = append(arrEs, newTmp)
  252. }
  253. if len(update) > 0 {
  254. arr = append(arr, []map[string]interface{}{
  255. map[string]interface{}{
  256. "_id": tmp["_id"],
  257. },
  258. map[string]interface{}{
  259. "$set": update,
  260. },
  261. })
  262. }
  263. if len(arr) >= BulkSize-1 {
  264. mgo.UpdateBulkAll(db, c, arr...)
  265. arr = [][]map[string]interface{}{}
  266. }
  267. if len(arrEs) >= BulkSize-1 {
  268. tmps := arrEs
  269. elastic.BulkSave(index, itype, &tmps, true)
  270. if len(multiIndex) == 2 {
  271. elastic.BulkSave(multiIndex[0], multiIndex[1], &tmps, true)
  272. }
  273. arrEs = []map[string]interface{}{}
  274. }
  275. UpdatesLock.Unlock()
  276. if n%100 == 0 {
  277. log.Println("current:", n)
  278. }
  279. tmp = make(map[string]interface{})
  280. }
  281. UpdatesLock.Lock()
  282. if len(arr) > 0 {
  283. mgo.UpdateBulkAll(db, c, arr...)
  284. }
  285. if len(arrEs) > 0 {
  286. tmps := arrEs
  287. elastic.BulkSave(index, itype, &tmps, true)
  288. if len(multiIndex) == 2 {
  289. elastic.BulkSave(multiIndex[0], multiIndex[1], &tmps, true)
  290. }
  291. }
  292. UpdatesLock.Unlock()
  293. return n1, n2
  294. }
  295. var client *mu.Client
  296. var reg = regexp.MustCompile("^[0-9a-zA-Z-.]+$")
  297. var reg_space = regexp.MustCompile("(?ism)(<style.*?>.*?</style>)|([.#]?\\w{1,20}\\{.*?\\})|(<.*?>)|(\\\\t)+|\\t|( +)|( +)|(" + string(rune(160)) + "+)")
  298. var reg_row = regexp.MustCompile("(?i)<(tr|div|p)[^>]*?>|(\\n)+")
  299. var reg_dh = regexp.MustCompile("[,]+")
  300. var reg_newdb = regexp.MustCompile("([:,、:,。.;])[,]")
  301. var reg_no = regexp.MustCompile("^[0-9]*$")
  302. var MSG_SERVER = "123.56.236.148:7070"
  303. var DesLen = 120
  304. func inits() {
  305. ser := qutil.ObjToString(Sysconfig["msg_server"])
  306. if ser != "" {
  307. MSG_SERVER = ser
  308. }
  309. cf := &mu.ClientConfig{
  310. ClientName: "剑鱼抽关键词",
  311. EventHandler: func(p *mu.Packet) {},
  312. MsgServerAddr: MSG_SERVER,
  313. CanHandleEvents: []int{},
  314. OnConnectSuccess: func() {
  315. log.Println("c.")
  316. },
  317. ReadBufferSize: 10,
  318. WriteBufferSize: 10,
  319. }
  320. client, _ = mu.NewClient(cf)
  321. }
  322. //var clientlock = &sync.Mutex{}
  323. var keypool = make(chan bool, 1)
  324. func DealInfo(obj, update *map[string]interface{}) {
  325. defer qutil.Catch()
  326. if (*obj)["keywords"] != nil && (*obj)["description"] != nil {
  327. return
  328. } else {
  329. (*update)["keywords"] = ""
  330. (*update)["description"] = ""
  331. }
  332. title := qutil.ObjToString((*obj)["title"])
  333. var m [][]string
  334. select {
  335. case <-func() <-chan bool {
  336. ch := make(chan bool, 1)
  337. go func(chan bool) {
  338. select {
  339. case keypool <- true:
  340. defer func() {
  341. <-keypool
  342. }()
  343. ret, _ := client.Call("", mu.UUID(8), 4010, mu.SENDTO_TYPE_RAND_RECIVER, title, 1)
  344. json.Unmarshal(ret, &m)
  345. case <-time.After(10 * time.Millisecond):
  346. }
  347. ch <- true
  348. }(ch)
  349. return ch
  350. }():
  351. case <-time.After(40 * time.Millisecond):
  352. }
  353. arr := []string{}
  354. keyword := []string{}
  355. keywordnew := []string{}
  356. for _, tmp := range m {
  357. if reg.MatchString(tmp[0]) {
  358. arr = append(arr, tmp[0])
  359. } else {
  360. if len(arr) > 0 {
  361. str := strings.Join(arr, "")
  362. keyword = append(keyword, str)
  363. arr = []string{}
  364. }
  365. if len(tmp[0]) > 3 && (strings.HasPrefix(tmp[1], "n") || tmp[1] == "v" || tmp[1] == "vn" || strings.HasPrefix(tmp[1], "g")) {
  366. keyword = append(keyword, tmp[0])
  367. }
  368. }
  369. }
  370. for _, v := range keyword {
  371. v = reg_no.ReplaceAllString(v, "")
  372. if len(v) > 0 {
  373. keywordnew = append(keywordnew, v)
  374. }
  375. }
  376. keywords := strings.Join(keywordnew, ",")
  377. (*update)["keywords"] = keywords
  378. content := ""
  379. if (*obj)["detail_bak"] != nil {
  380. content = qutil.ObjToString((*obj)["detail_bak"])
  381. } else {
  382. content = qutil.ObjToString((*obj)["detail"])
  383. }
  384. //内容替换
  385. content = strings.Replace(content, " ", "", -1)
  386. content = reg_space.ReplaceAllString(content, "")
  387. content = reg_row.ReplaceAllString(content, ",")
  388. content = reg_dh.ReplaceAllString(content, ",")
  389. content = reg_newdb.ReplaceAllString(content, "$1")
  390. if strings.HasPrefix(content, ",") {
  391. content = content[1:]
  392. }
  393. //log.Println(content)
  394. tc := []rune(content)
  395. ltc := len(tc)
  396. description := content
  397. if ltc > DesLen {
  398. description = string(tc[:DesLen])
  399. }
  400. (*update)["description"] = description
  401. //保存到数据库
  402. return
  403. }