|
@@ -20,19 +20,20 @@ var mutex sync.Mutex
|
|
//IncData 增量处理数据
|
|
//IncData 增量处理数据
|
|
func IncData() {
|
|
func IncData() {
|
|
defer util.Catch()
|
|
defer util.Catch()
|
|
- sess := Mgo.GetMgoConn()
|
|
|
|
|
|
+ sess := Mgo.GetMgoConn() //181 凭安
|
|
defer Mgo.DestoryMongoConn(sess)
|
|
defer Mgo.DestoryMongoConn(sess)
|
|
|
|
|
|
- q := bson.M{"jy_updatetime": bson.M{"$gt": jyUpdatetime}}
|
|
|
|
|
|
+ q := bson.M{"jy_updatetime": bson.M{"$gte": jyUpdatetime}}
|
|
var zid int
|
|
var zid int
|
|
|
|
+ queryCount, _ := sess.DB("mixdata").C("company_change").Find(q).Count()
|
|
|
|
+ util.Debug("queryCount", queryCount)
|
|
it := sess.DB("mixdata").C("company_change").Find(q).Select(nil).Iter()
|
|
it := sess.DB("mixdata").C("company_change").Find(q).Select(nil).Iter()
|
|
count := 0
|
|
count := 0
|
|
- ch := make(chan bool, 16)
|
|
|
|
- wg := &sync.WaitGroup{}
|
|
|
|
-
|
|
|
|
|
|
+ realCount := 0
|
|
for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
|
|
for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
|
|
if count%20000 == 0 {
|
|
if count%20000 == 0 {
|
|
log.Println("current:", count)
|
|
log.Println("current:", count)
|
|
|
|
+ log.Println("current: realCount", realCount)
|
|
}
|
|
}
|
|
if util.ObjToString(tmp["_operation_type"]) == "update" {
|
|
if util.ObjToString(tmp["_operation_type"]) == "update" {
|
|
continue
|
|
continue
|
|
@@ -43,62 +44,53 @@ func IncData() {
|
|
}
|
|
}
|
|
|
|
|
|
zid = util.IntAll(tmp["_id"])
|
|
zid = util.IntAll(tmp["_id"])
|
|
- ch <- true
|
|
|
|
- wg.Add(1)
|
|
|
|
- go func(tmp map[string]interface{}) {
|
|
|
|
- defer func() {
|
|
|
|
- <-ch
|
|
|
|
- wg.Done()
|
|
|
|
- }()
|
|
|
|
- mutex.Lock()
|
|
|
|
- defer mutex.Unlock()
|
|
|
|
- //
|
|
|
|
- currentTime := time.Now().Unix()
|
|
|
|
- query := bson.M{"company_id": tmp["company_id"]}
|
|
|
|
- info, b := MgoMix.FindOneByField(CollSave, query, bson.M{"changes": 1})
|
|
|
|
- //原来数据有changes 字段就更新,追加数据
|
|
|
|
- if b && len(*info) > 0 {
|
|
|
|
- if util.ObjToString(tmp["_operation_type"]) == "insert" {
|
|
|
|
- update := make(map[string]interface{})
|
|
|
|
- item := make(map[string]interface{})
|
|
|
|
- item["change_field"] = tmp["change_field"]
|
|
|
|
- item["content_before"] = tmp["content_before"]
|
|
|
|
- item["content_after"] = tmp["content_after"]
|
|
|
|
- item["change_date"] = tmp["change_date"]
|
|
|
|
- setMark(item) //change_name_new
|
|
|
|
- //update["changes"] = changes
|
|
|
|
- update["update_time"] = currentTime
|
|
|
|
- saveInfo := map[string]interface{}{"$set": update, "$push": map[string]interface{}{"changes": item}}
|
|
|
|
- MgoMix.Update("qyxy_change", map[string]interface{}{"company_id": util.ObjToString(tmp["company_id"])}, saveInfo, true, false)
|
|
|
|
- }
|
|
|
|
- } else {
|
|
|
|
- //没有的直接写入
|
|
|
|
- query := bson.M{"_id": tmp["company_id"]}
|
|
|
|
- qyxy, b1 := MgoMix.FindOne("qyxy_std", query)
|
|
|
|
- if b1 && len(*qyxy) > 0 {
|
|
|
|
- save := make(map[string]interface{})
|
|
|
|
- var changes []map[string]interface{}
|
|
|
|
- item := make(map[string]interface{})
|
|
|
|
- item["change_field"] = tmp["change_field"]
|
|
|
|
- item["content_before"] = tmp["content_before"]
|
|
|
|
- item["content_after"] = tmp["content_after"]
|
|
|
|
- item["change_date"] = tmp["change_date"]
|
|
|
|
- setMark(item) //change_name_new
|
|
|
|
- changes = append(changes, item)
|
|
|
|
|
|
+ currentTime := time.Now().Unix()
|
|
|
|
+ query := bson.M{"company_id": tmp["company_id"]}
|
|
|
|
+ info, b := MgoMix.FindOneByField(CollSave, query, bson.M{"changes": 1})
|
|
|
|
+ //原来数据有changes 字段就更新,追加数据
|
|
|
|
+ if b && len(*info) > 0 {
|
|
|
|
+ realCount++
|
|
|
|
+ if util.ObjToString(tmp["_operation_type"]) == "insert" {
|
|
|
|
+ update := make(map[string]interface{})
|
|
|
|
+ item := make(map[string]interface{})
|
|
|
|
+ item["change_field"] = tmp["change_field"]
|
|
|
|
+ item["content_before"] = tmp["content_before"]
|
|
|
|
+ item["content_after"] = tmp["content_after"]
|
|
|
|
+ item["change_date"] = tmp["change_date"]
|
|
|
|
+ setMark(item) //change_name_new
|
|
|
|
+ //update["changes"] = changes
|
|
|
|
+ update["update_time"] = currentTime
|
|
|
|
+ saveInfo := map[string]interface{}{"$set": update, "$push": map[string]interface{}{"changes": item}}
|
|
|
|
+ MgoMix.Update(CollSave, map[string]interface{}{"company_id": util.ObjToString(tmp["company_id"])}, saveInfo, true, false)
|
|
|
|
+ }
|
|
|
|
+ } else {
|
|
|
|
+ //没有的直接写入
|
|
|
|
+ query := bson.M{"_id": tmp["company_id"]}
|
|
|
|
+ qyxy, b1 := MgoMix.FindOne("qyxy_std", query)
|
|
|
|
+ if b1 && len(*qyxy) > 0 {
|
|
|
|
+ realCount++
|
|
|
|
+ save := make(map[string]interface{})
|
|
|
|
+ var changes []map[string]interface{}
|
|
|
|
+ item := make(map[string]interface{})
|
|
|
|
+ item["change_field"] = tmp["change_field"]
|
|
|
|
+ item["content_before"] = tmp["content_before"]
|
|
|
|
+ item["content_after"] = tmp["content_after"]
|
|
|
|
+ item["change_date"] = tmp["change_date"]
|
|
|
|
+ setMark(item) //change_name_new
|
|
|
|
+ changes = append(changes, item)
|
|
|
|
|
|
- //save["company_name"] = (*qyxy)["company_name"]
|
|
|
|
- save["company_id"] = (*qyxy)["_id"]
|
|
|
|
- save["changes"] = changes
|
|
|
|
- save["create_time"] = currentTime
|
|
|
|
- save["update_time"] = currentTime
|
|
|
|
- //saveInfo := map[string]interface{}{"$set": save}
|
|
|
|
- MgoMix.Save(CollSave, save)
|
|
|
|
- }
|
|
|
|
|
|
+ //save["company_name"] = (*qyxy)["company_name"]
|
|
|
|
+ save["company_id"] = (*qyxy)["_id"]
|
|
|
|
+ save["changes"] = changes
|
|
|
|
+ save["create_time"] = currentTime
|
|
|
|
+ save["update_time"] = currentTime
|
|
|
|
+ //saveInfo := map[string]interface{}{"$set": save}
|
|
|
|
+ MgoMix.Save(CollSave, save)
|
|
}
|
|
}
|
|
- }(tmp)
|
|
|
|
- tmp = map[string]interface{}{}
|
|
|
|
|
|
+ }
|
|
|
|
+
|
|
}
|
|
}
|
|
- wg.Wait()
|
|
|
|
|
|
+
|
|
util.Debug("over---", count, zid)
|
|
util.Debug("over---", count, zid)
|
|
|
|
|
|
}
|
|
}
|