123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168 |
- package main
- /**
- 招标信息判重
- **/
- import (
- "encoding/json"
- "flag"
- "log"
- mu "mfw/util"
- "net"
- "nsqdata"
- qu "qfw/util"
- "regexp"
- "sync"
- "time"
- )
- var (
- Sysconfig map[string]interface{} //配置文件
- data_mgo, task_mgo, spider_mgo *MongodbSim
- task_coll, task_bidding, spider_coll string
- extract, extract_back, extract_log string
- udpclient mu.UdpClient
- nextNode []map[string]interface{}
- dupdays = 7
- DM, FullDM *datamap
- Update *updateInfo
- AddGroupPool *addGroupInfo
- //正则筛选相关
- FilterRegTitle = regexp.MustCompile("^_$")
- FilterRegTitle_0 = regexp.MustCompile("^_$")
- FilterRegTitle_1 = regexp.MustCompile("^_$")
- FilterRegTitle_2 = regexp.MustCompile("^_$")
- threadNum int
- SiteMap map[string]map[string]interface{}
- LowHeavy, TimingTask, IsFull, isUpdateSite bool
- timingSpanDay, timingPubScope int64
- gtid, lastid, sec_gtid, sec_lteid, lteid string
- updatelock, datalock, numlock, cronlock sync.Mutex
- jyfb_data map[string]string
- taskList []map[string]interface{}
- nspdata_1, nspdata_2 *nsqdata.Producer
- )
- //初始化加载
- func init() {
- flag.StringVar(&lastid, "id", "", "增量加载的lastid") //增量
- flag.StringVar(>id, "gtid", "", "历史增量的起始id") //历史
- flag.StringVar(&sec_gtid, "sec_gtid", "", "全量分段起始id")
- flag.StringVar(&sec_lteid, "sec_lteid", "", "全量分段结束id")
- flag.Parse()
- qu.ReadConfig(&Sysconfig)
- initMgo()
- initOther()
- initSite()
- }
- func mainT() {
- IsFull = true
- //AddGroupPool = newAddGroupPool()
- //go AddGroupPool.addGroupData()
- //fullDataRepeat() //全量判重
- increaseRepeat(map[string]interface{}{
- "gtid": "12ec61170ae152a3c2310f02",
- "lteid": "92ec61170ae152a3c2310f02",
- })
- //gtid = "62ec2dd00ae152a3c230c1a1"
- //lteid = "62ec2dd00ae152a3c230c1e1"
- //historyRepeat()
- time.Sleep(99999 * time.Hour)
- }
- //主函数
- func main() {
- go checkMapJob()
- updport := Sysconfig["udpport"].(string)
- udpclient = mu.UdpClient{Local: updport, BufSize: 1024}
- udpclient.Listen(processUdpMsg)
- log.Println("Udp服务监听", updport)
- if TimingTask {
- log.Println("正常历史部署")
- go historyRepeat()
- } else {
- if !IsFull {
- log.Println("正常增量部署,监听任务")
- go getRepeatTask()
- }
- }
- time.Sleep(99999 * time.Hour)
- }
- //udp接收
- func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
- switch act {
- case mu.OP_TYPE_DATA:
- var mapInfo map[string]interface{}
- err := json.Unmarshal(data, &mapInfo)
- if err != nil {
- udpclient.WriteUdp([]byte("err:"+err.Error()), mu.OP_NOOP, ra)
- } else if mapInfo != nil {
- sid, eid := qu.ObjToString(mapInfo["gtid"]), qu.ObjToString(mapInfo["lteid"])
- if sid == "" || eid == "" {
- log.Println("接收id段异常-err ", "sid=", sid, ",eid=", eid)
- } else {
- key := sid + "-" + eid + "-" + qu.ObjToString(mapInfo["stype"])
- udpclient.WriteUdp([]byte(key), mu.OP_NOOP, ra)
- //计算是否需要加载站点~每天加载一次
- if isUpdateSite {
- initSite()
- }
- //插入任务-判断任务-是否存在
- updatelock.Lock()
- taskList = append(taskList, mapInfo)
- log.Println("udp收到任务...数量:", len(taskList), "具体任务:", taskList)
- updatelock.Unlock()
- }
- }
- case mu.OP_NOOP: //下个节点回应
- ok := string(data)
- if ok != "" {
- log.Println("ok:", ok)
- udptaskmap.Delete(ok)
- }
- }
- }
- //监听-获取-分发判重任务
- func getRepeatTask() {
- for {
- if len(taskList) > 0 {
- updatelock.Lock()
- len_list := len(taskList)
- if len_list > 1 {
- first_id := taskList[0]["gtid"]
- end_id := taskList[len_list-1]["lteid"]
- if first_id != "" && end_id != "" {
- log.Println("合并段落~正常~", first_id, "~", end_id)
- increaseRepeat(map[string]interface{}{
- "gtid": first_id,
- "lteid": end_id,
- })
- taskList = taskList[len_list:]
- log.Println("此段落结束当前任务池...", len(taskList), taskList)
- } else {
- log.Println("合并段落~错误~正常取段落~~~")
- mapInfo := taskList[0]
- if mapInfo != nil {
- increaseRepeat(mapInfo) //判重方法
- }
- taskList = taskList[1:]
- log.Println("此段落结束当前任务池...", len(taskList), taskList)
- }
- } else {
- mapInfo := taskList[0]
- if mapInfo != nil {
- increaseRepeat(mapInfo) //判重方法
- }
- taskList = taskList[1:]
- log.Println("此段落结束当前任务池...", len(taskList), taskList)
- }
- updatelock.Unlock()
- } else {
- time.Sleep(15 * time.Second)
- }
- }
- }
|