datamap.go 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227
  1. package main
  2. import (
  3. "fmt"
  4. "log"
  5. "math"
  6. qutil "qfw/util"
  7. "qfw/util/mongodb"
  8. "strconv"
  9. "strings"
  10. "sync"
  11. "time"
  12. )
  // Info is the compact view of one record that the duplicate checker keeps
  // in memory. NewInfo fills it from a raw document map.
  type Info struct {
  	id, title string // hex form of the document _id, and the "title" field
  	area, city string // "area" ("A" is stored as "全国") and "city"
  	subtype, buyer string
  	agency string // agency (代理机构)
  	winner string // winning bidder (中标单位); not populated by NewInfo
  	projectname, projectcode string
  	publishtime, comeintime int64 // comeintime is not populated by NewInfo
  	// ContainSpecialWord is true when FilterRegexp matches the project
  	// name or the title.
  	ContainSpecialWord bool
  }
  // datelimit is the largest time gap, apparently in seconds, that two
  // records may have and still be compared. It defaults to five days and is
  // overwritten by NewDatamap with days*86400.
  var datelimit float64 = 5 * 86400
  29. type datamap struct {
  30. lock sync.Mutex //锁
  31. days int //保留几天数据
  32. data map[string][]*Info
  33. keymap []string
  34. keys map[string]bool
  35. }
  36. func NewDatamap(days int, lastid string) *datamap {
  37. datelimit = qutil.Float64All(days * 86400)
  38. dm := &datamap{sync.Mutex{}, days, map[string][]*Info{}, []string{}, map[string]bool{}}
  39. if lastid == "" {
  40. return dm
  41. }
  42. //初始化加载数据
  43. sess := mgo.GetMgoConn()
  44. defer mgo.DestoryMongoConn(sess)
  45. it := sess.DB(mgo.DbName).C(extract).Find(mongodb.ObjToMQ(`{"_id":{"$lte":"`+lastid+`"}}`, true)).Sort("-_id").Iter()
  46. now1 := int64(0)
  47. n, continuSum := 0, 0
  48. for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
  49. //
  50. if qutil.IntAll(tmp["repeat"]) == 1 || qutil.ObjToString(tmp["subtype"]) == "变更" {
  51. continuSum++
  52. } else {
  53. cm := tmp["comeintime"]
  54. //cm := tmp["publishtime"]
  55. comeintime := qutil.Int64All(cm)
  56. if comeintime == 0 {
  57. id := qutil.BsonIdToSId(tmp["_id"])[0:8]
  58. comeintime, _ = strconv.ParseInt(id, 16, 64)
  59. }
  60. if now1 == 0 {
  61. now1 = comeintime
  62. }
  63. if qutil.Float64All(now1-comeintime) < datelimit {
  64. info := NewInfo(tmp)
  65. dkey := qutil.FormatDateWithObj(&cm, qutil.Date_yyyyMMdd)
  66. k := fmt.Sprintf("%s_%s_%s", dkey, info.subtype, info.area)
  67. data := dm.data[k]
  68. if data == nil {
  69. data = []*Info{}
  70. //log.Println(k)
  71. }
  72. data = append(data, info)
  73. dm.data[k] = data
  74. dm.keys[dkey] = true
  75. } else {
  76. break
  77. }
  78. }
  79. if n%5000 == 0 {
  80. log.Println("current n:", n, continuSum)
  81. }
  82. tmp = make(map[string]interface{})
  83. }
  84. log.Println("load data:", n)
  85. return dm
  86. }
  87. func NewInfo(tmp map[string]interface{}) *Info {
  88. subtype := qutil.ObjToString(tmp["subtype"])
  89. area := qutil.ObjToString(tmp["area"])
  90. if area == "A" {
  91. area = "全国"
  92. }
  93. info := &Info{}
  94. info.id = qutil.BsonIdToSId(tmp["_id"])
  95. info.title = qutil.ObjToString(tmp["title"])
  96. info.area = area
  97. info.subtype = subtype
  98. info.buyer = qutil.ObjToString(tmp["buyer"])
  99. info.projectname = qutil.ObjToString(tmp["projectname"])
  100. info.ContainSpecialWord = FilterRegexp.MatchString(info.projectname) || FilterRegexp.MatchString(info.title)
  101. info.projectcode = qutil.ObjToString(tmp["projectcode"])
  102. info.city = qutil.ObjToString(tmp["city"])
  103. info.agency = qutil.ObjToString(tmp["agency"])
  104. //info.winner = qutil.ObjToString(tmp["winner"])
  105. info.publishtime = qutil.Int64All(tmp["publishtime"])
  106. return info
  107. }
  108. func (d *datamap) check(info *Info) (b bool, id string) {
  109. d.lock.Lock()
  110. defer d.lock.Unlock()
  111. keys := []string{}
  112. for k, _ := range d.keys {
  113. keys = append(keys, fmt.Sprintf("%s_%s_%s", k, info.subtype, info.area))
  114. if info.area != "全国" { //这个后续可以不要
  115. keys = append(keys, fmt.Sprintf("%s_%s_%s", k, info.subtype, "全国"))
  116. }
  117. }
  118. L:
  119. for _, k := range keys {
  120. data := d.data[k]
  121. if len(data) > 0 { //对比
  122. for _, v := range data {
  123. if v.id == info.id {
  124. return false, v.id
  125. }
  126. if math.Abs(qutil.Float64All(v.publishtime-info.publishtime)) > datelimit {
  127. continue
  128. }
  129. if v.agency != "" && info.agency != "" && v.agency != info.agency {
  130. continue
  131. }
  132. n := 0
  133. if v.buyer != "" && v.buyer == info.buyer {
  134. n++
  135. }
  136. if v.projectname != "" && v.projectname == info.projectname {
  137. n++
  138. }
  139. if !info.ContainSpecialWord && n > 1 {
  140. b = true
  141. id = v.id
  142. break L
  143. } else if v.projectcode != "" && v.projectcode == info.projectcode {
  144. n++
  145. }
  146. if !info.ContainSpecialWord && n > 1 || n > 2 {
  147. b = true
  148. id = v.id
  149. break L
  150. }
  151. //标题长度大于10且相等即为重复
  152. // if len([]rune(info.title)) > 10 && v.title == info.title {
  153. // b = true
  154. // id = v.id
  155. // break L
  156. // }
  157. //标题长度大于10且包含关系+buyer/projectname/projectcode/city(全国/A的只判断包含关系即可)相等即为重复
  158. if len([]rune(info.title)) > 10 && len([]rune(v.title)) > 10 && (strings.Contains(v.title, info.title) || strings.Contains(info.title, v.title)) {
  159. if info.area == "全国" || n > 0 || info.city == v.city {
  160. b = true
  161. id = v.id
  162. break L
  163. }
  164. }
  165. }
  166. }
  167. }
  168. if !b {
  169. ct, _ := strconv.ParseInt(info.id[:8], 16, 64)
  170. dkey := qutil.FormatDateByInt64(&ct, qutil.Date_yyyyMMdd)
  171. k := fmt.Sprintf("%s_%s_%s", dkey, info.subtype, info.area)
  172. data := d.data[k]
  173. if data == nil {
  174. data = []*Info{info}
  175. d.data[k] = data
  176. if !d.keys[dkey] {
  177. d.keys[dkey] = true
  178. d.update(ct)
  179. }
  180. } else {
  181. data = append(data, info)
  182. d.data[k] = data
  183. }
  184. }
  185. return
  186. }
  187. func (d *datamap) update(t int64) {
  188. //每天0点清除历史数据
  189. d.keymap = d.GetLatelyFiveDay(t)
  190. m := map[string]bool{}
  191. for _, v := range d.keymap {
  192. m[v] = true
  193. }
  194. all, all1 := 0, 0
  195. for k, v := range d.data {
  196. all += len(v)
  197. if !m[k[:8]] {
  198. delete(d.data, k)
  199. }
  200. }
  201. for k, _ := range d.keys {
  202. if !m[k] {
  203. delete(d.keys, k)
  204. }
  205. }
  206. for _, v := range d.data {
  207. all1 += len(v)
  208. }
  209. log.Println("更新前后数据:", all, all1)
  210. }
  211. func (d *datamap) GetLatelyFiveDay(t int64) []string {
  212. array := make([]string, d.days)
  213. now := time.Unix(t, 0)
  214. for i := 0; i < d.days; i++ {
  215. array[i] = now.Format(qutil.Date_yyyyMMdd)
  216. now = now.AddDate(0, 0, -1)
  217. }
  218. return array
  219. }