Browse Source

增量数据添加删除功能

maxiaoshan 5 years ago
parent
commit
c96bf32ad4
2 changed files with 37 additions and 253 deletions
  1. 31 251
      udpcreateindex/src/biddingindex.go
  2. 6 2
      udps/main.go

+ 31 - 251
udpcreateindex/src/biddingindex.go

@@ -31,6 +31,7 @@ func biddingTask(data []byte, mapInfo map[string]interface{}) {
 			},
 		}
 	}
+
 	//连接信息
 	c, _ := bidding["collect"].(string)
 	extractc, _ := bidding["extractcollect"].(string)
@@ -81,259 +82,38 @@ func biddingTask(data []byte, mapInfo map[string]interface{}) {
 		mgo.DestoryMongoConn(session)
 	}
 	log.Println(mapInfo, "create bidding index...over", "all:", count, "n1:", n1, "n2:", n2)
-}
-
-// func doIndex1(infos []map[string]interface{}, eMap map[string]map[string]interface{}, index, itype, db, c, bkey string) (int, int) {
-// 	n1, n2 := 0, 0
-// 	//线程池
-// 	UpdatesLock := sync.Mutex{}
-// 	fields := strings.Split(bidding["fields"].(string), ",")
-// 	//更新数组
-// 	arr := [][]map[string]interface{}{}
-// 	arrEs := []map[string]interface{}{}
-// 	//对比两张表数据,减少查询次数
-// 	var compare bson.M
-// 	log.Println("开始迭代..")
-// 	for n, tmp := range infos {
-// 		n1++
-// 		// if qutil.IntAll(tmp["dataging"]) == 1 { //dataging=1不生索引
-// 		// 	tmp = make(map[string]interface{})
-// 		// 	continue
-// 		// }
-// 		update := map[string]interface{}{} //要更新的mongo数据
-// 		//对比方法----------------
-// 		tid := qutil.BsonIdToSId(tmp["_id"])
-// 		if eMap[tid] != nil {
-// 			compare = eMap[tid]
-// 			if qutil.IntAll(compare["dataging"]) == 1 { //extract中dataging=1不生索引
-// 				tmp = make(map[string]interface{})
-// 				compare = nil
-// 				continue
-// 			}
-// 			delete(eMap, tid)
-// 			//更新bidding表,生成索引
-// 			for _, k := range fields {
-// 				v1 := compare[k] //extract
-// 				v2 := tmp[k]     //bidding
-// 				if v2 == nil && v1 != nil {
-// 					update[k] = v1
-// 				} else if v2 != nil && v1 != nil {
-// 					//update[k+"_b"] = v2
-// 					update[k] = v1
-// 				} else if v2 != nil && v1 == nil {
-// 					//update[k+"_b"] = v2
-// 				}
-// 			}
-// 			if qutil.IntAll(compare["repeat"]) == 1 {
-// 				update["extracttype"] = -1
-// 			} else {
-// 				update["extracttype"] = 1
-// 			}
-// 		} else {
-// 			compare = nil
-// 		}
-// 		//下面可以多线程跑的--->
-// 		//处理分类
-// 		if compare != nil { //extract
-// 			subscopeclass, _ := compare["subscopeclass"].([]interface{})
-// 			if subscopeclass != nil {
-// 				//str := ","
-// 				m1 := map[string]bool{}
-// 				newclass := []string{}
-// 				for _, sc := range subscopeclass {
-// 					sclass, _ := sc.(string)
-// 					if !m1[sclass] {
-// 						m1[sclass] = true
-// 						//str += sclass + ","
-// 						newclass = append(newclass, sclass)
-// 					}
-// 				}
-// 				update["s_subscopeclass"] = strings.Join(newclass, ",")
-// 				update["subscopeclass"] = newclass
-// 			}
-// 			//处理中标企业
-// 			//			winner, _ := compare["winner"].(string)
-// 			//			m1 := map[string]bool{}
-// 			//			if winner != "" {
-// 			//				m1[winner] = true
-// 			//			}
-// 			//			package1 := compare["package"]
-// 			//			if package1 != nil {
-// 			//				packageM, _ := package1.(map[string]interface{})
-// 			//				for _, p := range packageM {
-// 			//					pm, _ := p.(map[string]interface{})
-// 			//					pw, _ := pm["winner"].(string)
-// 			//					if pw != "" {
-// 			//						m1[pw] = true
-// 			//					}
-// 			//				}
-// 			//			}
-// 			compare = nil
-// 			//			if len(m1) > 0 {
-// 			//				//str := ","
-// 			//				winnerarr := []string{}
-// 			//				for k, _ := range m1 {
-// 			//					//str += k + ","
-// 			//					winnerarr = append(winnerarr, k)
-// 			//				}
-// 			//				update["s_winner"] = strings.Join(winnerarr, ",")
-// 			//			}
-// 		}
-// 		//------------------对比结束
 
