Jianghan 4 tahun lalu
induk
melakukan
30e09ad520

+ 1 - 2
fullproject/src_v1/load_data.go

@@ -172,7 +172,7 @@ func (p *ProjectTask) loadSpiderCode() {
 	defer MgoSpider.DestoryMongoConn(sess)
 	q := map[string]interface{}{}
 	field := map[string]interface{}{"code": 1, "isflow": 1}
-	it := sess.DB(MgoSpider.DbName).C("luaconfig_back").Find(&q).Select(field).Iter()
+	it := sess.DB(MgoSpider.DbName).C("luaconfig").Find(&q).Select(field).Iter()
 	n := 0
 	pool := make(chan map[string]interface{}, 100)
 	over := make(chan bool)
@@ -192,7 +192,6 @@ func (p *ProjectTask) loadSpiderCode() {
 		result := make(map[string]interface{})
 		if it.Next(&result) {
 			go func(res map[string]interface{}) {
-				util.Debug(result)
 				pool <- result
 			}(result)
 		} else {

+ 2 - 2
fullproject/src_v1/main.go

@@ -77,14 +77,14 @@ func main() {
 	//udp强制合并  信息id1,id2,id3 [项目id] 不存在时新建  qzhb
 	//udp强制拆分  项目id,信息id1,id2          qzcf
 	//udp重新合并  信息id1,id2,id3             cxhb
+	P_QL.loadSpiderCode()
+	P_QL.loadSite()
 	if Sysconfig["loadStart"] != nil {
 		loadStart := util.Int64All(Sysconfig["loadStart"])
 		if loadStart > -1 {
 			P_QL.loadData(loadStart)
 		}
 	}
-	P_QL.loadSpiderCode()
-	P_QL.loadSite()
 	go checkMapJob()
 	time.Sleep(99999 * time.Hour)
 }

+ 9 - 5
fullproject/src_v1/project.go

@@ -236,10 +236,10 @@ func (p *ProjectTask) startProjectMerge(info *Info, tmp map[string]interface{})
 	}
 
 	if !bFindProject {
-		if !IsCreatePro(info) {
-			qu.Debug("舍弃数据---", info.Id)
-			return
-		}
+		//if !IsCreatePro(info) {
+		//	qu.Debug("舍弃数据---", info.Id)
+		//	return
+		//}
 		id, p1 := p.NewProject(tmp, info)
 		p.AllIdsMapLock.Lock()
 		p.AllIdsMap[id] = &ID{Id: id, P: p1}
@@ -930,7 +930,11 @@ func (p *ProjectTask) UpdateProject(tmp map[string]interface{}, thisinfo *Info,
 		set["buyertel"] = ""
 	}
 	if thisinfo.ContractCode != "" {
-		set["contractcode"] = pInfo.ContractCode + "," + thisinfo.ContractCode
+		if pInfo.ContractCode == "" {
+			set["contractcode"] = thisinfo.ContractCode
+		}else {
+			set["contractcode"] = pInfo.ContractCode + "," + thisinfo.ContractCode
+		}
 	}
 
 	//8--代理机构

+ 26 - 22
fullproject/src_v1/task.go

@@ -124,7 +124,7 @@ func NewPT() *ProjectTask {
 		coll:       ProjectColl,
 		validTime:  int64(util.IntAllDef(Sysconfig["validdays"], 150) * 86400),
 		statusTime: int64(util.IntAllDef(Sysconfig["statusdays"], 15) * 86400),
-		jgTime:		int64(util.IntAllDef(3, 3) * 86400),
+		jgTime:		int64(util.IntAllDef(7, 7) * 86400),
 	}
 	return p
 }
@@ -470,24 +470,11 @@ func (p *ProjectTask) enter(db, coll string, q map[string]interface{}) {
 					defer func() {
 						<-pool
 					}()
-					if util.IntAll(tmp["repeat"]) == 0 {
-						if P_QL.currentType == "project" && util.IntAll(tmp["dataging"]) == 1 {
-							//增量	dataging为1不参与合并
-							util.Debug("增量   dataging == 1 ", tmp["_id"])
-							return
-						}
-
-						p.fillInPlace(tmp)
-						info := ParseInfo(tmp)
-						p.currentTime = info.Publishtime
-						//普通合并
-						p.CommonMerge(tmp, info)
-					} else {
-						//信息错误,进行更新
-						p.mapBidLock.Lock()
-						countRepeat++
-						p.mapBidLock.Unlock()
-					}
+					p.fillInPlace(tmp)
+					info := ParseInfo(tmp)
+					p.currentTime = info.Publishtime
+					//普通合并
+					p.CommonMerge(tmp, info)
 				}(tmp)
 			case <-over:
 				break L
@@ -520,10 +507,27 @@ L:
 			tmp := make(map[string]interface{})
 			if query.Next(&tmp) {
 				lastid = tmp["_id"]
-				if count%10 == 0 {
-					log.Println("current", count, lastid)
+				if P_QL.currentType == "ql" {
+					if count%20000 == 0 {
+						log.Println("current", count, lastid)
+					}
+				}else {
+					if count%1000 == 0 {
+						log.Println("current", count, lastid)
+					}
+				}
+				if util.IntAll(tmp["repeat"]) == 0 {
+					if P_QL.currentType == "project" && util.IntAll(tmp["dataging"]) == 0 {
+						infoPool <- tmp
+					}else {
+						util.Debug("增量   dataging == 1 ", tmp["_id"])
+					}
+				}else {
+					countRepeat++
+					if P_QL.currentType == "project" {
+						util.Debug("repeat err---", tmp["_id"])
+					}
 				}
-				infoPool <- tmp
 				count++
 			} else {
 				break L

+ 9 - 9
monitor/task.go

@@ -87,15 +87,15 @@ func EsCheck(result string) string {
 	util.Debug(*resp)
 	if resp.Status != "green" {
 		result += "<br>" + "检索库异常,异常内容:" + "<br>" +
-			"&nbsp;&nbsp;&nbsp;&nbsp;" + "cluster_name:" + resp.ClusterName + "<br>" +
-			"&nbsp;&nbsp;&nbsp;&nbsp;" + "status:" + resp.Status + "<br>" +
-			"&nbsp;&nbsp;&nbsp;&nbsp;" + "number_of_nodes:" + strconv.Itoa(resp.NumberOfNodes) + "<br>" +
-			"&nbsp;&nbsp;&nbsp;&nbsp;" + "number_of_data_nodes:" + strconv.Itoa(resp.NumberOfDataNodes) + "<br>" +
-			"&nbsp;&nbsp;&nbsp;&nbsp;" + "number_of_data_nodes:" + strconv.Itoa(resp.ActivePrimaryShards) + "<br>" +
-			"&nbsp;&nbsp;&nbsp;&nbsp;" + "active_shards:" + strconv.Itoa(resp.ActiveShards) + "<br>" +
-			"&nbsp;&nbsp;&nbsp;&nbsp;" + "relocating_shards:" + strconv.Itoa(resp.RelocatingShards) + "<br>" +
-			"&nbsp;&nbsp;&nbsp;&nbsp;" + "initialized_shards:" + strconv.Itoa(resp.InitializedShards) + "<br>" +
-			"&nbsp;&nbsp;&nbsp;&nbsp;" + "unassigned_shards:" + strconv.Itoa(resp.UnassignedShards) + "<br>"
+			"cluster_name:" + resp.ClusterName + "<br>" +
+			"status:" + resp.Status + "<br>" +
+			"number_of_nodes:" + strconv.Itoa(resp.NumberOfNodes) + "<br>" +
+			"number_of_data_nodes:" + strconv.Itoa(resp.NumberOfDataNodes) + "<br>" +
+			"number_of_data_nodes:" + strconv.Itoa(resp.ActivePrimaryShards) + "<br>" +
+			"active_shards:" + strconv.Itoa(resp.ActiveShards) + "<br>" +
+			"relocating_shards:" + strconv.Itoa(resp.RelocatingShards) + "<br>" +
+			"initialized_shards:" + strconv.Itoa(resp.InitializedShards) + "<br>" +
+			"unassigned_shards:" + strconv.Itoa(resp.UnassignedShards) + "<br>"
 	}
 	return result
 }

+ 2 - 2
qyxy/src/config.json

@@ -1,7 +1,7 @@
 {
 	"mgodb": "192.168.3.207:27092",
 	"dbsize": 12,
-	"dbname": "mixdata",
+	"dbname": "wjh",
 	"dbcoll": "qyxy",
 	"savecoll": "qyxy_std",
     "uname": "dataAnyWrite",
@@ -10,7 +10,7 @@
 	"updatetime": 0,
 	"elastic": {
         "addr": "http://192.168.3.128:9800",
-        "index": "qyxy_v2",
+        "index": "qyxy_v1",
         "itype": "qyxy",
         "otherindex": "qyxy_all",
         "otheritype": "qyxy",

+ 4 - 4
qyxy/src/main.go

@@ -42,8 +42,8 @@ func init() {
 		MongodbAddr: Sysconfig["mgodb"].(string),
 		Size:        qu.IntAllDef(Sysconfig["dbsize"], 5),
 		DbName:      Dbname,
-		UserName: 	 Sysconfig["uname"].(string),
-		Password: 	 Sysconfig["upwd"].(string),
+		//UserName: 	 Sysconfig["uname"].(string),
+		//Password: 	 Sysconfig["upwd"].(string),
 	}
 	Mgo.InitPool()
 	//es
@@ -71,9 +71,9 @@ func init() {
 }
 
 func main() {
-	go TimeTask()
+	//go TimeTask()
 	//QyxyStandard()
-	//HistoryQyxyStandard()
+	HistoryQyxyStandard()
 	ch := make(chan bool, 1)
 	<-ch
 }

+ 33 - 29
qyxy/src/task.go

@@ -397,23 +397,24 @@ func QyxyStandard() bool {
 			company_name := qu.ObjToString(esMap["company_name"])
 			if company_type == "个体工商户" {
 				if len([]rune(company_name)) >= 5 {
-					esMap["company_type_int"] = 5
+					esMap["company_type_int"] = 31
 				}else {
-					esMap["company_type_int"] = 6
+					esMap["company_type_int"] = 32
 				}
 			}else if company_type == "其他" || company_type == "" {
 				if len([]rune(company_name)) >= 4 {
-					esMap["company_type_int"] = 3
+					esMap["company_type_int"] = 21
 				}else {
-					esMap["company_type_int"] = 4
+					esMap["company_type_int"] = 22
 				}
 			}else {
-				if len([]rune(company_name)) >= 4 {
-					esMap["company_type_int"] = 1
+				if company_type == "内资分公司" {
+					esMap["company_type_int"] = 12
+				}else if len([]rune(company_name)) >= 4 {
+					esMap["company_type_int"] = 11
 				}else {
-					esMap["company_type_int"] = 2
+					esMap["company_type_int"] = 13
 				}
-
 			}
 			lock.Lock()
 			if EsSaveFlag {
@@ -466,8 +467,8 @@ func HistoryQyxyStandard() {
 	wg := &sync.WaitGroup{}
 	lock := &sync.Mutex{} //控制读写
 	arr := [][]map[string]interface{}{}
-	//count, _ := sess.DB(Dbname).C(Dbcoll).Find(nil).Count()
-	//log.Println("共查询:", count, "条")
+	count, _ := sess.DB(Dbname).C(Dbcoll).Find(nil).Count()
+	log.Println("共查询:", count, "条")
 	it := sess.DB(Dbname).C(Dbcoll).Find(nil).Iter()
 	sum := 0
 	for tmp := make(map[string]interface{}); it.Next(&tmp); sum++ {
@@ -754,30 +755,32 @@ func HistoryQyxyStandard() {
 				mgoMap[k] = v
 			}
 			//es数据过滤
-			EsSaveFlag := true
+			//EsSaveFlag := true
 			company_type := qu.ObjToString(esMap["company_type"])
 			company_name := qu.ObjToString(esMap["company_name"])
 			if company_type == "个体工商户" {
 				if len([]rune(company_name)) >= 5 {
-					esMap["company_type_int"] = 5
+					esMap["company_type_int"] = 31
 				}else {
-					esMap["company_type_int"] = 6
+					esMap["company_type_int"] = 32
 				}
 			}else if company_type == "其他" || company_type == "" {
 				if len([]rune(company_name)) >= 4 {
-					esMap["company_type_int"] = 3
+					esMap["company_type_int"] = 21
 				}else {
-					esMap["company_type_int"] = 4
+					esMap["company_type_int"] = 22
 				}
 			}else {
-				if len([]rune(company_name)) >= 4 {
-					esMap["company_type_int"] = 1
+				if company_type == "内资分公司" {
+					esMap["company_type_int"] = 12
+				}else if len([]rune(company_name)) >= 4 {
+					esMap["company_type_int"] = 11
 				}else {
-					esMap["company_type_int"] = 2
+					esMap["company_type_int"] = 13
 				}
 			}
 			lock.Lock()
-			if EsSaveFlag {
+			//if EsSaveFlag {
 				if esMap["history_name"] != nil {
 					var nameArr []string
 					for _, v := range strings.Split(qu.ObjToString(esMap["history_name"]), ";") {
@@ -790,18 +793,18 @@ func HistoryQyxyStandard() {
 					}
 				}
 				EsSaveCache <- esMap //过滤后数据保存
-			}
+			//}
 			//EsSaveAllCache <- esMap //所有数据保存
 			//SaveHistoryName(tmp)
 			//update = append(update, map[string]interface{}{"$set": mgoMap})
-			if len(update) == 2 {
-				arr = append(arr, update)
-			}
-			if len(arr) > 500 {
-				tmps := arr
-				Mgo.UpSertBulk(Savecoll, tmps...)
-				arr = [][]map[string]interface{}{}
-			}
+			//if len(update) == 2 {
+			//	arr = append(arr, update)
+			//}
+			//if len(arr) > 500 {
+			//	tmps := arr
+			//	Mgo.UpSertBulk(Savecoll, tmps...)
+			//	arr = [][]map[string]interface{}{}
+			//}
 			lock.Unlock()
 		}(tmp)
 		tmp = make(map[string]interface{})
@@ -809,7 +812,7 @@ func HistoryQyxyStandard() {
 	wg.Wait()
 	lock.Lock()
 	if len(arr) > 0 {
-		Mgo.UpSertBulk(Savecoll, arr...)
+		//Mgo.UpSertBulk(Savecoll, arr...)
 	}
 	lock.Unlock()
 	log.Println("Run Over...Count:", sum)
@@ -843,6 +846,7 @@ func SaveEs() {
 					defer func() {
 						<-SP
 					}()
+					qu.Debug(Index, Itype, arru)
 					Es.BulkSave(Index, Itype, &arru, true)
 				}(arru[:indexu])
 				arru = make([]map[string]interface{}, 500)

+ 2 - 0
udpcreateindex/src/biddingall.go

@@ -377,6 +377,8 @@ func biddingAllTask(data []byte, mapInfo map[string]interface{}) {
 							if tmp[field] != nil && qutil.Int64All(tmp[field]) > 0 {
 								newTmp[field] = qutil.Int64All(tmp[field])
 							}
+						} else if field == "entidlist" {
+							newTmp[field] = tmp[field]
 						} else { //其它字段判断数据类型,不正确舍弃
 							if fieldval := tmp[field]; reflect.TypeOf(fieldval).String() != ftype {
 								continue

+ 3 - 0
udpcreateindex/src/biddingindex.go

@@ -54,6 +54,7 @@ func biddingTask(data []byte, mapInfo map[string]interface{}) {
 
 	//bidding库
 	session := mgo.GetMgoConn()
+	qutil.Debug(db, c)
 	count, _ := session.DB(db).C(c).Find(&q).Count()
 	log.Println("查询语句:", q, "同步总数:", count, "elastic库:", index)
 	n1, n2 := 0, 0
@@ -400,6 +401,8 @@ func doIndex(infos []map[string]interface{}, eMap map[string]map[string]interfac
 						if tmp[field] != nil && qutil.Int64All(tmp[field]) > 0 {
 							newTmp[field] = qutil.Int64All(tmp[field])
 						}
+					} else if field == "entidlist" {
+						newTmp[field] = tmp[field]
 					} else { //其它字段判断数据类型,不正确舍弃
 						if fieldval := tmp[field]; reflect.TypeOf(fieldval).String() != ftype {
 							continue

+ 6 - 6
udpcreateindex/src/config.json

@@ -4,14 +4,14 @@
   "uname": "dataAnyWrite",
   "upwd": "data@dataAnyWrite",
   "mongodb": {
-    "addr": "192.168.3.205:27082,192.168.3.205:27083",
+    "addr": "192.168.3.207:27092",
     "pool": 10,
-    "db": "qfw"
+    "db": "qfw_data"
   },
   "savedb": {
     "addr": "192.168.3.207:27092",
     "size": 10,
-    "db": "wjh"
+    "db": "qfw_data"
   },
   "jkmail": {
     "to": "zhangjinkun@topnet.net.cn",
@@ -36,7 +36,7 @@
     "type": "bidding"
   },
   "bidding": {
-    "db": "wjh",
+    "db": "qfw_data",
     "collect": "bidding",
     "index": "bidding",
     "type": "bidding",
@@ -76,7 +76,7 @@
   "filelength": 50000,
   "detaillength": 50000,
   "project": {
-    "db": "wjh",
+    "db": "qfw_data",
     "collect": "projectset",
     "index": "projectset",
     "type": "projectset"
@@ -120,7 +120,7 @@
     }
   },
   "elastic": {
-    "addr": "http://192.168.3.11:9800",
+    "addr": "http://192.168.3.128:9800",
     "index": "bidding",
     "itype": "bidding",
     "pool": 12

+ 3 - 3
udpcreateindex/src/main.go

@@ -67,9 +67,9 @@ func init() {
 		MongodbAddr: mconf["addr"].(string),
 		Size:        util.IntAllDef(mconf["pool"], 5),
 		DbName:      mconf["db"].(string),
-		UserName:	 Sysconfig["uname"].(string),
-		Password:    Sysconfig["upwd"].(string),
-		ReplSet: 	 "bidding",
+		//UserName:	 Sysconfig["uname"].(string),
+		//Password:    Sysconfig["upwd"].(string),
+		//ReplSet: 	 "bidding",
 	}
 	mgo.InitPool()
 	project2db = &mongodb.MongodbSim{