package main import ( "fmt" "github.com/cron" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/bson/primitive" "log" "mongodb" "qfw/util" "regexp" "time" ) func TimeTask() { GetData() c := cron.New() cronstr := "0 */" + fmt.Sprint(TaskTime) + " * * * ?" //每TaskTime分钟执行一次 _ = c.AddFunc(cronstr, func() { GetData() }) c.Start() } func GetData() { count := 0 lastid := "" sess := MgoJy.GetMgoConn() defer MgoJy.DestoryMongoConn(sess) fields := map[string]interface{}{"s_entname": 1, "l_createdate": 1} q := bson.M{"l_createdate": bson.M{"$gt": LastTime}} query := sess.DB(JyDb).C(CollJy).Find(q).Select(fields).Iter() c := MgoJy.Count(CollJy, q) util.Debug("total count ----", c) tmp := make(map[string]interface{}) for query.Next(&tmp) { lastid = mongodb.BsonIdToSId(tmp["_id"]) if count%200 == 0 { util.Debug("jy ----current----", count, lastid) } q := map[string]interface{}{"company_name": tmp["s_entname"]} ent, _ := MgoQy.FindOne(CollSave, q) if (*ent) == nil { qytmp, _ := MgoQy.FindOne(CollQy, q) if qytmp != nil && len(*qytmp) > 0 { if LastTime < util.Int64All(tmp["l_createdate"]) { LastTime = util.Int64All(tmp["l_createdate"]) util.Debug("lasttime", LastTime) } delete(tmp, "l_createdate") delete(tmp, "s_entname") tmp["changes"] = (*qytmp)["changes"] tmp["company_name"] = (*qytmp)["company_name"] tmp["company_id"] = (*qytmp)["company_id"] if tmp["changes"] != nil && len(tmp["changes"].([]interface{})) > 0 { findEnt(tmp) count++ }else { util.Debug("ent changes size 为 0", tmp["_id"]) } }else { util.Debug("qyxy not find data", q) } } } util.Debug("jy 处理", count, "条数据") } func findEnt(tmp map[string]interface{}) { currentTime := time.Now().Unix() infoList := clearRepeat(tmp["changes"].([]interface{})) for _, item := range infoList { item1 := item.(map[string]interface{}) setMark(item1) } tmp["_id"] = primitive.NewObjectID() tmp["createtime"] = currentTime tmp["updatetime"] = currentTime tmp["datasource"] = "focus" update := make(map[string]interface{}) update["$set"] = tmp updateInfo := []map[string]interface{}{ { "_id": tmp["_id"], }, update, } MgoSaveCache <- updateInfo } func setMark(tmp map[string]interface{}) { for _, v := range ChangeMap { str := util.ObjToString(tmp["change_field"]) regArr := v["change_key_reg"].([]string) for _, v1 := range regArr { matched, _ := regexp.MatchString(v1, str) if matched { tmp["change_name_new"] = v["change_name"] return } } } } func clearRepeat(list []interface{}) []interface{} { var tmp []interface{} if len(list) > 1 { for k, v := range list{ if k < len(list)-1 { if fmt.Sprint(list[k]) != fmt.Sprint(list[k+1]) { tmp = append(tmp, v) } }else { tmp = append(tmp, v) } } return tmp }else { return list } } var MgoSaveCache = make(chan []map[string]interface{}, 2000) var SP = make(chan bool, 5) func SaveData() { log.Println("Mgo Save...") arru := make([][]map[string]interface{}, 200) indexu := 0 for { select { case v := <-MgoSaveCache: arru[indexu] = v indexu++ if indexu == 200 { SP <- true go func(arru [][]map[string]interface{}) { defer func() { <-SP }() MgoQy.UpSertBulk(CollSave, arru...) }(arru) arru = make([][]map[string]interface{}, 200) indexu = 0 } case <-time.After(1000 * time.Millisecond): if indexu > 0 { SP <- true go func(arru [][]map[string]interface{}) { defer func() { <-SP }() MgoQy.UpSertBulk(CollSave, arru...) }(arru[:indexu]) arru = make([][]map[string]interface{}, 200) indexu = 0 } } } }