Jianghan 2 년 전
부모
커밋
2e698b9163
13개의 변경된 파일, 147개의 추가 그리고 39개의 삭제
  1. 1 1
      README.md
  2. 1 0
      field_py/common.toml
  3. 1 0
      field_py/config/conf.go
  4. BIN
      field_py/field_dispose_linux
  5. 1 0
      field_py/go.mod
  6. 2 0
      field_py/go.sum
  7. 18 13
      field_py/task.go
  8. 1 0
      field_sync/go.mod
  9. 2 0
      field_sync/go.sum
  10. 7 1
      field_sync/task.go
  11. 2 0
      processing_ids/go.mod
  12. 32 0
      processing_ids/go.sum
  13. 79 24
      processing_ids/main.go

+ 1 - 1
README.md

@@ -16,7 +16,7 @@
 
 ## processing_ids 数据处理流程-id段保存
 + 定时5分钟,保存id段
-+ 保存id段(dataprocess=0,updatetime)—>招标分类(dataprocess=1,updatetime)—>标的物识别(dataprocess=2,updatetime)—>抽取(dataprocess=3,updatetime)—>字段清理(dataprocess=3,updatetime)—>行业分类(dataprocess=4,updatetime)—>判重(dataprocess=5,updatetime)—>bidding表字段同步(dataprocess=6,updatetime)
++ 保存id段(dataprocess=0,updatetime)—>招标分类(dataprocess=1,updatetime)—>标的物识别(dataprocess=2,updatetime)—>抽取(dataprocess=3,updatetime)—>字段清理(dataprocess=4,updatetime)—>业主分类(dataprocess=5,updatetime)—>判重(dataprocess=6,updatetime)—>bidding表字段同步(dataprocess=7,updatetime)
 
 ## data_tidb 数据处理流程-数据同步到tidb库(bidding、projectset)  
 + bidding数据  

+ 1 - 0
field_py/common.toml

@@ -1,6 +1,7 @@
 
 [serve]
 grpcAddr = "192.168.3.12:10021"
+thread = 50
 
 [db]
 

+ 1 - 0
field_py/config/conf.go

