|
@@ -41,20 +41,21 @@ var (
|
|
|
S_ProvinceDict map[string][]S_Province //省份-map
|
|
|
S_CityDict map[string][]S_City //城市-map
|
|
|
S_DistrictDict map[string][]S_District //区县-map
|
|
|
-
|
|
|
//删除字段
|
|
|
unset_dict = map[string]interface{}{"winner": 1, "s_winner": 1, "bidamount": 1, "winnerorder": 1}
|
|
|
udplock, getasklock sync.Mutex
|
|
|
taskList []map[string]interface{}
|
|
|
+ //监控相关
|
|
|
+ responselock sync.Mutex
|
|
|
+ lastNodeResponse int64
|
|
|
)
|
|
|
|
|
|
-//初始化城市
|
|
|
+// 初始化城市
|
|
|
func initCheckCity() {
|
|
|
//初始化-城市配置
|
|
|
S_ProvinceDict = make(map[string][]S_Province, 0)
|
|
|
S_CityDict = make(map[string][]S_City, 0)
|
|
|
S_DistrictDict = make(map[string][]S_District, 0)
|
|
|
-
|
|
|
q := map[string]interface{}{
|
|
|
"town_code": map[string]interface{}{
|
|
|
"$exists": 0,
|
|
@@ -111,7 +112,7 @@ func initCheckCity() {
|
|
|
log.Println(fmt.Sprintf("城市配置加载完毕...省~%d 市~%d 区~%d", len(S_ProvinceDict), len(S_CityDict), len(S_DistrictDict)))
|
|
|
}
|
|
|
|
|
|
-//mgo-配置等
|
|
|
+// mgo-配置等
|
|
|
func initMgo() {
|
|
|
mconf = Sysconfig["mongodb"].(map[string]interface{})
|
|
|
log.Println(mconf)
|
|
@@ -133,7 +134,7 @@ func initMgo() {
|
|
|
qy_mgo.InitPool()
|
|
|
|
|
|
bid_mgo = &MongodbSim{
|
|
|
- MongodbAddr: "172.17.145.163:27083,172.17.4.187:27082",
|
|
|
+ MongodbAddr: "172.17.189.140:27080,172.17.189.141:27081",
|
|
|
DbName: "qfw",
|
|
|
Size: 10,
|
|
|
UserName: "zhengkun",
|
|
@@ -151,7 +152,7 @@ func initMgo() {
|
|
|
log.Println("mgo 等配置,加载完毕...")
|
|
|
}
|
|
|
|
|
|
-//初始化
|
|
|
+// 初始化
|
|
|
func init() {
|
|
|
qu.ReadConfig(&Sysconfig) //加载配置文件
|
|
|
log.Println(Sysconfig)
|
|
@@ -167,26 +168,21 @@ func init() {
|
|
|
}
|
|
|
|
|
|
func main() {
|
|
|
+ lastNodeResponse = time.Now().Unix()
|
|
|
updport := Sysconfig["udpport"].(string)
|
|
|
udpclient = mu.UdpClient{Local: updport, BufSize: 1024}
|
|
|
udpclient.Listen(processUdpMsg)
|
|
|
log.Println("Udp服务监听", updport)
|
|
|
|
|
|
go getRepeatTask()
|
|
|
+ go checkMailJob()
|
|
|
+ go lastUdpJob()
|
|
|
|
|
|
lock := make(chan bool)
|
|
|
<-lock
|
|
|
}
|
|
|
|
|
|
-//临时校验
|
|
|
-func mainT() {
|
|
|
- sid := "12982d658fa2ac55ba96517d"
|
|
|
- eid := "92982d658fa2ac55ba96517e"
|
|
|
- testCheckData(sid, eid)
|
|
|
- time.Sleep(99999 * time.Hour)
|
|
|
-}
|
|
|
-
|
|
|
-//开始审查数据
|
|
|
+// 开始审查数据
|
|
|
func startCheckData(sid, eid string) {
|
|
|
defer qu.Catch()
|
|
|
q := map[string]interface{}{
|
|
@@ -215,26 +211,16 @@ func startCheckData(sid, eid string) {
|
|
|
}()
|
|
|
//更新-
|
|
|
update_check := make(map[string]interface{}, 0)
|
|
|
-
|
|
|
- //审查-城市
|
|
|
- //getCheckDataCity(tmp, &update_check)
|
|
|
- //审查-中标金额
|
|
|
- //getCheckDataBidamount(tmp, &update_check)
|
|
|
- //验证是否修复发布时间 - 对比开标日期,投标截止日期
|
|
|
- getCheckDataPublishtime(tmp, &update_check)
|
|
|
- //清洗分类~
|
|
|
- //is_category:= getCheckDataCategory(tmp,&update_check)
|
|
|
-
|
|
|
+ //getCheckDataCity(tmp, &update_check) //审查-城市
|
|
|
+ //getCheckDataBidamount(tmp, &update_check) //审查-中标金额
|
|
|
+ getCheckDataPublishtime(tmp, &update_check) //修复-发布时间
|
|
|
+ //getCheckDataCategory(tmp,&update_check) //修复分类
|
|
|
//最终计算是否清洗
|
|
|
update_dict := make(map[string]interface{}, 0)
|
|
|
if len(update_check) > 0 {
|
|
|
update_dict["$set"] = update_check
|
|
|
}
|
|
|
- //if is_category {
|
|
|
- // update_dict["$unset"] = unset_dict
|
|
|
- //}
|
|
|
- if len(update_dict) > 0 {
|
|
|
- //注意事项~更新key 不能与 删除key 同时存在
|
|
|
+ if len(update_dict) > 0 { //注意事项~更新key不能与删除key同时存在
|
|
|
isRepair++
|
|
|
UpdateTask.updatePool <- []map[string]interface{}{
|
|
|
update_id,
|
|
@@ -245,13 +231,11 @@ func startCheckData(sid, eid string) {
|
|
|
tmp = make(map[string]interface{})
|
|
|
}
|
|
|
check_wg.Wait()
|
|
|
-
|
|
|
log.Println("data_clean is over ", total, "~", isRepair)
|
|
|
-
|
|
|
sendNextNode(sid, eid)
|
|
|
}
|
|
|
|
|
|
-//udp监听
|
|
|
+// udp监听
|
|
|
func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
|
|
|
switch act {
|
|
|
case mu.OP_TYPE_DATA:
|
|
@@ -262,68 +246,83 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
|
|
|
} else {
|
|
|
sid, _ := rep["gtid"].(string)
|
|
|
eid, _ := rep["lteid"].(string)
|
|
|
+ stype := qu.ObjToString(rep["stype"])
|
|
|
+ key := qu.ObjToString(rep["key"])
|
|
|
+ if stype == "monitor" {
|
|
|
+ log.Println("收到监测......")
|
|
|
+ udpclient.WriteUdp([]byte(key), mu.OP_NOOP, ra)
|
|
|
+ return
|
|
|
+ }
|
|
|
if sid == "" || eid == "" {
|
|
|
log.Println("err", "sid=", sid, ",eid=", eid)
|
|
|
return
|
|
|
} else {
|
|
|
- go udpclient.WriteUdp(data, mu.OP_NOOP, ra)
|
|
|
- //插入任务
|
|
|
+ lastNodeResponse = time.Now().Unix()
|
|
|
+ udpclient.WriteUdp([]byte(key), mu.OP_NOOP, ra)
|
|
|
udplock.Lock()
|
|
|
taskList = append(taskList, map[string]interface{}{
|
|
|
"sid": sid,
|
|
|
"eid": eid,
|
|
|
- })
|
|
|
+ }) //插入任务
|
|
|
log.Println("udp收到任务...数量:", len(taskList), "具体任务:", taskList)
|
|
|
udplock.Unlock()
|
|
|
-
|
|
|
}
|
|
|
}
|
|
|
case mu.OP_NOOP: //下个节点回应
|
|
|
- log.Println("下节点回应", string(data))
|
|
|
+ log.Println("下节点回应:", string(data))
|
|
|
+ udptaskmap.Delete(string(data))
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-//发送下阶段节点~
|
|
|
+// 发送下阶段节点~
|
|
|
func sendNextNode(sid string, eid string) {
|
|
|
//更新记录状态
|
|
|
updateProcessUdpIdsInfo(sid, eid)
|
|
|
-
|
|
|
- for _, m := range nextNode {
|
|
|
+ log.Println("判重任务完成...发送下节点udp...")
|
|
|
+ for _, to := range nextNode {
|
|
|
+ key := sid + "-" + eid + "-" + qu.ObjToString(to["stype"])
|
|
|
by, _ := json.Marshal(map[string]interface{}{
|
|
|
"gtid": sid,
|
|
|
"lteid": eid,
|
|
|
- "stype": qu.ObjToString(m["stype"]),
|
|
|
+ "stype": qu.ObjToString(to["stype"]),
|
|
|
+ "key": key,
|
|
|
})
|
|
|
- new_err := udpclient.WriteUdp(by, mu.OP_TYPE_DATA, &net.UDPAddr{
|
|
|
- IP: net.ParseIP(m["addr"].(string)),
|
|
|
- Port: qu.IntAll(m["port"]),
|
|
|
- })
|
|
|
- if new_err != nil {
|
|
|
- log.Println(new_err)
|
|
|
+ addr := &net.UDPAddr{
|
|
|
+ IP: net.ParseIP(to["addr"].(string)),
|
|
|
+ Port: qu.IntAll(to["port"]),
|
|
|
}
|
|
|
+ node := &udpNode{by, addr, time.Now().Unix(), 0}
|
|
|
+ udptaskmap.Store(key, node)
|
|
|
+ udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr)
|
|
|
}
|
|
|
- log.Println("udp通知审查数据完成,通知下节点")
|
|
|
}
|
|
|
|
|
|
-//更新流程记录id段落
|
|
|
+// 更新流程记录id段落
|
|
|
func updateProcessUdpIdsInfo(sid string, eid string) {
|
|
|
query := map[string]interface{}{
|
|
|
- "gtid": sid,
|
|
|
- "lteid": eid,
|
|
|
+ "gtid": map[string]interface{}{
|
|
|
+ "$gte": sid,
|
|
|
+ },
|
|
|
+ "lteid": map[string]interface{}{
|
|
|
+ "$lte": eid,
|
|
|
+ },
|
|
|
}
|
|
|
- log.Println("开始更新流程段落记录~~", query)
|
|
|
- data := bid_mgo.FindOne("bidding_processing_ids", query)
|
|
|
- if len(data) > 0 {
|
|
|
- up_id := BsonTOStringId(data["_id"])
|
|
|
- if up_id != "" {
|
|
|
- update := map[string]interface{}{
|
|
|
- "$set": map[string]interface{}{
|
|
|
- "dataprocess": 4,
|
|
|
- "updatetime": time.Now().Unix(),
|
|
|
- },
|
|
|
+ task_coll := "bidding_processing_ids"
|
|
|
+ datas, _ := bid_mgo.Find(task_coll, query, nil, nil)
|
|
|
+ if len(datas) > 0 {
|
|
|
+ log.Println("开始更新流程段落记录~~", len(datas), "段")
|
|
|
+ for _, v := range datas {
|
|
|
+ up_id := BsonTOStringId(v["_id"])
|
|
|
+ if up_id != "" {
|
|
|
+ update := map[string]interface{}{
|
|
|
+ "$set": map[string]interface{}{
|
|
|
+ "dataprocess": 4,
|
|
|
+ "updatetime": time.Now().Unix(),
|
|
|
+ },
|
|
|
+ }
|
|
|
+ bid_mgo.UpdateById(task_coll, up_id, update)
|
|
|
+ log.Println("流程段落记录~~更新完毕~", update)
|
|
|
}
|
|
|
- bid_mgo.UpdateById("bidding_processing_ids", up_id, update)
|
|
|
- log.Println("流程段落记录~~更新完毕~", update)
|
|
|
}
|
|
|
} else {
|
|
|
log.Println("未查询到记录id段落~", query)
|
|
@@ -351,76 +350,55 @@ func httpDo(detail string) (e error) {
|
|
|
return nil
|
|
|
}
|
|
|
|
|
|
-//监听-获取-分发清洗任务
|
|
|
+// 监听-获取-分发清洗任务
|
|
|
func getRepeatTask() {
|
|
|
for {
|
|
|
if len(taskList) > 0 {
|
|
|
getasklock.Lock()
|
|
|
- mapInfo := taskList[0]
|
|
|
- if mapInfo != nil {
|
|
|
- taskList = taskList[1:]
|
|
|
- log.Println("获取任务段处理中~~~剩余任务池~~~", len(taskList), taskList)
|
|
|
- sid := qu.ObjToString(mapInfo["sid"])
|
|
|
- eid := qu.ObjToString(mapInfo["eid"])
|
|
|
- startCheckData(sid, eid)
|
|
|
+ len_list := len(taskList)
|
|
|
+ if len_list > 1 {
|
|
|
+ first_id := qu.ObjToString(taskList[0]["sid"])
|
|
|
+ end_id := qu.ObjToString(taskList[len_list-1]["eid"])
|
|
|
+ if first_id != "" && end_id != "" {
|
|
|
+ taskList = taskList[len_list:]
|
|
|
+ log.Println("合并段落~正常~", first_id, "~", end_id, "~剩余任务池~", len(taskList), taskList)
|
|
|
+ startCheckData(first_id, end_id)
|
|
|
+ } else {
|
|
|
+ log.Println("合并段落~错误~正常取段落~~~")
|
|
|
+ mapInfo := taskList[0]
|
|
|
+ if mapInfo != nil {
|
|
|
+ taskList = taskList[1:]
|
|
|
+ log.Println("获取任务段处理中~~~剩余任务池~~~", len(taskList), taskList)
|
|
|
+ sid := qu.ObjToString(mapInfo["sid"])
|
|
|
+ eid := qu.ObjToString(mapInfo["eid"])
|
|
|
+ startCheckData(sid, eid)
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ mapInfo := taskList[0]
|
|
|
+ if mapInfo != nil {
|
|
|
+ taskList = taskList[1:]
|
|
|
+ log.Println("获取任务段处理中~~~剩余任务池~~~", len(taskList), taskList)
|
|
|
+ sid := qu.ObjToString(mapInfo["sid"])
|
|
|
+ eid := qu.ObjToString(mapInfo["eid"])
|
|
|
+ startCheckData(sid, eid)
|
|
|
+ }
|
|
|
}
|
|
|
getasklock.Unlock()
|
|
|
} else {
|
|
|
- time.Sleep(15 * time.Second)
|
|
|
+ time.Sleep(10 * time.Second)
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-//测试版
|
|
|
-func testCheckData(sid, eid string) {
|
|
|
- defer qu.Catch()
|
|
|
- q := map[string]interface{}{
|
|
|
- "_id": map[string]interface{}{
|
|
|
- "$gt": StringTOBsonId(sid),
|
|
|
- "$lte": StringTOBsonId(eid),
|
|
|
- },
|
|
|
- }
|
|
|
- check_pool := make(chan bool, check_thread)
|
|
|
- check_wg := &sync.WaitGroup{}
|
|
|
- sess := data_mgo.GetMgoConn()
|
|
|
- defer data_mgo.DestoryMongoConn(sess)
|
|
|
- it := sess.DB(data_mgo.DbName).C(coll_name).Find(&q).Iter()
|
|
|
- total, isRepair := 0, 0
|
|
|
- for tmp := make(map[string]interface{}); it.Next(&tmp); total++ {
|
|
|
- if total%10000 == 0 {
|
|
|
- log.Println("当前数量:", total, isRepair, tmp["_id"])
|
|
|
+func lastUdpJob() {
|
|
|
+ for {
|
|
|
+ responselock.Lock()
|
|
|
+ if time.Now().Unix()-lastNodeResponse >= 1800 {
|
|
|
+ lastNodeResponse = time.Now().Unix() //重置时间
|
|
|
+ sendErrMailApi("数据清洗~发现处理流程超时~给予告警", fmt.Sprintf("半小时左右~无新段落数据进入清洗增量流程...相关人员检查..."))
|
|
|
}
|
|
|
- update_id := map[string]interface{}{"_id": tmp["_id"]}
|
|
|
- check_pool <- true
|
|
|
- check_wg.Add(1)
|
|
|
- go func(tmp map[string]interface{}, update_id map[string]interface{}) {
|
|
|
- defer func() {
|
|
|
- <-check_pool
|
|
|
- check_wg.Done()
|
|
|
- }()
|
|
|
- //更新-
|
|
|
- update_check := make(map[string]interface{}, 0)
|
|
|
-
|
|
|
- //审查-城市
|
|
|
- getCheckDataCity(tmp, &update_check)
|
|
|
- //最终计算是否清洗
|
|
|
- update_dict := make(map[string]interface{}, 0)
|
|
|
- if len(update_check) > 0 {
|
|
|
- update_check["is_standard"] = 1
|
|
|
- update_dict["$set"] = update_check
|
|
|
- }
|
|
|
- if len(update_dict) > 0 {
|
|
|
- //注意事项~更新key 不能与 删除key 同时存在
|
|
|
- isRepair++
|
|
|
- UpdateTask.updatePool <- []map[string]interface{}{
|
|
|
- update_id,
|
|
|
- update_dict,
|
|
|
- }
|
|
|
- }
|
|
|
- }(tmp, update_id)
|
|
|
- tmp = make(map[string]interface{})
|
|
|
+ responselock.Unlock()
|
|
|
+ time.Sleep(300 * time.Second)
|
|
|
}
|
|
|
- check_wg.Wait()
|
|
|
-
|
|
|
- log.Println("data_clean is over ", total, "~", isRepair)
|
|
|
}
|