Jianghan 4 éve
szülő
commit
0563c6d228

+ 2 - 2
fullproject/src_v1/main.go

@@ -70,7 +70,7 @@ func DealSign() {
 	}
 }
 
-func mainT() {
+func main() {
 	//udp跑增量  id段   project
 	//udp跑全量			qlT
 	//udp跑历史数据  信息id1,id2/或id段  ls
@@ -90,7 +90,7 @@ func mainT() {
 }
 
 //测试组人员使用
-func main() {
+func mainT() {
 	sid = "6062826adeed5af79ca930c9"
 	eid = "6062826adeed5af79ca930ca"
 	//flag.StringVar(&sid, "sid", "", "开始id")

+ 18 - 5
fullproject/src_v1/task.go

@@ -455,7 +455,7 @@ func (p *ProjectTask) enter(db, coll string, q map[string]interface{}) {
 	countRepeat := 0
 
 	pool := make(chan bool, p.thread)
-	log.Println("start project", q, p.pici)
+	util.Debug("start project", q, p.pici)
 	sess := MongoTool.GetMgoConn()
 	defer MongoTool.DestoryMongoConn(sess)
 	infoPool := make(chan map[string]interface{}, 2000)
@@ -472,14 +472,27 @@ func (p *ProjectTask) enter(db, coll string, q map[string]interface{}) {
 					}()
 					p.fillInPlace(tmp)
 					info := ParseInfo(tmp)
-					util.Debug(tmp["projectname"])
-					util.Debug(info.ProjectName)
 					p.currentTime = info.Publishtime
 					//普通合并
 					p.CommonMerge(tmp, info)
 				}(tmp)
-			case <-over:
-				break L
+			default:
+				select {
+				case tmp := <-infoPool:
+					pool <- true
+					go func(tmp map[string]interface{}) {
+						defer func() {
+							<-pool
+						}()
+						p.fillInPlace(tmp)
+						info := ParseInfo(tmp)
+						p.currentTime = info.Publishtime
+						//普通合并
+						p.CommonMerge(tmp, info)
+					}(tmp)
+				case <- over:
+					break L
+				}
 			}
 		}
 	}()

+ 5 - 5
qyxy/src/main.go

@@ -42,8 +42,8 @@ func init() {
 		MongodbAddr: Sysconfig["mgodb"].(string),
 		Size:        qu.IntAllDef(Sysconfig["dbsize"], 5),
 		DbName:      Dbname,
-		//UserName: 	 Sysconfig["uname"].(string),
-		//Password: 	 Sysconfig["upwd"].(string),
+		UserName: 	 Sysconfig["uname"].(string),
+		Password: 	 Sysconfig["upwd"].(string),
 	}
 	Mgo.InitPool()
 	//es
@@ -61,7 +61,7 @@ func init() {
 	EsFields = qu.ObjArrToStringArr(econf["esfields"].([]interface{}))
 	//启动es保存
 	go SaveEs()    //过滤后数据
-	go SaveAllEs() //所有数据
+	//go SaveAllEs() //所有数据
 	//初始化其他信息
 	TaskTime = qu.IntAll(Sysconfig["tasktime"])
 	//Updatetime = qu.Int64All(Sysconfig["updatetime"])
@@ -71,9 +71,9 @@ func init() {
 }
 
 func main() {
-	//go TimeTask()
+	go TimeTask()
 	//QyxyStandard()
-	HistoryQyxyStandard()
+	//HistoryQyxyStandard()
 	ch := make(chan bool, 1)
 	<-ch
 }

+ 12 - 16
qyxy/src/task.go

@@ -60,7 +60,6 @@ type City struct {
 
 //定时任务
 func TimeTask() {
-	StartTask()
 	c := cron.New()
 	cronstr := "0 0 15 ? * Tue" //每周二15点执行
 	//cronstr := "0 */" + fmt.Sprint(TaskTime) + " * * * ?" //每TaskTime小时执行一次
@@ -392,7 +391,7 @@ func QyxyStandard() bool {
 				mgoMap[k] = v
 			}
 			//es数据过滤
-			EsSaveFlag := true
+			//EsSaveFlag := true
 			company_type := qu.ObjToString(esMap["company_type"])
 			company_name := qu.ObjToString(esMap["company_name"])
 			if company_type == "个体工商户" {
@@ -417,21 +416,19 @@ func QyxyStandard() bool {
 				}
 			}
 			lock.Lock()
-			if EsSaveFlag {
-				if esMap["history_name"] != nil {
-						var nameArr []string
-						for _, v := range strings.Split(qu.ObjToString(esMap["history_name"]), ";") {
-							if v != "" {
-								nameArr = append(nameArr, v)
-							}
-						}
-						if len(nameArr) > 0 {
-							esMap["history_name"] = nameArr
-						}
+			if esMap["history_name"] != nil {
+				var nameArr []string
+				for _, v := range strings.Split(qu.ObjToString(esMap["history_name"]), ";") {
+					if v != "" {
+						nameArr = append(nameArr, v)
+					}
+				}
+				if len(nameArr) > 0 {
+					esMap["history_name"] = nameArr
 				}
-				EsSaveCache <- esMap //过滤后数据保存
 			}
-			EsSaveAllCache <- esMap //所有数据保存
+			EsSaveCache <- esMap //过滤后数据保存
+			//EsSaveAllCache <- esMap //所有数据保存
 			update = append(update, map[string]interface{}{"$set": mgoMap})
 			SaveHistoryName(tmp)		//保存曾用名
 			if len(update) == 2 {
@@ -846,7 +843,6 @@ func SaveEs() {
 					defer func() {
 						<-SP
 					}()
-					qu.Debug(Index, Itype, arru)
 					Es.BulkSave(Index, Itype, &arru, true)
 				}(arru[:indexu])
 				arru = make([]map[string]interface{}, 500)

+ 2 - 2
qyxy_change/qy_baidu/main.go

@@ -37,8 +37,8 @@ func init() {
 		MongodbAddr: util.ObjToString((*pingan)["dbServer"]),
 		Size:        util.IntAll((*pingan)["dbSize"]),
 		DbName:      Dbname_pa,
-		UserName: 	 util.ObjToString((*pingan)["uname"]),
-		Password: 	 util.ObjToString((*pingan)["upwd"]),
+		//UserName: 	 util.ObjToString((*pingan)["uname"]),
+		//Password: 	 util.ObjToString((*pingan)["upwd"]),
 	}
 	MgoMix.InitPool()
 

+ 2 - 4
qyxy_change/qy_baidu/task.go

@@ -14,7 +14,7 @@ import (
 )
 
 func TimeTask() {
-	//GetPaData()
+	GetPaData()
 	c := cron.New()
 	cronstrBd := "0 0 */" + fmt.Sprint(BdTaskTime) + " * * ?" 		//每TaskTime小时执行一次
 	//cronstr := "0 0 " + fmt.Sprint(TaskTime) + " * * ?"			//每天TaskTime跑一次
@@ -46,7 +46,6 @@ func GetBdData() {
 
 func GetPaData() {
 	count := 0
-	lastid := ""
 	sess := MgoMix.GetMgoConn()
 	defer MgoMix.DestoryMongoConn(sess)
 	fields := map[string]interface{}{"changes": 1, "company_id": 1, "company_name": 1, "company_type": 1, "establish_date": 1, "create_time": 1}
@@ -55,9 +54,8 @@ func GetPaData() {
 	util.Debug("ping an count ------", c)
 	tmp := make(map[string]interface{})
 	for query.Next(&tmp) {
-		lastid = mongodb.BsonIdToSId(tmp["company_id"])
 		if count%1000 == 0 {
-			util.Debug("ping an ----current-----", count, lastid)
+			util.Debug("ping an ----current-----", count)
 		}
 		if strings.Contains(util.ObjToString(tmp["company_type"]), "个体") {
 			continue

+ 4 - 4
udpcreateindex/src/biddingall.go

@@ -97,10 +97,10 @@ func biddingAllTask(data []byte, mapInfo map[string]interface{}) {
 		//	}
 		//}
 
-		// if sensitive := qutil.ObjToString(tmp["sensitive"]); sensitive != "" { //bidding中有敏感词,不生索引
-		// 	tmp = make(map[string]interface{})
-		// 	continue
-		// }
+		if sensitive := qutil.ObjToString(tmp["sensitive"]); sensitive == "测试" { //bidding中有敏感词,不生索引
+			tmp = make(map[string]interface{})
+			continue
+		}
 		update := map[string]interface{}{}
 		del := map[string]interface{}{} //记录extract没有值而bidding中有值的字段
 		//对比方法----------------

+ 5 - 4
udpcreateindex/src/biddingindex.go

@@ -143,10 +143,10 @@ func doIndex(infos []map[string]interface{}, eMap map[string]map[string]interfac
 	log.Println("开始迭代..")
 	for n, tmp := range infos {
 		n1++
-		// if sensitive := qutil.ObjToString(tmp["sensitive"]); sensitive != "" { //bidding中有敏感词,不生索引
-		// 	tmp = make(map[string]interface{})
-		// 	continue
-		// }
+		if sensitive := qutil.ObjToString(tmp["sensitive"]); sensitive == "测试" { //bidding中有敏感词,不生索引
+			tmp = make(map[string]interface{})
+			continue
+		}
 		update := map[string]interface{}{} //要更新的mongo数据
 		//对比方法----------------
 		tid := mongodb.BsonIdToSId(tmp["_id"])
@@ -545,6 +545,7 @@ func doIndex(infos []map[string]interface{}, eMap map[string]map[string]interfac
 	}
 	if len(arrEs) > 0 {
 		tmps := arrEs
+		log.Println("---", tmps[0])
 		elastic.BulkSave(index, itype, &tmps, true)
 		if other_index != "" && other_itype != "" {
 			bidding_other_es.BulkSave(other_index, other_itype, &tmps, true)

+ 2 - 2
udpcreateindex/src/config.json

@@ -98,7 +98,7 @@
     "type": "qyxy_ent"
   },
   "standard": {
-    "addr": "192.168.3.205:27082,192.168.3.205:27083",
+    "addr": "192.168.3.207:27092",
     "pool": 10,
     "db": "mixdata",
     "winnerent": {
@@ -120,7 +120,7 @@
     }
   },
   "elastic": {
-    "addr": "http://127.0.0.1:9800",
+    "addr": "http://192.168.3.11:9800",
     "index": "bidding",
     "itype": "bidding",
     "pool": 12

+ 5 - 5
udpcreateindex/src/main.go

@@ -65,9 +65,9 @@ func init() {
 		MongodbAddr: mconf["addr"].(string),
 		Size:        util.IntAllDef(mconf["pool"], 5),
 		DbName:      mconf["db"].(string),
-		UserName:	 Sysconfig["uname"].(string),
-		Password:    Sysconfig["upwd"].(string),
-		ReplSet: 	 "bidding",
+		//UserName:	 Sysconfig["uname"].(string),
+		//Password:    Sysconfig["upwd"].(string),
+		//ReplSet: 	 "bidding",
 	}
 	mgo.InitPool()
 	project2db = &mongodb.MongodbSim{
@@ -104,8 +104,8 @@ func init() {
 		MongodbAddr: standard["addr"].(string),
 		Size:        util.IntAllDef(standard["pool"], 5),
 		DbName:      standard["db"].(string),
-		UserName:    Sysconfig["uname"].(string),
-		Password:    Sysconfig["upwd"].(string),
+		//UserName:    Sysconfig["uname"].(string),
+		//Password:    Sysconfig["upwd"].(string),
 	}
 	mgostandard.InitPool()
 

+ 1 - 1
udpcreateindex/src/util/ossclient.go

@@ -11,7 +11,7 @@ import (
 )
 
 var (
-	ossEndpoint        = "ss-cn-beijing.aliyuncs.com" //正式环境用:oss-cn-beijing-internal.aliyuncs.com 测试:oss-cn-beijing.aliyuncs.com
+	ossEndpoint        = "oss-cn-beijing.aliyuncs.com" //正式环境用:oss-cn-beijing-internal.aliyuncs.com 测试:oss-cn-beijing.aliyuncs.com
 	ossAccessKeyId     = "LTAI4G5x9aoZx8dDamQ7vfZi"
 	ossAccessKeySecret = "Bk98FsbPYXcJe72n1bG3Ssf73acuNh"
 	ossBucketName      = "topjy"