Jianghan 4 年之前
父节点
当前提交
74e84bba8a

+ 15 - 14
fullproject/src_v1/init.go

@@ -265,22 +265,22 @@ type Info struct {
 	ContractGuarantee   bool                     `json:"contract_guarantee"`
 	BidGuarantee        bool                     `json:"bid_guarantee"`
 	Qualifies           []map[string]interface{} `json:"qualifies"`
+	EntIdList           []string                 `json:"entidlist"`
 	HasPackage          bool                     // `json:"haspackage"`
 	Package             map[string]interface{}   `json:"package"`
-	//PNum          string                 `json:"pnum"`
-	Topscopeclass []string `json:"topscopeclass"`
-	Subscopeclass []string `json:"subscopeclass"`
-	Buyerclass    string   `json:"buyerclass"`
-	Bidopentime   int64    `json:"bidopentime"`
-	Budget        float64  `json:"budget"`
-	Bidamount     float64  `json:"bidamount"`
-	Winners       []string
-	dealtype      int
-	PTC           string //从标题中抽的项目编号
-	pnbval        int    //项目名称、编号、采购单位存在的个数
-	LenPC         int    //项目编号长度
-	LenPN         int    //项目名称长度
-	LenPTC        int    //标题抽的项目编号长度
+	Topscopeclass       []string                 `json:"topscopeclass"`
+	Subscopeclass       []string                 `json:"subscopeclass"`
+	Buyerclass          string                   `json:"buyerclass"`
+	Bidopentime         int64                    `json:"bidopentime"`
+	Budget              float64                  `json:"budget"`
+	Bidamount           float64                  `json:"bidamount"`
+	Winners             []string
+	dealtype            int
+	PTC                 string //从标题中抽的项目编号
+	pnbval              int    //项目名称、编号、采购单位存在的个数
+	LenPC               int    //项目编号长度
+	LenPN               int    //项目名称长度
+	LenPTC              int    //标题抽的项目编号长度
 	//以下三个元素做对比,计算包含时候使用
 	PNBH  int //0初始,+包含,-被包含
 	PCBH  int
@@ -329,6 +329,7 @@ type ProjectInfo struct {
 	ContractGuarantee  bool                   `json:"contract_guarantee"`      //履约保证金 是否支持包含
 	BidGuarantee       bool                   `json:"bid_guarantee"`           //投标保证金 是否支持包含
 	Qualifies          string                 `json:"qualifies"`               //资质条件
+	EntIdList          []string               `json:"entidlist"`               //企业id
 	score              int
 	comStr             string
 	resVal, pjVal      int

+ 4 - 0
fullproject/src_v1/mgotool.go

@@ -123,6 +123,10 @@ func (ms *MgoSess) Iter() *MgoIter {
 	return it
 }
 
+func (ms *MgoSess) Count() (int64, error) {
+	return ms.M.C.Database(ms.Db).Collection(ms.Coll).CountDocuments(ms.M.Ctx, ms.Query)
+}
+
 type MongodbSim struct {
 	MongodbAddr string
 	Size        int

+ 11 - 0
fullproject/src_v1/project.go

@@ -226,6 +226,7 @@ func (p *ProjectTask) startProjectMerge(info *Info, tmp map[string]interface{})
 
 	if !bFindProject {
 		if !IsCreatePro(info) {
+			qu.Debug("舍弃数据---", info.Id)
 			return
 		}
 		id, p1 := p.NewProject(tmp, info)
@@ -693,6 +694,7 @@ var INFOFIELDS = []string{
 	"contract_guarantee",
 	"bid_guarantee",
 	"qualifies",
+	"entidlist",
 }
 
 //项目中list的信息
@@ -736,6 +738,7 @@ func (p *ProjectTask) NewCachePinfo(id primitive.ObjectID, thisinfo *Info, bidty
 		Bidstatus:     bidstatus,
 		Bidtype:       bidtype,
 		Winners:       thisinfo.Winners,
+		EntIdList:     thisinfo.EntIdList,
 	}
 	if thisinfo.LenPTC > 5 {
 		p1.MPC = append(p1.MPC, thisinfo.PTC)
@@ -1069,6 +1072,14 @@ func (p *ProjectTask) UpdateProject(tmp map[string]interface{}, thisinfo *Info,
 			set["qualifies"] = strings.Join(str, ",")
 		}
 	}
+	if len(thisinfo.EntIdList) > 0 {
+		for _, v := range thisinfo.EntIdList{
+			if BinarySearch(pInfo.EntIdList, v) == -1 {
+				pInfo.EntIdList = append(pInfo.EntIdList, v)
+			}
+		}
+		set["entidlist"] = pInfo.EntIdList
+	}
 
 	if thisinfo.HasPackage { //多包处理
 		set["multipackage"] = 1

+ 6 - 5
fullproject/src_v1/task.go

@@ -457,7 +457,7 @@ func (p *ProjectTask) enter(db, coll string, q map[string]interface{}) {
 		p.Brun = false
 	}()
 	p.Brun = true
-	count, taskcount := 0, 0
+	count := 0
 	countRepeat := 0
 
 	pool := make(chan bool, p.thread)
@@ -472,7 +472,6 @@ func (p *ProjectTask) enter(db, coll string, q map[string]interface{}) {
 			select {
 			case tmp := <-infoPool:
 				pool <- true
-				taskcount++
 				go func(tmp map[string]interface{}) {
 					defer func() {
 						<-pool
@@ -481,7 +480,6 @@ func (p *ProjectTask) enter(db, coll string, q map[string]interface{}) {
 						if P_QL.currentType == "project" && util.IntAll(tmp["dataging"]) == 1 {
 							//增量	dataging为1不参与合并
 							util.Debug("增量   dataging == 1 ", tmp["_id"])
-							countRepeat++
 							return
 						}
 
@@ -504,12 +502,15 @@ func (p *ProjectTask) enter(db, coll string, q map[string]interface{}) {
 	//	"projectcode": 1, "buyerclass": 1, "winner": 1, "buyer": 1, "buyerperson": 1, "buyertel": 1, "infoformat": 1, "toptype": 1, "subtype": 1, "spidercode": 1, "projectscope": 1, "contractcode": 1,
 	//	"site": 1, "topscopeclass": 1, "subscopeclass": 1, "bidamount": 1, "budget": 1, "agency": 1, "package": 1, "jsondata": 1, "review_experts": 1, "purchasing": 1, "winnerorder": 1}
 	fields := map[string]interface{}{"kvtext": 0, "repeat_reason": 0}
+	if p.currentType == "project" {
+		c, _ := sess.DB(db).C(coll).Find(q).Count()
+		util.Debug("共查询:", c, "条")
+	}
 	ms := sess.DB(db).C(coll).Find(q).Select(fields).Sort("publishtime")
 	if Sysconfig["hints"] != nil {
 		ms.Hint(Sysconfig["hints"])
 	}
 	query := ms.Iter()
-	//query := sess.DB(db).C(coll).Find(q).Sort("publishtime").Iter()
 	var lastid interface{}
 L:
 	for {
@@ -539,7 +540,7 @@ L:
 	for n := 0; n < p.thread; n++ {
 		pool <- true
 	}
-	log.Println("所有线程执行完成...", count, taskcount, countRepeat)
+	log.Println("所有线程执行完成...", count, countRepeat)
 
 }
 

+ 120 - 30
qyxy/src/task.go

@@ -210,16 +210,6 @@ func QyxyStandard() bool {
 							esMap[field] = "其他"
 						}
 					}
-				} else if field == "history_name" {
-					var nameArr []string
-					for _, v := range strings.Split(qu.ObjToString(tmp[field]), ";") {
-						if v != "" {
-							nameArr = append(nameArr, v)
-						}
-					}
-					if len(nameArr) > 0 {
-						esMap[field] = nameArr
-					}
 				} else if strings.Contains(field, "date") || strings.Contains(field, "time") { //时间处理
 					if tmp[field] != nil {
 						if timeTmp, ok := tmp[field].(primitive.DateTime); ok {
@@ -384,7 +374,6 @@ func QyxyStandard() bool {
 					continue
 				}
 				mgoMap[k] = v
-
 			}
 			//es数据过滤
 			EsSaveFlag := true
@@ -398,6 +387,12 @@ func QyxyStandard() bool {
 					EsSaveFlag = false
 				}
 			}
+			if EsSaveFlag {
+				status := qu.ObjToString(esMap["company_status"])
+				if status != "正常" {
+					EsSaveFlag = false
+				}
+			}
 			if EsSaveFlag {
 				credit_no := strings.TrimSpace(qu.ObjToString(esMap["credit_no"]))
 				company_code := strings.TrimSpace(qu.ObjToString(esMap["company_code"]))
@@ -405,14 +400,24 @@ func QyxyStandard() bool {
 					EsSaveFlag = false
 				}
 			}
-			// qu.Debug("mgoMap---", mgoMap)
-			// return
 			lock.Lock()
 			if EsSaveFlag {
+				if esMap["history_name"] != nil {
+						var nameArr []string
+						for _, v := range strings.Split(qu.ObjToString(esMap["history_name"]), ";") {
+							if v != "" {
+								nameArr = append(nameArr, v)
+							}
+						}
+						if len(nameArr) > 0 {
+							esMap["history_name"] = nameArr
+						}
+				}
 				EsSaveCache <- esMap //过滤后数据保存
 			}
 			EsSaveAllCache <- esMap //所有数据保存
 			update = append(update, map[string]interface{}{"$set": mgoMap})
+			SaveHistoryName(tmp)
 			if len(update) == 2 {
 				arr = append(arr, update)
 			}
@@ -436,7 +441,7 @@ func QyxyStandard() bool {
 }
 
 //所有企业数据标准化
-func HistoryQyxyStandard() bool {
+func HistoryQyxyStandard() {
 	qu.Debug("--------History--------")
 	defer qu.Catch()
 	sess := Mgo.GetMgoConn()
@@ -445,16 +450,13 @@ func HistoryQyxyStandard() bool {
 	pool := make(chan bool, 20) //控制线程数
 	wg := &sync.WaitGroup{}
 	lock := &sync.Mutex{} //控制读写
-	//	arr := [][]map[string]interface{}{}
-	// count, _ := sess.DB(Dbname).C(Dbcoll).Find(nil).Count()
-	// log.Println("共查询:", count, "条")
-	// if count == 0 {
-	// 	return false
-	// }
+	arr := [][]map[string]interface{}{}
+	//count, _ := sess.DB(Dbname).C(Dbcoll).Find(nil).Count()
+	//log.Println("共查询:", count, "条")
 	it := sess.DB(Dbname).C(Dbcoll).Find(nil).Iter()
 	sum := 0
 	for tmp := make(map[string]interface{}); it.Next(&tmp); sum++ {
-		if sum%10000 == 0 {
+		if sum%100 == 0 {
 			log.Println("current:", sum)
 		}
 		pool <- true
@@ -657,8 +659,8 @@ func HistoryQyxyStandard() bool {
 			tmpArrMgo := []map[string]interface{}{}
 			if annual_reports, ok := tmp["annual_reports"].([]interface{}); ok && len(annual_reports) > 0 {
 				for _, annual_report := range annual_reports {
-					tmpMapMgo := map[string]interface{}{}
-					tmpMap := map[string]interface{}{}
+					tmpMapMgo := map[string]interface{}{} //记录每个年报信息标准化到mgo的数据
+					tmpMap := map[string]interface{}{}    //只记录每个年报信息的company_email和company_phone
 					report_year := ""
 					m := annual_report.(map[string]interface{})
 					for i, tmpArr := range AnnualReportsArr {
@@ -717,15 +719,75 @@ func HistoryQyxyStandard() bool {
 					esMap[k] = v
 				}
 			}
+			//合并
+			for k, v := range esMap {
+				if k == "partners" {
+					continue
+				}
+				mgoMap[k] = v
+			}
+			//es数据过滤
+			EsSaveFlag := true
+			company_name := qu.ObjToString(esMap["company_name"])
+			if len([]rune(company_name)) < 8 {
+				EsSaveFlag = false
+			}
+			if EsSaveFlag {
+				company_type := qu.ObjToString(esMap["company_type"])
+				if company_type == "" || company_type == "个体工商户" {
+					EsSaveFlag = false
+				}
+			}
+			if EsSaveFlag {
+				status := qu.ObjToString(esMap["company_status"])
+				if status != "正常" {
+					EsSaveFlag = false
+				}
+			}
+			if EsSaveFlag {
+				credit_no := strings.TrimSpace(qu.ObjToString(esMap["credit_no"]))
+				company_code := strings.TrimSpace(qu.ObjToString(esMap["company_code"]))
+				if credit_no == "" && company_code == "" {
+					EsSaveFlag = false
+				}
+			}
 			lock.Lock()
-			EsSaveCache <- esMap
+			if EsSaveFlag {
+				if esMap["history_name"] != nil {
+					var nameArr []string
+					for _, v := range strings.Split(qu.ObjToString(esMap["history_name"]), ";") {
+						if v != "" {
+							nameArr = append(nameArr, v)
+						}
+					}
+					if len(nameArr) > 0 {
+						esMap["history_name"] = nameArr
+					}
+				}
+				EsSaveCache <- esMap //过滤后数据保存
+			}
+			EsSaveAllCache <- esMap //所有数据保存
+			SaveHistoryName(tmp)
+			update = append(update, map[string]interface{}{"$set": mgoMap})
+			if len(update) == 2 {
+				arr = append(arr, update)
+			}
+			if len(arr) > 500 {
+				tmps := arr
+				Mgo.UpSertBulk(Savecoll, tmps...)
+				arr = [][]map[string]interface{}{}
+			}
 			lock.Unlock()
 		}(tmp)
 		tmp = make(map[string]interface{})
 	}
 	wg.Wait()
+	lock.Lock()
+	if len(arr) > 0 {
+		Mgo.UpSertBulk(Savecoll, arr...)
+	}
+	lock.Unlock()
 	log.Println("Run Over...Count:", sum)
-	return true
 }
 
 //过滤后数据存库
@@ -765,6 +827,34 @@ func SaveEs() {
 	}
 }
 
+func SaveHistoryName(tmp map[string]interface{}) {
+	if qu.ObjToString(tmp["company_name"]) != "" {
+		set := make(map[string]interface{})
+		set["company_name"] = qu.ObjToString(tmp["company_name"])
+		set["credit_no"] = tmp["credit_no"]
+		set["org_code"] = tmp["org_code"]
+		set["company_id"] = tmp["company_id"]
+		set["company_type"] = tmp["company_type"]
+		set["company_status"] = tmp["company_status"]
+		set["company_code"] = tmp["company_code"]
+		Mgo.Update("qyxy_historyname", map[string]interface{}{"company_name": qu.ObjToString(tmp["company_name"])}, map[string]interface{}{"$set": set}, true, false)
+		if qu.ObjToString(tmp["history_name"]) != "" {
+			for _, v := range strings.Split(qu.ObjToString(tmp["history_name"]), ";") {
+				if v != "" {
+					set["company_name"] = v
+					set["credit_no"] = tmp["credit_no"]
+					set["org_code"] = tmp["org_code"]
+					set["company_id"] = tmp["company_id"]
+					set["company_type"] = tmp["company_type"]
+					set["company_status"] = tmp["company_status"]
+					set["company_code"] = tmp["company_code"]
+					Mgo.Update("qyxy_historyname", map[string]interface{}{"company_name": v}, map[string]interface{}{"$set": set}, true, false)
+				}
+			}
+		}
+	}
+}
+
 //所有数据存库
 func SaveAllEs() {
 	log.Println("Es SaveAll...")
@@ -806,13 +896,13 @@ func InitAddress() {
 	log.Println("Init Address...")
 	AddressMap = map[string]*City{}
 	AddressOldMap = map[string]*City{}
-	address, _ := Mgo.Find("address", nil, nil, nil, false, -1, -1)
+	address, _ := Mgo.Find("address_new_2020", nil, nil, nil, false, -1, -1)
 	for _, tmp := range *address {
 		code := qu.ObjToString(tmp["code"])
-		// codeLen := len(code)
-		// if t_code := CodeMap[codeLen]; t_code != "" { //新的address表补齐code
-		// 	code = code + t_code
-		// }
+		codeLen := len(code)
+		if t_code := CodeMap[codeLen]; t_code != "" { //新的address表补齐code
+			code = code + t_code
+		}
 		remark := fmt.Sprint(tmp["Remarks"])
 		city := &City{}
 		tmpjson, err := json.Marshal(tmp)

+ 1 - 7
udpcreateindex/src/biddingindex.go

@@ -127,7 +127,6 @@ func biddingTask(data []byte, mapInfo map[string]interface{}) {
 // }
 
 func doIndex(infos []map[string]interface{}, eMap map[string]map[string]interface{}, index, itype, db, c, bkey string) (int, int) {
-	qutil.Debug(len(infos))
 	n1, n2 := 0, 0
 	//线程池
 	UpdatesLock := sync.Mutex{}
@@ -190,7 +189,6 @@ func doIndex(infos []map[string]interface{}, eMap map[string]map[string]interfac
 		//下面可以多线程跑的--->
 		//处理分类
 		if compare != nil { //extract
-			qutil.Debug("111111111111111111")
 			subscopeclass, _ := compare["subscopeclass"].([]interface{}) //subscopeclass
 			if subscopeclass != nil {
 				//str := ","
@@ -255,7 +253,6 @@ func doIndex(infos []map[string]interface{}, eMap map[string]map[string]interfac
 		if bkey == "" {
 			DealInfo(&tmp, &update)
 		}
-		qutil.Debug(2222222222222222)
 		//同时保存到elastic
 		for tk, tv := range update {
 			tmp[tk] = tv
@@ -265,13 +262,12 @@ func doIndex(infos []map[string]interface{}, eMap map[string]map[string]interfac
 			var cid []string
 			for _, w := range sWinnerarr {
 				if w != "" {
-					ent, _ := mgostandard.FindOne("qyxy_historyname", map[string]interface{}{"compnay_name": w})
+					ent, _ := mgostandard.FindOne("qyxy_historyname", map[string]interface{}{"company_name": w})
 					if len(*ent) > 0 {
 						cid = append(cid, qutil.ObjToString((*ent)["company_id"]))
 					}
 				}
 			}
-			qutil.Debug(5555555555555)
 			if len(cid) > 0 {
 				tmp["entidlist"] = cid
 				update["entidlist"] = cid
@@ -315,7 +311,6 @@ func doIndex(infos []map[string]interface{}, eMap map[string]map[string]interfac
 		//		}
 		go IS.Add("bidding")
 		if qutil.IntAll(update["extracttype"]) != -1 {
-			qutil.Debug(44444444444444)
 			newTmp := map[string]interface{}{}                //最终生索引的数据
 			for field, ftype := range biddingIndexFieldsMap { //
 				if tmp[field] != nil { //
@@ -500,7 +495,6 @@ func doIndex(infos []map[string]interface{}, eMap map[string]map[string]interfac
 			// 	}
 			// }
 			arrEs = append(arrEs, newTmp)
-			qutil.Debug(newTmp)
 		}
 		if len(update) > 0 {
 			delete(update, "winnerorder") //winnerorder不需要更新到bindding表,删除