Jianghan 4 vuotta sitten
vanhempi
commit
62078b6f3f

+ 3 - 2
fullproject/src_v1/config.json

@@ -8,6 +8,7 @@
 	"hints":"publishtime_1",
     "extractColl": "jh_info",
     "projectColl": "jh_project",
+    "updateColl": "jh_info",
     "backupFlag": false,
     "siteColl": "site",
     "thread": 1,
@@ -16,8 +17,8 @@
         "api": "http://10.171.112.160:19281/_send/_mail"
     },
     "es": {
-        "addr": "http://192.168.3.128:9800",
-        "index": "projectset_v3",
+        "addr": "http://127.0.0.1:9800",
+        "index": "projectset",
         "itype": "projectset",
         "pool": 10
     },

+ 2 - 0
fullproject/src_v1/init.go

@@ -21,6 +21,7 @@ var (
 	Sysconfig                                      map[string]interface{} //读取配置文件
 	MongoTool                                      *MongodbSim            //mongodb连接
 	ExtractColl, ProjectColl, BackupColl, SiteColl string                 //抽取表、项目表、项目快照表、站点表
+	UpdateColl									   string				  // 金额修改数据表
 	Thread                                         int                    //配置项线程数
 	//NextNode                 []interface{}
 )
@@ -63,6 +64,7 @@ func init() {
 
 	ExtractColl = Sysconfig["extractColl"].(string)
 	ProjectColl = Sysconfig["projectColl"].(string)
+	UpdateColl = Sysconfig["updateColl"].(string)
 	BackupColl = Sysconfig["projectColl"].(string) + "_back"
 	SiteColl = Sysconfig["siteColl"].(string)
 	Thread = util.IntAll(Sysconfig["thread"])

+ 14 - 4
fullproject/src_v1/main.go

@@ -91,8 +91,8 @@ func main() {
 
 //测试组人员使用
 func mainT() {
-	sid = "56388138af53745d9a000001"
-	eid = "5b671f32a5cb26b9b76ddbb6"
+	sid = "5e69e7cc85a9271abf1bdb0a"
+	eid = "5e993c6185a9271abf2f51b5"
 	//flag.StringVar(&sid, "sid", "", "开始id")
 	//flag.StringVar(&eid, "eid", "", "结束id")
 	//flag.Parse()
@@ -104,7 +104,7 @@ func mainT() {
 	}
 	mapinfo["gtid"] = sid
 	mapinfo["lteid"] = eid
-	mapinfo["stype"] = "ql"
+	mapinfo["stype"] = "updateMoneyMgo"
 	mapinfo["ip"] = "127.0.0.1"
 	mapinfo["port"] = Sysconfig["udpport"]
 	if Sysconfig["loadStart"] != nil {
@@ -116,7 +116,8 @@ func mainT() {
 	P_QL.loadSite()
 	P_QL.currentType = mapinfo["stype"].(string)
 	P_QL.pici = time.Now().Unix()
-	P_QL.taskQl(mapinfo)
+	//P_QL.taskQl(mapinfo)
+	P_QL.taskQuery()
 	time.Sleep(20 * time.Second)
 }
 
@@ -197,6 +198,15 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 					P_QL.pici = time.Now().Unix()
 					P_QL.taskUpdateMoney(mapInfo)
 				}()
+			case "updateMoneyMgo": //修改金额
+				go func() {
+					defer func() {
+						<-SingleThread
+					}()
+					P_QL.currentType = tasktype
+					P_QL.pici = time.Now().Unix()
+					P_QL.taskQuery()
+				}()
 			case "history": //历史数据合并,暂时不写
 				go func() {
 					defer func() {

+ 3 - 3
fullproject/src_v1/project.go

@@ -1143,7 +1143,7 @@ func CountAmount(project *ProjectInfo, info *Info, tmp map[string]interface{}) {
 				}
 			}
 		} else {
-			if project.Budget < info.Budget {
+			if project.Budget > info.Budget {
 				project.Budget = info.Budget
 				project.Budgettag = 0
 			}
@@ -1200,7 +1200,7 @@ func CountAmount(project *ProjectInfo, info *Info, tmp map[string]interface{}) {
 				}
 			} else {
 				if info.SubType == "中标" || info.SubType == "成交" {
-					if project.Bidamount < info.Bidamount {
+					if project.Bidamount > info.Bidamount {
 						project.Bidamount = info.Bidamount
 						project.Bidamounttag = 0
 					} else {
@@ -1220,7 +1220,7 @@ func CountAmount(project *ProjectInfo, info *Info, tmp map[string]interface{}) {
 								project.Bidamount = project.Bidamount + info.Bidamount
 								project.Bidamounttag = 0
 							} else {
-								if project.Bidamount < info.Bidamount {
+								if project.Bidamount > info.Bidamount {
 									project.Bidamount = info.Bidamount
 									project.Bidamounttag = 0
 								}

+ 106 - 98
fullproject/src_v1/task.go

@@ -8,7 +8,6 @@ import (
 	mu "mfw/util"
 	"qfw/util"
 	"regexp"
-	"strconv"
 	"strings"
 	"sync"
 	"time"
@@ -103,12 +102,10 @@ var sp = make(chan bool, 5)
 
 //初始化全量合并对象
 func init() {
-	util.Debug("task init...")
 	P_QL = NewPT()
 	log.Println(len(P_QL.updatePool))
 	go P_QL.updateAllQueue()
 	go P_QL.clearMem()
-	util.Debug("task init end")
 }
 
 func (p *ProjectTask) updateAllQueue() {
@@ -341,110 +338,118 @@ func (p *ProjectTask) taskUpdateInfo(udpInfo map[string]interface{}) {
 	p.enter(db, coll, q)
 }
 
+func (p *ProjectTask) taskQuery() {
+	defer util.Catch()
+	count := 0
+	sess := MongoTool.GetMgoConn()
+	defer MongoTool.DestoryMongoConn(sess)
+	fields := map[string]interface{} {"budget": 1, "bidamount": 1, "package": 1}
+	ms := sess.DB(MongoTool.DbName).C(UpdateColl).Find(map[string]interface{}{}).Select(fields)
+	query := ms.Iter()
+L:
+	for {
+		tmp := make(map[string]interface{})
+		if query.Next(&tmp) {
+			lastid := tmp["_id"]
+			tmp["id"] = tmp["_id"].(primitive.ObjectID).Hex();
+			if count%1000 == 0 {
+				log.Println("current modify", count, lastid)
+			}
+			p.taskUpdateMoney(tmp)
+			count++
+		} else {
+			break L
+		}
+	}
+}
+
 //修改公告信息的预算/中标金额
 func (p *ProjectTask) taskUpdateMoney(udpInfo map[string]interface{}) {
 	defer util.Catch()
 	id := udpInfo["id"].(string)
-	budget := util.ObjToString(udpInfo["budget"])
-	bidamount := util.ObjToString(udpInfo["bidamount"])
-	if budget == "" && bidamount == "" {
-		util.Debug("")
-		return
-	}
+	budget := util.Float64All(udpInfo["budget"])
+	bidamount := util.Float64All(udpInfo["bidamount"])
 
 	client := Es.GetEsConn()
 	defer Es.DestoryEsConn(client)
 	esquery := `{"query": {"bool": {"must": [{"term": {"list.infoid": "`+id+`"}}]}}}`
 	data := Es.Get(Index, Itype, esquery)
-	util.Debug(*data)
-	if data != nil {
+	if len(*data) > 0 {
 		pid := util.ObjToString((*data)[0]["_id"])
 		pro := MongoTool.FindById(ProjectColl, pid)
+		if len(pro) == 0 {
+			util.Debug("未找到项目, pid=", pid)
+			return
+		}
 		var info *map[string]interface{}
 		for _, v := range []interface{}(pro["list"].(primitive.A)){
 			v1 := v.(map[string]interface{})
 			if util.ObjToString(v1["infoid"]) == id {
 				info = util.ObjToMap(v)
 				infoField := util.ObjToMap(pro["infofield"])
-				if budget != "" {
-					if budget != "del" {
-						newBudget, _ := strconv.ParseFloat(budget, 64)
-						if pro["budget"] == (*info)["budget"] {
-							pro["budget"] = newBudget
-						}
-						if util.IntAll(pro["multipackage"]) == 1 {
-							if packages, ok := pro["package"].(map[string]interface{}); ok {
-							M :
-									for _, v := range packages{
-										v1 := []interface{}(v.(primitive.A))
-										for _, v2 := range v1{
-											v3 := v2.(map[string]interface{})
-											if util.ObjToString(v3["infoid"]) == id {
-												if v3["budget"] != nil {
-													v3["budget"] = newBudget
-												}
-											}else {
-												break M
-											}
+				if udpInfo["budget"] != nil{
+					util.Debug("update-------", (*info)["infoid"])
+					//if pro["budget"] == (*info)["budget"] {
+					//	pro["budget"] = budget
+					//}
+					//多包中的金额
+					if util.IntAll(pro["multipackage"]) == 1 {
+						if packages, ok := pro["package"].(map[string]interface{}); ok {
+						M :
+							for k, v := range packages{
+								v1 := []interface{}(v.(primitive.A))
+								for _, v2 := range v1{
+									v3 := v2.(map[string]interface{})
+									if util.ObjToString(v3["infoid"]) == id {
+										if v3["budget"] != nil {
+											pkg := udpInfo["package"].(map[string]interface{})
+											tmp := pkg[k].(map[string]interface{})
+											v3["budget"] = tmp["budget"]
 										}
+									}else {
+										break M
 									}
+								}
 							}
 						}
-						if pro["sortprice"] == (*info)["budget"] {
-							pro["sortprice"] = newBudget
-						}
-						(*info)["budget"] = newBudget
-						(*util.ObjToMap((*infoField)[id]))["budget"] = newBudget
-					}else {
-						delete(*info, "budget")
-						delete(*util.ObjToMap((*infoField)[id]), "budget")
-						if pro["budget"] == (*info)["budget"] {
-							money := FindMoney("budget", pro)
-							if money >= 0 {
-								pro["budget"] = money
-							}
-						}
 					}
+					(*info)["budget"] = budget
+					(*util.ObjToMap((*infoField)[id]))["budget"] = budget
+					if pro["sortprice"] == (*info)["budget"] {
+						pro["sortprice"] = budget
+					}
+				}else {
+					delete(*info, "budget")
 				}
-				if bidamount != "" {
-					if bidamount != "del" {
-						newBidamount, _ := strconv.ParseFloat(bidamount, 64)
-						if pro["bidamount"] == (*info)["bidamount"] {
-							pro["bidamount"] = newBidamount
-						}
-						if util.IntAll(pro["multipackage"]) == 1 {
-							if packages, ok := pro["package"].(map[string]interface{}); ok {
-							N :
-								for _, v := range packages{
-									v1 := []interface{}(v.(primitive.A))
-									for _, v2 := range v1{
-										v3 := v2.(map[string]interface{})
-										if util.ObjToString(v3["infoid"]) == id {
-											if v3["bidamount"] != nil {
-												v3["bidamount"] = newBidamount
-											}
-										}else {
-											break N
+				if udpInfo["bidamount"] != nil{
+					//if pro["bidamount"] == (*info)["bidamount"] {
+					//	pro["bidamount"] = bidamount
+					//}
+					v1["bidamount"] = bidamount
+					if util.IntAll(pro["multipackage"]) == 1 {
+						if packages, ok := pro["package"].(map[string]interface{}); ok {
+							for k, v := range packages{
+								v1 := []interface{}(v.(primitive.A))
+								for _, v2 := range v1{
+									v3 := v2.(map[string]interface{})
+									if util.ObjToString(v3["infoid"]) == id {
+										if v3["bidamount"] != nil {
+											pkg := udpInfo["package"].(map[string]interface{})
+											tmp := pkg[k].(map[string]interface{})
+											v3["bidamount"] = tmp["bidamount"]
 										}
 									}
 								}
 							}
 						}
-						if pro["sortprice"] == (*info)["bidamount"] {
-							pro["sortprice"] = newBidamount
-						}
-						(*info)["bidamount"] = newBidamount
-						(*util.ObjToMap((*infoField)[id]))["bidamount"] = newBidamount
-					}else {
-						delete((*info), "bidamount")
-						delete((*util.ObjToMap((*infoField)[id])), "bidamount")
-						if pro["bidamount"] == (*info)["bidamount"] {
-							money := FindMoney("bidamount", pro)
-							if money >= 0 {
-								pro["bidamount"] = money
-							}
-						}
 					}
+					(*info)["bidamount"] = bidamount
+					(*util.ObjToMap((*infoField)[id]))["bidamount"] = bidamount
+					if pro["sortprice"] == (*info)["bidamount"] {
+						pro["sortprice"] = bidamount
+					}
+				}else {
+					delete(*info, "bidamount")
 				}
 				break
 			}
@@ -455,17 +460,22 @@ func (p *ProjectTask) taskUpdateMoney(udpInfo map[string]interface{}) {
 		_ = json.Unmarshal(bys, &project)
 		bys1, _ := json.Marshal(info)
 		_ = json.Unmarshal(bys1, &pInfo)
-		CountAmount(project, pInfo, *info)
-
-		if project.Budget > 0 {
-			util.Debug(project.Budget)
-			pro["budget"] = project.Budget
-			pro["budgettag"] = 0
-		}
-		if project.Bidamount > 0 {
-			util.Debug(project.Bidamount)
-			pro["bidamount"] = project.Bidamount
-			pro["bidamounttag"] = 0
+		if len(project.Ids) > 1 {
+			CountAmount(project, pInfo, *info)
+			if project.Budget > 0 {
+				pro["budget"] = project.Budget
+			}
+			if project.Bidamount > 0 {
+				pro["bidamount"] = project.Bidamount
+			}
+		}else {
+			pro["budget"] = budget
+			pro["bidamount"] = bidamount
+			if budget > bidamount {
+				pro["sortprice"] = budget
+			}else {
+				pro["sortprice"] = bidamount
+			}
 		}
 		set := map[string]interface{}{
 			"$set": pro,
@@ -492,6 +502,7 @@ func (p *ProjectTask) taskUpdateMoney(udpInfo map[string]interface{}) {
 					}},
 				"stype": "project",
 			})
+			util.Debug(string(by))
 			_ = udpclient.WriteUdp(by, mu.OP_TYPE_DATA, toaddr[1])
 		}
 	}
@@ -525,14 +536,12 @@ func nextNode(mapInfo map[string]interface{}, pici int64) {
 	mapInfo["query"] = map[string]interface{}{
 		"pici": pici,
 	}
-	for n, to := range toaddr {
-		key := fmt.Sprintf("%d-%s-%d", pici, "project", n)
-		mapInfo["key"] = key
-		datas, _ := json.Marshal(mapInfo)
-		node := &udpNode{datas, to, time.Now().Unix(), 0}
-		udptaskmap.Store(key, node)
-		_ = udpclient.WriteUdp(datas, mu.OP_TYPE_DATA, to)
-	}
+	key := fmt.Sprintf("%d-%s-%d", pici, "project", 0)
+	mapInfo["key"] = key
+	datas, _ := json.Marshal(mapInfo)
+	node := &udpNode{datas, toaddr[0], time.Now().Unix(), 0}
+	udptaskmap.Store(key, node)
+	_ = udpclient.WriteUdp(datas, mu.OP_TYPE_DATA, toaddr[0])
 }
 
 func (p *ProjectTask) enter(db, coll string, q map[string]interface{}) {
@@ -583,7 +592,6 @@ func (p *ProjectTask) enter(db, coll string, q map[string]interface{}) {
 				break L
 			}
 		}
-
 	}()
 	fields := map[string]interface{} {"area": 1, "city": 1, "district": 1, "comeintime": 1, "publishtime": 1, "bidopentime": 1, "title": 1, "projectname": 1, "href": 1,
 		"projectcode": 1, "buyerclass": 1, "winner": 1, "s_winner": 1, "buyer": 1, "buyerperson": 1, "buyertel": 1, "infoformat": 1, "toptype": 1, "subtype": 1, "spidercode": 1,