|
@@ -1,157 +1,12 @@
|
|
package main
|
|
package main
|
|
|
|
|
|
import (
|
|
import (
|
|
- "encoding/json"
|
|
|
|
- "fmt"
|
|
|
|
- "io/ioutil"
|
|
|
|
"log"
|
|
"log"
|
|
mu "mfw/util"
|
|
mu "mfw/util"
|
|
- "net"
|
|
|
|
- "net/http"
|
|
|
|
qu "qfw/util"
|
|
qu "qfw/util"
|
|
- "strings"
|
|
|
|
- "sync"
|
|
|
|
"time"
|
|
"time"
|
|
)
|
|
)
|
|
|
|
|
|
-type S_Province struct {
|
|
|
|
- P_Name string
|
|
|
|
-}
|
|
|
|
-type S_City struct {
|
|
|
|
- P_Name string
|
|
|
|
- C_Name string
|
|
|
|
-}
|
|
|
|
-type S_District struct {
|
|
|
|
- P_Name string
|
|
|
|
- C_Name string
|
|
|
|
- D_Name string
|
|
|
|
-}
|
|
|
|
-
|
|
|
|
-var (
|
|
|
|
- Sysconfig map[string]interface{} //配置文件
|
|
|
|
- mconf map[string]interface{} //mongodb配置信息
|
|
|
|
- data_mgo, qy_mgo *MongodbSim
|
|
|
|
- bid_mgo *MongodbSim //mongodb操作对象
|
|
|
|
- udpclient mu.UdpClient //udp对象
|
|
|
|
- nextNode []map[string]interface{} //节点信息
|
|
|
|
- coll_name, qy_coll_name, jy_coll_name string
|
|
|
|
- check_lock sync.Mutex //更新锁
|
|
|
|
- check_thread int //线程数
|
|
|
|
- UpdateTask *updateInfo //更新池
|
|
|
|
- S_ProvinceDict map[string][]S_Province //省份-map
|
|
|
|
- S_CityDict map[string][]S_City //城市-map
|
|
|
|
- S_DistrictDict map[string][]S_District //区县-map
|
|
|
|
- //删除字段
|
|
|
|
- unset_dict = map[string]interface{}{"winner": 1, "s_winner": 1, "bidamount": 1, "winnerorder": 1}
|
|
|
|
- udplock, getasklock sync.Mutex
|
|
|
|
- taskList []map[string]interface{}
|
|
|
|
- //监控相关
|
|
|
|
- responselock sync.Mutex
|
|
|
|
- lastNodeResponse int64
|
|
|
|
-)
|
|
|
|
-
|
|
|
|
-// 初始化城市
|
|
|
|
-func initCheckCity() {
|
|
|
|
- //初始化-城市配置
|
|
|
|
- S_ProvinceDict = make(map[string][]S_Province, 0)
|
|
|
|
- S_CityDict = make(map[string][]S_City, 0)
|
|
|
|
- S_DistrictDict = make(map[string][]S_District, 0)
|
|
|
|
- q := map[string]interface{}{
|
|
|
|
- "town_code": map[string]interface{}{
|
|
|
|
- "$exists": 0,
|
|
|
|
- },
|
|
|
|
- }
|
|
|
|
- sess := qy_mgo.GetMgoConn()
|
|
|
|
- defer qy_mgo.DestoryMongoConn(sess)
|
|
|
|
- it := sess.DB(qy_mgo.DbName).C(jy_coll_name).Find(&q).Iter()
|
|
|
|
- total := 0
|
|
|
|
- for tmp := make(map[string]interface{}); it.Next(&tmp); total++ {
|
|
|
|
- if total%1000 == 0 {
|
|
|
|
- log.Println("当前数量:", total)
|
|
|
|
- }
|
|
|
|
- district_code := qu.IntAll(tmp["district_code"])
|
|
|
|
- city_code := qu.IntAll(tmp["city_code"])
|
|
|
|
- if district_code > 0 {
|
|
|
|
- province := qu.ObjToString(tmp["province"])
|
|
|
|
- city := qu.ObjToString(tmp["city"])
|
|
|
|
- district := qu.ObjToString(tmp["district"])
|
|
|
|
- data := S_District{province, city, district}
|
|
|
|
- if S_DistrictDict[district] == nil {
|
|
|
|
- S_DistrictDict[district] = []S_District{data}
|
|
|
|
- } else {
|
|
|
|
- arr := S_DistrictDict[district]
|
|
|
|
- arr = append(arr, data)
|
|
|
|
- S_DistrictDict[district] = arr
|
|
|
|
- }
|
|
|
|
- } else {
|
|
|
|
- if city_code > 0 {
|
|
|
|
- province := qu.ObjToString(tmp["province"])
|
|
|
|
- city := qu.ObjToString(tmp["city"])
|
|
|
|
- data := S_City{province, city}
|
|
|
|
- if S_CityDict[city] == nil {
|
|
|
|
- S_CityDict[city] = []S_City{data}
|
|
|
|
- } else {
|
|
|
|
- arr := S_CityDict[city]
|
|
|
|
- arr = append(arr, data)
|
|
|
|
- S_CityDict[city] = arr
|
|
|
|
- }
|
|
|
|
- } else {
|
|
|
|
- province := qu.ObjToString(tmp["province"])
|
|
|
|
- data := S_Province{province}
|
|
|
|
- if S_ProvinceDict[province] == nil {
|
|
|
|
- S_ProvinceDict[province] = []S_Province{data}
|
|
|
|
- } else {
|
|
|
|
- arr := S_ProvinceDict[province]
|
|
|
|
- arr = append(arr, data)
|
|
|
|
- S_ProvinceDict[province] = arr
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- tmp = make(map[string]interface{})
|
|
|
|
- }
|
|
|
|
- log.Println(fmt.Sprintf("城市配置加载完毕...省~%d 市~%d 区~%d", len(S_ProvinceDict), len(S_CityDict), len(S_DistrictDict)))
|
|
|
|
-}
|
|
|
|
-
|
|
|
|
-// mgo-配置等
|
|
|
|
-func initMgo() {
|
|
|
|
- mconf = Sysconfig["mongodb"].(map[string]interface{})
|
|
|
|
- log.Println(mconf)
|
|
|
|
- data_mgo = &MongodbSim{
|
|
|
|
- MongodbAddr: mconf["addrName"].(string),
|
|
|
|
- DbName: mconf["dbName"].(string),
|
|
|
|
- Size: qu.IntAllDef(mconf["pool"], 10),
|
|
|
|
- }
|
|
|
|
- data_mgo.InitPool()
|
|
|
|
-
|
|
|
|
- qy_mconf := Sysconfig["qy_mongodb"].(map[string]interface{})
|
|
|
|
- qy_mgo = &MongodbSim{
|
|
|
|
- MongodbAddr: qy_mconf["qy_addrName"].(string),
|
|
|
|
- DbName: qy_mconf["qy_dbName"].(string),
|
|
|
|
- Size: qu.IntAllDef(qy_mconf["pool"], 10),
|
|
|
|
- UserName: qy_mconf["qy_username"].(string),
|
|
|
|
- Password: qy_mconf["qy_password"].(string),
|
|
|
|
- }
|
|
|
|
- qy_mgo.InitPool()
|
|
|
|
-
|
|
|
|
- bid_mgo = &MongodbSim{
|
|
|
|
- MongodbAddr: "172.17.189.140:27080,172.17.189.141:27081",
|
|
|
|
- DbName: "qfw",
|
|
|
|
- Size: 10,
|
|
|
|
- UserName: "zhengkun",
|
|
|
|
- Password: "zk@123123",
|
|
|
|
- }
|
|
|
|
- bid_mgo.InitPool()
|
|
|
|
-
|
|
|
|
- coll_name = mconf["collName"].(string)
|
|
|
|
- qy_coll_name = qy_mconf["qy_collName"].(string)
|
|
|
|
-
|
|
|
|
- jy_coll_name = Sysconfig["jy_collName"].(string)
|
|
|
|
- nextNode = qu.ObjArrToMapArr(Sysconfig["nextNode"].([]interface{}))
|
|
|
|
- check_thread = qu.IntAll(Sysconfig["check_thread"])
|
|
|
|
-
|
|
|
|
- log.Println("mgo 等配置,加载完毕...")
|
|
|
|
-}
|
|
|
|
-
|
|
|
|
// 初始化
|
|
// 初始化
|
|
func init() {
|
|
func init() {
|
|
qu.ReadConfig(&Sysconfig) //加载配置文件
|
|
qu.ReadConfig(&Sysconfig) //加载配置文件
|
|
@@ -159,9 +14,7 @@ func init() {
|
|
if len(Sysconfig) == 0 {
|
|
if len(Sysconfig) == 0 {
|
|
log.Fatal("读取配置文件失败", Sysconfig)
|
|
log.Fatal("读取配置文件失败", Sysconfig)
|
|
}
|
|
}
|
|
- initMgo() //初始化mgo
|
|
|
|
- initCheckCity() //初始化城市
|
|
|
|
-
|
|
|
|
|
|
+ initMgo() //初始化mgo
|
|
//更新池
|
|
//更新池
|
|
UpdateTask = newUpdatePool()
|
|
UpdateTask = newUpdatePool()
|
|
go UpdateTask.updateData()
|
|
go UpdateTask.updateData()
|
|
@@ -170,6 +23,9 @@ func init() {
|
|
func main() {
|
|
func main() {
|
|
lastNodeResponse = time.Now().Unix()
|
|
lastNodeResponse = time.Now().Unix()
|
|
updport := Sysconfig["udpport"].(string)
|
|
updport := Sysconfig["udpport"].(string)
|
|
|
|
+ if Sysconfig["update_ai"].(bool) {
|
|
|
|
+ updport = Sysconfig["udpport_ai"].(string)
|
|
|
|
+ }
|
|
udpclient = mu.UdpClient{Local: updport, BufSize: 1024}
|
|
udpclient = mu.UdpClient{Local: updport, BufSize: 1024}
|
|
udpclient.Listen(processUdpMsg)
|
|
udpclient.Listen(processUdpMsg)
|
|
log.Println("Udp服务监听", updport)
|
|
log.Println("Udp服务监听", updport)
|
|
@@ -182,232 +38,9 @@ func main() {
|
|
<-lock
|
|
<-lock
|
|
}
|
|
}
|
|
|
|
|
|
-// 开始审查数据
|
|
|
|
-func startCheckData(sid, eid string) {
|
|
|
|
- defer qu.Catch()
|
|
|
|
- q := map[string]interface{}{
|
|
|
|
- "_id": map[string]interface{}{
|
|
|
|
- "$gt": StringTOBsonId(sid),
|
|
|
|
- "$lte": StringTOBsonId(eid),
|
|
|
|
- },
|
|
|
|
- }
|
|
|
|
- check_pool := make(chan bool, check_thread)
|
|
|
|
- check_wg := &sync.WaitGroup{}
|
|
|
|
- sess := data_mgo.GetMgoConn()
|
|
|
|
- defer data_mgo.DestoryMongoConn(sess)
|
|
|
|
- it := sess.DB(data_mgo.DbName).C(coll_name).Find(&q).Iter()
|
|
|
|
- total, isRepair := 0, 0
|
|
|
|
- for tmp := make(map[string]interface{}); it.Next(&tmp); total++ {
|
|
|
|
- if total%10000 == 0 {
|
|
|
|
- log.Println("当前数量:", total, isRepair, tmp["_id"])
|
|
|
|
- }
|
|
|
|
- update_id := map[string]interface{}{"_id": tmp["_id"]}
|
|
|
|
- check_pool <- true
|
|
|
|
- check_wg.Add(1)
|
|
|
|
- go func(tmp map[string]interface{}, update_id map[string]interface{}) {
|
|
|
|
- defer func() {
|
|
|
|
- <-check_pool
|
|
|
|
- check_wg.Done()
|
|
|
|
- }()
|
|
|
|
- //更新-
|
|
|
|
- update_check := make(map[string]interface{}, 0)
|
|
|
|
-
|
|
|
|
- //审查-城市-迁移
|
|
|
|
- //getCheckDataCity(tmp, &update_check)
|
|
|
|
- //审查-金额-迁移
|
|
|
|
- //getCheckDataBidamount(tmp, &update_check)
|
|
|
|
- //审查-分类-弃用
|
|
|
|
- //getCheckDataCategory(tmp,&update_check)
|
|
|
|
-
|
|
|
|
- //审查-发布时间
|
|
|
|
- getCheckDataPublishtime(tmp, &update_check)
|
|
|
|
- //审查-大模型与抽取
|
|
|
|
- getCheckDataAI(tmp, &update_check)
|
|
|
|
-
|
|
|
|
- //最终计算是否清洗
|
|
|
|
- update_dict := make(map[string]interface{}, 0)
|
|
|
|
- if len(update_check) > 0 {
|
|
|
|
- update_dict["$set"] = update_check
|
|
|
|
- }
|
|
|
|
- if len(update_dict) > 0 { //注意事项~更新key不能与删除key同时存在
|
|
|
|
- isRepair++
|
|
|
|
- UpdateTask.updatePool <- []map[string]interface{}{
|
|
|
|
- update_id,
|
|
|
|
- update_dict,
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- }(tmp, update_id)
|
|
|
|
- tmp = make(map[string]interface{})
|
|
|
|
- }
|
|
|
|
- check_wg.Wait()
|
|
|
|
- log.Println("data_clean is over ", total, "~", isRepair)
|
|
|
|
- sendNextNode(sid, eid)
|
|
|
|
-}
|
|
|
|
-
|
|
|
|
-// udp监听
|
|
|
|
-func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
|
|
|
|
- switch act {
|
|
|
|
- case mu.OP_TYPE_DATA:
|
|
|
|
- var rep map[string]interface{}
|
|
|
|
- err := json.Unmarshal(data, &rep)
|
|
|
|
- if err != nil {
|
|
|
|
- log.Println(err)
|
|
|
|
- } else {
|
|
|
|
- sid, _ := rep["gtid"].(string)
|
|
|
|
- eid, _ := rep["lteid"].(string)
|
|
|
|
- stype := qu.ObjToString(rep["stype"])
|
|
|
|
- key := qu.ObjToString(rep["key"])
|
|
|
|
- if stype == "monitor" {
|
|
|
|
- log.Println("收到监测......")
|
|
|
|
- udpclient.WriteUdp([]byte(key), mu.OP_NOOP, ra)
|
|
|
|
- return
|
|
|
|
- }
|
|
|
|
- if sid == "" || eid == "" {
|
|
|
|
- log.Println("err", "sid=", sid, ",eid=", eid)
|
|
|
|
- return
|
|
|
|
- } else {
|
|
|
|
- lastNodeResponse = time.Now().Unix()
|
|
|
|
- udpclient.WriteUdp([]byte(key), mu.OP_NOOP, ra)
|
|
|
|
- udplock.Lock()
|
|
|
|
- taskList = append(taskList, map[string]interface{}{
|
|
|
|
- "sid": sid,
|
|
|
|
- "eid": eid,
|
|
|
|
- }) //插入任务
|
|
|
|
- log.Println("udp收到任务...数量:", len(taskList), "具体任务:", taskList)
|
|
|
|
- udplock.Unlock()
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- case mu.OP_NOOP: //下个节点回应
|
|
|
|
- log.Println("下节点回应:", string(data))
|
|
|
|
- udptaskmap.Delete(string(data))
|
|
|
|
- }
|
|
|
|
-}
|
|
|
|
-
|
|
|
|
-// 发送下阶段节点~
|
|
|
|
-func sendNextNode(sid string, eid string) {
|
|
|
|
- //更新记录状态
|
|
|
|
- updateProcessUdpIdsInfo(sid, eid)
|
|
|
|
- log.Println("判重任务完成...发送下节点udp...")
|
|
|
|
- for _, to := range nextNode {
|
|
|
|
- key := sid + "-" + eid + "-" + qu.ObjToString(to["stype"])
|
|
|
|
- by, _ := json.Marshal(map[string]interface{}{
|
|
|
|
- "gtid": sid,
|
|
|
|
- "lteid": eid,
|
|
|
|
- "stype": qu.ObjToString(to["stype"]),
|
|
|
|
- "key": key,
|
|
|
|
- })
|
|
|
|
- addr := &net.UDPAddr{
|
|
|
|
- IP: net.ParseIP(to["addr"].(string)),
|
|
|
|
- Port: qu.IntAll(to["port"]),
|
|
|
|
- }
|
|
|
|
- node := &udpNode{by, addr, time.Now().Unix(), 0}
|
|
|
|
- udptaskmap.Store(key, node)
|
|
|
|
- udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr)
|
|
|
|
- }
|
|
|
|
-}
|
|
|
|
-
|
|
|
|
-// 更新流程记录id段落
|
|
|
|
-func updateProcessUdpIdsInfo(sid string, eid string) {
|
|
|
|
- query := map[string]interface{}{
|
|
|
|
- "gtid": map[string]interface{}{
|
|
|
|
- "$gte": sid,
|
|
|
|
- },
|
|
|
|
- "lteid": map[string]interface{}{
|
|
|
|
- "$lte": eid,
|
|
|
|
- },
|
|
|
|
- }
|
|
|
|
- task_coll := "bidding_processing_ids"
|
|
|
|
- datas, _ := bid_mgo.Find(task_coll, query, nil, nil)
|
|
|
|
- if len(datas) > 0 {
|
|
|
|
- log.Println("开始更新流程段落记录~~", len(datas), "段")
|
|
|
|
- for _, v := range datas {
|
|
|
|
- up_id := BsonTOStringId(v["_id"])
|
|
|
|
- if up_id != "" {
|
|
|
|
- update := map[string]interface{}{
|
|
|
|
- "$set": map[string]interface{}{
|
|
|
|
- "dataprocess": 4,
|
|
|
|
- "updatetime": time.Now().Unix(),
|
|
|
|
- },
|
|
|
|
- }
|
|
|
|
- bid_mgo.UpdateById(task_coll, up_id, update)
|
|
|
|
- log.Println("流程段落记录~~更新完毕~", update)
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- } else {
|
|
|
|
- log.Println("未查询到记录id段落~", query)
|
|
|
|
- }
|
|
|
|
-}
|
|
|
|
-
|
|
|
|
-func httpDo(detail string) (e error) {
|
|
|
|
- client := &http.Client{}
|
|
|
|
- req, err := http.NewRequest("POST", "http://127.0.0.1:9991/get",
|
|
|
|
- strings.NewReader("detail="+detail))
|
|
|
|
- if err != nil {
|
|
|
|
- return err
|
|
|
|
- }
|
|
|
|
- req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
|
|
|
|
- resp, err := client.Do(req)
|
|
|
|
- if err != nil {
|
|
|
|
- return err
|
|
|
|
- }
|
|
|
|
- defer resp.Body.Close()
|
|
|
|
- body, err := ioutil.ReadAll(resp.Body)
|
|
|
|
- if err != nil {
|
|
|
|
- return err
|
|
|
|
- }
|
|
|
|
- log.Println("put ", string(body))
|
|
|
|
- return nil
|
|
|
|
-}
|
|
|
|
-
|
|
|
|
-// 监听-获取-分发清洗任务
|
|
|
|
-func getRepeatTask() {
|
|
|
|
- for {
|
|
|
|
- if len(taskList) > 0 {
|
|
|
|
- getasklock.Lock()
|
|
|
|
- len_list := len(taskList)
|
|
|
|
- if len_list > 1 {
|
|
|
|
- first_id := qu.ObjToString(taskList[0]["sid"])
|
|
|
|
- end_id := qu.ObjToString(taskList[len_list-1]["eid"])
|
|
|
|
- if first_id != "" && end_id != "" {
|
|
|
|
- taskList = taskList[len_list:]
|
|
|
|
- log.Println("合并段落~正常~", first_id, "~", end_id, "~剩余任务池~", len(taskList), taskList)
|
|
|
|
- startCheckData(first_id, end_id)
|
|
|
|
- } else {
|
|
|
|
- log.Println("合并段落~错误~正常取段落~~~")
|
|
|
|
- mapInfo := taskList[0]
|
|
|
|
- if mapInfo != nil {
|
|
|
|
- taskList = taskList[1:]
|
|
|
|
- log.Println("获取任务段处理中~~~剩余任务池~~~", len(taskList), taskList)
|
|
|
|
- sid := qu.ObjToString(mapInfo["sid"])
|
|
|
|
- eid := qu.ObjToString(mapInfo["eid"])
|
|
|
|
- startCheckData(sid, eid)
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- } else {
|
|
|
|
- mapInfo := taskList[0]
|
|
|
|
- if mapInfo != nil {
|
|
|
|
- taskList = taskList[1:]
|
|
|
|
- log.Println("获取任务段处理中~~~剩余任务池~~~", len(taskList), taskList)
|
|
|
|
- sid := qu.ObjToString(mapInfo["sid"])
|
|
|
|
- eid := qu.ObjToString(mapInfo["eid"])
|
|
|
|
- startCheckData(sid, eid)
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- getasklock.Unlock()
|
|
|
|
- } else {
|
|
|
|
- time.Sleep(10 * time.Second)
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
-}
|
|
|
|
-
|
|
|
|
-func lastUdpJob() {
|
|
|
|
- for {
|
|
|
|
- responselock.Lock()
|
|
|
|
- if time.Now().Unix()-lastNodeResponse >= 1800 {
|
|
|
|
- lastNodeResponse = time.Now().Unix() //重置时间
|
|
|
|
- sendErrMailApi("数据清洗~发现处理流程超时~给予告警", fmt.Sprintf("半小时左右~无新段落数据进入清洗增量流程...相关人员检查..."))
|
|
|
|
- }
|
|
|
|
- responselock.Unlock()
|
|
|
|
- time.Sleep(300 * time.Second)
|
|
|
|
- }
|
|
|
|
|
|
+func test() {
|
|
|
|
+ log.Println("测试修正...")
|
|
|
|
+ startCheckData("100000000000000000000000", "900000000000000000000000")
|
|
|
|
+ lock := make(chan bool)
|
|
|
|
+ <-lock
|
|
}
|
|
}
|