main.go 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246
  1. package main
  2. /**
  3. 招标信息判重
  4. **/
  5. import (
  6. "encoding/json"
  7. "flag"
  8. "log"
  9. mu "mfw/util"
  10. "net"
  11. "qfw/common/src/qfw/util"
  12. qu "qfw/util"
  13. "regexp"
  14. "sync"
  15. "time"
  16. )
  17. var (
  18. Sysconfig map[string]interface{} //配置文件
  19. mconf map[string]interface{} //mongodb配置信息
  20. data_mgo *MongodbSim //mongodb操作对象
  21. task_mgo *MongodbSim //mongodb操作对象
  22. task_collName string
  23. extract string
  24. extract_back string
  25. udpclient mu.UdpClient //udp对象
  26. nextNode []map[string]interface{} //下节点数组
  27. dupdays = 7 //初始化判重范围
  28. DM *datamap //
  29. Update *updateInfo
  30. //正则筛选相关
  31. FilterRegTitle = regexp.MustCompile("^_$")
  32. FilterRegTitle_0 = regexp.MustCompile("^_$")
  33. FilterRegTitle_1 = regexp.MustCompile("^_$")
  34. FilterRegTitle_2 = regexp.MustCompile("^_$")
  35. threadNum int //线程数量
  36. SiteMap map[string]map[string]interface{} //站点map
  37. LowHeavy bool //低质量数据判重
  38. TimingTask bool //是否定时任务
  39. timingSpanDay int64 //时间跨度
  40. timingPubScope int64 //发布时间周期
  41. gtid,lastid,sec_gtid,sec_lteid string //命令输入
  42. lteid string //历史增量属性
  43. IsFull bool //是否全量
  44. updatelock sync.Mutex //锁4
  45. numberlock sync.Mutex //锁4
  46. userName,passWord string //mongo -用户密码
  47. taskList []map[string]interface{} //任务池
  48. )
  49. func init() {
  50. flag.StringVar(&lastid, "id", "", "增量加载的lastid") //增量
  51. flag.StringVar(&gtid, "gtid", "", "历史增量的起始id") //历史
  52. flag.StringVar(&sec_gtid, "sec_gtid", "", "全量分段起始id")
  53. flag.StringVar(&sec_lteid, "sec_lteid", "", "全量分段结束id")
  54. flag.Parse()
  55. qu.ReadConfig(&Sysconfig)
  56. userName = qu.ObjToString(Sysconfig["userName"])
  57. passWord = qu.ObjToString(Sysconfig["passWord"])
  58. log.Println("集群用户密码:",userName,passWord)
  59. task_mconf := Sysconfig["task_mongodb"].(map[string]interface{})
  60. task_mgo = &MongodbSim{
  61. MongodbAddr: task_mconf["task_addrName"].(string),
  62. DbName: task_mconf["task_dbName"].(string),
  63. Size: util.IntAllDef(task_mconf["task_pool"], 10),
  64. UserName: userName,
  65. Password: passWord,
  66. }
  67. task_mgo.InitPool()
  68. task_collName = task_mconf["task_collName"].(string)
  69. nextNode = util.ObjArrToMapArr(Sysconfig["nextNode"].([]interface{}))
  70. mconf = Sysconfig["mongodb"].(map[string]interface{})
  71. data_mgo = &MongodbSim{
  72. MongodbAddr: mconf["addr"].(string),
  73. DbName: mconf["db"].(string),
  74. Size: util.IntAllDef(mconf["pool"], 10),
  75. }
  76. data_mgo.InitPool()
  77. extract = mconf["extract"].(string)
  78. extract_back = mconf["extract_back"].(string)
  79. dupdays = util.IntAllDef(Sysconfig["dupdays"], 3)
  80. //加载数据
  81. DM = NewDatamap(dupdays, lastid)
  82. //更新池
  83. Update = newUpdatePool()
  84. go Update.updateData()
  85. FilterRegTitle = regexp.MustCompile(util.ObjToString(Sysconfig["specialwords"]))
  86. FilterRegTitle_0 = regexp.MustCompile(util.ObjToString(Sysconfig["specialtitle_0"]))
  87. FilterRegTitle_1 = regexp.MustCompile(util.ObjToString(Sysconfig["specialtitle_1"]))
  88. FilterRegTitle_2 = regexp.MustCompile(util.ObjToString(Sysconfig["specialtitle_2"]))
  89. threadNum = util.IntAllDef(Sysconfig["threads"], 1)
  90. LowHeavy = Sysconfig["lowHeavy"].(bool)
  91. TimingTask = Sysconfig["timingTask"].(bool)
  92. timingSpanDay = util.Int64All(Sysconfig["timingSpanDay"])
  93. timingPubScope = util.Int64All(Sysconfig["timingPubScope"])
  94. //站点配置
  95. site := mconf["site"].(map[string]interface{})
  96. SiteMap = make(map[string]map[string]interface{}, 0)
  97. start := int(time.Now().Unix())
  98. sess_site := data_mgo.GetMgoConn()
  99. defer data_mgo.DestoryMongoConn(sess_site)
  100. res_site := sess_site.DB(site["dbname"].(string)).C(site["coll"].(string)).Find(map[string]interface{}{}).Sort("_id").Iter()
  101. for site_dict := make(map[string]interface{}); res_site.Next(&site_dict); {
  102. data_map := map[string]interface{}{
  103. "area": util.ObjToString(site_dict["area"]),
  104. "city": util.ObjToString(site_dict["city"]),
  105. "district": util.ObjToString(site_dict["district"]),
  106. "sitetype": util.ObjToString(site_dict["sitetype"]),
  107. "level": util.ObjToString(site_dict["level"]),
  108. "weight": util.ObjToString(site_dict["weight"]),
  109. }
  110. SiteMap[util.ObjToString(site_dict["site"])] = data_map
  111. }
  112. log.Printf("new站点加载用时:%d秒,%d个\n", int(time.Now().Unix())-start, len(SiteMap))
  113. }
  114. //udp接收
  115. func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
  116. switch act {
  117. case mu.OP_TYPE_DATA:
  118. var mapInfo map[string]interface{}
  119. err := json.Unmarshal(data, &mapInfo)
  120. if err != nil {
  121. udpclient.WriteUdp([]byte("err:"+err.Error()), mu.OP_NOOP, ra)
  122. } else if mapInfo != nil {
  123. key, _ := mapInfo["key"].(string)
  124. if key == "" {
  125. key = "udpok"
  126. }
  127. udpclient.WriteUdp([]byte(key), mu.OP_NOOP, ra)
  128. //插入任务-判断任务-是否存在
  129. updatelock.Lock()
  130. taskList = append(taskList,mapInfo)
  131. log.Println("udp收到任务...数量:",len(taskList),"具体任务:",taskList)
  132. updatelock.Unlock()
  133. }
  134. case mu.OP_NOOP: //下个节点回应
  135. ok := string(data)
  136. if ok != "" {
  137. log.Println("ok:", ok)
  138. udptaskmap.Delete(ok)
  139. }
  140. }
  141. }
  142. //监听-获取-分发判重任务
  143. func getRepeatTask() {
  144. for {
  145. if len(taskList)>0 {
  146. updatelock.Lock()
  147. mapInfo := taskList[0]
  148. if mapInfo != nil {
  149. increaseRepeat(mapInfo) //判重方法
  150. }
  151. taskList = taskList[1:]
  152. log.Println("此段落结束当前任务池...",len(taskList),taskList)
  153. updatelock.Unlock()
  154. }else {
  155. time.Sleep(15 * time.Second)
  156. }
  157. }
  158. }
  159. //主函数
  160. func main() {
  161. go checkMapJob()
  162. updport := Sysconfig["udpport"].(string)
  163. udpclient = mu.UdpClient{Local: updport, BufSize: 1024}
  164. udpclient.Listen(processUdpMsg)
  165. log.Println("Udp服务监听", updport)
  166. if TimingTask {
  167. log.Println("正常历史部署")
  168. go historyRepeat()
  169. }else {
  170. //IsFull = true
  171. if !IsFull {//正常增量
  172. log.Println("正常增量部署,监听任务")
  173. go getRepeatTask()
  174. //新增调试
  175. //sid := "1fffffffffffffffffffffff"
  176. //eid := "9fffffffffffffffffffffff"
  177. //increaseRepeat(map[string]interface{}{
  178. // "gtid":sid,
  179. // "lteid":eid,
  180. //})
  181. }else {
  182. sid := "1fffffffffffffffffffffff"
  183. eid := "9fffffffffffffffffffffff"
  184. fullRepeat(sid,eid)
  185. }
  186. }
  187. time.Sleep(99999 * time.Hour)
  188. }