Ver Fonte

Merge branch 'dev2.1' into dev3.0

* dev2.1:
  更新 高质量库
  更新 高质量库 id
  更新 高质量库 _id 错误
  添加同步 高质量库 过滤字段
Jianghan há 1 ano atrás
pai
commit
4dbfcf2e9e
4 ficheiros alterados com 69 adições e 31 exclusões
  1. 7 6
      highMark/config.json
  2. 60 25
      highMark/main.go
  3. 1 0
      src/github.com/josharian/intern
  4. 1 0
      src/github.com/mailru/easyjson

+ 7 - 6
highMark/config.json

@@ -1,6 +1,6 @@
 {
   "bidding": {
-    "addr": "127.0.0.1:27082",
+    "addr": "172.17.4.86:27080",
     "db": "jyqykhfw",
     "coll": "f_task",
     "size": 10,
@@ -8,12 +8,13 @@
     "password": ""
   },
   "bidding_high": {
-    "addr": "192.168.3.166:27082",
+    "addr": "172.17.189.140:27080",
     "db": "qfw_high",
-    "coll": "wcc_high_bidding",
+    "coll": "bidding",
     "size": 10,
-    "username": "",
-    "password": ""
+    "username": "SJZY_RWbid_ES",
+    "password": "SJZY@B4i4D5e6S"
   },
-  "spec": "0 */1 * * * *"
+  "spec": "0 00 01 * * *" ,
+  "no_fields": "matchkey,matchkey,field_source,purchasinglist_alltag,info,jytest_href,matchkey"
 }

+ 60 - 25
highMark/main.go

@@ -1,16 +1,19 @@
-package highMark
+package main
 
 import (
 	"fmt"
+	"github.com/cron"
 	"mongodb"
 	"os"
 	"qfw/util"
+	"strings"
 	"time"
 )
 
 var (
-	Mgo, MgoH                             *mongodb.MongodbSim
+	Mgo, MgoH, MgoB                       *mongodb.MongodbSim
 	Sysconfig, bidddingConf, biddingHConf map[string]interface{}
+	noFields                              string
 )
 
 func Init() {
@@ -41,23 +44,34 @@ func Init() {
 
 	MgoH.InitPool()
 
+	//bidding
+	MgoB = &mongodb.MongodbSim{
+		MongodbAddr: biddingHConf["addr"].(string),
+		Size:        util.IntAllDef(biddingHConf["size"], 5),
+		DbName:      "qfw",
+		UserName:    biddingHConf["username"].(string),
+		Password:    biddingHConf["password"].(string),
+		//Direct:      true,
+	}
+
+	MgoB.InitPool()
+	noFields = util.ObjToString(Sysconfig["no_fields"])
 }
 
-//func main() {
-//	Init()
-//	c := cron.New()
-//	err := c.AddFunc(Sysconfig["spec"].(string), Mark)
-//	if err != nil {
-//		util.Debug("err", err)
-//	}
-//
-//	c.Start()
-//	defer c.Stop()
-//
-//	select {}
-//
-//	//highMark()
-//}
+func main() {
+	Init()
+	c := cron.New()
+	err := c.AddFunc(Sysconfig["spec"].(string), Mark)
+	if err != nil {
+		util.Debug("err", err)
+	}
+
+	c.Start()
+	defer c.Stop()
+
+	select {}
+
+}
 
 func Mark() {
 	go highMark()
@@ -86,7 +100,7 @@ func highMark() {
 	util.Debug("本次处理任务总数:", len(*tasks))
 
 	for _, task := range *tasks {
-		util.Debug("开始处理任务数据:", task["s_groupname"], task["s_entname"])
+		util.Debug("开始处理任务数据:", task["s_groupname"], task["s_entname"], task["s_sourceinfo"])
 		taskID := mongodb.BsonIdToSId(task["_id"])
 		//任务对应的数据表
 		s_sourceinfo := util.ObjToString(task["s_sourceinfo"])
@@ -98,6 +112,7 @@ func highMark() {
 
 		query := sess.DB(bidddingConf["db"].(string)).C(s_sourceinfo).Find(&q).Select(nil).Iter()
 		count := 0
+		taskFinish := false
 		for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
 			infoID := mongodb.BsonIdToSId(tmp["_id"])
 			if count%1000 == 0 {
@@ -115,20 +130,40 @@ func highMark() {
 			res := calculateFlag(taginfo, *fields) //返回标注的十进制数字
 
 			if data, ok := markedData["v_baseinfo"].(map[string]interface{}); ok {
-				data["_id"] = tmp["_id"]
+				delete(data, "_id")
+				where := make(map[string]interface{})
+				if _, ok := data["id"]; ok {
+					bidd, _ := MgoB.FindById("bidding", util.ObjToString(data["id"]), nil)
+					if len(*bidd) > 0 {
+						where["_id"] = mongodb.StringTOBsonId(util.ObjToString(data["id"]))
+					}
+				} else {
+					bidd, _ := MgoB.FindById("bidding", mongodb.BsonIdToSId(markedData["_id"]), nil)
+					if len(*bidd) > 0 {
+						where["_id"] = mongodb.StringTOBsonId(util.ObjToString(data["id"]))
+					} else {
+						continue
+					}
+				}
 				data["field_bitvalue"] = res
 				data["i_comeintime"] = time.Now().Unix()
 				data["i_updatetime"] = time.Now().Unix()
+				//删除多余无用字段
+				noField := strings.Split(noFields, ",")
+				if len(noField) > 0 {
+					for _, field := range noField {
+						delete(data, field)
+					}
+				}
 
 				update := make(map[string]interface{})
 				update["$set"] = data
-				where := map[string]interface{}{
-					"_id": tmp["_id"],
-				}
 
 				if !MgoH.Update(util.ObjToString(biddingHConf["coll"]), where, update, true, false) {
-					util.Debug("任务 ", task["s_groupname"], infoID, "入库错误,请检查")
+					taskFinish = false
+					util.Debug("任务 ", task["s_groupname"], task["s_sourceinfo"], infoID, "入库错误,请检查")
 				} else {
+					taskFinish = true
 					//1、更新数据源信息
 					setResult := map[string]interface{}{ //更新字段集
 						"is_return_highdata":  1,
@@ -144,7 +179,7 @@ func highMark() {
 		}
 		util.Debug("任务: ", task["s_entname"], "数据表: ", s_sourceinfo, " 处理总数为: ", count, "分配的数据总量为: ", task["i_givenum"])
 
-		if count > 0 {
+		if count > 0 && taskFinish {
 			//当前任务结束
 			//3.更新任务表,
 			taskSetResult := map[string]interface{}{ //更新字段集
@@ -174,7 +209,7 @@ func highMark() {
 	util.Debug("所有任务处理完毕")
 }
 
-//calculateFlag 根据数据,返回被标注的字段数字
+// calculateFlag 根据数据,返回被标注的字段数字
 func calculateFlag(marked map[string]interface{}, data []map[string]interface{}) uint64 {
 	var result uint64
 	for _, item := range data {

+ 1 - 0
src/github.com/josharian/intern

@@ -0,0 +1 @@
+Subproject commit 42b52b674af5a7cef9ce7e197bc21c78394ae65d

+ 1 - 0
src/github.com/mailru/easyjson

@@ -0,0 +1 @@
+Subproject commit a209843d8ea9db383ce1a1976e43c8e95f831c61