123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413 |
- package main
import (
	"encoding/json"
	"fmt"
	"io/ioutil"
	"log"
	"net"
	"net/http"
	"net/url"
	"strings"
	"sync"
	"time"

	mu "mfw/util"
	qu "qfw/util"
)
// S_Province is a province-level entry of the city lookup tables built by
// initCheckCity.
type S_Province struct {
	P_Name string // province name
}
// S_City is a city-level entry of the city lookup tables built by
// initCheckCity, qualified by the province it belongs to.
type S_City struct {
	P_Name, C_Name string // province name, city name
}
// S_District is a district-level entry of the city lookup tables built by
// initCheckCity, qualified by its province and city.
type S_District struct {
	P_Name, C_Name, D_Name string // province name, city name, district name
}
- var (
- Sysconfig map[string]interface{} //配置文件
- mconf map[string]interface{} //mongodb配置信息
- data_mgo, qy_mgo *MongodbSim
- bid_mgo *MongodbSim //mongodb操作对象
- udpclient mu.UdpClient //udp对象
- nextNode []map[string]interface{} //节点信息
- coll_name, qy_coll_name, jy_coll_name string
- check_lock sync.Mutex //更新锁
- check_thread int //线程数
- UpdateTask *updateInfo //更新池
- S_ProvinceDict map[string][]S_Province //省份-map
- S_CityDict map[string][]S_City //城市-map
- S_DistrictDict map[string][]S_District //区县-map
- //删除字段
- unset_dict = map[string]interface{}{"winner": 1, "s_winner": 1, "bidamount": 1, "winnerorder": 1}
- udplock, getasklock sync.Mutex
- taskList []map[string]interface{}
- //监控相关
- responselock sync.Mutex
- lastNodeResponse int64
- )
- // 初始化城市
- func initCheckCity() {
- //初始化-城市配置
- S_ProvinceDict = make(map[string][]S_Province, 0)
- S_CityDict = make(map[string][]S_City, 0)
- S_DistrictDict = make(map[string][]S_District, 0)
- q := map[string]interface{}{
- "town_code": map[string]interface{}{
- "$exists": 0,
- },
- }
- sess := qy_mgo.GetMgoConn()
- defer qy_mgo.DestoryMongoConn(sess)
- it := sess.DB(qy_mgo.DbName).C(jy_coll_name).Find(&q).Iter()
- total := 0
- for tmp := make(map[string]interface{}); it.Next(&tmp); total++ {
- if total%1000 == 0 {
- log.Println("当前数量:", total)
- }
- district_code := qu.IntAll(tmp["district_code"])
- city_code := qu.IntAll(tmp["city_code"])
- if district_code > 0 {
- province := qu.ObjToString(tmp["province"])
- city := qu.ObjToString(tmp["city"])
- district := qu.ObjToString(tmp["district"])
- data := S_District{province, city, district}
- if S_DistrictDict[district] == nil {
- S_DistrictDict[district] = []S_District{data}
- } else {
- arr := S_DistrictDict[district]
- arr = append(arr, data)
- S_DistrictDict[district] = arr
- }
- } else {
- if city_code > 0 {
- province := qu.ObjToString(tmp["province"])
- city := qu.ObjToString(tmp["city"])
- data := S_City{province, city}
- if S_CityDict[city] == nil {
- S_CityDict[city] = []S_City{data}
- } else {
- arr := S_CityDict[city]
- arr = append(arr, data)
- S_CityDict[city] = arr
- }
- } else {
- province := qu.ObjToString(tmp["province"])
- data := S_Province{province}
- if S_ProvinceDict[province] == nil {
- S_ProvinceDict[province] = []S_Province{data}
- } else {
- arr := S_ProvinceDict[province]
- arr = append(arr, data)
- S_ProvinceDict[province] = arr
- }
- }
- }
- tmp = make(map[string]interface{})
- }
- log.Println(fmt.Sprintf("城市配置加载完毕...省~%d 市~%d 区~%d", len(S_ProvinceDict), len(S_CityDict), len(S_DistrictDict)))
- }
- // mgo-配置等
- func initMgo() {
- mconf = Sysconfig["mongodb"].(map[string]interface{})
- log.Println(mconf)
- data_mgo = &MongodbSim{
- MongodbAddr: mconf["addrName"].(string),
- DbName: mconf["dbName"].(string),
- Size: qu.IntAllDef(mconf["pool"], 10),
- }
- data_mgo.InitPool()
- qy_mconf := Sysconfig["qy_mongodb"].(map[string]interface{})
- qy_mgo = &MongodbSim{
- MongodbAddr: qy_mconf["qy_addrName"].(string),
- DbName: qy_mconf["qy_dbName"].(string),
- Size: qu.IntAllDef(qy_mconf["pool"], 10),
- UserName: qy_mconf["qy_username"].(string),
- Password: qy_mconf["qy_password"].(string),
- }
- qy_mgo.InitPool()
- bid_mgo = &MongodbSim{
- MongodbAddr: "172.17.189.140:27080,172.17.189.141:27081",
- DbName: "qfw",
- Size: 10,
- UserName: "zhengkun",
- Password: "zk@123123",
- }
- bid_mgo.InitPool()
- coll_name = mconf["collName"].(string)
- qy_coll_name = qy_mconf["qy_collName"].(string)
- jy_coll_name = Sysconfig["jy_collName"].(string)
- nextNode = qu.ObjArrToMapArr(Sysconfig["nextNode"].([]interface{}))
- check_thread = qu.IntAll(Sysconfig["check_thread"])
- log.Println("mgo 等配置,加载完毕...")
- }
- // 初始化
- func init() {
- qu.ReadConfig(&Sysconfig) //加载配置文件
- log.Println(Sysconfig)
- if len(Sysconfig) == 0 {
- log.Fatal("读取配置文件失败", Sysconfig)
- }
- initMgo() //初始化mgo
- initCheckCity() //初始化城市
- //更新池
- UpdateTask = newUpdatePool()
- go UpdateTask.updateData()
- }
- func main() {
- lastNodeResponse = time.Now().Unix()
- updport := Sysconfig["udpport"].(string)
- udpclient = mu.UdpClient{Local: updport, BufSize: 1024}
- udpclient.Listen(processUdpMsg)
- log.Println("Udp服务监听", updport)
- go getRepeatTask()
- go checkMailJob()
- go lastUdpJob()
- lock := make(chan bool)
- <-lock
- }
- // 开始审查数据
- func startCheckData(sid, eid string) {
- defer qu.Catch()
- q := map[string]interface{}{
- "_id": map[string]interface{}{
- "$gt": StringTOBsonId(sid),
- "$lte": StringTOBsonId(eid),
- },
- }
- check_pool := make(chan bool, check_thread)
- check_wg := &sync.WaitGroup{}
- sess := data_mgo.GetMgoConn()
- defer data_mgo.DestoryMongoConn(sess)
- it := sess.DB(data_mgo.DbName).C(coll_name).Find(&q).Iter()
- total, isRepair := 0, 0
- for tmp := make(map[string]interface{}); it.Next(&tmp); total++ {
- if total%10000 == 0 {
- log.Println("当前数量:", total, isRepair, tmp["_id"])
- }
- update_id := map[string]interface{}{"_id": tmp["_id"]}
- check_pool <- true
- check_wg.Add(1)
- go func(tmp map[string]interface{}, update_id map[string]interface{}) {
- defer func() {
- <-check_pool
- check_wg.Done()
- }()
- //更新-
- update_check := make(map[string]interface{}, 0)
- //审查-城市-迁移
- //getCheckDataCity(tmp, &update_check)
- //审查-金额-迁移
- //getCheckDataBidamount(tmp, &update_check)
- //审查-分类-弃用
- //getCheckDataCategory(tmp,&update_check)
- //审查-发布时间
- getCheckDataPublishtime(tmp, &update_check)
- //审查-大模型与抽取
- getCheckDataAI(tmp, &update_check)
- //最终计算是否清洗
- update_dict := make(map[string]interface{}, 0)
- if len(update_check) > 0 {
- update_dict["$set"] = update_check
- }
- if len(update_dict) > 0 { //注意事项~更新key不能与删除key同时存在
- isRepair++
- UpdateTask.updatePool <- []map[string]interface{}{
- update_id,
- update_dict,
- }
- }
- }(tmp, update_id)
- tmp = make(map[string]interface{})
- }
- check_wg.Wait()
- log.Println("data_clean is over ", total, "~", isRepair)
- sendNextNode(sid, eid)
- }
- // udp监听
- func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
- switch act {
- case mu.OP_TYPE_DATA:
- var rep map[string]interface{}
- err := json.Unmarshal(data, &rep)
- if err != nil {
- log.Println(err)
- } else {
- sid, _ := rep["gtid"].(string)
- eid, _ := rep["lteid"].(string)
- stype := qu.ObjToString(rep["stype"])
- key := qu.ObjToString(rep["key"])
- if stype == "monitor" {
- log.Println("收到监测......")
- udpclient.WriteUdp([]byte(key), mu.OP_NOOP, ra)
- return
- }
- if sid == "" || eid == "" {
- log.Println("err", "sid=", sid, ",eid=", eid)
- return
- } else {
- lastNodeResponse = time.Now().Unix()
- udpclient.WriteUdp([]byte(key), mu.OP_NOOP, ra)
- udplock.Lock()
- taskList = append(taskList, map[string]interface{}{
- "sid": sid,
- "eid": eid,
- }) //插入任务
- log.Println("udp收到任务...数量:", len(taskList), "具体任务:", taskList)
- udplock.Unlock()
- }
- }
- case mu.OP_NOOP: //下个节点回应
- log.Println("下节点回应:", string(data))
- udptaskmap.Delete(string(data))
- }
- }
- // 发送下阶段节点~
- func sendNextNode(sid string, eid string) {
- //更新记录状态
- updateProcessUdpIdsInfo(sid, eid)
- log.Println("判重任务完成...发送下节点udp...")
- for _, to := range nextNode {
- key := sid + "-" + eid + "-" + qu.ObjToString(to["stype"])
- by, _ := json.Marshal(map[string]interface{}{
- "gtid": sid,
- "lteid": eid,
- "stype": qu.ObjToString(to["stype"]),
- "key": key,
- })
- addr := &net.UDPAddr{
- IP: net.ParseIP(to["addr"].(string)),
- Port: qu.IntAll(to["port"]),
- }
- node := &udpNode{by, addr, time.Now().Unix(), 0}
- udptaskmap.Store(key, node)
- udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr)
- }
- }
- // 更新流程记录id段落
- func updateProcessUdpIdsInfo(sid string, eid string) {
- query := map[string]interface{}{
- "gtid": map[string]interface{}{
- "$gte": sid,
- },
- "lteid": map[string]interface{}{
- "$lte": eid,
- },
- }
- task_coll := "bidding_processing_ids"
- datas, _ := bid_mgo.Find(task_coll, query, nil, nil)
- if len(datas) > 0 {
- log.Println("开始更新流程段落记录~~", len(datas), "段")
- for _, v := range datas {
- up_id := BsonTOStringId(v["_id"])
- if up_id != "" {
- update := map[string]interface{}{
- "$set": map[string]interface{}{
- "dataprocess": 4,
- "updatetime": time.Now().Unix(),
- },
- }
- bid_mgo.UpdateById(task_coll, up_id, update)
- log.Println("流程段落记录~~更新完毕~", update)
- }
- }
- } else {
- log.Println("未查询到记录id段落~", query)
- }
- }
// httpDo POSTs detail as the form field "detail" to the local service at
// http://127.0.0.1:9991/get and logs the response body.
//
// It returns an error when the request cannot be built or sent, when the
// body cannot be read, or when the server answers with a non-2xx status.
func httpDo(detail string) (e error) {
	// Without a timeout a stuck server would block the caller forever.
	// TODO: tune if the service legitimately needs longer.
	client := &http.Client{Timeout: 60 * time.Second}
	// The body is application/x-www-form-urlencoded, so detail must be
	// escaped. Previously any '&', '=', '+' or '%' inside it corrupted the form.
	form := url.Values{"detail": {detail}}
	req, err := http.NewRequest("POST", "http://127.0.0.1:9991/get",
		strings.NewReader(form.Encode()))
	if err != nil {
		return err
	}
	req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
	resp, err := client.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	body, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		return err
	}
	log.Println("put ", string(body))
	if resp.StatusCode < 200 || resp.StatusCode > 299 {
		return fmt.Errorf("httpDo: unexpected status %s", resp.Status)
	}
	return nil
}
- // 监听-获取-分发清洗任务
- func getRepeatTask() {
- for {
- if len(taskList) > 0 {
- getasklock.Lock()
- len_list := len(taskList)
- if len_list > 1 {
- first_id := qu.ObjToString(taskList[0]["sid"])
- end_id := qu.ObjToString(taskList[len_list-1]["eid"])
- if first_id != "" && end_id != "" {
- taskList = taskList[len_list:]
- log.Println("合并段落~正常~", first_id, "~", end_id, "~剩余任务池~", len(taskList), taskList)
- startCheckData(first_id, end_id)
- } else {
- log.Println("合并段落~错误~正常取段落~~~")
- mapInfo := taskList[0]
- if mapInfo != nil {
- taskList = taskList[1:]
- log.Println("获取任务段处理中~~~剩余任务池~~~", len(taskList), taskList)
- sid := qu.ObjToString(mapInfo["sid"])
- eid := qu.ObjToString(mapInfo["eid"])
- startCheckData(sid, eid)
- }
- }
- } else {
- mapInfo := taskList[0]
- if mapInfo != nil {
- taskList = taskList[1:]
- log.Println("获取任务段处理中~~~剩余任务池~~~", len(taskList), taskList)
- sid := qu.ObjToString(mapInfo["sid"])
- eid := qu.ObjToString(mapInfo["eid"])
- startCheckData(sid, eid)
- }
- }
- getasklock.Unlock()
- } else {
- time.Sleep(10 * time.Second)
- }
- }
- }
- func lastUdpJob() {
- for {
- responselock.Lock()
- if time.Now().Unix()-lastNodeResponse >= 1800 {
- lastNodeResponse = time.Now().Unix() //重置时间
- sendErrMailApi("数据清洗~发现处理流程超时~给予告警", fmt.Sprintf("半小时左右~无新段落数据进入清洗增量流程...相关人员检查..."))
- }
- responselock.Unlock()
- time.Sleep(300 * time.Second)
- }
- }
|