浏览代码

更新 企业变更程序

wcc 2 年之前
父节点
当前提交
f2d3868aeb
共有 4 个文件被更改,包括 152 次插入85 次删除
  1. 30 11
      qyxy_change_new/main.go
  2. 109 66
      qyxy_change_new/task.go
  3. 12 8
      qyxy_inc_data_new/main.go
  4. 1 0
      qyxy_std_new/main.go

+ 30 - 11
qyxy_change_new/main.go

@@ -15,7 +15,7 @@ var (
 	MgoMix, Mgo      *mongodb.MongodbSim
 	dbname           string
 	CollQy, CollSave string
-	lastId           int64 // company_change 开始读取的ID
+	jyUpdatetime     int64 // company_change 开始读取的更新时间
 	ChangeMap        []map[string]interface{}
 
 	timeReg, _ = regexp.Compile(`^[\d]{4}-[\d]{1,2}-[\d]{1,2}`)
@@ -48,7 +48,6 @@ func init() {
 
 	CollQy = Sysconfig["coll_qy"].(string)       //qyxy_std,全量同步的时候用到
 	CollSave = Sysconfig["coll_change"].(string) //qyxy_change,
-	lastId = util.Int64All(Sysconfig["lastId"])
 
 	ChangeMap = util.ObjArrToMapArr(Sysconfig["changeType"].([]interface{}))
 	initChangeMap()
@@ -74,6 +73,7 @@ func main() {
 	UdpClient.Listen(processUdpMsg)
 	util.Debug("Udp服务监听======= port:", localPort)
 
+	go SaveData()
 	ch := make(chan bool, 1)
 	<-ch
 }
@@ -84,19 +84,38 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 		var mapInfo map[string]interface{}
 		err := json.Unmarshal(data, &mapInfo)
 		util.Debug("processUdpMsg  mapinfo :=>", mapInfo)
+		if err != nil {
+			util.Debug("Unmarshal err :=>", err)
+		} else if mapInfo != nil {
+			key, _ := mapInfo["key"].(string)
+			if key == "" {
+				key = "udpok"
+			}
+			go UdpClient.WriteUdp([]byte(key), udp.OP_NOOP, ra)
 
-		//拿到同步信号,开始同步数据
-		if _, ok := mapInfo["start"]; ok {
-			if start_id, ok := mapInfo["start_id"]; ok {
-				if util.Int64All(start_id) > 0 {
-					lastId = util.Int64All(start_id)
+			//拿到同步信号,开始同步增量数据
+			if _, ok := mapInfo["start"]; ok {
+				if jy_updatetime, ok := mapInfo["jy_updatetime"]; ok {
+					if util.Int64All(jy_updatetime) > 0 {
+						jyUpdatetime = util.Int64All(jy_updatetime)
+					}
+				}
+				go IncData() //增量数据
+			}
+
+			if _, ok := mapInfo["stype"]; ok {
+				tasktype, _ := mapInfo["stype"].(string)
+				switch tasktype {
+				//全量数据
+				case "change_all":
+					go TaskAll()
+				case "change_by_id":
+					go IncByID(mapInfo)
+				default:
+					fmt.Println("tasktype", tasktype)
 				}
 			}
-			go IncData() //增量数据
-		}
 
-		if err != nil {
-			util.Debug("Unmarshal err :=>", err)
 		}
 
 	default:

+ 109 - 66
qyxy_change_new/task.go

@@ -15,41 +15,41 @@ var MgoSaveCache = make(chan map[string]interface{}, 5000)
 var SP = make(chan bool, 5)
 var updatePool = make(chan []map[string]interface{}, 5000)
 var updateSp = make(chan bool, 5)
+var mutex sync.Mutex
 
-//GetData 增量数据
-func GetData() {
+//IncData 增量处理数据
+func IncData() {
 	defer util.Catch()
 	sess := Mgo.GetMgoConn()
 	defer Mgo.DestoryMongoConn(sess)
 
-	pool := make(chan bool, 10)
-	wg := &sync.WaitGroup{}
-
-	q := bson.M{"_id": bson.M{"$gt": lastId}}
-	//q := bson.M{"_id": 368454367}
+	q := bson.M{"jy_updatetime": bson.M{"$gt": jyUpdatetime}}
 	var zid int
 	it := sess.DB("mixdata").C("company_change").Find(q).Select(nil).Iter()
 	count := 0
+	ch := make(chan bool, 16)
+	wg := &sync.WaitGroup{}
 
 	for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
-		if count%20000 == 0 {
+		if count%2000 == 0 {
 			log.Println("current:", count)
 		}
+
 		zid = util.IntAll(tmp["_id"])
-		pool <- true
+		ch <- true
 		wg.Add(1)
-		fmt.Println("zid =>", zid)
-		if util.Int64All(zid) > lastId {
-			lastId = util.Int64All(zid)
-		}
 		go func(tmp map[string]interface{}) {
 			defer func() {
-				<-pool
+				<-ch
 				wg.Done()
 			}()
+			mutex.Lock()
+			defer mutex.Unlock()
+			//
 			currentTime := time.Now().Unix()
 			query := bson.M{"company_id": tmp["company_id"]}
 			info, b := MgoMix.FindOneByField(CollSave, query, bson.M{"changes": 1})
+			//原来数据有changes 字段就更新,追加数据
 			if b && len(*info) > 0 {
 				if util.ObjToString(tmp["_operation_type"]) == "insert" {
 					update := make(map[string]interface{})
@@ -62,10 +62,10 @@ func GetData() {
 					//update["changes"] = changes
 					update["update_time"] = currentTime
 					saveInfo := map[string]interface{}{"$set": update, "$push": map[string]interface{}{"changes": item}}
-					//updatePool <- saveInfo
-					MgoMix.UpdateById(CollSave, tmp["company_id"], saveInfo)
+					MgoMix.Update("qyxy_change", map[string]interface{}{"company_id": util.ObjToString(tmp["company_id"])}, saveInfo, true, false)
 				}
 			} else {
+				//没有的直接写入
 				query := bson.M{"_id": tmp["company_id"]}
 				qyxy, b1 := MgoMix.FindOne("qyxy_std", query)
 				if b1 && len(*qyxy) > 0 {
@@ -79,85 +79,125 @@ func GetData() {
 					setMark(item) //change_name_new
 					changes = append(changes, item)
 
-					save["company_name"] = (*qyxy)["company_name"]
+					//save["company_name"] = (*qyxy)["company_name"]
 					save["company_id"] = (*qyxy)["_id"]
 					save["changes"] = changes
 					save["create_time"] = currentTime
 					save["update_time"] = currentTime
-					//save["_id"] = primitive.NewObjectID()
-					saveInfo := map[string]interface{}{"$set": save}
-					//updatePool <- saveInfo
-					MgoMix.Save(CollSave, saveInfo)
+					//saveInfo := map[string]interface{}{"$set": save}
+					MgoMix.Save(CollSave, save)
 				}
 			}
 		}(tmp)
+		tmp = map[string]interface{}{}
 	}
+	wg.Wait()
 	util.Debug("over---", count, zid)
+
 }
 
-//IncData 增量处理数据
-func IncData() {
+//IncByID 通过传入id 更新数据
+func IncByID(mapinfo map[string]interface{}) {
 	defer util.Catch()
 	sess := Mgo.GetMgoConn()
 	defer Mgo.DestoryMongoConn(sess)
 
-	q := bson.M{"_id": bson.M{"$gt": lastId}}
-	//q := bson.M{"_id": 368454367}
+	ch := make(chan bool, 16)
+	wg := &sync.WaitGroup{}
+
+	q := map[string]interface{}{
+		"_id": map[string]interface{}{
+			"$gt": util.Int64All(mapinfo["gtid"]),
+		},
+	}
+
 	var zid int
 	it := sess.DB("mixdata").C("company_change").Find(q).Select(nil).Iter()
 	count := 0
+
+	total := 0
 	for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
-		if count%2000 == 0 {
+		if count%20000 == 0 {
 			log.Println("current:", count)
+			log.Println("current _id:", tmp["_id"], "total", total)
+
+		}
+		//表更时间
+		if util.ObjToString(tmp["change_date"]) < "2023-01-01" {
+			continue
 		}
+
+		if util.ObjToString(tmp["_operation_type"]) == "update" {
+			continue
+		}
+		//废弃
+		if util.Int64All(tmp["use_flag"]) > 8 {
+			continue
+		}
+
 		zid = util.IntAll(tmp["_id"])
-		currentTime := time.Now().Unix()
-		query := bson.M{"company_id": tmp["company_id"]}
-		info, b := MgoMix.FindOneByField(CollSave, query, bson.M{"changes": 1})
-		//原来数据有changes 字段就更新,追加数据
-		if b && len(*info) > 0 {
-			if util.ObjToString(tmp["_operation_type"]) == "insert" {
-				update := make(map[string]interface{})
-				item := make(map[string]interface{})
-				item["change_field"] = tmp["change_field"]
-				item["content_before"] = tmp["content_before"]
-				item["content_after"] = tmp["content_after"]
-				item["change_date"] = tmp["change_date"]
-				setMark(item) //change_name_new
-				//update["changes"] = changes
-				update["update_time"] = currentTime
-				saveInfo := map[string]interface{}{"$set": update, "$push": map[string]interface{}{"changes": item}}
-				MgoMix.UpdateById(CollSave, tmp["company_id"], saveInfo)
-			}
-		} else {
-			//没有的直接写入
-			query := bson.M{"_id": tmp["company_id"]}
-			qyxy, b1 := MgoMix.FindOne("qyxy_std", query)
-			if b1 && len(*qyxy) > 0 {
-				save := make(map[string]interface{})
-				var changes []map[string]interface{}
-				item := make(map[string]interface{})
-				item["change_field"] = tmp["change_field"]
-				item["content_before"] = tmp["content_before"]
-				item["content_after"] = tmp["content_after"]
-				item["change_date"] = tmp["change_date"]
-				setMark(item) //change_name_new
-				changes = append(changes, item)
+		ch <- true
+		wg.Add(1)
+		go func(tmp map[string]interface{}) {
+			defer func() {
+				<-ch
+				wg.Done()
+			}()
+			mutex.Lock()
+			defer mutex.Unlock()
+			currentTime := time.Now().Unix()
+			query := bson.M{"company_id": tmp["company_id"]}
+			info, b := MgoMix.FindOneByField(CollSave, query, bson.M{"changes": 1})
+			//原来数据有changes 字段就更新,追加数据
+			if b && len(*info) > 0 {
+				if util.ObjToString(tmp["_operation_type"]) == "insert" {
+					update := make(map[string]interface{})
+					item := make(map[string]interface{})
+					item["change_field"] = tmp["change_field"]
+					item["content_before"] = tmp["content_before"]
+					item["content_after"] = tmp["content_after"]
+					item["change_date"] = tmp["change_date"]
+					setMark(item) //change_name_new
+					//update["changes"] = changes
+					update["update_time"] = currentTime
+					saveInfo := map[string]interface{}{"$set": update, "$push": map[string]interface{}{"changes": item}}
+					MgoMix.Update("qyxy_change", map[string]interface{}{"company_id": util.ObjToString(tmp["company_id"])}, saveInfo, true, false)
+					total++
+				}
+			} else {
+				total++
+				//没有的直接写入
+				query := bson.M{"_id": tmp["company_id"]}
+				qyxy, b1 := MgoMix.FindOne("qyxy_std", query)
+				if b1 && len(*qyxy) > 0 {
+					save := make(map[string]interface{})
+					var changes []map[string]interface{}
+					item := make(map[string]interface{})
+					item["change_field"] = tmp["change_field"]
+					item["content_before"] = tmp["content_before"]
+					item["content_after"] = tmp["content_after"]
+					item["change_date"] = tmp["change_date"]
+					setMark(item) //change_name_new
+					changes = append(changes, item)
 
-				save["company_name"] = (*qyxy)["company_name"]
-				save["company_id"] = (*qyxy)["_id"]
-				save["changes"] = changes
-				save["create_time"] = currentTime
-				save["update_time"] = currentTime
-				saveInfo := map[string]interface{}{"$set": save}
-				MgoMix.Save(CollSave, saveInfo)
+					//save["company_name"] = (*qyxy)["company_name"]
+					save["company_id"] = (*qyxy)["_id"]
+					save["changes"] = changes
+					save["create_time"] = currentTime
+					save["update_time"] = currentTime
+					//saveInfo := map[string]interface{}{"$set": save}
+					MgoMix.Save(CollSave, save)
+				}
 			}
-		}
 
+		}(tmp)
+		tmp = map[string]interface{}{}
 	}
+	wg.Wait()
 	util.Debug("over---", count, zid)
 
 }
+
 func setMark(tmp map[string]interface{}) {
 	for _, v := range ChangeMap {
 		str := util.ObjToString(tmp["change_field"])
@@ -223,7 +263,9 @@ func TaskAll() {
 				disposeFuc(*info, tmp)
 			}
 		}(tmp)
+		tmp = map[string]interface{}{}
 	}
+	wg.Wait()
 }
 
 func taskInfo(id string) {
@@ -268,6 +310,7 @@ func disposeFuc(maps []map[string]interface{}, tmp map[string]interface{}) {
 	MgoSaveCache <- tmp
 }
 
+//SaveData 存量保存
 func SaveData() {
 	log.Println("Mgo Save...")
 	arru := make([]map[string]interface{}, 200)

+ 12 - 8
qyxy_inc_data_new/main.go

@@ -32,9 +32,9 @@ var (
 	saveArr   [][]map[string]interface{}
 	UdpClient udp.UdpClient
 
-	changeAddr *net.UDPAddr
-	readPath   string //文件夹目录
-	//lastID     int64
+	changeAddr   *net.UDPAddr
+	readPath     string //文件夹目录
+	jyUpdatetime int64
 )
 
 func init() {
@@ -100,10 +100,11 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 func task(path string) {
 	files, _ := ioutil.ReadDir(path)
 
+	jyUpdatetime = time.Now().Unix() //数据更新时间,更新quxy_change 使用
 	//拿到company_change 插入之前的ID
-	dats, _ := MongoTool.Find("company_change", nil, `{"_id":-1}`, nil, true, -1, 1)
-	ds := *dats
-	lastID := ds[0]["_id"]
+	//dats, _ := MongoTool.Find("company_change", nil, `{"_id":-1}`, nil, true, -1, 1)
+	//ds := *dats
+	//lastID := ds[0]["_id"]
 
 	//annual_report_base/20221122/split.json.gz
 	for _, f := range files {
@@ -146,8 +147,8 @@ func task(path string) {
 
 	//执行完毕,通知qyxy_change,更新企业变更信息
 	data := map[string]interface{}{
-		"start":    true,
-		"start_id": lastID,
+		"start":         true,
+		"jy_updatetime": jyUpdatetime,
 	}
 	SendUdpMsg(data, changeAddr)
 
@@ -209,6 +210,9 @@ func hookfn(line []byte, count int) int {
 	collCount++
 	tmp["_id"] = util.IntAll(tmp["id"])
 	tmp["id"] = fmt.Sprintf("%d", util.IntAll(tmp["id"]))
+	if CurrentColl == "company_change" {
+		tmp["jy_updatetime"] = jyUpdatetime
+	}
 
 	saveInfo := []map[string]interface{}{
 		{"_id": tmp["_id"]},

+ 1 - 0
qyxy_std_new/main.go

@@ -129,6 +129,7 @@ func dealPath(path string) {
 		"end_time":   time.Now().Unix(),
 	}
 
+	log.Info("dealPath", zap.String(path, "数据同步结束"))
 	SendUdpMsg(data, qyxyEsAddr)
 }