Răsfoiți Sursa

Merge remote-tracking branch 'origin/master'

maxiaoshan 2 ani în urmă
părinte
comite
26c4dbf6ca
5 a modificat fișierele cu 268 adăugiri și 130 ștergeri
  1. 9 7
      bidding_listen/config.go
  2. 16 8
      bidding_listen/config.toml
  3. 58 21
      bidding_listen/init.go
  4. 157 94
      bidding_listen/main.go
  5. 28 0
      bidding_listen/utils.go

+ 9 - 7
bidding_listen/config.go

@@ -1,11 +1,12 @@
 package main
 
 type GlobalConf struct {
-	Mongob MgoConf
-	Cron   CronConf
-	EsP    EspConf
-	Mysql  MysqlConf
-	Log    Log
+	Mongob      MgoConf
+	Cron        CronConf
+	Mongop      MgoConf
+	Mysql       MysqlConf
+	Log         Log
+	Mongospider MgoConf
 }
 
 type MgoConf struct {
@@ -19,8 +20,9 @@ type MgoConf struct {
 
 //CronConf 定时任务
 type CronConf struct {
-	Spec string
-	Day  int
+	Spec  string
+	Start int
+	End   int
 }
 
 type EspConf struct {

+ 16 - 8
bidding_listen/config.toml

@@ -7,19 +7,27 @@
     password = ""
 
 
-[esp]
-    url = "http://127.0.0.1:19805"
-#    url = "http://172.17.4.184:19805"
-    username = "es_all"
-    password = "TopJkO2E_d1x"
-    index="projectset"
+[mongop] ## 项目信息
+    host = "127.0.0.1:27017"
+    #    host = "192.168.3.207:29099"
+    db = "wcc"
+    coll = "bidding"
+    username = ""
+    password = ""
 
+[mongospider] ## 竞品网站数据库
+    host = "172.17.4.87:27080"
+    db = "spider"
+    coll = "spider_compete"
+    username = ""
+    password = ""
 
 
 [cron] ## 定时任务
 #    spec = "0 */1 * * * *"  ## 5分钟执行一次
-    spec = "0 00 20 * * *"  ## 每天20点执行
-    day = -1                ## 表示最近一天的数据
+    spec = "0 00 20 * * *"   ## 每天20点执行
+    start = -1               ## 表示开始时间昨天凌晨
+    end = 0                  ## 表示截止时间到今天凌晨
 
 
 [mysql]

+ 58 - 21
bidding_listen/init.go

@@ -6,7 +6,6 @@ import (
 	"app.yhyue.com/data_processing/common_utils/mongodb"
 	"app.yhyue.com/data_processing/common_utils/mysqldb"
 	"fmt"
-	es "github.com/olivere/elastic/v7"
 	"github.com/spf13/viper"
 	"go.uber.org/zap"
 	"os"
@@ -43,8 +42,9 @@ func init() {
 
 	InitLog()
 	InitMgo()
-	InitEs()
+	//InitEs()
 	InitMysql()
+	initSpider()
 	//
 
 }
@@ -80,25 +80,25 @@ func InitMysql() {
 	log.Info("InitMysql", zap.Any("duration", time.Since(now).Seconds()))
 }
 
-func InitEs() {
-	now := time.Now()
-	//Es = &elastic.Elastic{
-	//	//S_esurl: "http://127.0.0.1:19805",
-	//	S_esurl:  GF.EsP.URL,
-	//	I_size:   5,
-	//	Username: GF.EsP.Username,
-	//	Password: GF.EsP.Password,
-	//}
-	//Es.InitElasticSize()
-
-	EsClient, _ = es.NewClient(
-		es.SetURL(GF.EsP.URL),
-		es.SetBasicAuth(GF.EsP.Username, GF.EsP.Password),
-		es.SetSniff(false),
-	)
-
-	log.Info("InitEs", zap.Any("duration", time.Since(now).Seconds()))
-}
+//func InitEs() {
+//	now := time.Now()
+//	//Es = &elastic.Elastic{
+//	//	//S_esurl: "http://127.0.0.1:19805",
+//	//	S_esurl:  GF.EsP.URL,
+//	//	I_size:   5,
+//	//	Username: GF.EsP.Username,
+//	//	Password: GF.EsP.Password,
+//	//}
+//	//Es.InitElasticSize()
+//
+//	EsClient, _ = es.NewClient(
+//		es.SetURL(GF.EsP.URL),
+//		es.SetBasicAuth(GF.EsP.Username, GF.EsP.Password),
+//		es.SetSniff(false),
+//	)
+//
+//	log.Info("InitEs", zap.Any("duration", time.Since(now).Seconds()))
+//}
 
 func InitMgo() {
 	now := time.Now()
@@ -110,5 +110,42 @@ func InitMgo() {
 		Password:    GF.Mongob.Password,
 	}
 	Mgo.InitPool()
+
+	MgoP = &mongodb.MongodbSim{
+		MongodbAddr: GF.Mongop.Host,
+		DbName:      GF.Mongop.DB,
+		Size:        10,
+		UserName:    GF.Mongop.Username,
+		Password:    GF.Mongop.Password,
+	}
+	MgoP.InitPool()
+
+	MgoSpider = &mongodb.MongodbSim{
+		MongodbAddr: GF.Mongospider.Host,
+		DbName:      GF.Mongospider.DB,
+		Size:        10,
+		UserName:    GF.Mongospider.Username,
+		Password:    GF.Mongospider.Password,
+	}
+	MgoSpider.InitPool()
+
 	log.Info("InitMgo", zap.Any("duration", time.Since(now).Seconds()))
 }
+
+//initSpider 初始化 竞品站点
+func initSpider() {
+	fmt.Println("开始初始化 -- 竞品站点")
+	sess := MgoSpider.GetMgoConn()
+	defer MgoSpider.DestoryMongoConn(sess)
+	query := sess.DB(GF.Mongospider.DB).C(GF.Mongospider.Coll).Find(nil).Iter()
+	count := 0
+
+	for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
+		if tmp["site"] != nil {
+			site := tmp["site"].(string)
+			SpiderSites = append(SpiderSites, site)
+		}
+	}
+	log.Info("initSpider", zap.Int("初始化竞品站点完毕,竞品网站数量为:", len(SpiderSites)))
+
+}

+ 157 - 94
bidding_listen/main.go

@@ -5,10 +5,7 @@ import (
 	"app.yhyue.com/data_processing/common_utils/log"
 	"app.yhyue.com/data_processing/common_utils/mongodb"
 	"app.yhyue.com/data_processing/common_utils/mysqldb"
-	"context"
-	"encoding/json"
 	"fmt"
-	es "github.com/olivere/elastic/v7"
 	"github.com/robfig/cron/v3"
 	"go.uber.org/zap"
 	"sync"
@@ -16,9 +13,12 @@ import (
 )
 
 var (
-	Mgo      *mongodb.MongodbSim
-	Mysql    *mysqldb.Mysql
-	EsClient *es.Client
+	Mgo       *mongodb.MongodbSim
+	MgoP      *mongodb.MongodbSim
+	MgoSpider *mongodb.MongodbSim
+	Mysql     *mysqldb.Mysql
+	//EsClient    *es.Client
+	SpiderSites = make([]string, 0) //排除,竞品站点
 )
 
 func main() {
@@ -47,10 +47,14 @@ func specData() {
 func dealBidding() {
 	sess := Mgo.GetMgoConn()
 	defer Mgo.DestoryMongoConn(sess)
+	// 指定对应的时间格式
+	//layout := "2006-01-02 15:04:05"
 	// 获取当前时间
 	now := time.Now()
-	targetTime := time.Date(now.Year(), now.Month(), now.Day()+GF.Cron.Day, 0, 0, 0, 0, now.Location())
-	todayTime := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, now.Location())
+
+	//targetTime := time.Date(now.Year(), now.Month(), now.Day()+GF.Cron.Start, 04, 20, 0, 0, now.Location())
+	targetTime := time.Date(now.Year(), now.Month(), now.Day()+GF.Cron.Start, 0, 0, 0, 0, now.Location())
+	todayTime := time.Date(now.Year(), now.Month(), now.Day()+GF.Cron.End, 0, 0, 0, 0, now.Location())
 
 	q := map[string]interface{}{
 		"comeintime": map[string]interface{}{
@@ -58,6 +62,7 @@ func dealBidding() {
 			"$lte": todayTime.Unix(),
 		},
 	}
+	log.Info("dealBidding", zap.Any("q", q))
 	query := sess.DB(GF.Mongob.DB).C(GF.Mongob.Coll).Find(q).Select(map[string]interface{}{
 		"contenthtml": 0}).Iter()
 	count := 0
@@ -89,6 +94,9 @@ func dealBidding() {
 
 //saveBidding 保存bidding数据
 func saveBidding(tmp map[string]interface{}) {
+	sess := MgoP.GetMgoConn()
+	defer MgoP.DestoryMongoConn(sess)
+
 	//针对产权数据,暂时不入es 索引库
 	if utils.IntAll(tmp["infoformat"]) == 3 {
 		return
@@ -103,25 +111,25 @@ func saveBidding(tmp map[string]interface{}) {
 	insert["bidding_id"] = id
 	insert["spidercode"] = tmp["spidercode"]
 	insert["site"] = tmp["site"]
-	if comeintime, ok := tmp["comeintime"].(int64); ok {
-		timeString := time.Unix(comeintime, 0).Format("2006-01-02 15:04:05")
-		insert["comeintime"] = timeString
+	site := tmp["site"].(string)
+	//竞品网站数据,直接过滤
+	if IsInStringArray(site, SpiderSites) {
+		return
 	}
 
-	if publishtime, ok := tmp["publishtime"].(int64); ok {
-		timeString := time.Unix(publishtime, 0).Format("2006-01-02 15:04:05")
-		insert["publishtime"] = timeString
-	} else if publishtime, ok := tmp["publishtime"].(float64); ok {
-		timeString := time.Unix(int64(publishtime), 0).Format("2006-01-02 15:04:05")
-		insert["publishtime"] = timeString
+	if tmp["comeintime"] != nil {
+		time, _ := convertToTime(tmp["comeintime"])
+		insert["comeintime"] = time.Format("2006-01-02 15:04:05")
 	}
 
-	if pici, ok := tmp["pici"].(int64); ok {
-		timeString := time.Unix(pici, 0).Format("2006-01-02 15:04:05")
-		insert["es_bidding_pici"] = timeString
-	} else if pici, ok := tmp["pici"].(float64); ok {
-		timeString := time.Unix(int64(pici), 0).Format("2006-01-02 15:04:05")
-		insert["es_bidding_pici"] = timeString
+	if tmp["publishtime"] != nil {
+		time, _ := convertToTime(tmp["publishtime"])
+		insert["publishtime"] = time.Format("2006-01-02 15:04:05")
+	}
+
+	if tmp["pici"] != nil {
+		time, _ := convertToTime(tmp["pici"])
+		insert["es_bidding_pici"] = time.Format("2006-01-02 15:04:05")
 	}
 
 	if utils.IntAll(tmp["extracttype"]) == -1 || utils.IntAll(tmp["dataprocess"]) == 7 {
@@ -187,50 +195,72 @@ func saveBidding(tmp map[string]interface{}) {
 		insert["district_code"] = ""
 	}
 
-	esquery := es.NewBoolQuery().Filter(es.NewTermQuery("ids", id))
-	// 执行查询
-	searchResult, err := EsClient.Search().
-		Index(GF.EsP.Index).
-		Query(esquery).
-		Do(context.Background())
+	insert["project_id"] = ""
 
-	if err != nil {
-		log.Info("saveBidding", zap.Any("EsClient.Search.Error", err))
+	//查询MongoDB project 信息
+	wherep := map[string]interface{}{
+		"ids": id,
 	}
-
-	if searchResult.Hits.TotalHits.Value > 0 {
-		// 处理查询结果
-		// 处理查询结果并转换为map
-		for _, hit := range searchResult.Hits.Hits {
-			//fmt.Printf("Found document with id %s\n", hit.Id)
-			// 处理你的文档数据...
-			result := make(map[string]interface{})
-			err = json.Unmarshal(hit.Source, &result)
-			if err != nil {
-				log.Info("dealBidding", zap.Any("Unmarshal err", err))
-			}
-			if len(result) > 0 {
-				insert["project_id"] = result["id"]
-				if project_pici, ok := result["createtime"].(float64); ok {
-					timeString := time.Unix(int64(project_pici), 0).Format("2006-01-02 15:04:05")
-					insert["project_pici"] = timeString
-				}
-				if project_pici, ok := result["pici"].(float64); ok {
-					timeString := time.Unix(int64(project_pici), 0).Format("2006-01-02 15:04:05")
-					insert["es_project_pici"] = timeString
-				}
-			}
+	count := 0
+	query := sess.DB(GF.Mongop.DB).C(GF.Mongop.Coll).Find(wherep).Limit(1).Iter()
+	for p := make(map[string]interface{}); query.Next(p); count++ {
+		projectId := mongodb.BsonIdToSId(p["_id"])
+		insert["project_id"] = projectId
+		if p["pici"] != nil {
+			time, _ := convertToTime(p["pici"])
+			insert["project_pici"] = time.Format("2006-01-02 15:04:05")
+			insert["es_project_pici"] = time.Format("2006-01-02 15:04:05")
 		}
-	} else {
-		insert["project_id"] = ""
 	}
 
+	//
+	//esquery := es.NewBoolQuery().Filter(es.NewTermQuery("ids", id))
+	//// 执行查询
+	//searchResult, err := EsClient.Search().
+	//	Index(GF.EsP.Index).
+	//	Query(esquery).
+	//	Source(es.NewFetchSourceContext(true).Include("id", "pici")).
+	//	Do(context.Background())
+	//
+	//if err != nil {
+	//	log.Info("saveBidding", zap.Any("EsClient.Search.Error", err))
+	//}
+	//
+	//if searchResult.Hits.TotalHits.Value > 0 {
+	//	// 处理查询结果
+	//	// 处理查询结果并转换为map
+	//	for _, hit := range searchResult.Hits.Hits {
+	//		//fmt.Printf("Found document with id %s\n", hit.Id)
+	//		// 处理你的文档数据...
+	//		result := make(map[string]interface{})
+	//		err = json.Unmarshal(hit.Source, &result)
+	//		if err != nil {
+	//			log.Info("dealBidding", zap.Any("Unmarshal err", err))
+	//		}
+	//		if len(result) > 0 {
+	//			insert["project_id"] = result["id"]
+	//
+	//			if project_pici, ok := result["pici"].(float64); ok {
+	//				timeString := time.Unix(int64(project_pici), 0).Format("2006-01-02 15:04:05")
+	//				insert["project_pici"] = timeString
+	//			}
+	//			if project_pici, ok := result["pici"].(float64); ok {
+	//				timeString := time.Unix(int64(project_pici), 0).Format("2006-01-02 15:04:05")
+	//				insert["es_project_pici"] = timeString
+	//			}
+	//		}
+	//	}
+	//} else {
+	//	insert["project_id"] = ""
+	//}
+
 	if len(insert) > 0 {
 		insertId := Mysql.Insert(GF.Mysql.Table, insert)
 		if insertId <= 0 {
 			log.Info("saveBidding", zap.Any("insertId", insertId), zap.Any("insert", insert))
 		}
 	}
+
 }
 
 func dealProject() {
@@ -298,47 +328,80 @@ func dealProject() {
 }
 
 func updateProject(tmp map[string]interface{}) {
-	esquery := es.NewBoolQuery().Filter(es.NewTermQuery("ids", tmp["bidding_id"]))
-	// 执行查询
-	searchResult, err := EsClient.Search().
-		Index(GF.EsP.Index).
-		Query(esquery).
-		Do(context.Background())
+	sess := MgoP.GetMgoConn()
+	defer MgoP.DestoryMongoConn(sess)
+
+	update := map[string]interface{}{}
+	id := mongodb.BsonIdToSId(tmp["bidding_id"])
+	//更新MySQL where
+	where := map[string]interface{}{
+		"id": tmp["id"],
+	}
 
-	if err != nil {
-		log.Info("updateProject", zap.Any("EsClient.Search.Error", err))
+	//查询MongoDB project 信息
+	wherep := map[string]interface{}{
+		"ids": id,
+	}
+	count := 0
+	query := sess.DB(GF.Mongop.DB).C(GF.Mongop.Coll).Find(wherep).Limit(1).Iter()
+	for p := make(map[string]interface{}); query.Next(p); count++ {
+		projectId := mongodb.BsonIdToSId(p["_id"])
+		update["project_id"] = projectId
+		if p["pici"] != nil {
+			time, _ := convertToTime(p["pici"])
+			update["project_pici"] = time.Format("2006-01-02 15:04:05")
+			update["es_project_pici"] = time.Format("2006-01-02 15:04:05")
+		}
 	}
 
-	if searchResult.Hits.TotalHits.Value > 0 {
-		// 处理查询结果
-		// 处理查询结果并转换为map
-		for _, hit := range searchResult.Hits.Hits {
-			//fmt.Printf("Found document with id %s\n", hit.Id)
-			// 处理你的文档数据...
-			result := make(map[string]interface{})
-			err = json.Unmarshal(hit.Source, &result)
-			if err != nil {
-				log.Info("dealBidding", zap.Any("Unmarshal err", err))
-			}
-			if len(result) > 0 {
-				update := map[string]interface{}{
-					"project_id": result["id"],
-				}
-				log.Info("updateProject", zap.Any("bidding_id", tmp["bidding_id"]))
-				if project_pici, ok := result["pici"].(float64); ok {
-					timeString := time.Unix(int64(project_pici), 0).Format("2006-01-02 15:04:05")
-					update["project_pici"] = timeString
-					update["es_project_pici"] = timeString
-				}
-
-				where := map[string]interface{}{
-					"id": tmp["id"],
-				}
-				res := Mysql.Update(GF.Mysql.Table, where, update)
-				if !res {
-					log.Info("updateProject", zap.Any("update", update), zap.Any("where", where))
-				}
-			}
+	if len(update) > 0 {
+		res := Mysql.Update(GF.Mysql.Table, where, update)
+		if !res {
+			log.Info("updateProject", zap.Any("update", update), zap.Any("where", where))
 		}
 	}
+
+	//esquery := es.NewBoolQuery().Filter(es.NewTermQuery("ids", tmp["bidding_id"]))
+	//// 执行查询
+	//searchResult, err := EsClient.Search().
+	//	Index(GF.EsP.Index).
+	//	Query(esquery).
+	//	Do(context.Background())
+	//
+	//if err != nil {
+	//	log.Info("updateProject", zap.Any("EsClient.Search.Error", err))
+	//}
+	//
+	//if searchResult.Hits.TotalHits.Value > 0 {
+	//	// 处理查询结果
+	//	// 处理查询结果并转换为map
+	//	for _, hit := range searchResult.Hits.Hits {
+	//		//fmt.Printf("Found document with id %s\n", hit.Id)
+	//		// 处理你的文档数据...
+	//		result := make(map[string]interface{})
+	//		err = json.Unmarshal(hit.Source, &result)
+	//		if err != nil {
+	//			log.Info("dealBidding", zap.Any("Unmarshal err", err))
+	//		}
+	//		if len(result) > 0 {
+	//			update := map[string]interface{}{
+	//				"project_id": result["id"],
+	//			}
+	//			log.Info("updateProject", zap.Any("bidding_id", tmp["bidding_id"]))
+	//			if project_pici, ok := result["pici"].(float64); ok {
+	//				timeString := time.Unix(int64(project_pici), 0).Format("2006-01-02 15:04:05")
+	//				update["project_pici"] = timeString
+	//				update["es_project_pici"] = timeString
+	//			}
+	//
+	//			where := map[string]interface{}{
+	//				"id": tmp["id"],
+	//			}
+	//			res := Mysql.Update(GF.Mysql.Table, where, update)
+	//			if !res {
+	//				log.Info("updateProject", zap.Any("update", update), zap.Any("where", where))
+	//			}
+	//		}
+	//	}
+	//}
 }

+ 28 - 0
bidding_listen/utils.go

@@ -0,0 +1,28 @@
+package main
+
+import (
+	"fmt"
+	"sort"
+	"time"
+)
+
+func convertToTime(value interface{}) (time.Time, error) {
+	switch v := value.(type) {
+	case int64:
+		return time.Unix(v, 0), nil
+	case float64:
+		return time.Unix(int64(v), 0), nil
+	default:
+		return time.Time{}, fmt.Errorf("unsupported type: %T", v)
+	}
+}
+
+//IsInStringArray 判断数组中是否存在字符串
+func IsInStringArray(str string, arr []string) bool {
+	// 先对字符串数组进行排序
+	sort.Strings(arr)
+	// 使用二分查找算法查找字符串
+	pos := sort.SearchStrings(arr, str)
+	// 如果找到了则返回 true,否则返回 false
+	return pos < len(arr) && arr[pos] == str
+}