瀏覽代碼

Merge branch 'dev3.4' of http://39.105.157.10:10080/qmx/jy-data-extract into dev3.4

zhangjinkun 5 年之前
父節點
當前提交
624a921586

+ 1 - 1
src/jy/extract/score_jsondata.go

@@ -12,7 +12,7 @@ import (
 )
 
 func JsonDataMergeProcessing(j *util.Job, e *ExtractTask) map[string][]*util.ExtField {
-	if len(j.Result) <= 0 || j.Jsondata == nil || len(*j.Jsondata) <= 0 || j.Site =="中国政府采购网"{
+	if len(j.Result) <= 0 || j.Jsondata == nil || len(*j.Jsondata) <= 0 {
 		return j.Result
 	}
 	jdextweight := util2.IntAll((*j.Jsondata)["extweight"])

+ 1 - 0
udp_winner/config.json

@@ -4,6 +4,7 @@
   "elasticsearch_type": "winnerent",
   "udpport": "127.0.0.1:12311",
   "port": "12311",
+  "pool_size": "10",
   "mgoinit": "192.168.3.207:27081",
   "mgodb_bidding": "qfw",
   "mgodb_mgoinit_c": "bidding",

+ 215 - 218
udp_winner/main.go

@@ -1,14 +1,11 @@
 package main
 
 import (
-	"context"
 	"encoding/json"
 	"fmt"
 	"github.com/garyburd/redigo/redis"
 	"go.mongodb.org/mongo-driver/bson"
 	"go.mongodb.org/mongo-driver/bson/primitive"
-	"go.mongodb.org/mongo-driver/mongo"
-	"go.mongodb.org/mongo-driver/mongo/options"
 	es "gopkg.in/olivere/elastic.v1"
 	"log"
 	mu "mfw/util"
@@ -23,11 +20,10 @@ import (
 	"time"
 )
 
-var
-(
+var (
 	Config                                = make(map[string]string)
 	Fields                                []string
-	SourceClient, FClient                 *mongo.Client
+	SourceClient, FClient                 *MongodbSim
 	RedisPool                             redis.Pool
 	Addrs                                 = make(map[string]interface{}, 0) //省市县
 	udpclient                             mu.UdpClient                      //udp对象
@@ -38,10 +34,11 @@ var
 	EsConn                                *es.Client
 	Updport                               int
 )
+
 /**
 新增
 初始化
- */
+*/
 func init() {
 	log.SetFlags(log.Ldate | log.Ltime | log.Lshortfile)
 	util.ReadConfig(&Config)
@@ -50,50 +47,37 @@ func init() {
 		"establish_date", "legal_person", "company_type", "district", "city", "province", "area_code", "credit_no",
 		"company_name", "history_name", "topscopeclass", "wechat_accounts", "alias", "website", "report_websites"}
 	var err error
-	//mongo init
-	SourceClient, err = mongo.NewClient(options.Client().ApplyURI("mongodb://" + Config["mgoinit"]).SetMaxPoolSize(20).SetMaxConnIdleTime(time.Hour * 24))
-	if err != nil {
-		log.Fatalln(err)
-	}
-	c1, _ := context.WithTimeout(context.Background(), 9999*time.Hour)
+	pool_size, _ := strconv.Atoi(Config["pool_size"])
 
-	err = SourceClient.Connect(c1)
-	//defer SourceClient.Disconnect(c1)
-	if err != nil {
-		log.Fatalln(err)
-	}
-	FClient, err = mongo.NewClient(options.Client().ApplyURI("mongodb://" + Config["mgourl"]).SetMaxPoolSize(20).SetMaxConnIdleTime(time.Hour * 999999))
-	if err != nil {
-		log.Fatalln(err)
-	}
-	err = FClient.Connect(c1)
-	//defer FClient.Disconnect(cc)
-	if err != nil {
-		log.Fatalln(err)
-	}
+	//mongo init
+	SourceClient = new(MongodbSim)
+	SourceClient.MongodbAddr = Config["mgoinit"]
+	SourceClient.Size = pool_size
+	//mongodbSim.DbName = "qfw"
+	SourceClient.InitPool()
+	SourceClientmgoConn := SourceClient.GetMgoConn()
+	defer SourceClient.DestoryMongoConn(SourceClientmgoConn)
+	FClient = new(MongodbSim)
+	FClient.MongodbAddr = Config["mgourl"]
+	FClient.Size = pool_size
+	FClient.DbName =Config["mgodb_extract_kf"]
+	//mongodbSim.DbName = "qfw"
+	FClient.InitPool()
+	FClientmgoConn := FClient.GetMgoConn()
+	defer FClient.DestoryMongoConn(FClientmgoConn)
 	//加载省市县代码
-	cursor2, err := FClient.Database(Config["mgodb_extract_kf"]).Collection("address").Find(c1, bson.M{},
-		options.Find().SetProjection(bson.M{"province": 1, "code": 1, "city": 1, "district": 1}))
+	cursor2 := FClientmgoConn.DB(Config["mgodb_extract_kf"]).C("address").Find(bson.M{}).Select(bson.M{"province": 1, "code": 1, "city": 1, "district": 1}).Iter()
 	//defer FClient.Connect(cc)
-	if err != nil {
-		log.Fatalln(err)
+	if cursor2 == nil {
+		log.Fatalln(cursor2)
 	}
-	for cursor2.Next(context.TODO()) {
-		if err := cursor2.Err(); err != nil {
-			log.Println("cursor.Err();", err)
-		}
-		tmp := make(map[string]interface{})
-		if err := cursor2.Decode(&tmp); err != nil {
-			log.Println(err)
-			continue
-		} else {
-			code := tmp["code"]
-			if code != nil && strings.TrimSpace(code.(string)) != "" {
-				Addrs[fmt.Sprint(code)] = tmp
-			}
+	tmp := make(map[string]interface{})
+	for cursor2.Next(&tmp) {
+		code := tmp["code"]
+		if code != nil && strings.TrimSpace(code.(string)) != "" {
+			Addrs[fmt.Sprint(code)] = tmp
 		}
 	}
-	defer cursor2.Close(context.TODO())
 	log.Println(len(Addrs))
 	//es.NewClient(es.SetURL(addrs...), es.SetMaxRetries(2), es.SetSniff(false))
 	//es init
@@ -114,7 +98,7 @@ func init() {
 				return nil, err
 			}
 			return conn, nil
-		},}
+		}}
 	c := RedisPool.Get()
 	if _, err := c.Do("PING"); err != nil {
 		log.Fatalln("redis err:", err)
@@ -129,7 +113,7 @@ func main() {
 	udpclient = mu.UdpClient{Local: updport, BufSize: 1024}
 	udpclient.Listen(processUdpMsg)
 	log.Println("Udp服务监听", updport)
-	log.Println("发送端口port:",Updport)
+	log.Println("发送端口port:", Updport)
 	go TimedTask() //定时任务
 	c := make(chan int, 1)
 	<-c
@@ -148,7 +132,11 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 			udpclient.WriteUdp([]byte("err:"+err.Error()), mu.OP_NOOP, ra)
 			return
 		} else if tmp != nil {
-			udpclient.WriteUdp([]byte("ok,run"), mu.OP_NOOP, ra)
+			if key,ok := (*tmp)["key"].(string);ok{
+				udpclient.WriteUdp([]byte(key), mu.OP_NOOP, ra)
+			}else {
+				udpclient.WriteUdp([]byte("udpok"), mu.OP_NOOP, ra)
+			}
 			go task(tmp)
 		}
 	case mu.OP_NOOP: //下个节点回应
@@ -172,205 +160,214 @@ func task(mapinfo *map[string]interface{}) {
 	// topscopeclass项目类型-industry行业类型&&topscopeclass联系人项目类型
 	// (area地区-province省份 city城市-city城市 district区县-district区县)
 	// winneraddr-company_address企业地址
-	cursor, err := SourceClient.Database(Config["mgodb_bidding"]).Collection(Config["mgodb_mgoinit_c"]).Find(context.Background(), bson.M{
+	SourceClientcc := SourceClient.GetMgoConn()
+	defer SourceClient.DestoryMongoConn(SourceClientcc)
+	cursor := SourceClientcc.DB(Config["mgodb_bidding"]).C(Config["mgodb_mgoinit_c"]).Find(bson.M{
 		"_id": bson.M{
 			"$gte": GId,
 			"$lte": LtId,
 		},
-	}, options.Find().SetProjection(bson.M{"winner": 1, "winnertel": 1, "winnerperson": 1,
-		"topscopeclass": 1, "winneraddr": 1}).SetSort(bson.M{"_id": 1}).SetBatchSize(60000000).SetMaxTime(time.Hour*24))
-	if err != nil {
-		log.Println(err)
+	}).Select(bson.M{"winner": 1, "winnertel": 1, "winnerperson": 1,
+		"topscopeclass": 1, "winneraddr": 1}).Iter()
+	if cursor == nil {
+		log.Println(cursor)
 		return
 	}
-	overid := ""
-	c2, _ := context.WithTimeout(context.Background(), 9999*time.Hour)
-	for cursor.Next(c2) {
-		if err := cursor.Err(); err != nil {
-			log.Println("cursor.Err();", err)
+	overid := gtid
+	tmp := map[string]interface{}{}
+	for cursor.Next(&tmp) {
+		overid = tmp["_id"].(primitive.ObjectID).Hex()
+		log.Println(tmp["_id"])
+		if tmp["winner"] == nil || tmp["winner"] == "" {
+			continue
 		}
-		tmp := map[string]interface{}{}
-		if err := cursor.Decode(&tmp); err == nil {
-			if tmp["winner"] == nil || tmp["winner"] == "" {
-				continue
+		//redis查询是否存在
+		rdb := RedisPool.Get()
+		if reply, err := redis.String(rdb.Do("GET", tmp["winner"])); err != nil {
+			//redis不存在存到临时表,定时任务处理
+			FClient.DbName = Config["mgodb_extract_kf"]
+			if tmpid := FClient.Save("winner_new", tmp) ;tmpid==nil{
+				log.Println("FClient.Save err",tmpid)
 			}
-			//redis查询是否存在
-			rdb := RedisPool.Get()
-			if reply, err := redis.String(rdb.Do("GET", tmp["winner"])); err != nil {
-				//redis不存在存到临时表,定时任务处理
-				if _, err := FClient.Database(Config["mgodb_extract_kf"]).Collection("winner_new").InsertOne(context.TODO(), tmp); err != nil {
-					log.Println(err, tmp)
-				}
-				//log.Println("get redis id err:定时任务处理", err, tmp)
-				if err := rdb.Close(); err != nil {
-					log.Println(err)
-				}
+			//log.Println("get redis id err:定时任务处理", err, tmp)
+			if err := rdb.Close(); err != nil {
+				log.Println(err)
+			}
+			continue
+		} else {
+			//log.Println("redis get :", reply)
+			//redis存在
+			//log.Println(reply)
+			//reply = "5e0316b998a9abaf6535df3d"
+			//id, err := primitive.ObjectIDFromHex(reply)
+			//if err != nil {
+			//	log.Println("get redis id  Hex err:", err, tmp)
+			//	if err := rdb.Close(); err != nil {
+			//		log.Println(err)
+			//	}
+			//	continue
+			//}
+			if err := rdb.Close(); err != nil {
+				log.Println(err)
+			}
+			//拿到合并后的qyk
+			FClient.DbName = Config["mgodb_extract_kf"]
+			oldTmp := FClient.FindById(Config["mgo_qyk_c"], reply)
+			if oldTmp == nil{
+				log.Println("redis id 不存在")
 				continue
-			} else {
-				log.Println("redis get :", reply)
-				//redis存在
-				//log.Println(reply)
-				//reply = "5e0316b998a9abaf6535df3d"
-				id, err := primitive.ObjectIDFromHex(reply)
-				if err != nil {
-					log.Println("get redis id  Hex err:", err, tmp)
-					if err := rdb.Close(); err != nil {
-						log.Println(err)
-					}
-					continue
-				}
-				if err := rdb.Close(); err != nil {
-					log.Println(err)
-				}
-				//拿到合并后的qyk
-				oldTmp := make(map[string]interface{})
-				err = FClient.Database(Config["mgodb_extract_kf"]).Collection(Config["mgo_qyk_c"]).
-					FindOne(context.TODO(), bson.M{"_id": id}).Decode(&oldTmp)
-				if err != nil {
-					log.Println("qyk id err:", err, id)
-					continue
-				}
-				//比较合并
-				//行业类型
-				tmpTopscopeclass := []string{}
-				tmpTopscopeclassMap := make(map[string]bool)
-				log.Println(tmp["_id"])
-				overid = tmp["_id"].(primitive.ObjectID).Hex()
-				if oldTmp["industry"] == nil {
-					//log.Println(reflect.ValueOf(tmp["topscopeclass"]))
-					if v, ok := tmp["topscopeclass"].(primitive.A); ok {
-						for _, vv := range v {
-							if vvv, ok := vv.(string); ok && len(vvv) > 1 {
-								tmpTopscopeclassMap[vvv[:len(vvv)-1]] = true
-							}
-						}
-						for k := range tmpTopscopeclassMap {
-							tmpTopscopeclass = append(tmpTopscopeclass, k)
-						}
-					}
-				} else {
-					if v, ok := oldTmp["industry"].(primitive.A); ok {
-						for _, vv := range v {
-							if vvv, ok := vv.(string); ok {
-								tmpTopscopeclassMap[vvv] = true
-							}
-						}
-					}
-					if v, ok := tmp["topscopeclass"].(primitive.A); ok {
-						for _, vv := range v {
-							if vvv, ok := vv.(string); ok && len(vvv) > 1 {
-								tmpTopscopeclassMap[vvv[:len(vvv)-1]] = true
-							}
-						}
-						for k := range tmpTopscopeclassMap {
-							tmpTopscopeclass = append(tmpTopscopeclass, k)
+			}
+			//err = FClient.Database(Config["mgodb_extract_kf"]).Collection(Config["mgo_qyk_c"]).
+			//	FindOne(context.TODO(), bson.M{"_id": id}).Decode(&oldTmp)
+			//if err != nil {
+			//	log.Println("qyk id err:", err, id)
+			//	continue
+			//}
+			//比较合并
+			//行业类型
+			tmpTopscopeclass := []string{}
+			tmpTopscopeclassMap := make(map[string]bool)
+
+			if oldTmp["industry"] == nil {
+				//log.Println(reflect.ValueOf(tmp["topscopeclass"]))
+				if v, ok := tmp["topscopeclass"].(primitive.A); ok {
+					for _, vv := range v {
+						if vvv, ok := vv.(string); ok && len(vvv) > 1 {
+							tmpTopscopeclassMap[vvv[:len(vvv)-1]] = true
 						}
 					}
-				}
-				sort.Strings(tmpTopscopeclass)
-				oldTmp["industry"] = tmpTopscopeclass
-				esId := oldTmp["_id"].(primitive.ObjectID).Hex()
-				//更新行业类型
-				if tmp["winnerperson"] == nil || tmp["winnerperson"] == "" || Reg_xing.MatchString(util.ObjToString(tmp["winnerperson"])) {
-					oldTmp["updatatime"] = time.Now().Unix()
-					//mongo更新
-					if _, err := FClient.Database(Config["mgodb_extract_kf"]).Collection(Config["mgo_qyk_c"]).
-						UpdateOne(context.TODO(), bson.M{"_id": oldTmp["_id"]}, bson.M{"$set": oldTmp}); err != nil {
-						log.Println("mongo更新err:", err)
-					}
-					//es更新
-					delete(oldTmp, "_id")
-					//esConn := elastic.GetEsConn()
-					//defer elastic.DestoryEsConn(esConn)
-					if _, err := EsConn.Update().Index(Config["elasticsearch_index"]).Type(Config["elasticsearch_type"]).Id(esId).Doc(oldTmp).Refresh(true).Do(); err != nil {
-						log.Println("update es err:", err)
+					for k := range tmpTopscopeclassMap {
+						tmpTopscopeclass = append(tmpTopscopeclass, k)
 					}
-					//log.Println( err2,err3)
-					continue
 				}
-				//联系方式合并
-				var tmpperson, winnertel string
-				tmpperson = tmp["winnerperson"].(string)
-				if tmp["winnertel"] == nil || tmp["winnertel"] == "" {
-					winnertel = ""
-				} else {
-					if Reg_xing.MatchString(util.ObjToString(tmp["winnertel"])) || !Reg_tel.MatchString(util.ObjToString(tmp["winnertel"])) {
-						winnertel = ""
-					} else {
-						winnertel = util.ObjToString(tmp["winnertel"])
+			} else {
+				if v, ok := oldTmp["industry"].(primitive.A); ok {
+					for _, vv := range v {
+						if vvv, ok := vv.(string); ok {
+							tmpTopscopeclassMap[vvv] = true
+						}
 					}
 				}
-				contactMaps := make([]interface{}, 0)
-				if oldTmp["contact"] == nil {
-					tmpContact := make(map[string]interface{})
-					tmpContact["contact_person"] = tmpperson
-					tmpContact["contact_type"] = "项目联系人"
-					tmpContact["phone"] = winnertel
-					tmpContact["topscopeclass"] = strings.Join(tmpTopscopeclass, ";")
-					tmpContact["updatetime"] = time.Now().Unix()
-					contactMaps = append(contactMaps, tmpContact)
-				} else {
-					//对比前四项,相等丢弃
-					if v, ok := oldTmp["contact"].(primitive.A); ok {
-						var isNotUpdate bool
-						for _, vv := range v {
-							if vvv, ok := vv.(map[string]interface{}); ok {
-								if vvv["contact_person"] == tmpperson && vvv["contact_type"] == "项目联系人" &&
-									vvv["phone"] == winnertel && vvv["topscopeclass"] == strings.Join(tmpTopscopeclass, ";") {
-									isNotUpdate = true
-									vvv["updatetime"] = time.Now().Unix()
-								}
-								contactMaps = append(contactMaps, vvv)
-							}
-						}
-						if !isNotUpdate {
-							vvv := make(map[string]interface{})
-							vvv["contact_person"] = tmp["winnerperson"]
-							vvv["contact_type"] = "项目联系人"
-							vvv["phone"] = winnertel
-							vvv["topscopeclass"] = strings.Join(tmpTopscopeclass, ";")
-							vvv["updatetime"] = time.Now().Unix()
-							contactMaps = append(contactMaps, vvv)
+				if v, ok := tmp["topscopeclass"].(primitive.A); ok {
+					for _, vv := range v {
+						if vvv, ok := vv.(string); ok && len(vvv) > 1 {
+							tmpTopscopeclassMap[vvv[:len(vvv)-1]] = true
 						}
 					}
+					for k := range tmpTopscopeclassMap {
+						tmpTopscopeclass = append(tmpTopscopeclass, k)
+					}
 				}
-				oldTmp["contact"] = contactMaps
-				//mongo更新
+			}
+			sort.Strings(tmpTopscopeclass)
+			oldTmp["industry"] = tmpTopscopeclass
+			esId := oldTmp["_id"].(primitive.ObjectID).Hex()
+			//更新行业类型
+			if tmp["winnerperson"] == nil || tmp["winnerperson"] == "" || Reg_xing.MatchString(util.ObjToString(tmp["winnerperson"])) {
 				oldTmp["updatatime"] = time.Now().Unix()
-				if _, err := FClient.Database(Config["mgodb_extract_kf"]).Collection(Config["mgo_qyk_c"]).
-					UpdateOne(context.TODO(), bson.M{"_id": oldTmp["_id"]}, bson.M{"$set": oldTmp}); err != nil {
-					log.Println("mongo更新 err :", err)
+				//mongo更新
+				FClient.DbName =Config["mgodb_extract_kf"]
+				if !FClient.UpdateById(Config["mgo_qyk_c"],esId,bson.M{"$set": oldTmp}){
+					log.Println("mongo更新err",esId)
 				}
+				//if _, err := FClient.Database(Config["mgodb_extract_kf"]).Collection(Config["mgo_qyk_c"]).
+				//	UpdateOne(context.TODO(), bson.M{"_id": oldTmp["_id"]}, bson.M{"$set": oldTmp}); err != nil {
+				//	log.Println("mongo更新err:", err)
+				//}
 				//es更新
 				delete(oldTmp, "_id")
 				//esConn := elastic.GetEsConn()
 				//defer elastic.DestoryEsConn(esConn)
 				if _, err := EsConn.Update().Index(Config["elasticsearch_index"]).Type(Config["elasticsearch_type"]).Id(esId).Doc(oldTmp).Refresh(true).Do(); err != nil {
-					log.Println("EsConn err :", err)
+					log.Println("update es err:", err)
 				}
 				//log.Println( err2,err3)
+				continue
 			}
-		} else {
-			log.Println(tmp)
-			continue
-		}
-	}
-	defer cursor.Close(c2)
-	log.Println("合并执行完成", gtid, lteid ,overid)
-	if gtid != lteid{
-		by, _ := json.Marshal(map[string]interface{}{
-			"gtid":  overid,
-			"lteid": lteid,
-			"stype": "",
-		})
-		if e := udpclient.WriteUdp(by, mu.OP_TYPE_DATA, &net.UDPAddr{
-			IP:   net.ParseIP("127.0.0.1"),
-			Port: Updport,
-		});e != nil{
-			log.Println(e)
+			//联系方式合并
+			var tmpperson, winnertel string
+			tmpperson = tmp["winnerperson"].(string)
+			if tmp["winnertel"] == nil || tmp["winnertel"] == "" {
+				winnertel = ""
+			} else {
+				if Reg_xing.MatchString(util.ObjToString(tmp["winnertel"])) || !Reg_tel.MatchString(util.ObjToString(tmp["winnertel"])) {
+					winnertel = ""
+				} else {
+					winnertel = util.ObjToString(tmp["winnertel"])
+				}
+			}
+			contactMaps := make([]interface{}, 0)
+			if oldTmp["contact"] == nil {
+				tmpContact := make(map[string]interface{})
+				tmpContact["contact_person"] = tmpperson
+				tmpContact["contact_type"] = "项目联系人"
+				tmpContact["phone"] = winnertel
+				tmpContact["topscopeclass"] = strings.Join(tmpTopscopeclass, ";")
+				tmpContact["updatetime"] = time.Now().Unix()
+				contactMaps = append(contactMaps, tmpContact)
+			} else {
+				//对比前四项,相等丢弃
+				if v, ok := oldTmp["contact"].(primitive.A); ok {
+					var isNotUpdate bool
+					for _, vv := range v {
+						if vvv, ok := vv.(map[string]interface{}); ok {
+							if vvv["contact_person"] == tmpperson && vvv["contact_type"] == "项目联系人" &&
+								vvv["phone"] == winnertel && vvv["topscopeclass"] == strings.Join(tmpTopscopeclass, ";") {
+								isNotUpdate = true
+								vvv["updatetime"] = time.Now().Unix()
+							}
+							contactMaps = append(contactMaps, vvv)
+						}
+					}
+					if !isNotUpdate {
+						vvv := make(map[string]interface{})
+						vvv["contact_person"] = tmp["winnerperson"]
+						vvv["contact_type"] = "项目联系人"
+						vvv["phone"] = winnertel
+						vvv["topscopeclass"] = strings.Join(tmpTopscopeclass, ";")
+						vvv["updatetime"] = time.Now().Unix()
+						contactMaps = append(contactMaps, vvv)
+					}
+				}
+			}
+			oldTmp["contact"] = contactMaps
+			//mongo更新
+			oldTmp["updatatime"] = time.Now().Unix()
+			FClient.DbName=Config["mgodb_extract_kf"]
+			if !FClient.UpdateById(Config["mgo_qyk_c"],esId,bson.M{"$set": oldTmp}){
+				log.Println("mongo更新 err",esId,oldTmp)
+			}
+			//if _, err := FClient.Database(Config["mgodb_extract_kf"]).Collection(Config["mgo_qyk_c"]).
+			//	UpdateOne(context.TODO(), bson.M{"_id": oldTmp["_id"]}, bson.M{"$set": oldTmp}); err != nil {
+			//	log.Println("mongo更新 err :", err)
+			//}
+			//es更新
+			delete(oldTmp, "_id")
+			//esConn := elastic.GetEsConn()
+			//defer elastic.DestoryEsConn(esConn)
+			if _, err := EsConn.Update().Index(Config["elasticsearch_index"]).Type(Config["elasticsearch_type"]).Id(esId).Doc(oldTmp).Refresh(true).Do(); err != nil {
+				log.Println("EsConn err :", err)
+			}
+			//log.Println( err2,err3)
 		}
-		log.Println("重新发送udp:",string(by))
 	}
-	log.Println("合并执行完成 ok", gtid, lteid,overid)
+	//defer cursor.Close(context.Background())
+	//log.Println("合并执行完成", gtid, lteid, overid)
+	//if overid != lteid {
+	//	by, _ := json.Marshal(map[string]interface{}{
+	//		"gtid":  overid,
+	//		"lteid": lteid,
+	//		"stype": "",
+	//	})
+	//	if e := udpclient.WriteUdp(by, mu.OP_TYPE_DATA, &net.UDPAddr{
+	//		IP:   net.ParseIP("127.0.0.1"),
+	//		Port: Updport,
+	//	}); e != nil {
+	//		log.Println(e)
+	//	}
+	//	log.Println("重新发送udp:", string(by))
+	//	return
+	//}
+	log.Println("合并执行完成 ok", gtid, lteid, overid)
 
 }

+ 321 - 0
udp_winner/mgo.go

@@ -0,0 +1,321 @@
+package main
+
+import (
+	"context"
+	"log"
+	"time"
+
+	"go.mongodb.org/mongo-driver/bson"
+	"go.mongodb.org/mongo-driver/bson/primitive"
+	"go.mongodb.org/mongo-driver/mongo"
+	"go.mongodb.org/mongo-driver/mongo/options"
+)
+
+var Mgo *MongodbSim
+
+type MgoSess struct {
+	Db     string
+	Coll   string
+	Query  interface{}
+	Sorts  []string
+	fields interface{}
+	limit  int64
+	skip   int64
+	M      *MongodbSim
+}
+
+type MgoIter struct {
+	Cursor *mongo.Cursor
+}
+
+func (mt *MgoIter) Next(result interface{}) bool {
+	if mt.Cursor != nil {
+		if mt.Cursor.Next(nil) {
+			err := mt.Cursor.Decode(result)
+			if err != nil {
+				log.Println("mgo cur err", err.Error())
+				mt.Cursor.Close(nil)
+				return false
+			}
+			return true
+		} else {
+			mt.Cursor.Close(nil)
+			return false
+		}
+	} else {
+		return false
+	}
+
+}
+
+func (ms *MgoSess) DB(name string) *MgoSess {
+	ms.Db = name
+	return ms
+}
+
+func (ms *MgoSess) C(name string) *MgoSess {
+	ms.Coll = name
+	return ms
+}
+
+func (ms *MgoSess) Find(q interface{}) *MgoSess {
+	ms.Query = q
+	return ms
+}
+
+func (ms *MgoSess) Select(fields interface{}) *MgoSess {
+	ms.fields = fields
+	return ms
+}
+
+func (ms *MgoSess) Limit(limit int64) *MgoSess {
+	ms.limit = limit
+	return ms
+}
+func (ms *MgoSess) Skip(skip int64) *MgoSess {
+	ms.skip = skip
+	return ms
+}
+
+func (ms *MgoSess) Sort(sorts ...string) *MgoSess {
+	ms.Sorts = sorts
+	return ms
+}
+
+func (ms *MgoSess) Iter() *MgoIter {
+	it := &MgoIter{}
+	find := options.Find()
+	if ms.skip > 0 {
+		find.SetSkip(ms.skip)
+	}
+	if ms.limit > 0 {
+		find.SetLimit(ms.limit)
+	}
+	find.SetBatchSize(100)
+	if len(ms.Sorts) > 0 {
+		sort := bson.M{}
+		for _, k := range ms.Sorts {
+			switch k[:1] {
+			case "-":
+				sort[k[1:]] = -1
+			case "+":
+				sort[k[1:]] = 1
+			default:
+				sort[k] = 1
+			}
+		}
+		find.SetSort(sort)
+	}
+	if ms.fields != nil {
+		find.SetProjection(ms.fields)
+	}
+	cur, err := ms.M.C.Database(ms.Db).Collection(ms.Coll).Find(ms.M.Ctx, ms.Query, find)
+	if err != nil {
+		log.Println("mgo find err", err.Error())
+	} else {
+		it.Cursor = cur
+	}
+	return it
+}
+
+type MongodbSim struct {
+	MongodbAddr string
+	Size        int
+	//	MinSize     int
+	DbName   string
+	C        *mongo.Client
+	Ctx      context.Context
+	ShortCtx context.Context
+	pool     chan bool
+}
+
+func (m *MongodbSim) GetMgoConn() *MgoSess {
+	//m.Open()
+	ms := &MgoSess{}
+	ms.M = m
+	return ms
+}
+
+func (m *MongodbSim) DestoryMongoConn(ms *MgoSess) {
+	//m.Close()
+	ms.M = nil
+	ms = nil
+}
+
+func (m *MongodbSim) InitPool() {
+	opts := options.Client()
+	opts.SetConnectTimeout(3 * time.Second)
+	opts.ApplyURI("mongodb://" + m.MongodbAddr)
+	opts.SetMaxPoolSize(uint16(m.Size))
+	m.pool = make(chan bool, m.Size)
+	opts.SetMaxConnIdleTime(2 * time.Hour)
+	m.Ctx, _ = context.WithTimeout(context.Background(), 99999*time.Hour)
+	m.ShortCtx, _ = context.WithTimeout(context.Background(), 1*time.Minute)
+	client, err := mongo.Connect(m.ShortCtx, opts)
+	if err != nil {
+		log.Println("mgo init error:", err.Error())
+	} else {
+		m.C = client
+		log.Println("init success")
+	}
+}
+
+func (m *MongodbSim) Open() {
+	m.pool <- true
+}
+func (m *MongodbSim) Close() {
+	<-m.pool
+}
+
+//批量插入
+func (m *MongodbSim) UpSertBulk(c string, doc ...[]map[string]interface{}) (map[int64]interface{}, bool) {
+	m.Open()
+	defer m.Close()
+	coll := m.C.Database(m.DbName).Collection(c)
+	var writes []mongo.WriteModel
+	for _, d := range doc {
+		write := mongo.NewUpdateOneModel()
+		write.SetFilter(d[0])
+		write.SetUpdate(d[1])
+		write.SetUpsert(true)
+		writes = append(writes, write)
+	}
+	r, e := coll.BulkWrite(m.Ctx, writes)
+	if e != nil {
+		log.Println("mgo upsert error:", e.Error())
+		return nil, false
+	}
+	//	else {
+	//		if r.UpsertedCount != int64(len(doc)) {
+	//			log.Println("mgo upsert uncomplete:uc/dc", r.UpsertedCount, len(doc))
+	//		}
+	//		return true
+	//	}
+	return r.UpsertedIDs, true
+}
+
+//批量插入
+func (m *MongodbSim) SaveBulk(c string, doc ...map[string]interface{}) bool {
+	m.Open()
+	defer m.Close()
+	coll := m.C.Database(m.DbName).Collection(c)
+	var writes []mongo.WriteModel
+	for _, d := range doc {
+		write := mongo.NewInsertOneModel()
+		write.SetDocument(d)
+		writes = append(writes, write)
+	}
+	_, e := coll.BulkWrite(m.Ctx, writes)
+	if e != nil {
+		log.Println("mgo savebulk error:", e.Error())
+		return false
+	}
+	return true
+}
+
+//保存
+func (m *MongodbSim) Save(c string, doc map[string]interface{}) interface{} {
+	m.Open()
+	defer m.Close()
+	coll := m.C.Database(m.DbName).Collection(c)
+	r, err := coll.InsertOne(m.Ctx, doc)
+	if err != nil {
+		return nil
+	}
+	return r.InsertedID
+}
+
+//更新by Id
+func (m *MongodbSim) UpdateById(c, id string, doc map[string]interface{}) bool {
+	m.Open()
+	defer m.Close()
+	coll := m.C.Database(m.DbName).Collection(c)
+	_, err := coll.UpdateOne(m.Ctx, map[string]interface{}{"_id": StringTOBsonId(id)}, doc)
+	if err != nil {
+		log.Println(err)
+		return false
+	}
+	return true
+}
+
+//删除by id
+func (m *MongodbSim) DeleteById(c, id string) int64 {
+	m.Open()
+	defer m.Close()
+	coll := m.C.Database(m.DbName).Collection(c)
+	r, err := coll.DeleteOne(m.Ctx, map[string]interface{}{"_id": StringTOBsonId(id)})
+	if err != nil {
+		return 0
+	}
+	return r.DeletedCount
+}
+
+//通过条件删除
+func (m *MongodbSim) Delete(c string, query map[string]interface{}) int64 {
+	m.Open()
+	defer m.Close()
+	coll := m.C.Database(m.DbName).Collection(c)
+	r, err := coll.DeleteMany(m.Ctx, query)
+	if err != nil {
+		return 0
+	}
+	return r.DeletedCount
+}
+
+//findbyid
+func (m *MongodbSim) FindById(c, id string) map[string]interface{} {
+	m.Open()
+	defer m.Close()
+	coll := m.C.Database(m.DbName).Collection(c)
+	r := coll.FindOne(m.Ctx, map[string]interface{}{"_id": StringTOBsonId(id)})
+	v := map[string]interface{}{}
+	if e := r.Decode(&v);e != nil{
+		log.Println(e)
+		return nil
+	}
+	return v
+}
+
+//findone
+func (m *MongodbSim) FindOne(c string, query map[string]interface{}) map[string]interface{} {
+	m.Open()
+	defer m.Close()
+	coll := m.C.Database(m.DbName).Collection(c)
+	r := coll.FindOne(m.Ctx, query)
+	v := map[string]interface{}{}
+	r.Decode(&v)
+	return v
+}
+
+//find
+func (m *MongodbSim) Find(c string, query map[string]interface{}, sort, fields interface{}) ([]map[string]interface{}, error) {
+	m.Open()
+	defer m.Close()
+	coll := m.C.Database(m.DbName).Collection(c)
+	op := options.Find()
+	r, err := coll.Find(m.Ctx, query, op.SetSort(sort), op.SetProjection(fields))
+	if err != nil {
+		log.Fatal(err)
+		return nil, err
+	}
+	var results []map[string]interface{}
+	if err = r.All(m.Ctx, &results); err != nil {
+		log.Fatal(err)
+		return nil, err
+	}
+	return results, nil
+}
+
+//创建_id
+func NewObjectId() primitive.ObjectID {
+	return primitive.NewObjectID()
+}
+
+func StringTOBsonId(id string) primitive.ObjectID {
+	objectId, _ := primitive.ObjectIDFromHex(id)
+	return objectId
+}
+
+func BsonTOStringId(id interface{}) string {
+	return id.(primitive.ObjectID).Hex()
+}

+ 57 - 57
udp_winner/timedTask.go

@@ -1,74 +1,73 @@
 package main
 
 import (
-	"context"
 	"encoding/json"
 	"fmt"
 	"github.com/garyburd/redigo/redis"
 	"go.mongodb.org/mongo-driver/bson/primitive"
-	"go.mongodb.org/mongo-driver/mongo/options"
 	"gopkg.in/mgo.v2/bson"
 	"log"
+	mu "mfw/util"
 	"net"
 	"sort"
 	"strings"
 	"time"
-	mu "mfw/util"
 )
 
 //定时任务
 //1.存异常表
 //2.合并原始库新增
 func TimedTask() {
+	//time.Sleep(time.Hour*70)
 	t2 := time.NewTimer(time.Second * 5)
 	for range t2.C {
+		Fcconn := FClient.GetMgoConn()
+		defer FClient.DestoryMongoConn(Fcconn)
 		tmpLast := map[string]interface{}{}
-		if err := FClient.Database(Config["mgodb_extract_kf"]).Collection("winner_new").FindOne(context.TODO(), bson.M{}, options.FindOne().SetSort(bson.M{"_id": -1})).Decode(&tmpLast); err != nil {
-			//临时表无数据
-			log.Println("临时表无数据:", err)
-			t2.Reset(time.Minute * 5)
-			continue
-		} else {
-			//临时表有数据
-			log.Println("临时表有数据:", tmpLast)
-			cursor, err := FClient.Database(Config["mgodb_extract_kf"]).Collection("winner_new").Find(context.TODO(), bson.M{
-				"_id": bson.M{
-					"$lte": tmpLast["_id"],
-				},
-			}, options.Find().SetSort(bson.M{"_id": 1}))
-			if err != nil {
-				log.Println(err)
-				t2.Reset(time.Second * 5)
+		if iter := Fcconn.DB(Config["mgodb_extract_kf"]).C("winner_new").Find(bson.M{}).Sort("-_id").Limit(1).Iter(); iter != nil {
+			if !iter.Next(&tmpLast) {
+				//临时表无数据
+				log.Println("临时表无数据:")
+				t2.Reset(time.Minute * 5)
 				continue
-			}
-			//遍历临时表数据,匹配不到原始库存入异常表
-			for cursor.Next(context.TODO()) {
-				if err := cursor.Err(); err != nil {
-					log.Println("cursor.Err();", err)
+			} else {
+				log.Println("临时表有数据:", tmpLast)
+				fconn := FClient.GetMgoConn()
+				defer FClient.DestoryMongoConn(fconn)
+				cursor := fconn.DB(Config["mgodb_extract_kf"]).C("winner_new").Find(bson.M{
+					"_id": bson.M{
+						"$lte": tmpLast["_id"],
+					},
+				}).Sort("_id").Iter()
+				if cursor == nil {
+					log.Println("查询失败")
+					t2.Reset(time.Second * 5)
+					continue
 				}
+				//遍历临时表数据,匹配不到原始库存入异常表
 				tmp := make(map[string]interface{})
-				if err := cursor.Decode(&tmp); err == nil {
+				for cursor.Next(&tmp) {
+					tmpId := tmp["_id"].(primitive.ObjectID).Hex()
 					//再重新查找redis,存在发udp处理,不存在走新增合并
 					rdb := RedisPool.Get()
-					if reply, err := redis.String(rdb.Do("GET", tmp["winner"])); err == nil {
+					if _, err := redis.String(rdb.Do("GET", tmp["winner"])); err == nil {
 						//{"gtid":"57d7ad2f61a0721f152d2ad5","lteid":"5e20968d85a9271abf0ad6c2","stype":""}
 						//redis存在发送udp进行处理
-						go func(reply string) {
-							by, _ := json.Marshal(map[string]interface{}{
-								"gtid":  reply,
-								"lteid": reply,
-								"stype": "",
-							})
-							if e := udpclient.WriteUdp(by, mu.OP_TYPE_DATA, &net.UDPAddr{
-								IP:   net.ParseIP("127.0.0.1"),
-								Port: Updport,
-							});e != nil{
-								log.Println(e)
-							}
-						}(reply)
+						by, _ := json.Marshal(map[string]interface{}{
+							"gtid":  tmpId,
+							"lteid": tmpId,
+							"stype": "",
+						})
+						if e := udpclient.WriteUdp(by, mu.OP_TYPE_DATA, &net.UDPAddr{
+							IP:   net.ParseIP("127.0.0.1"),
+							Port: Updport,
+						}); e != nil {
+							log.Println(e)
+						}
 						//存在的话删除tmp mongo表
-						if _, err = FClient.Database(Config["mgodb_extract_kf"]).Collection("winner_new").DeleteOne(context.TODO(), tmp);err != nil{
-							log.Println("删除临时表err:",err)
+						FClient.DbName = Config["mgodb_extract_kf"]
+						if DeletedCount := FClient.DeleteById("winner_new", tmpId); DeletedCount == 0 {
+							log.Println("删除临时表err:", DeletedCount)
 						}
 						if err := rdb.Close(); err != nil {
 							log.Println(err)
@@ -80,16 +79,18 @@ func TimedTask() {
 						}
 					}
 					//查询redis不存在新增
-					resulttmp := make(map[string]interface{})
-					r := FClient.Database(Config["mgodb_enterprise"]).Collection(Config["mgodb_enterprise_c"]).FindOne(context.TODO(), bson.M{"company_name": tmp["winner"]}).Decode(&resulttmp)
-					if r != nil {
+					FClient.DbName = Config["mgodb_enterprise"]
+					resulttmp := FClient.FindOne(Config["mgodb_enterprise_c"], bson.M{"company_name": tmp["winner"]})
+					if resulttmp != nil {
 						//log.Println(r)
 						//匹配不到原始库,存入异常表删除临时表
-						if _, err = FClient.Database(Config["mgodb_extract_kf"]).Collection("winner_err").InsertOne(context.TODO(), tmp); err != nil {
-							log.Println(err)
+						FClient.DbName = Config["mgodb_extract_kf"]
+						if saveid := FClient.Save("winner_err", tmp); saveid == nil {
+							log.Println("存入异常表错误", tmp)
 						}
-						if _, err = FClient.Database(Config["mgodb_extract_kf"]).Collection("winner_new").DeleteOne(context.TODO(), tmp); err != nil {
-							log.Println(err)
+						FClient.DbName = Config["mgodb_extract_kf"]
+						if deleteNum := FClient.DeleteById("winner_new", tmpId); deleteNum == 0 {
+							log.Println("删除临时表错误", deleteNum)
 						}
 						continue
 					} else {
@@ -222,13 +223,13 @@ func TimedTask() {
 						//tmps = append(tmps, savetmp)
 						savetmp["updatatime"] = time.Now().Unix()
 						//保存mongo
-						result, err := FClient.Database(Config["mgodb_extract_kf"]).Collection(Config["mgo_qyk_c"]).
-							InsertOne(context.TODO(), savetmp)
-						if err == nil {
+						FClient.DbName = Config["mgodb_extract_kf"]
+						saveid := FClient.Save(Config["mgo_qyk_c"], savetmp)
+						if saveid != nil {
 							//保存redis
 							rc := RedisPool.Get()
 							var _id string
-							if v, ok := result.InsertedID.(primitive.ObjectID); ok {
+							if v, ok := saveid.(primitive.ObjectID); ok {
 								_id = v.Hex()
 							}
 							if _, err := rc.Do("SET", savetmp["company_name"], _id); err != nil {
@@ -249,19 +250,18 @@ func TimedTask() {
 									log.Println("save es err :", tmp["_id"], savetmp["_id"], err)
 								} else {
 									//删除临时表
-									if _, err := FClient.Database(Config["mgodb_extract_kf"]).Collection("winner_new").DeleteOne(context.TODO(), tmp); err != nil {
-										log.Println(err)
+									FClient.DbName = Config["mgodb_extract_kf"]
+									if deleteNum := FClient.DeleteById("winner_new", tmpId); deleteNum == 0 {
+										log.Println("删除临时表失败", deleteNum)
 									}
 								}
 							}
 						} else {
-							log.Println("save mongo err:", err, tmp["_id"])
+							log.Println("save mongo err:", saveid, tmp["_id"])
 						}
 					}
 				}
-
 			}
-			defer cursor.Close(context.TODO())
 		}
 		t2.Reset(time.Minute)
 	}

+ 66 - 72
udpcreateindex/src/biddingall.go

@@ -1,7 +1,6 @@
 package main
 
 import (
-	"fmt"
 	"log"
 	qutil "qfw/util"
 	elastic "qfw/util/elastic"
@@ -67,7 +66,11 @@ func biddingAllTask(data []byte, mapInfo map[string]interface{}) {
 	var compare bson.M
 	bnil := false
 	for tmp := make(map[string]interface{}); query.Next(tmp); n++ {
+		if deldata := qutil.IntAll(tmp["del"]); deldata == 1 { //临时:重复数据不生索引
+			continue
+		}
 		update := map[string]interface{}{}
+		del := map[string]interface{}{} //记录extract没有值而bidding中有值的字段
 		//对比方法----------------
 		for {
 			if compare == nil {
@@ -84,20 +87,22 @@ func biddingAllTask(data []byte, mapInfo map[string]interface{}) {
 					bnil = false
 					//更新bidding表,生成索引
 					for _, k := range fields { //fields更新到mongo的字段
-						v1 := compare[k]
-						v2 := tmp[k]
+						v1 := compare[k] //extract
+						v2 := tmp[k]     //bidding
 						if v2 == nil && v1 != nil {
 							update[k] = v1
 						} else if v2 != nil && v1 != nil {
-							//update[k+"_b"] = v2
 							update[k] = v1
-						} else if v2 != nil && v1 == nil {
-							//update[k+"_b"] = v2
+						} else if v2 != nil && v1 == nil { //
+							if k == "s_subscopeclass" && del["subscopeclass"] == nil {
+								continue
+							}
+							del[k] = 1
+							//qutil.Debug("抽取结果没有值,bidding有值:field--", k, "val--", v2)
 						}
 					}
 					if qutil.IntAll(compare["repeat"]) == 1 {
 						update["extracttype"] = -1
-						//} else if qutil.IntAll(tmp["extracttype"]) == -1 {
 					} else {
 						update["extracttype"] = 1
 					}
@@ -120,21 +125,19 @@ func biddingAllTask(data []byte, mapInfo map[string]interface{}) {
 		//下面可以多线程跑的--->
 		//处理分类
 		mpool <- true
-		go func(tmp, update, compare map[string]interface{}, bnil bool) {
+		go func(tmp, update, compare, del map[string]interface{}, bnil bool) {
 			defer func() {
 				<-mpool
 			}()
 			if !bnil && compare != nil {
 				subscopeclass, _ := compare["subscopeclass"].([]interface{})
 				if subscopeclass != nil {
-					//str := ","
 					m1 := map[string]bool{}
 					newclass := []string{}
 					for _, sc := range subscopeclass {
 						sclass, _ := sc.(string)
 						if !m1[sclass] {
 							m1[sclass] = true
-							//str += sclass + ","
 							newclass = append(newclass, sclass)
 						}
 					}
@@ -142,72 +145,65 @@ func biddingAllTask(data []byte, mapInfo map[string]interface{}) {
 					update["subscopeclass"] = newclass
 				}
 				//处理中标企业
-				winner, _ := compare["winner"].(string)
-				m1 := map[string]bool{}
-				if winner != "" {
-					m1[winner] = true
-				}
-				package1 := compare["package"]
-				if package1 != nil {
-					packageM, _ := package1.(map[string]interface{})
-					for _, p := range packageM {
-						pm, _ := p.(map[string]interface{})
-						pw, _ := pm["winner"].(string)
-						if pw != "" {
-							m1[pw] = true
-						}
-					}
-				}
+				//				winner, _ := compare["winner"].(string)
+				//				m1 := map[string]bool{}
+				//				if winner != "" {
+				//					m1[winner] = true
+				//				}
+				//				package1 := compare["package"]
+				//				if package1 != nil {
+				//					packageM, _ := package1.(map[string]interface{})
+				//					for _, p := range packageM {
+				//						pm, _ := p.(map[string]interface{})
+				//						pw, _ := pm["winner"].(string)
+				//						if pw != "" {
+				//							m1[pw] = true
+				//						}
+				//					}
+				//				}
 				compare = nil
-				if len(m1) > 0 {
-					//str := ","
-					winnerarr := []string{}
-					for k, _ := range m1 {
-						//str += k + ","
-						winnerarr = append(winnerarr, k)
-					}
-					update["s_winner"] = strings.Join(winnerarr, ",")
-				}
+				//				if len(m1) > 0 {
+				//					//str := ","
+				//					winnerarr := []string{}
+				//					for k, _ := range m1 {
+				//						//str += k + ","
+				//						winnerarr = append(winnerarr, k)
+				//					}
+				//					update["s_winner"] = strings.Join(winnerarr, ",")
+				//				}
 			}
 			//------------------对比结束
-
-			//处理key descript
-			//		if bkey == "" {
-			//			DealInfo(&tmp, &update)
-			//		}
 			//同时保存到elastic
 			for tk, tv := range update {
 				tmp[tk] = tv
 			}
 			//对projectscope字段的索引处理
 			ps, _ := tmp["projectscope"].(string)
-			if ps == "" {
-				tmp["projectscope"] = "" //= tmp["detail"]
-			}
+			//			if ps == "" {
+			//				tmp["projectscope"] = ""
+			//			}
 			if len(ps) > ESLEN {
 				tmp["projectscope"] = string(([]rune(ps))[:4000])
 			}
-			if s_budget := fmt.Sprint(tmp["budget"]); s_budget == "" || s_budget == "<nil>" || s_budget == "null" {
-				tmp["budget"] = nil
-			} else if sbd, ok := tmp["budget"].(string); ok {
-				tmp["budget"] = ObjToMoney([]interface{}{sbd, sbd})[0]
-			}
-			if s_bidamount := fmt.Sprint(tmp["bidamount"]); s_bidamount == "" || s_bidamount == "<nil>" || s_bidamount == "null" {
-				tmp["bidamount"] = nil
-			} else if sbd, ok := tmp["bidamount"].(string); ok {
-				tmp["bidamount"] = ObjToMoney([]interface{}{sbd, sbd})[0]
-			}
-			//		for k1, _ := range tmp {
-			//			if strings.HasSuffix(k1, "_b") || k1 == "contenthtml" {
-			//				delete(tmp, k1)
+
+			//预算和中标金额
+			//			if s_budget := fmt.Sprint(tmp["budget"]); s_budget == "" || s_budget == "<nil>" || s_budget == "null" {
+			//				tmp["budget"] = nil
+			//			} else if sbd, ok := tmp["budget"].(string); ok {
+			//				tmp["budget"] = ObjToMoney([]interface{}{sbd, sbd})[0]
 			//			}
-			//		}
+			//			if s_bidamount := fmt.Sprint(tmp["bidamount"]); s_bidamount == "" || s_bidamount == "<nil>" || s_bidamount == "null" {
+			//				tmp["bidamount"] = nil
+			//			} else if sbd, ok := tmp["bidamount"].(string); ok {
+			//				tmp["bidamount"] = ObjToMoney([]interface{}{sbd, sbd})[0]
+			//			}
+
 			//go IS.Add("bidding")
 			UpdatesLock.Lock()
 			if qutil.IntAll(update["extracttype"]) != -1 {
 				newTmp := map[string]interface{}{}
 				for _, v := range biddingIndexFields { //
-					if tmp[v] != nil {
+					if tmp[v] != nil && del[v] == nil { //
 						if "projectinfo" == v {
 							mp, _ := tmp[v].(map[string]interface{})
 							if mp != nil {
@@ -232,8 +228,8 @@ func biddingAllTask(data []byte, mapInfo map[string]interface{}) {
 										}
 									}
 								}
+								con = FilterDetailSpace(con)
 								if con != "" {
-									con = FilterDetailSpace(con)
 									newTmp["attachments"] = con
 								}
 							}
@@ -245,21 +241,19 @@ func biddingAllTask(data []byte, mapInfo map[string]interface{}) {
 								newTmp[v] = tmp[v]
 							}
 						}
-					} else if v == "budget" || v == "bidamount" {
+					} /*else if v == "budget" || v == "bidamount" {
 						newTmp[v] = nil
-					}
+					}*/
 				}
 				arrEs = append(arrEs, newTmp)
 			}
 			if len(update) > 0 {
-				arr = append(arr, []map[string]interface{}{
-					map[string]interface{}{
-						"_id": tmp["_id"],
-					},
-					map[string]interface{}{
-						"$set": update,
-					},
-				})
+				queryId := map[string]interface{}{"_id": tmp["_id"]}
+				set := map[string]interface{}{"$set": update}
+				if len(del) > 0 { //删除的数据
+					set["$unset"] = del
+				}
+				arr = append(arr, []map[string]interface{}{queryId, set})
 			}
 			if len(arr) >= BulkSize {
 				mgo.UpdateBulkAll(db, c, arr...)
@@ -274,9 +268,9 @@ func biddingAllTask(data []byte, mapInfo map[string]interface{}) {
 				arrEs = []map[string]interface{}{}
 			}
 			UpdatesLock.Unlock()
-		}(tmp, update, compare, bnil)
-		if n%100 == 0 {
-			log.Println("current:", n)
+		}(tmp, update, compare, del, bnil)
+		if n%1000 == 0 {
+			log.Println("current:", n, tmp["_id"])
 		}
 		tmp = make(map[string]interface{})
 	}

+ 37 - 36
udpcreateindex/src/biddingdata.go

@@ -1,7 +1,7 @@
 package main
 
 import (
-	"fmt"
+	//	"fmt"
 	"log"
 	qutil "qfw/util"
 	elastic "qfw/util/elastic"
@@ -48,6 +48,7 @@ var indexfield = []string{
 	"buyerclass",
 	"district",
 	"topscopeclass",
+	"attachments",
 }
 
 //招标数据表和抽取表一一对应开始更新
@@ -179,32 +180,32 @@ func biddingDataTask(data []byte, mapInfo map[string]interface{}) {
 					update["subscopeclass"] = newclass
 				}
 				//处理中标企业
-				winner, _ := compare["winner"].(string)
-				m1 := map[string]bool{}
-				if winner != "" {
-					m1[winner] = true
-				}
-				package1 := compare["package"]
-				if package1 != nil {
-					packageM, _ := package1.(map[string]interface{})
-					for _, p := range packageM {
-						pm, _ := p.(map[string]interface{})
-						pw, _ := pm["winner"].(string)
-						if pw != "" {
-							m1[pw] = true
-						}
-					}
-				}
+				//				winner, _ := compare["winner"].(string)
+				//				m1 := map[string]bool{}
+				//				if winner != "" {
+				//					m1[winner] = true
+				//				}
+				//				package1 := compare["package"]
+				//				if package1 != nil {
+				//					packageM, _ := package1.(map[string]interface{})
+				//					for _, p := range packageM {
+				//						pm, _ := p.(map[string]interface{})
+				//						pw, _ := pm["winner"].(string)
+				//						if pw != "" {
+				//							m1[pw] = true
+				//						}
+				//					}
+				//				}
 				compare = nil
-				if len(m1) > 0 {
-					//str := ","
-					winnerarr := []string{}
-					for k, _ := range m1 {
-						//str += k + ","
-						winnerarr = append(winnerarr, k)
-					}
-					update["s_winner"] = strings.Join(winnerarr, ",")
-				}
+				//				if len(m1) > 0 {
+				//					//str := ","
+				//					winnerarr := []string{}
+				//					for k, _ := range m1 {
+				//						//str += k + ","
+				//						winnerarr = append(winnerarr, k)
+				//					}
+				//					update["s_winner"] = strings.Join(winnerarr, ",")
+				//				}
 			}
 			//------------------对比结束
 
@@ -220,16 +221,16 @@ func biddingDataTask(data []byte, mapInfo map[string]interface{}) {
 			if len(ps) > ESLEN {
 				tmp["projectscope"] = string(([]rune(ps))[:4000])
 			}
-			if s_budget := fmt.Sprint(tmp["budget"]); s_budget == "" || s_budget == "<nil>" || s_budget == "null" {
-				tmp["budget"] = nil
-			} else if sbd, ok := tmp["budget"].(string); ok {
-				tmp["budget"] = ObjToMoney([]interface{}{sbd, sbd})[0]
-			}
-			if s_bidamount := fmt.Sprint(tmp["bidamount"]); s_bidamount == "" || s_bidamount == "<nil>" || s_bidamount == "null" {
-				tmp["bidamount"] = nil
-			} else if sbd, ok := tmp["bidamount"].(string); ok {
-				tmp["bidamount"] = ObjToMoney([]interface{}{sbd, sbd})[0]
-			}
+			//			if s_budget := fmt.Sprint(tmp["budget"]); s_budget == "" || s_budget == "<nil>" || s_budget == "null" {
+			//				tmp["budget"] = nil
+			//			} else if sbd, ok := tmp["budget"].(string); ok {
+			//				tmp["budget"] = ObjToMoney([]interface{}{sbd, sbd})[0]
+			//			}
+			//			if s_bidamount := fmt.Sprint(tmp["bidamount"]); s_bidamount == "" || s_bidamount == "<nil>" || s_bidamount == "null" {
+			//				tmp["bidamount"] = nil
+			//			} else if sbd, ok := tmp["bidamount"].(string); ok {
+			//				tmp["bidamount"] = ObjToMoney([]interface{}{sbd, sbd})[0]
+			//			}
 
 			if qutil.IntAll(update["extracttype"]) != -1 {
 				newTmp := map[string]interface{}{}

+ 41 - 42
udpcreateindex/src/biddingindex.go

@@ -2,7 +2,6 @@ package main
 
 import (
 	"encoding/json"
-	"fmt"
 	"log"
 	mu "mfw/util"
 	"net"
@@ -143,32 +142,32 @@ func doIndex(infos []map[string]interface{}, eMap map[string]map[string]interfac
 				update["subscopeclass"] = newclass
 			}
 			//处理中标企业
-			winner, _ := compare["winner"].(string)
-			m1 := map[string]bool{}
-			if winner != "" {
-				m1[winner] = true
-			}
-			package1 := compare["package"]
-			if package1 != nil {
-				packageM, _ := package1.(map[string]interface{})
-				for _, p := range packageM {
-					pm, _ := p.(map[string]interface{})
-					pw, _ := pm["winner"].(string)
-					if pw != "" {
-						m1[pw] = true
-					}
-				}
-			}
+			//			winner, _ := compare["winner"].(string)
+			//			m1 := map[string]bool{}
+			//			if winner != "" {
+			//				m1[winner] = true
+			//			}
+			//			package1 := compare["package"]
+			//			if package1 != nil {
+			//				packageM, _ := package1.(map[string]interface{})
+			//				for _, p := range packageM {
+			//					pm, _ := p.(map[string]interface{})
+			//					pw, _ := pm["winner"].(string)
+			//					if pw != "" {
+			//						m1[pw] = true
+			//					}
+			//				}
+			//			}
 			compare = nil
-			if len(m1) > 0 {
-				//str := ","
-				winnerarr := []string{}
-				for k, _ := range m1 {
-					//str += k + ","
-					winnerarr = append(winnerarr, k)
-				}
-				update["s_winner"] = strings.Join(winnerarr, ",")
-			}
+			//			if len(m1) > 0 {
+			//				//str := ","
+			//				winnerarr := []string{}
+			//				for k, _ := range m1 {
+			//					//str += k + ","
+			//					winnerarr = append(winnerarr, k)
+			//				}
+			//				update["s_winner"] = strings.Join(winnerarr, ",")
+			//			}
 		}
 		//------------------对比结束
 
@@ -182,22 +181,22 @@ func doIndex(infos []map[string]interface{}, eMap map[string]map[string]interfac
 		}
 		//对projectscope字段的索引处理
 		ps, _ := tmp["projectscope"].(string)
-		if ps == "" {
-			tmp["projectscope"] = "" //= tmp["detail"]
-		}
+		//		if ps == "" {
+		//			tmp["projectscope"] = "" //= tmp["detail"]
+		//		}
 		if len(ps) > ESLEN {
 			tmp["projectscope"] = string(([]rune(ps))[:4000])
 		}
-		if s_budget := fmt.Sprint(tmp["budget"]); s_budget == "" || s_budget == "<nil>" || s_budget == "null" {
-			tmp["budget"] = nil
-		} else if sbd, ok := tmp["budget"].(string); ok {
-			tmp["budget"] = ObjToMoney([]interface{}{sbd, sbd})[0]
-		}
-		if s_bidamount := fmt.Sprint(tmp["bidamount"]); s_bidamount == "" || s_bidamount == "<nil>" || s_bidamount == "null" {
-			tmp["bidamount"] = nil
-		} else if sbd, ok := tmp["bidamount"].(string); ok {
-			tmp["bidamount"] = ObjToMoney([]interface{}{sbd, sbd})[0]
-		}
+		//		if s_budget := fmt.Sprint(tmp["budget"]); s_budget == "" || s_budget == "<nil>" || s_budget == "null" {
+		//			tmp["budget"] = nil
+		//		} else if sbd, ok := tmp["budget"].(string); ok {
+		//			tmp["budget"] = ObjToMoney([]interface{}{sbd, sbd})[0]
+		//		}
+		//		if s_bidamount := fmt.Sprint(tmp["bidamount"]); s_bidamount == "" || s_bidamount == "<nil>" || s_bidamount == "null" {
+		//			tmp["bidamount"] = nil
+		//		} else if sbd, ok := tmp["bidamount"].(string); ok {
+		//			tmp["bidamount"] = ObjToMoney([]interface{}{sbd, sbd})[0]
+		//		}
 		UpdatesLock.Lock()
 		//		for k1, _ := range tmp {
 		//			if strings.HasSuffix(k1, "_b") || k1 == "contenthtml" {
@@ -236,8 +235,8 @@ func doIndex(infos []map[string]interface{}, eMap map[string]map[string]interfac
 									}
 								}
 							}
+							con = FilterDetailSpace(con)
 							if con != "" {
-								con = FilterDetailSpace(con)
 								newTmp["attachments"] = con
 							}
 						}
@@ -249,9 +248,9 @@ func doIndex(infos []map[string]interface{}, eMap map[string]map[string]interfac
 							newTmp[v] = tmp[v]
 						}
 					}
-				} else if v == "budget" || v == "bidamount" {
+				} /*else if v == "budget" || v == "bidamount" {
 					newTmp[v] = nil
-				}
+				}*/
 			}
 			arrEs = append(arrEs, newTmp)
 		}

+ 33 - 16
udpcreateindex/src/biddingindexback.go

@@ -1,7 +1,6 @@
 package main
 
 import (
-	"fmt"
 	"log"
 	qutil "qfw/util"
 	elastic "qfw/util/elastic"
@@ -86,22 +85,22 @@ func biddingBackTask(data []byte, mapInfo map[string]interface{}) {
 		}
 
 		ps, _ := tmp["projectscope"].(string)
-		if ps == "" {
-			tmp["projectscope"] = "" //= tmp["detail"]
-		}
+		//		if ps == "" {
+		//			tmp["projectscope"] = "" //= tmp["detail"]
+		//		}
 		if len(ps) > ESLEN {
 			tmp["projectscope"] = string(([]rune(ps))[:4000])
 		}
-		if s_budget := fmt.Sprint(tmp["budget"]); s_budget == "" || s_budget == "<nil>" || s_budget == "null" {
-			tmp["budget"] = nil
-		} else if sbd, ok := tmp["budget"].(string); ok {
-			tmp["budget"] = ObjToMoney([]interface{}{sbd, sbd})[0]
-		}
-		if s_bidamount := fmt.Sprint(tmp["bidamount"]); s_bidamount == "" || s_bidamount == "<nil>" || s_bidamount == "null" {
-			tmp["bidamount"] = nil
-		} else if sbd, ok := tmp["bidamount"].(string); ok {
-			tmp["bidamount"] = ObjToMoney([]interface{}{sbd, sbd})[0]
-		}
+		//		if s_budget := fmt.Sprint(tmp["budget"]); s_budget == "" || s_budget == "<nil>" || s_budget == "null" {
+		//			tmp["budget"] = nil
+		//		} else if sbd, ok := tmp["budget"].(string); ok {
+		//			tmp["budget"] = ObjToMoney([]interface{}{sbd, sbd})[0]
+		//		}
+		//		if s_bidamount := fmt.Sprint(tmp["bidamount"]); s_bidamount == "" || s_bidamount == "<nil>" || s_bidamount == "null" {
+		//			tmp["bidamount"] = nil
+		//		} else if sbd, ok := tmp["bidamount"].(string); ok {
+		//			tmp["bidamount"] = ObjToMoney([]interface{}{sbd, sbd})[0]
+		//		}
 		UpdatesLock.Lock()
 		newTmp := map[string]interface{}{}
 		for _, v := range biddingIndexFields {
@@ -116,6 +115,24 @@ func biddingBackTask(data []byte, mapInfo map[string]interface{}) {
 							}
 						}
 						newTmp[v] = newmap
+						attachments := mp["attachments"]
+						con := ""
+						if attachments != nil {
+							am, _ := attachments.(map[string]interface{})
+							if am != nil {
+								for _, v1 := range am {
+									vm, _ := v1.(map[string]interface{})
+									if vm != nil {
+										c, _ := vm["content"].(string)
+										con += c
+									}
+								}
+							}
+						}
+						con = FilterDetailSpace(con)
+						if con != "" {
+							newTmp["attachments"] = con
+						}
 					}
 				} else {
 					if v == "detail" {
@@ -125,9 +142,9 @@ func biddingBackTask(data []byte, mapInfo map[string]interface{}) {
 						newTmp[v] = tmp[v]
 					}
 				}
-			} else if v == "budget" || v == "bidamount" {
+			} /* else if v == "budget" || v == "bidamount" {
 				newTmp[v] = nil
-			}
+			}*/
 		}
 		arrEs = append(arrEs, newTmp)
 		if len(arrEs) >= BulkSizeBack {

+ 15 - 16
udpcreateindex/src/config.json

@@ -1,13 +1,13 @@
 {
     "udpport": ":1483",
-    "msg_server": "192.168.3.207:27092",
+    "msg_server": "10.171.112.160:7070",
 	"savedb": {
-        "addr": "192.168.3.207:27092",
-        "size": 6,
-        "db": "extract_v3"
+        "addr": "172.17.4.187:27083",
+        "size": 10,
+        "db": "qfw"
     },
     "jkmail": {
-        "to":"renzheng@topnet.net.cn",
+        "to":"zhangjinkun@topnet.net.cn",
 		"api":"http://10.171.112.160:19281/_send/_mail"
     },
     "winner": {
@@ -30,31 +30,30 @@
     },
     "bidding": {
         "db": "qfw",
-        "collect": "bidding",
-        "index": "bidding",
+        "collect": "bidding_back",
+        "index": "bidding_v1",
         "type": "bidding",
-        "extractdb": "extract_v3",
-        "extractcollect": "result_v3",
-        "indexfields": [
-            "_id","district","topscopeclass","s_winner","winner","buyerclass","title","detail","area","site","bidopendate","bidopentime","buyer","city","comeintime","href","infoformat","projectcode","projectname","publishtime","s_sha","spidercode","subtype","toptype","agency","budget","bidamount","s_subscopeclass","projectscope","bidstatus","projectinfo","buyertel","buyerperson","buyeraddr","buyerzipcode","winnertel","winnerperson","projectid"
+        "extractdb": "qfw",
+        "extractcollect": "result_20200116",
+        "indexfields":[ "buyerzipcode","winnertel","winnerperson","contractcode","winneraddr","agencyaddr","buyeraddr","signaturedate","projectperiod","projectaddr","agencytel","agencyperson","buyerperson","agency","projectscope","projectcode","bidopentime","supervisorrate","buyertel","bidamount","winner","buyer","budget","projectname","bidstatus","buyerclass","topscopeclass","s_subscopeclass","area","city","district","s_winner","_id","title","detail","site","comeintime","href","infoformat","publishtime","s_sha","spidercode","subtype","toptype","projectinfo"
         ],
-        "fields": "buyerclass,projectname,projectcode,bidamount,budget,agency,amount,winner,buyer,bidopendate,bidopentime,bidstatus,projectscope,buyertel,buyerperson,buyeraddr,buyerzipcode,city,area,district,topscopeclass,winnertel,winnerperson",
+        "fields": "buyerzipcode,winnertel,winnerperson,contractcode,winneraddr,agencyaddr,buyeraddr,signaturedate,projectperiod,projectaddr,agencytel,agencyperson,buyerperson,agency,projectscope,projectcode,bidopentime,supervisorrate,buyertel,bidamount,winner,buyer,budget,projectname,buyerclass,topscopeclass,area,city,district,s_winner",
         "projectinfo": "approvecode,approvecontent,approvestatus,approvetime,industry",
         "multiIndex": ""
     },
     "project": {
-        "db": "qfw",
-        "collect": "projectset",
+        "db": "extract_kf",
+        "collect": "huawei_project",
         "index": "projectset_v1",
         "type": "projectset"
     },
     "mongodb": {
-        "addr": "192.168.3.207:27092",
+        "addr": "10.172.242.243:27080,10.30.94.175:27081,10.81.232.246:27082",
         "pool": 10,
         "db": "qfw"
     },
     "elastic": {
-        "addr": "http://39.96.199.144:9800",
+        "addr": "http://172.17.145.170:9800",
         "pool": 12
     }
 }

+ 10 - 8
udpcreateindex/src/projectindex.go

@@ -1,7 +1,7 @@
 package main
 
 import (
-	"fmt"
+	//"fmt"
 	"log"
 	"qfw/util"
 	elastic "qfw/util/elastic"
@@ -31,19 +31,21 @@ func projectTask(data []byte, mapInfo map[string]interface{}) {
 
 	log.Println(db, c, "查询语句:", q, "同步总数:", count, "elastic库:", index)
 	query := session.DB(db).C(c).Find(q).Iter()
-
 	arr := make([]map[string]interface{}, savesizei)
 	var n int
 	i := 0
 	for tmp := make(map[string]interface{}); query.Next(tmp); i = i + 1 {
 		delete(tmp, "package")
 		delete(tmp, "winnerorder")
-		if s_budget := fmt.Sprint(tmp["budget"]); s_budget == "" || s_budget == "<nil>" || s_budget == "null" {
-			tmp["budget"] = nil
-		}
-		if s_bidamount := fmt.Sprint(tmp["bidamount"]); s_bidamount == "" || s_bidamount == "<nil>" || s_bidamount == "null" {
-			tmp["bidamount"] = nil
-		}
+		delete(tmp, "infofield")
+		delete(tmp, "budgettag")
+		delete(tmp, "bidamounttag")
+		//		if s_budget := fmt.Sprint(tmp["budget"]); s_budget == "" || s_budget == "<nil>" || s_budget == "null" {
+		//			tmp["budget"] = nil
+		//		}
+		//		if s_bidamount := fmt.Sprint(tmp["bidamount"]); s_bidamount == "" || s_bidamount == "<nil>" || s_bidamount == "null" {
+		//			tmp["bidamount"] = nil
+		//		}
 		//go IS.Add("project")
 		arr[i] = tmp
 		n++

+ 6 - 6
udpfilterdup/src/config.json

@@ -1,13 +1,13 @@
 {
-    "udpport": ":1488",
+    "udpport": ":1485",
     "dupdays": 5,
     "mongodb": {
-        "addr": "192.168.3.207:27092",
+        "addr": "127.0.0.1:27080",
         "pool": 10,
-        "db": "extract_kf",
-        "extract": "zk",
+        "db": "qfw",
+        "extract": "extract_v20190111",
         "site": {
-            "dbname": "extract_kf",
+            "dbname": "qfw",
             "coll": "site"
         }
     },
@@ -18,7 +18,7 @@
     "nextNode": [],
     "isMerger": false,
     "threads": 1,
-    "isSort":true,
+    "isSort":false,
     "specialwords": "(重招|重新招标|勘察|设计|施工|监理|总承包|土石方|可研)",
     "specialtitle_1": "[0-9a-zA-Z一二三四五六七八九十零123456789](次|包|标段|标包|批)",
     "specialtitle_2": "项目([0-9a-zA-Z一二三四五六七八九十零123456789])",

+ 1 - 1
udpfilterdup/src/main.go

@@ -117,7 +117,7 @@ func mainT() {
 	mapinfo["gtid"] = sid
 	mapinfo["lteid"] = eid
 	mapinfo["stop"] = "true"
-	historyTask([]byte{}, mapinfo)
+	task([]byte{}, mapinfo)
 	time.Sleep(10 * time.Second)
 }
 func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {