瀏覽代碼

更新获取项目信息方式

wcc 2 年之前
父節點
當前提交
e4ce27e1d4
共有 5 個文件被更改,包括 208 次插入122 次删除
  1. 4 3
      bidding_listen/config.go
  2. 10 8
      bidding_listen/config.toml
  3. 30 21
      bidding_listen/init.go
  4. 147 90
      bidding_listen/main.go
  5. 17 0
      bidding_listen/utils.go

+ 4 - 3
bidding_listen/config.go

@@ -3,7 +3,7 @@ package main
 type GlobalConf struct {
 	Mongob MgoConf
 	Cron   CronConf
-	EsP    EspConf
+	Mongop MgoConf
 	Mysql  MysqlConf
 	Log    Log
 }
@@ -19,8 +19,9 @@ type MgoConf struct {
 
 //CronConf 定时任务
 type CronConf struct {
-	Spec string
-	Day  int
+	Spec  string
+	Start int
+	End   int
 }
 
 type EspConf struct {

+ 10 - 8
bidding_listen/config.toml

@@ -7,19 +7,21 @@
     password = ""
 
 
-[esp]
-    url = "http://127.0.0.1:19805"
-#    url = "http://172.17.4.184:19805"
-    username = "es_all"
-    password = "TopJkO2E_d1x"
-    index="projectset"
+[mongop] ## 项目信息
+    host = "127.0.0.1:27017"
+    #    host = "192.168.3.207:29099"
+    db = "wcc"
+    coll = "bidding"
+    username = ""
+    password = ""
 
 
 
 [cron] ## 定时任务
 #    spec = "0 */1 * * * *"  ## 5分钟执行一次
-    spec = "0 00 20 * * *"  ## 每天20点执行
-    day = -1                ## 表示最近一天的数据
+    spec = "0 00 20 * * *"   ## 每天20点执行
+    start = -1               ## 表示开始时间昨天凌晨
+    end = 0                  ## 表示截止时间到今天凌晨
 
 
 [mysql]

+ 30 - 21
bidding_listen/init.go

@@ -6,7 +6,6 @@ import (
 	"app.yhyue.com/data_processing/common_utils/mongodb"
 	"app.yhyue.com/data_processing/common_utils/mysqldb"
 	"fmt"
-	es "github.com/olivere/elastic/v7"
 	"github.com/spf13/viper"
 	"go.uber.org/zap"
 	"os"
@@ -43,7 +42,7 @@ func init() {
 
 	InitLog()
 	InitMgo()
-	InitEs()
+	//InitEs()
 	InitMysql()
 	//
 
@@ -80,25 +79,25 @@ func InitMysql() {
 	log.Info("InitMysql", zap.Any("duration", time.Since(now).Seconds()))
 }
 
-func InitEs() {
-	now := time.Now()
-	//Es = &elastic.Elastic{
-	//	//S_esurl: "http://127.0.0.1:19805",
-	//	S_esurl:  GF.EsP.URL,
-	//	I_size:   5,
-	//	Username: GF.EsP.Username,
-	//	Password: GF.EsP.Password,
-	//}
-	//Es.InitElasticSize()
-
-	EsClient, _ = es.NewClient(
-		es.SetURL(GF.EsP.URL),
-		es.SetBasicAuth(GF.EsP.Username, GF.EsP.Password),
-		es.SetSniff(false),
-	)
-
-	log.Info("InitEs", zap.Any("duration", time.Since(now).Seconds()))
-}
+//func InitEs() {
+//	now := time.Now()
+//	//Es = &elastic.Elastic{
+//	//	//S_esurl: "http://127.0.0.1:19805",
+//	//	S_esurl:  GF.EsP.URL,
+//	//	I_size:   5,
+//	//	Username: GF.EsP.Username,
+//	//	Password: GF.EsP.Password,
+//	//}
+//	//Es.InitElasticSize()
+//
+//	EsClient, _ = es.NewClient(
+//		es.SetURL(GF.EsP.URL),
+//		es.SetBasicAuth(GF.EsP.Username, GF.EsP.Password),
+//		es.SetSniff(false),
+//	)
+//
+//	log.Info("InitEs", zap.Any("duration", time.Since(now).Seconds()))
+//}
 
 func InitMgo() {
 	now := time.Now()
@@ -110,5 +109,15 @@ func InitMgo() {
 		Password:    GF.Mongob.Password,
 	}
 	Mgo.InitPool()
+
+	MgoP = &mongodb.MongodbSim{
+		MongodbAddr: GF.Mongop.Host,
+		DbName:      GF.Mongop.DB,
+		Size:        10,
+		UserName:    GF.Mongop.Username,
+		Password:    GF.Mongop.Password,
+	}
+	MgoP.InitPool()
+
 	log.Info("InitMgo", zap.Any("duration", time.Since(now).Seconds()))
 }

+ 147 - 90
bidding_listen/main.go

@@ -5,8 +5,6 @@ import (
 	"app.yhyue.com/data_processing/common_utils/log"
 	"app.yhyue.com/data_processing/common_utils/mongodb"
 	"app.yhyue.com/data_processing/common_utils/mysqldb"
-	"context"
-	"encoding/json"
 	"fmt"
 	es "github.com/olivere/elastic/v7"
 	"github.com/robfig/cron/v3"
@@ -17,6 +15,7 @@ import (
 
 var (
 	Mgo      *mongodb.MongodbSim
+	MgoP     *mongodb.MongodbSim
 	Mysql    *mysqldb.Mysql
 	EsClient *es.Client
 )
@@ -47,10 +46,14 @@ func specData() {
 func dealBidding() {
 	sess := Mgo.GetMgoConn()
 	defer Mgo.DestoryMongoConn(sess)
+	// 指定对应的时间格式
+	//layout := "2006-01-02 15:04:05"
 	// 获取当前时间
 	now := time.Now()
-	targetTime := time.Date(now.Year(), now.Month(), now.Day()+GF.Cron.Day, 0, 0, 0, 0, now.Location())
-	todayTime := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, now.Location())
+
+	//targetTime := time.Date(now.Year(), now.Month(), now.Day()+GF.Cron.Start, 04, 20, 0, 0, now.Location())
+	targetTime := time.Date(now.Year(), now.Month(), now.Day()+GF.Cron.Start, 0, 0, 0, 0, now.Location())
+	todayTime := time.Date(now.Year(), now.Month(), now.Day()+GF.Cron.End, 0, 0, 0, 0, now.Location())
 
 	q := map[string]interface{}{
 		"comeintime": map[string]interface{}{
@@ -58,6 +61,7 @@ func dealBidding() {
 			"$lte": todayTime.Unix(),
 		},
 	}
+	log.Info("dealBidding", zap.Any("q", q))
 	query := sess.DB(GF.Mongob.DB).C(GF.Mongob.Coll).Find(q).Select(map[string]interface{}{
 		"contenthtml": 0}).Iter()
 	count := 0
@@ -89,6 +93,9 @@ func dealBidding() {
 
 //saveBidding 保存bidding数据
 func saveBidding(tmp map[string]interface{}) {
+	sess := MgoP.GetMgoConn()
+	defer MgoP.DestoryMongoConn(sess)
+
 	//针对产权数据,暂时不入es 索引库
 	if utils.IntAll(tmp["infoformat"]) == 3 {
 		return
@@ -103,25 +110,20 @@ func saveBidding(tmp map[string]interface{}) {
 	insert["bidding_id"] = id
 	insert["spidercode"] = tmp["spidercode"]
 	insert["site"] = tmp["site"]
-	if comeintime, ok := tmp["comeintime"].(int64); ok {
-		timeString := time.Unix(comeintime, 0).Format("2006-01-02 15:04:05")
-		insert["comeintime"] = timeString
+
+	if tmp["comeintime"] != nil {
+		time, _ := convertToTime(tmp["comeintime"])
+		insert["comeintime"] = time.Format("2006-01-02 15:04:05")
 	}
 
-	if publishtime, ok := tmp["publishtime"].(int64); ok {
-		timeString := time.Unix(publishtime, 0).Format("2006-01-02 15:04:05")
-		insert["publishtime"] = timeString
-	} else if publishtime, ok := tmp["publishtime"].(float64); ok {
-		timeString := time.Unix(int64(publishtime), 0).Format("2006-01-02 15:04:05")
-		insert["publishtime"] = timeString
+	if tmp["publishtime"] != nil {
+		time, _ := convertToTime(tmp["publishtime"])
+		insert["publishtime"] = time.Format("2006-01-02 15:04:05")
 	}
 
-	if pici, ok := tmp["pici"].(int64); ok {
-		timeString := time.Unix(pici, 0).Format("2006-01-02 15:04:05")
-		insert["es_bidding_pici"] = timeString
-	} else if pici, ok := tmp["pici"].(float64); ok {
-		timeString := time.Unix(int64(pici), 0).Format("2006-01-02 15:04:05")
-		insert["es_bidding_pici"] = timeString
+	if tmp["pici"] != nil {
+		time, _ := convertToTime(tmp["pici"])
+		insert["es_bidding_pici"] = time.Format("2006-01-02 15:04:05")
 	}
 
 	if utils.IntAll(tmp["extracttype"]) == -1 || utils.IntAll(tmp["dataprocess"]) == 7 {
@@ -187,50 +189,72 @@ func saveBidding(tmp map[string]interface{}) {
 		insert["district_code"] = ""
 	}
 
-	esquery := es.NewBoolQuery().Filter(es.NewTermQuery("ids", id))
-	// 执行查询
-	searchResult, err := EsClient.Search().
-		Index(GF.EsP.Index).
-		Query(esquery).
-		Do(context.Background())
+	insert["project_id"] = ""
 
-	if err != nil {
-		log.Info("saveBidding", zap.Any("EsClient.Search.Error", err))
+	//查询MongoDB project 信息
+	wherep := map[string]interface{}{
+		"ids": id,
 	}
-
-	if searchResult.Hits.TotalHits.Value > 0 {
-		// 处理查询结果
-		// 处理查询结果并转换为map
-		for _, hit := range searchResult.Hits.Hits {
-			//fmt.Printf("Found document with id %s\n", hit.Id)
-			// 处理你的文档数据...
-			result := make(map[string]interface{})
-			err = json.Unmarshal(hit.Source, &result)
-			if err != nil {
-				log.Info("dealBidding", zap.Any("Unmarshal err", err))
-			}
-			if len(result) > 0 {
-				insert["project_id"] = result["id"]
-				if project_pici, ok := result["createtime"].(float64); ok {
-					timeString := time.Unix(int64(project_pici), 0).Format("2006-01-02 15:04:05")
-					insert["project_pici"] = timeString
-				}
-				if project_pici, ok := result["pici"].(float64); ok {
-					timeString := time.Unix(int64(project_pici), 0).Format("2006-01-02 15:04:05")
-					insert["es_project_pici"] = timeString
-				}
-			}
+	count := 0
+	query := sess.DB(GF.Mongop.DB).C(GF.Mongop.Coll).Find(wherep).Limit(1).Iter()
+	for p := make(map[string]interface{}); query.Next(p); count++ {
+		projectId := mongodb.BsonIdToSId(p["_id"])
+		insert["project_id"] = projectId
+		if p["pici"] != nil {
+			time, _ := convertToTime(p["pici"])
+			insert["project_pici"] = time.Format("2006-01-02 15:04:05")
+			insert["es_project_pici"] = time.Format("2006-01-02 15:04:05")
 		}
-	} else {
-		insert["project_id"] = ""
 	}
 
+	//
+	//esquery := es.NewBoolQuery().Filter(es.NewTermQuery("ids", id))
+	//// 执行查询
+	//searchResult, err := EsClient.Search().
+	//	Index(GF.EsP.Index).
+	//	Query(esquery).
+	//	Source(es.NewFetchSourceContext(true).Include("id", "pici")).
+	//	Do(context.Background())
+	//
+	//if err != nil {
+	//	log.Info("saveBidding", zap.Any("EsClient.Search.Error", err))
+	//}
+	//
+	//if searchResult.Hits.TotalHits.Value > 0 {
+	//	// 处理查询结果
+	//	// 处理查询结果并转换为map
+	//	for _, hit := range searchResult.Hits.Hits {
+	//		//fmt.Printf("Found document with id %s\n", hit.Id)
+	//		// 处理你的文档数据...
+	//		result := make(map[string]interface{})
+	//		err = json.Unmarshal(hit.Source, &result)
+	//		if err != nil {
+	//			log.Info("dealBidding", zap.Any("Unmarshal err", err))
+	//		}
+	//		if len(result) > 0 {
+	//			insert["project_id"] = result["id"]
+	//
+	//			if project_pici, ok := result["pici"].(float64); ok {
+	//				timeString := time.Unix(int64(project_pici), 0).Format("2006-01-02 15:04:05")
+	//				insert["project_pici"] = timeString
+	//			}
+	//			if project_pici, ok := result["pici"].(float64); ok {
+	//				timeString := time.Unix(int64(project_pici), 0).Format("2006-01-02 15:04:05")
+	//				insert["es_project_pici"] = timeString
+	//			}
+	//		}
+	//	}
+	//} else {
+	//	insert["project_id"] = ""
+	//}
+
 	if len(insert) > 0 {
 		insertId := Mysql.Insert(GF.Mysql.Table, insert)
 		if insertId <= 0 {
 			log.Info("saveBidding", zap.Any("insertId", insertId), zap.Any("insert", insert))
 		}
 	}
+
 }
 
 func dealProject() {
@@ -298,47 +322,80 @@ func dealProject() {
 }
 
 func updateProject(tmp map[string]interface{}) {
-	esquery := es.NewBoolQuery().Filter(es.NewTermQuery("ids", tmp["bidding_id"]))
-	// 执行查询
-	searchResult, err := EsClient.Search().
-		Index(GF.EsP.Index).
-		Query(esquery).
-		Do(context.Background())
+	sess := MgoP.GetMgoConn()
+	defer MgoP.DestoryMongoConn(sess)
+
+	update := map[string]interface{}{}
+	id := mongodb.BsonIdToSId(tmp["bidding_id"])
+	//更新MySQL where
+	where := map[string]interface{}{
+		"id": tmp["id"],
+	}
 
-	if err != nil {
-		log.Info("updateProject", zap.Any("EsClient.Search.Error", err))
+	//查询MongoDB project 信息
+	wherep := map[string]interface{}{
+		"ids": id,
+	}
+	count := 0
+	query := sess.DB(GF.Mongop.DB).C(GF.Mongop.Coll).Find(wherep).Limit(1).Iter()
+	for p := make(map[string]interface{}); query.Next(p); count++ {
+		projectId := mongodb.BsonIdToSId(p["_id"])
+		update["project_id"] = projectId
+		if p["pici"] != nil {
+			time, _ := convertToTime(p["pici"])
+			update["project_pici"] = time.Format("2006-01-02 15:04:05")
+			update["es_project_pici"] = time.Format("2006-01-02 15:04:05")
+		}
 	}
 
-	if searchResult.Hits.TotalHits.Value > 0 {
-		// 处理查询结果
-		// 处理查询结果并转换为map
-		for _, hit := range searchResult.Hits.Hits {
-			//fmt.Printf("Found document with id %s\n", hit.Id)
-			// 处理你的文档数据...
-			result := make(map[string]interface{})
-			err = json.Unmarshal(hit.Source, &result)
-			if err != nil {
-				log.Info("dealBidding", zap.Any("Unmarshal err", err))
-			}
-			if len(result) > 0 {
-				update := map[string]interface{}{
-					"project_id": result["id"],
-				}
-				log.Info("updateProject", zap.Any("bidding_id", tmp["bidding_id"]))
-				if project_pici, ok := result["pici"].(float64); ok {
-					timeString := time.Unix(int64(project_pici), 0).Format("2006-01-02 15:04:05")
-					update["project_pici"] = timeString
-					update["es_project_pici"] = timeString
-				}
-
-				where := map[string]interface{}{
-					"id": tmp["id"],
-				}
-				res := Mysql.Update(GF.Mysql.Table, where, update)
-				if !res {
-					log.Info("updateProject", zap.Any("update", update), zap.Any("where", where))
-				}
-			}
+	if len(update) > 0 {
+		res := Mysql.Update(GF.Mysql.Table, where, update)
+		if !res {
+			log.Info("updateProject", zap.Any("update", update), zap.Any("where", where))
 		}
 	}
+
+	//esquery := es.NewBoolQuery().Filter(es.NewTermQuery("ids", tmp["bidding_id"]))
+	//// 执行查询
+	//searchResult, err := EsClient.Search().
+	//	Index(GF.EsP.Index).
+	//	Query(esquery).
+	//	Do(context.Background())
+	//
+	//if err != nil {
+	//	log.Info("updateProject", zap.Any("EsClient.Search.Error", err))
+	//}
+	//
+	//if searchResult.Hits.TotalHits.Value > 0 {
+	//	// 处理查询结果
+	//	// 处理查询结果并转换为map
+	//	for _, hit := range searchResult.Hits.Hits {
+	//		//fmt.Printf("Found document with id %s\n", hit.Id)
+	//		// 处理你的文档数据...
+	//		result := make(map[string]interface{})
+	//		err = json.Unmarshal(hit.Source, &result)
+	//		if err != nil {
+	//			log.Info("dealBidding", zap.Any("Unmarshal err", err))
+	//		}
+	//		if len(result) > 0 {
+	//			update := map[string]interface{}{
+	//				"project_id": result["id"],
+	//			}
+	//			log.Info("updateProject", zap.Any("bidding_id", tmp["bidding_id"]))
+	//			if project_pici, ok := result["pici"].(float64); ok {
+	//				timeString := time.Unix(int64(project_pici), 0).Format("2006-01-02 15:04:05")
+	//				update["project_pici"] = timeString
+	//				update["es_project_pici"] = timeString
+	//			}
+	//
+	//			where := map[string]interface{}{
+	//				"id": tmp["id"],
+	//			}
+	//			res := Mysql.Update(GF.Mysql.Table, where, update)
+	//			if !res {
+	//				log.Info("updateProject", zap.Any("update", update), zap.Any("where", where))
+	//			}
+	//		}
+	//	}
+	//}
 }

+ 17 - 0
bidding_listen/utils.go

@@ -0,0 +1,17 @@
+package main
+
+import (
+	"fmt"
+	"time"
+)
+
+func convertToTime(value interface{}) (time.Time, error) {
+	switch v := value.(type) {
+	case int64:
+		return time.Unix(v, 0), nil
+	case float64:
+		return time.Unix(int64(v), 0), nil
+	default:
+		return time.Time{}, fmt.Errorf("unsupported type: %T", v)
+	}
+}