maxiaoshan 5 years ago
parent
commit
c9af34f53c

+ 155 - 0
udpcreateindex/src/bidingpurchasing.go

@@ -0,0 +1,155 @@
+package main
+
+import (
+	"log"
+	"qfw/util"
+	elastic "qfw/util/elastic"
+	"sync"
+	"unicode/utf8"
+
+	u "./util"
+	"gopkg.in/mgo.v2/bson"
+)
+
+// biddingPurchaingTask builds search-index documents for the bidding records
+// matched by q, bulk-saves them to Elasticsearch and flags the source records
+// in MongoDB by setting extract_state to 4.
+//
+// The collection, database, index alias and index type are read from the
+// package-level bidding configuration. Records whose extracttype is -1 are
+// counted but neither indexed nor flagged. Both writes are batched: a batch is
+// flushed once it holds savesizei-1 entries, and whatever is left is flushed
+// after the cursor is exhausted.
+func biddingPurchaingTask(q map[string]interface{}) {
+	defer util.Catch()
+	// The mutex is local to this call and no goroutines are started here, so
+	// it is uncontended; it brackets the batch bookkeeping and flushes.
+	var saveLock sync.Mutex
+	// Connection parameters.
+	c, _ := bidding["collect"].(string)   // source collection
+	db, _ := bidding["db"].(string)       // database
+	index, _ := bidding["index"].(string) // index alias
+	itype, _ := bidding["type"].(string)
+
+	// NOTE(review): the session is never handed back in this function —
+	// confirm whether the pool expects an explicit release after GetMgoConn.
+	session := mgo.GetMgoConn(86400)
+	count, err := session.DB(db).C(c).Find(q).Count()
+	if err != nil {
+		log.Println("biddingPurchaingTask count error:", err)
+	}
+	log.Println("biddingPurchaingTask:	", db, c, "查询语句:", q, "同步总数:", count, "elastic库:", index)
+
+	// NOTE(review): the projection excludes "projectinfo.attachment" while the
+	// document builder reads projectinfo["attachments"] — confirm the key.
+	query := session.DB(db).C(c).Find(q).Select(bson.M{
+		"projectinfo.attachment": 0,
+		"contenthtml":            0,
+	}).Iter()
+
+	// Start both batches empty. Creating the index batch with a non-zero
+	// length would prepend nil documents to the first bulk save.
+	arrEs := make([]map[string]interface{}, 0, savesizei)
+	arrMgo := make([][]map[string]interface{}, 0, savesizei)
+	flushMgo := func() {
+		mgo.UpdateBulkAll(db, c, arrMgo...)
+		arrMgo = make([][]map[string]interface{}, 0, savesizei)
+	}
+	flushEs := func() {
+		tmps := arrEs
+		elastic.BulkSave(index, itype, &tmps, true)
+		arrEs = make([]map[string]interface{}, 0, savesizei)
+	}
+
+	n := 0
+	for {
+		// A fresh map per record, so earlier documents are never aliased.
+		tmp := make(map[string]interface{})
+		if !query.Next(tmp) {
+			break
+		}
+		n++
+		if util.IntAll(tmp["extracttype"]) == -1 {
+			continue
+		}
+		arrEs = append(arrEs, purchasingIndexDoc(tmp))
+
+		saveLock.Lock()
+		// Selector/update pair that marks the record as indexed.
+		arrMgo = append(arrMgo, []map[string]interface{}{
+			{
+				"_id": tmp["_id"],
+			},
+			{
+				"$set": map[string]interface{}{
+					"extract_state": 4,
+				},
+			},
+		})
+		// Bulk update of the source records.
+		if len(arrMgo) >= savesizei-1 {
+			flushMgo()
+		}
+		// Bulk save of the index documents.
+		if len(arrEs) >= savesizei-1 {
+			flushEs()
+		}
+		saveLock.Unlock()
+		// Progress counter.
+		if n%savesizei == 0 {
+			log.Println("当前:", n)
+		}
+	}
+	// Close reports any error that ended the iteration early.
+	if err := query.Close(); err != nil {
+		log.Println("biddingPurchaingTask iterate error:", err)
+	}
+
+	// Flush whatever is left in the partially filled batches.
+	saveLock.Lock()
+	if len(arrMgo) > 0 {
+		flushMgo()
+	}
+	if len(arrEs) > 0 {
+		flushEs()
+	}
+	saveLock.Unlock()
+	log.Println("create filetext index...over", n)
+}
+
+// purchasingIndexDoc assembles the index document for one bidding record: the
+// attachment text from getFileText, the purchasing fields, and every non-nil
+// field listed in biddingIndexFields.
+func purchasingIndexDoc(tmp map[string]interface{}) map[string]interface{} {
+	doc := map[string]interface{}{
+		"filetext":       getFileText(tmp),
+		"purchasing":     tmp["purchasing"],
+		"purchasinglist": tmp["purchasinglist"],
+	}
+	for _, field := range biddingIndexFields {
+		val := tmp[field]
+		if val == nil {
+			continue
+		}
+		switch field {
+		case "projectinfo":
+			info, _ := val.(map[string]interface{})
+			if info == nil {
+				continue
+			}
+			// Keep only the configured projectinfo sub-fields.
+			picked := map[string]interface{}{}
+			for _, key := range projectinfoFields {
+				if info[key] != nil {
+					picked[key] = info[key]
+				}
+			}
+			doc[field] = picked
+			if text := FilterDetailSpace(purchasingAttachmentText(info["attachments"])); text != "" {
+				doc["attachments"] = text
+			}
+		case "detail":
+			detail, _ := val.(string)
+			doc[field] = FilterDetail(detail)
+		default:
+			doc[field] = val
+		}
+	}
+	return doc
+}
+
+// purchasingAttachmentText concatenates the "content" strings of every entry
+// in a projectinfo attachments map; any other input yields "".
+func purchasingAttachmentText(attachments interface{}) string {
+	entries, _ := attachments.(map[string]interface{})
+	text := ""
+	for _, entry := range entries {
+		if item, _ := entry.(map[string]interface{}); item != nil {
+			content, _ := item["content"].(string)
+			text += content
+		}
+	}
+	return text
+}
+
+// getFileText concatenates the text of the attachments referenced by
+// tmp["attach_text"], fetching each one through u.OssGetObject by its
+// attach_url.
+//
+// attach_text is read as a two-level map whose leaves are maps carrying an
+// "attach_url" entry; values of any other shape are ignored. Every fetched
+// text is followed by a newline. The result is capped by
+// Sysconfig["filelength"] (100000 is passed as the fallback to
+// util.IntAllDef), counted in runes: as soon as adding the next text would
+// reach the cap, the text collected so far is returned and no further objects
+// are downloaded.
+func getFileText(tmp map[string]interface{}) (filetext string) {
+	limit := util.IntAllDef(Sysconfig["filelength"], 100000)
+	runes := 0 // rune count of filetext, kept incrementally
+	groups, _ := tmp["attach_text"].(map[string]interface{})
+	for _, group := range groups {
+		files, _ := group.(map[string]interface{})
+		for _, file := range files {
+			info, _ := file.(map[string]interface{})
+			if info == nil {
+				continue
+			}
+			url := util.ObjToString(info["attach_url"])
+			if url == "" {
+				continue
+			}
+			text := u.OssGetObject(url) // read the attachment text from OSS
+			size := utf8.RuneCountInString(text)
+			if runes+size >= limit {
+				// Return instead of break: break would only leave the inner
+				// loop and the remaining groups would still be downloaded.
+				return filetext
+			}
+			filetext += text + "\n"
+			runes += size + 1
+		}
+	}
+	return filetext
+}