-// 		//处理key descript
-// 		if bkey == "" {
-// 			DealInfo(&tmp, &update)
-// 		}
-// 		//同时保存到elastic
-// 		for tk, tv := range update {
-// 			tmp[tk] = tv
-// 		}
-// 		if tmp["supervisorrate"] != nil { //临时处理supervisorrate抽取类型为string不生索引
-// 			if _, ok := tmp["supervisorrate"].(string); ok { //supervisorrate数据为string类型
-// 				delete(tmp, "supervisorrate")
-// 			}
-// 		}
-// 		//对projectscope字段的索引处理
-// 		ps, _ := tmp["projectscope"].(string)
-// 		//		if ps == "" {
-// 		//			tmp["projectscope"] = "" //= tmp["detail"]
-// 		//		}
-// 		if len(ps) > ESLEN {
-// 			tmp["projectscope"] = string(([]rune(ps))[:4000])
-// 		}
-// 		//		if s_budget := fmt.Sprint(tmp["budget"]); s_budget == "" || s_budget == "<nil>" || s_budget == "null" {
-// 		//			tmp["budget"] = nil
-// 		//		} else if sbd, ok := tmp["budget"].(string); ok {
-// 		//			tmp["budget"] = ObjToMoney([]interface{}{sbd, sbd})[0]
-// 		//		}
-// 		//		if s_bidamount := fmt.Sprint(tmp["bidamount"]); s_bidamount == "" || s_bidamount == "<nil>" || s_bidamount == "null" {
-// 		//			tmp["bidamount"] = nil
-// 		//		} else if sbd, ok := tmp["bidamount"].(string); ok {
-// 		//			tmp["bidamount"] = ObjToMoney([]interface{}{sbd, sbd})[0]
-// 		//		}
-// 		UpdatesLock.Lock()
-// 		//		for k1, _ := range tmp {
-// 		//			if strings.HasSuffix(k1, "_b") || k1 == "contenthtml" {
-// 		//				delete(tmp, k1)
-// 		//			}
-// 		//		}
-// 		go IS.Add("bidding")
-// 		if qutil.IntAll(update["extracttype"]) != -1 {
-// 			newTmp := map[string]interface{}{}     //最终生索引的数据
-// 			for _, v := range biddingIndexFields { //索引字段
-// 				if tmp[v] != nil {
-// 					if "projectinfo" == v {
-// 						mp, _ := tmp[v].(map[string]interface{})
-// 						if mp != nil {
-// 							newmap := map[string]interface{}{}
-// 							for _, v1 := range projectinfoFields {
-// 								if mp[v1] != nil {
-// 									newmap[v1] = fmt.Sprint(mp[v1])
-// 								}
-// 							}
-// 							if len(newmap) > 0 {
-// 								newTmp[v] = newmap
-// 							}
-// 							// attachments := mp["attachments"]
-// 							// con := ""
-// 							// if attachments != nil {
-// 							// 	am, _ := attachments.(map[string]interface{})
-// 							// 	if am != nil {
-// 							// 		for _, v1 := range am {
-// 							// 			vm, _ := v1.(map[string]interface{})
-// 							// 			if vm != nil {
-// 							// 				c, _ := vm["content"].(string)
-// 							// 				con += c
-// 							// 			}
-// 							// 		}
-// 							// 	}
-// 							// }
-// 							// con = FilterDetailSpace(con)
-// 							// if con != "" {
-// 							// 	newTmp["attachments"] = con
-// 							// }
-// 						}
-// 					} else if v == "purchasinglist" { //标的物处理
-// 						purchasinglist_new := []map[string]interface{}{}
-// 						if pcl, _ := tmp[v].([]interface{}); len(pcl) > 0 {
-// 							for _, ls := range pcl {
-// 								lsm_new := make(map[string]interface{})
-// 								lsm := ls.(map[string]interface{})
-// 								for _, pf := range purchasinglistFields {
-// 									if lsm[pf] != nil {
-// 										lsm_new[pf] = lsm[pf]
-// 									}
-// 								}
-// 								if lsm_new != nil && len(lsm_new) > 0 {
-// 									purchasinglist_new = append(purchasinglist_new, lsm_new)
-// 								}
-// 							}
-// 						}
-// 						if len(purchasinglist_new) > 0 {
-// 							newTmp[v] = purchasinglist_new
-// 						}
+	go delEs(mapInfo, index, itype) //删除索引
+}
 