@@ -32,6 +32,7 @@ type conf struct {
 
 type serve struct {
 	GrpcAddr string
+	Thread   int
 }
 
 type udp struct {

BIN
field_py/field_dispose_linux


+ 1 - 0
field_py/go.mod

@@ -6,6 +6,7 @@ require (
 	app.yhyue.com/BP/servicerd v0.0.0-20201203055056-87643512f867
 	app.yhyue.com/data_processing/common_utils v0.0.0-20220830011833-76d58ef43f4f
 	github.com/BurntSushi/toml v1.2.0
+	go.mongodb.org/mongo-driver v1.11.0 // indirect
 	go.uber.org/zap v1.22.0
 	google.golang.org/grpc v1.49.0
 	google.golang.org/protobuf v1.27.1

+ 2 - 0
field_py/go.sum

@@ -114,6 +114,8 @@ github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d/go.mod h1:rHwXgn7Jul
 github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
 go.mongodb.org/mongo-driver v1.10.1 h1:NujsPveKwHaWuKUer/ceo9DzEe7HIj1SlJ6uvXZG0S4=
 go.mongodb.org/mongo-driver v1.10.1/go.mod h1:z4XpeoU6w+9Vht+jAFyLgVrD+jGSQQe0+CBWFHNiHt8=
+go.mongodb.org/mongo-driver v1.11.0 h1:FZKhBSTydeuffHj9CBjXlR8vQLee1cQyTWYPA6/tqiE=
+go.mongodb.org/mongo-driver v1.11.0/go.mod h1:s7p5vEtfbeR1gYi6pnj3c3/urpbLv2T5Sfd6Rp2HBB8=
 go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI=
 go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw=
 go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=

+ 18 - 13
field_py/task.go

@@ -11,6 +11,7 @@ import (
 	"field-dispose/config"
 	"field-dispose/proto"
 	"fmt"
+	"go.mongodb.org/mongo-driver/bson"
 	"go.uber.org/zap"
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/credentials/insecure"
@@ -29,9 +30,12 @@ var IpDialErrNum, IpGetErrNum, ExtractDialErrNum = int64(0), int64(0), int64(0)
 // @Author J 2022/8/31 14:57
 func getIntention(gtid, lteid string, mapinfo map[string]interface{}) {
 	defer util.Catch()
+
+	MgoB.Update("bidding_processing_ids", bson.M{"gtid": gtid}, bson.M{"$set": bson.M{"dataprocess": 2, "updatetime": time.Now().Unix()}}, false, false)
+
 	sess := MgoB.GetMgoConn()
 	defer MgoB.DestoryMongoConn(sess)
-	ch := make(chan bool, 10)
+	ch := make(chan bool, config.Conf.Serve.Thread)
 	wg := &sync.WaitGroup{}
 	query := map[string]interface{}{
 		"_id": map[string]interface{}{
@@ -50,7 +54,7 @@ func getIntention(gtid, lteid string, mapinfo map[string]interface{}) {
 	it := sess.DB(config.Conf.DB.Mongo.Dbname).C("bidding").Find(&query).Select(&field).Iter()
 	count := 0
 	for tmp := make(map[string]interface{}); it.Next(tmp); count++ {
-		if count%20000 == 0 {
+		if count%200 == 0 {
 			log.Info("getIntention", zap.Int("current:", count))
 		}
 		ch <- true
@@ -99,10 +103,6 @@ func getIntention(gtid, lteid string, mapinfo map[string]interface{}) {
 // @Description procurementlist
 // @Author J 2022/8/31 15:28
 func taskA(tmp map[string]interface{}, gtid, lteid string) map[string]interface{} {
-	toptype, _ := tmp["toptype"].(string)
-	if toptype != "采购意向" {
-		return nil
-	}
 	id := mongodb.BsonIdToSId(tmp["_id"])
 	delete(tmp, "_id")
 	delete(tmp, "detail")
@@ -183,7 +183,7 @@ func rpcGetFieldP(reqStr string) (map[string]interface{}, error) {
 	port := -1
 	//重试获取ip、port
 	for i := 1; i <= 3; i++ {
-		repl, err := ipClient.Apply(context.Background(), &service.ApplyReqData{Name: "goods_service", Balance: 0})
+		repl, err := ipClient.Apply(context.Background(), &service.ApplyReqData{Name: "goods_service", Balance: 3})
 		if err != nil {
 			continue
 		} else {
@@ -211,9 +211,9 @@ func rpcGetFieldP(reqStr string) (map[string]interface{}, error) {
 	req := &proto.GoodsRequest{
 		Contents: reqStr,
 	}
-	ctx, cancel := context.WithTimeout(context.TODO(), time.Minute*1)
-	defer cancel()
-	resp, err := client.GoodsExtract(ctx, req)
+	//ctx, cancel := context.WithTimeout(context.TODO(), time.Minute*1)
+	//defer cancel()
+	resp, err := client.GoodsExtract(context.Background(), req)
 	if err != nil {
 		return nil, errors.New("Deal Data Error")
 	}
@@ -221,6 +221,11 @@ func rpcGetFieldP(reqStr string) (map[string]interface{}, error) {
 	if json.Unmarshal([]byte(resp.Goods), &result) != nil {
 		return nil, errors.New("Json Unmarshal Error")
 	}
+	// 服务中心释放服务
+	_, _ = ipClient.Release(context.Background(), &service.ApplyRepData{Ip: ip, Port: int32(port)})
+	if err != nil {
+		return nil, err
+	}
 	if time.Since(start).Minutes() > 5 {
 		// py接口字段识别超过5分钟
 		log.Info("rpcGetFieldP 字段识别超过5min", zap.Any("serve", "goods_service"), zap.Any("reqStr", reqStr), zap.Any("ip+port", addr))
@@ -271,9 +276,9 @@ func rpcGetFieldR(reqStr string) (map[string]interface{}, error) {
 	req := &proto.ContentRequest{
 		Contents: reqStr,
 	}
-	ctx, cancel := context.WithTimeout(context.TODO(), time.Minute*1)
-	defer cancel()
-	resp, err := client.Extract(ctx, req)
+	//ctx, cancel := context.WithTimeout(context.TODO(), time.Minute*1)
+	//defer cancel()
+	resp, err := client.Extract(context.Background(), req)
 	if err != nil {
 		return nil, errors.New("Deal Data Error")
 	}

+ 1 - 0
field_sync/go.mod

@@ -6,6 +6,7 @@ require (
 	app.yhyue.com/data_processing/common_utils v0.0.0-20220830011833-76d58ef43f4f
 	github.com/BurntSushi/toml v1.2.0
 	github.com/aliyun/aliyun-oss-go-sdk v2.2.5+incompatible
+	go.mongodb.org/mongo-driver v1.11.0 // indirect
 	go.uber.org/zap v1.22.0
 	golang.org/x/time v0.0.0-20220722155302-e5dcc9cfc0b9 // indirect
 	gopkg.in/mgo.v2 v2.0.0-20190816093944-a6b53ec6cb22 // indirect

+ 2 - 0
field_sync/go.sum

@@ -57,6 +57,8 @@ github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d/go.mod h1:rHwXgn7Jul
 github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
 go.mongodb.org/mongo-driver v1.10.1 h1:NujsPveKwHaWuKUer/ceo9DzEe7HIj1SlJ6uvXZG0S4=
 go.mongodb.org/mongo-driver v1.10.1/go.mod h1:z4XpeoU6w+9Vht+jAFyLgVrD+jGSQQe0+CBWFHNiHt8=
+go.mongodb.org/mongo-driver v1.11.0 h1:FZKhBSTydeuffHj9CBjXlR8vQLee1cQyTWYPA6/tqiE=
+go.mongodb.org/mongo-driver v1.11.0/go.mod h1:s7p5vEtfbeR1gYi6pnj3c3/urpbLv2T5Sfd6Rp2HBB8=
 go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw=
 go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
 go.uber.org/goleak v1.1.11 h1:wy28qYRKZgnJTxGxvye5/wgWr1EKjmUDGYox5mGlRlI=

+ 7 - 1
field_sync/task.go

@@ -10,11 +10,13 @@ import (
 	"field_sync/config"
 	"field_sync/oss"
 	"fmt"
+	"go.mongodb.org/mongo-driver/bson"
 	"go.uber.org/zap"
 	"net"
 	"reflect"
 	"regexp"
 	"strings"
+	"time"
 )
 
 var (
@@ -25,6 +27,11 @@ func biddingTask(data []byte, mapInfo map[string]interface{}) {
 	defer util.Catch()
 
 	stype := util.ObjToString(mapInfo["stype"])
+	if stype == "bidding" {
+		uq := bson.M{"gtid": bson.M{"$gte": util.ObjToString(mapInfo["gtid"])},
+			"lteid": bson.M{"$lte": util.ObjToString(mapInfo["lteid"])}}
+		MgoB.Update("bidding_processing_ids", uq, bson.M{"$set": bson.M{"dataprocess": 6, "updatetime": time.Now().Unix()}}, false, false)
+	}
 	// 领域标签处理的数据 id段
 	if stype == "bidding_history" {
 		MgoB.Save("field_data_record", map[string]interface{}{"gtid": mapInfo["gtid"], "lteid": mapInfo["lteid"], "status": 0})
@@ -502,7 +509,6 @@ func taskinfo(id string) {
 			update["isValidFile"] = true
 		}
 	}
-	util.Debug(update)
 	if len(update) > 0 {
 		MgoB.UpdateById(config.Conf.DB.MongoB.Coll, id, map[string]interface{}{"$set": update})
 		//updateBidPool <- []map[string]interface{}{{

+ 2 - 0
processing_ids/go.mod

@@ -5,4 +5,6 @@ go 1.16
 require (
 	app.yhyue.com/data_processing/common_utils v0.0.0-20220927054143-d9e97522625d
 	github.com/robfig/cron v1.2.0
+	go.mongodb.org/mongo-driver v1.11.0 // indirect
+	go.uber.org/zap v1.23.0
 )

+ 32 - 0
processing_ids/go.sum

@@ -1,50 +1,77 @@
 app.yhyue.com/data_processing/common_utils v0.0.0-20220927054143-d9e97522625d h1:Nh2rC3LBqh0alvam2vr4is/vbUaPkl0rbZxVETx3nmk=
 app.yhyue.com/data_processing/common_utils v0.0.0-20220927054143-d9e97522625d/go.mod h1:9PlRUNzirlF/LL1W7fA7koCudxJe3uO5nshDWlCnGo8=
+github.com/BurntSushi/toml v1.2.0 h1:Rt8g24XnyGTyglgET/PRUNlrUeu9F5L+7FilkXfZgs0=
 github.com/BurntSushi/toml v1.2.0/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ=
 github.com/PuerkitoBio/goquery v1.8.0 h1:PJTF7AmFCFKk1N6V6jmKfrNH9tV5pNE6lZMkG0gta/U=
 github.com/PuerkitoBio/goquery v1.8.0/go.mod h1:ypIiRMtY7COPGk+I/YbZLbxsxn9g5ejnI2HSMtkjZvI=
 github.com/andybalholm/cascadia v1.3.1 h1:nhxRkql1kdYCc8Snf7D5/D3spOX+dBgjA6u8x004T2c=
 github.com/andybalholm/cascadia v1.3.1/go.mod h1:R4bJ1UQfqADjvDa4P6HZHLh/3OxWWEqc0Sk8XGwHqvA=
+github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8=
 github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
 github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
 github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
 github.com/dchest/captcha v1.0.0 h1:vw+bm/qMFvTgcjQlYVTuQBJkarm5R0YSsDKhm1HZI2o=
 github.com/dchest/captcha v1.0.0/go.mod h1:7zoElIawLp7GUMLcj54K9kbw+jEyvz2K0FDdRRYhvWo=
 github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g=
 github.com/go-sql-driver/mysql v1.6.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg=
+github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
 github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
 github.com/gomodule/redigo v1.8.9/go.mod h1:7ArFNvsTjH8GMMzB4uy1snslv2BwmginuMs06a1uzZE=
+github.com/google/go-cmp v0.5.2 h1:X2ev0eStA3AbceY54o37/0PQ/UWqKEiiO2dKL5OPaFM=
 github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
+github.com/klauspost/compress v1.13.6 h1:P76CopJELS0TiO2mebmnzgWaajssP/EszplttgQxcgc=
 github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
 github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
 github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
 github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
 github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
 github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
+github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe h1:iruDEfMl2E6fbMZ9s0scYfZQ84/6SPL6zC8ACM2oIL0=
 github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe/go.mod h1:wL8QJuTMNUDYhXwkmfOly8iTdp5TEcJFWZD2D7SIkUc=
 github.com/nsqio/go-nsq v1.1.0/go.mod h1:vKq36oyeVXgsS5Q8YEO7WghqidAVXQlcFxzQbQTuDEY=
 github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
+github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
 github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
+github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
 github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
 github.com/robfig/cron v1.2.0 h1:ZjScXvvxeQ63Dbyxy76Fj3AT3Ut0aKsyd2/tl3DTMuQ=
 github.com/robfig/cron v1.2.0/go.mod h1:JGuDeoQd7Z6yL4zQhZ3OPEVHB7fL6Ka6skscFHfmt2k=
 github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
+github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
 github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
 github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
 github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
+github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
+github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk=
+github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
+github.com/tidwall/pretty v1.0.0 h1:HsD+QiTn7sK6flMKIvNmpqz1qrpP3Ps6jOKIKMooyg4=
 github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk=
+github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c=
 github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI=
+github.com/xdg-go/scram v1.1.1 h1:VOMT+81stJgXW3CpHyqHN3AXDYIMsx56mEFrB37Mb/E=
 github.com/xdg-go/scram v1.1.1/go.mod h1:RaEWvsqvNKKvBPvcKeFjrG2cJqOkHTiyTpzz23ni57g=
+github.com/xdg-go/stringprep v1.0.3 h1:kdwGpVNwPFtjs98xCGkHjQtGKh86rDcRZN17QEMCOIs=
 github.com/xdg-go/stringprep v1.0.3/go.mod h1:W3f5j4i+9rC0kuIEJL0ky1VpHXQU3ocBgklLGvcBnW8=
+github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d h1:splanxYIlg+5LfHAM6xpdFEAYOk8iySO56hMFq6uLyA=
 github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d/go.mod h1:rHwXgn7JulP+udvsHwJoVG1YGAP6VLg4y9I5dyZdqmA=
 github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
+go.mongodb.org/mongo-driver v1.10.1 h1:NujsPveKwHaWuKUer/ceo9DzEe7HIj1SlJ6uvXZG0S4=
 go.mongodb.org/mongo-driver v1.10.1/go.mod h1:z4XpeoU6w+9Vht+jAFyLgVrD+jGSQQe0+CBWFHNiHt8=
+go.mongodb.org/mongo-driver v1.11.0 h1:FZKhBSTydeuffHj9CBjXlR8vQLee1cQyTWYPA6/tqiE=
+go.mongodb.org/mongo-driver v1.11.0/go.mod h1:s7p5vEtfbeR1gYi6pnj3c3/urpbLv2T5Sfd6Rp2HBB8=
+go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw=
 go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
+go.uber.org/goleak v1.1.11 h1:wy28qYRKZgnJTxGxvye5/wgWr1EKjmUDGYox5mGlRlI=
 go.uber.org/goleak v1.1.11/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ=
+go.uber.org/multierr v1.6.0 h1:y6IPFStTAIT5Ytl7/XYmHvzXQ7S3g/IeZW9hyZ5thw4=
 go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU=
 go.uber.org/zap v1.22.0/go.mod h1:H4siCOZOrAolnUPJEkfaSjDqyP+BDS0DdDWzwcgt3+U=
+go.uber.org/zap v1.23.0 h1:OjGQ5KQDEUawVHxNwQgPpiypGHOxo2mNZsOqTak4fFY=
+go.uber.org/zap v1.23.0/go.mod h1:D+nX8jyLsMHMYrln8A0rJjFt/T/9/bGgIhAqxv5URuY=
 golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
 golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
+golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d h1:sK3txAijHtOK88l68nt020reeT1ZdKLIYetKl95FzVY=
 golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
 golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
 golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
@@ -56,6 +83,7 @@ golang.org/x/net v0.0.0-20210916014120-12bc252f5db8/go.mod h1:9nx3DQGgdP8bBQD5qx
 golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2 h1:CIJ76btIcR3eFI5EgSo6k1qKw9KJexJuRLI9G7Hp5wE=
 golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
 golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ=
 golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
 golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
@@ -68,6 +96,7 @@ golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9sn
 golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
 golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
 golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
+golang.org/x/text v0.3.7 h1:olpwvP2KacW1ZWvsR7uQhoyTYvKAupfQrRGBFM352Gk=
 golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
 golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
 golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
@@ -76,15 +105,18 @@ golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
 golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
 golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
 golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
+golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE=
 golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
 gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
 gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
 gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
 gopkg.in/mgo.v2 v2.0.0-20190816093944-a6b53ec6cb22 h1:VpOs+IwYnYBaFnrNAeB8UUWtL3vEUnzSCL1nVjPhqrw=
 gopkg.in/mgo.v2 v2.0.0-20190816093944-a6b53ec6cb22/go.mod h1:yeKp02qBN3iKW1OzL3MGk2IdtZzaj7SFntXj72NppTA=
+gopkg.in/natefinch/lumberjack.v2 v2.0.0 h1:1Lc07Kr7qY4U2YPouBjpCLxpiyxIVoxqXgkXLknAOE8=
 gopkg.in/natefinch/lumberjack.v2 v2.0.0/go.mod h1:l0ndWWf7gzL7RNwBG7wST/UCcT4T24xpD6X8LsfU/+k=
 gopkg.in/olivere/elastic.v2 v2.0.61/go.mod h1:CTVyl1gckiFw1aLZYxC00g3f9jnHmhoOKcWF7W3c6n4=
 gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
 gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
 gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
+gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
 gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

+ 79 - 24
processing_ids/main.go

@@ -1,10 +1,14 @@
 package main
 
 import (
+	util "app.yhyue.com/data_processing/common_utils"
 	"app.yhyue.com/data_processing/common_utils/log"
 	"app.yhyue.com/data_processing/common_utils/mongodb"
+	"flag"
 	"fmt"
 	"github.com/robfig/cron"
+	"go.mongodb.org/mongo-driver/bson"
+	"go.mongodb.org/mongo-driver/bson/primitive"
 	"go.uber.org/zap"
 	"io/ioutil"
 	"net/http"
@@ -28,52 +32,103 @@ func init() {
 		Password:    "SJZY@O17t8herB3B",
 	}
 	MgoBid.InitPool()
-
-	startId = ""
 }
 
 func main() {
-	TimeTask()
+	flag.StringVar(&startId, "gtid", "", "开始id")
+	flag.Parse()
+	if startId != "" {
+		TimeTask()
+	} else {
+		flag.PrintDefaults()
+	}
 }
 
 func TimeTask() {
-	c := cron.New()
+	crn := cron.New()
 	cronstr := "0 */5 * * * ?" // 每5min执行一次
-	_ = c.AddFunc(cronstr, func() {
+	ct := 0
+	_ = crn.AddFunc(cronstr, func() {
+		ct += 1
+		util.Debug(fmt.Sprintf("task count: %d", ct))
 		taskinfo()
 	})
-	c.Start()
+	crn.Start()
+
+	c := make(chan bool, 1)
+	<-c
 }
 
 func taskinfo() {
-	info, _ := MgoBid.Find("bidding", nil, `{"_id": -1}`, `{"_id": 1}`, true, -1, -1)
+	currentTime := time.Now()
+	m, _ := time.ParseDuration("-5m") // 5分钟之前
+	rtime := currentTime.Add(m)
+	eid := primitive.NewObjectIDFromTimestamp(rtime)
+	qfid := bson.M{"_id": bson.M{"$lte": eid}}
+	info, _ := MgoBid.Find("bidding", qfid, `{"_id": -1}`, `{"_id": 1}`, true, -1, -1)
 	if info != nil {
 		lastId = mongodb.BsonIdToSId((*info)[0]["_id"])
 	} else {
 		sendMail("bidding表id查询失败")
 		return
 	}
-	q := map[string]interface{}{"$gt": mongodb.StringTOBsonId(startId), "$lte": mongodb.StringTOBsonId(lastId)}
-	count := MgoBid.Count("bidding", q)
-	ids := fmt.Sprintf("%s-%s", startId, lastId)
-	if count <= 0 {
-		sendMail(fmt.Sprintf("bidding表id段数据查询失败,%s", ids))
-		return
+	info1, _ := MgoBid.Find("bidding_processing_ids", `{"dataprocess": 0}`, `{"_id": -1}`, nil, false, -1, 2)
+	if len(*info1) > 1 {
+		startId = util.ObjToString((*info1)[0]["gtid"])
+		q := bson.M{"_id": bson.M{"$gt": mongodb.StringTOBsonId(startId), "$lte": mongodb.StringTOBsonId(lastId)}}
+		count := MgoBid.Count("bidding", q)
+		ids := fmt.Sprintf("%s-%s", startId, lastId)
+		if count <= 0 {
+			sendMail(fmt.Sprintf("bidding表id段数据查询失败,%s", ids))
+			return
+		}
+		if count > 10000 {
+			startId = util.ObjToString((*info1)[0]["lteid"])
+			q = bson.M{"_id": bson.M{"$gt": mongodb.StringTOBsonId(startId), "$lte": mongodb.StringTOBsonId(lastId)}}
+			util.Debug(q)
+			count = MgoBid.Count("bidding", q)
+			save := make(map[string]interface{})
+			save["gtid"] = startId
+			save["lteid"] = lastId
+			save["count"] = count
+			now := time.Now().Unix()
+			save["dataprocess"] = 0
+			save["createtime"] = now
+			save["updatetime"] = now
+			MgoBid.Save("bidding_processing_ids", save)
+			startId = lastId
+		} else {
+			update := make(map[string]interface{})
+			update["lteid"] = lastId
+			startId = util.ObjToString((*info1)[0]["gtid"])
+			update["count"] = count
+			update["updatetime"] = time.Now().Unix()
+			MgoBid.UpdateById("bidding_processing_ids", (*info1)[0]["_id"], map[string]interface{}{"$set": update})
+			startId = lastId
+		}
+	} else {
+		q := bson.M{"_id": bson.M{"$gt": mongodb.StringTOBsonId(startId), "$lte": mongodb.StringTOBsonId(lastId)}}
+		count := MgoBid.Count("bidding", q)
+		ids := fmt.Sprintf("%s-%s", startId, lastId)
+		if count <= 0 {
+			sendMail(fmt.Sprintf("bidding表id段数据查询失败,%s", ids))
+			return
+		}
+		save := make(map[string]interface{})
+		save["gtid"] = startId
+		save["lteid"] = lastId
+		save["count"] = count
+		now := time.Now().Unix()
+		save["dataprocess"] = 0
+		save["createtime"] = now
+		save["updatetime"] = now
+		MgoBid.Save("bidding_processing_ids", save)
+		startId = lastId
 	}
-	save := make(map[string]interface{})
-	save["gtid"] = startId
-	save["lteid"] = lastId
-	save["count"] = count
-	now := time.Now().Unix()
-	save["dataprocess"] = 0
-	save["createtime"] = now
-	save["updatetime"] = now
-	MgoBid.Save("bidding_processing_ids", save)
-
 }
 
 func sendMail(content string) {
-	res, err := http.Get(fmt.Sprintf("%s?to=%s&title=%s&body=%s", mail_api, mail_api, "processing_ids-send-fail", content))
+	res, err := http.Get(fmt.Sprintf("%s?to=%s&title=%s&body=%s", mail_api, mail_to, "processing_ids-send-fail", content))
 	if err == nil {
 		defer res.Body.Close()
 		read, err := ioutil.ReadAll(res.Body)