+ 16 - 14
udpcreateindex/src/config.json

@@ -2,9 +2,9 @@
     "udpport": ":1483",
     "msg_server": "10.171.112.160:7070",
 	"savedb": {
-        "addr": "172.17.4.187:27083",
+        "addr": "192.168.3.207:27092",
         "size": 10,
-        "db": "qfw"
+        "db": "mxs"
     },
     "jkmail": {
         "to":"zhangjinkun@topnet.net.cn",
@@ -29,20 +29,22 @@
         "type": "bidding"
     },
     "bidding": {
-        "db": "qfw",
-        "collect": "bidding_back",
-        "index": "bidding_v1",
+        "db": "mxs",
+        "collect": "bidding",
+        "index": "bidding_v2",
         "type": "bidding",
-        "extractdb": "qfw",
-        "extractcollect": "result_20200116",
-        "indexfields":[ "buyerzipcode","winnertel","winnerperson","contractcode","winneraddr","agencyaddr","buyeraddr","signaturedate","projectperiod","projectaddr","agencytel","agencyperson","buyerperson","agency","projectscope","projectcode","bidopentime","supervisorrate","buyertel","bidamount","winner","buyer","budget","projectname","bidstatus","buyerclass","topscopeclass","s_subscopeclass","area","city","district","s_winner","_id","title","detail","site","comeintime","href","infoformat","publishtime","s_sha","spidercode","subtype","toptype","projectinfo"
+        "extractdb": "mxs",
+        "extractcollect": "extract",
+        "indexfields":[ 
+        "buyerzipcode","winnertel","winnerperson","contractcode","winneraddr","agencyaddr","buyeraddr","signaturedate","projectperiod","projectaddr","agencytel","agencyperson","buyerperson","agency","projectscope","projectcode","bidopentime","supervisorrate","buyertel","bidamount","winner","buyer","budget","projectname","bidstatus","buyerclass","topscopeclass","s_subscopeclass","area","city","district","s_winner","_id","title","detail","site","comeintime","href","infoformat","publishtime","s_sha","spidercode","subtype","toptype","projectinfo"
         ],
         "fields": "buyerzipcode,winnertel,winnerperson,contractcode,winneraddr,agencyaddr,buyeraddr,signaturedate,projectperiod,projectaddr,agencytel,agencyperson,buyerperson,agency,projectscope,projectcode,bidopentime,supervisorrate,buyertel,bidamount,winner,buyer,budget,projectname,buyerclass,topscopeclass,area,city,district,s_winner",
         "projectinfo": "approvecode,approvecontent,approvestatus,approvetime,industry",
         "multiIndex": ""
     },
+    "filelength": 100000,
     "project": {
-		"addr": "172.17.4.189:27082",
+		"addr": "192.168.3.207:27092",
         "size": 2,
         "db": "extract_kf",
         "collect": "huawei_project",
@@ -50,14 +52,14 @@
         "type": "projectset"
     },
     "project2": {
-
+		"addr": "192.168.3.207:27092",
         "db": "extract_kf",
         "collect": "huawei_project",
         "index": "project_v2",
         "type": "project"
     },
     "standard": {
- 		"addr": "172.17.145.163:27082",
+ 		"addr": "192.168.3.207:27092",
         "size": 10,
         "db": "qfw",
     	"winnerent":{
@@ -77,12 +79,12 @@
 		}
     },
     "mongodb": {
-        "addr": "10.172.242.243:27080,10.30.94.175:27081,10.81.232.246:27082",
+        "addr": "192.168.3.207:27092",
         "pool": 10,
-        "db": "qfw"
+        "db": "mxs"
     },
     "elastic": {
-        "addr": "http://172.17.145.170:9800",
+        "addr": "http://192.168.3.128:9800",
         "pool": 12
     }
 }

