biddingindex.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395
  1. package main
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "log"
  6. mu "mfw/util"
  7. "net"
  8. qutil "qfw/util"
  9. elastic "qfw/util/elastic"
  10. "regexp"
  11. "strings"
  12. "sync"
  13. "time"
  14. "gopkg.in/mgo.v2/bson"
  15. )
  16. var BulkSize = 200
  17. //对字段处理 bidamount budget
  18. //招标数据表和抽取表一一对应开始更新
  19. func biddingTask(data []byte, mapInfo map[string]interface{}) {
  20. defer qutil.Catch()
  21. q, _ := mapInfo["query"].(map[string]interface{})
  22. bkey, _ := mapInfo["bkey"].(string)
  23. if q == nil {
  24. q = map[string]interface{}{
  25. "_id": bson.M{
  26. "$gt": qutil.StringTOBsonId(mapInfo["gtid"].(string)),
  27. "$lte": qutil.StringTOBsonId(mapInfo["lteid"].(string)),
  28. },
  29. }
  30. }
  31. //连接信息
  32. c, _ := bidding["collect"].(string)
  33. extractc, _ := bidding["extractcollect"].(string)
  34. db, _ := bidding["db"].(string)
  35. extractdb, _ := bidding["extractdb"].(string)
  36. index, _ := bidding["index"].(string)
  37. itype, _ := bidding["type"].(string)
  38. //extract库
  39. extractsession := extractmgo.GetMgoConn(86400)
  40. defer extractmgo.DestoryMongoConn(extractsession)
  41. extractquery := extractsession.DB(extractdb).C(extractc).Find(q).Sort("_id").Iter()
  42. eMap := map[string]map[string]interface{}{}
  43. for tmp := make(map[string]interface{}); extractquery.Next(tmp); {
  44. tid := qutil.BsonIdToSId(tmp["_id"])
  45. eMap[tid] = tmp
  46. tmp = make(map[string]interface{})
  47. }
  48. //bidding库
  49. session := mgo.GetMgoConn(86400)
  50. count, _ := session.DB(db).C(c).Find(&q).Count()
  51. log.Println("查询语句:", q, "同步总数:", count, "elastic库:", index)
  52. n1, n2 := 0, 0
  53. if count < 200000 {
  54. res := make([]map[string]interface{}, 1)
  55. session.DB(db).C(c).Find(q).Select(bson.M{
  56. "projectinfo.attachment": 0,
  57. "contenthtml": 0,
  58. }).All(&res)
  59. mgo.DestoryMongoConn(session)
  60. if len(res) != count {
  61. log.Println("查询结果不一致", "count:", count, "res:", len(res))
  62. time.Sleep(20 * time.Second)
  63. toadd := &net.UDPAddr{
  64. IP: net.ParseIP("127.0.0.1"),
  65. Port: qutil.IntAll(Sysconfig["udpport"]),
  66. }
  67. udpclient.WriteUdp(data, mu.OP_TYPE_DATA, toadd)
  68. } else {
  69. n1, n2 = doIndex(res, eMap, index, itype, db, c, bkey)
  70. if (n1 + n2) != count {
  71. log.Println("任务错误,结果不一致")
  72. }
  73. }
  74. } else {
  75. log.Println("数据量太大,放弃!", count)
  76. mgo.DestoryMongoConn(session)
  77. }
  78. log.Println(mapInfo, "create bidding index...over", "all:", count, "n1:", n1, "n2:", n2)
  79. }
  80. func doIndex(infos []map[string]interface{}, eMap map[string]map[string]interface{}, index, itype, db, c, bkey string) (int, int) {
  81. n1, n2 := 0, 0
  82. //线程池
  83. UpdatesLock := sync.Mutex{}
  84. fields := strings.Split(bidding["fields"].(string), ",")
  85. //更新数组
  86. arr := [][]map[string]interface{}{}
  87. arrEs := []map[string]interface{}{}
  88. //对比两张表数据,减少查询次数
  89. var compare bson.M
  90. log.Println("开始迭代..")
  91. for n, tmp := range infos {
  92. n1++
  93. update := map[string]interface{}{}
  94. //对比方法----------------
  95. tid := qutil.BsonIdToSId(tmp["_id"])
  96. if eMap[tid] != nil {
  97. compare = eMap[tid]
  98. delete(eMap, tid)
  99. //更新bidding表,生成索引
  100. for _, k := range fields {
  101. v1 := compare[k]
  102. v2 := tmp[k]
  103. if v2 == nil && v1 != nil {
  104. update[k] = v1
  105. } else if v2 != nil && v1 != nil {
  106. //update[k+"_b"] = v2
  107. update[k] = v1
  108. } else if v2 != nil && v1 == nil {
  109. //update[k+"_b"] = v2
  110. }
  111. }
  112. if qutil.IntAll(compare["repeat"]) == 1 {
  113. update["extracttype"] = -1
  114. } else {
  115. update["extracttype"] = 1
  116. }
  117. } else {
  118. compare = nil
  119. }
  120. //下面可以多线程跑的--->
  121. //处理分类
  122. if compare != nil {
  123. subscopeclass, _ := compare["subscopeclass"].([]interface{})
  124. if subscopeclass != nil {
  125. //str := ","
  126. m1 := map[string]bool{}
  127. newclass := []string{}
  128. for _, sc := range subscopeclass {
  129. sclass, _ := sc.(string)
  130. if !m1[sclass] {
  131. m1[sclass] = true
  132. //str += sclass + ","
  133. newclass = append(newclass, sclass)
  134. }
  135. }
  136. update["s_subscopeclass"] = strings.Join(newclass, ",")
  137. update["subscopeclass"] = newclass
  138. }
  139. //处理中标企业
  140. winner, _ := compare["winner"].(string)
  141. m1 := map[string]bool{}
  142. if winner != "" {
  143. m1[winner] = true
  144. }
  145. package1 := compare["package"]
  146. if package1 != nil {
  147. packageM, _ := package1.(map[string]interface{})
  148. for _, p := range packageM {
  149. pm, _ := p.(map[string]interface{})
  150. pw, _ := pm["winner"].(string)
  151. if pw != "" {
  152. m1[pw] = true
  153. }
  154. }
  155. }
  156. compare = nil
  157. if len(m1) > 0 {
  158. //str := ","
  159. winnerarr := []string{}
  160. for k, _ := range m1 {
  161. //str += k + ","
  162. winnerarr = append(winnerarr, k)
  163. }
  164. update["s_winner"] = strings.Join(winnerarr, ",")
  165. }
  166. }
  167. //------------------对比结束
  168. //处理key descript
  169. if bkey == "" {
  170. DealInfo(&tmp, &update)
  171. }
  172. //同时保存到elastic
  173. for tk, tv := range update {
  174. tmp[tk] = tv
  175. }
  176. //对projectscope字段的索引处理
  177. ps, _ := tmp["projectscope"].(string)
  178. if ps == "" {
  179. tmp["projectscope"] = "" //= tmp["detail"]
  180. }
  181. if len(ps) > ESLEN {
  182. tmp["projectscope"] = string(([]rune(ps))[:4000])
  183. }
  184. if s_budget := fmt.Sprint(tmp["budget"]); s_budget == "" || s_budget == "<nil>" || s_budget == "null" {
  185. tmp["budget"] = nil
  186. } else if sbd, ok := tmp["budget"].(string); ok {
  187. tmp["budget"] = ObjToMoney([]interface{}{sbd, sbd})[0]
  188. }
  189. if s_bidamount := fmt.Sprint(tmp["bidamount"]); s_bidamount == "" || s_bidamount == "<nil>" || s_bidamount == "null" {
  190. tmp["bidamount"] = nil
  191. } else if sbd, ok := tmp["bidamount"].(string); ok {
  192. tmp["bidamount"] = ObjToMoney([]interface{}{sbd, sbd})[0]
  193. }
  194. UpdatesLock.Lock()
  195. // for k1, _ := range tmp {
  196. // if strings.HasSuffix(k1, "_b") || k1 == "contenthtml" {
  197. // delete(tmp, k1)
  198. // }
  199. // }
  200. go IS.Add("bidding")
  201. if qutil.IntAll(update["extracttype"]) != -1 {
  202. newTmp := map[string]interface{}{}
  203. for _, v := range biddingIndexFields {
  204. // if tmp[v] != nil {
  205. // newTmp[v] = tmp[v]
  206. // }
  207. if tmp[v] != nil {
  208. if "projectinfo" == v {
  209. mp, _ := tmp[v].(map[string]interface{})
  210. if mp != nil {
  211. newmap := map[string]interface{}{}
  212. for _, v1 := range projectinfoFields {
  213. if mp[v1] != nil {
  214. newmap[v1] = mp[v1]
  215. }
  216. }
  217. newTmp[v] = newmap
  218. }
  219. } else {
  220. if v == "detail" {
  221. detail, _ := tmp[v].(string)
  222. newTmp[v] = FilterDetail(detail)
  223. } else {
  224. newTmp[v] = tmp[v]
  225. }
  226. }
  227. } else if v == "budget" || v == "bidamount" {
  228. newTmp[v] = nil
  229. }
  230. }
  231. arrEs = append(arrEs, newTmp)
  232. }
  233. if len(update) > 0 {
  234. arr = append(arr, []map[string]interface{}{
  235. map[string]interface{}{
  236. "_id": tmp["_id"],
  237. },
  238. map[string]interface{}{
  239. "$set": update,
  240. },
  241. })
  242. }
  243. if len(arr) >= BulkSize-1 {
  244. mgo.UpdateBulkAll(db, c, arr...)
  245. arr = [][]map[string]interface{}{}
  246. }
  247. if len(arrEs) >= BulkSize-1 {
  248. tmps := arrEs
  249. elastic.BulkSave(index, itype, &tmps, true)
  250. if len(multiIndex) == 2 {
  251. elastic.BulkSave(multiIndex[0], multiIndex[1], &tmps, true)
  252. }
  253. arrEs = []map[string]interface{}{}
  254. }
  255. UpdatesLock.Unlock()
  256. if n%100 == 0 {
  257. log.Println("current:", n)
  258. }
  259. tmp = make(map[string]interface{})
  260. }
  261. UpdatesLock.Lock()
  262. if len(arr) > 0 {
  263. mgo.UpdateBulkAll(db, c, arr...)
  264. }
  265. if len(arrEs) > 0 {
  266. tmps := arrEs
  267. elastic.BulkSave(index, itype, &tmps, true)
  268. if len(multiIndex) == 2 {
  269. elastic.BulkSave(multiIndex[0], multiIndex[1], &tmps, true)
  270. }
  271. }
  272. UpdatesLock.Unlock()
  273. return n1, n2
  274. }
  275. var client *mu.Client
  276. var reg = regexp.MustCompile("^[0-9a-zA-Z-.]+$")
  277. var reg_space = regexp.MustCompile("(?ism)(<style.*?>.*?</style>)|([.#]?\\w{1,20}\\{.*?\\})|(<.*?>)|(\\\\t)+|\\t|( +)|( +)|(" + string(rune(160)) + "+)")
  278. var reg_row = regexp.MustCompile("(?i)<(tr|div|p)[^>]*?>|(\\n)+")
  279. var reg_dh = regexp.MustCompile("[,]+")
  280. var reg_newdb = regexp.MustCompile("([:,、:,。.;])[,]")
  281. var reg_no = regexp.MustCompile("^[0-9]*$")
  282. var MSG_SERVER = "123.56.236.148:7070"
  283. var DesLen = 120
  284. func inits() {
  285. ser := qutil.ObjToString(Sysconfig["msg_server"])
  286. if ser != "" {
  287. MSG_SERVER = ser
  288. }
  289. cf := &mu.ClientConfig{
  290. ClientName: "剑鱼抽关键词",
  291. EventHandler: func(p *mu.Packet) {},
  292. MsgServerAddr: MSG_SERVER,
  293. CanHandleEvents: []int{},
  294. OnConnectSuccess: func() {
  295. log.Println("c.")
  296. },
  297. ReadBufferSize: 10,
  298. WriteBufferSize: 10,
  299. }
  300. client, _ = mu.NewClient(cf)
  301. }
  302. //var clientlock = &sync.Mutex{}
  303. var keypool = make(chan bool, 1)
  304. func DealInfo(obj, update *map[string]interface{}) {
  305. defer qutil.Catch()
  306. if (*obj)["keywords"] != nil && (*obj)["description"] != nil {
  307. return
  308. } else {
  309. (*update)["keywords"] = ""
  310. (*update)["description"] = ""
  311. }
  312. title := qutil.ObjToString((*obj)["title"])
  313. var m [][]string
  314. select {
  315. case <-func() <-chan bool {
  316. select {
  317. case keypool <- true:
  318. defer func() {
  319. <-keypool
  320. }()
  321. ret, _ := client.Call("", mu.UUID(8), 4010, mu.SENDTO_TYPE_RAND_RECIVER, title, 1)
  322. json.Unmarshal(ret, &m)
  323. case <-time.After(5 * time.Millisecond):
  324. }
  325. ch := make(chan bool, 1)
  326. ch <- true
  327. return ch
  328. }():
  329. case <-time.After(20 * time.Millisecond):
  330. }
  331. arr := []string{}
  332. keyword := []string{}
  333. keywordnew := []string{}
  334. for _, tmp := range m {
  335. if reg.MatchString(tmp[0]) {
  336. arr = append(arr, tmp[0])
  337. } else {
  338. if len(arr) > 0 {
  339. str := strings.Join(arr, "")
  340. keyword = append(keyword, str)
  341. arr = []string{}
  342. }
  343. if len(tmp[0]) > 3 && (strings.HasPrefix(tmp[1], "n") || tmp[1] == "v" || tmp[1] == "vn" || strings.HasPrefix(tmp[1], "g")) {
  344. keyword = append(keyword, tmp[0])
  345. }
  346. }
  347. }
  348. for _, v := range keyword {
  349. v = reg_no.ReplaceAllString(v, "")
  350. if len(v) > 0 {
  351. keywordnew = append(keywordnew, v)
  352. }
  353. }
  354. keywords := strings.Join(keywordnew, ",")
  355. (*update)["keywords"] = keywords
  356. content := ""
  357. if (*obj)["detail_bak"] != nil {
  358. content = qutil.ObjToString((*obj)["detail_bak"])
  359. } else {
  360. content = qutil.ObjToString((*obj)["detail"])
  361. }
  362. //内容替换
  363. content = strings.Replace(content, " ", "", -1)
  364. content = reg_space.ReplaceAllString(content, "")
  365. content = reg_row.ReplaceAllString(content, ",")
  366. content = reg_dh.ReplaceAllString(content, ",")
  367. content = reg_newdb.ReplaceAllString(content, "$1")
  368. if strings.HasPrefix(content, ",") {
  369. content = content[1:]
  370. }
  371. //log.Println(content)
  372. tc := []rune(content)
  373. ltc := len(tc)
  374. description := content
  375. if ltc > DesLen {
  376. description = string(tc[:DesLen])
  377. }
  378. (*update)["description"] = description
  379. //保存到数据库
  380. return
  381. }