123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160 |
- package main
- import (
- "fmt"
- "github.com/cron"
- "go.mongodb.org/mongo-driver/bson"
- "go.mongodb.org/mongo-driver/bson/primitive"
- "log"
- "mongodb"
- "qfw/util"
- "regexp"
- "time"
- )
- func TimeTask() {
- GetData()
- c := cron.New()
- cronstr := "0 */" + fmt.Sprint(TaskTime) + " * * * ?" //每TaskTime分钟执行一次
- _ = c.AddFunc(cronstr, func() { GetData() })
- c.Start()
- }
- func GetData() {
- count := 0
- lastid := ""
- sess := MgoJy.GetMgoConn()
- defer MgoJy.DestoryMongoConn(sess)
- fields := map[string]interface{}{"s_entname": 1, "l_createdate": 1}
- q := bson.M{"l_createdate": bson.M{"$gt": LastTime}}
- query := sess.DB(JyDb).C(CollJy).Find(q).Select(fields).Iter()
- c := MgoJy.Count(CollJy, q)
- util.Debug("total count ----", c)
- tmp := make(map[string]interface{})
- for query.Next(&tmp) {
- lastid = mongodb.BsonIdToSId(tmp["_id"])
- if count%200 == 0 {
- util.Debug("jy ----current----", count, lastid)
- }
- q := map[string]interface{}{"company_name": tmp["s_entname"]}
- ent, _ := MgoQy.FindOne(CollSave, q)
- if (*ent) == nil {
- qytmp, _ := MgoQy.FindOne(CollQy, q)
- if qytmp != nil && len(*qytmp) > 0 {
- if LastTime < util.Int64All(tmp["l_createdate"]) {
- LastTime = util.Int64All(tmp["l_createdate"])
- util.Debug("lasttime", LastTime)
- }
- delete(tmp, "l_createdate")
- delete(tmp, "s_entname")
- tmp["changes"] = (*qytmp)["changes"]
- tmp["company_name"] = (*qytmp)["company_name"]
- tmp["company_id"] = (*qytmp)["company_id"]
- if tmp["changes"] != nil && len(tmp["changes"].([]interface{})) > 0 {
- findEnt(tmp)
- count++
- }else {
- util.Debug("ent changes size 为 0", tmp["_id"])
- }
- }else {
- util.Debug("qyxy not find data", q)
- }
- }
- }
- util.Debug("jy 处理", count, "条数据")
- }
- func findEnt(tmp map[string]interface{}) {
- currentTime := time.Now().Unix()
- infoList := clearRepeat(tmp["changes"].([]interface{}))
- for _, item := range infoList {
- item1 := item.(map[string]interface{})
- setMark(item1)
- }
- tmp["_id"] = primitive.NewObjectID()
- tmp["createtime"] = currentTime
- tmp["updatetime"] = currentTime
- tmp["datasource"] = "focus"
- update := make(map[string]interface{})
- update["$set"] = tmp
- updateInfo := []map[string]interface{}{
- {
- "_id": tmp["_id"],
- },
- update,
- }
- MgoSaveCache <- updateInfo
- }
- func setMark(tmp map[string]interface{}) {
- for _, v := range ChangeMap {
- str := util.ObjToString(tmp["change_field"])
- regArr := v["change_key_reg"].([]string)
- for _, v1 := range regArr {
- matched, _ := regexp.MatchString(v1, str)
- if matched {
- tmp["change_name_new"] = v["change_name"]
- return
- }
- }
- }
- }
- func clearRepeat(list []interface{}) []interface{} {
- var tmp []interface{}
- if len(list) > 1 {
- for k, v := range list{
- if k < len(list)-1 {
- if fmt.Sprint(list[k]) != fmt.Sprint(list[k+1]) {
- tmp = append(tmp, v)
- }
- }else {
- tmp = append(tmp, v)
- }
- }
- return tmp
- }else {
- return list
- }
- }
- var MgoSaveCache = make(chan []map[string]interface{}, 2000)
- var SP = make(chan bool, 5)
- func SaveData() {
- log.Println("Mgo Save...")
- arru := make([][]map[string]interface{}, 200)
- indexu := 0
- for {
- select {
- case v := <-MgoSaveCache:
- arru[indexu] = v
- indexu++
- if indexu == 200 {
- SP <- true
- go func(arru [][]map[string]interface{}) {
- defer func() {
- <-SP
- }()
- MgoQy.UpSertBulk(CollSave, arru...)
- }(arru)
- arru = make([][]map[string]interface{}, 200)
- indexu = 0
- }
- case <-time.After(1000 * time.Millisecond):
- if indexu > 0 {
- SP <- true
- go func(arru [][]map[string]interface{}) {
- defer func() {
- <-SP
- }()
- MgoQy.UpSertBulk(CollSave, arru...)
- }(arru[:indexu])
- arru = make([][]map[string]interface{}, 200)
- indexu = 0
- }
- }
- }
- }
|