+ 8 - 2
udpcreateindex/src/main.go

@@ -10,6 +10,8 @@ import (
 	"qfw/util/mongodb"
 	"strings"
 	"time"
+
+	u "./util"
 )
 
 var (
@@ -31,8 +33,8 @@ var (
 
 func init() {
 	util.ReadConfig(&Sysconfig)
-	inits()
-	go checkMapJob()
+	//inits()
+	//go checkMapJob()
 	updport, _ = Sysconfig["updport"].(string)
 	winner, _ = Sysconfig["winner"].(map[string]interface{})
 	standard, _ = Sysconfig["standard"].(map[string]interface{})
@@ -95,9 +97,13 @@ func init() {
 		}
 	}
 	log.Println(projectinfoFields)
+	//初始化oss
+	u.InitOss()
 }
 
 func main() {
+	//go task_projects()
+	//go task_biddingfile()
 	go task_index()
 	updport := Sysconfig["udpport"].(string)
 	udpclient = mu.UdpClient{Local: updport, BufSize: 1024}

+ 4 - 4
udpcreateindex/src/task.go

@@ -11,7 +11,8 @@ import (
 
 func task_index() {
 	c := cron.New()
-	c.AddFunc("20 30 5 * * *", func() { task_projects() })
+	//c.AddFunc("20 30 5 * * *", func() { task_projects() })
+	c.AddFunc("0 */1 * * * *", func() { task_biddingfile() })
 	c.Start()
 }
 
@@ -20,11 +21,10 @@ func task_biddingfile() {
 	defer qutil.Catch()
 	q := map[string]interface{}{
 		"extract_state": map[string]interface{}{
-			"$eq": 2,
+			"$eq": 3,
 		},
 	}
-	log.Println(q)
-	//待续
+	biddingPurchaingTask(q)
 }
 
 //project2项目索引

+ 52 - 0
udpcreateindex/src/util/ossclient.go

@@ -0,0 +1,52 @@
+// ossclient
+package util
+
+import (
+	"fmt"
+	"io/ioutil"
+	"os"
+	"qfw/util"
+
+	"github.com/aliyun/aliyun-oss-go-sdk/oss"
+)
+
+var (
+	ossEndpoint        = "http://oss-cn-beijing.aliyuncs.com" // production should use: oss-cn-beijing-internal.aliyuncs.com
+	ossAccessKeyId     = "LTAI4FvLSWN3Wz9F6dUxQGMR"           // NOTE(review): access key id is hard-coded in source; rotate it and load it from configuration
+	ossAccessKeySecret = "WnQpnNVEiRfZsz5hIqFSr0phayMo3U"     // NOTE(review): access key secret is hard-coded in source; rotate it and load it from configuration
+	ossBucketName      = "topjy"                              // bucket looked up by OssGetObject
+	ossclient          *oss.Client                            // assigned by InitOss
+)
+
+// InitOss creates the OSS client from ossEndpoint and the access key pair and
+// stores it in the package-level ossclient. If the client cannot be created,
+// the error is printed and the process exits with status -1.
+func InitOss() {
+	created, err := oss.New(ossEndpoint, ossAccessKeyId, ossAccessKeySecret)
+	if err == nil {
+		ossclient = created
+		return
+	}
+	fmt.Println("Error:", err)
+	os.Exit(-1)
+}
+
+// OssGetObject downloads the object named objectName from the ossBucketName
+// bucket and returns its content as a string. It returns "" when the bucket
+// lookup, the download or the read fails; the error is printed to stdout.
+func OssGetObject(objectName string) string {
+	// Catch has to be deferred, as at the other call sites: called directly
+	// it runs before anything below can panic (for example when ossclient is
+	// still nil because InitOss has not run).
+	defer util.Catch()
+	// Look up the bucket.
+	bucket, err := ossclient.Bucket(ossBucketName)
+	if err != nil {
+		fmt.Println("Error:", err)
+		return ""
+	}
+
+	// Download the object as a stream.
+	body, err := bucket.GetObject(objectName)
+	if err != nil {
+		fmt.Println("Error:", err)
+		return ""
+	}
+	defer body.Close()
+	data, err := ioutil.ReadAll(body)
+	if err != nil {
+		fmt.Println("Error:", err)
+		return ""
+	}
+	return string(data)
+}