Browse Source

Merge branch 'dev3.2' of http://192.168.3.207:10080/qmx/jy-data-extract into dev3.2

fengweiqiang 6 years ago
parent
commit
ca598ebae1
5 changed files with 118 additions and 308 deletions
  1. 20 3
      src/config.json
  2. 51 33
      src/jy/cluster/aliecs.go
  3. 45 264
      src/jy/extract/extractcity.go
  4. 1 7
      src/jy/extract/extractudp.go
  5. 1 1
      src/main_test.go

+ 20 - 3
src/config.json

@@ -35,9 +35,26 @@
         "available": false,
         "AccessID": "",
         "AccessSecret": "",
-        "LaunchTemplateId4": "lt-2ze19qyi8votdjgeq2ma",
-        "LaunchTemplateId8": "lt-2zeidqiydzusn7hw7lt8",
-        "VSwitchId": "vsw-2ze23am2bl9e3v6rnyhfb"
+        "ZoneIds": [
+            {
+                "zoneid": "cn-beijing-f",
+                "LaunchTemplateId4": "lt-2zejb8ayql48hn0hcjpy",
+                "LaunchTemplateId8": "lt-2zegx87hj07phcdtoh61",
+                "vswitchid": "vsw-2zei6snkgmqxcnnx6g04d"
+            },
+            {
+                "zoneid": "cn-beijing-g",
+                "LaunchTemplateId4": "lt-2ze5ktfgopayi48ok0hu",
+                "LaunchTemplateId8": "lt-2ze0qfrxdnkuwldj9s0u",
+                "vswitchid": "vsw-2ze586sxfwsaov4s5w88d"
+            },
+            {
+                "zoneid": "cn-beijing-h",
+                "LaunchTemplateId4": "lt-2ze5ir54gy4ui8okr71f",
+                "LaunchTemplateId8": "lt-2ze5fzxwgt8jcqczvmjy",
+                "vswitchid": "vsw-2ze1n1k3mo3fv2irsfdps"
+            }
+        ]
     },
     "filelength": 100000,
     "saveblock": true

+ 51 - 33
src/jy/cluster/aliecs.go

@@ -30,50 +30,68 @@ const (
 )
 
 //批量创建实例
