|
@@ -18,11 +18,14 @@ import (
|
|
"time"
|
|
"time"
|
|
)
|
|
)
|
|
|
|
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+
|
|
var (
|
|
var (
|
|
Sysconfig map[string]interface{} //配置文件
|
|
Sysconfig map[string]interface{} //配置文件
|
|
mconf map[string]interface{} //mongodb配置信息
|
|
mconf map[string]interface{} //mongodb配置信息
|
|
mgo *mongodb.MongodbSim //mongodb操作对象
|
|
mgo *mongodb.MongodbSim //mongodb操作对象
|
|
-
|
|
|
|
|
|
+ siteMgo *mongodb.MongodbSim
|
|
//mgoTest *mongodb.MongodbSim //mongodb操作对象
|
|
//mgoTest *mongodb.MongodbSim //mongodb操作对象
|
|
|
|
|
|
extract string
|
|
extract string
|
|
@@ -42,8 +45,9 @@ var (
|
|
FilterRegTitle_2 = regexp.MustCompile("^_$")
|
|
FilterRegTitle_2 = regexp.MustCompile("^_$")
|
|
|
|
|
|
|
|
|
|
- siteArr []map[string]interface{} //站点
|
|
|
|
- inV_n int //无效数据数量
|
|
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+ SiteMap map[string]interface{} //站点map
|
|
)
|
|
)
|
|
|
|
|
|
func init() {
|
|
func init() {
|
|
@@ -52,7 +56,6 @@ func init() {
|
|
//172.17.145.163:27080
|
|
//172.17.145.163:27080
|
|
util.ReadConfig(&Sysconfig)
|
|
util.ReadConfig(&Sysconfig)
|
|
nextNode = util.ObjArrToMapArr(Sysconfig["nextNode"].([]interface{}))
|
|
nextNode = util.ObjArrToMapArr(Sysconfig["nextNode"].([]interface{}))
|
|
- siteArr = util.ObjArrToMapArr(Sysconfig["site"].([]interface{}))
|
|
|
|
mconf = Sysconfig["mongodb"].(map[string]interface{})
|
|
mconf = Sysconfig["mongodb"].(map[string]interface{})
|
|
|
|
|
|
mgo = &mongodb.MongodbSim{
|
|
mgo = &mongodb.MongodbSim{
|
|
@@ -70,8 +73,8 @@ func init() {
|
|
dupdays = util.IntAllDef(Sysconfig["dupdays"], 3)
|
|
dupdays = util.IntAllDef(Sysconfig["dupdays"], 3)
|
|
//加载数据
|
|
//加载数据
|
|
DM = NewDatamap(dupdays, lastid)
|
|
DM = NewDatamap(dupdays, lastid)
|
|
- fmt.Println(DM.keys)
|
|
|
|
- fmt.Println(DM.data)
|
|
|
|
|
|
+ //fmt.Println(DM.keys)
|
|
|
|
+ //fmt.Println(DM.data)
|
|
FilterRegTitle = regexp.MustCompile(util.ObjToString(Sysconfig["specialwords"]))
|
|
FilterRegTitle = regexp.MustCompile(util.ObjToString(Sysconfig["specialwords"]))
|
|
FilterRegTitle_1 = regexp.MustCompile(util.ObjToString(Sysconfig["specialtitle_1"]))
|
|
FilterRegTitle_1 = regexp.MustCompile(util.ObjToString(Sysconfig["specialtitle_1"]))
|
|
FilterRegTitle_2 = regexp.MustCompile(util.ObjToString(Sysconfig["specialtitle_2"]))
|
|
FilterRegTitle_2 = regexp.MustCompile(util.ObjToString(Sysconfig["specialtitle_2"]))
|
|
@@ -79,15 +82,34 @@ func init() {
|
|
|
|
|
|
|
|
|
|
|
|
|
|
- //数据库
|
|
|
|
- //mongodb.InitMongodbPool(5, "192.168.3.207:27081", "")
|
|
|
|
|
|
+ //站点相关数据库
|
|
|
|
+ mongodb.InitMongodbPool(5, "192.168.3.207:27082", "")
|
|
|
|
+
|
|
|
|
+ siteMgo = &mongodb.MongodbSim{
|
|
|
|
+ MongodbAddr: "192.168.3.207:27082",
|
|
|
|
+ Size: 5,
|
|
|
|
+ DbName: "zhaolongyue",
|
|
|
|
+ }
|
|
|
|
+ siteMgo.InitPool()
|
|
|
|
|
|
- //mgoTest = &mongodb.MongodbSim{
|
|
|
|
- // MongodbAddr: "192.168.3.207:27081",
|
|
|
|
- // Size: 5,
|
|
|
|
- // DbName: "qfw",
|
|
|
|
- //}
|
|
|
|
- //mgoTest.InitPool()
|
|
|
|
|
|
+
|
|
|
|
+ SiteMap = make(map[string]interface{},0)
|
|
|
|
+
|
|
|
|
+ start := int(time.Now().Unix())
|
|
|
|
+ //站点配置
|
|
|
|
+ sess_site := siteMgo.GetMgoConn()
|
|
|
|
+ defer sess_site.Close()
|
|
|
|
+ res_site := sess_site.DB("zhaolongyue").C("site").Find(nil).Sort("_id").Iter()
|
|
|
|
+ for site_dict := make(map[string]interface{}); res_site.Next(&site_dict); {
|
|
|
|
+ data_map := map[string]string{
|
|
|
|
+ "area":util.ObjToString(site_dict["area"]),
|
|
|
|
+ "city":util.ObjToString(site_dict["city"]),
|
|
|
|
+ "district":util.ObjToString(site_dict["district"]),
|
|
|
|
+ }
|
|
|
|
+ SiteMap[site_dict["site"].(string)]= data_map
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ fmt.Printf("用时:%d秒,%d个",int(time.Now().Unix())-start,len(SiteMap))
|
|
|
|
|
|
|
|
|
|
}
|
|
}
|
|
@@ -225,6 +247,7 @@ func mainTest() {
|
|
|
|
|
|
|
|
|
|
func main() {
|
|
func main() {
|
|
|
|
+ return
|
|
go checkMapJob()
|
|
go checkMapJob()
|
|
|
|
|
|
updport := Sysconfig["udpport"].(string)
|
|
updport := Sysconfig["udpport"].(string)
|
|
@@ -306,7 +329,6 @@ func task(data []byte, mapInfo map[string]interface{}) {
|
|
|
|
|
|
//是否为无效数据
|
|
//是否为无效数据
|
|
if invalidData(info.buyer,info.projectname,info.projectcode) {
|
|
if invalidData(info.buyer,info.projectname,info.projectcode) {
|
|
- inV_n++
|
|
|
|
mapLock.Lock()
|
|
mapLock.Lock()
|
|
updateExtract = append(updateExtract, []map[string]interface{}{
|
|
updateExtract = append(updateExtract, []map[string]interface{}{
|
|
map[string]interface{}{
|
|
map[string]interface{}{
|
|
@@ -459,7 +481,7 @@ func task(data []byte, mapInfo map[string]interface{}) {
|
|
mgo.UpdateBulk(extract, updateExtract...)
|
|
mgo.UpdateBulk(extract, updateExtract...)
|
|
//mgo.UpdateBulk(bidding, updateBidding...)
|
|
//mgo.UpdateBulk(bidding, updateBidding...)
|
|
}
|
|
}
|
|
- log.Println("this task over.", n, "repeateN:", repeateN, mapInfo["stop"],"无效数据:",inV_n)
|
|
|
|
|
|
+ log.Println("this task over.", n, "repeateN:", repeateN, mapInfo["stop"])
|
|
|
|
|
|
//任务完成,开始发送广播通知下面节点
|
|
//任务完成,开始发送广播通知下面节点
|
|
if n > repeateN && mapInfo["stop"] == nil {
|
|
if n > repeateN && mapInfo["stop"] == nil {
|
|
@@ -554,7 +576,6 @@ func historyTask(data []byte, mapInfo map[string]interface{}) {
|
|
|
|
|
|
//是否为无效数据
|
|
//是否为无效数据
|
|
if invalidData(info.buyer,info.projectname,info.projectcode) {
|
|
if invalidData(info.buyer,info.projectname,info.projectcode) {
|
|
- inV_n++
|
|
|
|
mapLock.Lock()
|
|
mapLock.Lock()
|
|
updateExtract = append(updateExtract, []map[string]interface{}{
|
|
updateExtract = append(updateExtract, []map[string]interface{}{
|
|
map[string]interface{}{
|
|
map[string]interface{}{
|
|
@@ -578,9 +599,26 @@ func historyTask(data []byte, mapInfo map[string]interface{}) {
|
|
|
|
|
|
if reason == "未判重记录" {
|
|
if reason == "未判重记录" {
|
|
//把info的数据判重的标签更换,并新增字段
|
|
//把info的数据判重的标签更换,并新增字段
|
|
-
|
|
|
|
-
|
|
|
|
-
|
|
|
|
|
|
+ mapLock.Lock()
|
|
|
|
+ //构建数据库更新用到的
|
|
|
|
+ //对比的数据打判重标签
|
|
|
|
+ DM.replaceSourceData(info,info.id) //替换即添加
|
|
|
|
+ updateExtract = append(updateExtract, []map[string]interface{}{
|
|
|
|
+ map[string]interface{}{
|
|
|
|
+ "_id": tmp["_id"],
|
|
|
|
+ },
|
|
|
|
+ map[string]interface{}{
|
|
|
|
+ "$set": map[string]interface{}{
|
|
|
|
+ "repeat": 0,
|
|
|
|
+ "repeatid": "-1",
|
|
|
|
+ },
|
|
|
|
+ },
|
|
|
|
+ })
|
|
|
|
+ if len(updateExtract) > 500 {
|
|
|
|
+ mgo.UpdateBulk(extract, updateExtract...)
|
|
|
|
+ updateExtract = [][]map[string]interface{}{}
|
|
|
|
+ }
|
|
|
|
+ mapLock.Unlock()
|
|
}else {
|
|
}else {
|
|
repeateN++
|
|
repeateN++
|
|
mapLock.Lock()
|
|
mapLock.Lock()
|
|
@@ -688,7 +726,7 @@ func historyTask(data []byte, mapInfo map[string]interface{}) {
|
|
mgo.UpdateBulk(extract, updateExtract...)
|
|
mgo.UpdateBulk(extract, updateExtract...)
|
|
//mgo.UpdateBulk(bidding, updateBidding...)
|
|
//mgo.UpdateBulk(bidding, updateBidding...)
|
|
}
|
|
}
|
|
- log.Println("this task over.", n, "repeateN:", repeateN, mapInfo["stop"],"无效数据:",inV_n)
|
|
|
|
|
|
+ log.Println("this task over.", n, "repeateN:", repeateN, mapInfo["stop"])
|
|
|
|
|
|
|
|
|
|
|
|
|