|
@@ -3,16 +3,19 @@ package main
|
|
import (
|
|
import (
|
|
"encoding/json"
|
|
"encoding/json"
|
|
"fmt"
|
|
"fmt"
|
|
|
|
+ "io/ioutil"
|
|
"log"
|
|
"log"
|
|
mu "mfw/util"
|
|
mu "mfw/util"
|
|
"net"
|
|
"net"
|
|
|
|
+ "net/http"
|
|
qu "qfw/util"
|
|
qu "qfw/util"
|
|
|
|
+ "strings"
|
|
"sync"
|
|
"sync"
|
|
"time"
|
|
"time"
|
|
)
|
|
)
|
|
|
|
|
|
type Province struct {
|
|
type Province struct {
|
|
- P_Name string
|
|
|
|
|
|
+ P_Name string
|
|
}
|
|
}
|
|
type City struct {
|
|
type City struct {
|
|
P_Name string
|
|
P_Name string
|
|
@@ -25,37 +28,41 @@ type District struct {
|
|
}
|
|
}
|
|
|
|
|
|
var (
|
|
var (
|
|
- Sysconfig map[string]interface{} //配置文件
|
|
|
|
- mconf map[string]interface{} //mongodb配置信息
|
|
|
|
- data_mgo,qy_mgo *MongodbSim //mongodb操作对象
|
|
|
|
- udpclient mu.UdpClient //udp对象
|
|
|
|
- nextNode []map[string]interface{} //节点信息
|
|
|
|
- coll_name,qy_coll_name,jy_coll_name string //表名
|
|
|
|
- check_lock sync.Mutex //更新锁
|
|
|
|
- check_thread int //线程数
|
|
|
|
- UpdateTask *updateInfo //更新池
|
|
|
|
-
|
|
|
|
- ProvinceDict map[string][]Province //省份-map
|
|
|
|
- CityDict map[string][]City //城市-map
|
|
|
|
- DistrictDict map[string][]District //区县-map
|
|
|
|
|
|
+ Sysconfig map[string]interface{} //配置文件
|
|
|
|
+ mconf map[string]interface{} //mongodb配置信息
|
|
|
|
+ data_mgo, qy_mgo *MongodbSim
|
|
|
|
+ bid_mgo *MongodbSim //mongodb操作对象
|
|
|
|
+ udpclient mu.UdpClient //udp对象
|
|
|
|
+ nextNode []map[string]interface{} //节点信息
|
|
|
|
+ coll_name, qy_coll_name, jy_coll_name string //表名
|
|
|
|
+ check_lock sync.Mutex //更新锁
|
|
|
|
+ check_thread int //线程数
|
|
|
|
+ UpdateTask *updateInfo //更新池
|
|
|
|
+
|
|
|
|
+ ProvinceDict map[string][]Province //省份-map
|
|
|
|
+ CityDict map[string][]City //城市-map
|
|
|
|
+ DistrictDict map[string][]District //区县-map
|
|
|
|
+
|
|
|
|
+ //删除字段
|
|
|
|
+ unset_dict = map[string]interface{}{"winner": 1, "s_winner": 1, "bidamount": 1, "winnerorder": 1}
|
|
)
|
|
)
|
|
|
|
|
|
//初始化城市
|
|
//初始化城市
|
|
-func initCheckCity() {
|
|
|
|
|
|
+func initCheckCity() {
|
|
//初始化-城市配置
|
|
//初始化-城市配置
|
|
- ProvinceDict = make(map[string][]Province,0)
|
|
|
|
- CityDict = make(map[string][]City,0)
|
|
|
|
- DistrictDict = make(map[string][]District,0)
|
|
|
|
|
|
+ ProvinceDict = make(map[string][]Province, 0)
|
|
|
|
+ CityDict = make(map[string][]City, 0)
|
|
|
|
+ DistrictDict = make(map[string][]District, 0)
|
|
|
|
|
|
q := map[string]interface{}{
|
|
q := map[string]interface{}{
|
|
- "town_code":map[string]interface{}{
|
|
|
|
- "$exists":0,
|
|
|
|
|
|
+ "town_code": map[string]interface{}{
|
|
|
|
+ "$exists": 0,
|
|
},
|
|
},
|
|
}
|
|
}
|
|
sess := qy_mgo.GetMgoConn()
|
|
sess := qy_mgo.GetMgoConn()
|
|
defer qy_mgo.DestoryMongoConn(sess)
|
|
defer qy_mgo.DestoryMongoConn(sess)
|
|
it := sess.DB(qy_mgo.DbName).C(jy_coll_name).Find(&q).Iter()
|
|
it := sess.DB(qy_mgo.DbName).C(jy_coll_name).Find(&q).Iter()
|
|
- total := 0
|
|
|
|
|
|
+ total := 0
|
|
for tmp := make(map[string]interface{}); it.Next(&tmp); total++ {
|
|
for tmp := make(map[string]interface{}); it.Next(&tmp); total++ {
|
|
if total%1000 == 0 {
|
|
if total%1000 == 0 {
|
|
log.Println("当前数量:", total)
|
|
log.Println("当前数量:", total)
|
|
@@ -66,46 +73,46 @@ func initCheckCity() {
|
|
province := qu.ObjToString(tmp["province"])
|
|
province := qu.ObjToString(tmp["province"])
|
|
city := qu.ObjToString(tmp["city"])
|
|
city := qu.ObjToString(tmp["city"])
|
|
district := qu.ObjToString(tmp["district"])
|
|
district := qu.ObjToString(tmp["district"])
|
|
- data := District{province,city,district}
|
|
|
|
- if DistrictDict[district]==nil {
|
|
|
|
|
|
+ data := District{province, city, district}
|
|
|
|
+ if DistrictDict[district] == nil {
|
|
DistrictDict[district] = []District{data}
|
|
DistrictDict[district] = []District{data}
|
|
- }else {
|
|
|
|
|
|
+ } else {
|
|
arr := DistrictDict[district]
|
|
arr := DistrictDict[district]
|
|
- arr = append(arr,data)
|
|
|
|
|
|
+ arr = append(arr, data)
|
|
DistrictDict[district] = arr
|
|
DistrictDict[district] = arr
|
|
}
|
|
}
|
|
- }else {
|
|
|
|
- if city_code>0 {
|
|
|
|
|
|
+ } else {
|
|
|
|
+ if city_code > 0 {
|
|
province := qu.ObjToString(tmp["province"])
|
|
province := qu.ObjToString(tmp["province"])
|
|
city := qu.ObjToString(tmp["city"])
|
|
city := qu.ObjToString(tmp["city"])
|
|
- data := City{province,city}
|
|
|
|
- if CityDict[city]==nil {
|
|
|
|
|
|
+ data := City{province, city}
|
|
|
|
+ if CityDict[city] == nil {
|
|
CityDict[city] = []City{data}
|
|
CityDict[city] = []City{data}
|
|
- }else {
|
|
|
|
|
|
+ } else {
|
|
arr := CityDict[city]
|
|
arr := CityDict[city]
|
|
- arr = append(arr,data)
|
|
|
|
|
|
+ arr = append(arr, data)
|
|
CityDict[city] = arr
|
|
CityDict[city] = arr
|
|
}
|
|
}
|
|
- }else {
|
|
|
|
|
|
+ } else {
|
|
province := qu.ObjToString(tmp["province"])
|
|
province := qu.ObjToString(tmp["province"])
|
|
data := Province{province}
|
|
data := Province{province}
|
|
- if ProvinceDict[province]==nil {
|
|
|
|
|
|
+ if ProvinceDict[province] == nil {
|
|
ProvinceDict[province] = []Province{data}
|
|
ProvinceDict[province] = []Province{data}
|
|
- }else {
|
|
|
|
|
|
+ } else {
|
|
arr := ProvinceDict[province]
|
|
arr := ProvinceDict[province]
|
|
- arr = append(arr,data)
|
|
|
|
|
|
+ arr = append(arr, data)
|
|
ProvinceDict[province] = arr
|
|
ProvinceDict[province] = arr
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
tmp = make(map[string]interface{})
|
|
tmp = make(map[string]interface{})
|
|
}
|
|
}
|
|
- log.Println(fmt.Sprintf("城市配置加载完毕...省~%d 市~%d 区~%d",len(ProvinceDict),len(CityDict),len(DistrictDict)))
|
|
|
|
|
|
+ log.Println(fmt.Sprintf("城市配置加载完毕...省~%d 市~%d 区~%d", len(ProvinceDict), len(CityDict), len(DistrictDict)))
|
|
}
|
|
}
|
|
|
|
|
|
//mgo-配置等
|
|
//mgo-配置等
|
|
-func initMgo() {
|
|
|
|
- mconf := Sysconfig["mongodb"].(map[string]interface{})
|
|
|
|
|
|
+func initMgo() {
|
|
|
|
+ mconf = Sysconfig["mongodb"].(map[string]interface{})
|
|
log.Println(mconf)
|
|
log.Println(mconf)
|
|
data_mgo = &MongodbSim{
|
|
data_mgo = &MongodbSim{
|
|
MongodbAddr: mconf["addrName"].(string),
|
|
MongodbAddr: mconf["addrName"].(string),
|
|
@@ -119,11 +126,19 @@ func initMgo() {
|
|
MongodbAddr: qy_mconf["qy_addrName"].(string),
|
|
MongodbAddr: qy_mconf["qy_addrName"].(string),
|
|
DbName: qy_mconf["qy_dbName"].(string),
|
|
DbName: qy_mconf["qy_dbName"].(string),
|
|
Size: qu.IntAllDef(qy_mconf["pool"], 10),
|
|
Size: qu.IntAllDef(qy_mconf["pool"], 10),
|
|
- UserName: qy_mconf["qy_username"].(string),
|
|
|
|
- Password: qy_mconf["qy_password"].(string),
|
|
|
|
|
|
+ UserName: qy_mconf["qy_username"].(string),
|
|
|
|
+ Password: qy_mconf["qy_password"].(string),
|
|
}
|
|
}
|
|
qy_mgo.InitPool()
|
|
qy_mgo.InitPool()
|
|
|
|
|
|
|
|
+ bid_mgo = &MongodbSim{
|
|
|
|
+ MongodbAddr: "172.17.145.163:27083,172.17.4.187:27082",
|
|
|
|
+ DbName: "qfw",
|
|
|
|
+ Size: 10,
|
|
|
|
+ UserName: "zhengkun",
|
|
|
|
+ Password: "zk@123123",
|
|
|
|
+ }
|
|
|
|
+ bid_mgo.InitPool()
|
|
|
|
|
|
coll_name = mconf["collName"].(string)
|
|
coll_name = mconf["collName"].(string)
|
|
qy_coll_name = qy_mconf["qy_collName"].(string)
|
|
qy_coll_name = qy_mconf["qy_collName"].(string)
|
|
@@ -150,7 +165,7 @@ func init() {
|
|
go UpdateTask.updateData()
|
|
go UpdateTask.updateData()
|
|
}
|
|
}
|
|
|
|
|
|
-func main() {
|
|
|
|
|
|
+func main() {
|
|
updport := Sysconfig["udpport"].(string)
|
|
updport := Sysconfig["udpport"].(string)
|
|
udpclient = mu.UdpClient{Local: updport, BufSize: 1024}
|
|
udpclient = mu.UdpClient{Local: updport, BufSize: 1024}
|
|
udpclient.Listen(processUdpMsg)
|
|
udpclient.Listen(processUdpMsg)
|
|
@@ -159,11 +174,10 @@ func main() {
|
|
}
|
|
}
|
|
|
|
|
|
//临时校验
|
|
//临时校验
|
|
-func mainT() {
|
|
|
|
- sid := "624ed2324f7bde5444f1e973"
|
|
|
|
- eid := "624ed35e4f7bde5444f1ec1d"
|
|
|
|
-
|
|
|
|
- startCheckData(sid,eid)
|
|
|
|
|
|
+func mainT() {
|
|
|
|
+ sid := "12982d658fa2ac55ba96517d"
|
|
|
|
+ eid := "92982d658fa2ac55ba96517e"
|
|
|
|
+ startCheckData(sid, eid)
|
|
time.Sleep(99999 * time.Hour)
|
|
time.Sleep(99999 * time.Hour)
|
|
}
|
|
}
|
|
|
|
|
|
@@ -177,7 +191,7 @@ func startCheckData(sid, eid string) {
|
|
"$lte": StringTOBsonId(eid),
|
|
"$lte": StringTOBsonId(eid),
|
|
},
|
|
},
|
|
}
|
|
}
|
|
- log.Println("查询条件:",q)
|
|
|
|
|
|
+ log.Println("查询条件:", q)
|
|
|
|
|
|
check_pool := make(chan bool, check_thread)
|
|
check_pool := make(chan bool, check_thread)
|
|
check_wg := &sync.WaitGroup{}
|
|
check_wg := &sync.WaitGroup{}
|
|
@@ -185,57 +199,54 @@ func startCheckData(sid, eid string) {
|
|
sess := data_mgo.GetMgoConn()
|
|
sess := data_mgo.GetMgoConn()
|
|
defer data_mgo.DestoryMongoConn(sess)
|
|
defer data_mgo.DestoryMongoConn(sess)
|
|
it := sess.DB(data_mgo.DbName).C(coll_name).Find(&q).Iter()
|
|
it := sess.DB(data_mgo.DbName).C(coll_name).Find(&q).Iter()
|
|
- total,isRepair := 0,0
|
|
|
|
|
|
+ total, isRepair := 0, 0
|
|
for tmp := make(map[string]interface{}); it.Next(&tmp); total++ {
|
|
for tmp := make(map[string]interface{}); it.Next(&tmp); total++ {
|
|
if total%10000 == 0 {
|
|
if total%10000 == 0 {
|
|
- log.Println("当前数量:", total,isRepair,tmp["_id"])
|
|
|
|
|
|
+ log.Println("当前数量:", total, isRepair, tmp["_id"])
|
|
}
|
|
}
|
|
- update_id := map[string]interface{}{"_id":tmp["_id"]}
|
|
|
|
|
|
+ update_id := map[string]interface{}{"_id": tmp["_id"]}
|
|
check_pool <- true
|
|
check_pool <- true
|
|
check_wg.Add(1)
|
|
check_wg.Add(1)
|
|
- go func(tmp map[string]interface{},update_id map[string]interface{}) {
|
|
|
|
|
|
+ go func(tmp map[string]interface{}, update_id map[string]interface{}) {
|
|
defer func() {
|
|
defer func() {
|
|
<-check_pool
|
|
<-check_pool
|
|
check_wg.Done()
|
|
check_wg.Done()
|
|
}()
|
|
}()
|
|
//更新-
|
|
//更新-
|
|
- update_check := make(map[string]interface{},0)
|
|
|
|
|
|
+ update_check := make(map[string]interface{}, 0)
|
|
//审查-城市
|
|
//审查-城市
|
|
- getCheckDataCity(tmp,&update_check)
|
|
|
|
|
|
+ getCheckDataCity(tmp, &update_check)
|
|
//审查-中标金额
|
|
//审查-中标金额
|
|
- getCheckDataBidamount(tmp,&update_check)
|
|
|
|
|
|
+ getCheckDataBidamount(tmp, &update_check)
|
|
//验证是否修复发布时间 - 对比开标日期,投标截止日期
|
|
//验证是否修复发布时间 - 对比开标日期,投标截止日期
|
|
- getCheckDataPublishtime(tmp,&update_check)
|
|
|
|
-
|
|
|
|
-
|
|
|
|
-
|
|
|
|
-
|
|
|
|
- if len(update_check)>0 {
|
|
|
|
|
|
+ getCheckDataPublishtime(tmp, &update_check)
|
|
|
|
+ //清洗分类~
|
|
|
|
+ //is_category:= getCheckDataCategory(tmp,&update_check)
|
|
|
|
+
|
|
|
|
+ //最终计算是否清洗
|
|
|
|
+ update_dict := make(map[string]interface{}, 0)
|
|
|
|
+ if len(update_check) > 0 {
|
|
|
|
+ update_dict["$set"] = update_check
|
|
|
|
+ }
|
|
|
|
+ //if is_category {
|
|
|
|
+ // update_dict["$unset"] = unset_dict
|
|
|
|
+ //}
|
|
|
|
+ if len(update_dict) > 0 {
|
|
|
|
+ //注意事项~更新key 不能与 删除key 同时存在
|
|
isRepair++
|
|
isRepair++
|
|
UpdateTask.updatePool <- []map[string]interface{}{
|
|
UpdateTask.updatePool <- []map[string]interface{}{
|
|
update_id,
|
|
update_id,
|
|
- map[string]interface{}{
|
|
|
|
- "$set": update_check,
|
|
|
|
- },
|
|
|
|
|
|
+ update_dict,
|
|
}
|
|
}
|
|
}
|
|
}
|
|
-
|
|
|
|
-
|
|
|
|
- }(tmp,update_id)
|
|
|
|
|
|
+ }(tmp, update_id)
|
|
tmp = make(map[string]interface{})
|
|
tmp = make(map[string]interface{})
|
|
}
|
|
}
|
|
check_wg.Wait()
|
|
check_wg.Wait()
|
|
|
|
|
|
- log.Println("check is over - 总计数量",total,isRepair)
|
|
|
|
|
|
+ log.Println("check is over - 总计数量", total, isRepair)
|
|
}
|
|
}
|
|
|
|
|
|
-
|
|
|
|
-
|
|
|
|
-
|
|
|
|
-
|
|
|
|
-
|
|
|
|
-
|
|
|
|
-
|
|
|
|
//udp监听
|
|
//udp监听
|
|
func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
|
|
func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
|
|
switch act {
|
|
switch act {
|
|
@@ -254,6 +265,7 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
|
|
go udpclient.WriteUdp(data, mu.OP_NOOP, ra)
|
|
go udpclient.WriteUdp(data, mu.OP_NOOP, ra)
|
|
log.Println("udp通知id段-审查数据", sid, "~", eid)
|
|
log.Println("udp通知id段-审查数据", sid, "~", eid)
|
|
startCheckData(sid, eid)
|
|
startCheckData(sid, eid)
|
|
|
|
+
|
|
log.Println("udp通知审查数据完成,下节点响应")
|
|
log.Println("udp通知审查数据完成,下节点响应")
|
|
for _, m := range nextNode {
|
|
for _, m := range nextNode {
|
|
by, _ := json.Marshal(map[string]interface{}{
|
|
by, _ := json.Marshal(map[string]interface{}{
|
|
@@ -261,19 +273,67 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
|
|
"lteid": eid,
|
|
"lteid": eid,
|
|
"stype": qu.ObjToString(m["stype"]),
|
|
"stype": qu.ObjToString(m["stype"]),
|
|
})
|
|
})
|
|
- err := udpclient.WriteUdp(by, mu.OP_TYPE_DATA, &net.UDPAddr{
|
|
|
|
|
|
+ new_err := udpclient.WriteUdp(by, mu.OP_TYPE_DATA, &net.UDPAddr{
|
|
IP: net.ParseIP(m["addr"].(string)),
|
|
IP: net.ParseIP(m["addr"].(string)),
|
|
Port: qu.IntAll(m["port"]),
|
|
Port: qu.IntAll(m["port"]),
|
|
})
|
|
})
|
|
- if err != nil {
|
|
|
|
|
|
+ if new_err != nil {
|
|
log.Println(err)
|
|
log.Println(err)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+ //更新记录状态
|
|
|
|
+ updateProcessUdpIdsInfo(sid, eid)
|
|
|
|
+
|
|
}
|
|
}
|
|
}
|
|
}
|
|
case mu.OP_NOOP: //下个节点回应
|
|
case mu.OP_NOOP: //下个节点回应
|
|
- log.Println("下节点回应",string(data))
|
|
|
|
|
|
+ log.Println("下节点回应", string(data))
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+//更新流程记录id段落
|
|
|
|
+func updateProcessUdpIdsInfo(sid string, eid string) {
|
|
|
|
+ query := map[string]interface{}{
|
|
|
|
+ "gtid": sid,
|
|
|
|
+ "lteid": eid,
|
|
|
|
+ }
|
|
|
|
+ log.Println("开始更新流程段落记录~~", query)
|
|
|
|
+ data := bid_mgo.FindOne("bidding_processing_ids", query)
|
|
|
|
+ if len(data) > 0 {
|
|
|
|
+ up_id := BsonTOStringId(data["_id"])
|
|
|
|
+ if up_id != "" {
|
|
|
|
+ update := map[string]interface{}{
|
|
|
|
+ "$set": map[string]interface{}{
|
|
|
|
+ "dataprocess": 4,
|
|
|
|
+ "updatetime": time.Now().Unix(),
|
|
|
|
+ },
|
|
|
|
+ }
|
|
|
|
+ bid_mgo.UpdateById("bidding_processing_ids", up_id, update)
|
|
|
|
+ log.Println("流程段落记录~~更新完毕~", update)
|
|
|
|
+ }
|
|
|
|
+ } else {
|
|
|
|
+ log.Println("未查询到记录id段落~", query)
|
|
|
|
+ }
|
|
|
|
+}
|
|
|
|
|
|
|
|
+func httpDo(detail string) (e error) {
|
|
|
|
+ client := &http.Client{}
|
|
|
|
+ req, err := http.NewRequest("POST", "http://127.0.0.1:9991/get",
|
|
|
|
+ strings.NewReader("detail="+detail))
|
|
|
|
+ if err != nil {
|
|
|
|
+ return err
|
|
|
|
+ }
|
|
|
|
+ req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
|
|
|
|
+ resp, err := client.Do(req)
|
|
|
|
+ if err != nil {
|
|
|
|
+ return err
|
|
|
|
+ }
|
|
|
|
+ defer resp.Body.Close()
|
|
|
|
+ body, err := ioutil.ReadAll(resp.Body)
|
|
|
|
+ if err != nil {
|
|
|
|
+ return err
|
|
|
|
+ }
|
|
|
|
+ log.Println("put ", string(body))
|
|
|
|
+ return nil
|
|
|
|
+}
|