datamap.go 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583
  1. package main
  2. import (
  3. "fmt"
  4. qutil "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  5. "log"
  6. "reflect"
  7. "regexp"
  8. "strings"
  9. "sync"
  10. "time"
  11. )
  12. type Info struct {
  13. id string //id
  14. title string //标题
  15. spidercode string //爬虫代码
  16. area string //省份
  17. city string //城市
  18. subtype string //信息类型
  19. buyer string //采购单位
  20. agency string //代理机构
  21. winner string //中标单位
  22. budget float64 //预算金额
  23. bidamount float64 //中标金额
  24. projectname string //项目名称
  25. projectcode string //项目编号
  26. contractnumber string //合同编号
  27. publishtime int64 //发布时间
  28. comeintime int64 //入库时间
  29. bidopentime int64 //开标时间
  30. bidopenaddress string //开标地点
  31. site string //站点
  32. href string //正文的url
  33. repeatid string //重复id
  34. specialWord bool //特殊词
  35. titleSpecialWord bool //标题特殊词
  36. isJphref bool //是否竞品数据
  37. c_title string //清洗后的标题
  38. c_projectname string //清洗后的项目名称
  39. }
  40. var datelimit = float64(432000) //五天
  41. var sitelock sync.Mutex //锁
  42. // 一般数据判重
  43. type datamap struct {
  44. lock sync.Mutex //锁
  45. days int //保留几天数据
  46. data map[string][]*Info
  47. keymap []string
  48. areakeys []string
  49. keys map[string]bool
  50. }
  51. // 历史~存量
  52. func TimedTaskDatamap(days int, lasttime int64, numIndex int) *datamap {
  53. datelimit = qutil.Float64All(days * 86400)
  54. dm := &datamap{sync.Mutex{}, days, map[string][]*Info{}, []string{}, []string{}, map[string]bool{}}
  55. if lasttime < 0 {
  56. log.Println("数据池空数据")
  57. return dm
  58. }
  59. start := int(time.Now().Unix())
  60. sess := data_mgo.GetMgoConn()
  61. defer data_mgo.DestoryMongoConn(sess)
  62. query := map[string]interface{}{"publishtime": map[string]interface{}{
  63. "$lt": lasttime,
  64. }}
  65. log.Println("query", query)
  66. it := sess.DB(data_mgo.DbName).C(extract_back).Find(query).Sort("-publishtime").Iter()
  67. n, continuSum := 0, 0
  68. for tmp := make(map[string]interface{}); it.Next(&tmp); n++ {
  69. //if n%10000 == 0 {
  70. // log.Println("当前 n:", n, "数量:", continuSum, tmp["_id"], tmp["publishtime"])
  71. //}
  72. if qutil.IntAll(tmp["repeat"]) == 1 || qutil.IntAll(tmp["repeat"]) == -1 || qutil.IntAll(tmp["dataging"]) == 1 ||
  73. qutil.ObjToString(tmp["subtype"]) == "拟建" || qutil.ObjToString(tmp["subtype"]) == "产权" ||
  74. qutil.ObjToString(tmp["spidercode"]) == "sdxzbiddingsjzypc" {
  75. } else {
  76. if fmt.Sprint(reflect.TypeOf(tmp["publishtime"])) == "string" {
  77. continue
  78. }
  79. pt := tmp["publishtime"]
  80. pt_time := qutil.Int64All(pt)
  81. if pt_time > time.Now().Unix() {
  82. continue
  83. }
  84. if qutil.Float64All(lasttime-pt_time) < datelimit {
  85. continuSum++
  86. info := NewInfo(tmp)
  87. dkey := qutil.FormatDateWithObj(&pt, qutil.Date_yyyyMMdd)
  88. k := fmt.Sprintf("%s_%s_%s", dkey, info.subtype, info.area)
  89. data := dm.data[k]
  90. if data == nil {
  91. data = []*Info{}
  92. }
  93. data = append(data, info)
  94. dm.data[k] = data
  95. dm.keys[dkey] = true
  96. //添加省
  97. isAreaExist := false
  98. for _, v := range dm.areakeys {
  99. if v == info.area {
  100. isAreaExist = true
  101. }
  102. }
  103. if !isAreaExist {
  104. areaArr := dm.areakeys
  105. areaArr = append(areaArr, info.area)
  106. dm.areakeys = areaArr
  107. }
  108. } else {
  109. break
  110. }
  111. }
  112. tmp = make(map[string]interface{})
  113. }
  114. log.Printf("第%d组:数据池构建完成:%d秒,%d个\n", numIndex, int(time.Now().Unix())-start, n)
  115. return dm
  116. }
  117. // 增量
  118. func NewDatamap(days int, lastid string) *datamap {
  119. datelimit = qutil.Float64All(days * 86400 * 2)
  120. dm := &datamap{sync.Mutex{}, days, map[string][]*Info{}, []string{}, []string{}, map[string]bool{}}
  121. if lastid == "" {
  122. log.Println("不构建数据池")
  123. return dm
  124. }
  125. //初始化加载数据
  126. sess := data_mgo.GetMgoConn()
  127. defer data_mgo.DestoryMongoConn(sess)
  128. query := map[string]interface{}{"_id": map[string]interface{}{
  129. "$lte": StringTOBsonId(lastid),
  130. }}
  131. log.Println("query", query)
  132. it := sess.DB(data_mgo.DbName).C(extract).Find(query).Sort("-publishtime").Iter()
  133. nowTime := time.Now().Unix() //当前时间的时间戳
  134. n, continuSum := 0, 0
  135. for tmp := make(map[string]interface{}); it.Next(&tmp); n++ {
  136. if qutil.IntAll(tmp["repeat"]) == 1 || qutil.IntAll(tmp["repeat"]) == -1 ||
  137. qutil.ObjToString(tmp["subtype"]) == "拟建" || qutil.ObjToString(tmp["subtype"]) == "产权" ||
  138. qutil.ObjToString(tmp["spidercode"]) == "sdxzbiddingsjzypc" {
  139. } else {
  140. if fmt.Sprint(reflect.TypeOf(tmp["publishtime"])) == "string" {
  141. continue
  142. }
  143. pt := tmp["publishtime"]
  144. pt_time := qutil.Int64All(pt)
  145. if pt_time > time.Now().Unix() {
  146. continue
  147. }
  148. if qutil.Float64All(nowTime-pt_time) <= datelimit {
  149. continuSum++
  150. info := NewInfo(tmp)
  151. dkey := qutil.FormatDateWithObj(&pt, qutil.Date_yyyyMMdd)
  152. k := fmt.Sprintf("%s_%s_%s", dkey, info.subtype, info.area)
  153. data := dm.data[k]
  154. if data == nil {
  155. data = []*Info{}
  156. }
  157. data = append(data, info)
  158. dm.data[k] = data
  159. dm.keys[dkey] = true
  160. //添加省
  161. isAreaExist := false
  162. for _, v := range dm.areakeys {
  163. if v == info.area {
  164. isAreaExist = true
  165. }
  166. }
  167. if !isAreaExist {
  168. areaArr := dm.areakeys
  169. areaArr = append(areaArr, info.area)
  170. dm.areakeys = areaArr
  171. }
  172. } else {
  173. break
  174. }
  175. }
  176. if n%10000 == 0 {
  177. log.Println("当前 n:", n, "数量:", continuSum, tmp["_id"])
  178. }
  179. tmp = make(map[string]interface{})
  180. }
  181. log.Println("load data:", n, "总数:", continuSum)
  182. return dm
  183. }
  184. // 数据构建
  185. func NewInfo(tmp map[string]interface{}) *Info {
  186. subtype := qutil.ObjToString(tmp["subtype"])
  187. if subtype == "招标" || subtype == "邀标" || subtype == "询价" ||
  188. subtype == "竞谈" || subtype == "竞价" {
  189. subtype = "招标"
  190. }
  191. area := qutil.ObjToString(tmp["area"])
  192. if area == "A" {
  193. area = "全国"
  194. }
  195. info := &Info{}
  196. info.id = BsonTOStringId(tmp["_id"])
  197. info.title = qutil.ObjToString(tmp["title"])
  198. info.area = area
  199. info.subtype = subtype
  200. info.spidercode = qutil.ObjToString(tmp["spidercode"])
  201. info.buyer = qutil.ObjToString(tmp["buyer"])
  202. info.projectname = qutil.ObjToString(tmp["projectname"])
  203. info.contractnumber = qutil.ObjToString(tmp["contractnumber"])
  204. info.projectcode = qutil.ObjToString(tmp["projectcode"])
  205. info.city = qutil.ObjToString(tmp["city"])
  206. info.agency = qutil.ObjToString(tmp["agency"])
  207. info.winner = deleteExtraSpaceName(qutil.ObjToString(tmp["winner"]))
  208. info.budget = qutil.Float64All(tmp["budget"])
  209. info.bidamount = qutil.Float64All(tmp["bidamount"])
  210. info.publishtime = qutil.Int64All(tmp["publishtime"])
  211. info.comeintime = qutil.Int64All(tmp["comeintime"])
  212. info.bidopentime = qutil.Int64All(tmp["bidopentime"])
  213. info.bidopenaddress = qutil.ObjToString(tmp["bidopenaddress"])
  214. info.site = qutil.ObjToString(tmp["site"])
  215. info.href = qutil.ObjToString(tmp["href"])
  216. info.repeatid = qutil.ObjToString(tmp["repeatid"])
  217. info.specialWord = FilterRegTitle.MatchString(info.title)
  218. info.titleSpecialWord = FilterRegTitle_0.MatchString(info.title) || FilterRegTitle_1.MatchString(info.title) || FilterRegTitle_2.MatchString(info.title)
  219. info.isJphref = IsJpHref(qutil.ObjToString(tmp["href"]))
  220. //经过通用清洗后
  221. info.c_title = cleanNameFilterRedundant(info.title)
  222. info.c_projectname = cleanNameFilterRedundant(info.projectname)
  223. return info
  224. }
  225. // 判重方法
  226. // 判重方法
  227. // 判重方法
  228. func (d *datamap) check(info *Info) (b bool, source *Info, reasons string) {
  229. reason := ""
  230. keys := []string{}
  231. d.lock.Lock()
  232. for k, _ := range d.keys { //不同时间段
  233. if info.area == "全国" { //匹配所有省
  234. for _, v := range d.areakeys {
  235. keys = append(keys, fmt.Sprintf("%s_%s_%s", k, info.subtype, v))
  236. }
  237. } else { //匹配指定省
  238. keys = append(keys, fmt.Sprintf("%s_%s_%s", k, info.subtype, info.area))
  239. }
  240. keys = append(keys, fmt.Sprintf("%s_%s_%s", k, info.subtype, "全国"))
  241. }
  242. d.lock.Unlock()
  243. L:
  244. for _, k := range keys {
  245. d.lock.Lock()
  246. data := d.data[k]
  247. d.lock.Unlock()
  248. if len(data) > 0 { //对比v 找到同类型,同省或全国的数据作对比
  249. for _, v := range data {
  250. reason = ""
  251. if v.id == info.id { //正常重复
  252. return false, v, ""
  253. }
  254. //buyer 优先级高,有值且不相等过滤
  255. if info.buyer != "" && v.buyer != "" && info.buyer != v.buyer {
  256. if buyerIsContinue(v, info) {
  257. continue
  258. }
  259. }
  260. // 竞品判重模式
  261. if v.isJphref || info.isJphref {
  262. if confirmJingPinIsRepeatData(v, info) {
  263. reason = "竞品模式~重复"
  264. b = true
  265. source = v
  266. reasons = reason
  267. break L
  268. }
  269. }
  270. //站点补城市
  271. if info.site != "" { //站点临时赋值
  272. if info.area == "全国" || info.city == "" {
  273. sitelock.Lock()
  274. dict := SiteMap[info.site]
  275. sitelock.Unlock()
  276. if dict != nil && qutil.ObjToString(dict["city"]) != "" {
  277. info.area = qutil.ObjToString(dict["area"])
  278. info.city = qutil.ObjToString(dict["city"])
  279. }
  280. }
  281. }
  282. //前置条件-五要素均相等
  283. if leadingElementSame(v, info) {
  284. reason = "五要素-相同-满足"
  285. b = true
  286. source = v
  287. reasons = reason
  288. break L
  289. }
  290. //前置条件 - 站点相关
  291. if info.site != "" && info.site == v.site {
  292. if info.href != "" && info.href == v.href {
  293. reason = "同站点-href相同"
  294. b = true
  295. source = v
  296. reasons = reason
  297. break L
  298. }
  299. //相同发布时间-标题无包含关系 - 项目名称不等
  300. if isTheSameDay(info.publishtime, v.publishtime) {
  301. if !isTheSimilarName(info.title, v.title) {
  302. continue
  303. }
  304. }
  305. //不同href
  306. if info.href != "" && info.href != v.href {
  307. if v.title == info.title {
  308. if !againRepeat(v, info, true) { //进行同站点二次判断
  309. reason = "同站点-href不同-标题相同等"
  310. b = true
  311. source = v
  312. reasons = reason
  313. break L
  314. } else {
  315. continue
  316. }
  317. } else {
  318. if againRepeat(v, info, true) {
  319. continue
  320. }
  321. }
  322. }
  323. }
  324. //特殊词处理
  325. specialNum := dealWithSpecialWordNumber(info, v)
  326. //前置条件 - 标题相关,有且一个关键词
  327. if specialNum == 1 {
  328. if againRepeat(v, info, false) {
  329. continue
  330. }
  331. }
  332. //前置条件 - 标题相关,均含有关键词
  333. if specialNum == 2 {
  334. if len([]rune(v.title)) > 10 && len([]rune(info.title)) > 10 &&
  335. v.title != "" && info.title != "" {
  336. letter1, letter2 := v.title, info.title
  337. res, _ := regexp.Compile("[0-9a-zA-Z]+")
  338. if res.MatchString(letter1) || res.MatchString(letter2) {
  339. letter1 = convertArabicNumeralsAndLetters(letter1)
  340. letter2 = convertArabicNumeralsAndLetters(letter2)
  341. }
  342. if strings.Contains(letter1, "重新招标") || strings.Contains(letter2, "重新招标") {
  343. letter1, letter2 = dealWithSpecialPhrases(letter1, letter2)
  344. }
  345. letter1 = cleanNameFilterRedundant(letter1)
  346. letter2 = cleanNameFilterRedundant(letter2)
  347. if letter1 == letter2 {
  348. reason = reason + "标题关键词相等有效关系"
  349. if !againRepeat(v, info, false) { //进行二级金额判断
  350. b = true
  351. source = v
  352. reasons = reason
  353. break L
  354. }
  355. } else {
  356. if !(strings.Contains(letter1, letter2) || strings.Contains(letter2, letter1)) {
  357. if againContainSpecialWord(v, info) { //无包含关系-即不相等
  358. continue
  359. }
  360. }
  361. }
  362. }
  363. }
  364. //新增快速数据过少判重
  365. if LowHeavy {
  366. repeat := false
  367. if repeat, reason = fastLowQualityHeavy(v, info, reason); repeat {
  368. b = true
  369. source = v
  370. reasons = reason
  371. break L
  372. }
  373. }
  374. //代理机构相同-非空相等
  375. if v.agency != "" && info.agency != "" && v.agency == info.agency {
  376. reason = reason + "同机构-"
  377. repeat := false
  378. if repeat, reason = quickHeavyMethodTwo(v, info, reason); repeat {
  379. b = true
  380. source = v
  381. reasons = reason
  382. break L
  383. }
  384. } else {
  385. reason = reason + "非同机构-"
  386. if info.city != "" && info.city == v.city {
  387. reason = reason + "同城-"
  388. repeat := false
  389. if repeat, reason = quickHeavyMethodTwo(v, info, reason); repeat {
  390. b = true
  391. source = v
  392. reasons = reason
  393. break L
  394. }
  395. } else {
  396. reason = reason + "不同城-"
  397. repeat := false
  398. if repeat, reason = quickHeavyMethodOne(v, info, reason); repeat {
  399. b = true
  400. source = v
  401. reasons = reason
  402. break L
  403. }
  404. }
  405. }
  406. }
  407. }
  408. }
  409. //往预存数据 d 添加
  410. if !b {
  411. ct := info.publishtime
  412. dkey := qutil.FormatDateByInt64(&ct, qutil.Date_yyyyMMdd)
  413. k := fmt.Sprintf("%s_%s_%s", dkey, info.subtype, info.area)
  414. d.lock.Lock()
  415. data := d.data[k]
  416. if data == nil {
  417. data = []*Info{info}
  418. d.data[k] = data
  419. if !d.keys[dkey] {
  420. d.keys[dkey] = true
  421. d.update(ct)
  422. }
  423. } else {
  424. data = append(data, info)
  425. d.data[k] = data
  426. }
  427. //添加省
  428. isAreaExist := false
  429. for _, v := range d.areakeys {
  430. if v == info.area {
  431. isAreaExist = true
  432. }
  433. }
  434. if !isAreaExist {
  435. areaArr := d.areakeys
  436. areaArr = append(areaArr, info.area)
  437. d.areakeys = areaArr
  438. }
  439. d.lock.Unlock()
  440. }
  441. return
  442. }
  443. func (d *datamap) update(t int64) {
  444. if TimingTask {
  445. } else {
  446. if IsFull {
  447. d.keymap = d.GetLatelyFiveDay(t) //全量
  448. } else {
  449. d.keymap = d.GetLatelyFiveDayDouble(t) //增量
  450. }
  451. m := map[string]bool{}
  452. for _, v := range d.keymap {
  453. m[v] = true
  454. }
  455. for k, _ := range d.data {
  456. if !m[k[:8]] {
  457. delete(d.data, k)
  458. }
  459. }
  460. for k, _ := range d.keys {
  461. if !m[k] {
  462. delete(d.keys, k)
  463. }
  464. }
  465. }
  466. }
  467. func (d *datamap) GetLatelyFiveDay(t int64) []string {
  468. array := make([]string, d.days)
  469. now := time.Unix(t, 0)
  470. for i := 0; i < d.days; i++ {
  471. array[i] = now.Format(qutil.Date_yyyyMMdd)
  472. now = now.AddDate(0, 0, -1)
  473. }
  474. return array
  475. }
  476. func (d *datamap) GetLatelyFiveDayDouble(t int64) []string { //增量-两倍
  477. array := make([]string, d.days*2)
  478. now := time.Now()
  479. for i := 0; i < d.days*2; i++ {
  480. array[i] = now.Format(qutil.Date_yyyyMMdd)
  481. now = now.AddDate(0, 0, -1)
  482. }
  483. return array
  484. }
  485. // 替换原始数据池-更新
  486. func (d *datamap) replacePoolData(newData *Info) {
  487. d.lock.Lock()
  488. ct := newData.publishtime
  489. dkey := qutil.FormatDateByInt64(&ct, qutil.Date_yyyyMMdd)
  490. k := fmt.Sprintf("%s_%s_%s", dkey, newData.subtype, newData.area)
  491. data := d.data[k]
  492. for k, v := range data {
  493. if v.id == newData.id { //替换
  494. data[k] = newData
  495. break
  496. }
  497. }
  498. d.data[k] = data
  499. d.lock.Unlock()
  500. }
  501. // 相互替换数据池-暂时弃用
  502. func (d *datamap) replaceSourceData(newData *Info, oldData *Info) {
  503. //删除数据池的老数据
  504. ct_old := oldData.publishtime
  505. dkey_old := qutil.FormatDateByInt64(&ct_old, qutil.Date_yyyyMMdd)
  506. k_old := fmt.Sprintf("%s_%s_%s", dkey_old, oldData.subtype, oldData.area)
  507. data_old := d.data[k_old]
  508. for k, v := range data_old {
  509. if v.id == oldData.id { //删除对应当前的老数据
  510. data_old = append(data_old[:k], data_old[k+1:]...)
  511. break
  512. }
  513. }
  514. d.data[k_old] = data_old
  515. //添加新的
  516. ct := newData.publishtime
  517. dkey := qutil.FormatDateByInt64(&ct, qutil.Date_yyyyMMdd)
  518. k := fmt.Sprintf("%s_%s_%s", dkey, newData.subtype, newData.area)
  519. d.lock.Lock()
  520. data := d.data[k]
  521. if data == nil {
  522. data = []*Info{newData}
  523. d.data[k] = data
  524. if !d.keys[dkey] {
  525. d.keys[dkey] = true
  526. d.update(ct)
  527. }
  528. } else {
  529. data = append(data, newData)
  530. d.data[k] = data
  531. }
  532. //添加省
  533. isAreaExist := false
  534. for _, v := range d.areakeys {
  535. if v == newData.area {
  536. isAreaExist = true
  537. }
  538. }
  539. if !isAreaExist {
  540. areaArr := d.areakeys
  541. areaArr = append(areaArr, newData.area)
  542. d.areakeys = areaArr
  543. }
  544. d.lock.Unlock()
  545. }
  546. // 总计条数-暂时弃用
  547. func (d *datamap) currentTotalCount() int {
  548. num := qutil.IntAll(0)
  549. for _, v := range d.data {
  550. num = num + qutil.IntAll(len(v))
  551. }
  552. return num
  553. }