|
- package main
- /**
- 招标信息判重
- **/
- import (
- "encoding/json"
- "flag"
- "log"
- mu "mfw/util"
- "net"
- "qfw/common/src/qfw/util"
- qu "qfw/util"
- "regexp"
- "sync"
- "time"
- )
- var (
- Sysconfig map[string]interface{} //配置文件
- mconf map[string]interface{} //mongodb配置信息
- data_mgo *MongodbSim //mongodb操作对象
- task_mgo *MongodbSim //mongodb操作对象
- task_collName string
- extract string
- extract_back string
- udpclient mu.UdpClient //udp对象
- nextNode []map[string]interface{} //下节点数组
- dupdays = 7 //初始化判重范围
- DM *datamap //
- Update *updateInfo
- //正则筛选相关
- FilterRegTitle = regexp.MustCompile("^_$")
- FilterRegTitle_0 = regexp.MustCompile("^_$")
- FilterRegTitle_1 = regexp.MustCompile("^_$")
- FilterRegTitle_2 = regexp.MustCompile("^_$")
- threadNum int //线程数量
- SiteMap map[string]map[string]interface{} //站点map
- LowHeavy bool //低质量数据判重
- TimingTask bool //是否定时任务
- timingSpanDay int64 //时间跨度
- timingPubScope int64 //发布时间周期
- gtid,lastid,sec_gtid,sec_lteid string //命令输入
- lteid string //历史增量属性
- IsFull bool //是否全量
- updatelock sync.Mutex //锁4
- numberlock sync.Mutex //锁4
- userName,passWord string //mongo -用户密码
- taskList []map[string]interface{} //任务池
- )
- func init() {
- flag.StringVar(&lastid, "id", "", "增量加载的lastid") //增量
- flag.StringVar(>id, "gtid", "", "历史增量的起始id") //历史
- flag.StringVar(&sec_gtid, "sec_gtid", "", "全量分段起始id")
- flag.StringVar(&sec_lteid, "sec_lteid", "", "全量分段结束id")
- flag.Parse()
- qu.ReadConfig(&Sysconfig)
- userName = qu.ObjToString(Sysconfig["userName"])
- passWord = qu.ObjToString(Sysconfig["passWord"])
- log.Println("集群用户密码:",userName,passWord)
- task_mconf := Sysconfig["task_mongodb"].(map[string]interface{})
- task_mgo = &MongodbSim{
- MongodbAddr: task_mconf["task_addrName"].(string),
- DbName: task_mconf["task_dbName"].(string),
- Size: util.IntAllDef(task_mconf["task_pool"], 10),
- UserName: userName,
- Password: passWord,
- }
- task_mgo.InitPool()
- task_collName = task_mconf["task_collName"].(string)
- nextNode = util.ObjArrToMapArr(Sysconfig["nextNode"].([]interface{}))
- mconf = Sysconfig["mongodb"].(map[string]interface{})
- data_mgo = &MongodbSim{
- MongodbAddr: mconf["addr"].(string),
- DbName: mconf["db"].(string),
- Size: util.IntAllDef(mconf["pool"], 10),
- }
- data_mgo.InitPool()
- extract = mconf["extract"].(string)
- extract_back = mconf["extract_back"].(string)
- dupdays = util.IntAllDef(Sysconfig["dupdays"], 3)
- //加载数据
- DM = NewDatamap(dupdays, lastid)
- //更新池
- Update = newUpdatePool()
- go Update.updateData()
- FilterRegTitle = regexp.MustCompile(util.ObjToString(Sysconfig["specialwords"]))
- FilterRegTitle_0 = regexp.MustCompile(util.ObjToString(Sysconfig["specialtitle_0"]))
- FilterRegTitle_1 = regexp.MustCompile(util.ObjToString(Sysconfig["specialtitle_1"]))
- FilterRegTitle_2 = regexp.MustCompile(util.ObjToString(Sysconfig["specialtitle_2"]))
- threadNum = util.IntAllDef(Sysconfig["threads"], 1)
- LowHeavy = Sysconfig["lowHeavy"].(bool)
- TimingTask = Sysconfig["timingTask"].(bool)
- timingSpanDay = util.Int64All(Sysconfig["timingSpanDay"])
- timingPubScope = util.Int64All(Sysconfig["timingPubScope"])
- //站点配置
- site := mconf["site"].(map[string]interface{})
- SiteMap = make(map[string]map[string]interface{}, 0)
- start := int(time.Now().Unix())
- sess_site := data_mgo.GetMgoConn()
- defer data_mgo.DestoryMongoConn(sess_site)
- res_site := sess_site.DB(site["dbname"].(string)).C(site["coll"].(string)).Find(map[string]interface{}{}).Sort("_id").Iter()
- for site_dict := make(map[string]interface{}); res_site.Next(&site_dict); {
- data_map := map[string]interface{}{
- "area": util.ObjToString(site_dict["area"]),
- "city": util.ObjToString(site_dict["city"]),
- "district": util.ObjToString(site_dict["district"]),
- "sitetype": util.ObjToString(site_dict["sitetype"]),
- "level": util.ObjToString(site_dict["level"]),
- "weight": util.ObjToString(site_dict["weight"]),
- }
- SiteMap[util.ObjToString(site_dict["site"])] = data_map
- }
- log.Printf("new站点加载用时:%d秒,%d个\n", int(time.Now().Unix())-start, len(SiteMap))
- }
- //udp接收
- func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
- switch act {
- case mu.OP_TYPE_DATA:
- var mapInfo map[string]interface{}
- err := json.Unmarshal(data, &mapInfo)
- if err != nil {
- udpclient.WriteUdp([]byte("err:"+err.Error()), mu.OP_NOOP, ra)
- } else if mapInfo != nil {
- key, _ := mapInfo["key"].(string)
- if key == "" {
- key = "udpok"
- }
- udpclient.WriteUdp([]byte(key), mu.OP_NOOP, ra)
- //插入任务-判断任务-是否存在
- updatelock.Lock()
- taskList = append(taskList,mapInfo)
- log.Println("udp收到任务...数量:",len(taskList),"具体任务:",taskList)
- updatelock.Unlock()
- }
- case mu.OP_NOOP: //下个节点回应
- ok := string(data)
- if ok != "" {
- log.Println("ok:", ok)
- udptaskmap.Delete(ok)
- }
- }
- }
- //监听-获取-分发判重任务
- func getRepeatTask() {
- for {
- if len(taskList)>0 {
- updatelock.Lock()
- mapInfo := taskList[0]
- if mapInfo != nil {
- increaseRepeat(mapInfo) //判重方法
- }
- taskList = taskList[1:]
- log.Println("此段落结束当前任务池...",len(taskList),taskList)
- updatelock.Unlock()
- }else {
- time.Sleep(15 * time.Second)
- }
- }
- }
- //主函数
- func main() {
- go checkMapJob()
- updport := Sysconfig["udpport"].(string)
- udpclient = mu.UdpClient{Local: updport, BufSize: 1024}
- udpclient.Listen(processUdpMsg)
- log.Println("Udp服务监听", updport)
- if TimingTask {
- log.Println("正常历史部署")
- go historyRepeat()
- }else {
- //IsFull = true
- if !IsFull {//正常增量
- log.Println("正常增量部署,监听任务")
- go getRepeatTask()
- //新增调试
- //sid := "1fffffffffffffffffffffff"
- //eid := "9fffffffffffffffffffffff"
- //increaseRepeat(map[string]interface{}{
- // "gtid":sid,
- // "lteid":eid,
- //})
- }else {
- sid := "1fffffffffffffffffffffff"
- eid := "9fffffffffffffffffffffff"
- fullRepeat(sid,eid)
- }
- }
- time.Sleep(99999 * time.Hour)
- }
|