package main import ( "encoding/json" "fmt" "io/ioutil" "log" mu "mfw/util" "net" "net/http" qu "qfw/util" "strings" "sync" "time" ) type S_Province struct { P_Name string } type S_City struct { P_Name string C_Name string } type S_District struct { P_Name string C_Name string D_Name string } var ( Sysconfig map[string]interface{} //配置文件 mconf map[string]interface{} //mongodb配置信息 data_mgo, qy_mgo *MongodbSim bid_mgo *MongodbSim //mongodb操作对象 udpclient mu.UdpClient //udp对象 nextNode []map[string]interface{} //节点信息 coll_name, qy_coll_name, jy_coll_name string check_lock sync.Mutex //更新锁 check_thread int //线程数 UpdateTask *updateInfo //更新池 S_ProvinceDict map[string][]S_Province //省份-map S_CityDict map[string][]S_City //城市-map S_DistrictDict map[string][]S_District //区县-map //删除字段 unset_dict = map[string]interface{}{"winner": 1, "s_winner": 1, "bidamount": 1, "winnerorder": 1} udplock, getasklock sync.Mutex taskList []map[string]interface{} //监控相关 responselock sync.Mutex lastNodeResponse int64 ) // 初始化城市 func initCheckCity() { //初始化-城市配置 S_ProvinceDict = make(map[string][]S_Province, 0) S_CityDict = make(map[string][]S_City, 0) S_DistrictDict = make(map[string][]S_District, 0) q := map[string]interface{}{ "town_code": map[string]interface{}{ "$exists": 0, }, } sess := qy_mgo.GetMgoConn() defer qy_mgo.DestoryMongoConn(sess) it := sess.DB(qy_mgo.DbName).C(jy_coll_name).Find(&q).Iter() total := 0 for tmp := make(map[string]interface{}); it.Next(&tmp); total++ { if total%1000 == 0 { log.Println("当前数量:", total) } district_code := qu.IntAll(tmp["district_code"]) city_code := qu.IntAll(tmp["city_code"]) if district_code > 0 { province := qu.ObjToString(tmp["province"]) city := qu.ObjToString(tmp["city"]) district := qu.ObjToString(tmp["district"]) data := S_District{province, city, district} if S_DistrictDict[district] == nil { S_DistrictDict[district] = []S_District{data} } else { arr := S_DistrictDict[district] arr = append(arr, data) S_DistrictDict[district] = arr } } else { if city_code > 0 { province := qu.ObjToString(tmp["province"]) city := qu.ObjToString(tmp["city"]) data := S_City{province, city} if S_CityDict[city] == nil { S_CityDict[city] = []S_City{data} } else { arr := S_CityDict[city] arr = append(arr, data) S_CityDict[city] = arr } } else { province := qu.ObjToString(tmp["province"]) data := S_Province{province} if S_ProvinceDict[province] == nil { S_ProvinceDict[province] = []S_Province{data} } else { arr := S_ProvinceDict[province] arr = append(arr, data) S_ProvinceDict[province] = arr } } } tmp = make(map[string]interface{}) } log.Println(fmt.Sprintf("城市配置加载完毕...省~%d 市~%d 区~%d", len(S_ProvinceDict), len(S_CityDict), len(S_DistrictDict))) } // mgo-配置等 func initMgo() { mconf = Sysconfig["mongodb"].(map[string]interface{}) log.Println(mconf) data_mgo = &MongodbSim{ MongodbAddr: mconf["addrName"].(string), DbName: mconf["dbName"].(string), Size: qu.IntAllDef(mconf["pool"], 10), } data_mgo.InitPool() qy_mconf := Sysconfig["qy_mongodb"].(map[string]interface{}) qy_mgo = &MongodbSim{ MongodbAddr: qy_mconf["qy_addrName"].(string), DbName: qy_mconf["qy_dbName"].(string), Size: qu.IntAllDef(qy_mconf["pool"], 10), UserName: qy_mconf["qy_username"].(string), Password: qy_mconf["qy_password"].(string), } qy_mgo.InitPool() bid_mgo = &MongodbSim{ MongodbAddr: "172.17.189.140:27080,172.17.189.141:27081", DbName: "qfw", Size: 10, UserName: "zhengkun", Password: "zk@123123", } bid_mgo.InitPool() coll_name = mconf["collName"].(string) qy_coll_name = qy_mconf["qy_collName"].(string) jy_coll_name = Sysconfig["jy_collName"].(string) nextNode = qu.ObjArrToMapArr(Sysconfig["nextNode"].([]interface{})) check_thread = qu.IntAll(Sysconfig["check_thread"]) log.Println("mgo 等配置,加载完毕...") } // 初始化 func init() { qu.ReadConfig(&Sysconfig) //加载配置文件 log.Println(Sysconfig) if len(Sysconfig) == 0 { log.Fatal("读取配置文件失败", Sysconfig) } initMgo() //初始化mgo initCheckCity() //初始化城市 //更新池 UpdateTask = newUpdatePool() go UpdateTask.updateData() } func main() { lastNodeResponse = time.Now().Unix() updport := Sysconfig["udpport"].(string) udpclient = mu.UdpClient{Local: updport, BufSize: 1024} udpclient.Listen(processUdpMsg) log.Println("Udp服务监听", updport) go getRepeatTask() go checkMailJob() go lastUdpJob() lock := make(chan bool) <-lock } // 开始审查数据 func startCheckData(sid, eid string) { defer qu.Catch() q := map[string]interface{}{ "_id": map[string]interface{}{ "$gt": StringTOBsonId(sid), "$lte": StringTOBsonId(eid), }, } check_pool := make(chan bool, check_thread) check_wg := &sync.WaitGroup{} sess := data_mgo.GetMgoConn() defer data_mgo.DestoryMongoConn(sess) it := sess.DB(data_mgo.DbName).C(coll_name).Find(&q).Iter() total, isRepair := 0, 0 for tmp := make(map[string]interface{}); it.Next(&tmp); total++ { if total%10000 == 0 { log.Println("当前数量:", total, isRepair, tmp["_id"]) } update_id := map[string]interface{}{"_id": tmp["_id"]} check_pool <- true check_wg.Add(1) go func(tmp map[string]interface{}, update_id map[string]interface{}) { defer func() { <-check_pool check_wg.Done() }() //更新- update_check := make(map[string]interface{}, 0) //审查-城市-迁移 //getCheckDataCity(tmp, &update_check) //审查-金额-迁移 //getCheckDataBidamount(tmp, &update_check) //审查-分类-弃用 //getCheckDataCategory(tmp,&update_check) //审查-发布时间 getCheckDataPublishtime(tmp, &update_check) //审查-大模型与抽取 getCheckDataAI(tmp, &update_check) //最终计算是否清洗 update_dict := make(map[string]interface{}, 0) if len(update_check) > 0 { update_dict["$set"] = update_check } if len(update_dict) > 0 { //注意事项~更新key不能与删除key同时存在 isRepair++ UpdateTask.updatePool <- []map[string]interface{}{ update_id, update_dict, } } }(tmp, update_id) tmp = make(map[string]interface{}) } check_wg.Wait() log.Println("data_clean is over ", total, "~", isRepair) sendNextNode(sid, eid) } // udp监听 func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) { switch act { case mu.OP_TYPE_DATA: var rep map[string]interface{} err := json.Unmarshal(data, &rep) if err != nil { log.Println(err) } else { sid, _ := rep["gtid"].(string) eid, _ := rep["lteid"].(string) stype := qu.ObjToString(rep["stype"]) key := qu.ObjToString(rep["key"]) if stype == "monitor" { log.Println("收到监测......") udpclient.WriteUdp([]byte(key), mu.OP_NOOP, ra) return } if sid == "" || eid == "" { log.Println("err", "sid=", sid, ",eid=", eid) return } else { lastNodeResponse = time.Now().Unix() udpclient.WriteUdp([]byte(key), mu.OP_NOOP, ra) udplock.Lock() taskList = append(taskList, map[string]interface{}{ "sid": sid, "eid": eid, }) //插入任务 log.Println("udp收到任务...数量:", len(taskList), "具体任务:", taskList) udplock.Unlock() } } case mu.OP_NOOP: //下个节点回应 log.Println("下节点回应:", string(data)) udptaskmap.Delete(string(data)) } } // 发送下阶段节点~ func sendNextNode(sid string, eid string) { //更新记录状态 updateProcessUdpIdsInfo(sid, eid) log.Println("判重任务完成...发送下节点udp...") for _, to := range nextNode { key := sid + "-" + eid + "-" + qu.ObjToString(to["stype"]) by, _ := json.Marshal(map[string]interface{}{ "gtid": sid, "lteid": eid, "stype": qu.ObjToString(to["stype"]), "key": key, }) addr := &net.UDPAddr{ IP: net.ParseIP(to["addr"].(string)), Port: qu.IntAll(to["port"]), } node := &udpNode{by, addr, time.Now().Unix(), 0} udptaskmap.Store(key, node) udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr) } } // 更新流程记录id段落 func updateProcessUdpIdsInfo(sid string, eid string) { query := map[string]interface{}{ "gtid": map[string]interface{}{ "$gte": sid, }, "lteid": map[string]interface{}{ "$lte": eid, }, } task_coll := "bidding_processing_ids" datas, _ := bid_mgo.Find(task_coll, query, nil, nil) if len(datas) > 0 { log.Println("开始更新流程段落记录~~", len(datas), "段") for _, v := range datas { up_id := BsonTOStringId(v["_id"]) if up_id != "" { update := map[string]interface{}{ "$set": map[string]interface{}{ "dataprocess": 4, "updatetime": time.Now().Unix(), }, } bid_mgo.UpdateById(task_coll, up_id, update) log.Println("流程段落记录~~更新完毕~", update) } } } else { log.Println("未查询到记录id段落~", query) } } func httpDo(detail string) (e error) { client := &http.Client{} req, err := http.NewRequest("POST", "http://127.0.0.1:9991/get", strings.NewReader("detail="+detail)) if err != nil { return err } req.Header.Set("Content-Type", "application/x-www-form-urlencoded") resp, err := client.Do(req) if err != nil { return err } defer resp.Body.Close() body, err := ioutil.ReadAll(resp.Body) if err != nil { return err } log.Println("put ", string(body)) return nil } // 监听-获取-分发清洗任务 func getRepeatTask() { for { if len(taskList) > 0 { getasklock.Lock() len_list := len(taskList) if len_list > 1 { first_id := qu.ObjToString(taskList[0]["sid"]) end_id := qu.ObjToString(taskList[len_list-1]["eid"]) if first_id != "" && end_id != "" { taskList = taskList[len_list:] log.Println("合并段落~正常~", first_id, "~", end_id, "~剩余任务池~", len(taskList), taskList) startCheckData(first_id, end_id) } else { log.Println("合并段落~错误~正常取段落~~~") mapInfo := taskList[0] if mapInfo != nil { taskList = taskList[1:] log.Println("获取任务段处理中~~~剩余任务池~~~", len(taskList), taskList) sid := qu.ObjToString(mapInfo["sid"]) eid := qu.ObjToString(mapInfo["eid"]) startCheckData(sid, eid) } } } else { mapInfo := taskList[0] if mapInfo != nil { taskList = taskList[1:] log.Println("获取任务段处理中~~~剩余任务池~~~", len(taskList), taskList) sid := qu.ObjToString(mapInfo["sid"]) eid := qu.ObjToString(mapInfo["eid"]) startCheckData(sid, eid) } } getasklock.Unlock() } else { time.Sleep(10 * time.Second) } } } func lastUdpJob() { for { responselock.Lock() if time.Now().Unix()-lastNodeResponse >= 1800 { lastNodeResponse = time.Now().Unix() //重置时间 sendErrMailApi("数据清洗~发现处理流程超时~给予告警", fmt.Sprintf("半小时左右~无新段落数据进入清洗增量流程...相关人员检查...")) } responselock.Unlock() time.Sleep(300 * time.Second) } }