Jianghan 1 жил өмнө
parent
commit
5b8ee69ecd

BIN
bidding_data/20220311标签规则.xlsx


+ 9 - 9
bidding_data/config.json

@@ -1,17 +1,17 @@
 {
-  "mgodb": "192.168.3.207:27092",
+  "mgodb": "192.168.3.166:27082",
   "dbsize": 12,
-  "dbname": "wjh",
-  "dbcoll": "f_sourceinfo_20220415_1",
+  "dbname": "guangdongyidong",
+  "dbcoll": "20230116Gdyd_12_qx_1",
   "qyxyColl": "oprd_qyxy",
-  "uname": "SJZY_RWMIX_Other",
-  "upwd": "SJZY@M34I6x7D9ata",
-  "tagFile": "/Users/jianghan/Desktop/20220311标签规则.xlsx",
+  "uname": "",
+  "upwd": "",
+  "tagFile": "/Users/jianghan/Project/GoPro/OprdData/bidding_data/20220311标签规则.xlsx",
   "operators": "移动,中移在线,中移系统,中移,铁通,联通,联合网络通信,电信,天翼视讯,天翼电信,广电网络,中国广电,湖北广电",
   "checkDb": {
-    "addr": "192.168.3.207:27092",
-    "dbname": "wjh",
+    "addr": "192.168.3.166:27082",
+    "dbname": "guangdongyidong",
     "dbsize": 5,
-    "dbcoll": "f_sourceinfo_20220415_1"
+    "dbcoll": "20230116Gdyd_12_qx"
   }
 }

+ 3 - 2
bidding_data/main.go

