package main

import (
	"encoding/json"
	"fmt"
	"log"
	"net"
	"sync"
	"time"

	mu "mfw/util"
	qu "qfw/util"
)

// Province is a province-level entry of the region dictionary.
type Province struct {
	P_Name string
}

// City is a city-level entry of the region dictionary, together with the
// province name read from the same record.
type City struct {
	P_Name string
	C_Name string
}

// District is a district-level entry of the region dictionary, together with
// the province and city names read from the same record.
type District struct {
	P_Name string
	C_Name string
	D_Name string
}

var (
	Sysconfig                               map[string]interface{}   // configuration file contents
	mconf                                   map[string]interface{}   // mongodb configuration section
	data_mgo, qy_mgo                        *MongodbSim              // mongodb handles
	udpclient                               mu.UdpClient             // udp client
	nextNode                                []map[string]interface{} // downstream node descriptors
	coll_name, qy_coll_name, jy_coll_name   string                   // collection names
	check_lock                              sync.Mutex               // update lock
	check_thread                            int                      // number of concurrent check workers
	UpdateTask                              *updateInfo              // update pool
	ProvinceDict                            map[string][]Province    // province name -> entries
	CityDict                                map[string][]City        // city name -> entries
	DistrictDict                            map[string][]District    // district name -> entries
)

// initCheckCity rebuilds ProvinceDict, CityDict and DistrictDict from the
// jy_coll_name collection reached through qy_mgo.
//
// Only documents without a "town_code" field are read. A document is filed as
// a district when its district_code is positive, otherwise as a city when its
// city_code is positive, otherwise as a province. Each dictionary is keyed by
// the name at its own level and keeps every matching record.
//
// NOTE(review): the iterator's error state is never inspected, so a failed
// query is indistinguishable from an empty collection.
func initCheckCity() {
	ProvinceDict = make(map[string][]Province)
	CityDict = make(map[string][]City)
	DistrictDict = make(map[string][]District)

	q := map[string]interface{}{
		"town_code": map[string]interface{}{
			"$exists": 0,
		},
	}
	sess := qy_mgo.GetMgoConn()
	defer qy_mgo.DestoryMongoConn(sess)
	it := sess.DB(qy_mgo.DbName).C(jy_coll_name).Find(&q).Iter()

	total := 0
	for tmp := make(map[string]interface{}); it.Next(&tmp); total++ {
		if total%1000 == 0 {
			log.Println("当前数量:", total)
		}
		district_code := qu.IntAll(tmp["district_code"])
		city_code := qu.IntAll(tmp["city_code"])
		province := qu.ObjToString(tmp["province"])

		switch {
		case district_code > 0:
			city := qu.ObjToString(tmp["city"])
			district := qu.ObjToString(tmp["district"])
			// append on a missing key starts from a nil slice, so no
			// explicit existence check is needed.
			DistrictDict[district] = append(DistrictDict[district], District{province, city, district})
		case city_code > 0:
			city := qu.ObjToString(tmp["city"])
			CityDict[city] = append(CityDict[city], City{province, city})
		default:
			ProvinceDict[province] = append(ProvinceDict[province], Province{province})
		}

		// Fresh map per document so fields absent from the next document do
		// not leak over from this one.
		tmp = make(map[string]interface{})
	}
	log.Println(fmt.Sprintf("城市配置加载完毕...省~%d 市~%d 区~%d", len(ProvinceDict), len(CityDict), len(DistrictDict)))
}

// initMgo reads the "mongodb" and "qy_mongodb" sections of Sysconfig, creates
// and initialises the data_mgo and qy_mgo pools, and fills the collection
// names, nextNode and check_thread from the configuration.
//
// The type assertions are unchecked on purpose of fail-fast start-up: a
// missing or mistyped configuration key panics here.
func initMgo() {
	// Assign the package-level mconf; the previous ":=" declared a local that
	// shadowed it and left the package variable nil.
	mconf = Sysconfig["mongodb"].(map[string]interface{})
	log.Println(mconf)
	data_mgo = &MongodbSim{
		MongodbAddr: mconf["addrName"].(string),
		DbName:      mconf["dbName"].(string),
		Size:        qu.IntAllDef(mconf["pool"], 10),
	}
	data_mgo.InitPool()

	qy_mconf := Sysconfig["qy_mongodb"].(map[string]interface{})
	qy_mgo = &MongodbSim{
		MongodbAddr: qy_mconf["qy_addrName"].(string),
		DbName:      qy_mconf["qy_dbName"].(string),
		Size:        qu.IntAllDef(qy_mconf["pool"], 10),
		UserName:    qy_mconf["qy_username"].(string),
		Password:    qy_mconf["qy_password"].(string),
	}
	qy_mgo.InitPool()

	coll_name = mconf["collName"].(string)
	qy_coll_name = qy_mconf["qy_collName"].(string)
	jy_coll_name = Sysconfig["jy_collName"].(string)

	nextNode = qu.ObjArrToMapArr(Sysconfig["nextNode"].([]interface{}))
	check_thread = qu.IntAll(Sysconfig["check_thread"])

	log.Println("mgo 等配置,加载完毕...")
}

// init loads the configuration, sets up the mongodb pools, builds the region
// dictionaries and starts the background update pool.
//
// NOTE(review): the whole Sysconfig map is logged, which includes the
// qy_password entry read by initMgo.
func init() {
	qu.ReadConfig(&Sysconfig) // load the configuration file
	log.Println(Sysconfig)
	if len(Sysconfig) == 0 {
		log.Fatal("读取配置文件失败", Sysconfig)
	}
	initMgo()       // mongodb pools and names
	initCheckCity() // region dictionaries
	// update pool
	UpdateTask = newUpdatePool()
	go UpdateTask.updateData()
}

// mainT starts the UDP listener on the configured "udpport" with
// processUdpMsg as handler and then sleeps to keep the process alive.
// It is not the program entry point; main below is.
func mainT() {
	updport := Sysconfig["udpport"].(string)
	udpclient = mu.UdpClient{Local: updport, BufSize: 1024}
	udpclient.Listen(processUdpMsg)
	log.Println("Udp服务监听", updport)
	time.Sleep(99999 * time.Hour)
}

// main is a temporary verification entry point: it checks one hard-coded id
// range and then sleeps to keep the process alive.
func main() {
	sid := "618dc3b045a326c6c3f2f230"
	eid := "618e137545a326c6c3f44195"
	startCheckData(sid, eid)
	time.Sleep(99999 * time.Hour)
}

// startCheckData reviews every document of coll_name whose _id lies in
// (sid, eid].
//
// Each document is handed to its own goroutine, with concurrency bounded by
// check_thread. The goroutine runs the city check and the bid-amount check;
// when they produce any field, a {"_id": ...} selector plus a "$set" document
// is pushed to UpdateTask.updatePool. The function returns after all
// goroutines have finished.
func startCheckData(sid, eid string) {
	log.Println("开始审查数据...")
	defer qu.Catch()
	q := map[string]interface{}{
		"_id": map[string]interface{}{
			"$gt":  StringTOBsonId(sid),
			"$lte": StringTOBsonId(eid),
		},
	}
	log.Println("查询条件:", q)

	// A capacity of zero would make the first send below block forever (the
	// receiving goroutine is only started after the send), and a negative
	// capacity panics, so always allow at least one worker.
	workers := check_thread
	if workers < 1 {
		workers = 1
	}
	check_pool := make(chan bool, workers)
	check_wg := &sync.WaitGroup{}

	sess := data_mgo.GetMgoConn()
	defer data_mgo.DestoryMongoConn(sess)
	it := sess.DB(data_mgo.DbName).C(coll_name).Find(&q).Iter()

	// isRepair is incremented by worker goroutines and read by this loop for
	// progress logging, so every access before Wait goes through repairMu.
	var repairMu sync.Mutex
	total, isRepair := 0, 0
	for tmp := make(map[string]interface{}); it.Next(&tmp); total++ {
		if total%10000 == 0 {
			repairMu.Lock()
			repaired := isRepair
			repairMu.Unlock()
			log.Println("当前数量:", total, repaired, tmp["_id"])
		}
		update_id := map[string]interface{}{"_id": tmp["_id"]}
		check_pool <- true
		check_wg.Add(1)
		go func(tmp map[string]interface{}, update_id map[string]interface{}) {
			defer func() {
				<-check_pool
				check_wg.Done()
			}()
			// fields to update
			update_check := make(map[string]interface{})
			// review: city
			getCheckDataCity(tmp, &update_check)
			// review: bid amount
			getCheckDataBidamount(tmp, &update_check)
			if len(update_check) > 0 {
				repairMu.Lock()
				isRepair++
				repairMu.Unlock()
				UpdateTask.updatePool <- []map[string]interface{}{
					update_id,
					map[string]interface{}{
						"$set": update_check,
					},
				}
			}
		}(tmp, update_id)
		// The goroutine now owns the old map; decode the next document into
		// a fresh one.
		tmp = make(map[string]interface{})
	}
	check_wg.Wait()
	// All workers are done, so isRepair can be read without the lock.
	log.Println("check is over - 总计数量", total, isRepair)
}

// processUdpMsg is the UDP message handler.
//
// For mu.OP_TYPE_DATA it decodes the JSON payload, requires non-empty string
// fields "gtid" and "lteid", acknowledges the sender with mu.OP_NOOP, runs
// startCheckData on that id range synchronously, and then forwards the range
// to every entry of nextNode. For mu.OP_NOOP it only logs the payload, which
// is the reply of a downstream node.
func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
	switch act {
	case mu.OP_TYPE_DATA:
		var rep map[string]interface{}
		if err := json.Unmarshal(data, &rep); err != nil {
			log.Println(err)
			return
		}
		sid, _ := rep["gtid"].(string)
		eid, _ := rep["lteid"].(string)
		if sid == "" || eid == "" {
			log.Println("err", "sid=", sid, ",eid=", eid)
			return
		}
		go udpclient.WriteUdp(data, mu.OP_NOOP, ra)
		log.Println("udp通知id段-审查数据", sid, "~", eid)
		startCheckData(sid, eid)
		log.Println("udp通知审查数据完成,下节点响应")
		for _, m := range nextNode {
			// A malformed node entry must not panic the handler; skip it and
			// keep notifying the remaining nodes.
			addr, ok := m["addr"].(string)
			if !ok {
				log.Println("err", "invalid nextNode addr:", m["addr"])
				continue
			}
			by, err := json.Marshal(map[string]interface{}{
				"gtid":  sid,
				"lteid": eid,
				"stype": qu.ObjToString(m["stype"]),
			})
			if err != nil {
				log.Println(err)
				continue
			}
			err = udpclient.WriteUdp(by, mu.OP_TYPE_DATA, &net.UDPAddr{
				IP:   net.ParseIP(addr),
				Port: qu.IntAll(m["port"]),
			})
			if err != nil {
				log.Println(err)
			}
		}
	case mu.OP_NOOP: // reply from the downstream node
		log.Println("下节点回应", string(data))
	}
}