123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271 |
- package main
- import (
- "encoding/json"
- "fmt"
- "log"
- mu "mfw/util"
- "net"
- qu "qfw/util"
- "sync"
- "time"
- )
// Province is a province-level region entry. It carries only the province
// name and is stored in ProvinceDict under that same name.
type Province struct {
	P_Name string // province name
}
// City is a city-level region entry: the city name together with the name of
// the province it belongs to. Field order matters because the type is built
// with positional composite literals.
type City struct {
	P_Name, C_Name string // province name, city name
}
// District is a district-level region entry: the district name together with
// the names of its city and province. Field order matters because the type is
// built with positional composite literals.
type District struct {
	P_Name, C_Name, D_Name string // province name, city name, district name
}
- var (
- Sysconfig map[string]interface{} //配置文件
- mconf map[string]interface{} //mongodb配置信息
- data_mgo,qy_mgo *MongodbSim //mongodb操作对象
- udpclient mu.UdpClient //udp对象
- nextNode []map[string]interface{} //节点信息
- coll_name,qy_coll_name,jy_coll_name string //表名
- check_lock sync.Mutex //更新锁
- check_thread int //线程数
- UpdateTask *updateInfo //更新池
- ProvinceDict map[string][]Province //省份-map
- CityDict map[string][]City //城市-map
- DistrictDict map[string][]District //区县-map
- )
- //初始化城市
- func initCheckCity() {
- //初始化-城市配置
- ProvinceDict = make(map[string][]Province,0)
- CityDict = make(map[string][]City,0)
- DistrictDict = make(map[string][]District,0)
- q := map[string]interface{}{
- "town_code":map[string]interface{}{
- "$exists":0,
- },
- }
- sess := qy_mgo.GetMgoConn()
- defer qy_mgo.DestoryMongoConn(sess)
- it := sess.DB(qy_mgo.DbName).C(jy_coll_name).Find(&q).Iter()
- total := 0
- for tmp := make(map[string]interface{}); it.Next(&tmp); total++ {
- if total%1000 == 0 {
- log.Println("当前数量:", total)
- }
- district_code := qu.IntAll(tmp["district_code"])
- city_code := qu.IntAll(tmp["city_code"])
- if district_code > 0 {
- province := qu.ObjToString(tmp["province"])
- city := qu.ObjToString(tmp["city"])
- district := qu.ObjToString(tmp["district"])
- data := District{province,city,district}
- if DistrictDict[district]==nil {
- DistrictDict[district] = []District{data}
- }else {
- arr := DistrictDict[district]
- arr = append(arr,data)
- DistrictDict[district] = arr
- }
- }else {
- if city_code>0 {
- province := qu.ObjToString(tmp["province"])
- city := qu.ObjToString(tmp["city"])
- data := City{province,city}
- if CityDict[city]==nil {
- CityDict[city] = []City{data}
- }else {
- arr := CityDict[city]
- arr = append(arr,data)
- CityDict[city] = arr
- }
- }else {
- province := qu.ObjToString(tmp["province"])
- data := Province{province}
- if ProvinceDict[province]==nil {
- ProvinceDict[province] = []Province{data}
- }else {
- arr := ProvinceDict[province]
- arr = append(arr,data)
- ProvinceDict[province] = arr
- }
- }
- }
- tmp = make(map[string]interface{})
- }
- log.Println(fmt.Sprintf("城市配置加载完毕...省~%d 市~%d 区~%d",len(ProvinceDict),len(CityDict),len(DistrictDict)))
- }
- //mgo-配置等
- func initMgo() {
- mconf := Sysconfig["mongodb"].(map[string]interface{})
- log.Println(mconf)
- data_mgo = &MongodbSim{
- MongodbAddr: mconf["addrName"].(string),
- DbName: mconf["dbName"].(string),
- Size: qu.IntAllDef(mconf["pool"], 10),
- }
- data_mgo.InitPool()
- qy_mconf := Sysconfig["qy_mongodb"].(map[string]interface{})
- qy_mgo = &MongodbSim{
- MongodbAddr: qy_mconf["qy_addrName"].(string),
- DbName: qy_mconf["qy_dbName"].(string),
- Size: qu.IntAllDef(qy_mconf["pool"], 10),
- UserName: qy_mconf["qy_username"].(string),
- Password: qy_mconf["qy_password"].(string),
- }
- qy_mgo.InitPool()
- coll_name = mconf["collName"].(string)
- qy_coll_name = qy_mconf["qy_collName"].(string)
- jy_coll_name = Sysconfig["jy_collName"].(string)
- nextNode = qu.ObjArrToMapArr(Sysconfig["nextNode"].([]interface{}))
- check_thread = qu.IntAll(Sysconfig["check_thread"])
- log.Println("mgo 等配置,加载完毕...")
- }
- //初始化
- func init() {
- qu.ReadConfig(&Sysconfig) //加载配置文件
- log.Println(Sysconfig)
- if len(Sysconfig) == 0 {
- log.Fatal("读取配置文件失败", Sysconfig)
- }
- initMgo() //初始化mgo
- initCheckCity() //初始化城市
- //更新池
- UpdateTask = newUpdatePool()
- go UpdateTask.updateData()
- }
- func mainT() {
- updport := Sysconfig["udpport"].(string)
- udpclient = mu.UdpClient{Local: updport, BufSize: 1024}
- udpclient.Listen(processUdpMsg)
- log.Println("Udp服务监听", updport)
- time.Sleep(99999 * time.Hour)
- }
- //临时校验
- func main() {
- sid := "618dc3b045a326c6c3f2f230"
- eid := "618e137545a326c6c3f44195"
- startCheckData(sid,eid)
- time.Sleep(99999 * time.Hour)
- }
- //开始审查数据
- func startCheckData(sid, eid string) {
- log.Println("开始审查数据...")
- defer qu.Catch()
- q := map[string]interface{}{
- "_id": map[string]interface{}{
- "$gt": StringTOBsonId(sid),
- "$lte": StringTOBsonId(eid),
- },
- }
- log.Println("查询条件:",q)
- check_pool := make(chan bool, check_thread)
- check_wg := &sync.WaitGroup{}
- sess := data_mgo.GetMgoConn()
- defer data_mgo.DestoryMongoConn(sess)
- it := sess.DB(data_mgo.DbName).C(coll_name).Find(&q).Iter()
- total,isRepair := 0,0
- for tmp := make(map[string]interface{}); it.Next(&tmp); total++ {
- if total%10000 == 0 {
- log.Println("当前数量:", total,isRepair,tmp["_id"])
- }
- update_id := map[string]interface{}{"_id":tmp["_id"]}
- check_pool <- true
- check_wg.Add(1)
- go func(tmp map[string]interface{},update_id map[string]interface{}) {
- defer func() {
- <-check_pool
- check_wg.Done()
- }()
- //更新-
- update_check := make(map[string]interface{},0)
- //审查-城市
- getCheckDataCity(tmp,&update_check)
- //审查-中标金额
- getCheckDataBidamount(tmp,&update_check)
- if len(update_check)>0 {
- isRepair++
- UpdateTask.updatePool <- []map[string]interface{}{
- update_id,
- map[string]interface{}{
- "$set": update_check,
- },
- }
- }
- }(tmp,update_id)
- tmp = make(map[string]interface{})
- }
- check_wg.Wait()
- log.Println("check is over - 总计数量",total,isRepair)
- }
- //udp监听
- func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
- switch act {
- case mu.OP_TYPE_DATA:
- var rep map[string]interface{}
- err := json.Unmarshal(data, &rep)
- if err != nil {
- log.Println(err)
- } else {
- sid, _ := rep["gtid"].(string)
- eid, _ := rep["lteid"].(string)
- if sid == "" || eid == "" {
- log.Println("err", "sid=", sid, ",eid=", eid)
- return
- } else {
- go udpclient.WriteUdp(data, mu.OP_NOOP, ra)
- log.Println("udp通知id段-审查数据", sid, "~", eid)
- startCheckData(sid, eid)
- log.Println("udp通知审查数据完成,下节点响应")
- for _, m := range nextNode {
- by, _ := json.Marshal(map[string]interface{}{
- "gtid": sid,
- "lteid": eid,
- "stype": qu.ObjToString(m["stype"]),
- })
- err := udpclient.WriteUdp(by, mu.OP_TYPE_DATA, &net.UDPAddr{
- IP: net.ParseIP(m["addr"].(string)),
- Port: qu.IntAll(m["port"]),
- })
- if err != nil {
- log.Println(err)
- }
- }
- }
- }
- case mu.OP_NOOP: //下个节点回应
- log.Println("下节点回应",string(data))
- }
- }
|