@@ -23,6 +23,7 @@ var (
 
 	operators    []string
 	TagMatchRule = map[string][]TagMatching{}
+	//TagMatchRule []TagMatching
 )
 
 func init() {
@@ -59,8 +60,8 @@ func init() {
 }
 
 func main() {
-	//go saveMethod()
-	go updateMethod()
+	go saveMethod()
+	//go updateMethod()
 
 	sess := Mgo1.GetMgoConn()
 	defer Mgo1.DestoryMongoConn(sess)

+ 0 - 1
bidding_data/pareExcel.go

@@ -28,7 +28,6 @@ var MatchWayMap = map[string]string{
 	"全文匹配":   "detail",
 	"附件匹配":   "filetext",
 	"项目名称匹配": "projectname",
-	"采购单位行业": "buyerclass",
 }
 
 func initExcel(path string) {

+ 96 - 0
bidding_data/pareExcel1.go

@@ -0,0 +1,96 @@
+package main
+
+//
+//import (
+//	"github.com/tealeg/xlsx"
+//	"qfw/util"
+//	"strings"
+//)
+//
+//func initExcel1(path string) {
+//	xlFile, err := xlsx.OpenFile(path)
+//	if err != nil {
+//		util.Debug(err)
+//		return
+//	}
+//
+//	for _, sheet := range xlFile.Sheet {
+//		//tagArr = append(tagArr, sheet.Name)
+//		for rowIndex, row := range sheet.Rows {
+//			if rowIndex == 0 {
+//				continue
+//			}
+//			tag := TagMatching{}
+//			for cellIndex, cell := range row.Cells {
+//				if cell.String() != "" {
+//					if cellIndex == 0 {
+//						tag.firstTag = cell.String()
+//					} else if cellIndex == 1 {
+//						tag.secondTag = cell.String()
+//					} else if cellIndex == 2 {
+//						tag.matchKey = cell.String()
+//						tag.matchKeyReg = initRegex(cell.String())
+//					} else if cellIndex == 3 {
+//						tag.matchKeyWay = strings.Split(cell.String(), ",")
+//					} else if cellIndex == 4 {
+//						tag.addKey = cell.String()
+//						tag.addKeyReg = initRegex(cell.String())
+//					} else if cellIndex == 5 {
+//						tag.addKeyWay = strings.Split(cell.String(), ",")
+//					} else if cellIndex == 6 {
+//						tag.excludeKey = cell.String()
+//						tag.excludeKeyReg = initRegex(cell.String())
+//					} else if cellIndex == 7 {
+//						tag.excludeKeyWay = strings.Split(cell.String(), ",")
+//					} else if cellIndex == 8 {
+//						tag.clearKey = strings.Split(cell.String(), ",")
+//					} else if cellIndex == 9 {
+//						tag.clearKeyWay = strings.Split(cell.String(), ",")
+//					}
+//				}
+//			}
+//			TagMatchRule = append(TagMatchRule, tag)
+//		}
+//		//TagMatchRule[sheet.Name] = rules
+//	}
+//}
+//
+////func initExcel2(path string) {
+////	xlFile, err := xlsx.OpenFile(path)
+////	if err != nil {
+////		return
+////	}
+////
+////	for _, sheet := range xlFile.Sheet {
+////		tagArr = append(tagArr, sheet.Name)
+////		var rules []TagMatching
+////		for rowIndex, row := range sheet.Rows {
+////			if rowIndex == 0 {
+////				continue
+////			}
+////			tag := TagMatching{}
+////			for cellIndex, cell := range row.Cells {
+////				if cell.String() != "" {
+////					if cellIndex == 0 {
+////						tag.matchKey = cell.String()
+////						tag.matchKeyReg = initRegex(cell.String())
+////					} else if cellIndex == 1 {
+////						tag.matchKeyWay = strings.Split(cell.String(), ",")
+////					} else if cellIndex == 2 {
+////						tag.addKey = cell.String()
+////						tag.addKeyReg = initRegex(cell.String())
+////					} else if cellIndex == 3 {
+////						tag.addKeyWay = strings.Split(cell.String(), ",")
+////					} else if cellIndex == 4 {
+////						tag.excludeKey = cell.String()
+////						tag.excludeKeyReg = initRegex(cell.String())
+////					} else if cellIndex == 5 {
+////						tag.excludeKeyWay = strings.Split(cell.String(), ",")
+////					}
+////				}
+////			}
+////			rules = append(rules, tag)
+////		}
+////		TagMatchRule[sheet.Name] = rules
+////	}
+////}

+ 17 - 22
bidding_data/task.go

@@ -12,22 +12,13 @@ import (
 
 var numReg = regexp.MustCompile("[0-9]+")
 
-func taskinfo1(tmp map[string]interface{}) {
-	info := tmp["v_baseinfo"].(map[string]interface{})
-	delete(info, "field_source")
-	delete(info, "v_fieldscore")
-	id := util.ObjToString(tmp["id"])
-	info["_id"] = mongodb.StringTOBsonId(id)
-	Mgo.SaveByOriID(collSave, info)
-}
-
-func taskinfo(info map[string]interface{}) {
-	id := mongodb.BsonIdToSId(info["_id"])
-	//info, ok := tmp["v_baseinfo"].(map[string]interface{})
-	//if !ok {
-	//	util.Debug("data err ---", id)
-	//	return
-	//}
+func taskinfo(tmp map[string]interface{}) {
+	id := mongodb.BsonIdToSId(tmp["_id"])
+	info, ok := tmp["v_baseinfo"].(map[string]interface{})
+	if !ok {
+		util.Debug("data err ---", id)
+		return
+	}
 	// 数据标签
 	//var mtkey []string
 	var tag []string
@@ -181,13 +172,17 @@ func taskinfo(info map[string]interface{}) {
 	//info["_id"] = mongodb.StringTOBsonId(id)
 	delete(info, "_id")
 	info["updatetime"] = time.Now().Unix()
-	//savePool <- info
-	update := []map[string]interface{}{{
-		"_id": mongodb.StringTOBsonId(id),
-	},
-		{"$set": info},
+	delete(info, "field_source")
+	if tmp["buyertagname"] != nil {
+		info["buyertagname"] = tmp["buyertagname"]
 	}
-	updatePool <- update
+	savePool <- info
+	//update := []map[string]interface{}{{
+	//	"_id": mongodb.StringTOBsonId(id),
+	//},
+	//	{"$set": info},
+	//}
+	//updatePool <- update
 }
 
 func periodMethod(str interface{}) int {

BIN
bidding_data/场景标签v1.xlsx


+ 11 - 10
es/biddingindex.go

@@ -4,7 +4,6 @@ import (
 	"go.mongodb.org/mongo-driver/bson"
 	"log"
 	qutil "qfw/util"
-	elastic "qfw/util/elastic"
 	"reflect"
 	"strings"
 	"unicode/utf8"
@@ -34,7 +33,6 @@ func biddingTask() {
 	//q := map[string]interface{}{"_id": mongodb.StringTOBsonId("5db7c324a5cb26b9b78b0f09")}
 	count, _ := session.DB(db).C(coll).Find(nil).Count()
 	index := qutil.ObjToString(bidding["index"])
-	stype := qutil.ObjToString(bidding["type"])
 	//线程池
 	UpdatesLock := sync.Mutex{}
 	qutil.Debug("查询语句:", nil, "同步总数:", count, "elastic库:")
@@ -97,11 +95,11 @@ func biddingTask() {
 				tmp["projectscope"] = string(([]rune(ps))[:4000])
 			}
 			//对标的物为空处理
-			filetext := getFileText(tmp)
-			filetextS := filetext
-			if len([]rune(filetextS)) > 10 { //attach_text
-				tmp["filetext"] = filetext
-			}
+			//filetext := getFileText(tmp)
+			//filetextS := filetext
+			//if len([]rune(filetextS)) > 10 { //attach_text
+			//	tmp["filetext"] = filetext
+			//}
 			if purchasing, ok := tmp["purchasing"].(string); ok && purchasing == "" {
 				delete(tmp, "purchasing")
 			}
@@ -212,7 +210,10 @@ func biddingTask() {
 						} else {
 							newTmp[field] = qutil.ObjToString(tmp["title"]) + " " + FilterDetail(detail)
 						}
-					} else if field == "_id" || field == "topscopeclass" { //不做处理
+					} else if field == "_id" {
+						newTmp[field] = qutil.ObjToString(tmp["id"])
+						newTmp["id"] = qutil.ObjToString(tmp["id"])
+					} else if field == "topscopeclass" { //不做处理
 						newTmp[field] = tmp[field]
 					} else if field == "publishtime" || field == "comeintime" {
 						//字段类型不正确,特别处理
@@ -240,7 +241,7 @@ func biddingTask() {
 					defer func() {
 						<-espool
 					}()
-					elastic.BulkSave(index, stype, &tmps, true)
+					Es.BulkSave(index, tmps)
 				}(tmps)
 				arrEs = []map[string]interface{}{}
 			}
@@ -254,7 +255,7 @@ func biddingTask() {
 	UpdatesLock.Lock()
 	if len(arrEs) > 0 {
 		tmps := arrEs
-		elastic.BulkSave(index, stype, &tmps, true)
+		Es.BulkSave(index, tmps)
 	}
 	UpdatesLock.Unlock()
 	log.Println("create biddingback index...over", n)

+ 10 - 8
es/config.json

@@ -1,14 +1,14 @@
 {
-  "uname": "root",
-  "upwd": "root",
+  "uname": "",
+  "upwd": "",
   "mongodb": {
-    "addr": "192.168.3.207:27092",
+    "addr": "127.0.0.1:27084",
     "pool": 10,
-    "db": "wjh"
+    "db": "oprd_data"
   },
   "bidding": {
-    "db": "wjh",
-    "collect": "oprd_bidding",
+    "db": "oprd_data",
+    "collect": "20230128bidding",
     "index": "oprd_bidding_v1",
     "type": "bidding",
     "extractdb": "wjh",
@@ -49,13 +49,15 @@
   "filelength": 50000,
   "detaillength": 50000,
   "project": {
-    "db": "wjh",
+    "db": "oprd_data",
     "collect": "oprd_projectset",
     "index": "oprd_projectset_v1",
     "type": "projectset"
   },
   "elastic": {
-    "addr": "http://192.168.3.206:9800",
+    "addr": "http://127.0.0.1:19805",
+    "user": "es_all",
+    "password": "TopJkO2E_d1x",
     "pool": 12
   }
 }

+ 423 - 0
es/elasticSim.go

@@ -0,0 +1,423 @@
+package main
+
+import (
+	"context"
+	"encoding/json"
+	"errors"
+	"fmt"
+	es "gopkg.in/olivere/elastic.v7"
+	"log"
+	"qfw/util"
+	"runtime"
+	"strings"
+	"sync"
+	"time"
+)
+
+type Elastic struct {
+	S_esurl      string
+	I_size       int
+	Addrs        []string
+	Pool         chan *es.Client
+	lastTime     int64
+	lastTimeLock sync.Mutex
+	ntimeout     int
+	Username     string
+	Password     string
+}
+
+func (e *Elastic) InitElasticSize() {
+	e.Pool = make(chan *es.Client, e.I_size)
+	for _, s := range strings.Split(e.S_esurl, ",") {
+		e.Addrs = append(e.Addrs, s)
+	}
+	log.Println(e.Password, e.Username)
+	for i := 0; i < e.I_size; i++ {
+		client, _ := es.NewClient(es.SetURL(e.Addrs...), es.SetBasicAuth(e.Username, e.Password), es.SetMaxRetries(2), es.SetSniff(false))
+		e.Pool <- client
+	}
+}
+
+//关闭连接
+func (e *Elastic) DestoryEsConn(client *es.Client) {
+	select {
+	case e.Pool <- client:
+		break
+	case <-time.After(time.Second * 1):
+		if client != nil {
+			client.Stop()
+		}
+		client = nil
+	}
+}
+
+func (e *Elastic) GetEsConn() *es.Client {
+	select {
+	case c := <-e.Pool:
+		if c == nil || !c.IsRunning() {
+			log.Println("new esclient.", len(e.Pool))
+			client, err := es.NewClient(es.SetURL(e.Addrs...), es.SetBasicAuth(e.Username, e.Password),
+				es.SetSniff(false))
+			if err == nil && client.IsRunning() {
+				return client
+			}
+		}
+		return c
+	case <-time.After(time.Second * 4):
+		//超时
+		e.ntimeout++
+		e.lastTimeLock.Lock()
+		defer e.lastTimeLock.Unlock()
+		//12秒后允许创建链接
+		c := time.Now().Unix() - e.lastTime
+		if c > 12 {
+			e.lastTime = time.Now().Unix()
+			log.Println("add client..", len(e.Pool))
+			c, _ := es.NewClient(es.SetURL(e.Addrs...), es.SetBasicAuth(e.Username, e.Password), es.SetSniff(false))
+			go func() {
+				for i := 0; i < 2; i++ {
+					client, _ := es.NewClient(es.SetURL(e.Addrs...), es.SetBasicAuth(e.Username, e.Password), es.SetSniff(false))
+					e.Pool <- client
+				}
+			}()
+			return c
+		}
+		return nil
+	}
+}
+
+func (e *Elastic) Get(index, query string) *[]map[string]interface{} {
+	client := e.GetEsConn()
+	defer func() {
+		go e.DestoryEsConn(client)
+	}()
+	var res []map[string]interface{}
+	if client != nil {
+		defer func() {
+			if r := recover(); r != nil {
+				log.Println("[E]", r)
+				for skip := 1; ; skip++ {
+					_, file, line, ok := runtime.Caller(skip)
+					if !ok {
+						break
+					}
+					go log.Printf("%v,%v\n", file, line)
+				}
+			}
+		}()
+		searchResult, err := client.Search().Index(index).Source(query).Do(context.Background())
+		if err != nil {
+			log.Println("从ES查询出错", err.Error())
+			return nil
+		}
+		if searchResult.Hits != nil {
+			resNum := len(searchResult.Hits.Hits)
+			if resNum < 5000 {
+				res = make([]map[string]interface{}, resNum)
+				for i, hit := range searchResult.Hits.Hits {
+					parseErr := json.Unmarshal(hit.Source, &res[i])
+					if parseErr == nil && hit.Highlight != nil && res[i] != nil {
+						res[i]["highlight"] = map[string][]string(hit.Highlight)
+					}
+				}
+			} else {
+				log.Println("查询结果太多,查询到:", resNum, "条")
+			}
+		}
+	}
+	return &res
+}
+
+//关闭elastic
+func (e *Elastic) Close() {
+	for i := 0; i < e.I_size; i++ {
+		cli := <-e.Pool
+		cli.Stop()
+		cli = nil
+	}
+	e.Pool = nil
+	e = nil
+}
+
+//获取连接
+//func (e *Elastic) GetEsConn() (c *es.Client) {
+//	defer util.Catch()
+//	select {
+//	case c = <-e.Pool:
+//		if c == nil || !c.IsRunning() {
+//			client, err := es.NewClient(es.SetURL(addrs...),
+//				es.SetMaxRetries(2), es.SetSniff(false))
+//			if err == nil && client.IsRunning() {
+//				return client
+//			}
+//			return nil
+//		}
+//		return
+//	case <-time.After(time.Second * 7):
+//		//超时
+//		ntimeout++
+//		log.Println("timeout times:", ntimeout)
+//		return nil
+//	}
+//}
+
+func (e *Elastic) BulkSave(index string, obj []map[string]interface{}) {
+	client := e.GetEsConn()
+	defer e.DestoryEsConn(client)
+	if client != nil {
+		req := client.Bulk()
+		for _, v := range obj {
+			//if isDelBefore {
+			//	req = req.Add(es.NewBulkDeleteRequest().Index(index).Id(fmt.Sprintf("%v", v["_id"])))
+			//}
+			id := util.ObjToString(v["_id"])
+			delete(v, "_id")
+			req = req.Add(es.NewBulkIndexRequest().Index(index).Id(id).Doc(v))
+		}
+		_, err := req.Do(context.Background())
+		if err != nil {
+			log.Println("批量保存到ES出错", err.Error())
+		}
+	}
+}
+
+//根据id删除索引对象
+func (e *Elastic) DelById(index, itype, id string) bool {
+	client := e.GetEsConn()
+	defer e.DestoryEsConn(client)
+	b := false
+	if client != nil {
+		var err error
+		_, err = client.Delete().Index(index).Type(itype).Id(id).Do(context.Background())
+		if err != nil {
+			log.Println("更新检索出错:", err.Error())
+		} else {
+			b = true
+		}
+	}
+	return b
+}
+
+func (e *Elastic) GetNoLimit(index, query string) *[]map[string]interface{} {
+	client := e.GetEsConn()
+	defer e.DestoryEsConn(client)
+	var res []map[string]interface{}
+	if client != nil {
+		defer func() {
+			if r := recover(); r != nil {
+				log.Println("[E]", r)
+				for skip := 1; ; skip++ {
+					_, file, line, ok := runtime.Caller(skip)
+					if !ok {
+						break
+					}
+					go log.Printf("%v,%v\n", file, line)
+				}
+			}
+		}()
+		searchResult, err := client.Search().Index(index).Source(query).Do(context.Background())
+		if err != nil {
+			log.Println("从ES查询出错", err.Error())
+			return nil
+		}
+
+		if searchResult.Hits != nil {
+			resNum := len(searchResult.Hits.Hits)
+			util.Debug(resNum)
+			res = make([]map[string]interface{}, resNum)
+			for i, hit := range searchResult.Hits.Hits {
+				json.Unmarshal(hit.Source, &res[i])
+			}
+		}
+	}
+	return &res
+}
+
+//func (e *Elastic) GetByIdField(index, itype, id, fields string) *map[string]interface{} {
+//	client := e.GetEsConn()
+//	defer e.DestoryEsConn(client)
+//	if client != nil {
+//		defer func() {
+//			if r := recover(); r != nil {
+//				log.Println("[E]", r)
+//				for skip := 1; ; skip++ {
+//					_, file, line, ok := runtime.Caller(skip)
+//					if !ok {
+//						break
+//					}
+//					go log.Printf("%v,%v\n", file, line)
+//				}
+//			}
+//		}()
+//		query := `{"query":{"term":{"_id":"` + id + `"}}`
+//		if len(fields) > 0 {
+//			query = query + `,"_source":[` + fields + `]`
+//		}
+//		query = query + "}"
+//		searchResult, err := client.Search().Index(index).Type(itype).Source(query).Do()
+//		if err != nil {
+//			log.Println("从ES查询出错", err.Error())
+//			return nil
+//		}
+//		var res map[string]interface{}
+//		if searchResult.Hits != nil {
+//			resNum := len(searchResult.Hits.Hits)
+//			if resNum == 1 {
+//				res = make(map[string]interface{})
+//				for _, hit := range searchResult.Hits.Hits {
+//					json.Unmarshal(*hit.Source., &res)
+//				}
+//				return &res
+//			}
+//		}
+//	}
+//	return nil
+//}
+
+func (e *Elastic) Count(index, itype string, query interface{}) int64 {
+	client := e.GetEsConn()
+	defer e.DestoryEsConn(client)
+	if client != nil {
+		defer func() {
+			if r := recover(); r != nil {
+				log.Println("[E]", r)
+				for skip := 1; ; skip++ {
+					_, file, line, ok := runtime.Caller(skip)
+					if !ok {
+						break
+					}
+					go log.Printf("%v,%v\n", file, line)
+				}
+			}
+		}()
+		var qq es.Query
+		if qi, ok2 := query.(es.Query); ok2 {
+			qq = qi
+		}
+		n, err := client.Count(index).Query(qq).Do(context.Background())
+		if err != nil {
+			log.Println("统计出错", err.Error())
+		}
+
+		return n
+	}
+	return 0
+}
+
+//更新一个字段
+//func (e *Elastic) BulkUpdateArr(index, itype string, update []map[string]string) {
+//	client := e.GetEsConn()
+//	defer e.DestoryEsConn(client)
+//	if client != nil {
+//		defer func() {
+//			if r := recover(); r != nil {
+//				log.Println("[E]", r)
+//				for skip := 1; ; skip++ {
+//					_, file, line, ok := runtime.Caller(skip)
+//					if !ok {
+//						break
+//					}
+//					go log.Printf("%v,%v\n", file, line)
+//				}
+//			}
+//		}()
+//		for _, data := range update {
+//			id := data["id"]
+//			updateStr := data["updateStr"]
+//			if id != "" && updateStr != "" {
+//				_, err := client.Update().Index(index).Type(itype).Id(id).Script(updateStr).ScriptLang("groovy").Do()
+//				if err != nil {
+//					log.Println("更新检索出错:", err.Error())
+//				}
+//			} else {
+//				log.Println("数据错误")
+//			}
+//		}
+//	}
+//}
+
+//更新多个字段
+//func (e *Elastic) BulkUpdateMultipleFields(index, itype string, arrs [][]map[string]interface{}) {
+//	client := e.GetEsConn()
+//	defer e.DestoryEsConn(client)
+//	if client != nil {
+//		defer func() {
+//			if r := recover(); r != nil {
+//				log.Println("[E]", r)
+//				for skip := 1; ; skip++ {
+//					_, file, line, ok := runtime.Caller(skip)
+//					if !ok {
+//						break
+//					}
+//					go log.Printf("%v,%v\n", file, line)
+//				}
+//			}
+//		}()
+//		for _, arr := range arrs {
+//			id := arr[0]["id"].(string)
+//			update := arr[1]["update"].([]string)
+//			for _, str := range update {
+//				_, err := client.Update().Index(index).Type(itype).Id(id).Script(str).ScriptLang("groovy").Do()
+//				if err != nil {
+//					log.Println("更新检索出错:", err.Error())
+//				}
+//			}
+//		}
+//	}
+//}
+
+// UpdateBulk 批量修改文档
+func (e *Elastic) UpdateBulk(index, itype string, docs ...[]map[string]interface{}) {
+	client := e.GetEsConn()
+	defer e.DestoryEsConn(client)
+	bulkService := client.Bulk().Index(index).Refresh("true")
+	bulkService.Type(itype)
+	for _, d := range docs {
+		id := d[0]["_id"].(string)
+		doc := es.NewBulkUpdateRequest().Id(id).Doc(d[1])
+		bulkService.Add(doc)
+	}
+	_, err := bulkService.Do(context.Background())
+	if err != nil {
+		fmt.Printf("UpdateBulk all success err is %v\n", err)
+	}
+	//if len(res.Failed()) > 0 {
+	//	fmt.Printf("UpdateBulk all success failed is %v\n", (res.Items[0]))
+	//}
+}
+
+// UpsertBulk 批量修改文档(不存在则插入)
+func (e *Elastic) UpsertBulk(ctx context.Context, index string, ids []string, docs []interface{}) error {
+	client := e.GetEsConn()
+	defer e.DestoryEsConn(client)
+	bulkService := client.Bulk().Index(index).Refresh("true")
+	bulkService.Type("bidding")
+	for i := range ids {
+		doc := es.NewBulkUpdateRequest().Id(ids[i]).Doc(docs[i]).Upsert(docs[i])
+		bulkService.Add(doc)
+	}
+	res, err := bulkService.Do(context.Background())
+	if err != nil {
+		return err
+	}
+	if len(res.Failed()) > 0 {
+		return errors.New(res.Failed()[0].Error.Reason)
+	}
+	return nil
+}
+
+// 批量删除
+func (e *Elastic) DeleteBulk(index string, ids []string) {
+	client := e.GetEsConn()
+	defer e.DestoryEsConn(client)
+	bulkService := client.Bulk().Index(index).Refresh("true")
+	bulkService.Type("bidding")
+	for i := range ids {
+		req := es.NewBulkDeleteRequest().Id(ids[i])
+		bulkService.Add(req)
+	}
+	res, err := bulkService.Do(context.Background())
+	if err != nil {
+		fmt.Printf("DeleteBulk success is %v\n", len(res.Succeeded()))
+	}
+}

+ 1305 - 0
es/elasticutil.go

@@ -0,0 +1,1305 @@
+package main
+
+import (
+	"context"
+	"runtime"
+	"sync"
+	"time"
+	//	"bytes"
+	"encoding/json"
+	"fmt"
+	"log"
+	"qfw/util"
+	"qfw/util/mongodb"
+	mongodbutil "qfw/util/mongodbutil"
+	"reflect"
+
+	es "gopkg.in/olivere/elastic.v5"
+
+	"strconv"
+	"strings"
+)
+
+//检索库服务地址
+var (
+	addrs     []string
+	LocCity   = map[string]string{}
+	SIZE      = 15
+	pool      chan *es.Client
+	ntimeout  int
+	poolsize  = int32(20)
+	newesLock = &sync.Mutex{}
+	SR        = strings.Replace
+)
+
+const (
+	QStr = `{"query":{"bool":{"must":[$and],"must_not":[],
+	"should":[$or],"minimum_should_match" : 0}}}`
+)
+
+func InitElastic(addr string) {
+	InitElasticSize(addr, 5)
+}
+
+//n倍的池
+func InitElasticSize(addr string, size int) {
+	poolsize = int32(3 * size)
+	pool = make(chan *es.Client, poolsize)
+	for _, s := range strings.Split(addr, ",") {
+		addrs = append(addrs, s)
+	}
+	for i := 0; i < size; i++ {
+		client, _ := es.NewClient(es.SetURL(addrs...), es.SetMaxRetries(2), es.SetSniff(false))
+		pool <- client
+	}
+}
+
+//关闭连接
+func DestoryEsConn(client *es.Client) {
+	select {
+	case pool <- client:
+		break
+	case <-time.After(time.Second * 1):
+		if client != nil {
+			client.Stop()
+		}
+		client = nil
+	}
+}
+
+var (
+	lastTime     = int64(0)
+	lastTimeLock = &sync.Mutex{}
+)
+
+//获取连接
+func GetEsConn() *es.Client {
+	select {
+	case c := <-pool:
+		if c == nil || !c.IsRunning() {
+			log.Println("new esclient.", len(pool))
+			client, err := es.NewClient(es.SetURL(addrs...),
+				es.SetMaxRetries(2), es.SetSniff(false))
+			if err == nil && client.IsRunning() {
+				return client
+			}
+		}
+		return c
+	case <-time.After(time.Second * 4):
+		//超时
+		ntimeout++
+		lastTimeLock.Lock()
+		defer lastTimeLock.Unlock()
+		//12秒后允许创建链接
+		c := time.Now().Unix() - lastTime
+		if c > 6 {
+			lastTime = time.Now().Unix()
+			log.Println("add client..", len(pool))
+			c, _ := es.NewClient(es.SetURL(addrs...), es.SetMaxRetries(2), es.SetSniff(false))
+			go func() {
+				for i := 0; i < 2; i++ {
+					client, _ := es.NewClient(es.SetURL(addrs...), es.SetMaxRetries(2), es.SetSniff(false))
+					pool <- client
+				}
+			}()
+			return c
+		}
+		return nil
+	}
+}
+
+//保存对象
+func Save(index, itype string, obj interface{}) bool {
+	client := GetEsConn()
+	defer DestoryEsConn(client)
+	defer func() {
+		if r := recover(); r != nil {
+			log.Println("[E]", r)
+			for skip := 1; ; skip++ {
+				_, file, line, ok := runtime.Caller(skip)
+				if !ok {
+					break
+				}
+				go log.Printf("%v,%v\n", file, line)
+			}
+		}
+	}()
+	data := util.ObjToMap(obj)
+	_id := util.BsonIdToSId((*data)["_id"])
+	(*data)["id"] = _id
+	delete((*data), "_id")
+	_, err := client.Index().Index(index).Type(itype).Id(_id).BodyJson(data).Do(context.TODO())
+	if err != nil {
+		log.Println("保存到ES出错", err.Error(), obj)
+		return false
+	} else {
+		return true
+	}
+
+}
+
+//底层查询方法
+func Get(index, itype, query string) *[]map[string]interface{} {
+	//log.Println("query  -- ", query)
+	client := GetEsConn()
+	defer DestoryEsConn(client)
+	var res []map[string]interface{}
+	if client != nil {
+		defer func() {
+			if r := recover(); r != nil {
+				log.Println("[E]", r)
+				for skip := 1; ; skip++ {
+					_, file, line, ok := runtime.Caller(skip)
+					if !ok {
+						break
+					}
+					go log.Printf("%v,%v\n", file, line)
+				}
+			}
+		}()
+		searchResult, err := client.Search().Index(index).Type(itype).Source(query).Do(context.TODO())
+		if err != nil {
+			log.Println("从ES查询出错", err.Error())
+			return nil
+		}
+		if searchResult.Hits != nil {
+			resNum := len(searchResult.Hits.Hits)
+			if resNum < 3000 {
+				res = make([]map[string]interface{}, resNum)
+				for i, hit := range searchResult.Hits.Hits {
+					parseErr := json.Unmarshal(*hit.Source, &res[i])
+					if res[i] != nil {
+						res[i]["_id"] = hit.Id
+					}
+					if parseErr == nil && hit.Highlight != nil && res[i] != nil {
+						res[i]["highlight"] = map[string][]string(hit.Highlight)
+					}
+				}
+			} else {
+				log.Println("查询结果太多,查询到:", resNum, "条")
+			}
+		}
+	}
+	return &res
+}
+
+func GetNoLimit(index, itype, query string) *[]map[string]interface{} {
+	client := GetEsConn()
+	defer DestoryEsConn(client)
+	var res []map[string]interface{}
+	if client != nil {
+		defer func() {
+			if r := recover(); r != nil {
+				log.Println("[E]", r)
+				for skip := 1; ; skip++ {
+					_, file, line, ok := runtime.Caller(skip)
+					if !ok {
+						break
+					}
+					go log.Printf("%v,%v\n", file, line)
+				}
+			}
+		}()
+		searchResult, err := client.Search().Index(index).Type(itype).Source(query).Do(context.TODO())
+		if err != nil {
+			log.Println("从ES查询出错", err.Error())
+			return nil
+		}
+
+		if searchResult.Hits != nil {
+			resNum := len(searchResult.Hits.Hits)
+			res = make([]map[string]interface{}, resNum)
+			for i, hit := range searchResult.Hits.Hits {
+				json.Unmarshal(*hit.Source, &res[i])
+				if res[i] != nil {
+					res[i]["_id"] = hit.Id
+				}
+			}
+		}
+	}
+	return &res
+}
+
+//分页查询
+//{"name":"张三","$and":[{"age":{"$gt":10}},{"age":{"$lte":20}}]}
+//fields直接是 `"_id","title"`
+func GetPage(index, itype, query, order, field string, start, limit int) *[]map[string]interface{} {
+	return Get(index, itype, MakeQuery(query, order, field, start, limit))
+}
+
+func MakeQuery(query, order, fileds string, start, limit int) string {
+	res := AnalyQuery(query, "", QStr)
+	if len(res) > 10 {
+		res = SR(SR(SR(SR(res, ",$and", "", -1), "$and", "", -1), ",$or", "", -1), "$or", "", -1)
+		if len(fileds) > 0 {
+			//"_source":["account_number","balance"]
+			res = res[:len(res)-1] + `,"_source":[` + fileds + "]}"
+		}
+		//{"name":-1,"age":1}
+		if len(order) > 0 {
+			res = res[:len(res)-1] + `,"sort":[` + SR(SR(SR(SR(order, ",", "},{", -1), " ", "", -1), ":-1", `:"desc"`, -1), ":1", `:"asc"`, -1) + `]}`
+		}
+		if start > -1 {
+			res = res[:len(res)-1] + `,"from":` + strconv.Itoa(start) + `,"size":` + strconv.Itoa(limit) + "}"
+		}
+		return res
+	}
+	return ""
+}
+
+//{"name":"aaa"}
+func AnalyQuery(query interface{}, parent string, result string) string {
+	m := make(map[string]interface{})
+	if q1, ok := query.(string); ok {
+		json.Unmarshal([]byte(q1), &m)
+	} else if q2, ok2 := query.(map[string]interface{}); ok2 {
+		m = q2
+	}
+	if len(parent) == 0 {
+		for k, v := range m {
+			if k == "$and" || k == "$or" {
+				temps := ""
+				if map1, ok := v.([]interface{}); ok {
+					for i := 0; i < len(map1); i++ {
+						temps += "," + AnalyQuery(map1[i], k, "")
+					}
+				}
+				if len(temps) > 0 {
+					temps = temps[1:]
+				}
+				result = SR(result, k, temps+","+k, 1)
+			} else {
+				switch reflect.TypeOf(v).String() {
+				case "string":
+					if strings.Index(k, "TERM_") == 0 {
+						result = SR(result, "$and", `{"term":{"`+SR(k, "TERM_", "", 1)+`":"`+fmt.Sprintf("%v", v)+`"}},$and`, 1)
+					} else {
+						result = SR(result, "$and", `{"query_string":{"default_field":"`+k+`","query":"`+fmt.Sprintf("%v", v)+`"}},$and`, 1)
+					}
+				case "int", "int8", "int32", "int64", "float32", "float64":
+					if strings.Index(k, "TERM_") == 0 {
+						result = SR(result, "$and", `{"term":{"`+SR(k, "TERM_", "", 1)+`":`+fmt.Sprintf("%v", v)+`}},$and`, 1)
+					} else {
+						result = SR(result, "$and", `{"query_string":{"default_field":"`+k+`","query":`+fmt.Sprintf("%v", v)+`}},$and`, 1)
+					}
+				default:
+					result = SR(result, "$and", AnalyQuery(v, k, "")+",$and", 1)
+				}
+			}
+		}
+		return result
+	} else {
+		for k, v := range m {
+			if k == "$in" {
+				s := ""
+				if map1, ok := v.([]interface{}); ok {
+					for i := 0; i < len(map1); i++ {
+						s += "," + `"` + fmt.Sprintf("%v", map1[i]) + `"`
+					}
+				}
+				if len(s) > 0 {
+					s = s[1:]
+				}
+				return `{"terms":{"` + parent + `":[` + s + `]}}`
+			} else if strings.Contains(k, "$lt") || strings.Contains(k, "$gt") {
+				return `{"range":{"` + parent + `":{"` + SR(k, "$", "", 1) + `":` + fmt.Sprintf("%v", v) + `}}}`
+			} else {
+				switch reflect.TypeOf(v).String() {
+				case "string":
+					if strings.Index(k, "TERM_") == 0 {
+						return `{"term":{"` + SR(k, "TERM_", "", 1) + `":"` + fmt.Sprintf("%v", v) + `"}}`
+					} else {
+						return `{"query_string":{"default_field":"` + k + `","query":"` + fmt.Sprintf("%v", v) + `"}}`
+					}
+				case "int", "int8", "int32", "int64", "float32", "float64":
+					if strings.Index(k, "TERM_") == 0 {
+						return `{"term":{"` + SR(k, "TERM_", "", 1) + `":` + fmt.Sprintf("%v", v) + `}}`
+					} else {
+						return `{"query_string":{"default_field":"` + k + `","query":` + fmt.Sprintf("%v", v) + `}}`
+					}
+				default:
+					return AnalyQuery(v, k, result)
+				}
+			}
+		}
+	}
+	return result
+}
+
+func GetByIdField(index, itype, id, fields string) *map[string]interface{} {
+	client := GetEsConn()
+	defer DestoryEsConn(client)
+	if client != nil {
+		defer func() {
+			if r := recover(); r != nil {
+				log.Println("[E]", r)
+				for skip := 1; ; skip++ {
+					_, file, line, ok := runtime.Caller(skip)
+					if !ok {
+						break
+					}
+					go log.Printf("%v,%v\n", file, line)
+				}
+			}
+		}()
+		query := `{"query":{"term":{"_id":"` + id + `"}}`
+		if len(fields) > 0 {
+			query = query + `,"_source":[` + fields + `]`
+		}
+		query = query + "}"
+		searchResult, err := client.Search().Index(index).Type(itype).Source(query).Do(context.TODO())
+		if err != nil {
+			log.Println("从ES查询出错", err.Error())
+			return nil
+		}
+		var res map[string]interface{}
+		if searchResult.Hits != nil {
+			resNum := len(searchResult.Hits.Hits)
+			if resNum == 1 {
+				res = make(map[string]interface{})
+				for _, hit := range searchResult.Hits.Hits {
+					json.Unmarshal(*hit.Source, &res)
+					if res != nil {
+						res["_id"] = hit.Id
+					}
+				}
+				return &res
+			} else {
+				log.Println("查询结果太多,查询到:", resNum, "条")
+			}
+
+		}
+	}
+	return nil
+}
+
+//根据id来查询文档
+func GetById(index, itype string, ids ...string) *[]map[string]interface{} {
+	client := GetEsConn()
+	defer DestoryEsConn(client)
+	var res []map[string]interface{}
+	if client != nil {
+		defer func() {
+			if r := recover(); r != nil {
+				log.Println("[E]", r)
+				for skip := 1; ; skip++ {
+					_, file, line, ok := runtime.Caller(skip)
+					if !ok {
+						break
+					}
+					go log.Printf("%v,%v\n", file, line)
+				}
+			}
+		}()
+		query := es.NewIdsQuery().Ids(ids...)
+		searchResult, err := client.Search().Index(index).Type(itype).Query(query).Do(context.TODO())
+		if err != nil {
+			log.Println("从ES查询出错", err.Error())
+			return nil
+		}
+
+		if searchResult.Hits != nil {
+			resNum := len(searchResult.Hits.Hits)
+			if resNum < 5000 {
+				res = make([]map[string]interface{}, resNum)
+				for i, hit := range searchResult.Hits.Hits {
+					json.Unmarshal(*hit.Source, &res[i])
+					if res[i] != nil {
+						res[i]["_id"] = hit.Id
+					}
+				}
+			} else {
+				log.Println("查询结果太多,查询到:", resNum, "条")
+			}
+
+		}
+	}
+	return &res
+}
+
+//elastic_v5之后查询语句适配
+func StringToQuery(q string) es.Query {
+	defer util.Catch()
+	m := map[string]interface{}{}
+	err := json.Unmarshal([]byte(q), &m)
+	if err == nil {
+		q, _ := m["query"].(map[string]interface{})
+		return AQ(q)
+	}
+	return nil
+}
+
+func AQ(m map[string]interface{}) (bq es.Query) {
+	for k, v := range m {
+		if k == "bool" {
+			bq1 := es.NewBoolQuery()
+			b1, _ := v.(map[string]interface{})
+			if b1["must"] != nil {
+				bm, _ := b1["must"].([]interface{})
+				for _, tm := range bm {
+					tm1, _ := tm.(map[string]interface{})
+					if tm1 != nil {
+						bq1.Must(AQ(tm1))
+					}
+				}
+			}
+			if b1["must_not"] != nil {
+				bm, _ := b1["must_not"].([]interface{})
+				for _, tm := range bm {
+					tm1, _ := tm.(map[string]interface{})
+					if tm1 != nil {
+						bq1.MustNot(AQ(tm1))
+					}
+				}
+			}
+			if b1["should"] != nil {
+				bm, _ := b1["should"].([]interface{})
+				for _, tm := range bm {
+					tm1, _ := tm.(map[string]interface{})
+					if tm1 != nil {
+						bq1.Should(AQ(tm1))
+					}
+				}
+			}
+			if b1["minimum_should_match"] != nil {
+				bq1.MinimumNumberShouldMatch(util.IntAll(b1["minimum_should_match"]))
+			}
+			bq = bq1
+		} else if k == "term" {
+			b1, _ := v.(map[string]interface{})
+			for k, v := range b1 {
+				bq = es.NewTermQuery(k, v)
+				break
+			}
+		} else if k == "terms" {
+			b1, _ := v.(map[string]interface{})
+			for k, v := range b1 {
+				vs, _ := v.([]interface{})
+				bq = es.NewTermsQuery(k, vs...)
+				break
+			}
+		} else if k == "range" {
+			b1, _ := v.(map[string]interface{})
+			for k, v := range b1 {
+				vm := v.(map[string]interface{})
+				bq1 := es.NewRangeQuery(k)
+				if vm["$gt"] != nil {
+					bq1.Gt(vm["$gt"])
+				}
+				if vm["$gte"] != nil {
+					bq1.Gte(vm["$gte"])
+				}
+				if vm["$lt"] != nil {
+					bq1.Lt(vm["$lt"])
+				}
+				if vm["$lte"] != nil {
+					bq1.Lte(vm["$lte"])
+				}
+				break
+			}
+		}
+	}
+	return
+}
+
+//删除某个索引,根据查询
+func Del(index, itype string, query interface{}) bool {
+	client := GetEsConn()
+	defer DestoryEsConn(client)
+	b := false
+	if client != nil {
+		defer func() {
+			if r := recover(); r != nil {
+				log.Println("[E]", r)
+				for skip := 1; ; skip++ {
+					_, file, line, ok := runtime.Caller(skip)
+					if !ok {
+						break
+					}
+					go log.Printf("%v,%v\n", file, line)
+				}
+			}
+		}()
+		var err error
+		if qs, ok := query.(string); ok {
+			//by, e := json.Marshal(qs)
+			esq := StringToQuery(qs)
+			if esq != nil {
+				_, err = client.DeleteByQuery().Index(index).Type(itype).Query(esq).Do(context.TODO())
+			}
+		} else if qi, ok2 := query.(es.Query); ok2 {
+			_, err = client.DeleteByQuery().Index(index).Type(itype).Query(qi).Do(context.TODO())
+		}
+		if err != nil {
+			log.Println("删除索引出错:", err.Error())
+		} else {
+			b = true
+		}
+	}
+	return b
+}
+
+//根据语句更新对象
+func Update(index, itype, id string, updateStr string) bool {
+	client := GetEsConn()
+	defer DestoryEsConn(client)
+	b := false
+	if client != nil {
+		defer func() {
+			if r := recover(); r != nil {
+				log.Println("[E]", r)
+				for skip := 1; ; skip++ {
+					_, file, line, ok := runtime.Caller(skip)
+					if !ok {
+						break
+					}
+					go log.Printf("%v,%v\n", file, line)
+				}
+			}
+		}()
+		var err error
+		esc := es.NewScript(updateStr)
+		esc.Lang("groovy")
+		_, err = client.Update().Index(index).Type(itype).Id(id).Script(esc).Do(context.TODO())
+		if err != nil {
+			log.Println("更新检索出错:", err.Error())
+		} else {
+			b = true
+		}
+	}
+	return b
+}
+
+func BulkUpdate(index, itype string, ids []string, updateStr string) {
+	client := GetEsConn()
+	defer DestoryEsConn(client)
+	if client != nil {
+		defer func() {
+			if r := recover(); r != nil {
+				log.Println("[E]", r)
+				for skip := 1; ; skip++ {
+					_, file, line, ok := runtime.Caller(skip)
+					if !ok {
+						break
+					}
+					go log.Printf("%v,%v\n", file, line)
+				}
+			}
+		}()
+		for _, id := range ids {
+			esc := es.NewScript(updateStr)
+			esc.Lang("groovy")
+			_, err := client.Update().Index(index).Type(itype).Id(id).Script(esc).Do(context.TODO())
+			if err != nil {
+				log.Println("更新检索出错:", err.Error())
+			}
+		}
+	}
+}
+
+//根据id删除索引对象
+func DelById(index, itype, id string) bool {
+	client := GetEsConn()
+	defer DestoryEsConn(client)
+	b := false
+	if client != nil {
+		defer func() {
+			if r := recover(); r != nil {
+				log.Println("[E]", r)
+				for skip := 1; ; skip++ {
+					_, file, line, ok := runtime.Caller(skip)
+					if !ok {
+						break
+					}
+					go log.Printf("%v,%v\n", file, line)
+				}
+			}
+		}()
+		var err error
+		_, err = client.Delete().Index(index).Type(itype).Id(id).Do(context.TODO())
+		if err != nil {
+			log.Println("更新检索出错:", err.Error())
+		} else {
+			b = true
+		}
+	}
+	return b
+}
+
+//先删除后增
+func UpdateNewDoc(index, itype string, obj ...interface{}) bool {
+	client := GetEsConn()
+	defer DestoryEsConn(client)
+	b := false
+	if client != nil {
+		defer func() {
+			if r := recover(); r != nil {
+				log.Println("[E]", r)
+				for skip := 1; ; skip++ {
+					_, file, line, ok := runtime.Caller(skip)
+					if !ok {
+						break
+					}
+					go log.Printf("%v,%v\n", file, line)
+				}
+			}
+		}()
+		var err error
+		for _, v := range obj {
+			tempObj := util.ObjToMap(v)
+			if tempObj == nil || len(*tempObj) == 0 {
+				continue
+			}
+			id := util.BsonIdToSId((*tempObj)["_id"])
+			(*tempObj)["id"] = id
+			delete(*tempObj, "_id")
+			if id != "" {
+				client.Delete().Index(index).Type(itype).Id(id).Do(context.TODO())
+			}
+			_, err = client.Index().Index(index).Type(itype).Id(id).BodyJson(tempObj).Do(context.TODO())
+			if err != nil {
+				log.Println("保存到ES出错", err.Error())
+			} else {
+				b = true
+			}
+		}
+
+	}
+	return b
+}
+
+func UpdateEntDoc(id string) bool {
+	b := false
+	map2 := map[string]interface{}{}
+	util.ReadConfig(&map2)
+	ent := mongodbutil.FindById("enterprise", map2["entMongodbAlias"].(string), map2["entMongodbName"].(string), id, "")
+	_ent := mongodb.FindById("enterprise", id, "")
+	if _ent != nil && len(*_ent) > 0 {
+		for k, v := range *_ent {
+			(*ent)[k] = v
+		}
+	}
+	if ent != nil {
+		b = UpdateNewDoc("enterprise", "enterprise", ConverData(ent))
+	}
+	return b
+}
+
+//把地市代码转为地市
+func getLoc(code string, res *map[string]string) (loc string) {
+	switch len(code) {
+	case 6:
+		loc = (*res)[code[:2]] + " " + (*res)[code[:4]] + " " + (*res)[code]
+		break
+	case 4:
+		loc = (*res)[code[:2]] + " " + (*res)[code]
+		break
+	case 2:
+		loc = (*res)[code]
+		break
+	}
+	return
+}
+
+//把地市代码转为地市
+func Loop(m interface{}, res *map[string]string) {
+	m1, ok := m.([]interface{})
+	if !ok {
+		m2, _ := m.([]map[string]interface{})
+		for i := 0; i < len(m2); i++ {
+			ms := m2[i]
+			(*res)[fmt.Sprintf("%1.0f", ms["k"])] = fmt.Sprintf("%s", ms["n"])
+			s := ms["s"]
+			if nil != s {
+				mss, _ := s.([]interface{})
+				if nil != mss {
+					Loop(mss, res)
+				}
+			}
+		}
+	} else {
+		for i := 0; i < len(m1); i++ {
+			ms, _ := m1[i].(map[string]interface{})
+			(*res)[fmt.Sprintf("%1.0f", ms["k"])] = fmt.Sprintf("%s", ms["n"])
+			s := ms["s"]
+			if nil != s {
+				mss, _ := s.([]interface{})
+				if nil != mss {
+					Loop(mss, res)
+				}
+			}
+		}
+	}
+}
+
+func ConverData(ent *map[string]interface{}) map[string]interface{} {
+	tmp := *ent
+	id64, _ := tmp["ID"].(int64)
+	ids := fmt.Sprintf("%d", id64)
+	tmp2 := make(map[string]interface{})
+	tmp2["ID"] = ids
+	tmp2["_id"] = tmp["_id"]
+	tmp2["id"] = tmp["_id"]
+	tmp2["Area"] = tmp["Area"]
+	tmp2["LeRep"] = tmp["LeRep"]
+	tmp2["RegNo"] = tmp["RegNo"]
+	tmp2["EntType"] = tmp["EntType"]
+	tmp2["EntName"] = tmp["EntName"]
+	tmp2["EntTypeName"] = tmp["EntTypeName"]
+	tmp2["Dom"] = tmp["Dom"]
+	tmp2["EstDate"] = tmp["EstDate"]
+	tmp2["OpStateName"] = tmp["OpStateName"]
+	tmp2["OpScope"] = tmp["OpScope"]
+	tmp2["OpState"] = tmp["OpState"]
+	tmp2["s_submitid"] = tmp["s_submitid"]
+	tmp2["l_submittime"] = tmp["l_submittime"]
+	tmp2["s_submitname"] = tmp["s_submitname"]
+	tmp2["RegCapCurName"] = tmp["RegCapCurName"]
+	//增加营业状态排序
+	if tmp2["OpState"] == "06" {
+		tmp2["OpSint"] = true
+	} else {
+		tmp2["OpSint"] = false
+	}
+	tmp2["OpLocDistrict"] = tmp["OpLocDistrict"]
+	//增加代码转名称
+	tmpLoc, _ := tmp["OpLocDistrict"].(string)
+	tmp2["OpLocDistrictName"] = getLoc(tmpLoc, &LocCity)
+
+	tmp2["RecCap"] = tmp["RecCap"]
+	tmp2["RegCap"] = tmp["RegCap"]
+	tmp2["IndustryPhy"] = tmp["IndustryPhy"]
+	tmp2["IndustryPhyName"] = tmp["IndustryPhyName"]
+	tmp2["RegOrg"] = tmp["RegOrg"]
+	tmp2["RegOrgName"] = tmp["RegOrgName"]
+	tmp2["Tel"] = tmp["Tel"]
+	tmp2["CompForm"] = tmp["CompForm"]
+	tmp2["CompFormName"] = tmp["CompFormName"]
+	//增加异常名录标记 Ycml可能是bool也可能是string
+	Ycmlb, _ := tmp["Ycml"].(bool)
+	Ycmls, _ := tmp["Ycml"].(string)
+	if Ycmlb || Ycmls == "1" {
+		tmp2["Ycml"] = true
+	} else {
+		tmp2["Ycml"] = false
+	}
+	//增加年报联系信息
+	if tmp["Nb_email"] != nil {
+		tmp2["Nb_email"] = tmp["Nb_email"]
+	}
+	if tmp["Nb_tel"] != nil {
+		tmp2["Nb_tel"] = tmp["Nb_tel"]
+	}
+	if tmp["Nb_addr"] != nil {
+		tmp2["Nb_addr"] = tmp["Nb_addr"]
+	}
+
+	s_synopsis := tmp["s_synopsis"]
+	if s_synopsis == nil {
+		s_synopsis = ""
+	}
+	tmp2["s_synopsis"] = s_synopsis //企业简介
+
+	//股东
+	stock := getStock(tmp["investor"])
+	tmp2["stock"] = stock
+
+	tmp2["LegCerNO"] = tmp["LegCerNO"]
+	if tmp["s_microwebsite"] != nil {
+		tmp2["s_microwebsite"] = tmp["s_microwebsite"]
+	}
+
+	tmp2["SourceType"] = tmp["SourceType"] //数据来源
+	s_servicenames := tmp["s_servicenames"]
+	if s_servicenames == nil {
+		s_servicenames = ""
+	}
+	tmp2["s_servicenames"] = s_servicenames //服务名称
+	s_action := tmp["s_action"]
+	if s_action == nil {
+		s_action = "N"
+	}
+	tmp2["s_action"] = s_action
+	tmp2["s_persion"] = tmp["s_persion"]
+	tmp2["s_mobile"] = tmp["s_mobile"]
+	tmp2["s_enturl"] = tmp["s_enturl"]
+	tmp2["s_weixin"] = tmp["s_weixin"]
+	tmp2["s_avatar"] = tmp["s_avatar"]
+	return tmp2
+}
+
+func getStock(obj interface{}) string {
+	stock := ""
+	if ns, ok := obj.([]interface{}); ok {
+		stock = " "
+		for _, ns1 := range ns {
+			if nn, ok1 := ns1.(map[string]interface{}); ok1 {
+				tmp := fmt.Sprintf("%s", nn["Inv"])
+				if strings.Index(stock, tmp) < 0 {
+					stock += tmp + " "
+				}
+			}
+		}
+	}
+	return stock
+}
+
+func BulkSave(index, itype string, obj *[]map[string]interface{}, isDelBefore bool) {
+	client := GetEsConn()
+	defer DestoryEsConn(client)
+	if client != nil {
+		defer func() {
+			if r := recover(); r != nil {
+				log.Println("[E]", r)
+				for skip := 1; ; skip++ {
+					_, file, line, ok := runtime.Caller(skip)
+					if !ok {
+						break
+					}
+					go log.Printf("%v,%v\n", file, line)
+				}
+			}
+		}()
+		req := client.Bulk()
+		for _, v := range *obj {
+			if v == nil || len(v) == 0 {
+				continue
+			}
+			_id := util.BsonIdToSId(v["_id"])
+			v["id"] = _id
+			delete(v, "_id")
+			if isDelBefore && _id != "" {
+				req = req.Add(es.NewBulkDeleteRequest().Index(index).Type(itype).Id(_id))
+			}
+			req = req.Add(es.NewBulkIndexRequest().Index(index).Type(itype).Id(_id).Doc(v))
+		}
+		_, err := req.Do(context.TODO())
+		if err != nil {
+			log.Println("批量保存到ES出错", err.Error())
+		}
+	}
+}
+
+func Count(index, itype string, query interface{}) int64 {
+	client := GetEsConn()
+	defer DestoryEsConn(client)
+	if client != nil {
+		defer func() {
+			if r := recover(); r != nil {
+				log.Println("[E]", r)
+				for skip := 1; ; skip++ {
+					_, file, line, ok := runtime.Caller(skip)
+					if !ok {
+						break
+					}
+					go log.Printf("%v,%v\n", file, line)
+				}
+			}
+		}()
+		var qq es.Query
+		if qs, ok := query.(string); ok {
+			temp := &es.BoolQuery{
+				QueryStrings: qs,
+			}
+			qq = temp
+		} else if qi, ok2 := query.(es.Query); ok2 {
+			qq = qi
+		}
+		n, err := client.Count(index).Type(itype).Query(qq).Do(context.TODO())
+		if err != nil {
+			log.Println("统计出错", err.Error())
+		}
+
+		return n
+	}
+	return 0
+}
+
+//ngram精确查询
+/*
+{
+  "query": {
+    "bool": {
+      "should": [
+        {
+	"bool":{
+	  "must":[
+	  {     "multi_match": {
+            "query": "智能",
+            "type": "phrase",
+            "fields": [
+              "title"
+            ],
+           "analyzer": "my_ngram"
+          }
+        },{
+          "multi_match": {
+            "query": "机器",
+            "type": "phrase",
+            "fields": [
+              "title"
+            ],
+           "analyzer": "my_ngram"
+          }
+        },{
+          "multi_match": {
+            "query": "2016",
+            "type": "phrase",
+            "fields": [
+              "title"
+            ],
+           "analyzer": "my_ngram"
+          }
+	  }
+	  ]
+	}
+        },
+
+{
+	"bool":{
+	  "must":[
+	  {          "multi_match": {
+            "query": "河南",
+            "type": "phrase",
+            "fields": [
+              "title"
+            ],
+           "analyzer": "my_ngram"
+          }
+        },{
+          "multi_match": {
+            "query": "工商",
+            "type": "phrase",
+            "fields": [
+              "title"
+            ],
+           "analyzer": "my_ngram"
+          }
+        },{
+          "multi_match": {
+            "query": "2016",
+            "type": "phrase",
+            "fields": [
+              "title"
+            ],
+           "analyzer": "my_ngram"
+          }
+	  }
+	  ]
+	}
+        }
+      ],"minimum_should_match": 1
+    }
+  },
+  "_source": [
+    "_id",
+    "title"
+  ],
+  "from": 0,
+  "size": 10,
+  "sort": [{
+      "publishtime": "desc"
+    }]
+}
+
+*/
+//"2016+智能+办公,"河南+工商"
+//["2016+智能+办公","河南+工商"]
+//QStr = `{"query":{"bool":{should":[$or],"minimum_should_match" : 1}}}`
+//{"bool":{"must":[]}}
+//{"multi_match": {"query": "$word","type": "phrase", "fields": [$field],"analyzer": "my_ngram"}}
+//"highlight": {"pre_tags": [""],"post_tags": [""],"fields": {"detail": {"fragment_size": 1,"number_of_fragments": 1},"title": {"fragment_size": 1,"number_of_fragments": 1}}}
+const (
+	//此处最后少一个},正好NgramStr取[1:]多一个}
+	FilterQuery     = `{"query": {"bool": {"filter": [%s]}},%s`
+	NgramStr        = `{"query":{"bool":{"must":[%s],"should":[%s],"minimum_should_match" : 0}}}`
+	NgramMust       = `{"bool":{"must":[%s]}}`
+	NgramMustAndNot = `{"bool":{"must":[%s],"must_not":[%s]}}`
+	minq            = `{"multi_match": {"query": "%s","type": "phrase", "fields": [%s],"analyzer": "my_ngram"}}`
+	HL              = `"highlight": {"pre_tags": [""],"post_tags": [""],"fields": {%s}}`
+	highlightStr    = `%s: {"fragment_size": %d,"number_of_fragments": 1}`
+
+	FilterQuery_New  = `{"query":{"bool":{"must": [%s%s%s],"should":[]}}}`
+	MatchQueryString = `{"match": {%s: { "query":"%s", "operator": "and"}}}`
+	HL_New           = `"highlight": {"pre_tags": ["<HL>"],"post_tags": ["<HL>"],"fields": {%s}}`
+)
+
+//替换了"号
+func GetNgramQuery(query interface{}, mustquery, findfields string) (qstr string) {
+	var words []string
+	if q, ok := query.(string); ok {
+		if q != "" {
+			words = strings.Split(q, ",")
+		}
+	} else if q, ok := query.([]string); ok {
+		words = q
+	} else if q, ok := query.([]interface{}); ok {
+		words = util.ObjArrToStringArr(q)
+	}
+	if words != nil {
+		new_minq := fmt.Sprintf(minq, "%s", findfields)
+		musts := []string{}
+		for _, qs_words := range words {
+			qws := strings.Split(qs_words, "+")
+			mq := []string{}
+			for _, qs_word := range qws {
+				mq = append(mq, fmt.Sprintf(new_minq, ReplaceYH(qs_word)))
+			}
+			musts = append(musts, fmt.Sprintf(NgramMust, strings.Join(mq, ",")))
+		}
+		qstr = fmt.Sprintf(NgramStr, mustquery, strings.Join(musts, ","))
+		//log.Println("ngram-query", qstr)
+	} else {
+		qstr = fmt.Sprintf(NgramStr, mustquery, "")
+	}
+	return
+}
+
+func GetNgramQuery_New(querystring, querymust interface{}, must, findfields string) (qstring string) {
+	querymust_string := ""
+	var wordsMust []string
+	if q, ok := querymust.(string); ok {
+		if q != "" {
+			wordsMust = strings.Split(q, ",")
+		}
+	} else if q, ok := querymust.([]string); ok {
+		wordsMust = q
+	} else if q, ok := querymust.([]interface{}); ok {
+		wordsMust = util.ObjArrToStringArr(q)
+	}
+	if wordsMust != nil {
+		new_minq := fmt.Sprintf(minq, "%s", findfields)
+		musts := []string{}
+		for _, qs_wordsMust := range wordsMust {
+			qws := strings.Split(qs_wordsMust, "+")
+			mq := []string{}
+			for _, qs_word := range qws {
+				mq = append(mq, fmt.Sprintf(new_minq, qs_word))
+			}
+			musts = append(musts, fmt.Sprintf(NgramMust, strings.Join(mq, ",")))
+		}
+		querymust_string = strings.Join(musts, ",")
+	}
+	//log.Println("must", must, querymust_string)
+
+	//querystring---------------------------------------------
+	query_string := ""
+	var querysShold []string
+	if q, ok := querystring.(string); ok {
+		if q != "" {
+			querysShold = strings.Split(q, ",")
+		}
+	} else if q, ok := querystring.([]string); ok {
+		querysShold = q
+	} else if q, ok := querystring.([]interface{}); ok {
+		querysShold = util.ObjArrToStringArr(q)
+	}
+	if querysShold != nil {
+		for k, name := range strings.Split(findfields, ",") {
+			for _, qs_querysShold := range querysShold {
+				if k > 0 {
+					query_string = query_string + "," + fmt.Sprintf(MatchQueryString, fmt.Sprint(name), qs_querysShold)
+				} else {
+					query_string = query_string + fmt.Sprintf(MatchQueryString, fmt.Sprint(name), qs_querysShold)
+				}
+			}
+		}
+	}
+	//log.Println("querystring", query_string)
+	if querymust_string == "" {
+		qstring = fmt.Sprintf(FilterQuery_New, must, query_string, querymust_string)
+	} else {
+		qstring = fmt.Sprintf(FilterQuery_New, must, query_string, ","+querymust_string)
+	}
+	return
+}
+func GetByNgram(index, itype string, query interface{}, mustquery, findfields, order, fields string, start, limit int) *[]map[string]interface{} {
+	return GetByNgramAll(index, itype, query, mustquery, findfields, order, fields, start, limit, false, false)
+}
+
+//增加高亮、过滤查询、高亮截取字数
+func GetByNgramOther(index, itype string, query interface{}, mustquery, findfields, order, fields string, start, limit int, highlight bool, filtermode bool, count int) *[]map[string]interface{} {
+	defer util.Catch()
+	qstr := ""
+	if mustquery != "" && filtermode {
+		qstr = GetNgramQuery(query, "", findfields)
+		qstr = fmt.Sprintf(FilterQuery, mustquery, qstr[1:])
+	} else {
+		qstr = GetNgramQuery(query, mustquery, findfields)
+	}
+	if qstr != "" {
+		if highlight {
+			ws := []string{}
+			for _, w := range strings.Split(findfields, ",") {
+				ws = append(ws, fmt.Sprintf(highlightStr, w, count))
+			}
+			qstr = qstr[:len(qstr)-1] + `,` + fmt.Sprintf(HL, strings.Join(ws, ",")) + `}`
+		}
+		if len(fields) > 0 {
+			qstr = qstr[:len(qstr)-1] + `,"_source":[` + fields + "]}"
+		}
+		if len(order) > 0 {
+			qstr = qstr[:len(qstr)-1] + `,"sort":[` + SR(SR(SR(SR(order, ",", "},{", -1), " ", "", -1), ":-1", `:"desc"`, -1), ":1", `:"asc"`, -1) + `]}`
+		}
+		if start > -1 {
+			qstr = qstr[:len(qstr)-1] + `,"from":` + strconv.Itoa(start) + `,"size":` + strconv.Itoa(limit) + "}"
+		}
+		//log.Println("ngram-find", qstr)
+		return Get(index, itype, qstr)
+	} else {
+		return nil
+	}
+}
+
+//增加高亮、过滤查询
+//替换了"号
+func GetByNgramAll(index, itype string, query interface{}, mustquery, findfields, order, fields string, start, limit int, highlight bool, filtermode bool) *[]map[string]interface{} {
+	defer util.Catch()
+	qstr := ""
+	if mustquery != "" && filtermode {
+		qstr = GetNgramQuery(query, "", findfields)
+		qstr = fmt.Sprintf(FilterQuery, mustquery, qstr[1:])
+	} else {
+		qstr = GetNgramQuery(query, mustquery, findfields)
+	}
+	if qstr != "" {
+		if highlight {
+			ws := []string{}
+			for _, w := range strings.Split(findfields, ",") {
+				ws = append(ws, fmt.Sprintf(highlightStr, w, 1))
+			}
+			qstr = qstr[:len(qstr)-1] + `,` + fmt.Sprintf(HL, strings.Join(ws, ",")) + `}`
+		}
+		if len(fields) > 0 {
+			qstr = qstr[:len(qstr)-1] + `,"_source":[` + fields + "]}"
+		}
+		if len(order) > 0 {
+			qstr = qstr[:len(qstr)-1] + `,"sort":[` + SR(SR(SR(SR(order, ",", "},{", -1), " ", "", -1), ":-1", `:"desc"`, -1), ":1", `:"asc"`, -1) + `]}`
+		}
+		if start > -1 {
+			qstr = qstr[:len(qstr)-1] + `,"from":` + strconv.Itoa(start) + `,"size":` + strconv.Itoa(limit) + "}"
+		}
+		//	log.Println("ngram-find", qstr)
+		return Get(index, itype, qstr)
+	} else {
+		return nil
+	}
+}
+
+//增加高亮、过滤查询
+func GetByNgramAll_New(index, itype string, querystring, querymust interface{}, mustquery, findfields, order, fields string, start, limit int, highlight bool, filtermode bool) *[]map[string]interface{} {
+	defer util.Catch()
+	qstr := ""
+	if filtermode {
+		qstr = GetNgramQuery_New(querystring, querymust, mustquery, findfields)
+	} else {
+		qstr = GetNgramQuery_New(querystring, "", mustquery, findfields)
+	}
+	if qstr != "" {
+		if highlight {
+			ws := []string{}
+			for _, w := range strings.Split(findfields, ",") {
+				ws = append(ws, w+`:{"force_source": true}`)
+			}
+			qstr = qstr[:len(qstr)-1] + `,` + fmt.Sprintf(HL_New, strings.Join(ws, ",")) + `}`
+		}
+		if len(fields) > 0 {
+			qstr = qstr[:len(qstr)-1] + `,"_source":[` + fields + "]}"
+		}
+		if len(order) > 0 {
+			qstr = qstr[:len(qstr)-1] + `,"sort":[` + SR(SR(SR(SR(order, ",", ",", -1), " ", "", -1), ":-1", `:"desc"`, -1), ":1", `:"asc"`, -1) + `]}`
+		}
+		if start > -1 {
+			qstr = qstr[:len(qstr)-1] + `,"from":` + strconv.Itoa(start) + `,"size":` + strconv.Itoa(limit) + "}"
+		}
+		//log.Println("ngram-find", order, qstr)
+		return Get(index, itype, qstr)
+	} else {
+		return nil
+	}
+}
+
+type KeyConfig struct {
+	Keys      []string `json:"key"`
+	NotKeys   []string `json:"notkey"`
+	InfoTypes []string `json:"infotype"`
+	Areas     []string `json:"area"`
+}
+
+//替换了"号
+func GetResForJY(index, itype string, keys []KeyConfig, allquery, findfields, SortQuery, fields string, start, limit int) *[]map[string]interface{} {
+	if len(keys) > 0 {
+		qstr := ""
+		new_minq := fmt.Sprintf(minq, "%s", findfields)
+		not_new_minq := fmt.Sprintf(minq, "%s", `"title"`) //排除词只查询标题
+		musts := []string{}
+		for _, qs_words := range keys {
+			mq := []string{}
+			notmq := []string{}
+			for _, qs_word := range qs_words.Keys {
+				mq = append(mq, fmt.Sprintf(new_minq, ReplaceYH(qs_word)))
+			}
+			for _, qs_word := range qs_words.NotKeys {
+				notmq = append(notmq, fmt.Sprintf(not_new_minq, ReplaceYH(qs_word)))
+			}
+			if len(qs_words.Areas) > 0 {
+				mq = append(mq, fmt.Sprintf(`{"terms":{"area":["%s"]}}`, strings.Join(qs_words.Areas, `","`)))
+			}
+			if len(qs_words.InfoTypes) > 0 {
+				mq = append(mq, fmt.Sprintf(`{"terms":{"toptype":["%s"]}}`, strings.Join(qs_words.InfoTypes, `","`)))
+			}
+			musts = append(musts, fmt.Sprintf(NgramMustAndNot, strings.Join(mq, ","), strings.Join(notmq, ",")))
+		}
+		qstr = fmt.Sprintf(NgramStr, "", strings.Join(musts, ","))
+
+		qstr = fmt.Sprintf(FilterQuery, allquery, qstr[1:])
+		ws := []string{}
+		for _, w := range strings.Split(findfields, ",") {
+			ws = append(ws, fmt.Sprintf(highlightStr, w, 1))
+		}
+		qstr = qstr[:len(qstr)-1] + `,` + fmt.Sprintf(HL, strings.Join(ws, ",")) + `}`
+		if len(fields) > 0 {
+			qstr = qstr[:len(qstr)-1] + `,"_source":[` + fields + "]}"
+		}
+		if len(SortQuery) > 0 {
+			qstr = qstr[:len(qstr)-1] + `,"sort":` + SortQuery + `}`
+		}
+		if start > -1 {
+			qstr = qstr[:len(qstr)-1] + `,"from":` + strconv.Itoa(start) + `,"size":` + strconv.Itoa(limit) + "}"
+		}
+		//log.Println("jy-ngram-find", qstr)
+		return Get(index, itype, qstr)
+	} else {
+		return nil
+	}
+}
+
+func ReplaceYH(src string) (rpl string) {
+	return strings.Replace(src, `"`, `\"`, -1)
+}
+
+//
+func GetAllByNgram(index, itype, qstr, findfields, order, fields string, start, limit, count int, highlight bool) *[]map[string]interface{} {
+	if qstr != "" {
+		if highlight {
+			ws := []string{}
+			for _, w := range strings.Split(findfields, ",") {
+				ws = append(ws, fmt.Sprintf(highlightStr, w, count))
+			}
+			qstr = qstr[:len(qstr)-1] + `,` + fmt.Sprintf(HL, strings.Join(ws, ",")) + `}`
+		}
+		if len(fields) > 0 {
+			qstr = qstr[:len(qstr)-1] + `,"_source":[` + fields + "]}"
+		}
+		if len(order) > 0 {
+			qstr = qstr[:len(qstr)-1] + `,"sort":[` + SR(SR(SR(SR(order, ",", "},{", -1), " ", "", -1), ":-1", `:"desc"`, -1), ":1", `:"asc"`, -1) + `]}`
+		}
+		if start > -1 {
+			qstr = qstr[:len(qstr)-1] + `,"from":` + strconv.Itoa(start) + `,"size":` + strconv.Itoa(limit) + "}"
+		}
+		//log.Println("GetAllByNgram:", qstr)
+		return Get(index, itype, qstr)
+	} else {
+		return nil
+	}
+}

+ 59 - 6
es/main.go

@@ -5,8 +5,8 @@ import (
 	"mongodb"
 	_ "net/http/pprof"
 	"qfw/util"
-	elastic "qfw/util/elastic"
 	"strings"
+	"time"
 )
 
 var (
@@ -26,6 +26,12 @@ var (
 	fileLength               = 50000
 	savesize                 = 500
 
+	Es *Elastic
+
+	EsBulkSize        int
+	saveProjectEsPool chan map[string]interface{}
+	saveProjectSp     chan bool
+
 	bidding, project map[string]interface{}
 )
 var UpdataMgoCache = make(chan []map[string]interface{}, 1000)
@@ -43,14 +49,20 @@ func init() {
 		MongodbAddr: mconf["addr"].(string),
 		Size:        util.IntAllDef(mconf["pool"], 5),
 		DbName:      mconf["db"].(string),
-		//UserName:	 Sysconfig["uname"].(string),
-		//Password:    Sysconfig["upwd"].(string),
+		UserName:    Sysconfig["uname"].(string),
+		Password:    Sysconfig["upwd"].(string),
 	}
 	mgo.InitPool()
 
 	//初始化es
 	econf := Sysconfig["elastic"].(map[string]interface{})
-	elastic.InitElasticSize(econf["addr"].(string), util.IntAllDef(econf["pool"], 5))
+	Es = &Elastic{
+		S_esurl:  util.ObjToString(econf["addr"]), //http://172.17.4.184:19800
+		I_size:   util.IntAll(econf["size"]),
+		Username: util.ObjToString(econf["user"]),
+		Password: util.ObjToString(econf["password"]),
+	}
+	Es.InitElasticSize()
 
 	//
 	if bidding["indexfields"] != nil {
@@ -102,15 +114,56 @@ func init() {
 	log.Println(projectinfoFields)
 	log.Println(purchasinglistFields)
 
+	EsBulkSize = 200
+	saveProjectEsPool = make(chan map[string]interface{}, 5000)
+	saveProjectSp = make(chan bool, 5)
+
 	//初始化oss
 	InitOss()
 
 }
 
 func main() {
+	go SaveProjectEs()
+
+	//biddingTask()
+	projectTask()
 
-	biddingTask()
-	//projectTask()
 	ch := make(chan bool, 1)
 	<-ch
 }
+
+func SaveProjectEs() {
+	arru := make([]map[string]interface{}, EsBulkSize)
+	indexu := 0
+	for {
+		select {
+		case v := <-saveProjectEsPool:
+			arru[indexu] = v
+			indexu++
+			if indexu == EsBulkSize {
+				saveProjectSp <- true
+				go func(arru []map[string]interface{}) {
+					defer func() {
+						<-saveProjectSp
+					}()
+					Es.BulkSave("oprd_projectset_v1", arru)
+				}(arru)
+				arru = make([]map[string]interface{}, EsBulkSize)
+				indexu = 0
+			}
+		case <-time.After(1000 * time.Millisecond):
+			if indexu > 0 {
+				saveProjectSp <- true
+				go func(arru []map[string]interface{}) {
+					defer func() {
+						<-saveProjectSp
+					}()
+					Es.BulkSave("oprd_projectset_v1", arru)
+				}(arru[:indexu])
+				arru = make([]map[string]interface{}, EsBulkSize)
+				indexu = 0
+			}
+		}
+	}
+}

+ 7 - 29
es/projectindex.go

@@ -3,8 +3,8 @@ package main
 import (
 	"log"
 	"math"
+	"mongodb"
 	"qfw/util"
-	"qfw/util/elastic"
 	"strconv"
 )
 
@@ -15,17 +15,13 @@ func projectTask() {
 	c, _ := project["collect"].(string)
 	db, _ := project["db"].(string)
 	index, _ := project["index"].(string)
-	itype, _ := project["type"].(string)
 	count, _ := session.DB(db).C(c).Find(nil).Count()
-	savepool := make(chan bool, 5)
 	log.Println(db, c, "查询语句:", nil, "同步总数:", count, "elastic库:", index)
 	query := session.DB(db).C(c).Find(nil).Iter()
-	arr := make([]map[string]interface{}, savesize)
-	var n int
 	i := 0
 	for tmp := make(map[string]interface{}); query.Next(tmp); i++ {
-		if n%5000 == 0 {
-			log.Println("current---------", n)
+		if i%5000 == 0 {
+			log.Println("current---------", i)
 		}
 		pp := map[string]map[string]interface{}{}
 		if packages, ok := tmp["package"].(map[string]interface{}); ok {
@@ -149,29 +145,11 @@ func projectTask() {
 			tmp["projectscope"] = string(projectscopeRune[:1000])
 		}
 		tmp["s_projectname"] = tmp["projectname"]
-		arr[i] = tmp
-		n++
-		if i == savesize-1 {
-			savepool <- true
-			tmps := arr
-			go func(tmpn *[]map[string]interface{}) {
-				defer func() {
-					<-savepool
-				}()
-				elastic.BulkSave(index, itype, tmpn, true)
-			}(&tmps)
-			i = 0
-			arr = make([]map[string]interface{}, savesize)
-		}
-		if n%savesize == 1000 {
-			log.Println("当前:", n)
-		}
+		tmp["_id"] = mongodb.BsonIdToSId(tmp["_id"])
+		tmp["id"] = mongodb.BsonIdToSId(tmp["_id"])
+		saveProjectEsPool <- tmp
 		tmp = make(map[string]interface{})
 	}
 
-	if i > 0 {
-		//util.Debug(arr)
-		elastic.BulkSave(index, itype, &arr, true)
-	}
-	log.Println("create project index...over", n)
+	log.Println("create project index...over", i)
 }

+ 3 - 3
export_excel/config.json

@@ -4,17 +4,17 @@
   "dbcoll": "oprd_projectset",
   "dbsize": 5,
   "fields": {
-    "buyer": "采购单位名称",
-    "s_subscopeclass": "合同分类",
     "area": "省份",
     "city": "城市",
     "title": "公告标题",
+    "publishtime": "发布时间",
+    "buyer": "采购单位名称",
+    "s_subscopeclass": "合同分类",
     "bidstatus": "项目状态",
     "jyhref1": "项目状态剑鱼链接",
     "bidtype": "招标类型",
     "jyhref2": "招标公告剑鱼链接",
     "tag_rule": "项目分类(ICT/综合布线/其他)",
-    "publishtime": "发布时间",
     "project_duration": "工期时长",
     "project_timeunit": "工期单位",
     "projectname": "项目名称",

+ 2 - 2
import_execl/main.go

@@ -116,7 +116,7 @@ func updateMethod() {
 					defer func() {
 						<-updateSp
 					}()
-					Mgo.UpSertBulk(DbColl, arru...)
+					Mgo.UpdateBulk(DbColl, arru...)
 				}(arru)
 				arru = make([][]map[string]interface{}, 200)
 				indexu = 0
@@ -128,7 +128,7 @@ func updateMethod() {
 					defer func() {
 						<-updateSp
 					}()
-					Mgo.UpSertBulk(DbColl, arru...)
+					Mgo.UpdateBulk(DbColl, arru...)
 				}(arru[:indexu])
 				arru = make([][]map[string]interface{}, 200)
 				indexu = 0

+ 4 - 4
project_data/config.json

@@ -2,12 +2,12 @@
   "loadStart": 0,
   "validdays":150,
   "statusdays": 15,
-  "mongodbServers": "192.168.3.207:27092",
+  "mongodbServers": "192.168.3.166:27082",
   "mongodbPoolSize": 10,
-  "mongodbName": "wjh",
+  "mongodbName": "guangdongyidong",
   "hints":"publishtime_1",
-  "extractColl": "extract",
-  "projectColl": "projectset-test",
+  "extractColl": "20230128bidding",
+  "projectColl": "oprd_projectset",
   "backupFlag": true,
   "siteColl": "site",
   "thread": 1,

+ 8 - 2
project_data/init.go

@@ -298,6 +298,9 @@ type Info struct {
 	Budget              float64                  `json:"budget"`
 	Bidamount           float64                  `json:"bidamount"`
 	TagRule             string                   `json:"tag_rule"`
+	TopBuyerclass       string                   `json:"top_buyerclass"`
+	FirstTag            string                   `json:"first_tag"`
+	SecondTag           string                   `json:"second_tag"`
 	Winners             []string
 	dealtype            int
 	PTC                 string //从标题中抽的项目编号
@@ -354,8 +357,11 @@ type ProjectInfo struct {
 	BidGuarantee       bool                   `json:"bid_guarantee"`           //投标保证金 是否支持包含
 	Qualifies          string                 `json:"qualifies"`               //资质条件
 	TagRule            string                 `json:"tag_rule"`                //数据标签
-	IsOperators        bool                   `json:"isOperators"`             //是否是运营商
-	EntIdList          []string               `json:"entidlist"`               //企业id
+	TopBuyerclass      string                 `json:"top_buyerclass"`
+	FirstTag           string                 `json:"first_tag"`
+	SecondTag          string                 `json:"second_tag"`
+	IsOperators        bool                   `json:"isOperators"` //是否是运营商
+	EntIdList          []string               `json:"entidlist"`   //企业id
 	score              int
 	comStr             string
 	resVal, pjVal      int

+ 2 - 0
project_data/load_data.go

@@ -32,6 +32,8 @@ func (p *ProjectTask) loadData(starttime int64) {
 				}
 				if tmp != nil {
 					id := tmp.Id.Hex()
+					tmp.ProjectName = filterReg.ReplaceAllString(tmp.ProjectName, "") //清理
+					tmp.ProjectCode = filterReg.ReplaceAllString(tmp.ProjectCode, "")
 					for _, v := range append([]string{tmp.ProjectName}, tmp.MPN...) {
 						if v != "" {
 							//v = pcReplace.ReplaceAllString(v, "")

+ 99 - 55
project_data/project.go

@@ -124,46 +124,41 @@ func (p *ProjectTask) startProjectMerge(info *Info, tmp map[string]interface{})
 		compareProject.score = 0
 		//问题出地LastTime!!!!!
 		diffTime := int64(math.Abs(float64(info.Publishtime - compareProject.LastTime)))
-		qu.Debug(diffTime, p.validTime)
-		if diffTime <= p.validTime {
-			//代理机构完全不相同,直接新建项目
-			if CheckContain(compareProject.Agency, info.Agency) == 3 {
-				continue
-			}
-			qu.Debug(diffTime)
-			//地区(省、市、区)不同,直接新建项目
-			if ComparePlace(compareProject, info) {
-				continue
-			}
-			qu.Debug(1)
-			info.PNBH = 0
-			info.PCBH = 0
-			info.PTCBH = 0
-			compareStr, score := comparePNC(info, compareProject)
-			qu.Debug(compareStr, score)
-			resVal, pjVal := Select(compareStr, info, compareProject)
-			//---------------------------------------
-			//log.Println(resVal, pjVal, compareProject)
-			if resVal > 0 {
-				compareBuyer, compareCity, compareTime, compareAgency, compareBudget, compareBidmount, score2 := p.compareBCTABB(info, compareProject, diffTime, score)
+		//if diffTime <= p.validTime {
+		//代理机构完全不相同,直接新建项目
+		if CheckContain(compareProject.Agency, info.Agency) == 3 {
+			continue
+		}
+		//地区(省、市、区)不同,直接新建项目
+		if ComparePlace(compareProject, info) {
+			continue
+		}
+		info.PNBH = 0
+		info.PCBH = 0
+		info.PTCBH = 0
+		compareStr, score := comparePNC(info, compareProject)
+		resVal, pjVal := Select(compareStr, info, compareProject)
+		//---------------------------------------
+		if resVal > 0 {
+			compareBuyer, compareCity, compareTime, compareAgency, compareBudget, compareBidmount, score2 := p.compareBCTABB(info, compareProject, diffTime, score)
 
-				//项目名称、项目编号、标题项目编号、采购单位、省、市、发布时间、代理机构
-				comStr = compareStr + compareBuyer + compareCity + compareTime + compareAgency + compareBudget + compareBidmount
-				compareProject.comStr = comStr
-				compareProject.pjVal = pjVal
-				compareProject.resVal = resVal
-				//log.Println(compareProject.comStr)
-				eqV := compareResult(resVal, pjVal, score2, comStr, compareBuyer, compareCity, compareTime, compareAgency, compareBudget, compareBidmount)
-				if eqV == 1 {
-					comRes1 = append(comRes1, compareProject)
-				} else if eqV == 2 {
-					comRes2 = append(comRes2, compareProject)
-				} else if eqV == 3 {
-					comRes3 = append(comRes3, compareProject)
-				}
+			//项目名称、项目编号、标题项目编号、采购单位、省、市、发布时间、代理机构
+			comStr = compareStr + compareBuyer + compareCity + compareTime + compareAgency + compareBudget + compareBidmount
+			compareProject.comStr = comStr
+			compareProject.pjVal = pjVal
+			compareProject.resVal = resVal
+			//log.Println(compareProject.comStr)
+			eqV := compareResult(resVal, pjVal, score2, comStr, compareBuyer, compareCity, compareTime, compareAgency, compareBudget, compareBidmount)
+			if eqV == 1 {
+				comRes1 = append(comRes1, compareProject)
+			} else if eqV == 2 {
+				comRes2 = append(comRes2, compareProject)
+			} else if eqV == 3 {
+				comRes3 = append(comRes3, compareProject)
 			}
 		}
 	}
+	//}
 	//--------------------------------对比完成-----------------------
 
 	//更新数组、更新项目
@@ -604,6 +599,18 @@ func (p *ProjectTask) NewProject(tmp map[string]interface{}, thisinfo *Info) (st
 	if len(p1.EntIdList) > 0 {
 		set["entidlist"] = p1.EntIdList
 	}
+	if thisinfo.TopBuyerclass != "" {
+		set["top_buyerclass"] = thisinfo.TopBuyerclass
+		p1.TopBuyerclass = thisinfo.TopBuyerclass
+	}
+	if thisinfo.FirstTag != "" {
+		set["first_tag"] = thisinfo.FirstTag
+		p1.FirstTag = thisinfo.FirstTag
+	}
+	if thisinfo.SecondTag != "" {
+		set["second_tag"] = thisinfo.SecondTag
+		p1.SecondTag = thisinfo.SecondTag
+	}
 	p1.InfoFiled = make(map[string]InfoField)
 	infofield := InfoField{
 		Budget:       thisinfo.Budget,
@@ -851,7 +858,6 @@ func (p *ProjectTask) UpdateProject(tmp map[string]interface{}, thisinfo *Info,
 	if (pInfo.Buyer == "" && thisinfo.Buyer != "") || (len([]rune(pInfo.Buyer)) < 5 && len([]rune(thisinfo.Buyer)) > 5) {
 		pInfo.Buyer = thisinfo.Buyer
 		set["buyer"] = thisinfo.Buyer
-
 		pInfo.Buyerclass = thisinfo.Buyerclass
 		set["buyerclass"] = thisinfo.Buyerclass
 	}
@@ -955,29 +961,32 @@ func (p *ProjectTask) UpdateProject(tmp map[string]interface{}, thisinfo *Info,
 	if len(thisinfo.Winners) > 0 {
 		if len(pInfo.Winners) <= 0 {
 			set["winner"] = qu.ObjToString(tmp["winner"])
-		}
-		sort.Strings(pInfo.Winners)
-		for _, k := range thisinfo.Winners {
-			if !pInfo.IsOperators {
-				for _, op := range operators {
-					if strings.Contains(k, op) {
-						pInfo.IsOperators = true
-						set["isOperators"] = true
-						break
+			set["s_winner"] = strings.Join(thisinfo.Winners, ",")
+		} else {
+			sort.Strings(pInfo.Winners)
+			for _, k := range thisinfo.Winners {
+				if !pInfo.IsOperators {
+					for _, op := range operators {
+						if strings.Contains(k, op) {
+							pInfo.IsOperators = true
+							set["isOperators"] = true
+							break
+						}
 					}
 				}
-			}
-			if thisinfo.SubType == "流标" || thisinfo.SubType == "废标" {
-				if BinarySearch(pInfo.Winners, k) != -1 {
-					deleteSlice(pInfo.Winners, k, "")
-				}
-			} else {
-				if BinarySearch(pInfo.Winners, k) == -1 {
-					pInfo.Winners = append(pInfo.Winners, k)
+				if thisinfo.SubType == "流标" || thisinfo.SubType == "废标" {
+					if BinarySearch(pInfo.Winners, k) != -1 {
+						deleteSlice(pInfo.Winners, k, "")
+					}
+				} else {
+					if BinarySearch(pInfo.Winners, k) == -1 {
+						pInfo.Winners = append(pInfo.Winners, k)
+					}
 				}
 			}
+			set["s_winner"] = strings.Join(pInfo.Winners, ",")
 		}
-		set["s_winner"] = strings.Join(pInfo.Winners, ",")
+
 		if tmp["winnertel"] != nil {
 			set["winnertel"] = tmp["winnertel"]
 		}
@@ -999,6 +1008,41 @@ func (p *ProjectTask) UpdateProject(tmp map[string]interface{}, thisinfo *Info,
 			set["tag_rule"] = thisinfo.TagRule
 		}
 	}
+	// top_buyerclass
+	if thisinfo.TopBuyerclass != "" {
+		set["top_buyerclass"] = thisinfo.TopBuyerclass
+		pInfo.TopBuyerclass = thisinfo.TopBuyerclass
+	}
+	if thisinfo.FirstTag != "" {
+		if pInfo.FirstTag != "" {
+			tags := strings.Split(pInfo.FirstTag, ",")
+			for _, s := range strings.Split(thisinfo.FirstTag, ",") {
+				if BinarySearch(tags, s) == -1 {
+					tags = append(tags, s)
+				}
+			}
+			set["first_tag"] = strings.Join(tags, ",")
+			pInfo.FirstTag = strings.Join(tags, ",")
+		} else {
+			set["first_tag"] = thisinfo.FirstTag
+			pInfo.FirstTag = thisinfo.FirstTag
+		}
+	}
+	if thisinfo.SecondTag != "" {
+		if pInfo.SecondTag != "" {
+			tags := strings.Split(pInfo.SecondTag, ",")
+			for _, s := range strings.Split(thisinfo.SecondTag, ",") {
+				if BinarySearch(tags, s) == -1 {
+					tags = append(tags, s)
+				}
+			}
+			set["second_tag"] = strings.Join(tags, ",")
+			pInfo.SecondTag = strings.Join(tags, ",")
+		} else {
+			set["second_tag"] = thisinfo.SecondTag
+			pInfo.SecondTag = thisinfo.SecondTag
+		}
+	}
 	if tmp["buyertagname"] != nil {
 		set["buyertagname"] = tmp["buyertagname"]
 	}

+ 2 - 0
project_data/project_tool.go

@@ -1,5 +1,7 @@
 package main
 
+var DateTimeSelect = []string{"bidopentime", "bidendtime", "signaturedate", "comeintime"}
+
 //项目中的字段
 var FIELDS = []string{
 	"area",

+ 45 - 21
project_data/task.go

@@ -29,6 +29,8 @@ var (
 	titleGetPc  = regexp.MustCompile("^([-0-9a-zA-Z第号采招政询电审竞#]{8,}[-0-9a-zA-Z#]+)")
 	titleGetPc1 = regexp.MustCompile("[\\[【((](.{0,6}(编号|编码|项号|包号|代码|标段?号)[::为])?([-0-9a-zA-Z第号采招政询电审竞#]{5,}([\\[\\]()()][-0-9a-zA-Z第号采招审竞#]+[\\[\\]()()][-0-9a-zA-Z第号采招审竞#]+)?)[\\]】))]")
 	titleGetPc2 = regexp.MustCompile("([-0-9a-zA-Z第号采政招询电审竞#]{8,}[-0-9a-zA-Z#]+)(.{0,5}公告)?$")
+	//对标题、项目名称等中英文符号、空格等进行处理
+	filterReg = regexp.MustCompile("[`~!@#$^&*()=|{}':;,\\[\\].<>/?!¥…()—【】‘;:”“。,、?%+_-]")
 	//项目编号过滤
 	pcReplace = regexp.MustCompile("([\\[【((〖〔《{﹝{](重|第?[二三四再]次.{0,4})[\\]】))〗〕》}﹞}])$|[\\[\\]【】()()〖〗〔〕《》{}﹝﹞-;{}–  ]+|(号|重|第?[二三四五再]次(招标)?)$|[ __]+|((采购)?项目|采购(项目)?)$")
 	//项目编号只是数字或只是字母4个以下
@@ -195,6 +197,23 @@ func ParseInfo(tmp map[string]interface{}) (info *Info) {
 		return nil
 	}
 
+	thisinfo.Bidamount = util.Float64All(tmp["bidamount"])
+	thisinfo.Budget = util.Float64All(tmp["budget"])
+
+	if tmp["pt_modify"] != nil {
+		thisinfo.Publishtime = util.Int64All(tmp["pt_modify"])
+		tmp["publishtime"] = tmp["pt_modify"]
+	}
+	// 处理publishtime为空
+	if thisinfo.Publishtime <= 0 {
+		for _, d := range DateTimeSelect {
+			if tmp[d] != nil {
+				thisinfo.Publishtime = util.Int64All(tmp[d])
+				tmp["publishtime"] = tmp[d]
+			}
+		}
+	}
+
 	if len(thisinfo.Topscopeclass) == 0 {
 		thisinfo.Topscopeclass = []string{}
 	}
@@ -205,6 +224,9 @@ func ParseInfo(tmp map[string]interface{}) (info *Info) {
 		thisinfo.SubType = util.ObjToString(tmp["bidstatus"])
 	}
 
+	// 项目名称、项目标题过滤
+	thisinfo.ProjectName = filterReg.ReplaceAllString(thisinfo.ProjectName, "")
+	thisinfo.ProjectCode = filterReg.ReplaceAllString(thisinfo.ProjectCode, "")
 	//从标题中查找项目编号
 	res := titleGetPc.FindStringSubmatch(thisinfo.Title)
 	if len(res) > 1 && len(res[1]) > 6 && thisinfo.ProjectCode != res[1] && !numCheckPc.MatchString(res[1]) && !_zimureg1.MatchString(res[1]) {
@@ -220,7 +242,6 @@ func ParseInfo(tmp map[string]interface{}) (info *Info) {
 			}
 		}
 	}
-
 	if thisinfo.ProjectName != "" && len([]rune(thisinfo.ProjectName)) > 0 {
 		//thisinfo.ProjectName = pcReplace.ReplaceAllString(thisinfo.ProjectName, "")
 		//if thisinfo.ProjectName != "" {
@@ -257,32 +278,35 @@ func ParseInfo(tmp map[string]interface{}) (info *Info) {
 	if len(thisinfo.ReviewExperts) > 0 {
 		thisinfo.ReviewExperts = ClearRp(thisinfo.ReviewExperts)
 	}
-	//winners整理
-	thisinfo.Winners = strings.Split(util.ObjToString(tmp["s_winner"]), ",")
 
 	thisinfo.LenPC = len([]rune(thisinfo.ProjectCode))
 	thisinfo.LenPTC = len([]rune(thisinfo.PTC))
 	thisinfo.LenPN = len([]rune(thisinfo.ProjectName))
 
-	winner := util.ObjToString(tmp["winner"])
-	winners := []string{}
-	m1 := map[string]bool{}
-	if winner != "" {
-		m1[winner] = true
-		winners = append(winners, winner)
-	}
-	packageM, _ := tmp["package"].(map[string]interface{})
-	if packageM != nil {
-		thisinfo.HasPackage = true
-		for _, p := range packageM {
-			pm, _ := p.(map[string]interface{})
-			if pw := util.ObjToString(pm["winner"]); pw != "" && !m1[pw] {
-				m1[pw] = true
-				winners = append(winners, pw)
-			}
-		}
+	//winner := util.ObjToString(tmp["winner"])
+	//winners := []string{}
+	//m1 := map[string]bool{}
+	//if winner != "" {
+	//	m1[winner] = true
+	//	winners = append(winners, winner)
+	//}
+	//packageM, _ := tmp["package"].(map[string]interface{})
+	//if packageM != nil {
+	//	thisinfo.HasPackage = true
+	//	for _, p := range packageM {
+	//		pm, _ := p.(map[string]interface{})
+	//		if pw := util.ObjToString(pm["winner"]); pw != "" && !m1[pw] {
+	//			m1[pw] = true
+	//			winners = append(winners, pw)
+	//		}
+	//	}
+	//}
+
+	//winners整理
+	if util.ObjToString(tmp["s_winner"]) != "" {
+		thisinfo.Winners = strings.Split(util.ObjToString(tmp["s_winner"]), ",")
 	}
-	thisinfo.Winners = winners
+	//thisinfo.Winners = winners
 	//处理分包中数据异常问题
 	for k, tmp := range thisinfo.Package {
 		if ps, ok := tmp.([]map[string]interface{}); ok {

+ 2 - 2
qyxy_data/main.go

@@ -35,7 +35,7 @@ func main() {
 	ch := make(chan bool, 3)
 	wg := &sync.WaitGroup{}
 
-	field := map[string]interface{}{"bid_contracttype": -1, "bid_unittype": -1, "bid_projectname": -1, "bid_purchasing": -1, "bid_area": -1}
+	field := map[string]interface{}{"bid_contracttype": 0, "bid_unittype": 0, "bid_projectname": 0, "bid_purchasing": 0, "bid_area": 0}
 	query := sess.DB("mixdata").C("qyxy_tmp").Find(nil).Select(field).Iter()
 	count, taskcount := 0, 0
 	for tmp := make(map[string]interface{}); query.Next(&tmp); count++ {
@@ -50,7 +50,7 @@ func main() {
 				wg.Done()
 			}()
 			name := util.ObjToString(tmp["company_name"])
-			info, _ := Mgo1.FindOne("qyxy_std", map[string]interface{}{"company_name": name})
+			info, _ := Mgo1.FindOneByField("qyxy_std", map[string]interface{}{"company_name": name}, field)
 			if len(*info) > 0 {
 				Mgo1.Save("qyxy_oprd", *info)
 				Mgo1.UpdateById("qyxy_tmp", tmp["_id"], map[string]interface{}{"$set": map[string]interface{}{"exist": true}})

+ 3 - 3
qyxy_es/config.json

@@ -1,14 +1,14 @@
 {
-  "mgodb": "192.168.3.207:27092",
+  "mgodb": "192.168.3.166:27082",
   "dbsize": 12,
-  "dbname": "wjh",
+  "dbname": "guangdongyidong",
   "dbcoll": "oprd_qyxy",
   "uname": "",
   "upwd": "",
   "tasktime": 0,
   "updatetime": 0,
   "elastic": {
-    "addr": "http://192.168.3.206:9800",
+    "addr": "http://192.168.3.241:9205",
     "index": "oprd_qyxy_v1",
     "itype": "qyxy",
     "pool": 12,

+ 423 - 0
qyxy_es/elasticSim.go

@@ -0,0 +1,423 @@
+package main
+
+import (
+	"context"
+	"encoding/json"
+	"errors"
+	"fmt"
+	es "gopkg.in/olivere/elastic.v7"
+	"log"
+	"qfw/util"
+	"runtime"
+	"strings"
+	"sync"
+	"time"
+)
+
+type Elastic struct {
+	S_esurl      string
+	I_size       int
+	Addrs        []string
+	Pool         chan *es.Client
+	lastTime     int64
+	lastTimeLock sync.Mutex
+	ntimeout     int
+	Username     string
+	Password     string
+}
+
+func (e *Elastic) InitElasticSize() {
+	e.Pool = make(chan *es.Client, e.I_size)
+	for _, s := range strings.Split(e.S_esurl, ",") {
+		e.Addrs = append(e.Addrs, s)
+	}
+	log.Println(e.Password, e.Username)
+	for i := 0; i < e.I_size; i++ {
+		client, _ := es.NewClient(es.SetURL(e.Addrs...), es.SetBasicAuth(e.Username, e.Password), es.SetMaxRetries(2), es.SetSniff(false))
+		e.Pool <- client
+	}
+}
+
+//关闭连接
+func (e *Elastic) DestoryEsConn(client *es.Client) {
+	select {
+	case e.Pool <- client:
+		break
+	case <-time.After(time.Second * 1):
+		if client != nil {
+			client.Stop()
+		}
+		client = nil
+	}
+}
+
+func (e *Elastic) GetEsConn() *es.Client {
+	select {
+	case c := <-e.Pool:
+		if c == nil || !c.IsRunning() {
+			log.Println("new esclient.", len(e.Pool))
+			client, err := es.NewClient(es.SetURL(e.Addrs...), es.SetBasicAuth(e.Username, e.Password),
+				es.SetSniff(false))
+			if err == nil && client.IsRunning() {
+				return client
+			}
+		}
+		return c
+	case <-time.After(time.Second * 4):
+		//超时
+		e.ntimeout++
+		e.lastTimeLock.Lock()
+		defer e.lastTimeLock.Unlock()
+		//12秒后允许创建链接
+		c := time.Now().Unix() - e.lastTime
+		if c > 12 {
+			e.lastTime = time.Now().Unix()
+			log.Println("add client..", len(e.Pool))
+			c, _ := es.NewClient(es.SetURL(e.Addrs...), es.SetBasicAuth(e.Username, e.Password), es.SetSniff(false))
+			go func() {
+				for i := 0; i < 2; i++ {
+					client, _ := es.NewClient(es.SetURL(e.Addrs...), es.SetBasicAuth(e.Username, e.Password), es.SetSniff(false))
+					e.Pool <- client
+				}
+			}()
+			return c
+		}
+		return nil
+	}
+}
+
+func (e *Elastic) Get(index, query string) *[]map[string]interface{} {
+	client := e.GetEsConn()
+	defer func() {
+		go e.DestoryEsConn(client)
+	}()
+	var res []map[string]interface{}
+	if client != nil {
+		defer func() {
+			if r := recover(); r != nil {
+				log.Println("[E]", r)
+				for skip := 1; ; skip++ {
+					_, file, line, ok := runtime.Caller(skip)
+					if !ok {
+						break
+					}
+					go log.Printf("%v,%v\n", file, line)
+				}
+			}
+		}()
+		searchResult, err := client.Search().Index(index).Source(query).Do(context.Background())
+		if err != nil {
+			log.Println("从ES查询出错", err.Error())
+			return nil
+		}
+		if searchResult.Hits != nil {
+			resNum := len(searchResult.Hits.Hits)
+			if resNum < 5000 {
+				res = make([]map[string]interface{}, resNum)
+				for i, hit := range searchResult.Hits.Hits {
+					parseErr := json.Unmarshal(hit.Source, &res[i])
+					if parseErr == nil && hit.Highlight != nil && res[i] != nil {
+						res[i]["highlight"] = map[string][]string(hit.Highlight)
+					}
+				}
+			} else {
+				log.Println("查询结果太多,查询到:", resNum, "条")
+			}
+		}
+	}
+	return &res
+}
+
+//关闭elastic
+func (e *Elastic) Close() {
+	for i := 0; i < e.I_size; i++ {
+		cli := <-e.Pool
+		cli.Stop()
+		cli = nil
+	}
+	e.Pool = nil
+	e = nil
+}
+
+//获取连接
+//func (e *Elastic) GetEsConn() (c *es.Client) {
+//	defer util.Catch()
+//	select {
+//	case c = <-e.Pool:
+//		if c == nil || !c.IsRunning() {
+//			client, err := es.NewClient(es.SetURL(addrs...),
+//				es.SetMaxRetries(2), es.SetSniff(false))
+//			if err == nil && client.IsRunning() {
+//				return client
+//			}
+//			return nil
+//		}
+//		return
+//	case <-time.After(time.Second * 7):
+//		//超时
+//		ntimeout++
+//		log.Println("timeout times:", ntimeout)
+//		return nil
+//	}
+//}
+
+func (e *Elastic) BulkSave(index string, obj []map[string]interface{}) {
+	client := e.GetEsConn()
+	defer e.DestoryEsConn(client)
+	if client != nil {
+		req := client.Bulk()
+		for _, v := range obj {
+			//if isDelBefore {
+			//	req = req.Add(es.NewBulkDeleteRequest().Index(index).Id(fmt.Sprintf("%v", v["_id"])))
+			//}
+			id := util.ObjToString(v["_id"])
+			delete(v, "_id")
+			req = req.Add(es.NewBulkIndexRequest().Index(index).Id(id).Doc(v))
+		}
+		_, err := req.Do(context.Background())
+		if err != nil {
+			log.Println("批量保存到ES出错", err.Error())
+		}
+	}
+}
+
+//根据id删除索引对象
+func (e *Elastic) DelById(index, itype, id string) bool {
+	client := e.GetEsConn()
+	defer e.DestoryEsConn(client)
+	b := false
+	if client != nil {
+		var err error
+		_, err = client.Delete().Index(index).Type(itype).Id(id).Do(context.Background())
+		if err != nil {
+			log.Println("更新检索出错:", err.Error())
+		} else {
+			b = true
+		}
+	}
+	return b
+}
+
+func (e *Elastic) GetNoLimit(index, query string) *[]map[string]interface{} {
+	client := e.GetEsConn()
+	defer e.DestoryEsConn(client)
+	var res []map[string]interface{}
+	if client != nil {
+		defer func() {
+			if r := recover(); r != nil {
+				log.Println("[E]", r)
+				for skip := 1; ; skip++ {
+					_, file, line, ok := runtime.Caller(skip)
+					if !ok {
+						break
+					}
+					go log.Printf("%v,%v\n", file, line)
+				}
+			}
+		}()
+		searchResult, err := client.Search().Index(index).Source(query).Do(context.Background())
+		if err != nil {
+			log.Println("从ES查询出错", err.Error())
+			return nil
+		}
+
+		if searchResult.Hits != nil {
+			resNum := len(searchResult.Hits.Hits)
+			util.Debug(resNum)
+			res = make([]map[string]interface{}, resNum)
+			for i, hit := range searchResult.Hits.Hits {
+				json.Unmarshal(hit.Source, &res[i])
+			}
+		}
+	}
+	return &res
+}
+
+//func (e *Elastic) GetByIdField(index, itype, id, fields string) *map[string]interface{} {
+//	client := e.GetEsConn()
+//	defer e.DestoryEsConn(client)
+//	if client != nil {
+//		defer func() {
+//			if r := recover(); r != nil {
+//				log.Println("[E]", r)
+//				for skip := 1; ; skip++ {
+//					_, file, line, ok := runtime.Caller(skip)
+//					if !ok {
+//						break
+//					}
+//					go log.Printf("%v,%v\n", file, line)
+//				}
+//			}
+//		}()
+//		query := `{"query":{"term":{"_id":"` + id + `"}}`
+//		if len(fields) > 0 {
+//			query = query + `,"_source":[` + fields + `]`
+//		}
+//		query = query + "}"
+//		searchResult, err := client.Search().Index(index).Type(itype).Source(query).Do()
+//		if err != nil {
+//			log.Println("从ES查询出错", err.Error())
+//			return nil
+//		}
+//		var res map[string]interface{}
+//		if searchResult.Hits != nil {
+//			resNum := len(searchResult.Hits.Hits)
+//			if resNum == 1 {
+//				res = make(map[string]interface{})
+//				for _, hit := range searchResult.Hits.Hits {
+//					json.Unmarshal(*hit.Source., &res)
+//				}
+//				return &res
+//			}
+//		}
+//	}
+//	return nil
+//}
+
+func (e *Elastic) Count(index, itype string, query interface{}) int64 {
+	client := e.GetEsConn()
+	defer e.DestoryEsConn(client)
+	if client != nil {
+		defer func() {
+			if r := recover(); r != nil {
+				log.Println("[E]", r)
+				for skip := 1; ; skip++ {
+					_, file, line, ok := runtime.Caller(skip)
+					if !ok {
+						break
+					}
+					go log.Printf("%v,%v\n", file, line)
+				}
+			}
+		}()
+		var qq es.Query
+		if qi, ok2 := query.(es.Query); ok2 {
+			qq = qi
+		}
+		n, err := client.Count(index).Query(qq).Do(context.Background())
+		if err != nil {
+			log.Println("统计出错", err.Error())
+		}
+
+		return n
+	}
+	return 0
+}
+
+//更新一个字段
+//func (e *Elastic) BulkUpdateArr(index, itype string, update []map[string]string) {
+//	client := e.GetEsConn()
+//	defer e.DestoryEsConn(client)
+//	if client != nil {
+//		defer func() {
+//			if r := recover(); r != nil {
+//				log.Println("[E]", r)
+//				for skip := 1; ; skip++ {
+//					_, file, line, ok := runtime.Caller(skip)
+//					if !ok {
+//						break
+//					}
+//					go log.Printf("%v,%v\n", file, line)
+//				}
+//			}
+//		}()
+//		for _, data := range update {
+//			id := data["id"]
+//			updateStr := data["updateStr"]
+//			if id != "" && updateStr != "" {
+//				_, err := client.Update().Index(index).Type(itype).Id(id).Script(updateStr).ScriptLang("groovy").Do()
+//				if err != nil {
+//					log.Println("更新检索出错:", err.Error())
+//				}
+//			} else {
+//				log.Println("数据错误")
+//			}
+//		}
+//	}
+//}
+
+//更新多个字段
+//func (e *Elastic) BulkUpdateMultipleFields(index, itype string, arrs [][]map[string]interface{}) {
+//	client := e.GetEsConn()
+//	defer e.DestoryEsConn(client)
+//	if client != nil {
+//		defer func() {
+//			if r := recover(); r != nil {
+//				log.Println("[E]", r)
+//				for skip := 1; ; skip++ {
+//					_, file, line, ok := runtime.Caller(skip)
+//					if !ok {
+//						break
+//					}
+//					go log.Printf("%v,%v\n", file, line)
+//				}
+//			}
+//		}()
+//		for _, arr := range arrs {
+//			id := arr[0]["id"].(string)
+//			update := arr[1]["update"].([]string)
+//			for _, str := range update {
+//				_, err := client.Update().Index(index).Type(itype).Id(id).Script(str).ScriptLang("groovy").Do()
+//				if err != nil {
+//					log.Println("更新检索出错:", err.Error())
+//				}
+//			}
+//		}
+//	}
+//}
+
+// UpdateBulk 批量修改文档
+func (e *Elastic) UpdateBulk(index, itype string, docs ...[]map[string]interface{}) {
+	client := e.GetEsConn()
+	defer e.DestoryEsConn(client)
+	bulkService := client.Bulk().Index(index).Refresh("true")
+	bulkService.Type(itype)
+	for _, d := range docs {
+		id := d[0]["_id"].(string)
+		doc := es.NewBulkUpdateRequest().Id(id).Doc(d[1])
+		bulkService.Add(doc)
+	}
+	_, err := bulkService.Do(context.Background())
+	if err != nil {
+		fmt.Printf("UpdateBulk all success err is %v\n", err)
+	}
+	//if len(res.Failed()) > 0 {
+	//	fmt.Printf("UpdateBulk all success failed is %v\n", (res.Items[0]))
+	//}
+}
+
+// UpsertBulk 批量修改文档(不存在则插入)
+func (e *Elastic) UpsertBulk(ctx context.Context, index string, ids []string, docs []interface{}) error {
+	client := e.GetEsConn()
+	defer e.DestoryEsConn(client)
+	bulkService := client.Bulk().Index(index).Refresh("true")
+	bulkService.Type("bidding")
+	for i := range ids {
+		doc := es.NewBulkUpdateRequest().Id(ids[i]).Doc(docs[i]).Upsert(docs[i])
+		bulkService.Add(doc)
+	}
+	res, err := bulkService.Do(context.Background())
+	if err != nil {
+		return err
+	}
+	if len(res.Failed()) > 0 {
+		return errors.New(res.Failed()[0].Error.Reason)
+	}
+	return nil
+}
+
+// 批量删除
+func (e *Elastic) DeleteBulk(index string, ids []string) {
+	client := e.GetEsConn()
+	defer e.DestoryEsConn(client)
+	bulkService := client.Bulk().Index(index).Refresh("true")
+	bulkService.Type("bidding")
+	for i := range ids {
+		req := es.NewBulkDeleteRequest().Id(ids[i])
+		bulkService.Add(req)
+	}
+	res, err := bulkService.Do(context.Background())
+	if err != nil {
+		fmt.Printf("DeleteBulk success is %v\n", len(res.Succeeded()))
+	}
+}

+ 4 - 3
qyxy_es/main.go

@@ -3,7 +3,6 @@ package main
 import (
 	"mongodb"
 	qu "qfw/util"
-	es "qfw/util/elastic"
 )
 
 var (
@@ -11,7 +10,7 @@ var (
 	Mgo        *mongodb.MongodbSim
 	Dbname     string
 	Dbcoll     string
-	Es         *es.Elastic
+	Es         *Elastic
 	Index      string
 	Itype      string
 	EsFields   []string
@@ -37,9 +36,11 @@ func init() {
 	econf := Sysconfig["elastic"].(map[string]interface{})
 	Index = econf["index"].(string)
 	Itype = econf["itype"].(string)
-	Es = &es.Elastic{
+	Es = &Elastic{
 		S_esurl: econf["addr"].(string),
 		I_size:  qu.IntAllDef(econf["pool"], 12),
+		//Username: "es_all",
+		//Password: "TopJkO2E_d1x",
 	}
 	Es.InitElasticSize()
 	EsFields = qu.ObjArrToStringArr(econf["esfields"].([]interface{}))

+ 6 - 3
qyxy_es/task.go

@@ -5,6 +5,7 @@ import (
 	"github.com/cron"
 	"go.mongodb.org/mongo-driver/bson"
 	"log"
+	"mongodb"
 	"qfw/util"
 	"strconv"
 	"strings"
@@ -208,7 +209,9 @@ func StdAll() {
 				if tmp[field] == nil {
 					continue
 				}
-				if field == "company_name" {
+				if field == "_id" {
+					esMap[field] = mongodb.BsonIdToSId(tmp[field])
+				} else if field == "company_name" {
 					esMap[field] = tmp["company_name"]
 					esMap["name"] = tmp["company_name"]
 				} else if field == "history_name" {
@@ -349,7 +352,7 @@ func SaveEs() {
 					defer func() {
 						<-SP
 					}()
-					Es.BulkSave(Index, Itype, &arru, true)
+					Es.BulkSave(Index, arru)
 				}(arru)
 				arru = make([]map[string]interface{}, 100)
 				indexu = 0
@@ -361,7 +364,7 @@ func SaveEs() {
 					defer func() {
 						<-SP
 					}()
-					Es.BulkSave(Index, Itype, &arru, true)
+					Es.BulkSave(Index, arru)
 				}(arru[:indexu])
 				arru = make([]map[string]interface{}, 100)
 				indexu = 0

+ 554 - 0
test/main.go

@@ -0,0 +1,554 @@
+// main
+package main
+
+import (
+	"flag"
+	"fmt"
+	"log"
+	"mongodb"
+	"os"
+	qu "qfw/util"
+	"regexp"
+	"strings"
+	//"time"
+)
+
+var zhb_key_list = []string{"budget", "buyer", "agency", "s_winner", "bidamount", "projectcode", "contractcode"}
+var packreg *regexp.Regexp
+var Mgo *mongodb.MongodbSim
+var listSource []*dataSource
+
+type dataSource struct {
+	_id, id, title                         string
+	projectname, projectcode, contractcode string
+	buyer, agency, s_winner                string
+	budget, bidamount                      float64
+	budget_isnull,bidamount_isnull 		   bool
+	isrepeat                               bool
+	repeat_id_source                       string
+	repeat_id                              map[string]string
+	repeatText                             string
+	publishtime							   int64
+}
+
+
+
+
+//var addr, dbname, table, startTime, endTime, sortType *string
+var addr, dbname, table,  sortType *string
+
+
+func init() {
+	//addr = flag.String("addr", "192.168.3.167:27080", "数据库名称")
+	addr = flag.String("addr", "192.168.3.166:27082", "数据库名称")
+	//addr = flag.String("addr", "127.0.0.1:27167", "数据库名称")
+	//dbname = flag.String("dbname", "qfw", "数据库名称")
+	dbname = flag.String("dbname", "zhaolongyue", "数据库名称")
+	//table = flag.String("table", "0210test", "表名称")
+	table = flag.String("table", "Htgx0425_data", "表名称")
+	sortType = flag.String("sort", "1", "sort--请输入排序方式,1正序、-1倒序")
+	flag.Parse()
+	Mgo = &mongodb.MongodbSim{
+		MongodbAddr: *addr,
+		Size:        3,
+		DbName:      *dbname,
+	}
+	Mgo.InitPool()
+
+	packreg, _ = regexp.Compile(`([a-zA-Z0-9①②ⅠⅡⅢⅣⅤⅥⅦⅧⅨⅩ一二三四五六七八九十][包标段])`)
+	//packreg, _ = regexp.Compile(`([包标段][::]?[a-zA-Z0-9①②ⅠⅡⅢⅣⅤⅥⅦⅧⅨⅩ一二三四五六七八九十]|[a-zA-Z0-9①②ⅠⅡⅢⅣⅤⅥⅦⅧⅨⅩ一二三四五六七八九十][包标段]){1,}`)
+	//packreg, _ = regexp.MustCompile("([a-zA-Z0-9①②ⅠⅡⅢⅣⅤⅥⅦⅧⅨⅩ一二三四五六七八九十](包|标|段)[::]?)")
+}
+//创建mgo索引
+//func createMgoIndex(){
+//	mongoDBDialInfo := &mgo.DialInfo{
+//		Addrs:    []string{addr},
+//		Timeout:  60 * time.Second,
+//		Database: dbname,
+//	}
+//	session, err := mgo.DialWithInfo(mongoDBDialInfo)
+//	if err != nil {
+//		log.Fatalf("CreateSession failed:%\n", err)
+//	}
+//	coll := session.DB(dbname).C(table)
+//	err = coll.EnsureIndexKey("publishtime")
+//	fmt.Println("创建索引~publishtime",err)
+//
+//	//查询所有的已存在索引
+//	//indexs, err := coll.Indexes()
+//	//fmt.Println("indexs--------------:", indexs)
+//}
+
+func main() {
+	log.Printf("表名:%s,排序方式:%s", *table, *sortType)
+	if *addr == "" || *dbname == "" || *table == "" ||  *sortType == "" {
+		log.Println("参数输入有误")
+		fmt.Printf("数据库地址:%s\n数据库名称:%s\n表名:%s\n排序方式:%s\n", *addr, *dbname, *table, *sortType)
+		os.Exit(0)
+	}
+	//stime, _ := time.Parse(qu.Date_Short_Layout, *startTime)
+	//etime, _ := time.Parse(qu.Date_Short_Layout, *endTime)
+	//query := map[string]interface{}{}
+	//query["$and"] = []interface{}{
+	//	map[string]interface{}{
+	//		"publishtime":map[string]interface{}{
+	//			"$gte":stime.Unix(),
+	//		},
+	//	},
+	//	map[string]interface{}{
+	//		"publishtime":map[string]interface{}{
+	//			"$lte":etime.Unix(),
+	//		},
+	//	},
+	//	//bson.M{"publishtime": bson.M{"$gte": stime.Unix()}},
+	//	//bson.M{"publishtime": bson.M{"$lte": etime.Unix()}},
+	//}
+	sort := "publishtime"
+	if *sortType == "-1" {
+		sort = "-publishtime"
+	}
+	//log.Println(sort)
+	sess := Mgo.GetMgoConn()
+	defer Mgo.DestoryMongoConn(sess)
+	//it := sess.DB(Mgo.DbName).C(*table).Find(query).Sort(sort).Iter()
+	it := sess.DB(Mgo.DbName).C(*table).Find(nil).Sort(sort).Iter()
+	//对标题、项目名称等中英文符号、空格等进行处理
+	var filterReg = regexp.MustCompile("[`~!@#$^&*()=|{}':;,\\[\\].<>/?!¥…()—【】‘;:”“。,、?%+_-]")
+	//var filterReg = regexp.MustCompile("[`~!@#$^&*()=|{}':;,\\[\\].<>/?!¥…()—【】‘;:”“。,、?%+_--]")
+	index := 0
+	for tmp := make(map[string]interface{}); it.Next(&tmp); index++ {
+		d := &dataSource{
+			_id:          mongodb.BsonIdToSId(tmp["_id"]),
+			id:           qu.ObjToString(tmp["id"]),
+			title:        filterReg.ReplaceAllString(strings.ToLower(qu.ObjToString(tmp["title"])), ""),
+			projectname:  filterReg.ReplaceAllString(strings.ToLower(qu.ObjToString(tmp["projectname"])), ""),
+			projectcode:  filterReg.ReplaceAllString(strings.ToLower(qu.ObjToString(tmp["projectcode"])), ""),
+			contractcode: filterReg.ReplaceAllString(strings.ToLower(qu.ObjToString(tmp["contractcode"])), ""),
+			buyer:        filterReg.ReplaceAllString(strings.ToLower(qu.ObjToString(tmp["buyer"])), ""),
+			agency:       filterReg.ReplaceAllString(strings.ToLower(qu.ObjToString(tmp["agency"])), ""),
+			s_winner:     filterReg.ReplaceAllString(strings.ToLower(qu.ObjToString(tmp["s_winner"])), ""),
+			budget:       qu.Float64All(tmp["budget"]),
+			bidamount:    qu.Float64All(tmp["bidamount"]),
+			publishtime:  qu.Int64All(tmp["publishtime"]),
+			repeat_id:    map[string]string{},
+		}
+		if tmp["budget"]==nil{
+			d.budget_isnull=true
+		}
+		if tmp["bidamount"]==nil{
+			d.bidamount_isnull=true
+		}
+		//log.Println(tmp["_id"],tmp["title"],tmp["projectname"])
+		if index%10000 == 0 {
+			log.Println("加载数据:", index)
+		}
+		listSource = append(listSource, d)
+		tmp = map[string]interface{}{}
+	}
+	log.Println("数据加载完成",len(listSource))
+	dataItem()
+	dd := 0
+	for i := 0; i < len(listSource); i++ {
+		a := listSource[i]
+		if a.isrepeat {
+			dd++
+		}
+		//更新数据
+		if len(a.repeat_id) ==0{
+			Mgo.UpdateById(*table, a._id,
+				map[string]interface{}{"$set": map[string]interface{}{
+					//重复数据看repeatid
+					"repeatid":     a.repeat_id_source, //和哪条数据重复id
+					"repeat":       a.isrepeat,         //本条数据是否重复数据
+					"repeattext":   a.repeatText,       //本数据被判重的原因
+				}})
+		}else {
+			if len(a.repeat_id) > 0{
+				arr:=[]string{}
+				for k,_:=range a.repeat_id{
+					arr = append(arr,k)
+				}
+			Mgo.UpdateById(*table, a._id,
+				map[string]interface{}{"$set": map[string]interface{}{
+					//原始数据看repeatid_ids_str
+					"repeatid":     a.repeat_id_source, //和哪条数据重复id
+					"repeat":       a.isrepeat,         //本条数据是否重复数据
+					//"repeatid_ids": a.repeat_id,        //和我重复的数据都有哪些
+					"repeatid_ids_str": strings.Join(arr,","),
+					"repeattext":   a.repeatText,       //本数据被判重的原因
+				}})}
+		}
+		if i%1000 == 0 {
+			log.Println("已更新:", i)
+		}
+	}
+	log.Println("重复数据量:",dd)
+}
+
+var listSize = 20000
+
+func dataItem() {
+	for i := 0; i < len(listSource); i++ {
+		a := listSource[i]
+		// if a.isrepeat {
+		// 	continue
+		// }
+		b := &dataSource{}
+		for j := i + 1; j < len(listSource); j++ {
+			b = listSource[j]
+			if *sortType == "1" {
+				if publishtime_b_a(*a,*b){
+					// if b.isrepeat {
+					// 	continue
+					// }
+					a, b = panchong(*a, *b)
+					listSource[j] = b
+					listSource[i] = a
+					// if b.isrepeat {
+					// 	log.Println("sss", a.id, b.isrepeat, b.repeat_id)
+					// }
+				}
+			}else{
+				if publishtime_a_b(*a,*b){
+					// if b.isrepeat {
+					// 	continue
+					// }
+					a, b = panchong(*a, *b)
+					listSource[j] = b
+					listSource[i] = a
+					// if b.isrepeat {
+					// 	log.Println("sss", a.id, b.isrepeat, b.repeat_id)
+					// }
+				}
+			}
+		}
+		if i%500 == 0 {
+			log.Println("已处理:", i)
+		}
+	}
+}
+
+func panchong(a, b dataSource) (c, d *dataSource) {
+	switch {
+	case a.title == b.title: //标题相等
+		if pankong(a.contractcode) && pankong(b.contractcode) && a.contractcode != b.contractcode {
+
+		} else if !a.bidamount_isnull && !b.bidamount_isnull && a.bidamount == b.bidamount  {
+			if strings.Contains(a.buyer, b.buyer) || strings.Contains(b.buyer, a.buyer)  && pankong(a.buyer) && pankong(b.buyer) {
+				if pankong(a.s_winner) && pankong(b.s_winner) && a.s_winner == b.s_winner {
+					b.repeat_id_source = a.id
+					a.repeat_id[b.id] = ""
+					b.isrepeat = true
+					b.repeatText = "标题相等 && bidamount && buyer && s_winner"
+				}else{
+					r := key_list(a, b)
+					if r {
+						b.repeat_id_source = a.id
+						a.repeat_id[b.id] = ""
+						b.isrepeat = true
+						b.repeatText = "标题相等 && bidamount && buyer && key_list"
+					}
+				}
+			} else if  pankong(a.s_winner) && pankong(b.s_winner) && a.s_winner == b.s_winner {
+				b.repeat_id_source = a.id
+				a.repeat_id[b.id] = ""
+				b.isrepeat = true
+				b.repeatText = "标题相等 && bidamount && s_winner"
+			}else {
+				r := key_list(a, b)
+				if r {
+					b.repeat_id_source = a.id
+					a.repeat_id[b.id] = ""
+					b.isrepeat = true
+					b.repeatText = "标题相等 && bidamount && key_list"
+				}
+			}
+		}else if pankong(a.projectcode) && pankong(b.projectcode) && a.projectcode == b.projectcode {
+			r := key_list(a, b)
+			if r {
+				b.repeat_id_source = a.id
+				a.repeat_id[b.id] = ""
+				b.isrepeat = true
+				b.repeatText = "标题相等 && projectcode && key_list"
+			}
+		}else if !a.budget_isnull && !b.budget_isnull && a.budget == b.budget  {
+			if strings.Contains(a.buyer, b.buyer) || strings.Contains(b.buyer, a.buyer)  && pankong(a.buyer) && pankong(b.buyer) {
+				if pankong(a.s_winner) && pankong(b.s_winner) && a.s_winner == b.s_winner {
+					if !a.bidamount_isnull && !b.bidamount_isnull && a.bidamount == b.bidamount  {
+						b.repeat_id_source = a.id
+						a.repeat_id[b.id] = ""
+						b.isrepeat = true
+						b.repeatText = "标题相等 && budget && buyer && s_winner && bidamount"
+						//log.Println("1111", a.id, b.id, b.isrepeat)
+					}
+				}
+			} else {
+				r := key_list(a, b)
+				if r {
+					b.repeat_id_source = a.id
+					a.repeat_id[b.id] = ""
+					b.isrepeat = true
+					b.repeatText = "标题相等 && budget && key_list"
+				}
+			}
+		}   else {
+			//
+		}
+	case a.title != b.title: //标题不相等
+		//项目名称包含及相等
+		if strings.Contains(a.projectname, b.projectname) || strings.Contains(b.projectname, a.projectname) {
+			isp := packreg.MatchString(a.title)
+			//有分包
+			if isp {
+				//项目名称相等
+				if a.projectname == b.projectname {
+					if pankong(a.contractcode) && pankong(b.contractcode) && a.contractcode != b.contractcode {
+						//
+					} else if !a.bidamount_isnull && !b.bidamount_isnull && a.bidamount == b.bidamount  {
+						if pankong(a.s_winner) && pankong(b.s_winner) && a.s_winner != b.s_winner{
+
+						}else{
+							b.repeat_id_source = a.id
+							a.repeat_id[b.id] = ""
+							b.isrepeat = true
+							b.repeatText = "标题不相等-->有分包 && projectname && bidamount"
+						}
+						//b.repeat_id_source = a.id
+						//a.repeat_id[b.id] = ""
+						//b.isrepeat = true
+						//b.repeatText = "标题不相等-->有分包 && projectname && bidamount"
+					} else if !a.bidamount_isnull && !b.bidamount_isnull && a.bidamount != b.bidamount  {
+						//
+					} else {
+						if pankong(a.s_winner) && pankong(b.s_winner) && a.s_winner == b.s_winner && !a.budget_isnull && !b.budget_isnull && a.budget == b.budget && (a.budget >=0 || b.budget >= 0) {
+							b.repeat_id_source = a.id
+							a.repeat_id[b.id] = ""
+							b.isrepeat = true
+							b.repeatText = "标题不相等-->有分包 && projectname && s_winner && budget"
+						}
+					}
+				} else { //项目名称包含
+					if pankong(a.contractcode) && pankong(b.contractcode) && a.contractcode != b.contractcode {
+						//
+					} else if !a.bidamount_isnull && !b.bidamount_isnull && a.bidamount == b.bidamount  {
+						if pankong(a.projectcode) && pankong(b.projectcode) && a.projectcode == b.projectcode {
+							b.repeat_id_source = a.id
+							a.repeat_id[b.id] = ""
+							b.isrepeat = true
+							b.repeatText = "标题不相等-->有分包 && projectname包含 && bidamount && projectcode"
+						} else if pankong(a.s_winner) && pankong(b.s_winner) && a.s_winner == b.s_winner {
+							b.repeat_id_source = a.id
+							a.repeat_id[b.id] = ""
+							b.isrepeat = true
+							b.repeatText = "标题不相等-->有分包 && projectname包含 && bidamount && s_winner"
+						} else if !a.budget_isnull && !b.budget_isnull && a.budget == b.budget  {
+							if strings.Contains(a.buyer, b.buyer) || strings.Contains(b.buyer, a.buyer)  && pankong(a.buyer) && pankong(b.buyer) {
+								b.repeat_id_source = a.id
+								a.repeat_id[b.id] = ""
+								b.isrepeat = true
+								b.repeatText = "标题不相等-->有分包 && projectname包含 && bidamount && budget && buyer"
+							} else if strings.Contains(a.agency, b.agency) || strings.Contains(b.agency, a.agency)  && pankong(a.agency) && pankong(b.agency) {
+								b.repeat_id_source = a.id
+								a.repeat_id[b.id] = ""
+								b.isrepeat = true
+								b.repeatText = "标题不相等-->有分包 && projectname包含 && bidamount && budget && agency"
+							} else {
+								//
+							}
+						}
+					} else if !a.bidamount_isnull && !b.bidamount_isnull && a.bidamount != b.bidamount {
+						//
+					} else {
+						if pankong(a.s_winner) && pankong(b.s_winner) && a.s_winner == b.s_winner && !a.budget_isnull && !b.budget_isnull && a.budget == b.budget  {
+							b.repeat_id_source = a.id
+							a.repeat_id[b.id] = ""
+							b.isrepeat = true
+							b.repeatText = "标题不相等-->有分包 && projectname包含 && s_winner && budget"
+						} else {
+							//
+						}
+					}
+				}
+			} else { //无分包
+				//项目名称相等
+				if a.projectname == b.projectname {
+					if pankong(a.contractcode) && pankong(b.contractcode) && a.contractcode != b.contractcode {
+						//
+					} else if !a.bidamount_isnull && !b.bidamount_isnull && a.bidamount == b.bidamount  {
+						b.repeat_id_source = a.id
+						a.repeat_id[b.id] = ""
+						b.isrepeat = true
+						b.repeatText = "标题不相等-->无分包 && projectname && bidamount"
+					} else if !a.bidamount_isnull && !b.bidamount_isnull &&  a.bidamount != b.bidamount  {
+						//
+					} else {
+						if pankong(a.projectcode) && pankong(b.projectcode) && a.projectcode == b.projectcode {
+							if pankong(a.s_winner) && pankong(b.s_winner) && a.s_winner != b.s_winner {
+
+							}else if !a.budget_isnull && !b.budget_isnull && a.budget != b.budget{
+
+							}else{
+								b.repeat_id_source = a.id
+								a.repeat_id[b.id] = ""
+								b.isrepeat = true
+								b.repeatText = "标题不相等-->无分包 && projectname && projectcode"
+							}
+							//if pankong(a.s_winner) && pankong(b.s_winner) && a.s_winner == b.s_winner {
+							//	b.repeat_id_source = a.id
+							//	a.repeat_id[b.id] = ""
+							//	b.isrepeat = true
+							//	b.repeatText = "标题不相等-->无分包 && projectname && projectcode && s_winner"
+							//} else if !a.budget_isnull && !b.budget_isnull && a.budget == b.budget {
+							//	b.repeat_id_source = a.id
+							//	a.repeat_id[b.id] = ""
+							//	b.isrepeat = true
+							//	b.repeatText = "标题不相等-->无分包 &&  projectname && projectcode && budget"
+							//}
+						} else if pankong(a.s_winner) && pankong(b.s_winner) && a.s_winner == b.s_winner {
+							b.repeat_id_source = a.id
+							a.repeat_id[b.id] = ""
+							b.isrepeat = true
+							b.repeatText = "标题不相等-->无分包 && projectname &&  s_winner"
+							//r := key_list(a, b)
+							//if r {
+							//	b.repeat_id_source = a.id
+							//	a.repeat_id[b.id] = ""
+							//	b.isrepeat = true
+							//	b.repeatText = "标题不相等-->无分包 && projectname && s_winner && key_list"
+							//}
+						} else if !a.budget_isnull && !b.budget_isnull && a.budget == b.budget  {
+							if strings.Contains(a.buyer, b.buyer) || strings.Contains(b.buyer, a.buyer)  && pankong(a.buyer) && pankong(b.buyer) {
+								b.repeat_id_source = a.id
+								a.repeat_id[b.id] = ""
+								b.isrepeat = true
+								b.repeatText = "标题不相等-->无分包 && projectname && budget && buyer"
+							} else if strings.Contains(a.agency, b.agency) || strings.Contains(b.agency, a.agency)  && pankong(a.agency) && pankong(b.agency) {
+								b.repeat_id_source = a.id
+								a.repeat_id[b.id] = ""
+								b.isrepeat = true
+								b.repeatText = "标题不相等-->无分包 && projectname && budget && agency"
+							} else {
+								//
+							}
+						}
+					}
+				} else { //项目名称包含
+					if pankong(a.contractcode) && pankong(b.contractcode) && a.contractcode != b.contractcode {
+						//
+					} else if !a.bidamount_isnull && !b.bidamount_isnull && a.bidamount == b.bidamount  {
+						if pankong(a.projectcode) && pankong(b.projectcode) && a.projectcode == b.projectcode {
+							b.repeat_id_source = a.id
+							a.repeat_id[b.id] = ""
+							b.isrepeat = true
+							b.repeatText = "标题不相等-->无分包 && projectname包含 && bidamount && projectcode"
+						} else if pankong(a.s_winner) && pankong(b.s_winner) && a.s_winner == b.s_winner {
+							b.repeat_id_source = a.id
+							a.repeat_id[b.id] = ""
+							b.isrepeat = true
+							b.repeatText = "标题不相等-->无分包 && projectname包含 && bidamount && s_winner"
+						} else if !a.budget_isnull && !b.budget_isnull && a.budget == b.budget  {
+							if strings.Contains(a.buyer, b.buyer) || strings.Contains(b.buyer, a.buyer)  && pankong(a.buyer) && pankong(b.buyer) {
+								b.repeat_id_source = a.id
+								a.repeat_id[b.id] = ""
+								b.isrepeat = true
+								b.repeatText = "标题不相等-->无分包 && projectname包含 && budget && buyer"
+							} else if strings.Contains(a.agency, b.agency) || strings.Contains(b.agency, a.agency)  && pankong(a.agency) && pankong(b.agency) {
+								b.repeat_id_source = a.id
+								a.repeat_id[b.id] = ""
+								b.isrepeat = true
+								b.repeatText = "标题不相等-->无分包 && projectname包含 && budget && agency"
+							} else {
+								//
+							}
+						} else {
+							//
+						}
+					} else if !a.bidamount_isnull && !b.bidamount_isnull && a.bidamount != b.bidamount  {
+						//
+					} else {
+						if pankong(a.s_winner) && pankong(b.s_winner) && a.s_winner == b.s_winner && !a.budget_isnull && !b.budget_isnull && a.budget == b.budget  {
+							b.repeat_id_source = a.id
+							a.repeat_id[b.id] = ""
+							b.isrepeat = true
+							b.repeatText = "标题不相等-->无分包 && projectname包含 && s_winner && budget"
+						}
+					}
+				}
+			}
+		}
+	default:
+	}
+	return &a, &b
+}
+
+//zhb_key_list 判断
+//"budget", "buyer", "agency", "s_winner", "bidamount", "projectcode", "contractcode"
+func key_list(a, b dataSource) bool {
+	for i := 0; i < len(zhb_key_list); i++ {
+		key := zhb_key_list[i]
+		switch key {
+		case "budget":
+			if !a.budget_isnull && !b.budget_isnull && a.budget != b.budget  {
+				return false
+			} else {
+				continue
+			}
+		case "buyer":
+			if strings.Contains(a.buyer, b.buyer) || strings.Contains(b.buyer, a.buyer)  && pankong(a.buyer) && pankong(b.buyer) {
+				continue
+			} else {
+				return false
+			}
+		case "agency":
+			if strings.Contains(a.agency, b.agency) || strings.Contains(b.agency, a.agency)  && pankong(a.agency) && pankong(b.agency) {
+				continue
+			} else {
+				return false
+			}
+		case "s_winner":
+			if a.s_winner != b.s_winner && pankong(a.s_winner) && pankong(b.s_winner) {
+				return false
+			} else {
+				continue
+			}
+		case "bidamount":
+			if !a.bidamount_isnull && !b.bidamount_isnull && a.bidamount != b.bidamount   {
+				return false
+			} else {
+				continue
+			}
+		case "projectcode":
+			if a.projectcode != b.projectcode && pankong(a.projectcode) && pankong(b.projectcode) {
+				return false
+			} else {
+				continue
+			}
+		case "contractcode":
+			if a.contractcode != b.contractcode && pankong(a.contractcode) && pankong(b.contractcode) {
+				return false
+			} else {
+				continue
+			}
+		}
+	}
+	return true
+}
+//发布时间判断
+//正序
+func publishtime_b_a(a,b dataSource) bool{
+	return b.publishtime-a.publishtime < 86400 * 31 * 12
+}
+//倒序
+func publishtime_a_b(a,b dataSource) bool {
+	return a.publishtime-b.publishtime < 86400 * 31 * 12
+}
+
+
+//
+func pankong(a string) bool {
+	if a != "" {
+		return true
+	} else {
+		return false
+	}
+}