biddingindex.go 10 KB


  1. package main
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "log"
  6. mu "mfw/util"
  7. qutil "qfw/util"
  8. elastic "qfw/util/elastic"
  9. "regexp"
  10. "strings"
  11. "sync"
  12. "time"
  13. "qfw/util/redis"
  14. "gopkg.in/mgo.v2/bson"
  15. es "gopkg.in/olivere/elastic.v1"
  16. )
  17. var BulkSize = 100
  18. var ESLEN = 32766
  19. //对字段处理 bidamount budget
  20. //招标数据表和抽取表一一对应开始更新
  21. var esMap = map[string]bool{}
  22. var esLock = sync.Mutex{}
  23. func delEsTask() {
  24. time.AfterFunc(30*time.Second, delEsTask)
  25. esLock.Lock()
  26. delEsAndRedis()
  27. esLock.Unlock()
  28. }
  29. func delEsAndRedis() {
  30. if len(esMap) > 0 {
  31. client := elastic.GetEsConn()
  32. defer elastic.DestoryEsConn(client)
  33. if client != nil {
  34. defer qutil.Catch()
  35. req := client.Bulk()
  36. var keys = []interface{}{}
  37. str := ""
  38. for k, _ := range esMap {
  39. str += " " + k
  40. keys = append(keys, fmt.Sprintf("jypcdetail__rec%s", k))
  41. req = req.Add(es.NewBulkDeleteRequest().Index(index).Type(itype).Id(k))
  42. }
  43. res, err := req.Do()
  44. if err != nil {
  45. log.Println("批量删除es出错", err.Error())
  46. }
  47. log.Println("del es", len(res.Succeeded()))
  48. log.Println(str)
  49. log.Println("del redis", redis.Del("other", keys...))
  50. } else {
  51. log.Println("es client get failed")
  52. }
  53. esMap = map[string]bool{}
  54. }
  55. }
  56. func biddingTask(data []byte, mapInfo map[string]interface{}) {
  57. defer qutil.Catch()
  58. q, _ := mapInfo["query"].(map[string]interface{})
  59. bkey, _ := mapInfo["bkey"].(string)
  60. if q == nil {
  61. q = map[string]interface{}{
  62. "_id": bson.M{
  63. "$gt": qutil.StringTOBsonId(mapInfo["gtid"].(string)),
  64. "$lte": qutil.StringTOBsonId(mapInfo["lteid"].(string)),
  65. },
  66. }
  67. }
  68. //连接信息
  69. //线程池
  70. UpdatesLock := sync.Mutex{}
  71. //bidding库
  72. session := mgo.GetMgoConn(86400)
  73. defer mgo.DestoryMongoConn(session)
  74. count, _ := session.DB(db).C(c).Find(&q).Count()
  75. log.Println("查询语句:", q, "同步总数:", count, "elastic库:", index)
  76. //查询招标数据
  77. query := session.DB(db).C(c).Find(q).Select(bson.M{
  78. "projectinfo.attachment": 0,
  79. "contenthtml": 0,
  80. }).Sort("_id").Iter()
  81. //查询抽取结果
  82. log.Println("查询抽取结果..")
  83. //extract库
  84. n := 0
  85. //更新数组
  86. arr := [][]map[string]interface{}{}
  87. arrEs := []map[string]interface{}{}
  88. //对比两张表数据,减少查询次数
  89. var compare map[string]interface{}
  90. log.Println("开始迭代..")
  91. con := 0
  92. for tmp := make(map[string]interface{}); query.Next(tmp); n++ {
  93. if qutil.IntAll(tmp["extracttype"]) == -1 || ((tmp["buyerclass"] != nil || tmp["projectname"] != nil || tmp["keywords"] != nil) && force == 0) {
  94. tmp = make(map[string]interface{})
  95. con++
  96. if con%500 == 0 {
  97. log.Println("跳过:", con)
  98. }
  99. continue
  100. }
  101. //删除索引和缓存
  102. tid := qutil.BsonIdToSId(tmp["_id"])
  103. if tid != "" {
  104. esLock.Lock()
  105. esMap[tid] = true
  106. if len(esMap) > 100 {
  107. delEsAndRedis()
  108. }
  109. esLock.Unlock()
  110. //log.Println("del es ", tid, elastic.DelById(index, itype, tid))
  111. //删除缓存
  112. //redis.DelByCodePattern("other", fmt.Sprintf("*%s*", tid))
  113. }
  114. update := map[string]interface{}{}
  115. //对比方法----------------
  116. compare = nil
  117. obj, bres := mgo.FindById(extractc, tid, nil)
  118. if bres && obj != nil {
  119. compare = *obj
  120. }
  121. //extractsession.DB(extractdb).C(extractc).FindId(tmp["_id"]).One(&compare)
  122. //下面可以多线程跑的--->
  123. //处理分类
  124. if compare != nil {
  125. for _, k := range fields {
  126. v1 := compare[k]
  127. v2 := tmp[k]
  128. if v2 == nil && v1 != nil {
  129. update[k] = v1
  130. } else if v2 != nil && v1 != nil {
  131. update[k] = v1
  132. }
  133. }
  134. subscopeclass, _ := compare["subscopeclass"].([]interface{})
  135. if subscopeclass != nil {
  136. //str := ","
  137. m1 := map[string]bool{}
  138. newclass := []string{}
  139. for _, sc := range subscopeclass {
  140. sclass, _ := sc.(string)
  141. if !m1[sclass] {
  142. m1[sclass] = true
  143. //str += sclass + ","
  144. newclass = append(newclass, sclass)
  145. }
  146. }
  147. update["s_subscopeclass"] = strings.Join(newclass, ",")
  148. update["subscopeclass"] = newclass
  149. }
  150. //处理中标企业
  151. winner, _ := compare["winner"].(string)
  152. m1 := map[string]bool{}
  153. if winner != "" {
  154. m1[winner] = true
  155. }
  156. package1 := compare["package"]
  157. if package1 != nil {
  158. packageM, _ := package1.(map[string]interface{})
  159. for _, p := range packageM {
  160. pm, _ := p.(map[string]interface{})
  161. pw, _ := pm["winner"].(string)
  162. if pw != "" {
  163. m1[pw] = true
  164. }
  165. }
  166. }
  167. compare = nil
  168. if len(m1) > 0 {
  169. //str := ","
  170. winnerarr := []string{}
  171. for k, _ := range m1 {
  172. //str += k + ","
  173. winnerarr = append(winnerarr, k)
  174. }
  175. update["s_winner"] = strings.Join(winnerarr, ",")
  176. }
  177. }
  178. //------------------对比结束
  179. //处理key descript
  180. if bkey == "" {
  181. DealInfo(&tmp, &update)
  182. }
  183. //同时保存到elastic
  184. for tk, tv := range update {
  185. tmp[tk] = tv
  186. }
  187. //对projectscope字段的索引处理
  188. ps, _ := tmp["projectscope"].(string)
  189. if ps == "" {
  190. tmp["projectscope"] = "" //= tmp["detail"]
  191. }
  192. if len(ps) > ESLEN {
  193. tmp["projectscope"] = string(([]rune(ps))[:4000])
  194. }
  195. if s_budget := fmt.Sprint(tmp["budget"]); s_budget == "" || s_budget == "<nil>" || s_budget == "null" {
  196. tmp["budget"] = nil
  197. } else if sbd, ok := tmp["budget"].(string); ok {
  198. tmp["budget"] = ObjToMoney([]interface{}{sbd, sbd})[0]
  199. }
  200. if s_bidamount := fmt.Sprint(tmp["bidamount"]); s_bidamount == "" || s_bidamount == "<nil>" || s_bidamount == "null" {
  201. tmp["bidamount"] = nil
  202. } else if sbd, ok := tmp["bidamount"].(string); ok {
  203. tmp["bidamount"] = ObjToMoney([]interface{}{sbd, sbd})[0]
  204. }
  205. UpdatesLock.Lock()
  206. // for k1, _ := range tmp {
  207. // if strings.HasSuffix(k1, "_b") || k1 == "contenthtml" {
  208. // delete(tmp, k1)
  209. // }
  210. // }
  211. newTmp := map[string]interface{}{}
  212. for _, v := range biddingIndexFields {
  213. // if tmp[v] != nil {
  214. // newTmp[v] = tmp[v]
  215. // }
  216. if tmp[v] != nil {
  217. if "projectinfo" == v {
  218. mp, _ := tmp[v].(map[string]interface{})
  219. if mp != nil {
  220. newmap := map[string]interface{}{}
  221. for _, v1 := range projectinfoFields {
  222. if mp[v1] != nil {
  223. newmap[v1] = mp[v1]
  224. }
  225. }
  226. newTmp[v] = newmap
  227. }
  228. } else {
  229. if v == "detail" {
  230. detail, _ := tmp[v].(string)
  231. newTmp[v] = FilterDetail(detail)
  232. } else {
  233. newTmp[v] = tmp[v]
  234. }
  235. }
  236. }
  237. }
  238. arrEs = append(arrEs, newTmp)
  239. if len(update) > 0 {
  240. arr = append(arr, []map[string]interface{}{
  241. map[string]interface{}{
  242. "_id": tmp["_id"],
  243. },
  244. map[string]interface{}{
  245. "$set": update,
  246. },
  247. })
  248. }
  249. if len(arr) >= BulkSize-1 {
  250. mgo.UpdateBulkAll(db, c, arr...)
  251. arr = [][]map[string]interface{}{}
  252. }
  253. if len(arrEs) >= BulkSize-1 {
  254. tmps := arrEs
  255. elastic.BulkSave(index, itype, &tmps, true)
  256. arrEs = []map[string]interface{}{}
  257. }
  258. UpdatesLock.Unlock()
  259. if n%100 == 0 {
  260. log.Println("current:", n)
  261. }
  262. tmp = make(map[string]interface{})
  263. }
  264. UpdatesLock.Lock()
  265. if len(arr) > 0 {
  266. mgo.UpdateBulkAll(db, c, arr...)
  267. }
  268. if len(arrEs) > 0 {
  269. tmps := arrEs
  270. elastic.BulkSave(index, itype, &tmps, true)
  271. }
  272. UpdatesLock.Unlock()
  273. log.Println(mapInfo, "create bidding index...over", n, con)
  274. }
  275. var client *mu.Client
  276. var reg = regexp.MustCompile("^[0-9a-zA-Z-.]+$")
  277. var reg_space = regexp.MustCompile("(?ism)(<style.*?>.*?</style>)|([.#]?\\w{1,20}\\{.*?\\})|(<.*?>)|(\\\\t)+|\\t|( +)|( +)|(" + string(rune(160)) + "+)")
  278. var reg_row = regexp.MustCompile("(?i)<(tr|div|p)[^>]*?>|(\\n)+")
  279. var reg_dh = regexp.MustCompile("[,]+")
  280. var reg_newdb = regexp.MustCompile("([:,、:,。.;])[,]")
  281. var reg_no = regexp.MustCompile("^[0-9]*$")
  282. var MSG_SERVER = "123.56.236.148:7070"
  283. var DesLen = 120
  284. func inits() {
  285. ser := qutil.ObjToString(Sysconfig["msg_server"])
  286. if ser != "" {
  287. MSG_SERVER = ser
  288. }
  289. cf := &mu.ClientConfig{
  290. ClientName: "剑鱼抽关键词",
  291. EventHandler: func(p *mu.Packet) {},
  292. MsgServerAddr: MSG_SERVER,
  293. CanHandleEvents: []int{},
  294. OnConnectSuccess: func() {
  295. log.Println("c.")
  296. },
  297. ReadBufferSize: 10,
  298. WriteBufferSize: 10,
  299. }
  300. client, _ = mu.NewClient(cf)
  301. }
  302. var keypool = make(chan bool, 1)
  303. func DealInfo(obj, update *map[string]interface{}) {
  304. defer qutil.Catch()
  305. if (*obj)["keywords"] != nil && (*obj)["description"] != nil {
  306. return
  307. } else {
  308. (*update)["keywords"] = ""
  309. (*update)["description"] = ""
  310. }
  311. title := qutil.ObjToString((*obj)["title"])
  312. var m [][]string
  313. select {
  314. case <-func() <-chan bool {
  315. select {
  316. case keypool <- true:
  317. defer func() {
  318. <-keypool
  319. }()
  320. ret, _ := client.Call("", mu.UUID(8), 4010, mu.SENDTO_TYPE_RAND_RECIVER, title, 1)
  321. json.Unmarshal(ret, &m)
  322. case <-time.After(10 * time.Millisecond):
  323. }
  324. ch := make(chan bool, 1)
  325. ch <- true
  326. return ch
  327. }():
  328. case <-time.After(50 * time.Millisecond):
  329. }
  330. arr := []string{}
  331. keyword := []string{}
  332. keywordnew := []string{}
  333. for _, tmp := range m {
  334. if reg.MatchString(tmp[0]) {
  335. arr = append(arr, tmp[0])
  336. } else {
  337. if len(arr) > 0 {
  338. str := strings.Join(arr, "")
  339. keyword = append(keyword, str)
  340. arr = []string{}
  341. }
  342. if len(tmp[0]) > 3 && (strings.HasPrefix(tmp[1], "n") || tmp[1] == "v" || tmp[1] == "vn" || strings.HasPrefix(tmp[1], "g")) {
  343. keyword = append(keyword, tmp[0])
  344. }
  345. }
  346. }
  347. for _, v := range keyword {
  348. v = reg_no.ReplaceAllString(v, "")
  349. if len(v) > 0 {
  350. keywordnew = append(keywordnew, v)
  351. }
  352. }
  353. keywords := strings.Join(keywordnew, ",")
  354. (*update)["keywords"] = keywords
  355. content := ""
  356. if (*obj)["detail_bak"] != nil {
  357. content = qutil.ObjToString((*obj)["detail_bak"])
  358. } else {
  359. content = qutil.ObjToString((*obj)["detail"])
  360. }
  361. //内容替换
  362. content = strings.Replace(content, " ", "", -1)
  363. content = reg_space.ReplaceAllString(content, "")
  364. content = reg_row.ReplaceAllString(content, ",")
  365. content = reg_dh.ReplaceAllString(content, ",")
  366. content = reg_newdb.ReplaceAllString(content, "$1")
  367. if strings.HasPrefix(content, ",") {
  368. content = content[1:]
  369. }
  370. //log.Println(content)
  371. tc := []rune(content)
  372. ltc := len(tc)
  373. description := content
  374. if ltc > DesLen {
  375. description = string(tc[:DesLen])
  376. }
  377. (*update)["description"] = description
  378. //保存到数据库
  379. return
  380. }
  381. var filterReg = regexp.MustCompile("<[^>]+>")
  382. func FilterDetail(text string) string {
  383. return filterReg.ReplaceAllString(text, "")
  384. }