-// 					} else {
-// 						if v == "detail" {
-// 							detail, _ := tmp[v].(string)
-// 							newTmp[v] = FilterDetail(detail)
-// 						} else {
-// 							newTmp[v] = tmp[v]
-// 						}
-// 					}
-// 				}
-// 			}
-// 			arrEs = append(arrEs, newTmp)
-// 		}
-// 		if len(update) > 0 {
-// 			arr = append(arr, []map[string]interface{}{
-// 				map[string]interface{}{
-// 					"_id": tmp["_id"],
-// 				},
-// 				map[string]interface{}{
-// 					"$set": update,
-// 				},
-// 			})
-// 		}
-// 		if len(arr) >= BulkSize-1 {
-// 			mgo.UpdateBulkAll(db, c, arr...)
-// 			arr = [][]map[string]interface{}{}
-// 		}
-// 		if len(arrEs) >= BulkSize-1 {
-// 			tmps := arrEs
-// 			elastic.BulkSave(index, itype, &tmps, true)
-// 			if other_index != "" && other_itype != "" {
-// 				bidding_other_es.BulkSave(other_index, other_itype, &tmps, true)
-// 			}
-// 			if len(multiIndex) == 2 {
-// 				elastic.BulkSave(multiIndex[0], multiIndex[1], &tmps, true)
-// 			}
-// 			arrEs = []map[string]interface{}{}
-// 		}
-// 		UpdatesLock.Unlock()
-// 		if n%100 == 0 {
-// 			log.Println("current:", n)
-// 		}
-// 		tmp = make(map[string]interface{})
-// 	}
-// 	UpdatesLock.Lock()
-// 	if len(arr) > 0 {
-// 		mgo.UpdateBulkAll(db, c, arr...)
-// 	}
-// 	if len(arrEs) > 0 {
-// 		tmps := arrEs
-// 		elastic.BulkSave(index, itype, &tmps, true)
-// 		if other_index != "" && other_itype != "" {
-// 			bidding_other_es.BulkSave(other_index, other_itype, &tmps, true)
-// 		}
-// 		if len(multiIndex) == 2 {
-// 			elastic.BulkSave(multiIndex[0], multiIndex[1], &tmps, true)
-// 		}
-// 	}
-// 	UpdatesLock.Unlock()
-// 	return n1, n2
-// }
+//删除索引
+func delEs(mapInfo map[string]interface{}, index, itype string) {
+	defer qutil.Catch()
+	other_delete := false
+	if other_index != "" && other_itype != "" {
+		other_delete = true
+	}
+	ids := qutil.ObjToString(mapInfo["ids"])
+	idsarr := strings.Split(ids, ",")
+	log.Println("delete ids count:", len(idsarr))
+	n1 := 0
+	for _, id := range idsarr {
+		if id != "" {
+			if elastic.DelById(index, itype, id) {
+				n1++
+			}
+			if other_delete {
+				bidding_other_es.DelById(other_index, other_itype, id)
+			}
+			// go func(id string) { //删除备份库的数据
+			// 	if other_delete {
+			// 		bidding_other_es.DelById(other_index, other_itype, id)
+			// 	}
+			// }(id)
+		}
+	}
+	log.Println("result delete bidding index...over", "all:", n1)
+}
 
 func doIndex(infos []map[string]interface{}, eMap map[string]map[string]interface{}, index, itype, db, c, bkey string) (int, int) {
 	n1, n2 := 0, 0

+ 6 - 2
udps/main.go

@@ -17,8 +17,8 @@ import (
 var startDate, endDate string
 
 func main() {
-	ip, p, tmptime, tmpkey, id1, id2, stype, q, bkey, param := "", 0, 0, "", "", "", "", "", "", ""
-	flag.StringVar(&startDate, "start", "2020-04-30", "开始日期2006-01-02")
+	ip, p, tmptime, tmpkey, id1, id2, stype, q, bkey, param, ids := "", 0, 0, "", "", "", "", "", "", "", ""
+	flag.StringVar(&startDate, "start", "", "开始日期2006-01-02")
 	flag.StringVar(&endDate, "end", "", "结束日期2006-01-02")
 	flag.StringVar(&ip, "ip", "127.0.0.1", "ip")
 	flag.IntVar(&p, "p", 0, "端口")
@@ -26,6 +26,7 @@ func main() {
 	flag.StringVar(&tmpkey, "tmpkey", "", "时间字段")
 	flag.StringVar(&id1, "gtid", "", "gtid")
 	flag.StringVar(&id2, "lteid", "", "lteid")
+	flag.StringVar(&ids, "ids", "", "id1,id2")
 	flag.StringVar(&stype, "stype", "", "stype,传递类型")
 	flag.StringVar(&bkey, "bkey", "", "bkey,加上此参数表示不生关键词和摘要")
 	flag.StringVar(&q, "q", "", "q查询语句\"{'':''}\",有q就不要gtid,lteid")
@@ -59,6 +60,9 @@ func main() {
 		if bkey != "" {
 			m1["bkey"] = bkey
 		}
+		if ids != "" {
+			m1["ids"] = ids
+		}
 		if q != "" {
 			m1["query"] = mongodb.ObjToMQ(q, true) //qutil.ObjToMap(q)
 		}