Jianghan hai 1 ano
pai
achega
c03a9478d0

+ 98 - 101
data_monitoring/get_assign/src/main.go

@@ -11,20 +11,20 @@ import (
 )
 
 var (
-	sysconfig    				map[string]interface{} //配置文件
-	save_mgo        			*MongodbSim            //mongodb操作对象
-	save_c_name					string
+	sysconfig   map[string]interface{} //配置文件
+	save_mgo    *MongodbSim            //mongodb操作对象
+	save_c_name string
 )
-var Url = "https://www.jianyu360.com/article/content/%s.html"
+var Url = "https://www.jianyu360.cn/article/content/%s.html"
 
-func initMgo()  {
+func initMgo() {
 	saveconf := sysconfig["save_mgodb"].(map[string]interface{})
 	save_c_name = qu.ObjToString(saveconf["coll"])
 	save_mgo = &MongodbSim{
 		MongodbAddr: saveconf["addr"].(string),
 		DbName:      saveconf["db"].(string),
 		Size:        qu.IntAllDef(saveconf["pool"], 5),
-		Password: 	 "zk@123123",
+		Password:    "zk@123123",
 		UserName:    "zhengkun",
 	}
 	save_mgo.InitPool()
@@ -36,11 +36,11 @@ func init() {
 	initMgo()
 }
 
-func main()  {
+func main() {
 	//定时任务
 	c := cron.New()
 	c.AddFunc("0 30 7 ? * *", func() {
-		exportSpecSiteDataWeek() //周邮件-每周1点    8点
+		exportSpecSiteDataWeek()  //周邮件-每周1点    8点
 		exportSpecSiteDataMonth() //月邮件-每月1号   8点
 	})
 	c.Start()
@@ -49,86 +49,84 @@ func main()  {
 	exportSpecSiteDataWeek()
 	exportSpecSiteDataMonth()
 
-
-	time.Sleep(99999*time.Hour)
+	time.Sleep(99999 * time.Hour)
 }
 
 //处理数据-周邮件
 func exportSpecSiteDataWeek() {
 	cur_time := time.Now().Unix()
 	today := GetOneWeekDay(TimeStampToString(cur_time))
-	if today!=1 {
+	if today != 1 {
 		return
 	}
 	log.Println("每周一:准备邮件数据...")
-	now:=time.Now()
-	durdays:=7*2
-	start:= time.Date(now.Year(), now.Month(), now.Day()-durdays, 0, 0, 0, 0, time.Local).Unix()
+	now := time.Now()
+	durdays := 7 * 2
+	start := time.Date(now.Year(), now.Month(), now.Day()-durdays, 0, 0, 0, 0, time.Local).Unix()
 	end := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, time.Local).Unix()
 
-	log.Println(start,end)
+	log.Println(start, end)
 	q := map[string]interface{}{
 		"comeintime": map[string]interface{}{
-			"$gte":  start,
-			"$lt": end,
+			"$gte": start,
+			"$lt":  end,
 		},
 	}
 	sess := save_mgo.GetMgoConn()
 	defer save_mgo.DestoryMongoConn(sess)
-	log.Println("bidding 查询条件:",q)
+	log.Println("bidding 查询条件:", q)
 	//省份、公告标题、公告类别、发布时间、公告地址、项目名称、来源网站
 	it_site := sess.DB(save_mgo.DbName).C(save_c_name).Find(&q).Sort("comeintime").Select(map[string]interface{}{
-		"area":1,
-		"title":1,
-		"subtype":1,
-		"publishtime":1,
-		"href":1,
-		"projectname":1,
-		"site":1,
+		"area":        1,
+		"title":       1,
+		"subtype":     1,
+		"publishtime": 1,
+		"href":        1,
+		"projectname": 1,
+		"site":        1,
 	}).Iter()
 	timeLayout := "2006-01-02"
-	total,isok,dataArr:= 0,0,make([]map[string]string,0)
+	total, isok, dataArr := 0, 0, make([]map[string]string, 0)
 	for tmp := make(map[string]interface{}); it_site.Next(&tmp); total++ {
 		if total%10000 == 0 {
-			log.Println("cur index :", total,isok)
+			log.Println("cur index :", total, isok)
 		}
-		site:=qu.ObjToString(tmp["site"])
-		area:=qu.ObjToString(tmp["area"])
-		subtype:=qu.ObjToString(tmp["subtype"])
-		projectname:=qu.ObjToString(tmp["projectname"])
-		publishtime:=qu.Int64All(tmp["publishtime"])
-		new_publishtime := ""//转日期
-		if publishtime>0 {
+		site := qu.ObjToString(tmp["site"])
+		area := qu.ObjToString(tmp["area"])
+		subtype := qu.ObjToString(tmp["subtype"])
+		projectname := qu.ObjToString(tmp["projectname"])
+		publishtime := qu.Int64All(tmp["publishtime"])
+		new_publishtime := "" //转日期
+		if publishtime > 0 {
 			new_publishtime = time.Unix(publishtime, 0).Format(timeLayout)
 		}
-		href:=qu.ObjToString(tmp["href"])
-		title:=qu.ObjToString(tmp["title"])
+		href := qu.ObjToString(tmp["href"])
+		title := qu.ObjToString(tmp["title"])
 
-		if (site=="上海政府采购网" || site=="中国政府采购网") && area=="上海" {
+		if (site == "上海政府采购网" || site == "中国政府采购网") && area == "上海" {
 			isok++
 			dataArr = append(dataArr, map[string]string{
-				"area":area,
-				"subtype":subtype,
-				"title":title,
-				"projectname":projectname,
-				"site":site,
-				"publishtime":new_publishtime,
-				"href":href,
-				"jyhref":fmt.Sprintf(Url, qu.CommonEncodeArticle("content", BsonTOStringId(tmp["_id"]))),
+				"area":        area,
+				"subtype":     subtype,
+				"title":       title,
+				"projectname": projectname,
+				"site":        site,
+				"publishtime": new_publishtime,
+				"href":        href,
+				"jyhref":      fmt.Sprintf(Url, qu.CommonEncodeArticle("content", BsonTOStringId(tmp["_id"]))),
 			})
 		}
 
 		tmp = make(map[string]interface{})
 	}
-	log.Println("is site over :",total,isok)
-
+	log.Println("is site over :", total, isok)
 
 	start_str := time.Unix(start, 0).Format(timeLayout)
 	end_str := time.Unix(end-1, 0).Format(timeLayout)
-	xlsxName := "上海通服:"+start_str+"~"+end_str+".xlsx"
-	log.Println("邮件名:",xlsxName)
+	xlsxName := "上海通服:" + start_str + "~" + end_str + ".xlsx"
+	log.Println("邮件名:", xlsxName)
 	os.Remove(xlsxName)
-	f :=xlsx.NewFile()
+	f := xlsx.NewFile()
 	sheet, _ := f.AddSheet("上海数据")
 	row := sheet.AddRow()
 	//省份、公告标题、公告类别、发布时间、公告地址、项目名称、来源网站
@@ -141,7 +139,7 @@ func exportSpecSiteDataWeek() {
 	row.AddCell().Value = "公告地址"
 	row.AddCell().Value = "剑鱼地址"
 
-	for _,tmp:=range dataArr {
+	for _, tmp := range dataArr {
 		row = sheet.AddRow()
 		row.AddCell().SetString(tmp["area"])
 		row.AddCell().SetString(tmp["subtype"])
@@ -156,11 +154,11 @@ func exportSpecSiteDataWeek() {
 
 	if err != nil {
 		log.Println("保存xlsx失败:", err)
-		sendWarningSmtp("保存xlsx异常","请检查...上海通服...")
-	}else {
+		sendWarningSmtp("保存xlsx异常", "请检查...上海通服...")
+	} else {
 		log.Println("保存xlsx成功:", err)
-		body_str := fmt.Sprintf("日期:%s~%s\t\t总计:%d条",start_str,end_str,isok)
-		sendErrMailSmtp("上海通服数据~周",body_str,xlsxName)
+		body_str := fmt.Sprintf("日期:%s~%s\t\t总计:%d条", start_str, end_str, isok)
+		sendErrMailSmtp("上海通服数据~周", body_str, xlsxName)
 	}
 }
 
@@ -168,79 +166,78 @@ func exportSpecSiteDataWeek() {
 func exportSpecSiteDataMonth() {
 	cur_time := time.Now().Unix()
 	cur_day := time.Now().Day()
-	if cur_day!=1 {
+	if cur_day != 1 {
 		return
 	}
 	today := GetOneWeekDay(TimeStampToString(cur_time))
 	log.Println("每月1号:准备邮件数据...")
-	now:=time.Now()
-	durdays:=int64(7)+today-int64(1)
-	start:= time.Date(now.Year(), now.Month(), now.Day()-int(durdays), 0, 0, 0, 0, time.Local).Unix()
+	now := time.Now()
+	durdays := int64(7) + today - int64(1)
+	start := time.Date(now.Year(), now.Month(), now.Day()-int(durdays), 0, 0, 0, 0, time.Local).Unix()
 	end := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, time.Local).Unix()
 
-	log.Println(start,end)
+	log.Println(start, end)
 	q := map[string]interface{}{
 		"comeintime": map[string]interface{}{
-			"$gte":  start,
-			"$lt": end,
+			"$gte": start,
+			"$lt":  end,
 		},
 	}
 	sess := save_mgo.GetMgoConn()
 	defer save_mgo.DestoryMongoConn(sess)
-	log.Println("bidding 查询条件:",q)
+	log.Println("bidding 查询条件:", q)
 	//省份、公告标题、公告类别、发布时间、公告地址、项目名称、来源网站
 	it_site := sess.DB(save_mgo.DbName).C(save_c_name).Find(&q).Sort("comeintime").Select(map[string]interface{}{
-		"area":1,
-		"title":1,
-		"subtype":1,
-		"publishtime":1,
-		"href":1,
-		"projectname":1,
-		"site":1,
+		"area":        1,
+		"title":       1,
+		"subtype":     1,
+		"publishtime": 1,
+		"href":        1,
+		"projectname": 1,
+		"site":        1,
 	}).Iter()
 	timeLayout := "2006-01-02"
-	total,isok,dataArr:= 0,0,make([]map[string]string,0)
+	total, isok, dataArr := 0, 0, make([]map[string]string, 0)
 	for tmp := make(map[string]interface{}); it_site.Next(&tmp); total++ {
 		if total%10000 == 0 {
-			log.Println("cur index :", total,isok)
+			log.Println("cur index :", total, isok)
 		}
-		site:=qu.ObjToString(tmp["site"])
-		area:=qu.ObjToString(tmp["area"])
-		subtype:=qu.ObjToString(tmp["subtype"])
-		projectname:=qu.ObjToString(tmp["projectname"])
-		publishtime:=qu.Int64All(tmp["publishtime"])
-		new_publishtime := ""//转日期
-		if publishtime>0 {
+		site := qu.ObjToString(tmp["site"])
+		area := qu.ObjToString(tmp["area"])
+		subtype := qu.ObjToString(tmp["subtype"])
+		projectname := qu.ObjToString(tmp["projectname"])
+		publishtime := qu.Int64All(tmp["publishtime"])
+		new_publishtime := "" //转日期
+		if publishtime > 0 {
 			new_publishtime = time.Unix(publishtime, 0).Format(timeLayout)
 		}
-		href:=qu.ObjToString(tmp["href"])
-		title:=qu.ObjToString(tmp["title"])
+		href := qu.ObjToString(tmp["href"])
+		title := qu.ObjToString(tmp["title"])
 
-		if (site=="上海政府采购网" || site=="中国政府采购网") && area=="上海" {
+		if (site == "上海政府采购网" || site == "中国政府采购网") && area == "上海" {
 			isok++
 			dataArr = append(dataArr, map[string]string{
-				"area":area,
-				"subtype":subtype,
-				"title":title,
-				"projectname":projectname,
-				"site":site,
-				"publishtime":new_publishtime,
-				"href":href,
-				"jyhref":fmt.Sprintf(Url, qu.CommonEncodeArticle("content", BsonTOStringId(tmp["_id"]))),
+				"area":        area,
+				"subtype":     subtype,
+				"title":       title,
+				"projectname": projectname,
+				"site":        site,
+				"publishtime": new_publishtime,
+				"href":        href,
+				"jyhref":      fmt.Sprintf(Url, qu.CommonEncodeArticle("content", BsonTOStringId(tmp["_id"]))),
 			})
 		}
 
 		tmp = make(map[string]interface{})
 	}
-	log.Println("is site over :",total,isok)
-
+	log.Println("is site over :", total, isok)
 
 	start_str := time.Unix(start, 0).Format(timeLayout)
 	end_str := time.Unix(end-1, 0).Format(timeLayout)
-	xlsxName := "上海通服:"+start_str+"~"+end_str+".xlsx"
-	log.Println("邮件名:",xlsxName)
+	xlsxName := "上海通服:" + start_str + "~" + end_str + ".xlsx"
+	log.Println("邮件名:", xlsxName)
 	os.Remove(xlsxName)
-	f :=xlsx.NewFile()
+	f := xlsx.NewFile()
 	sheet, _ := f.AddSheet("上海数据")
 	row := sheet.AddRow()
 	//省份、公告标题、公告类别、发布时间、公告地址、项目名称、来源网站
@@ -253,7 +250,7 @@ func exportSpecSiteDataMonth() {
 	row.AddCell().Value = "公告地址"
 	row.AddCell().Value = "剑鱼地址"
 
-	for _,tmp:=range dataArr {
+	for _, tmp := range dataArr {
 		row = sheet.AddRow()
 		row.AddCell().SetString(tmp["area"])
 		row.AddCell().SetString(tmp["subtype"])
@@ -267,10 +264,10 @@ func exportSpecSiteDataMonth() {
 	err := f.Save(xlsxName)
 	if err != nil {
 		log.Println("保存xlsx失败:", err)
-		sendWarningSmtp("保存xlsx异常","请检查...上海通服...")
-	}else {
+		sendWarningSmtp("保存xlsx异常", "请检查...上海通服...")
+	} else {
 		log.Println("保存xlsx成功:", err)
-		body_str := fmt.Sprintf("日期:%s~%s\t\t总计:%d条",start_str,end_str,isok)
-		sendErrMailSmtp("上海通服数据~月)",body_str,xlsxName)
+		body_str := fmt.Sprintf("日期:%s~%s\t\t总计:%d条", start_str, end_str, isok)
+		sendErrMailSmtp("上海通服数据~月)", body_str, xlsxName)
 	}
-}
+}

+ 4 - 1
esmgocount/src/main.go

@@ -148,5 +148,8 @@ func (t *T) task() {
 }
 
 func (t *T) SendMail(report string) {
-	http.Get(fmt.Sprintf("%s?to=%s&title=%s&body=%s", api, to, t.Name, report))
+	resp, err := http.Get(fmt.Sprintf("%s?to=%s&title=%s&body=%s", api, to, t.Name, report))
+	if err != nil {
+		util.Debug(resp.Status, err)
+	}
 }

+ 8 - 6
forecast/es/config.json

@@ -1,21 +1,23 @@
 {
-  "mgodb": "192.168.3.207:27092",
+  "mgodb": "192.168.3.206:27002",
   "dbsize": 12,
-  "dbname": "wjh",
+  "dbname": "mixdata",
   "dbcoll": "project_forecast",
-  "uname": "",
-  "upwd": "",
+  "uname": "root",
+  "upwd": "root",
   "tasktime": 0,
   "updateid": "",
   "elastic": {
-    "addr": "http://127.0.0.1:9800",
-    "index": "forecast_v2",
+    "addr": "http://192.168.3.241:9205",
+    "index": "forecast_v1",
     "itype": "forecast",
     "pool": 12,
     "esfields": [
       "_id",
+      "infoid",
       "area",
       "city",
+      "district",
       "nature",
       "buyer",
       "buyerclass",

+ 423 - 0
forecast/es/elasticSim.go

@@ -0,0 +1,423 @@
+package main
+
+import (
+	"context"
+	"encoding/json"
+	"errors"
+	"fmt"
+	es "gopkg.in/olivere/elastic.v7"
+	"log"
+	"qfw/util"
+	"runtime"
+	"strings"
+	"sync"
+	"time"
+)
+
+type Elastic struct {
+	S_esurl      string
+	I_size       int
+	Addrs        []string
+	Pool         chan *es.Client
+	lastTime     int64
+	lastTimeLock sync.Mutex
+	ntimeout     int
+	Username     string
+	Password     string
+}
+
+func (e *Elastic) InitElasticSize() {
+	e.Pool = make(chan *es.Client, e.I_size)
+	for _, s := range strings.Split(e.S_esurl, ",") {
+		e.Addrs = append(e.Addrs, s)
+	}
+	log.Println(e.Password, e.Username)
+	for i := 0; i < e.I_size; i++ {
+		client, _ := es.NewClient(es.SetURL(e.Addrs...), es.SetBasicAuth(e.Username, e.Password), es.SetMaxRetries(2), es.SetSniff(false))
+		e.Pool <- client
+	}
+}
+
+//关闭连接
+func (e *Elastic) DestoryEsConn(client *es.Client) {
+	select {
+	case e.Pool <- client:
+		break
+	case <-time.After(time.Second * 1):
+		if client != nil {
+			client.Stop()
+		}
+		client = nil
+	}
+}
+
+func (e *Elastic) GetEsConn() *es.Client {
+	select {
+	case c := <-e.Pool:
+		if c == nil || !c.IsRunning() {
+			log.Println("new esclient.", len(e.Pool))
+			client, err := es.NewClient(es.SetURL(e.Addrs...), es.SetBasicAuth(e.Username, e.Password),
+				es.SetSniff(false))
+			if err == nil && client.IsRunning() {
+				return client
+			}
+		}
+		return c
+	case <-time.After(time.Second * 4):
+		//超时
+		e.ntimeout++
+		e.lastTimeLock.Lock()
+		defer e.lastTimeLock.Unlock()
+		//12秒后允许创建链接
+		c := time.Now().Unix() - e.lastTime
+		if c > 12 {
+			e.lastTime = time.Now().Unix()
+			log.Println("add client..", len(e.Pool))
+			c, _ := es.NewClient(es.SetURL(e.Addrs...), es.SetBasicAuth(e.Username, e.Password), es.SetSniff(false))
+			go func() {
+				for i := 0; i < 2; i++ {
+					client, _ := es.NewClient(es.SetURL(e.Addrs...), es.SetBasicAuth(e.Username, e.Password), es.SetSniff(false))
+					e.Pool <- client
+				}
+			}()
+			return c
+		}
+		return nil
+	}
+}
+
+func (e *Elastic) Get(index, query string) *[]map[string]interface{} {
+	client := e.GetEsConn()
+	defer func() {
+		go e.DestoryEsConn(client)
+	}()
+	var res []map[string]interface{}
+	if client != nil {
+		defer func() {
+			if r := recover(); r != nil {
+				log.Println("[E]", r)
+				for skip := 1; ; skip++ {
+					_, file, line, ok := runtime.Caller(skip)
+					if !ok {
+						break
+					}
+					go log.Printf("%v,%v\n", file, line)
+				}
+			}
+		}()
+		searchResult, err := client.Search().Index(index).Source(query).Do(context.Background())
+		if err != nil {
+			log.Println("从ES查询出错", err.Error())
+			return nil
+		}
+		if searchResult.Hits != nil {
+			resNum := len(searchResult.Hits.Hits)
+			if resNum < 5000 {
+				res = make([]map[string]interface{}, resNum)
+				for i, hit := range searchResult.Hits.Hits {
+					parseErr := json.Unmarshal(hit.Source, &res[i])
+					if parseErr == nil && hit.Highlight != nil && res[i] != nil {
+						res[i]["highlight"] = map[string][]string(hit.Highlight)
+					}
+				}
+			} else {
+				log.Println("查询结果太多,查询到:", resNum, "条")
+			}
+		}
+	}
+	return &res
+}
+
+//关闭elastic
+func (e *Elastic) Close() {
+	for i := 0; i < e.I_size; i++ {
+		cli := <-e.Pool
+		cli.Stop()
+		cli = nil
+	}
+	e.Pool = nil
+	e = nil
+}
+
+//获取连接
+//func (e *Elastic) GetEsConn() (c *es.Client) {
+//	defer util.Catch()
+//	select {
+//	case c = <-e.Pool:
+//		if c == nil || !c.IsRunning() {
+//			client, err := es.NewClient(es.SetURL(addrs...),
+//				es.SetMaxRetries(2), es.SetSniff(false))
+//			if err == nil && client.IsRunning() {
+//				return client
+//			}
+//			return nil
+//		}
+//		return
+//	case <-time.After(time.Second * 7):
+//		//超时
+//		ntimeout++
+//		log.Println("timeout times:", ntimeout)
+//		return nil
+//	}
+//}
+
+func (e *Elastic) BulkSave(index string, obj []map[string]interface{}) {
+	client := e.GetEsConn()
+	defer e.DestoryEsConn(client)
+	if client != nil {
+		req := client.Bulk()
+		for _, v := range obj {
+			//if isDelBefore {
+			//	req = req.Add(es.NewBulkDeleteRequest().Index(index).Id(fmt.Sprintf("%v", v["_id"])))
+			//}
+			id := util.ObjToString(v["_id"])
+			delete(v, "_id")
+			req = req.Add(es.NewBulkIndexRequest().Index(index).Id(id).Doc(v))
+		}
+		_, err := req.Do(context.Background())
+		if err != nil {
+			log.Println("批量保存到ES出错", err.Error())
+		}
+	}
+}
+
+//根据id删除索引对象
+func (e *Elastic) DelById(index, itype, id string) bool {
+	client := e.GetEsConn()
+	defer e.DestoryEsConn(client)
+	b := false
+	if client != nil {
+		var err error
+		_, err = client.Delete().Index(index).Type(itype).Id(id).Do(context.Background())
+		if err != nil {
+			log.Println("更新检索出错:", err.Error())
+		} else {
+			b = true
+		}
+	}
+	return b
+}
+
+func (e *Elastic) GetNoLimit(index, query string) *[]map[string]interface{} {
+	client := e.GetEsConn()
+	defer e.DestoryEsConn(client)
+	var res []map[string]interface{}
+	if client != nil {
+		defer func() {
+			if r := recover(); r != nil {
+				log.Println("[E]", r)
+				for skip := 1; ; skip++ {
+					_, file, line, ok := runtime.Caller(skip)
+					if !ok {
+						break
+					}
+					go log.Printf("%v,%v\n", file, line)
+				}
+			}
+		}()
+		searchResult, err := client.Search().Index(index).Source(query).Do(context.Background())
+		if err != nil {
+			log.Println("从ES查询出错", err.Error())
+			return nil
+		}
+
+		if searchResult.Hits != nil {
+			resNum := len(searchResult.Hits.Hits)
+			util.Debug(resNum)
+			res = make([]map[string]interface{}, resNum)
+			for i, hit := range searchResult.Hits.Hits {
+				json.Unmarshal(hit.Source, &res[i])
+			}
+		}
+	}
+	return &res
+}
+
+//func (e *Elastic) GetByIdField(index, itype, id, fields string) *map[string]interface{} {
+//	client := e.GetEsConn()
+//	defer e.DestoryEsConn(client)
+//	if client != nil {
+//		defer func() {
+//			if r := recover(); r != nil {
+//				log.Println("[E]", r)
+//				for skip := 1; ; skip++ {
+//					_, file, line, ok := runtime.Caller(skip)
+//					if !ok {
+//						break
+//					}
+//					go log.Printf("%v,%v\n", file, line)
+//				}
+//			}
+//		}()
+//		query := `{"query":{"term":{"_id":"` + id + `"}}`
+//		if len(fields) > 0 {
+//			query = query + `,"_source":[` + fields + `]`
+//		}
+//		query = query + "}"
+//		searchResult, err := client.Search().Index(index).Type(itype).Source(query).Do()
+//		if err != nil {
+//			log.Println("从ES查询出错", err.Error())
+//			return nil
+//		}
+//		var res map[string]interface{}
+//		if searchResult.Hits != nil {
+//			resNum := len(searchResult.Hits.Hits)
+//			if resNum == 1 {
+//				res = make(map[string]interface{})
+//				for _, hit := range searchResult.Hits.Hits {
+//					json.Unmarshal(*hit.Source., &res)
+//				}
+//				return &res
+//			}
+//		}
+//	}
+//	return nil
+//}
+
+func (e *Elastic) Count(index, itype string, query interface{}) int64 {
+	client := e.GetEsConn()
+	defer e.DestoryEsConn(client)
+	if client != nil {
+		defer func() {
+			if r := recover(); r != nil {
+				log.Println("[E]", r)
+				for skip := 1; ; skip++ {
+					_, file, line, ok := runtime.Caller(skip)
+					if !ok {
+						break
+					}
+					go log.Printf("%v,%v\n", file, line)
+				}
+			}
+		}()
+		var qq es.Query
+		if qi, ok2 := query.(es.Query); ok2 {
+			qq = qi
+		}
+		n, err := client.Count(index).Query(qq).Do(context.Background())
+		if err != nil {
+			log.Println("统计出错", err.Error())
+		}
+
+		return n
+	}
+	return 0
+}
+
+//更新一个字段
+//func (e *Elastic) BulkUpdateArr(index, itype string, update []map[string]string) {
+//	client := e.GetEsConn()
+//	defer e.DestoryEsConn(client)
+//	if client != nil {
+//		defer func() {
+//			if r := recover(); r != nil {
+//				log.Println("[E]", r)
+//				for skip := 1; ; skip++ {
+//					_, file, line, ok := runtime.Caller(skip)
+//					if !ok {
+//						break
+//					}
+//					go log.Printf("%v,%v\n", file, line)
+//				}
+//			}
+//		}()
+//		for _, data := range update {
+//			id := data["id"]
+//			updateStr := data["updateStr"]
+//			if id != "" && updateStr != "" {
+//				_, err := client.Update().Index(index).Type(itype).Id(id).Script(updateStr).ScriptLang("groovy").Do()
+//				if err != nil {
+//					log.Println("更新检索出错:", err.Error())
+//				}
+//			} else {
+//				log.Println("数据错误")
+//			}
+//		}
+//	}
+//}
+
+//更新多个字段
+//func (e *Elastic) BulkUpdateMultipleFields(index, itype string, arrs [][]map[string]interface{}) {
+//	client := e.GetEsConn()
+//	defer e.DestoryEsConn(client)
+//	if client != nil {
+//		defer func() {
+//			if r := recover(); r != nil {
+//				log.Println("[E]", r)
+//				for skip := 1; ; skip++ {
+//					_, file, line, ok := runtime.Caller(skip)
+//					if !ok {
+//						break
+//					}
+//					go log.Printf("%v,%v\n", file, line)
+//				}
+//			}
+//		}()
+//		for _, arr := range arrs {
+//			id := arr[0]["id"].(string)
+//			update := arr[1]["update"].([]string)
+//			for _, str := range update {
+//				_, err := client.Update().Index(index).Type(itype).Id(id).Script(str).ScriptLang("groovy").Do()
+//				if err != nil {
+//					log.Println("更新检索出错:", err.Error())
+//				}
+//			}
+//		}
+//	}
+//}
+
+// UpdateBulk 批量修改文档
+func (e *Elastic) UpdateBulk(index, itype string, docs ...[]map[string]interface{}) {
+	client := e.GetEsConn()
+	defer e.DestoryEsConn(client)
+	bulkService := client.Bulk().Index(index).Refresh("true")
+	bulkService.Type(itype)
+	for _, d := range docs {
+		id := d[0]["_id"].(string)
+		doc := es.NewBulkUpdateRequest().Id(id).Doc(d[1])
+		bulkService.Add(doc)
+	}
+	_, err := bulkService.Do(context.Background())
+	if err != nil {
+		fmt.Printf("UpdateBulk all success err is %v\n", err)
+	}
+	//if len(res.Failed()) > 0 {
+	//	fmt.Printf("UpdateBulk all success failed is %v\n", (res.Items[0]))
+	//}
+}
+
+// UpsertBulk 批量修改文档(不存在则插入)
+func (e *Elastic) UpsertBulk(ctx context.Context, index string, ids []string, docs []interface{}) error {
+	client := e.GetEsConn()
+	defer e.DestoryEsConn(client)
+	bulkService := client.Bulk().Index(index).Refresh("true")
+	bulkService.Type("bidding")
+	for i := range ids {
+		doc := es.NewBulkUpdateRequest().Id(ids[i]).Doc(docs[i]).Upsert(docs[i])
+		bulkService.Add(doc)
+	}
+	res, err := bulkService.Do(context.Background())
+	if err != nil {
+		return err
+	}
+	if len(res.Failed()) > 0 {
+		return errors.New(res.Failed()[0].Error.Reason)
+	}
+	return nil
+}
+
+// 批量删除
+func (e *Elastic) DeleteBulk(index string, ids []string) {
+	client := e.GetEsConn()
+	defer e.DestoryEsConn(client)
+	bulkService := client.Bulk().Index(index).Refresh("true")
+	bulkService.Type("bidding")
+	for i := range ids {
+		req := es.NewBulkDeleteRequest().Id(ids[i])
+		bulkService.Add(req)
+	}
+	res, err := bulkService.Do(context.Background())
+	if err != nil {
+		fmt.Printf("DeleteBulk success is %v\n", len(res.Succeeded()))
+	}
+}

+ 8 - 11
forecast/es/main.go

@@ -3,7 +3,6 @@ package main
 import (
 	"mongodb"
 	qu "qfw/util"
-	es "qfw/util/elastic"
 )
 
 var (
@@ -11,7 +10,7 @@ var (
 	Mgo       *mongodb.MongodbSim
 	Dbname    string
 	Dbcoll    string
-	Es1, Es2  *es.Elastic
+	Es1       *Elastic
 	Index     string
 	Itype     string
 	EsFields  []string
@@ -37,16 +36,14 @@ func init() {
 	econf := Sysconfig["elastic"].(map[string]interface{})
 	Index = econf["index"].(string)
 	Itype = econf["itype"].(string)
-	//Es1 = &es.Elastic{
-	//	S_esurl: econf["addr"].(string),
-	//	I_size:  qu.IntAllDef(econf["pool"], 12),
-	//}
-	//Es1.InitElasticSize()
-	Es2 = &es.Elastic{
-		S_esurl: econf["addr1"].(string),
-		I_size:  qu.IntAllDef(econf["pool"], 12),
+	Es1 = &Elastic{
+		S_esurl:  econf["addr"].(string),
+		I_size:   qu.IntAllDef(econf["pool"], 12),
+		Username: "es_all",
+		Password: "TopJkO2E_d1x",
 	}
-	Es2.InitElasticSize()
+	Es1.InitElasticSize()
+
 	EsFields = qu.ObjArrToStringArr(econf["esfields"].([]interface{}))
 	//TaskTime = qu.IntAll(Sysconfig["tasktime"])
 	UpdateId = qu.ObjToString(Sysconfig["updateid"])

+ 16 - 11
forecast/es/task.go

@@ -11,7 +11,7 @@ import (
 	"time"
 )
 
-//定时任务
+// 定时任务
 func TimeTask() {
 	//go SaveAdd()
 	c := cron.New()
@@ -33,10 +33,11 @@ func SaveAdd() {
 
 	pool := make(chan bool, 5)
 	wg := &sync.WaitGroup{}
-	if UpdateId == "" {
-		util.Debug("update id err...")
-		return
-	}
+	//if UpdateId == "" {
+	//	util.Debug("update id err...")
+	//	return
+	//}
+	//q := bson.M{"_id": mongodb.StringTOBsonId("647e30b3eabd4b61e72b3f8f")}
 	q := bson.M{"_id": bson.M{"$gt": mongodb.StringTOBsonId(UpdateId)}}
 	util.Debug("q ---", q)
 	it := sess.DB(Dbname).C(Dbcoll).Find(q).Iter()
@@ -61,7 +62,10 @@ func SaveAdd() {
 				if tmp[field] == nil {
 					continue
 				}
-				if field == "buyerclass" {
+				if field == "_id" {
+					esMap["_id"] = mongodb.BsonIdToSId(tmp["_id"])
+					esMap["id"] = mongodb.BsonIdToSId(tmp["_id"])
+				} else if field == "buyerclass" {
 					if reflect.TypeOf(tmp["buyerclass"]).String() == "[]interface {}" {
 						esMap["buyerclass"] = util.ObjArrToStringArr(tmp["buyerclass"].([]interface{}))[0]
 					} else {
@@ -125,7 +129,10 @@ func SaveAll() {
 				if tmp[field] == nil {
 					continue
 				}
-				if field == "buyerclass" {
+				if field == "_id" {
+					esMap["_id"] = mongodb.BsonIdToSId(tmp["_id"])
+					esMap["id"] = mongodb.BsonIdToSId(tmp["_id"])
+				} else if field == "buyerclass" {
 					if reflect.TypeOf(tmp["buyerclass"]).String() == "[]interface {}" {
 						esMap["buyerclass"] = util.ObjArrToStringArr(tmp["buyerclass"].([]interface{}))[0]
 					} else {
@@ -177,8 +184,7 @@ func SaveEs() {
 					defer func() {
 						<-SP
 					}()
-					//Es1.BulkSave(Index, Itype, &arru, true)
-					Es2.BulkSave(Index, Itype, &arru, true)
+					Es1.BulkSave(Index, arru)
 				}(arru)
 				arru = make([]map[string]interface{}, 100)
 				indexu = 0
@@ -190,8 +196,7 @@ func SaveEs() {
 					defer func() {
 						<-SP
 					}()
-					//Es1.BulkSave(Index, Itype, &arru, true)
-					Es2.BulkSave(Index, Itype, &arru, true)
+					Es1.BulkSave(Index, arru)
 				}(arru[:indexu])
 				arru = make([]map[string]interface{}, 100)
 				indexu = 0

+ 423 - 0
forecast/project-yece/elasticSim.go

@@ -0,0 +1,423 @@
+package main
+
+import (
+	"context"
+	"encoding/json"
+	"errors"
+	"fmt"
+	es "gopkg.in/olivere/elastic.v7"
+	"log"
+	"qfw/util"
+	"runtime"
+	"strings"
+	"sync"
+	"time"
+)
+
+type Elastic struct {
+	S_esurl      string
+	I_size       int
+	Addrs        []string
+	Pool         chan *es.Client
+	lastTime     int64
+	lastTimeLock sync.Mutex
+	ntimeout     int
+	Username     string
+	Password     string
+}
+
+func (e *Elastic) InitElasticSize() {
+	e.Pool = make(chan *es.Client, e.I_size)
+	for _, s := range strings.Split(e.S_esurl, ",") {
+		e.Addrs = append(e.Addrs, s)
+	}
+	log.Println(e.Password, e.Username)
+	for i := 0; i < e.I_size; i++ {
+		client, _ := es.NewClient(es.SetURL(e.Addrs...), es.SetBasicAuth(e.Username, e.Password), es.SetMaxRetries(2), es.SetSniff(false))
+		e.Pool <- client
+	}
+}
+
+//关闭连接
+func (e *Elastic) DestoryEsConn(client *es.Client) {
+	select {
+	case e.Pool <- client:
+		break
+	case <-time.After(time.Second * 1):
+		if client != nil {
+			client.Stop()
+		}
+		client = nil
+	}
+}
+
+func (e *Elastic) GetEsConn() *es.Client {
+	select {
+	case c := <-e.Pool:
+		if c == nil || !c.IsRunning() {
+			log.Println("new esclient.", len(e.Pool))
+			client, err := es.NewClient(es.SetURL(e.Addrs...), es.SetBasicAuth(e.Username, e.Password),
+				es.SetSniff(false))
+			if err == nil && client.IsRunning() {
+				return client
+			}
+		}
+		return c
+	case <-time.After(time.Second * 4):
+		//超时
+		e.ntimeout++
+		e.lastTimeLock.Lock()
+		defer e.lastTimeLock.Unlock()
+		//12秒后允许创建链接
+		c := time.Now().Unix() - e.lastTime
+		if c > 12 {
+			e.lastTime = time.Now().Unix()
+			log.Println("add client..", len(e.Pool))
+			c, _ := es.NewClient(es.SetURL(e.Addrs...), es.SetBasicAuth(e.Username, e.Password), es.SetSniff(false))
+			go func() {
+				for i := 0; i < 2; i++ {
+					client, _ := es.NewClient(es.SetURL(e.Addrs...), es.SetBasicAuth(e.Username, e.Password), es.SetSniff(false))
+					e.Pool <- client
+				}
+			}()
+			return c
+		}
+		return nil
+	}
+}
+
+func (e *Elastic) Get(index, query string) *[]map[string]interface{} {
+	client := e.GetEsConn()
+	defer func() {
+		go e.DestoryEsConn(client)
+	}()
+	var res []map[string]interface{}
+	if client != nil {
+		defer func() {
+			if r := recover(); r != nil {
+				log.Println("[E]", r)
+				for skip := 1; ; skip++ {
+					_, file, line, ok := runtime.Caller(skip)
+					if !ok {
+						break
+					}
+					go log.Printf("%v,%v\n", file, line)
+				}
+			}
+		}()
+		searchResult, err := client.Search().Index(index).Source(query).Do(context.Background())
+		if err != nil {
+			log.Println("从ES查询出错", err.Error())
+			return nil
+		}
+		if searchResult.Hits != nil {
+			resNum := len(searchResult.Hits.Hits)
+			if resNum < 5000 {
+				res = make([]map[string]interface{}, resNum)
+				for i, hit := range searchResult.Hits.Hits {
+					parseErr := json.Unmarshal(hit.Source, &res[i])
+					if parseErr == nil && hit.Highlight != nil && res[i] != nil {
+						res[i]["highlight"] = map[string][]string(hit.Highlight)
+					}
+				}
+			} else {
+				log.Println("查询结果太多,查询到:", resNum, "条")
+			}
+		}
+	}
+	return &res
+}
+
+//关闭elastic
+func (e *Elastic) Close() {
+	for i := 0; i < e.I_size; i++ {
+		cli := <-e.Pool
+		cli.Stop()
+		cli = nil
+	}
+	e.Pool = nil
+	e = nil
+}
+
+//获取连接
+//func (e *Elastic) GetEsConn() (c *es.Client) {
+//	defer util.Catch()
+//	select {
+//	case c = <-e.Pool:
+//		if c == nil || !c.IsRunning() {
+//			client, err := es.NewClient(es.SetURL(addrs...),
+//				es.SetMaxRetries(2), es.SetSniff(false))
+//			if err == nil && client.IsRunning() {
+//				return client
+//			}
+//			return nil
+//		}
+//		return
+//	case <-time.After(time.Second * 7):
+//		//超时
+//		ntimeout++
+//		log.Println("timeout times:", ntimeout)
+//		return nil
+//	}
+//}
+
+func (e *Elastic) BulkSave(index string, obj []map[string]interface{}) {
+	client := e.GetEsConn()
+	defer e.DestoryEsConn(client)
+	if client != nil {
+		req := client.Bulk()
+		for _, v := range obj {
+			//if isDelBefore {
+			//	req = req.Add(es.NewBulkDeleteRequest().Index(index).Id(fmt.Sprintf("%v", v["_id"])))
+			//}
+			id := util.ObjToString(v["_id"])
+			delete(v, "_id")
+			req = req.Add(es.NewBulkIndexRequest().Index(index).Id(id).Doc(v))
+		}
+		_, err := req.Do(context.Background())
+		if err != nil {
+			log.Println("批量保存到ES出错", err.Error())
+		}
+	}
+}
+
+//根据id删除索引对象
+func (e *Elastic) DelById(index, itype, id string) bool {
+	client := e.GetEsConn()
+	defer e.DestoryEsConn(client)
+	b := false
+	if client != nil {
+		var err error
+		_, err = client.Delete().Index(index).Type(itype).Id(id).Do(context.Background())
+		if err != nil {
+			log.Println("更新检索出错:", err.Error())
+		} else {
+			b = true
+		}
+	}
+	return b
+}
+
+func (e *Elastic) GetNoLimit(index, query string) *[]map[string]interface{} {
+	client := e.GetEsConn()
+	defer e.DestoryEsConn(client)
+	var res []map[string]interface{}
+	if client != nil {
+		defer func() {
+			if r := recover(); r != nil {
+				log.Println("[E]", r)
+				for skip := 1; ; skip++ {
+					_, file, line, ok := runtime.Caller(skip)
+					if !ok {
+						break
+					}
+					go log.Printf("%v,%v\n", file, line)
+				}
+			}
+		}()
+		searchResult, err := client.Search().Index(index).Source(query).Do(context.Background())
+		if err != nil {
+			log.Println("从ES查询出错", err.Error())
+			return nil
+		}
+
+		if searchResult.Hits != nil {
+			resNum := len(searchResult.Hits.Hits)
+			util.Debug(resNum)
+			res = make([]map[string]interface{}, resNum)
+			for i, hit := range searchResult.Hits.Hits {
+				json.Unmarshal(hit.Source, &res[i])
+			}
+		}
+	}
+	return &res
+}
+
+//func (e *Elastic) GetByIdField(index, itype, id, fields string) *map[string]interface{} {
+//	client := e.GetEsConn()
+//	defer e.DestoryEsConn(client)
+//	if client != nil {
+//		defer func() {
+//			if r := recover(); r != nil {
+//				log.Println("[E]", r)
+//				for skip := 1; ; skip++ {
+//					_, file, line, ok := runtime.Caller(skip)
+//					if !ok {
+//						break
+//					}
+//					go log.Printf("%v,%v\n", file, line)
+//				}
+//			}
+//		}()
+//		query := `{"query":{"term":{"_id":"` + id + `"}}`
+//		if len(fields) > 0 {
+//			query = query + `,"_source":[` + fields + `]`
+//		}
+//		query = query + "}"
+//		searchResult, err := client.Search().Index(index).Type(itype).Source(query).Do()
+//		if err != nil {
+//			log.Println("从ES查询出错", err.Error())
+//			return nil
+//		}
+//		var res map[string]interface{}
+//		if searchResult.Hits != nil {
+//			resNum := len(searchResult.Hits.Hits)
+//			if resNum == 1 {
+//				res = make(map[string]interface{})
+//				for _, hit := range searchResult.Hits.Hits {
+//					json.Unmarshal(*hit.Source., &res)
+//				}
+//				return &res
+//			}
+//		}
+//	}
+//	return nil
+//}
+
+func (e *Elastic) Count(index, itype string, query interface{}) int64 {
+	client := e.GetEsConn()
+	defer e.DestoryEsConn(client)
+	if client != nil {
+		defer func() {
+			if r := recover(); r != nil {
+				log.Println("[E]", r)
+				for skip := 1; ; skip++ {
+					_, file, line, ok := runtime.Caller(skip)
+					if !ok {
+						break
+					}
+					go log.Printf("%v,%v\n", file, line)
+				}
+			}
+		}()
+		var qq es.Query
+		if qi, ok2 := query.(es.Query); ok2 {
+			qq = qi
+		}
+		n, err := client.Count(index).Query(qq).Do(context.Background())
+		if err != nil {
+			log.Println("统计出错", err.Error())
+		}
+
+		return n
+	}
+	return 0
+}
+
+//更新一个字段
+//func (e *Elastic) BulkUpdateArr(index, itype string, update []map[string]string) {
+//	client := e.GetEsConn()
+//	defer e.DestoryEsConn(client)
+//	if client != nil {
+//		defer func() {
+//			if r := recover(); r != nil {
+//				log.Println("[E]", r)
+//				for skip := 1; ; skip++ {
+//					_, file, line, ok := runtime.Caller(skip)
+//					if !ok {
+//						break
+//					}
+//					go log.Printf("%v,%v\n", file, line)
+//				}
+//			}
+//		}()
+//		for _, data := range update {
+//			id := data["id"]
+//			updateStr := data["updateStr"]
+//			if id != "" && updateStr != "" {
+//				_, err := client.Update().Index(index).Type(itype).Id(id).Script(updateStr).ScriptLang("groovy").Do()
+//				if err != nil {
+//					log.Println("更新检索出错:", err.Error())
+//				}
+//			} else {
+//				log.Println("数据错误")
+//			}
+//		}
+//	}
+//}
+
+//更新多个字段
+//func (e *Elastic) BulkUpdateMultipleFields(index, itype string, arrs [][]map[string]interface{}) {
+//	client := e.GetEsConn()
+//	defer e.DestoryEsConn(client)
+//	if client != nil {
+//		defer func() {
+//			if r := recover(); r != nil {
+//				log.Println("[E]", r)
+//				for skip := 1; ; skip++ {
+//					_, file, line, ok := runtime.Caller(skip)
+//					if !ok {
+//						break
+//					}
+//					go log.Printf("%v,%v\n", file, line)
+//				}
+//			}
+//		}()
+//		for _, arr := range arrs {
+//			id := arr[0]["id"].(string)
+//			update := arr[1]["update"].([]string)
+//			for _, str := range update {
+//				_, err := client.Update().Index(index).Type(itype).Id(id).Script(str).ScriptLang("groovy").Do()
+//				if err != nil {
+//					log.Println("更新检索出错:", err.Error())
+//				}
+//			}
+//		}
+//	}
+//}
+
+// UpdateBulk 批量修改文档
+func (e *Elastic) UpdateBulk(index, itype string, docs ...[]map[string]interface{}) {
+	client := e.GetEsConn()
+	defer e.DestoryEsConn(client)
+	bulkService := client.Bulk().Index(index).Refresh("true")
+	bulkService.Type(itype)
+	for _, d := range docs {
+		id := d[0]["_id"].(string)
+		doc := es.NewBulkUpdateRequest().Id(id).Doc(d[1])
+		bulkService.Add(doc)
+	}
+	_, err := bulkService.Do(context.Background())
+	if err != nil {
+		fmt.Printf("UpdateBulk all success err is %v\n", err)
+	}
+	//if len(res.Failed()) > 0 {
+	//	fmt.Printf("UpdateBulk all success failed is %v\n", (res.Items[0]))
+	//}
+}
+
+// UpsertBulk 批量修改文档(不存在则插入)
+func (e *Elastic) UpsertBulk(ctx context.Context, index string, ids []string, docs []interface{}) error {
+	client := e.GetEsConn()
+	defer e.DestoryEsConn(client)
+	bulkService := client.Bulk().Index(index).Refresh("true")
+	bulkService.Type("bidding")
+	for i := range ids {
+		doc := es.NewBulkUpdateRequest().Id(ids[i]).Doc(docs[i]).Upsert(docs[i])
+		bulkService.Add(doc)
+	}
+	res, err := bulkService.Do(context.Background())
+	if err != nil {
+		return err
+	}
+	if len(res.Failed()) > 0 {
+		return errors.New(res.Failed()[0].Error.Reason)
+	}
+	return nil
+}
+
+// 批量删除
+func (e *Elastic) DeleteBulk(index string, ids []string) {
+	client := e.GetEsConn()
+	defer e.DestoryEsConn(client)
+	bulkService := client.Bulk().Index(index).Refresh("true")
+	bulkService.Type("bidding")
+	for i := range ids {
+		req := es.NewBulkDeleteRequest().Id(ids[i])
+		bulkService.Add(req)
+	}
+	res, err := bulkService.Do(context.Background())
+	if err != nil {
+		fmt.Printf("DeleteBulk success is %v\n", len(res.Succeeded()))
+	}
+}

+ 24 - 25
forecast/project-yece/main.go

@@ -2,39 +2,39 @@ package main
 
 import (
 	"mongodb"
-	es "qfw/util/elastic"
 	"time"
 )
 
 var (
-	MongoTool			*mongodb.MongodbSim
-	Es              	*es.Elastic
-
-	updatePool			chan []map[string]interface{}
-	updateSp			chan bool
-	updatePool1			chan []map[string]interface{}
-	updateSp1			chan bool
-	saveSize			int
-	savePool			chan map[string]interface{}
-	saveSp				chan bool
-	savePool1			chan map[string]interface{}
-	saveSp1				chan bool
+	MongoTool *mongodb.MongodbSim
+	Es        *Elastic
 
+	updatePool  chan []map[string]interface{}
+	updateSp    chan bool
+	updatePool1 chan []map[string]interface{}
+	updateSp1   chan bool
+	saveSize    int
+	savePool    chan map[string]interface{}
+	saveSp      chan bool
+	savePool1   chan map[string]interface{}
+	saveSp1     chan bool
 )
 
 func init() {
 	MongoTool = &mongodb.MongodbSim{
-		MongodbAddr: "172.17.4.187:27082,172.17.145.163:27083",				// 172.17.4.187:27082,172.17.145.163:27083
+		MongodbAddr: "172.17.4.187:27082,172.17.145.163:27083", // 172.17.4.187:27082,172.17.145.163:27083
 		Size:        10,
 		DbName:      "mixdata",
-		UserName:	 "SJZY_RWESBid_Other",
-		Password: 	 "SJZY@O17t8herB3B",
+		UserName:    "SJZY_RWESBid_Other",
+		Password:    "SJZY@O17t8herB3B",
 	}
 	MongoTool.InitPool()
 
-	Es = &es.Elastic{
-		S_esurl: "http://172.17.145.170:9800",									//http://172.17.145.170:9800
-		I_size:  10,
+	Es = &Elastic{
+		S_esurl:  "http://172.17.145.164:19805", //http://172.17.4.184:19800
+		I_size:   10,
+		Username: "es_all",
+		Password: "TopJkO2E_d1x",
 	}
 	Es.InitElasticSize()
 
@@ -56,14 +56,13 @@ func main() {
 	//go updateMethod1()
 
 	//go findEs()
-	//go fcResult()
-	go TimeTask()
+	go fcResult()
+	//go TimeTask()
 
 	ch := make(chan bool, 1)
 	<-ch
 }
 
-
 func saveMethod() {
 	arru := make([]map[string]interface{}, saveSize)
 	indexu := 0
@@ -113,7 +112,7 @@ func saveMethod1() {
 					defer func() {
 						<-saveSp1
 					}()
-					MongoTool.SaveBulk("project_forecast", arru...)
+					MongoTool.SaveBulk("project_forecast_1", arru...)
 				}(arru)
 				arru = make([]map[string]interface{}, saveSize)
 				indexu = 0
@@ -125,7 +124,7 @@ func saveMethod1() {
 					defer func() {
 						<-saveSp1
 					}()
-					MongoTool.SaveBulk("project_forecast", arru...)
+					MongoTool.SaveBulk("project_forecast_1", arru...)
 				}(arru[:indexu])
 				arru = make([]map[string]interface{}, saveSize)
 				indexu = 0
@@ -202,4 +201,4 @@ func updateMethod1() {
 			}
 		}
 	}
-}
+}

+ 20 - 17
forecast/project-yece/task.go

@@ -1,11 +1,12 @@
 package main
 
 import (
+	"context"
 	"encoding/json"
 	"fmt"
 	"github.com/cron"
 	"go.mongodb.org/mongo-driver/bson"
-	es "gopkg.in/olivere/elastic.v1"
+	es "gopkg.in/olivere/elastic.v7"
 	"mongodb"
 	"qfw/util"
 	"regexp"
@@ -95,15 +96,18 @@ func findEs() {
 	escount := Es.Count("bidding", "bidding", esquery)
 	util.Debug("查询总数:", escount)
 	//查询条件类型转换
-	var q es.Query
-	tmpQuery := es.BoolQuery{
-		QueryStrings: esquery,
-	}
-	q = tmpQuery
+	//var q es.Query
+	//tmpQuery := es.BoolQuery{
+	//	QueryStrings: esquery,
+	//}
+	//q = tmpQuery
 	numDocs := 0
 
+	query := es.NewBoolQuery().
+		Must(es.NewRangeQuery("comeintime").Gte(stime).Lte(currenttime)).
+		MustNot(es.NewExistsQuery("yuceendtime"))
 	//游标查询,index不支持别名,只能写索引库的名称
-	res, err := client.Scroll("bidding_v1").Query(q).Size(500).Do() //查询一条获取游标
+	res, err := client.Scroll("bidding").Query(query).Scroll("5m").Size(500).Do(context.TODO()) //查询一条获取游标
 	if err == nil {
 		scrollId := res.ScrollId
 		for {
@@ -111,7 +115,7 @@ func findEs() {
 				util.Debug("ScrollId Is Error")
 				break
 			}
-			searchResult, err := client.Scroll("bidding_v1").Size(500).ScrollId(scrollId).Do() //查询
+			searchResult, err := client.Scroll("1m").Size(500).ScrollId(scrollId).Do(context.TODO()) //查询
 			if err != nil {
 				if err.Error() == "EOS" { //迭代完毕
 					util.Debug("Es Search Data Over:", err)
@@ -130,7 +134,7 @@ func findEs() {
 						wg.Done()
 					}()
 					tmp := make(map[string]interface{})
-					if json.Unmarshal(*tmpHit.Source, &tmp) == nil {
+					if json.Unmarshal(tmpHit.Source, &tmp) == nil {
 						save := make(map[string]interface{})
 						for _, v := range fieldArr {
 							if tmp[v] != nil {
@@ -162,7 +166,7 @@ func findEs() {
 					}
 				}(hit)
 				numDocs += 1
-				if numDocs%100 == 0 {
+				if numDocs%5000 == 0 {
 					util.Debug("Current:", numDocs)
 				}
 			}
@@ -170,7 +174,7 @@ func findEs() {
 		}
 		wg.Wait()
 		util.Debug("over---", numDocs)
-		client.ClearScroll().ScrollId(scrollId).Do() //清理游标
+		client.ClearScroll().ScrollId(scrollId).Do(context.TODO()) //清理游标
 	}
 }
 
@@ -216,9 +220,9 @@ func YcTime(tmp map[string]interface{}) (int64, int64) {
 
 func fcResult() {
 	util.Debug("预测结果时间-------结果表迁移数据----------")
-	currenttime := time.Now().Unix()
-	endtime := time.Unix(currenttime, 0).AddDate(0, 3, 0).Unix()
-	q := bson.M{"yuceendtime": bson.M{"$gte": currenttime, "$lt": endtime}, "move": nil}
+	//currenttime := time.Now().Unix()
+	//endtime := time.Unix(currenttime, 0).AddDate(0, 3, 0).Unix()
+	q := bson.M{"yuceendtime": bson.M{"$exists": 1}, "move": nil}
 	field := bson.M{"project_duration": 0, "project_timeunit": 0, "projectperiod": 0, "s_subscopeclass": 0}
 	util.Debug(q)
 	sess := MongoTool.GetMgoConn()
@@ -238,12 +242,11 @@ func fcResult() {
 
 		id := mongodb.BsonIdToSId(tmp["_id"])
 		tmp["infoid"] = id
-		delete(tmp, "_id")
-		tmp["yucetime"] = currenttime
+		tmp["yucetime"] = util.Int64All(tmp["publishtime"])
 		tmp["jyhref"] = `/jyapp/article/content/` + util.CommonEncodeArticle("content", id) + `.html`
 		if tmp["buyer"] == nil || tmp["buyerperson"] == nil || tmp["buyertel"] == nil {
 			esq := `{"query":{"bool":{"must":[{"term":{"ids":"` + id + `"}}]}}}`
-			info := Es.Get("project", "project", esq)
+			info := Es.Get("projectset", esq)
 			if len(*info) > 0 {
 				if (*info)[0]["buyer"] != nil {
 					tmp["buyer"] = (*info)[0]["buyer"]

+ 10 - 10
fullproject/src_v1/config.json

@@ -1,15 +1,15 @@
 {
-    "udpport": ":1482",
+    "udpport": ":1782",
     "loadStart": 0,
 	"validdays":150,
     "statusdays": 15,
-    "redis-addr": "project=192.168.3.207:2679",
-	"mongodbServers": "192.168.3.207:27092",
+    "redis-addr": "project=192.168.3.207:1679",
+	"mongodbServers": "192.168.3.207:29099",
     "mongodbPoolSize": 10,
     "mongodbName": "wjh",
 	"hints":"_id_1_publishtime_1",
-    "extractColl": "bidding",
-    "extractColl1": "bidding",
+    "extractColl": "extract",
+    "extractColl1": "extract",
     "projectColl": "projectset",
     "backupFlag": true,
     "siteColl": "site",
@@ -19,19 +19,19 @@
         "api": "http://172.17.145.179:19281/_send/_mail"
     },
     "redis": {
-        "dbname": "",
-        "addr": "",
-        "db": ""
+        "dbname": "qyxy_buyer",
+        "addr": "192.168.3.207:1679",
+        "db": "3"
     },
     "bidding": {
-        "addr": "192.168.3.207:27092",
+        "addr": "192.168.3.207:29099",
         "dbname": "qfw",
         "dbsize": 5,
         "uname": "",
         "upwd": ""
     },
     "spider": {
-        "addr": "192.168.3.207:27092",
+        "addr": "192.168.3.207:29099",
         "dbname": "wjh",
         "dbsize": 2
     },

+ 3 - 3
fullproject/src_v1/main.go

@@ -92,7 +92,7 @@ func main() {
 			P_QL.loadData(loadStart)
 		}
 	}
-	go checkMapJob()
+	//go checkMapJob()
 	go P_QL.nsqMethod()
 
 	for {
@@ -177,8 +177,8 @@ func main() {
 }
 
 func mainT() {
-	sid = "62df420b2e43d7e553df78f3"
-	eid = "62df420b2e43d7e553df78f3"
+	sid = "623d1ed4923488e17244bea7"
+	eid = "6327653467a6b0a286122df9"
 	//flag.StringVar(&sid, "sid", "", "开始id")
 	//flag.StringVar(&eid, "eid", "", "结束id")
 	//flag.Parse()

+ 2 - 2
fullproject/src_v1/project.go

@@ -996,7 +996,7 @@ func (p *ProjectTask) UpdateProject(tmp map[string]interface{}, thisinfo *Info,
 		}
 		set["entidlist"] = project.EntIdList
 	}
-	// first_cooperation
+	//first_cooperation
 	if pInfo.Buyer != "" && len(strings.Split(project.Winners, ",")) > 0 {
 		FirstCooperation(set, pInfo.Buyer, strings.Split(project.Winners, ","), project.EntIdList)
 	}
@@ -1122,7 +1122,7 @@ func (p *ProjectTask) UpdateProject(tmp map[string]interface{}, thisinfo *Info,
 
 	set["mpn"] = pInfo.MPN
 	set["mpc"] = pInfo.MPC
-	if p.currentType == "project" {
+	if p.currentType == "project" || p.currentType == "project_history" {
 		set["pici"] = p.pici
 	} else {
 		set["pici"] = tmp["comeintime"]

+ 3 - 1
fullproject/src_v1/project_tool.go

@@ -42,7 +42,7 @@ var INFOFIELDS = []string{
 	"budget",
 	"bidamount",
 	"topscopeclass",
-	"subscopclass",
+	"subscopeclass",
 	"infoformat",
 	"buyerperson",
 	"buyertel",
@@ -63,6 +63,8 @@ var INFOFIELDS = []string{
 	"bid_guarantee",
 	"qualifies",
 	"entidlist",
+	"winnerorder",
+	"purchasinglist",
 }
 
 // 包信息

+ 5 - 4
fullproject/src_v1/task.go

@@ -296,7 +296,7 @@ func (p *ProjectTask) taskZl(udpInfo map[string]interface{}) {
 	if coll == "" {
 		coll = ExtractColl
 	}
-	thread := util.IntAllDef(Thread, 4)
+	thread := util.IntAllDef(Thread, 1)
 	if thread > 0 {
 		p.thread = thread
 	}
@@ -486,7 +486,8 @@ func (p *ProjectTask) enter(db, coll string, q map[string]interface{}) {
 			}
 		}
 	}()
-	fields := map[string]interface{}{"kvtext": 0, "repeat_reason": 0, "field_source": 0}
+	//fields := map[string]interface{}{"kvtext": 0, "repeat_reason": 0, "field_source": 0}
+	fields := map[string]interface{}{"detail": 0, "contenthtml": 0, "jsondata": 0, "regions_log": 0, "field_source": 0}
 	if p.currentType == "project" || p.currentType == "project_history" {
 		c, _ := sess.DB(db).C(coll).Find(q).Count()
 		util.Debug("共查询:", c, "条")
@@ -516,13 +517,13 @@ L:
 					}
 				}
 				if util.IntAll(tmp["repeat"]) == 0 {
-					if util.ObjToString(tmp["toptype"]) != "采购意向" {
+					if util.ObjToString(tmp["toptype"]) != "采购意向" && util.ObjToString(tmp["toptype"]) != "产权" {
 						if P_QL.currentType == "ql" {
 							infoPool <- tmp
 						} else if P_QL.currentType == "project" && util.IntAll(tmp["dataging"]) == 0 {
 							// id段增量数据
 							infoPool <- tmp
-						} else if P_QL.currentType == "project_history" && util.ObjToString(tmp["history_updatetime"]) != "" {
+						} else if P_QL.currentType == "project_history" && tmp["history_updatetime"] != nil {
 							// id段 历史数据
 							infoPool <- tmp
 						}

+ 1 - 0
fullproject/src_v1/update.go

@@ -1300,6 +1300,7 @@ func (p *ProjectTask) taskinfo(id string) {
 	}
 
 	newP["ids"] = ids
+	newP["pici"] = p.pici
 	p.AllIdsMapLock.Lock()
 	_, ok := p.AllIdsMap[pid]
 	if ok {

+ 8 - 4
monitor/task.go

@@ -19,9 +19,10 @@ var taskA = 0 // 增量统计跳过
 var (
 	WebUrl = "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=45962efc-ca87-4996-9ffa-08bf6608ab7a"
 
-	WarningStr1 = "机器智能识别提醒,%d分钟未有数据入库,bidding表数据已积累%d条。"
+	WarningStr1 = "机器智能识别提醒,超过30分钟未有数据入库,bidding表数据已积累%d条。"
 	WarningStr2 = "数据采集提醒,%d分钟未有数据入库。"
-	skip, send  = 0, 0
+
+	skip, send = 0, 0
 )
 
 func TimeTask() {
@@ -53,13 +54,16 @@ func Attachment() {
 	info, _ := Mgo.Find("ocr_goods_over", nil, map[string]interface{}{"_id": -1}, nil, true, -1, -1)
 	lastT := util.Int64All((*info)[0]["import_time"])
 	if time.Now().Unix()-lastT > 30*60 {
+		lteid := util.ObjToString((*info)[0]["lteid"])
 		if skip == 0 {
 			skip += 1
 			util.Debug("跳过第一次", (*info)[0]["_id"])
 		} else {
 			send++
 			if send <= 3 {
-				SendMsg("当前最后一个id段数据累积时间已经超过30分钟, 最后一段数据入库时间为:" + util.FormatDateByInt64(&lastT, util.Date_Full_Layout))
+				//SendMsg("当前最后一个id段数据累积时间已经超过30分钟, 最后一段数据入库时间为:" + util.FormatDateByInt64(&lastT, util.Date_Full_Layout))
+				c := Mgo.Count("ocr_goods_over", map[string]interface{}{"_id": map[string]interface{}{"$gt": mongodb.StringTOBsonId(lteid)}})
+				SendMsg(fmt.Sprintf(WarningStr1, c))
 			}
 		}
 	} else {
@@ -145,7 +149,7 @@ func SendMail(report string) {
 func SendMsg(content string) {
 	client := &http.Client{}
 	data := map[string]interface{}{"msgtype": "text", "text": map[string]interface{}{
-		"content": content, "mentioned_list": []string{"@all"},
+		"content": content, "mentioned_mobile_list": []string{"13373929153,15090279371,15639297172"},
 	}}
 	bytesData, _ := json.Marshal(data)
 	req, _ := http.NewRequest("POST", WebUrl, bytes.NewReader(bytesData))

+ 3 - 1
projectinfo/src/config.json

@@ -31,6 +31,8 @@
         "addr": "http://172.17.145.170:9800",
         "index": "bidding",
         "itype": "bidding",
-        "pool": 12
+        "pool": 12,
+		"user": "",
+		"password": ""
     }
 }

+ 423 - 0
projectinfo/src/elasticSim.go

@@ -0,0 +1,423 @@
+package main
+
+import (
+	"context"
+	"encoding/json"
+	"errors"
+	"fmt"
+	es "gopkg.in/olivere/elastic.v7"
+	"log"
+	"qfw/util"
+	"runtime"
+	"strings"
+	"sync"
+	"time"
+)
+
+type Elastic struct {
+	S_esurl      string
+	I_size       int
+	Addrs        []string
+	Pool         chan *es.Client
+	lastTime     int64
+	lastTimeLock sync.Mutex
+	ntimeout     int
+	Username     string
+	Password     string
+}
+
+func (e *Elastic) InitElasticSize() {
+	e.Pool = make(chan *es.Client, e.I_size)
+	for _, s := range strings.Split(e.S_esurl, ",") {
+		e.Addrs = append(e.Addrs, s)
+	}
+	log.Println(e.Password, e.Username)
+	for i := 0; i < e.I_size; i++ {
+		client, _ := es.NewClient(es.SetURL(e.Addrs...), es.SetBasicAuth(e.Username, e.Password), es.SetMaxRetries(2), es.SetSniff(false))
+		e.Pool <- client
+	}
+}
+
+//关闭连接
+func (e *Elastic) DestoryEsConn(client *es.Client) {
+	select {
+	case e.Pool <- client:
+		break
+	case <-time.After(time.Second * 1):
+		if client != nil {
+			client.Stop()
+		}
+		client = nil
+	}
+}
+
+func (e *Elastic) GetEsConn() *es.Client {
+	select {
+	case c := <-e.Pool:
+		if c == nil || !c.IsRunning() {
+			log.Println("new esclient.", len(e.Pool))
+			client, err := es.NewClient(es.SetURL(e.Addrs...), es.SetBasicAuth(e.Username, e.Password),
+				es.SetSniff(false))
+			if err == nil && client.IsRunning() {
+				return client
+			}
+		}
+		return c
+	case <-time.After(time.Second * 4):
+		//超时
+		e.ntimeout++
+		e.lastTimeLock.Lock()
+		defer e.lastTimeLock.Unlock()
+		//12秒后允许创建链接
+		c := time.Now().Unix() - e.lastTime
+		if c > 12 {
+			e.lastTime = time.Now().Unix()
+			log.Println("add client..", len(e.Pool))
+			c, _ := es.NewClient(es.SetURL(e.Addrs...), es.SetBasicAuth(e.Username, e.Password), es.SetSniff(false))
+			go func() {
+				for i := 0; i < 2; i++ {
+					client, _ := es.NewClient(es.SetURL(e.Addrs...), es.SetBasicAuth(e.Username, e.Password), es.SetSniff(false))
+					e.Pool <- client
+				}
+			}()
+			return c
+		}
+		return nil
+	}
+}
+
+func (e *Elastic) Get(index, query string) *[]map[string]interface{} {
+	client := e.GetEsConn()
+	defer func() {
+		go e.DestoryEsConn(client)
+	}()
+	var res []map[string]interface{}
+	if client != nil {
+		defer func() {
+			if r := recover(); r != nil {
+				log.Println("[E]", r)
+				for skip := 1; ; skip++ {
+					_, file, line, ok := runtime.Caller(skip)
+					if !ok {
+						break
+					}
+					go log.Printf("%v,%v\n", file, line)
+				}
+			}
+		}()
+		searchResult, err := client.Search().Index(index).Source(query).Do(context.Background())
+		if err != nil {
+			log.Println("从ES查询出错", err.Error())
+			return nil
+		}
+		if searchResult.Hits != nil {
+			resNum := len(searchResult.Hits.Hits)
+			if resNum < 5000 {
+				res = make([]map[string]interface{}, resNum)
+				for i, hit := range searchResult.Hits.Hits {
+					parseErr := json.Unmarshal(hit.Source, &res[i])
+					if parseErr == nil && hit.Highlight != nil && res[i] != nil {
+						res[i]["highlight"] = map[string][]string(hit.Highlight)
+					}
+				}
+			} else {
+				log.Println("查询结果太多,查询到:", resNum, "条")
+			}
+		}
+	}
+	return &res
+}
+
+//关闭elastic
+func (e *Elastic) Close() {
+	for i := 0; i < e.I_size; i++ {
+		cli := <-e.Pool
+		cli.Stop()
+		cli = nil
+	}
+	e.Pool = nil
+	e = nil
+}
+
+//获取连接
+//func (e *Elastic) GetEsConn() (c *es.Client) {
+//	defer util.Catch()
+//	select {
+//	case c = <-e.Pool:
+//		if c == nil || !c.IsRunning() {
+//			client, err := es.NewClient(es.SetURL(addrs...),
+//				es.SetMaxRetries(2), es.SetSniff(false))
+//			if err == nil && client.IsRunning() {
+//				return client
+//			}
+//			return nil
+//		}
+//		return
+//	case <-time.After(time.Second * 7):
+//		//超时
+//		ntimeout++
+//		log.Println("timeout times:", ntimeout)
+//		return nil
+//	}
+//}
+
+func (e *Elastic) BulkSave(index string, obj []map[string]interface{}) {
+	client := e.GetEsConn()
+	defer e.DestoryEsConn(client)
+	if client != nil {
+		req := client.Bulk()
+		for _, v := range obj {
+			//if isDelBefore {
+			//	req = req.Add(es.NewBulkDeleteRequest().Index(index).Id(fmt.Sprintf("%v", v["_id"])))
+			//}
+			id := util.ObjToString(v["_id"])
+			delete(v, "_id")
+			req = req.Add(es.NewBulkIndexRequest().Index(index).Id(id).Doc(v))
+		}
+		_, err := req.Do(context.Background())
+		if err != nil {
+			log.Println("批量保存到ES出错", err.Error())
+		}
+	}
+}
+
+//根据id删除索引对象
+func (e *Elastic) DelById(index, itype, id string) bool {
+	client := e.GetEsConn()
+	defer e.DestoryEsConn(client)
+	b := false
+	if client != nil {
+		var err error
+		_, err = client.Delete().Index(index).Type(itype).Id(id).Do(context.Background())
+		if err != nil {
+			log.Println("更新检索出错:", err.Error())
+		} else {
+			b = true
+		}
+	}
+	return b
+}
+
+func (e *Elastic) GetNoLimit(index, query string) *[]map[string]interface{} {
+	client := e.GetEsConn()
+	defer e.DestoryEsConn(client)
+	var res []map[string]interface{}
+	if client != nil {
+		defer func() {
+			if r := recover(); r != nil {
+				log.Println("[E]", r)
+				for skip := 1; ; skip++ {
+					_, file, line, ok := runtime.Caller(skip)
+					if !ok {
+						break
+					}
+					go log.Printf("%v,%v\n", file, line)
+				}
+			}
+		}()
+		searchResult, err := client.Search().Index(index).Source(query).Do(context.Background())
+		if err != nil {
+			log.Println("从ES查询出错", err.Error())
+			return nil
+		}
+
+		if searchResult.Hits != nil {
+			resNum := len(searchResult.Hits.Hits)
+			util.Debug(resNum)
+			res = make([]map[string]interface{}, resNum)
+			for i, hit := range searchResult.Hits.Hits {
+				json.Unmarshal(hit.Source, &res[i])
+			}
+		}
+	}
+	return &res
+}
+
+//func (e *Elastic) GetByIdField(index, itype, id, fields string) *map[string]interface{} {
+//	client := e.GetEsConn()
+//	defer e.DestoryEsConn(client)
+//	if client != nil {
+//		defer func() {
+//			if r := recover(); r != nil {
+//				log.Println("[E]", r)
+//				for skip := 1; ; skip++ {
+//					_, file, line, ok := runtime.Caller(skip)
+//					if !ok {
+//						break
+//					}
+//					go log.Printf("%v,%v\n", file, line)
+//				}
+//			}
+//		}()
+//		query := `{"query":{"term":{"_id":"` + id + `"}}`
+//		if len(fields) > 0 {
+//			query = query + `,"_source":[` + fields + `]`
+//		}
+//		query = query + "}"
+//		searchResult, err := client.Search().Index(index).Type(itype).Source(query).Do()
+//		if err != nil {
+//			log.Println("从ES查询出错", err.Error())
+//			return nil
+//		}
+//		var res map[string]interface{}
+//		if searchResult.Hits != nil {
+//			resNum := len(searchResult.Hits.Hits)
+//			if resNum == 1 {
+//				res = make(map[string]interface{})
+//				for _, hit := range searchResult.Hits.Hits {
+//					json.Unmarshal(*hit.Source., &res)
+//				}
+//				return &res
+//			}
+//		}
+//	}
+//	return nil
+//}
+
+func (e *Elastic) Count(index, itype string, query interface{}) int64 {
+	client := e.GetEsConn()
+	defer e.DestoryEsConn(client)
+	if client != nil {
+		defer func() {
+			if r := recover(); r != nil {
+				log.Println("[E]", r)
+				for skip := 1; ; skip++ {
+					_, file, line, ok := runtime.Caller(skip)
+					if !ok {
+						break
+					}
+					go log.Printf("%v,%v\n", file, line)
+				}
+			}
+		}()
+		var qq es.Query
+		if qi, ok2 := query.(es.Query); ok2 {
+			qq = qi
+		}
+		n, err := client.Count(index).Query(qq).Do(context.Background())
+		if err != nil {
+			log.Println("统计出错", err.Error())
+		}
+
+		return n
+	}
+	return 0
+}
+
+//更新一个字段
+//func (e *Elastic) BulkUpdateArr(index, itype string, update []map[string]string) {
+//	client := e.GetEsConn()
+//	defer e.DestoryEsConn(client)
+//	if client != nil {
+//		defer func() {
+//			if r := recover(); r != nil {
+//				log.Println("[E]", r)
+//				for skip := 1; ; skip++ {
+//					_, file, line, ok := runtime.Caller(skip)
+//					if !ok {
+//						break
+//					}
+//					go log.Printf("%v,%v\n", file, line)
+//				}
+//			}
+//		}()
+//		for _, data := range update {
+//			id := data["id"]
+//			updateStr := data["updateStr"]
+//			if id != "" && updateStr != "" {
+//				_, err := client.Update().Index(index).Type(itype).Id(id).Script(updateStr).ScriptLang("groovy").Do()
+//				if err != nil {
+//					log.Println("更新检索出错:", err.Error())
+//				}
+//			} else {
+//				log.Println("数据错误")
+//			}
+//		}
+//	}
+//}
+
+//更新多个字段
+//func (e *Elastic) BulkUpdateMultipleFields(index, itype string, arrs [][]map[string]interface{}) {
+//	client := e.GetEsConn()
+//	defer e.DestoryEsConn(client)
+//	if client != nil {
+//		defer func() {
+//			if r := recover(); r != nil {
+//				log.Println("[E]", r)
+//				for skip := 1; ; skip++ {
+//					_, file, line, ok := runtime.Caller(skip)
+//					if !ok {
+//						break
+//					}
+//					go log.Printf("%v,%v\n", file, line)
+//				}
+//			}
+//		}()
+//		for _, arr := range arrs {
+//			id := arr[0]["id"].(string)
+//			update := arr[1]["update"].([]string)
+//			for _, str := range update {
+//				_, err := client.Update().Index(index).Type(itype).Id(id).Script(str).ScriptLang("groovy").Do()
+//				if err != nil {
+//					log.Println("更新检索出错:", err.Error())
+//				}
+//			}
+//		}
+//	}
+//}
+
+// UpdateBulk 批量修改文档
+func (e *Elastic) UpdateBulk(index, itype string, docs ...[]map[string]interface{}) {
+	client := e.GetEsConn()
+	defer e.DestoryEsConn(client)
+	bulkService := client.Bulk().Index(index).Refresh("true")
+	bulkService.Type(itype)
+	for _, d := range docs {
+		id := d[0]["_id"].(string)
+		doc := es.NewBulkUpdateRequest().Id(id).Doc(d[1])
+		bulkService.Add(doc)
+	}
+	_, err := bulkService.Do(context.Background())
+	if err != nil {
+		fmt.Printf("UpdateBulk all success err is %v\n", err)
+	}
+	//if len(res.Failed()) > 0 {
+	//	fmt.Printf("UpdateBulk all success failed is %v\n", (res.Items[0]))
+	//}
+}
+
+// UpsertBulk 批量修改文档(不存在则插入)
+func (e *Elastic) UpsertBulk(ctx context.Context, index string, ids []string, docs []interface{}) error {
+	client := e.GetEsConn()
+	defer e.DestoryEsConn(client)
+	bulkService := client.Bulk().Index(index).Refresh("true")
+	bulkService.Type("bidding")
+	for i := range ids {
+		doc := es.NewBulkUpdateRequest().Id(ids[i]).Doc(docs[i]).Upsert(docs[i])
+		bulkService.Add(doc)
+	}
+	res, err := bulkService.Do(context.Background())
+	if err != nil {
+		return err
+	}
+	if len(res.Failed()) > 0 {
+		return errors.New(res.Failed()[0].Error.Reason)
+	}
+	return nil
+}
+
+// 批量删除
+func (e *Elastic) DeleteBulk(index string, ids []string) {
+	client := e.GetEsConn()
+	defer e.DestoryEsConn(client)
+	bulkService := client.Bulk().Index(index).Refresh("true")
+	bulkService.Type("bidding")
+	for i := range ids {
+		req := es.NewBulkDeleteRequest().Id(ids[i])
+		bulkService.Add(req)
+	}
+	res, err := bulkService.Do(context.Background())
+	if err != nil {
+		fmt.Printf("DeleteBulk success is %v\n", len(res.Succeeded()))
+	}
+}

+ 7 - 7
projectinfo/src/main.go

@@ -2,19 +2,17 @@ package main
 
 import (
 	"encoding/json"
+	"github.com/donnie4w/go-logger/logger"
 	"log"
 	mu "mfw/util"
 	mgoutil "mongodb"
 	"net"
 	"qfw/util"
-	es "qfw/util/elastic"
-
-	"github.com/donnie4w/go-logger/logger"
 )
 
 var (
 	Sysconfig      map[string]interface{}
-	Es             *es.Elastic
+	Es             *Elastic
 	Index          string
 	Itype          string
 	MongoTool      *mgoutil.MongodbSim
@@ -63,9 +61,11 @@ func init() {
 	MixMgo.InitPool()
 	//es
 	econf := Sysconfig["elastic"].(map[string]interface{})
-	Es = &es.Elastic{
-		S_esurl: econf["addr"].(string),
-		I_size:  util.IntAllDef(econf["pool"], 12),
+	Es = &Elastic{
+		S_esurl:  econf["addr"].(string),
+		I_size:   util.IntAllDef(econf["pool"], 12),
+		Username: econf["user"].(string),
+		Password: econf["password"].(string),
 	}
 	Es.InitElasticSize()
 	Index = econf["index"].(string)

+ 2 - 2
projectinfo/src/task.go

@@ -184,7 +184,7 @@ func GetProjectData(sid, eid string) {
 			buyer := qu.ObjToString(pro["buyer"])
 			if buyer != "" {
 				esqyery := `{"query": {"bool": {"must": [{"term": {"buyer": "` + buyer + `"}}],"must_not": [{"term": {"toptype": "拟建"}}]}},"from": 0,"size": 1}`
-				list := Es.Get(Index, Itype, esqyery)
+				list := Es.Get(Index, esqyery)
 				if list == nil || len(*list) == 0 { //buyer仅有拟建数据不预测
 					return
 				}
@@ -304,7 +304,7 @@ func GetProjects(purchasing, buyer string) (projects []map[string]interface{}) {
 			latest_project := map[string]interface{}{} //存储最后一条数据信息
 			result_project := map[string]interface{}{} //存储每个purchasing所查询的招标信息
 			esquery := fmt.Sprintf(ESMODEL, buyer, text)
-			list := Es.Get(Index, Itype, esquery)
+			list := Es.Get(Index, esquery)
 			if list != nil && len(*list) > 0 {
 				for i, l := range *list {
 					p_phone := qu.ObjToString(l["buyertel"])

+ 6 - 6
qyxy/qyxy_es/config.json

@@ -1,14 +1,14 @@
 {
-  "mgodb": "192.168.3.207:27092",
+  "mgodb": "192.168.3.206:27002",
   "dbsize": 12,
-  "dbname": "wjh",
-  "dbcoll": "qyxy_0902",
-  "uname": "",
-  "upwd": "",
+  "dbname": "mixdata",
+  "dbcoll": "qyxy_std",
+  "uname": "root",
+  "upwd": "root",
   "tasktime": 0,
   "updatetime": 0,
   "elastic": {
-    "addr": "http://192.168.3.206:9800",
+    "addr": "http://192.168.3.241:9205",
     "index": "qyxy",
     "itype": "qyxy",
     "pool": 12,

+ 423 - 0
qyxy/qyxy_es/elasticSim.go

@@ -0,0 +1,423 @@
+package main
+
+import (
+	"context"
+	"encoding/json"
+	"errors"
+	"fmt"
+	es "gopkg.in/olivere/elastic.v7"
+	"log"
+	"qfw/util"
+	"runtime"
+	"strings"
+	"sync"
+	"time"
+)
+
+type Elastic struct {
+	S_esurl      string
+	I_size       int
+	Addrs        []string
+	Pool         chan *es.Client
+	lastTime     int64
+	lastTimeLock sync.Mutex
+	ntimeout     int
+	Username     string
+	Password     string
+}
+
+func (e *Elastic) InitElasticSize() {
+	e.Pool = make(chan *es.Client, e.I_size)
+	for _, s := range strings.Split(e.S_esurl, ",") {
+		e.Addrs = append(e.Addrs, s)
+	}
+	log.Println(e.Password, e.Username)
+	for i := 0; i < e.I_size; i++ {
+		client, _ := es.NewClient(es.SetURL(e.Addrs...), es.SetBasicAuth(e.Username, e.Password), es.SetMaxRetries(2), es.SetSniff(false))
+		e.Pool <- client
+	}
+}
+
+//关闭连接
+func (e *Elastic) DestoryEsConn(client *es.Client) {
+	select {
+	case e.Pool <- client:
+		break
+	case <-time.After(time.Second * 1):
+		if client != nil {
+			client.Stop()
+		}
+		client = nil
+	}
+}
+
+func (e *Elastic) GetEsConn() *es.Client {
+	select {
+	case c := <-e.Pool:
+		if c == nil || !c.IsRunning() {
+			log.Println("new esclient.", len(e.Pool))
+			client, err := es.NewClient(es.SetURL(e.Addrs...), es.SetBasicAuth(e.Username, e.Password),
+				es.SetSniff(false))
+			if err == nil && client.IsRunning() {
+				return client
+			}
+		}
+		return c
+	case <-time.After(time.Second * 4):
+		//超时
+		e.ntimeout++
+		e.lastTimeLock.Lock()
+		defer e.lastTimeLock.Unlock()
+		//12秒后允许创建链接
+		c := time.Now().Unix() - e.lastTime
+		if c > 12 {
+			e.lastTime = time.Now().Unix()
+			log.Println("add client..", len(e.Pool))
+			c, _ := es.NewClient(es.SetURL(e.Addrs...), es.SetBasicAuth(e.Username, e.Password), es.SetSniff(false))
+			go func() {
+				for i := 0; i < 2; i++ {
+					client, _ := es.NewClient(es.SetURL(e.Addrs...), es.SetBasicAuth(e.Username, e.Password), es.SetSniff(false))
+					e.Pool <- client
+				}
+			}()
+			return c
+		}
+		return nil
+	}
+}
+
+func (e *Elastic) Get(index, query string) *[]map[string]interface{} {
+	client := e.GetEsConn()
+	defer func() {
+		go e.DestoryEsConn(client)
+	}()
+	var res []map[string]interface{}
+	if client != nil {
+		defer func() {
+			if r := recover(); r != nil {
+				log.Println("[E]", r)
+				for skip := 1; ; skip++ {
+					_, file, line, ok := runtime.Caller(skip)
+					if !ok {
+						break
+					}
+					go log.Printf("%v,%v\n", file, line)
+				}
+			}
+		}()
+		searchResult, err := client.Search().Index(index).Source(query).Do(context.Background())
+		if err != nil {
+			log.Println("从ES查询出错", err.Error())
+			return nil
+		}
+		if searchResult.Hits != nil {
+			resNum := len(searchResult.Hits.Hits)
+			if resNum < 5000 {
+				res = make([]map[string]interface{}, resNum)
+				for i, hit := range searchResult.Hits.Hits {
+					parseErr := json.Unmarshal(hit.Source, &res[i])
+					if parseErr == nil && hit.Highlight != nil && res[i] != nil {
+						res[i]["highlight"] = map[string][]string(hit.Highlight)
+					}
+				}
+			} else {
+				log.Println("查询结果太多,查询到:", resNum, "条")
+			}
+		}
+	}
+	return &res
+}
+
+//关闭elastic
+func (e *Elastic) Close() {
+	for i := 0; i < e.I_size; i++ {
+		cli := <-e.Pool
+		cli.Stop()
+		cli = nil
+	}
+	e.Pool = nil
+	e = nil
+}
+
+//获取连接
+//func (e *Elastic) GetEsConn() (c *es.Client) {
+//	defer util.Catch()
+//	select {
+//	case c = <-e.Pool:
+//		if c == nil || !c.IsRunning() {
+//			client, err := es.NewClient(es.SetURL(addrs...),
+//				es.SetMaxRetries(2), es.SetSniff(false))
+//			if err == nil && client.IsRunning() {
+//				return client
+//			}
+//			return nil
+//		}
+//		return
+//	case <-time.After(time.Second * 7):
+//		//超时
+//		ntimeout++
+//		log.Println("timeout times:", ntimeout)
+//		return nil
+//	}
+//}
+
+func (e *Elastic) BulkSave(index string, obj []map[string]interface{}) {
+	client := e.GetEsConn()
+	defer e.DestoryEsConn(client)
+	if client != nil {
+		req := client.Bulk()
+		for _, v := range obj {
+			//if isDelBefore {
+			//	req = req.Add(es.NewBulkDeleteRequest().Index(index).Id(fmt.Sprintf("%v", v["_id"])))
+			//}
+			id := util.ObjToString(v["_id"])
+			delete(v, "_id")
+			req = req.Add(es.NewBulkIndexRequest().Index(index).Id(id).Doc(v))
+		}
+		_, err := req.Do(context.Background())
+		if err != nil {
+			log.Println("批量保存到ES出错", err.Error())
+		}
+	}
+}
+
+//根据id删除索引对象
+func (e *Elastic) DelById(index, itype, id string) bool {
+	client := e.GetEsConn()
+	defer e.DestoryEsConn(client)
+	b := false
+	if client != nil {
+		var err error
+		_, err = client.Delete().Index(index).Type(itype).Id(id).Do(context.Background())
+		if err != nil {
+			log.Println("更新检索出错:", err.Error())
+		} else {
+			b = true
+		}
+	}
+	return b
+}
+
+func (e *Elastic) GetNoLimit(index, query string) *[]map[string]interface{} {
+	client := e.GetEsConn()
+	defer e.DestoryEsConn(client)
+	var res []map[string]interface{}
+	if client != nil {
+		defer func() {
+			if r := recover(); r != nil {
+				log.Println("[E]", r)
+				for skip := 1; ; skip++ {
+					_, file, line, ok := runtime.Caller(skip)
+					if !ok {
+						break
+					}
+					go log.Printf("%v,%v\n", file, line)
+				}
+			}
+		}()
+		searchResult, err := client.Search().Index(index).Source(query).Do(context.Background())
+		if err != nil {
+			log.Println("从ES查询出错", err.Error())
+			return nil
+		}
+
+		if searchResult.Hits != nil {
+			resNum := len(searchResult.Hits.Hits)
+			util.Debug(resNum)
+			res = make([]map[string]interface{}, resNum)
+			for i, hit := range searchResult.Hits.Hits {
+				json.Unmarshal(hit.Source, &res[i])
+			}
+		}
+	}
+	return &res
+}
+
+//func (e *Elastic) GetByIdField(index, itype, id, fields string) *map[string]interface{} {
+//	client := e.GetEsConn()
+//	defer e.DestoryEsConn(client)
+//	if client != nil {
+//		defer func() {
+//			if r := recover(); r != nil {
+//				log.Println("[E]", r)
+//				for skip := 1; ; skip++ {
+//					_, file, line, ok := runtime.Caller(skip)
+//					if !ok {
+//						break
+//					}
+//					go log.Printf("%v,%v\n", file, line)
+//				}
+//			}
+//		}()
+//		query := `{"query":{"term":{"_id":"` + id + `"}}`
+//		if len(fields) > 0 {
+//			query = query + `,"_source":[` + fields + `]`
+//		}
+//		query = query + "}"
+//		searchResult, err := client.Search().Index(index).Type(itype).Source(query).Do()
+//		if err != nil {
+//			log.Println("从ES查询出错", err.Error())
+//			return nil
+//		}
+//		var res map[string]interface{}
+//		if searchResult.Hits != nil {
+//			resNum := len(searchResult.Hits.Hits)
+//			if resNum == 1 {
+//				res = make(map[string]interface{})
+//				for _, hit := range searchResult.Hits.Hits {
+//					json.Unmarshal(*hit.Source., &res)
+//				}
+//				return &res
+//			}
+//		}
+//	}
+//	return nil
+//}
+
+func (e *Elastic) Count(index, itype string, query interface{}) int64 {
+	client := e.GetEsConn()
+	defer e.DestoryEsConn(client)
+	if client != nil {
+		defer func() {
+			if r := recover(); r != nil {
+				log.Println("[E]", r)
+				for skip := 1; ; skip++ {
+					_, file, line, ok := runtime.Caller(skip)
+					if !ok {
+						break
+					}
+					go log.Printf("%v,%v\n", file, line)
+				}
+			}
+		}()
+		var qq es.Query
+		if qi, ok2 := query.(es.Query); ok2 {
+			qq = qi
+		}
+		n, err := client.Count(index).Query(qq).Do(context.Background())
+		if err != nil {
+			log.Println("统计出错", err.Error())
+		}
+
+		return n
+	}
+	return 0
+}
+
+//更新一个字段
+//func (e *Elastic) BulkUpdateArr(index, itype string, update []map[string]string) {
+//	client := e.GetEsConn()
+//	defer e.DestoryEsConn(client)
+//	if client != nil {
+//		defer func() {
+//			if r := recover(); r != nil {
+//				log.Println("[E]", r)
+//				for skip := 1; ; skip++ {
+//					_, file, line, ok := runtime.Caller(skip)
+//					if !ok {
+//						break
+//					}
+//					go log.Printf("%v,%v\n", file, line)
+//				}
+//			}
+//		}()
+//		for _, data := range update {
+//			id := data["id"]
+//			updateStr := data["updateStr"]
+//			if id != "" && updateStr != "" {
+//				_, err := client.Update().Index(index).Type(itype).Id(id).Script(updateStr).ScriptLang("groovy").Do()
+//				if err != nil {
+//					log.Println("更新检索出错:", err.Error())
+//				}
+//			} else {
+//				log.Println("数据错误")
+//			}
+//		}
+//	}
+//}
+
+//更新多个字段
+//func (e *Elastic) BulkUpdateMultipleFields(index, itype string, arrs [][]map[string]interface{}) {
+//	client := e.GetEsConn()
+//	defer e.DestoryEsConn(client)
+//	if client != nil {
+//		defer func() {
+//			if r := recover(); r != nil {
+//				log.Println("[E]", r)
+//				for skip := 1; ; skip++ {
+//					_, file, line, ok := runtime.Caller(skip)
+//					if !ok {
+//						break
+//					}
+//					go log.Printf("%v,%v\n", file, line)
+//				}
+//			}
+//		}()
+//		for _, arr := range arrs {
+//			id := arr[0]["id"].(string)
+//			update := arr[1]["update"].([]string)
+//			for _, str := range update {
+//				_, err := client.Update().Index(index).Type(itype).Id(id).Script(str).ScriptLang("groovy").Do()
+//				if err != nil {
+//					log.Println("更新检索出错:", err.Error())
+//				}
+//			}
+//		}
+//	}
+//}
+
+// UpdateBulk 批量修改文档
+func (e *Elastic) UpdateBulk(index, itype string, docs ...[]map[string]interface{}) {
+	client := e.GetEsConn()
+	defer e.DestoryEsConn(client)
+	bulkService := client.Bulk().Index(index).Refresh("true")
+	//bulkService.Type(itype)
+	for _, d := range docs {
+		id := d[0]["_id"].(string)
+		doc := es.NewBulkUpdateRequest().Id(id).Doc(d[1])
+		bulkService.Add(doc)
+	}
+	_, err := bulkService.Do(context.Background())
+	if err != nil {
+		fmt.Printf("UpdateBulk all success err is %v\n", err)
+	}
+	//if len(res.Failed()) > 0 {
+	//	fmt.Printf("UpdateBulk all success failed is %v\n", (res.Items[0]))
+	//}
+}
+
+// UpsertBulk 批量修改文档(不存在则插入)
+func (e *Elastic) UpsertBulk(ctx context.Context, index string, ids []string, docs []interface{}) error {
+	client := e.GetEsConn()
+	defer e.DestoryEsConn(client)
+	bulkService := client.Bulk().Index(index).Refresh("true")
+	bulkService.Type("bidding")
+	for i := range ids {
+		doc := es.NewBulkUpdateRequest().Id(ids[i]).Doc(docs[i]).Upsert(docs[i])
+		bulkService.Add(doc)
+	}
+	res, err := bulkService.Do(context.Background())
+	if err != nil {
+		return err
+	}
+	if len(res.Failed()) > 0 {
+		return errors.New(res.Failed()[0].Error.Reason)
+	}
+	return nil
+}
+
+// 批量删除
+func (e *Elastic) DeleteBulk(index string, ids []string) {
+	client := e.GetEsConn()
+	defer e.DestoryEsConn(client)
+	bulkService := client.Bulk().Index(index).Refresh("true")
+	bulkService.Type("bidding")
+	for i := range ids {
+		req := es.NewBulkDeleteRequest().Id(ids[i])
+		bulkService.Add(req)
+	}
+	res, err := bulkService.Do(context.Background())
+	if err != nil {
+		fmt.Printf("DeleteBulk success is %v\n", len(res.Succeeded()))
+	}
+}

+ 3 - 4
qyxy/qyxy_es/main.go

@@ -3,7 +3,6 @@ package main
 import (
 	"mongodb"
 	qu "qfw/util"
-	es "qfw/util/elastic"
 )
 
 var (
@@ -11,7 +10,7 @@ var (
 	Mgo        *mongodb.MongodbSim
 	Dbname     string
 	Dbcoll     string
-	Es, Es1    *es.Elastic
+	Es, Es1    *Elastic
 	Index      string
 	Itype      string
 	EsFields   []string
@@ -37,12 +36,12 @@ func init() {
 	econf := Sysconfig["elastic"].(map[string]interface{})
 	Index = econf["index"].(string)
 	Itype = econf["itype"].(string)
-	Es = &es.Elastic{
+	Es = &Elastic{
 		S_esurl: econf["addr"].(string),
 		I_size:  qu.IntAllDef(econf["pool"], 12),
 	}
 	Es.InitElasticSize()
-	Es1 = &es.Elastic{
+	Es1 = &Elastic{
 		S_esurl: econf["addr"].(string),
 		I_size:  qu.IntAllDef(econf["pool"], 12),
 	}

+ 8 - 4
qyxy/qyxy_es/task.go

@@ -228,8 +228,8 @@ func StdAll() {
 
 	pool := make(chan bool, 10)
 	wg := &sync.WaitGroup{}
-	//q := bson.M{"_id": "f9ad04e5529023e8af0b2ad8b49bf227"}
-	it := sess.DB(Dbname).C(Dbcoll).Find(nil).Iter()
+	q := bson.M{"_id": "f32b67603a7f296e5f710caf2064ef9d"}
+	it := sess.DB(Dbname).C(Dbcoll).Find(q).Iter()
 	count := 0
 	for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
 		if count%20000 == 0 {
@@ -248,6 +248,10 @@ func StdAll() {
 				if tmp[field] == nil {
 					continue
 				}
+				if field == "_id" {
+					esMap[field] = tmp[field]
+					esMap["id"] = tmp[field]
+				}
 				if field == "company_name" {
 					esMap[field] = tmp["company_name"]
 					esMap["name"] = tmp["company_name"]
@@ -413,7 +417,7 @@ func SaveEs() {
 					defer func() {
 						<-SP
 					}()
-					Es.BulkSave(Index, Itype, &arru, true)
+					Es.BulkSave(Index, arru)
 				}(arru)
 				arru = make([]map[string]interface{}, 100)
 				indexu = 0
@@ -425,7 +429,7 @@ func SaveEs() {
 					defer func() {
 						<-SP
 					}()
-					Es.BulkSave(Index, Itype, &arru, true)
+					Es.BulkSave(Index, arru)
 				}(arru[:indexu])
 				arru = make([]map[string]interface{}, 100)
 				indexu = 0

+ 1 - 2
qyxy/save_ent/main.go

@@ -50,7 +50,7 @@ func main() {
 
 	field := map[string]interface{}{"company_status": 1, "company_type": 1, "company_phone": 1, "company_email": 1, "business_scope": 1, "employees": 1,
 		"annual_reports": 1, "company_name": 1, "company_type_old": 1}
-	query := sess.DB("mixdata").C("qyxy_std").Find(nil).Select(field).Limit(3000).Sort("-updatetime").Iter()
+	query := sess.DB("mixdata").C("qyxy_std").Find(nil).Select(field).Sort("-updatetime").Iter()
 	count := 0
 	for tmp := make(map[string]interface{}); query.Next(&tmp); count++ {
 		if count%5000 == 0 {
@@ -195,4 +195,3 @@ func updateMethod() {
 		}
 	}
 }
-

+ 30 - 0
qyxy/save_mgo/main.go

@@ -10,6 +10,7 @@ import (
 	"reflect"
 	"strconv"
 	"strings"
+	"sync"
 	"time"
 )
 
@@ -56,6 +57,35 @@ func initRedis() (err error) {
 }
 
 func main() {
+	go updateMethod()
+
+	sess := MongoTool.GetMgoConn()
+	defer MongoTool.DestoryMongoConn(sess)
+
+	ch := make(chan bool, 3)
+	wg := &sync.WaitGroup{}
+
+	f := bson.M{"company_name": 1}
+	query := sess.DB("mixdata").C("winner_enterprise").Find(nil).Select(f).Iter()
+	count := 0
+	for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
+		if count%5000 == 0 {
+			util.Debug("current ---", count)
+		}
+		ch <- true
+		wg.Add(1)
+		go func(tmp map[string]interface{}) {
+			defer func() {
+				<-ch
+				wg.Done()
+			}()
+
+		}(tmp)
+		tmp = make(map[string]interface{})
+	}
+}
+
+func main1() {
 
 	go updateMethod()
 

+ 11 - 11
udpcreateindex/src/biddingall.go

@@ -142,17 +142,17 @@ func (t *TaskInfo) biddingAllTask(data []byte, mapInfo map[string]interface{}) {
 				FieldMethod(compare, update)
 				compare = nil
 			} else {
-				area := qutil.ObjToString(tmp["area"])
-				city := qutil.ObjToString(tmp["city"])
-				district := qutil.ObjToString(tmp["district"])
-				UpdatesLock.Lock()
-				rdata := standardCheckCity(area, city, district)
-				UpdatesLock.Unlock()
-				if len(rdata) > 0 {
-					for k, v := range rdata {
-						update[k] = v
-					}
-				}
+				//area := qutil.ObjToString(tmp["area"])
+				//city := qutil.ObjToString(tmp["city"])
+				//district := qutil.ObjToString(tmp["district"])
+				//UpdatesLock.Lock()
+				//rdata := standardCheckCity(area, city, district)
+				//UpdatesLock.Unlock()
+				//if len(rdata) > 0 {
+				//	for k, v := range rdata {
+				//		update[k] = v
+				//	}
+				//}
 			}
 			//------------------对比结束
 			//同时保存到elastic

+ 1 - 1
udpcreateindex/src/biddingdeletebyextracttype.go

@@ -40,7 +40,7 @@ func biddingDelByExtracttype(data []byte, mapInfo map[string]interface{}) {
 	for tmp := make(map[string]interface{}); biddingquery.Next(tmp); i = i + 1 {
 		n++
 		_id := mongodb.BsonIdToSId(tmp["_id"])
-		if Es2.DelById(qutil.ObjToString(biddingIndex["index"]), qutil.ObjToString(biddingIndex["type"]), _id) { //删除
+		if Es1.DelById(qutil.ObjToString(biddingIndex["index"]), qutil.ObjToString(biddingIndex["type"]), _id) { //删除
 			//Es2.DelById(qutil.ObjToString(biddingIndex["idnex"]), qutil.ObjToString(biddingIndex["type"]), _id)
 			dnum++
 		}

+ 3 - 3
udpcreateindex/src/biddingindex.go

@@ -169,9 +169,9 @@ func (t *TaskInfo) doIndex(infos []map[string]interface{}, eMap map[string]map[s
 		//------------------对比结束
 
 		//处理key descript
-		if bkey == "" {
-			DealInfo(&tmp, &update)
-		}
+		//if bkey == "" {
+		//	DealInfo(&tmp, &update)
+		//}
 		//同时保存到elastic
 		for tk, tv := range update {
 			tmp[tk] = tv

+ 4 - 1
udpcreateindex/src/biddingtask.go

@@ -267,7 +267,10 @@ func GetEsField(tmp, update map[string]interface{}, stype string) map[string]int
 						newTmp[field] = util.ObjToString(tmp["title"]) + " " + detail
 					}
 				}
-			} else if field == "_id" || field == "topscopeclass" || field == "entidlist" { //不做处理
+			} else if field == "_id" {
+				newTmp[field] = mongodb.BsonIdToSId(tmp["_id"])
+				newTmp["id"] = mongodb.BsonIdToSId(tmp["_id"])
+			} else if field == "topscopeclass" || field == "entidlist" { //不做处理
 				newTmp[field] = tmp[field]
 			} else if field == "publishtime" || field == "comeintime" {
 				//字段类型不正确,特别处理

+ 3 - 5
udpcreateindex/src/buyertask.go

@@ -38,7 +38,7 @@ func buyerEsTaskOnce() {
 	buyer_ent := qu.ObjToString(buyerent["collect1"])
 	//buyer_enterr := qu.ObjToString(buyerent["collect2"])
 	index, _ := buyerent["index"].(string)
-	itype, _ := buyerent["type"].(string)
+	//itype, _ := buyerent["type"].(string)
 	//mongo
 	sess := standardMgo.GetMgoConn()
 	defer standardMgo.DestoryMongoConn(sess)
@@ -95,8 +95,7 @@ func buyerEsTaskOnce() {
 			arrEs = append(arrEs, savetmp)
 			if len(arrEs) >= EsBulkSize {
 				tmps := arrEs
-				//Es1.BulkSave(index, itype, &tmps, true)
-				Es2.BulkSave(index, itype, &tmps, true)
+				Es1.BulkSave(index, tmps)
 				arrEs = []map[string]interface{}{}
 			}
 			buyerEsLock.Unlock()
@@ -147,8 +146,7 @@ func buyerEsTaskOnce() {
 	buyerEsLock.Lock()
 	if len(arrEs) > 0 {
 		tmps := arrEs
-		//Es1.BulkSave(index, itype, &tmps, true)
-		Es2.BulkSave(index, itype, &tmps, true)
+		Es1.BulkSave(index, tmps)
 		arrEs = []map[string]interface{}{}
 	}
 	buyerEsLock.Unlock()

+ 8 - 8
udpcreateindex/src/config.json

@@ -2,7 +2,7 @@
   "udpport": ":1483",
   "msg_server": "10.171.112.160:7070",
   "mgo_bidding": {
-    "addr": "192.168.3.207:27092",
+    "addr": "192.168.3.207:29099",
     "size": 10,
     "db": "wjh",
     "uname": "",
@@ -10,26 +10,26 @@
     "collect": "bidding"
   },
   "mgo_extract": {
-    "addr": "192.168.3.207:27092",
+    "addr": "192.168.3.207:29099",
     "size": 10,
     "db": "wjh",
     "collect": "extract",
     "collect1": ""
   },
   "mgo_qyxy": {
-    "addr": "192.168.3.207",
+    "addr": "192.168.3.207:29099",
     "pool": 10,
     "db": "mixdata"
   },
   "mgo_project": {
-    "addr": "192.168.3.207",
+    "addr": "192.168.3.207:29099",
     "db": "qfw",
     "collect": "projectset",
     "index": "projectset",
     "type": "projectset"
   },
   "standard": {
-    "addr": "192.168.3.207:27092",
+    "addr": "192.168.3.206:27002",
     "pool": 10,
     "db": "mixdata",
     "coll_area": "address_jy_2021",
@@ -52,12 +52,12 @@
     }
   },
   "elastic_1": {
-    "addr": "http://192.168.3.206:9800",
+    "addr": "http://192.168.3.241:9205",
     "pool": 12,
     "label": "旧es集群库"
   },
   "elastic_2": {
-    "addr": "http://192.168.3.206:9800",
+    "addr": "http://192.168.3.241:9205",
     "pool": 12,
     "label": "新es集群库"
   },
@@ -68,7 +68,7 @@
     "concurrent": 1
   },
   "bidding_index": {
-    "index": "bidding_v2",
+    "index": "bidding_v1",
     "type": "bidding",
     "multiIndex": "",
     "esfieldsmap": {

+ 423 - 0
udpcreateindex/src/elasticSim.go

@@ -0,0 +1,423 @@
+package main
+
+import (
+	"context"
+	"encoding/json"
+	"errors"
+	"fmt"
+	es "gopkg.in/olivere/elastic.v7"
+	"log"
+	"qfw/util"
+	"runtime"
+	"strings"
+	"sync"
+	"time"
+)
+
+type Elastic struct {
+	S_esurl      string
+	I_size       int
+	Addrs        []string
+	Pool         chan *es.Client
+	lastTime     int64
+	lastTimeLock sync.Mutex
+	ntimeout     int
+	Username     string
+	Password     string
+}
+
+func (e *Elastic) InitElasticSize() {
+	e.Pool = make(chan *es.Client, e.I_size)
+	for _, s := range strings.Split(e.S_esurl, ",") {
+		e.Addrs = append(e.Addrs, s)
+	}
+	log.Println(e.Password, e.Username)
+	for i := 0; i < e.I_size; i++ {
+		client, _ := es.NewClient(es.SetURL(e.Addrs...), es.SetBasicAuth(e.Username, e.Password), es.SetMaxRetries(2), es.SetSniff(false))
+		e.Pool <- client
+	}
+}
+
+//关闭连接
+func (e *Elastic) DestoryEsConn(client *es.Client) {
+	select {
+	case e.Pool <- client:
+		break
+	case <-time.After(time.Second * 1):
+		if client != nil {
+			client.Stop()
+		}
+		client = nil
+	}
+}
+
+func (e *Elastic) GetEsConn() *es.Client {
+	select {
+	case c := <-e.Pool:
+		if c == nil || !c.IsRunning() {
+			log.Println("new esclient.", len(e.Pool))
+			client, err := es.NewClient(es.SetURL(e.Addrs...), es.SetBasicAuth(e.Username, e.Password),
+				es.SetSniff(false))
+			if err == nil && client.IsRunning() {
+				return client
+			}
+		}
+		return c
+	case <-time.After(time.Second * 4):
+		//超时
+		e.ntimeout++
+		e.lastTimeLock.Lock()
+		defer e.lastTimeLock.Unlock()
+		//12秒后允许创建链接
+		c := time.Now().Unix() - e.lastTime
+		if c > 12 {
+			e.lastTime = time.Now().Unix()
+			log.Println("add client..", len(e.Pool))
+			c, _ := es.NewClient(es.SetURL(e.Addrs...), es.SetBasicAuth(e.Username, e.Password), es.SetSniff(false))
+			go func() {
+				for i := 0; i < 2; i++ {
+					client, _ := es.NewClient(es.SetURL(e.Addrs...), es.SetBasicAuth(e.Username, e.Password), es.SetSniff(false))
+					e.Pool <- client
+				}
+			}()
+			return c
+		}
+		return nil
+	}
+}
+
+func (e *Elastic) Get(index, query string) *[]map[string]interface{} {
+	client := e.GetEsConn()
+	defer func() {
+		go e.DestoryEsConn(client)
+	}()
+	var res []map[string]interface{}
+	if client != nil {
+		defer func() {
+			if r := recover(); r != nil {
+				log.Println("[E]", r)
+				for skip := 1; ; skip++ {
+					_, file, line, ok := runtime.Caller(skip)
+					if !ok {
+						break
+					}
+					go log.Printf("%v,%v\n", file, line)
+				}
+			}
+		}()
+		searchResult, err := client.Search().Index(index).Source(query).Do(context.Background())
+		if err != nil {
+			log.Println("从ES查询出错", err.Error())
+			return nil
+		}
+		if searchResult.Hits != nil {
+			resNum := len(searchResult.Hits.Hits)
+			if resNum < 5000 {
+				res = make([]map[string]interface{}, resNum)
+				for i, hit := range searchResult.Hits.Hits {
+					parseErr := json.Unmarshal(hit.Source, &res[i])
+					if parseErr == nil && hit.Highlight != nil && res[i] != nil {
+						res[i]["highlight"] = map[string][]string(hit.Highlight)
+					}
+				}
+			} else {
+				log.Println("查询结果太多,查询到:", resNum, "条")
+			}
+		}
+	}
+	return &res
+}
+
+//关闭elastic
+func (e *Elastic) Close() {
+	for i := 0; i < e.I_size; i++ {
+		cli := <-e.Pool
+		cli.Stop()
+		cli = nil
+	}
+	e.Pool = nil
+	e = nil
+}
+
+//获取连接
+//func (e *Elastic) GetEsConn() (c *es.Client) {
+//	defer util.Catch()
+//	select {
+//	case c = <-e.Pool:
+//		if c == nil || !c.IsRunning() {
+//			client, err := es.NewClient(es.SetURL(addrs...),
+//				es.SetMaxRetries(2), es.SetSniff(false))
+//			if err == nil && client.IsRunning() {
+//				return client
+//			}
+//			return nil
+//		}
+//		return
+//	case <-time.After(time.Second * 7):
+//		//超时
+//		ntimeout++
+//		log.Println("timeout times:", ntimeout)
+//		return nil
+//	}
+//}
+
+func (e *Elastic) BulkSave(index string, obj []map[string]interface{}) {
+	client := e.GetEsConn()
+	defer e.DestoryEsConn(client)
+	if client != nil {
+		req := client.Bulk()
+		for _, v := range obj {
+			//if isDelBefore {
+			//	req = req.Add(es.NewBulkDeleteRequest().Index(index).Id(fmt.Sprintf("%v", v["_id"])))
+			//}
+			id := util.ObjToString(v["_id"])
+			delete(v, "_id")
+			req = req.Add(es.NewBulkIndexRequest().Index(index).Id(id).Doc(v))
+		}
+		_, err := req.Do(context.Background())
+		if err != nil {
+			log.Println("批量保存到ES出错", err.Error())
+		}
+	}
+}
+
+//根据id删除索引对象
+func (e *Elastic) DelById(index, itype, id string) bool {
+	client := e.GetEsConn()
+	defer e.DestoryEsConn(client)
+	b := false
+	if client != nil {
+		var err error
+		_, err = client.Delete().Index(index).Type(itype).Id(id).Do(context.Background())
+		if err != nil {
+			log.Println("更新检索出错:", err.Error())
+		} else {
+			b = true
+		}
+	}
+	return b
+}
+
+func (e *Elastic) GetNoLimit(index, query string) *[]map[string]interface{} {
+	client := e.GetEsConn()
+	defer e.DestoryEsConn(client)
+	var res []map[string]interface{}
+	if client != nil {
+		defer func() {
+			if r := recover(); r != nil {
+				log.Println("[E]", r)
+				for skip := 1; ; skip++ {
+					_, file, line, ok := runtime.Caller(skip)
+					if !ok {
+						break
+					}
+					go log.Printf("%v,%v\n", file, line)
+				}
+			}
+		}()
+		searchResult, err := client.Search().Index(index).Source(query).Do(context.Background())
+		if err != nil {
+			log.Println("从ES查询出错", err.Error())
+			return nil
+		}
+
+		if searchResult.Hits != nil {
+			resNum := len(searchResult.Hits.Hits)
+			util.Debug(resNum)
+			res = make([]map[string]interface{}, resNum)
+			for i, hit := range searchResult.Hits.Hits {
+				json.Unmarshal(hit.Source, &res[i])
+			}
+		}
+	}
+	return &res
+}
+
+//func (e *Elastic) GetByIdField(index, itype, id, fields string) *map[string]interface{} {
+//	client := e.GetEsConn()
+//	defer e.DestoryEsConn(client)
+//	if client != nil {
+//		defer func() {
+//			if r := recover(); r != nil {
+//				log.Println("[E]", r)
+//				for skip := 1; ; skip++ {
+//					_, file, line, ok := runtime.Caller(skip)
+//					if !ok {
+//						break
+//					}
+//					go log.Printf("%v,%v\n", file, line)
+//				}
+//			}
+//		}()
+//		query := `{"query":{"term":{"_id":"` + id + `"}}`
+//		if len(fields) > 0 {
+//			query = query + `,"_source":[` + fields + `]`
+//		}
+//		query = query + "}"
+//		searchResult, err := client.Search().Index(index).Type(itype).Source(query).Do()
+//		if err != nil {
+//			log.Println("从ES查询出错", err.Error())
+//			return nil
+//		}
+//		var res map[string]interface{}
+//		if searchResult.Hits != nil {
+//			resNum := len(searchResult.Hits.Hits)
+//			if resNum == 1 {
+//				res = make(map[string]interface{})
+//				for _, hit := range searchResult.Hits.Hits {
+//					json.Unmarshal(*hit.Source., &res)
+//				}
+//				return &res
+//			}
+//		}
+//	}
+//	return nil
+//}
+
+func (e *Elastic) Count(index, itype string, query interface{}) int64 {
+	client := e.GetEsConn()
+	defer e.DestoryEsConn(client)
+	if client != nil {
+		defer func() {
+			if r := recover(); r != nil {
+				log.Println("[E]", r)
+				for skip := 1; ; skip++ {
+					_, file, line, ok := runtime.Caller(skip)
+					if !ok {
+						break
+					}
+					go log.Printf("%v,%v\n", file, line)
+				}
+			}
+		}()
+		var qq es.Query
+		if qi, ok2 := query.(es.Query); ok2 {
+			qq = qi
+		}
+		n, err := client.Count(index).Query(qq).Do(context.Background())
+		if err != nil {
+			log.Println("统计出错", err.Error())
+		}
+
+		return n
+	}
+	return 0
+}
+
+//更新一个字段
+//func (e *Elastic) BulkUpdateArr(index, itype string, update []map[string]string) {
+//	client := e.GetEsConn()
+//	defer e.DestoryEsConn(client)
+//	if client != nil {
+//		defer func() {
+//			if r := recover(); r != nil {
+//				log.Println("[E]", r)
+//				for skip := 1; ; skip++ {
+//					_, file, line, ok := runtime.Caller(skip)
+//					if !ok {
+//						break
+//					}
+//					go log.Printf("%v,%v\n", file, line)
+//				}
+//			}
+//		}()
+//		for _, data := range update {
+//			id := data["id"]
+//			updateStr := data["updateStr"]
+//			if id != "" && updateStr != "" {
+//				_, err := client.Update().Index(index).Type(itype).Id(id).Script(updateStr).ScriptLang("groovy").Do()
+//				if err != nil {
+//					log.Println("更新检索出错:", err.Error())
+//				}
+//			} else {
+//				log.Println("数据错误")
+//			}
+//		}
+//	}
+//}
+
+//更新多个字段
+//func (e *Elastic) BulkUpdateMultipleFields(index, itype string, arrs [][]map[string]interface{}) {
+//	client := e.GetEsConn()
+//	defer e.DestoryEsConn(client)
+//	if client != nil {
+//		defer func() {
+//			if r := recover(); r != nil {
+//				log.Println("[E]", r)
+//				for skip := 1; ; skip++ {
+//					_, file, line, ok := runtime.Caller(skip)
+//					if !ok {
+//						break
+//					}
+//					go log.Printf("%v,%v\n", file, line)
+//				}
+//			}
+//		}()
+//		for _, arr := range arrs {
+//			id := arr[0]["id"].(string)
+//			update := arr[1]["update"].([]string)
+//			for _, str := range update {
+//				_, err := client.Update().Index(index).Type(itype).Id(id).Script(str).ScriptLang("groovy").Do()
+//				if err != nil {
+//					log.Println("更新检索出错:", err.Error())
+//				}
+//			}
+//		}
+//	}
+//}
+
+// UpdateBulk 批量修改文档
+func (e *Elastic) UpdateBulk(index, itype string, docs ...[]map[string]interface{}) {
+	client := e.GetEsConn()
+	defer e.DestoryEsConn(client)
+	bulkService := client.Bulk().Index(index).Refresh("true")
+	bulkService.Type(itype)
+	for _, d := range docs {
+		id := d[0]["_id"].(string)
+		doc := es.NewBulkUpdateRequest().Id(id).Doc(d[1])
+		bulkService.Add(doc)
+	}
+	_, err := bulkService.Do(context.Background())
+	if err != nil {
+		fmt.Printf("UpdateBulk all success err is %v\n", err)
+	}
+	//if len(res.Failed()) > 0 {
+	//	fmt.Printf("UpdateBulk all success failed is %v\n", (res.Items[0]))
+	//}
+}
+
+// UpsertBulk 批量修改文档(不存在则插入)
+func (e *Elastic) UpsertBulk(ctx context.Context, index string, ids []string, docs []interface{}) error {
+	client := e.GetEsConn()
+	defer e.DestoryEsConn(client)
+	bulkService := client.Bulk().Index(index).Refresh("true")
+	bulkService.Type("bidding")
+	for i := range ids {
+		doc := es.NewBulkUpdateRequest().Id(ids[i]).Doc(docs[i]).Upsert(docs[i])
+		bulkService.Add(doc)
+	}
+	res, err := bulkService.Do(context.Background())
+	if err != nil {
+		return err
+	}
+	if len(res.Failed()) > 0 {
+		return errors.New(res.Failed()[0].Error.Reason)
+	}
+	return nil
+}
+
+// 批量删除
+func (e *Elastic) DeleteBulk(index string, ids []string) {
+	client := e.GetEsConn()
+	defer e.DestoryEsConn(client)
+	bulkService := client.Bulk().Index(index).Refresh("true")
+	bulkService.Type("bidding")
+	for i := range ids {
+		req := es.NewBulkDeleteRequest().Id(ids[i])
+		bulkService.Add(req)
+	}
+	res, err := bulkService.Do(context.Background())
+	if err != nil {
+		fmt.Printf("DeleteBulk success is %v\n", len(res.Succeeded()))
+	}
+}

+ 8 - 6
udpcreateindex/src/init.go

@@ -6,7 +6,6 @@ import (
 	"mongodb"
 	"net"
 	"qfw/util"
-	elastic "qfw/util/elastic"
 	"strings"
 	"time"
 	u "util"
@@ -21,8 +20,7 @@ var (
 	projectMgo  *mongodb.MongodbSim
 	standardMgo *mongodb.MongodbSim
 
-	//Es1 *elastic.Elastic
-	Es2 *elastic.Elastic
+	Es1 *Elastic
 
 	currentColl string
 	udpclient   mu.UdpClient //udp对象
@@ -97,12 +95,16 @@ func init() {
 		MongodbAddr: extract["addr"].(string),
 		Size:        util.IntAllDef(extract["pool"], 5),
 		DbName:      extract["db"].(string),
+		UserName:    bidding["uname"].(string),
+		Password:    bidding["upwd"].(string),
 	}
 	extractMgo.InitPool()
 	qyxyMgo = &mongodb.MongodbSim{
 		MongodbAddr: qyxy["addr"].(string),
 		Size:        util.IntAllDef(qyxy["pool"], 5),
 		DbName:      qyxy["db"].(string),
+		UserName:    bidding["uname"].(string),
+		Password:    bidding["upwd"].(string),
 	}
 	qyxyMgo.InitPool()
 	projectMgo = &mongodb.MongodbSim{
@@ -128,12 +130,12 @@ func init() {
 	//}
 	//Es1.InitElasticSize()
 
-	econf2 := Sysconfig["elastic_2"].(map[string]interface{})
-	Es2 = &elastic.Elastic{
+	econf2 := Sysconfig["elastic_1"].(map[string]interface{})
+	Es1 = &Elastic{
 		S_esurl: econf2["addr"].(string),
 		I_size:  util.IntAllDef(econf2["pool"], 5),
 	}
-	Es2.InitElasticSize()
+	Es1.InitElasticSize()
 
 	if mi := util.ObjToString(biddingIndex["multiIndex"]); mi != "" {
 		multiIndex = strings.Split(mi, ",")

+ 13 - 24
udpcreateindex/src/main.go

@@ -11,7 +11,6 @@ import (
 	"net/http"
 	nsq "nsq"
 	"qfw/util"
-	"qfw/util/redis"
 	"strings"
 	"time"
 )
@@ -22,14 +21,14 @@ var (
 
 func main() {
 	// company_id
-	redis.InitRedis1("qyxy_id=172.17.4.189:8379", 4)
-	inits()
+	//redis.InitRedis1("qyxy_id=172.17.4.189:8379", 4)
+	//inits()
 
 	//go inspectQuery()
-	go checkMapJob()
-	go task_index()
+	//go checkMapJob()
+	//go task_index()
 
-	go nsqMethod()
+	//go nsqMethod()
 
 	go UpdateBidding()
 	go UpdateExtract()
@@ -248,11 +247,7 @@ func SaveEsMethod() {
 					defer func() {
 						<-saveEsSp
 					}()
-					//Es1.BulkSave(util.ObjToString(biddingIndex["index"]), util.ObjToString(biddingIndex["type"]), &arru, true)
-					Es2.BulkSave(util.ObjToString(biddingIndex["index"]), util.ObjToString(biddingIndex["type"]), &arru, true)
-					//if len(multiIndex) == 2 {
-					//	Es1.BulkSave(multiIndex[0], multiIndex[1], &arru, true)
-					//}
+					Es1.BulkSave(util.ObjToString(biddingIndex["index"]), arru)
 				}(arru)
 				arru = make([]map[string]interface{}, EsBulkSize)
 				indexu = 0
@@ -264,11 +259,7 @@ func SaveEsMethod() {
 					defer func() {
 						<-saveEsSp
 					}()
-					//Es1.BulkSave(util.ObjToString(biddingIndex["index"]), util.ObjToString(biddingIndex["type"]), &arru, true)
-					Es2.BulkSave(util.ObjToString(biddingIndex["index"]), util.ObjToString(biddingIndex["type"]), &arru, true)
-					//if len(multiIndex) == 2 {
-					//	Es1.BulkSave(multiIndex[0], multiIndex[1], &arru, true)
-					//}
+					Es1.BulkSave(util.ObjToString(biddingIndex["index"]), arru)
 				}(arru[:indexu])
 				arru = make([]map[string]interface{}, EsBulkSize)
 				indexu = 0
@@ -291,7 +282,7 @@ func SaveElseEsMethod() {
 					defer func() {
 						<-saveEsElseSp
 					}()
-					Es2.BulkSave(util.ObjToString(biddingIndex["index"]), util.ObjToString(biddingIndex["type"]), &arru, true)
+					Es1.BulkSave(util.ObjToString(biddingIndex["index"]), arru)
 				}(arru)
 				arru = make([]map[string]interface{}, EsBulkSize)
 				indexu = 0
@@ -303,7 +294,7 @@ func SaveElseEsMethod() {
 					defer func() {
 						<-saveEsElseSp
 					}()
-					Es2.BulkSave(util.ObjToString(biddingIndex["index"]), util.ObjToString(biddingIndex["type"]), &arru, true)
+					Es1.BulkSave(util.ObjToString(biddingIndex["index"]), arru)
 				}(arru[:indexu])
 				arru = make([]map[string]interface{}, EsBulkSize)
 				indexu = 0
@@ -326,7 +317,7 @@ func SaveAllEsMethod() {
 					defer func() {
 						<-saveEsAllSp
 					}()
-					Es2.BulkSave("biddingall", "bidding", &arru, true)
+					Es1.BulkSave("biddingall", arru)
 				}(arru)
 				arru = make([]map[string]interface{}, EsBulkSize)
 				indexu = 0
@@ -338,7 +329,7 @@ func SaveAllEsMethod() {
 					defer func() {
 						<-saveEsAllSp
 					}()
-					Es2.BulkSave("biddingall", "bidding", &arru, true)
+					Es1.BulkSave("biddingall", arru)
 				}(arru[:indexu])
 				arru = make([]map[string]interface{}, EsBulkSize)
 				indexu = 0
@@ -361,8 +352,7 @@ func SaveProjectEs() {
 					defer func() {
 						<-saveProjectSp
 					}()
-					//Es1.BulkSave(util.ObjToString(project["index"]), util.ObjToString(project["type"]), &arru, true)
-					Es2.BulkSave(util.ObjToString(project["index"]), util.ObjToString(project["type"]), &arru, true)
+					Es1.BulkSave(util.ObjToString(project["index"]), arru)
 				}(arru)
 				arru = make([]map[string]interface{}, EsBulkSize)
 				indexu = 0
@@ -374,8 +364,7 @@ func SaveProjectEs() {
 					defer func() {
 						<-saveProjectSp
 					}()
-					//Es1.BulkSave(util.ObjToString(project["index"]), util.ObjToString(project["type"]), &arru, true)
-					Es2.BulkSave(util.ObjToString(project["index"]), util.ObjToString(project["type"]), &arru, true)
+					Es1.BulkSave(util.ObjToString(project["index"]), arru)
 				}(arru[:indexu])
 				arru = make([]map[string]interface{}, EsBulkSize)
 				indexu = 0

+ 3 - 5
udpcreateindex/src/winnertask.go

@@ -36,7 +36,7 @@ func winnerEsTaskOnce() {
 	win_ent := qu.ObjToString(winnerent["collect1"])
 	//win_enterr := qu.ObjToString(winnerent["collect2"])
 	index, _ := winnerent["index"].(string)
-	itype, _ := winnerent["type"].(string)
+	//itype, _ := winnerent["type"].(string)
 	//mongo
 	sess := standardMgo.GetMgoConn()
 	defer standardMgo.DestoryMongoConn(sess)
@@ -75,8 +75,7 @@ func winnerEsTaskOnce() {
 			arrEs = append(arrEs, savetmp)
 			if len(arrEs) >= EsBulkSize {
 				tmps := arrEs
-				//Es1.BulkSave(index, itype, &tmps, true)
-				Es2.BulkSave(index, itype, &tmps, true)
+				Es1.BulkSave(index, tmps)
 				arrEs = []map[string]interface{}{}
 			}
 			winerEsLock.Unlock()
@@ -120,8 +119,7 @@ func winnerEsTaskOnce() {
 	winerEsLock.Lock()
 	if len(arrEs) > 0 {
 		tmps := arrEs
-		//Es1.BulkSave(index, itype, &tmps, true)
-		Es2.BulkSave(index, itype, &tmps, true)
+		Es1.BulkSave(index, tmps)
 		arrEs = []map[string]interface{}{}
 	}
 	winerEsLock.Unlock()