+// RunInstances creates num instances for taskName, spread over the zones
+// listed under esconfig["ZoneIds"]. Nothing happens unless esconfig exists and
+// its "available" flag is true.
+//
+// computer selects the launch template ("LaunchTemplateId"+computer) inside
+// each zone entry; flow == "true" sets the outbound bandwidth to "10",
+// otherwise "0"; hours is forwarded to runInstances for the auto-release time.
+//
+// When num is smaller than the number of zones, every instance goes to the
+// last zone. Otherwise each zone gets num/len(zones) and the last zone also
+// takes the remainder.
-func RunInstances(TaskName, computer, flow string, num, hours int) {
+func RunInstances(taskName, computer, flow string, num, hours int) {
 	if esconfig, ok := ju.Config["esconfig"].(map[string]interface{}); ok {
-		InternetMaxBandwidthOut := "0"
+		widthOut := "0"
 		if flow == "true" {
-			InternetMaxBandwidthOut = "10"
+			widthOut = "10"
 		}
-		log.Println(esconfig["LaunchTemplateId"+computer], InternetMaxBandwidthOut)
+		log.Println(esconfig["LaunchTemplateId"+computer], widthOut)
 		if b, ok := esconfig["available"].(bool); ok && b {
-			res := GET("RunInstances", [][]string{
-				[]string{"RegionId", "cn-beijing"},
-				[]string{"LaunchTemplateId", qu.ObjToString(esconfig["LaunchTemplateId"+computer])},
-				//[]string{"ImageId", "centos_7_06_64_20G_alibase_20181212.vhd"},
-				//[]string{"InstanceType", "ecs.ic5.large"},
-				//[]string{"SecurityGroupId", "sg-bp16x3td2evrejhkshp7"},
-				[]string{"VSwitchId", qu.ObjToString(esconfig["VSwitchId"])},
-				[]string{"InternetMaxBandwidthIn", "50"},
-				[]string{"InternetMaxBandwidthOut", InternetMaxBandwidthOut},
-				[]string{"InstanceChargeType", "PostPaid"},
-				[]string{"SpotStrategy", "SpotWithPriceLimit"},
-				[]string{"SpotPriceLimit", "4.99"},
-				[]string{"InstanceName", "extract"},
-				[]string{"UniqueSuffix", "true"},
-				[]string{"Password", Password},
-				[]string{"Amount", fmt.Sprint(num)},
-				[]string{"AutoReleaseTime", time.Now().Add(time.Duration(hours) * time.Hour).UTC().Format("2006-01-02T15:04:05Z")},
-			})
-			//  {"RequestId":"95653A72-4907-4DD0-86F9-00E216193173","InstanceIdSets":{"InstanceIdSet":["i-2ze0z0xdiqgtwji5jd9s"]}}
-			if tmp, ok := res["InstanceIdSets"].(map[string]interface{}); ok {
-				if t, ok := tmp["InstanceIdSet"].([]interface{}); ok {
-					//实例id持久化
-					for _, v := range t {
-						db.Mgo.Save("ecs", map[string]interface{}{
-							"InstanceId": v,
-							"TaskName":   TaskName,
-							"UseFor":     "extract",
-						})
+			// Check the assertion's own result and reject an empty list: the
+			// split below divides by len(zoneIds) and would panic on zero.
+			if zoneIds, ok := esconfig["ZoneIds"].([]interface{}); ok && len(zoneIds) > 0 {
+				pernum := num / len(zoneIds)
+				if pernum < 1 {
+					kv, _ := zoneIds[len(zoneIds)-1].(map[string]interface{})
+					runInstances(kv, taskName, widthOut, computer, num, hours)
+				} else {
+					for k, v := range zoneIds {
+						if (k == len(zoneIds)-1) && (num%len(zoneIds) != 0) {
+							pernum = num - pernum*(len(zoneIds)-1)
+						}
+						kv, _ := v.(map[string]interface{})
+						runInstances(kv, taskName, widthOut, computer, pernum, hours)
 					}
 				}
 			}
-			log.Println(res)
 		}
 	}
 }
 
+// runInstances sends one RunInstances request for a single zone and saves
+// every returned instance id to the "ecs" collection.
+//
+// kv is one zone entry from the configuration; its "zoneid", "vswitchid" and
+// "LaunchTemplateId"+computer values are sent as ZoneId, VSwitchId and
+// LaunchTemplateId. widthOut is the outbound bandwidth, pernum the number of
+// instances to create in this zone, and hours the lifetime after which the
+// instances are auto-released (sent as a UTC timestamp).
+func runInstances(kv map[string]interface{}, taskName, widthOut, computer string, pernum, hours int) {
+	log.Println(kv, taskName, widthOut, computer, pernum, hours)
+	res := GET("RunInstances", [][]string{
+		[]string{"RegionId", "cn-beijing"},
+		[]string{"ZoneId", qu.ObjToString(kv["zoneid"])},
+		[]string{"VSwitchId", qu.ObjToString(kv["vswitchid"])},
+		[]string{"LaunchTemplateId", qu.ObjToString(kv["LaunchTemplateId"+computer])},
+		//[]string{"ImageId", "centos_7_06_64_20G_alibase_20181212.vhd"},
+		//[]string{"InstanceType", "ecs.ic5.large"},
+		//[]string{"SecurityGroupId", "sg-bp16x3td2evrejhkshp7"},
+		// Inbound bandwidth was sent before the per-zone refactor; keep it as a
+		// live parameter instead of leaving it fused onto the comment above.
+		[]string{"InternetMaxBandwidthIn", "50"},
+		[]string{"InternetMaxBandwidthOut", widthOut},
+		[]string{"InstanceChargeType", "PostPaid"},
+		[]string{"SpotStrategy", "SpotWithPriceLimit"},
+		[]string{"SpotPriceLimit", "4.99"},
+		[]string{"InstanceName", "extract"},
+		[]string{"UniqueSuffix", "true"},
+		[]string{"Password", Password},
+		[]string{"Amount", fmt.Sprint(pernum)},
+		[]string{"AutoReleaseTime", time.Now().Add(time.Duration(hours) * time.Hour).UTC().Format("2006-01-02T15:04:05Z")},
+	})
+	// Expected response shape:
+	//   {"RequestId":"...","InstanceIdSets":{"InstanceIdSet":["i-..."]}}
+	if tmp, ok := res["InstanceIdSets"].(map[string]interface{}); ok {
+		if t, ok := tmp["InstanceIdSet"].([]interface{}); ok {
+			// Persist the instance ids.
+			for _, v := range t {
+				db.Mgo.Save("ecs", map[string]interface{}{
+					"InstanceId": v,
+					"TaskName":   taskName,
+					"UseFor":     "extract",
+				})
+			}
+		}
+	}
+	log.Println(res)
+}
+
 //查询多台实例的详细信息
 func DescribeInstances() {
 	res := GET("DescribeInstances", [][]string{

+ 45 - 264
src/jy/extract/extractcity.go

@@ -148,7 +148,7 @@ func (e *ExtractTask) ExtractCity(j *ju.Job, resulttmp map[string]interface{}, i
 	//	area, _ := resulttmp["area"].(string)
 	//	city, _ := resulttmp["city"].(string)
 	//	district, _ := resulttmp["district"].(string)
-	//  qu.Debug("之前结果结果===", area, city, district)
+	//	qu.Debug("之前结果结果===", area, city, district)
 	arearesult := ""
 	cityresult := ""
 	districtresult := ""
@@ -156,18 +156,18 @@ func (e *ExtractTask) ExtractCity(j *ju.Job, resulttmp map[string]interface{}, i
 	if len(finishP) == 1 { //最高分一个
 		arearesult = finishP[0] //抽取结果直接赋值
 		cityresult = GetCity(arearesult, cityresult, e, finishC)
-		districtresult = GetDistrict(arearesult, districtresult, e, finishD)
+		cityresult, districtresult = GetDistrict(arearesult, cityresult, districtresult, e, finishD)
 	} else if len(finishP) > 1 { //province最高分多个
 		if len(finishC) == 1 {
 			cityresult = finishC[0]
 			if cfMap := e.CityFullMap[cityresult]; cfMap != nil {
 				arearesult = cfMap.P.Brief
-				districtresult = GetDistrict(arearesult, districtresult, e, finishD)
+				cityresult, districtresult = GetDistrict(arearesult, cityresult, districtresult, e, finishD)
 			}
 		} else { //对应的city有多个(多个province和city)
 			arearesult = finishP[0] //抽取结果直接赋值
 			cityresult = GetCity(arearesult, cityresult, e, finishC)
-			districtresult = GetDistrict(arearesult, districtresult, e, finishD)
+			cityresult, districtresult = GetDistrict(arearesult, cityresult, districtresult, e, finishD)
 		}
 	}
 	//qu.Debug("结果===", arearesult, "--", cityresult, "--", districtresult)
@@ -300,8 +300,7 @@ func (e *ExtractTask) GetCityByOthers(j *ju.Job, sm *SortMap) ([]map[string]stri
 	area2 := []map[string]string{}
 	city2 := []map[string]string{}
 	district2 := []map[string]string{}
-	isExtP := false
-	isExtC := false
+	isExtPC := false
 	for _, from := range sm.Keys { //buyeraddr;title;projectname
 		str, _ := sm.Map[from].(string)
 		//分别记录buyeraddr;title;projectname全称匹配的打分情况
@@ -353,34 +352,32 @@ func (e *ExtractTask) GetCityByOthers(j *ju.Job, sm *SortMap) ([]map[string]stri
 
 		//取最高分的province,city,district
 		ph1 := HighestScore(pscore1)
-		if ph1 != "" {
-			isExtP = true
-		}
 		ch1 := HighestScore(cscore1)
-		if ch1 != "" {
-			isExtC = true
-		}
 		dh1 := HighestScore(dscore1)
-		if dh1 != "" {
-			isExtP = true
-			isExtC = true
+		isMatch := IsMatch(ph1, ch1, e) //最高分p和最高分c可能不对应
+		if ch1 != "" && ph1 != "" && isMatch {
+			isExtPC = true
 		}
+		//是否相互匹配
 		area2 = append(area2, map[string]string{from + "_all": ph1})
 		city2 = append(city2, map[string]string{from + "_all": ch1})
 		district2 = append(district2, map[string]string{from + "_all": dh1})
 		//buyeraddr,title,projectname匹配对应的结果加入最终得分
-		if from == "buyeraddr" || from == "buyer" { //全称匹配,buyeraddr和buyer3分,title和projectname2分
-			PCDScore(j, "province", ph1, 3)
-			PCDScore(j, "city", ch1, 3)
-			PCDScore(j, "district", dh1, 3)
-		} else {
-			PCDScore(j, "province", ph1, 2)
-			PCDScore(j, "city", ch1, 2)
-			PCDScore(j, "district", dh1, 2)
+		if isMatch {
+			if from == "buyeraddr" || from == "buyer" { //全称匹配,buyeraddr和buyer3分,title和projectname2分
+				PCDScore(j, "province", ph1, 3)
+				PCDScore(j, "city", ch1, 3)
+				PCDScore(j, "district", dh1, 3)
+			} else {
+				PCDScore(j, "province", ph1, 2)
+				PCDScore(j, "city", ch1, 2)
+				PCDScore(j, "district", dh1, 2)
+			}
 		}
+
 	}
 	//判断全称是否抽出了province和city,一个未抽出走简称抽取
-	if !isExtP || !isExtC {
+	if !isExtPC {
 		for _, from := range sm.Keys { //buyeraddr;title;projectname
 			str, _ := sm.Map[from].(string)
 			pscore2 := make(map[string]int)
@@ -434,6 +431,19 @@ func (e *ExtractTask) GetCityByOthers(j *ju.Job, sm *SortMap) ([]map[string]stri
 	return area2, city2, district2
 }
 
+// IsMatch reports whether province p and city c are consistent with each
+// other. A non-empty province with an empty city counts as a match. Otherwise
+// c must be a key of e.CityFullMap and that entry's province brief must
+// equal p.
+func IsMatch(p, c string, e *ExtractTask) bool {
+	// Province only: there is no city that could contradict it.
+	if c == "" && p != "" {
+		return true
+	}
+	entry := e.CityFullMap[c]
+	if entry == nil {
+		return false
+	}
+	return entry.P.Brief == p
+}
+
 //计算province,city,district得分
 func PCDScore(j *ju.Job, stype, text string, score int) {
 	defer qu.Catch()
@@ -514,16 +524,24 @@ func GetCity(area, city string, e *ExtractTask, finishC []string) string {
 	return city
 }
 
+// GetDistrict picks the first candidate in finishD whose province brief
+// equals area and returns the resulting (city, district) pair.
+//
+// For a matching district, dcMap.Name is used as the name of the city it
+// belongs to. If that is already the current city, the pair is consistent and
+// the search stops. Otherwise, when e.CityFullMap confirms that city lies in
+// area, city is replaced by it and the search stops. If neither holds, the
+// district is kept and the remaining candidates are still examined.
-func GetDistrict(area, district string, e *ExtractTask, finishD []string) string {
+func GetDistrict(area, city, district string, e *ExtractTask, finishD []string) (string, string) {
 	for _, d := range finishD { //取最高分与province匹配的district
 		if dcMap := e.DistrictCityMap[d]; dcMap != nil {
 			if dcMap.P.Brief == area {
 				district = d
-				break
+				tmpcity := dcMap.Name
+				if city == tmpcity {
+					// District and city already agree: stop so that a later
+					// candidate cannot overwrite a consistent pair.
+					break
+				}
+				if cfMap := e.CityFullMap[tmpcity]; cfMap != nil {
+					if cfMap.P.Brief == area {
+						city = tmpcity
+						break
+					}
+				}
 			}
 		}
 	}
-	return district
+	return city, district
 }
 
 func GetPCDByAreaDFA(province, acd string, e *ExtractTask, j *ju.Job, flag bool) (string, bool) {
@@ -609,240 +627,3 @@ func GetPCDByDistrictDFA(province, city, district, acd string, e *ExtractTask, j
 
 	return province, city, district
 }
-
-//func (e *ExtractTask) TransmitData(resulttmp map[string]interface{}, id string) (bres bool, p, c, d string) {
-//	defer qu.Catch()
-//	province := fmt.Sprint(resulttmp["area"])
-//	city := fmt.Sprint(resulttmp["city"])
-//	fieldval := make([]string, 0)
-//	for _, f := range SortField { //
-//		val := resulttmp[f]
-//		if val == nil {
-//			fieldval = append(fieldval, "")
-//		} else {
-//			fieldval = append(fieldval, fmt.Sprint(val))
-//		}
-//	}
-//	//qu.Debug("fieldval========", fieldval)
-//	bres, c, p = e.ExtractProvinceCity(province, city, id, fieldval) //抽取省和市
-//	//qu.Debug("b--------", bres, "p---------", p, "c-------------", c)
-//	bres, p, c, d = e.ExtractDistrict(fieldval, bres, c, p, id) //抽取区或县
-//	//qu.Debug("bres========", bres, "p===========", p, "c=========", c, "d=============", d)
-//	return
-//}
-
-//抽取城市、省份
-//func (e *ExtractTask) ExtractProvinceCity(province, city, id string, text []string) (bres bool, c, p string) {
-//	defer qu.Catch()
-//	bc := true //是否继续抽取
-//	if city != "" {
-//		lock.Lock()
-//		citybriefmap := e.CityBriefMap[city]
-//		//log.Println("citybriefmap========", citybriefmap)
-//		lock.Unlock()
-//		if citybriefmap == nil { //简称不存在
-//			log.Println("city err:", city, id)
-//		} else { //简称存在
-//			lock.Lock()
-//			pbrief := e.CityBriefMap[city].P.Brief
-//			//log.Println("pbrief========", pbrief)
-//			lock.Unlock()
-//			if province != pbrief { //省份不配对
-//				log.Println("province err:", city, province, id)
-//			} else {
-//				bc = false
-//				//城市省份都正确
-//			}
-//		}
-//	}
-//	//有省份
-//	bp := false
-//	lock.Lock()
-//	provincebriefmap := e.ProvinceBriefMap[province]
-//	//log.Println("provincebriefmap========", provincebriefmap)
-//	lock.Unlock()
-//	if provincebriefmap != nil { //省份简称正确
-//		bp = true
-//	} else { //没有省份,先识别省份
-//		for _, str := range text { //没有省的简称,从配置的字段信息中抽取省
-//			word := e.ProvinceAllGet.CheckSensitiveWord(str) //省全称DFA中匹配
-//			if word != "" {
-//				lock.Lock()
-//				province = e.ProvinceMap[word]
-//				lock.Unlock()
-//				bp = true
-//				break
-//			}
-//		}
-//	}
-//	//匹配城市
-//	if bc { //城市简称不存在CityBrief[city]==nil,或城市简称存在但省份不配对,继续抽取
-//		for pos, GET := range []*ju.DFA{e.CityAllGet, e.CitySimGet} { //AreaGet市全称,AreaSimGet省全称和简称
-//			ws := make([]string, 5)
-//			for n, str := range text {
-//				if str != "" {
-//					word := GET.CheckSensitiveWord(str)
-//					if pos == 1 { //用简称 后辍为路、集团替换
-//						str1 := strings.Replace(str, word+"路", "", 1)
-//						if str1 != str {
-//							word = GET.CheckSensitiveWord(str1)
-//						}
-//					}
-//					ws[n] = word
-//					if word != "" {
-//						lock.Lock()
-//						res := e.AreaToCityMap[word]
-//						lock.Unlock()
-//						if len(res) == 1 {
-//							//判断省份
-//							if !bp || province == res[0].P.Brief { //省份不存在或一致直接返回(!bp:省的简称)
-//								bres = true
-//								c = res[0].Brief
-//								p = res[0].P.Brief
-//								break
-//							} else { //不一致时。。暂时不处理
-//							}
-//						} else { //多个时(出现这种情况是多个省中的市,市名相同。现在的配置文件中已经将市名,县名重复的全部去掉)
-//						}
-//					}
-//				}
-//			}
-//			if !bres { //没有匹配到
-//				mc := map[string]int{}
-//				for _, w := range ws {
-//					lock.Lock()
-//					res := e.AreaToCityMap[w]
-//					lock.Unlock()
-//					for _, ct := range res {
-//						if ct == nil {
-//							continue
-//						}
-//						if bp { //有省份
-//							if ct.P != nil && ct.P.Brief == province {
-//								mc[ct.Brief]++
-//							}
-//						} else { //没有省份
-//							mc[ct.Brief]++
-//						}
-//					}
-//				}
-//				//计算mc中最大值且大于1
-//				max := 1
-//				v := ""
-//				for mk, mv := range mc {
-//					if mv > max {
-//						v = mk
-//					}
-//				}
-//				if v != "" {
-//					bres = true
-//					lock.Lock()
-//					ctb := e.CityBriefMap[v]
-//					lock.Unlock()
-//					c = ctb.Brief
-//					p = ctb.P.Brief
-//				} else if len(mc) > 0 {
-//					//取级别更大的
-//					v := ""
-//					for mk, _ := range mc {
-//						lock.Lock()
-//						cb := e.CityBriefMap[mk]
-//						lock.Unlock()
-//						if cb.P.Cap == mk {
-//							bres = true
-//							c = cb.Brief
-//							p = cb.P.Brief
-//							break
-//						} else {
-//							v = mk
-//						}
-//					}
-//					if !bres {
-//						bres = true
-//						lock.Lock()
-//						cbb := e.CityBriefMap[v]
-//						c = cbb.Brief
-//						p = cbb.P.Brief
-//						lock.Unlock()
-//					}
-//				}
-//			}
-//			if bres {
-//				break
-//			}
-//		}
-//	} else {
-//		return
-//	}
-//	if !bres {
-//		//取默认省会
-//		lock.Lock()
-//		pbp := e.ProvinceBriefMap[province]
-//		lock.Unlock()
-//		if pbp != nil {
-//			bres = true
-//			c = pbp.Cap
-//			p = province
-//		}
-//	}
-//	return
-//}
-//抽取区或县(从配置的字段信息中抽取区或县)
-//func (e *ExtractTask) ExtractDistrict(field []string, bres bool, c, p, id string) (bool, string, string, string) {
-//	d := ""
-//	for _, str := range field {
-//		for pos, GET := range []*ju.DFA{e.DistrictGet, e.StreetGet} { //先匹配区或县再匹配街道
-//			word := GET.CheckSensitiveWord(str)
-//			if word != "" {
-//				if pos == 0 { //区或县匹配
-//					//log.Println("县直接匹配到====", word)
-//					lock.Lock()
-//					city := e.DistrictCityMap[word]
-//					lock.Unlock()
-//					//log.Println("city================", city)
-//					if city != nil {
-//						d = word
-//						ctmp := city.Brief
-//						ptmp := city.P.Brief
-//						//log.Println("ctmpptmp================", ptmp, ctmp, bres)
-//						if !bres { //城市省份没有抽到,通过区或县定位市和省
-//							c = ctmp
-//							p = ptmp
-//							bres = true
-//						} else { //对比抽到的城市省份是否一致
-//							if c != ctmp || p != ptmp {
-//								//log.Println("str---", str, "====", word)
-//								c = ctmp
-//								p = ptmp
-//							}
-//						}
-//					}
-//				} else { //街道匹配
-//					//log.Println("匹配到街道====", word)
-//					lock.Lock()
-//					district := e.StreetDistrictMap[word]
-//					lock.Unlock()
-//					//log.Println("district================", district)
-//					if district != nil {
-//						d = district.Name
-//						ctmp := district.C.Brief
-//						ptmp := district.C.P.Brief
-//						//log.Println("districtptmp================", ctmp, ptmp)
-//						if !bres { //城市省份没有抽到,通过区或县定位市和省
-//							c = ctmp
-//							p = ptmp
-//							bres = true
-//						} else { //对比抽到的城市省份是否一致
-//							if c != ctmp || p != ptmp {
-//								c = ctmp
-//								p = ptmp
-//							}
-//						}
-//					}
-//				}
-//				return bres, p, c, d
-//			}
-//		}
-//	}
-//	return bres, p, c, ""
-//}

+ 1 - 7
src/jy/extract/extractudp.go

@@ -79,13 +79,7 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 			}
 		}
 	case mu.OP_NOOP: //下个节点回应
-		var rep map[string]interface{}
-		err := json.Unmarshal(data, &rep)
-		if err != nil {
-			log.Debug(err)
-		} else {
-			log.Debug(rep)
-		}
+		log.Debug(string(data))
 	}
 }
 

+ 1 - 1
src/main_test.go

@@ -29,7 +29,7 @@ func Test_han(t *testing.T) {
 func Test_task(t *testing.T) {
 	Mgo = MgoFactory(1, 3, 120, "192.168.3.207:27082", "extract_kf")
 	//extract.StartExtractTaskId("5b8f804025e29a290415aee1")5c528686698414055c47b115
-	extract.StartExtractTestTask("5cdd3025698414032c8322b1", "5d25a1dda5cb26b9b7402ae5", "1", "mxs_v1", "mxs_v2")
+	extract.StartExtractTestTask("5cdd3025698414032c8322b1", "5736324a61a0721f15f73188", "1", "mxs_v1", "mxs_v1")
 	//extract.StartExtractTestTask("5c3d75c96984142998eb00e1", "5c2a3d28a5cb26b9b76144dd", "100", "mxs_v3", "mxs_v3")
 	time.Sleep(5 * time.Second)
 }