|
@@ -0,0 +1,307 @@
|
|
|
+package main
|
|
|
+
|
|
|
+import (
|
|
|
+ util "app.yhyue.com/data_processing/common_utils"
|
|
|
+ "app.yhyue.com/data_processing/common_utils/log"
|
|
|
+ "app.yhyue.com/data_processing/common_utils/mongodb"
|
|
|
+ "app.yhyue.com/data_processing/common_utils/redis"
|
|
|
+ "data_fusion/config"
|
|
|
+ "fmt"
|
|
|
+ "go.uber.org/zap"
|
|
|
+ "reflect"
|
|
|
+ "strings"
|
|
|
+ "sync"
|
|
|
+ "time"
|
|
|
+)
|
|
|
+
|
|
|
+func init() {
|
|
|
+ config.Init("./common.toml")
|
|
|
+ InitLog()
|
|
|
+ InitMgo()
|
|
|
+ redis.InitRedis1(config.Conf.DB.Redis.Addr, config.Conf.DB.Redis.Db)
|
|
|
+
|
|
|
+ initData()
|
|
|
+
|
|
|
+ updatePool = make(chan []map[string]interface{}, 5000)
|
|
|
+ updateSp = make(chan bool, 5)
|
|
|
+ recordPool = make(chan []map[string]interface{}, 5000)
|
|
|
+ recordSp = make(chan bool, 5)
|
|
|
+}
|
|
|
+
|
|
|
+func main() {
|
|
|
+ go updateMethod()
|
|
|
+ go updateMethod1()
|
|
|
+
|
|
|
+ sess := MgoB.GetMgoConn()
|
|
|
+ defer MgoB.DestoryMongoConn(sess)
|
|
|
+
|
|
|
+ ch := make(chan bool, config.Conf.Serve.Thread)
|
|
|
+ wg := &sync.WaitGroup{}
|
|
|
+
|
|
|
+ q := map[string]interface{}{"_id": mongodb.StringTOBsonId("639751bb063a7b816e026aa1")}
|
|
|
+ it := sess.DB(config.Conf.DB.Mongo.Dbname).C("bidding_fusion").Find(q).Select(nil).Iter()
|
|
|
+ count := 0
|
|
|
+ for tmp := make(map[string]interface{}); it.Next(tmp); count++ {
|
|
|
+ if count%2000 == 0 {
|
|
|
+ log.Info("main", zap.Int("current:", count))
|
|
|
+ }
|
|
|
+ if repeat := util.IntAll(tmp["repeat"]); repeat != 1 {
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ ch <- true
|
|
|
+ wg.Add(1)
|
|
|
+ go func(tmp map[string]interface{}) {
|
|
|
+ defer func() {
|
|
|
+ <-ch
|
|
|
+ wg.Done()
|
|
|
+ }()
|
|
|
+ repeatId := util.ObjToString(tmp["repeat_id"])
|
|
|
+ if str := redis.GetStr("fusion_id", repeatId); str != "" {
|
|
|
+ mid := strings.Split(str, "-")[0]
|
|
|
+ tmp1, _ := MgoB.FindById("bidding_fusion", mid, nil)
|
|
|
+ w, s := getWeight(tmp)
|
|
|
+ w1, s1 := getWeight(*tmp1)
|
|
|
+ util.Debug(w, s, w1, s1)
|
|
|
+ var update map[string]interface{}
|
|
|
+ if w > w1 {
|
|
|
+ update = mergeTmp(tmp, *tmp1)
|
|
|
+ //if len(update) > 0 {
|
|
|
+ // updatePool <- []map[string]interface{}{
|
|
|
+ // {"_id": tmp["_id"]},
|
|
|
+ // {"$set": update},
|
|
|
+ // }
|
|
|
+ //}
|
|
|
+ record := make(map[string]interface{})
|
|
|
+ record["$set"] = map[string]interface{}{
|
|
|
+ "template_id": mongodb.BsonIdToSId(tmp["_id"]),
|
|
|
+ "template_weight": w,
|
|
|
+ }
|
|
|
+ update1 := util.DeepCopy(update).(map[string]interface{})
|
|
|
+ update1["infoid"] = mongodb.BsonIdToSId(tmp["_id"])
|
|
|
+ update1["weight"] = w
|
|
|
+ if w == 0 {
|
|
|
+ update1["remark"] = s
|
|
|
+ }
|
|
|
+ record["$push"] = map[string]interface{}{
|
|
|
+ "ids": mongodb.BsonIdToSId(tmp["_id"]),
|
|
|
+ "record": update1,
|
|
|
+ }
|
|
|
+ recordPool <- []map[string]interface{}{
|
|
|
+ {"_id": mongodb.StringTOBsonId(repeatId)},
|
|
|
+ record,
|
|
|
+ }
|
|
|
+ redis.PutCKV("fusion_id", repeatId, fmt.Sprintf("%s-%s", mongodb.BsonIdToSId(tmp["_id"]), str))
|
|
|
+ } else {
|
|
|
+ update = mergeTmp(*tmp1, tmp)
|
|
|
+ //if len(update) > 0 {
|
|
|
+ // updatePool <- []map[string]interface{}{
|
|
|
+ // {"_id": (*tmp1)["_id"]},
|
|
|
+ // {"$set": update},
|
|
|
+ // }
|
|
|
+ //}
|
|
|
+ record := make(map[string]interface{})
|
|
|
+ record["$set"] = map[string]interface{}{
|
|
|
+ "template_weight": w1,
|
|
|
+ }
|
|
|
+ update1 := util.DeepCopy(update).(map[string]interface{})
|
|
|
+ update1["infoid"] = mongodb.BsonIdToSId(tmp["_id"])
|
|
|
+ update1["weight"] = w
|
|
|
+ if w == 0 {
|
|
|
+ update1["remark"] = s
|
|
|
+ }
|
|
|
+ record["$push"] = map[string]interface{}{
|
|
|
+ "ids": mongodb.BsonIdToSId(tmp["_id"]),
|
|
|
+ "record": update1,
|
|
|
+ }
|
|
|
+ recordPool <- []map[string]interface{}{
|
|
|
+ {"_id": mongodb.StringTOBsonId(repeatId)},
|
|
|
+ record,
|
|
|
+ }
|
|
|
+ redis.PutCKV("fusion_id", repeatId, fmt.Sprintf("%s-%s", str, mongodb.BsonIdToSId(tmp["_id"])))
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ tmp1, _ := MgoB.FindById("bidding_fusion", repeatId, nil)
|
|
|
+ w, s := getWeight(tmp)
|
|
|
+ w1, s1 := getWeight(*tmp1)
|
|
|
+ var update map[string]interface{}
|
|
|
+ if w > w1 {
|
|
|
+ update = mergeTmp(tmp, *tmp1)
|
|
|
+ //if len(update) > 0 {
|
|
|
+ // updatePool <- []map[string]interface{}{
|
|
|
+ // {"_id": tmp["_id"]},
|
|
|
+ // {"$set": update},
|
|
|
+ // }
|
|
|
+ //}
|
|
|
+ record := make(map[string]interface{})
|
|
|
+ record["_id"] = (*tmp1)["_id"]
|
|
|
+ record["template_id"] = mongodb.BsonIdToSId(tmp["_id"])
|
|
|
+ record["template_weight"] = w
|
|
|
+ record["ids"] = []string{mongodb.BsonIdToSId(tmp["_id"]), mongodb.BsonIdToSId(repeatId)}
|
|
|
+ var recordlist []map[string]interface{}
|
|
|
+ recordlist = append(recordlist, map[string]interface{}{"infoid": mongodb.BsonIdToSId(tmp["_id"]), "weight": w})
|
|
|
+ update1 := util.DeepCopy(update).(map[string]interface{})
|
|
|
+ update1["infoid"] = mongodb.BsonIdToSId((*tmp1)["_id"])
|
|
|
+ update1["weight"] = w1
|
|
|
+ if w1 == 0 {
|
|
|
+ update1["remark"] = s1
|
|
|
+ }
|
|
|
+ recordlist = append(recordlist, update1)
|
|
|
+ record["record"] = recordlist
|
|
|
+ recordPool <- []map[string]interface{}{
|
|
|
+ {"_id": (*tmp1)["_id"]},
|
|
|
+ {"$set": record},
|
|
|
+ }
|
|
|
+ redis.PutCKV("fusion_id", repeatId, fmt.Sprintf("%s-%s", mongodb.BsonIdToSId(tmp["_id"]), mongodb.BsonIdToSId((*tmp1)["_id"])))
|
|
|
+ } else {
|
|
|
+ update = mergeTmp(*tmp1, tmp)
|
|
|
+ //if len(update) > 0 {
|
|
|
+ // updatePool <- []map[string]interface{}{
|
|
|
+ // {"_id": (*tmp1)["_id"]},
|
|
|
+ // {"$set": update},
|
|
|
+ // }
|
|
|
+ //}
|
|
|
+ record := make(map[string]interface{})
|
|
|
+ record["_id"] = (*tmp1)["_id"]
|
|
|
+ record["template_id"] = mongodb.BsonIdToSId((*tmp1)["_id"])
|
|
|
+ record["template_weight"] = w1
|
|
|
+ record["ids"] = []string{mongodb.BsonIdToSId(tmp["_id"]), mongodb.BsonIdToSId((*tmp1)["_id"])}
|
|
|
+ var recordlist []map[string]interface{}
|
|
|
+ recordlist = append(recordlist, map[string]interface{}{"infoid": mongodb.BsonIdToSId((*tmp1)["_id"]), "weight": w1})
|
|
|
+ update1 := util.DeepCopy(update).(map[string]interface{})
|
|
|
+ update1["infoid"] = mongodb.BsonIdToSId(tmp["_id"])
|
|
|
+ update1["weight"] = w
|
|
|
+ if w == 0 {
|
|
|
+ update1["remark"] = s
|
|
|
+ }
|
|
|
+ recordlist = append(recordlist, update1)
|
|
|
+ record["record"] = recordlist
|
|
|
+ recordPool <- []map[string]interface{}{
|
|
|
+ {"_id": (*tmp1)["_id"]},
|
|
|
+ {"$set": record},
|
|
|
+ }
|
|
|
+ redis.PutCKV("fusion_id", repeatId, fmt.Sprintf("%s-%s", mongodb.BsonIdToSId((*tmp1)["_id"]), mongodb.BsonIdToSId(tmp["_id"])))
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ }(tmp)
|
|
|
+ tmp = map[string]interface{}{}
|
|
|
+ }
|
|
|
+
|
|
|
+ c := make(chan bool, 1)
|
|
|
+ <-c
|
|
|
+}
|
|
|
+
|
|
|
+func getWeight(tmp map[string]interface{}) (int, string) {
|
|
|
+ var w int
|
|
|
+ if util.IntAll(tmp["publishtime"]) <= 0 {
|
|
|
+ return 0, "发布时间小于0"
|
|
|
+ }
|
|
|
+ if BinarySearch(CompeteSite, util.ObjToString(tmp["site"])) > -1 {
|
|
|
+ return 0, "竞品网站数据"
|
|
|
+ }
|
|
|
+ for k, v := range config.Conf.Serve.Weight {
|
|
|
+ if tmp[k] != nil {
|
|
|
+ util.Debug(k)
|
|
|
+ if reflect.TypeOf(tmp[k]).String() == "string" {
|
|
|
+ if util.ObjToString(tmp[k]) != "" {
|
|
|
+ w += v
|
|
|
+ }
|
|
|
+ } else if reflect.TypeOf(tmp[k]).String() == "float64" {
|
|
|
+ if util.Float64All(tmp[k]) > 0 {
|
|
|
+ w += v
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ w += v
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return w, ""
|
|
|
+}
|
|
|
+
|
|
|
+// @Description tmp模版数据, tmp1补充数据
|
|
|
+// @Author J 2023/1/3 11:31
|
|
|
+func mergeTmp(tmp map[string]interface{}, tmp1 map[string]interface{}) map[string]interface{} {
|
|
|
+ update := make(map[string]interface{})
|
|
|
+ for _, v := range config.Conf.Serve.Fields {
|
|
|
+ if tmp[v] == nil && tmp1[v] != nil {
|
|
|
+ if reflect.TypeOf(tmp1[v]).String() == "string" && util.ObjToString(tmp1[v]) != "" {
|
|
|
+ update[v] = util.ObjToString(tmp1[v])
|
|
|
+ } else if reflect.TypeOf(tmp1[v]).String() == "[]interface {}" && len(tmp1[v].([]interface{})) > 0 {
|
|
|
+ update[v] = tmp1[v]
|
|
|
+ } else {
|
|
|
+ update[v] = tmp1[v]
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return update
|
|
|
+}
|
|
|
+
|
|
|
+func updateMethod() {
|
|
|
+ arru := make([][]map[string]interface{}, 500)
|
|
|
+ indexu := 0
|
|
|
+ for {
|
|
|
+ select {
|
|
|
+ case v := <-recordPool:
|
|
|
+ arru[indexu] = v
|
|
|
+ indexu++
|
|
|
+ if indexu == 500 {
|
|
|
+ recordSp <- true
|
|
|
+ go func(arru [][]map[string]interface{}) {
|
|
|
+ defer func() {
|
|
|
+ <-recordSp
|
|
|
+ }()
|
|
|
+ MgoB.UpSertBulk("bidding_fusion_record", arru...)
|
|
|
+ }(arru)
|
|
|
+ arru = make([][]map[string]interface{}, 500)
|
|
|
+ indexu = 0
|
|
|
+ }
|
|
|
+ case <-time.After(1000 * time.Millisecond):
|
|
|
+ if indexu > 0 {
|
|
|
+ recordSp <- true
|
|
|
+ go func(arru [][]map[string]interface{}) {
|
|
|
+ defer func() {
|
|
|
+ <-recordSp
|
|
|
+ }()
|
|
|
+ MgoB.UpSertBulk("bidding_fusion_record", arru...)
|
|
|
+ }(arru[:indexu])
|
|
|
+ arru = make([][]map[string]interface{}, 500)
|
|
|
+ indexu = 0
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+func updateMethod1() {
|
|
|
+ arru := make([][]map[string]interface{}, 500)
|
|
|
+ indexu := 0
|
|
|
+ for {
|
|
|
+ select {
|
|
|
+ case v := <-updatePool:
|
|
|
+ arru[indexu] = v
|
|
|
+ indexu++
|
|
|
+ if indexu == 500 {
|
|
|
+ updateSp <- true
|
|
|
+ go func(arru [][]map[string]interface{}) {
|
|
|
+ defer func() {
|
|
|
+ <-updateSp
|
|
|
+ }()
|
|
|
+ MgoB.UpdateBulk("bidding_fusion", arru...)
|
|
|
+ }(arru)
|
|
|
+ arru = make([][]map[string]interface{}, 500)
|
|
|
+ indexu = 0
|
|
|
+ }
|
|
|
+ case <-time.After(1000 * time.Millisecond):
|
|
|
+ if indexu > 0 {
|
|
|
+ updateSp <- true
|
|
|
+ go func(arru [][]map[string]interface{}) {
|
|
|
+ defer func() {
|
|
|
+ <-updateSp
|
|
|
+ }()
|
|
|
+ MgoB.UpdateBulk("bidding_fusion", arru...)
|
|
|
+ }(arru[:indexu])
|
|
|
+ arru = make([][]map[string]interface{}, 500)
|
|
|
+ indexu = 0
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|