Browse Source

Merge commit '7fb67479479479bfdeacad863cd582f7122bf795'

* commit '7fb67479479479bfdeacad863cd582f7122bf795':
  人脉全量优化行业
  youhua
  优化全量人脉
  1
  人脉全量
  fix:清除阻塞

# Conflicts:
#	data_project/load_data.go
#	data_project/task.go
Jianghan 8 months ago
parent
commit
d71d43dc3d

+ 7 - 5
data_project/load_data.go

@@ -3,20 +3,22 @@ package main
 import (
 import (
 	"encoding/json"
 	"encoding/json"
 	"fmt"
 	"fmt"
+	"strings"
+	"sync"
+	"time"
+
 	"go.uber.org/zap"
 	"go.uber.org/zap"
 	util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
 	util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
 	"jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
 	"jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
 	"jygit.jydev.jianyu360.cn/data_processing/common_utils/redis"
 	"jygit.jydev.jianyu360.cn/data_processing/common_utils/redis"
-	"strings"
-	"sync"
-	"time"
 )
 )
 
 
 // 初始加载数据,默认加载最近6个月的数据
 // 初始加载数据,默认加载最近6个月的数据
 func (p *ProjectTask) loadData(starttime int64) {
 func (p *ProjectTask) loadData(starttime int64) {
 	log.Info("load project start..", zap.Int64("starttime", starttime))
 	log.Info("load project start..", zap.Int64("starttime", starttime))
-	//p.findLock.Lock()
-	//defer p.findLock.Unlock()
+	//清除进程时 会卡住注释掉
+	// p.findLock.Lock()
+	// defer p.findLock.Unlock()
 	sess := MgoP.GetMgoConn()
 	sess := MgoP.GetMgoConn()
 	defer MgoP.DestoryMongoConn(sess)
 	defer MgoP.DestoryMongoConn(sess)
 	loadOver := make(chan bool)
 	loadOver := make(chan bool)

+ 5 - 4
data_project/project.go

@@ -3,16 +3,17 @@ package main
 import (
 import (
 	"encoding/json"
 	"encoding/json"
 	"fmt"
 	"fmt"
+	"math"
+	"sort"
+	"strings"
+	"time"
+
 	"go.mongodb.org/mongo-driver/bson"
 	"go.mongodb.org/mongo-driver/bson"
 	"go.mongodb.org/mongo-driver/bson/primitive"
 	"go.mongodb.org/mongo-driver/bson/primitive"
 	util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
 	util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
 	"jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
 	"jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
 	"jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
 	"jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
 	"jygit.jydev.jianyu360.cn/data_processing/common_utils/redis"
 	"jygit.jydev.jianyu360.cn/data_processing/common_utils/redis"
-	"math"
-	"sort"
-	"strings"
-	"time"
 )
 )
 
 
 /**
 /**

+ 9 - 10
data_project/task.go

@@ -3,6 +3,13 @@ package main
 import (
 import (
 	"encoding/json"
 	"encoding/json"
 	"fmt"
 	"fmt"
+	"project/config"
+	"regexp"
+	"strings"
+	"sync"
+	"time"
+	"unicode/utf8"
+
 	"github.com/goinggo/mapstructure"
 	"github.com/goinggo/mapstructure"
 	"github.com/robfig/cron"
 	"github.com/robfig/cron"
 	"go.mongodb.org/mongo-driver/bson"
 	"go.mongodb.org/mongo-driver/bson"
@@ -12,12 +19,6 @@ import (
 	"jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
 	"jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
 	"jygit.jydev.jianyu360.cn/data_processing/common_utils/redis"
 	"jygit.jydev.jianyu360.cn/data_processing/common_utils/redis"
 	"jygit.jydev.jianyu360.cn/data_processing/common_utils/udp"
 	"jygit.jydev.jianyu360.cn/data_processing/common_utils/udp"
-	"project/config"
-	"regexp"
-	"strings"
-	"sync"
-	"time"
-	"unicode/utf8"
 )
 )
 
 
 /**
 /**
@@ -171,10 +172,8 @@ func (p *ProjectTask) clearMem() {
 	// 创建项目的时间大于7天
 	// 创建项目的时间大于7天
 	//在内存中保留最近6个月的信息
 	//在内存中保留最近6个月的信息
 	//跑全量时每5分钟跑一次,跑增量时400分钟跑一次
 	//跑全量时每5分钟跑一次,跑增量时400分钟跑一次
-	_ = c.AddFunc("0 0/1 * * * ?", func() {
-		log.Info("1")
-		if (p.currentType == "ql" && SingleClear == 0) || p.clearContimes >= 80 {
-			log.Info("2")
+	_ = c.AddFunc("0 */30 * * * ?", func() {
+		if (p.currentType == "ql" && SingleClear == 0) || p.clearContimes >= 20 {
 			SingleClear = 1
 			SingleClear = 1
 			//跳过的次数清零
 			//跳过的次数清零
 			p.clearContimes = 0
 			p.clearContimes = 0

+ 2 - 0
data_project_wy_all/README.md

@@ -0,0 +1,2 @@
+一、基于项目数据(project),生成物业项目信息
+二、基于增量招标信息(bidding),生成物业项目信息

+ 45 - 0
data_project_wy_all/clickhouse.go

@@ -0,0 +1,45 @@
+package main
+
+import (
+	"context"
+	"fmt"
+	"github.com/ClickHouse/clickhouse-go/v2"
+	"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
+	"time"
+)
+
+// 创建clickhouse连接
+func InitClickHouse(addr []string, size int, database, username, password string) (driver.Conn, error) {
+	var (
+		ctx       = context.Background()
+		conn, err = clickhouse.Open(&clickhouse.Options{
+			//Addr: []string{"cc-2ze9tv451wov14w9e.clickhouse.ads.aliyuncs.com:9000"}, //内网
+			//Addr: []string{"cc-2ze9tv451wov14w9e.public.clickhouse.ads.aliyuncs.com:9000"}, //外网
+			Addr:         addr,
+			DialTimeout:  10 * time.Second,
+			MaxIdleConns: 3,
+			MaxOpenConns: size,
+			Auth: clickhouse.Auth{
+				//Database: "information",
+				//Username: "biservice",
+				//Password: "Bi_top95215#",
+				Database: database,
+				Username: username,
+				Password: password,
+			},
+			Debugf: func(format string, v ...interface{}) {
+				fmt.Printf(format, v)
+			},
+		})
+	)
+	if err != nil {
+		return nil, err
+	}
+	if err := conn.Ping(ctx); err != nil {
+		if exception, ok := err.(*clickhouse.Exception); ok {
+			fmt.Printf("Exception [%d] %s \n%s\n", exception.Code, exception.Message, exception.StackTrace)
+		}
+		return nil, err
+	}
+	return conn, nil
+}

+ 33 - 0
data_project_wy_all/config.json

@@ -0,0 +1,33 @@
+{
+  "mgob": {
+    "addr": "192.168.3.166:27082",
+    "dbname": "qfw",
+    "size": 5,
+    "username": "",
+    "password": ""
+  },
+  "mgopro": {
+    "addr": "192.168.3.166:27082",
+    "dbname": "qfw",
+    "size": 5,
+    "username": "",
+    "password": ""
+  },
+  "clickhouse": {
+    "addr": ["192.168.3.207:19000"],
+    "database": "information",
+    "size": 20,
+    "username": "jytop",
+    "password": "pwdTopJy123"
+  },
+  "es": {
+    "addr": "http://192.168.3.149:9201",
+    "size": 10,
+    "index": "transaction_info",
+    "username": "",
+    "password": ""
+  },
+  "bidstarttime": 1713196800,
+  "prostarttime": 1713196800,
+  "startcron": "0 0 9 ? * *"
+}

BIN
data_project_wy_all/data_project_wy_all


+ 46 - 0
data_project_wy_all/go.mod

@@ -0,0 +1,46 @@
+module data_project_wy_all
+
+go 1.21.5
+
+require (
+	github.com/ClickHouse/clickhouse-go/v2 v2.23.0
+	github.com/gogf/gf/v2 v2.7.0
+	github.com/olivere/elastic/v7 v7.0.32
+	github.com/robfig/cron v1.2.0
+	go.mongodb.org/mongo-driver v1.11.4
+	jygit.jydev.jianyu360.cn/data_processing/common_utils v0.0.0-20240412074219-927f3f682cb3
+)
+
+require (
+	github.com/ClickHouse/ch-go v0.61.5 // indirect
+	github.com/PuerkitoBio/goquery v1.8.0 // indirect
+	github.com/andybalholm/brotli v1.1.0 // indirect
+	github.com/andybalholm/cascadia v1.3.1 // indirect
+	github.com/dchest/captcha v1.0.0 // indirect
+	github.com/go-faster/city v1.0.1 // indirect
+	github.com/go-faster/errors v0.7.1 // indirect
+	github.com/golang/snappy v0.0.1 // indirect
+	github.com/google/uuid v1.6.0 // indirect
+	github.com/josharian/intern v1.0.0 // indirect
+	github.com/klauspost/compress v1.17.7 // indirect
+	github.com/mailru/easyjson v0.7.7 // indirect
+	github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe // indirect
+	github.com/paulmach/orb v0.11.1 // indirect
+	github.com/pierrec/lz4/v4 v4.1.21 // indirect
+	github.com/pkg/errors v0.9.1 // indirect
+	github.com/segmentio/asm v1.2.0 // indirect
+	github.com/shopspring/decimal v1.3.1 // indirect
+	github.com/xdg-go/pbkdf2 v1.0.0 // indirect
+	github.com/xdg-go/scram v1.1.1 // indirect
+	github.com/xdg-go/stringprep v1.0.3 // indirect
+	github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d // indirect
+	go.opentelemetry.io/otel v1.24.0 // indirect
+	go.opentelemetry.io/otel/trace v1.24.0 // indirect
+	golang.org/x/crypto v0.21.0 // indirect
+	golang.org/x/net v0.22.0 // indirect
+	golang.org/x/sync v0.6.0 // indirect
+	golang.org/x/sys v0.18.0 // indirect
+	golang.org/x/text v0.14.0 // indirect
+	gopkg.in/mgo.v2 v2.0.0-20190816093944-a6b53ec6cb22 // indirect
+	gopkg.in/yaml.v3 v3.0.1 // indirect
+)

+ 241 - 0
data_project_wy_all/go.sum

@@ -0,0 +1,241 @@
+cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
+github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
+github.com/BurntSushi/toml v1.2.0/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ=
+github.com/ClickHouse/ch-go v0.61.5 h1:zwR8QbYI0tsMiEcze/uIMK+Tz1D3XZXLdNrlaOpeEI4=
+github.com/ClickHouse/ch-go v0.61.5/go.mod h1:s1LJW/F/LcFs5HJnuogFMta50kKDO0lf9zzfrbl0RQg=
+github.com/ClickHouse/clickhouse-go/v2 v2.23.0 h1:srmRrkS0BR8gEut87u8jpcZ7geOob6nGj9ifrb+aKmg=
+github.com/ClickHouse/clickhouse-go/v2 v2.23.0/go.mod h1:tBhdF3f3RdP7sS59+oBAtTyhWpy0024ZxDMhgxra0QE=
+github.com/PuerkitoBio/goquery v1.8.0 h1:PJTF7AmFCFKk1N6V6jmKfrNH9tV5pNE6lZMkG0gta/U=
+github.com/PuerkitoBio/goquery v1.8.0/go.mod h1:ypIiRMtY7COPGk+I/YbZLbxsxn9g5ejnI2HSMtkjZvI=
+github.com/andybalholm/brotli v1.1.0 h1:eLKJA0d02Lf0mVpIDgYnqXcUn0GqVmEFny3VuID1U3M=
+github.com/andybalholm/brotli v1.1.0/go.mod h1:sms7XGricyQI9K10gOSf56VKKWS4oLer58Q+mhRPtnY=
+github.com/andybalholm/cascadia v1.3.1 h1:nhxRkql1kdYCc8Snf7D5/D3spOX+dBgjA6u8x004T2c=
+github.com/andybalholm/cascadia v1.3.1/go.mod h1:R4bJ1UQfqADjvDa4P6HZHLh/3OxWWEqc0Sk8XGwHqvA=
+github.com/aws/aws-sdk-go v1.43.21/go.mod h1:y4AeaBuwd2Lk+GepC1E9v0qOiTws0MIWAX4oIKwKHZo=
+github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
+github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
+github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
+github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
+github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/dchest/captcha v1.0.0 h1:vw+bm/qMFvTgcjQlYVTuQBJkarm5R0YSsDKhm1HZI2o=
+github.com/dchest/captcha v1.0.0/go.mod h1:7zoElIawLp7GUMLcj54K9kbw+jEyvz2K0FDdRRYhvWo=
+github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
+github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
+github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98=
+github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
+github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g=
+github.com/go-faster/city v1.0.1 h1:4WAxSZ3V2Ws4QRDrscLEDcibJY8uf41H6AhXDrNDcGw=
+github.com/go-faster/city v1.0.1/go.mod h1:jKcUJId49qdW3L1qKHH/3wPeUstCVpVSXTM6vO3VcTw=
+github.com/go-faster/errors v0.7.1 h1:MkJTnDoEdi9pDabt1dpWf7AA8/BaSYZqibYyhZ20AYg=
+github.com/go-faster/errors v0.7.1/go.mod h1:5ySTjWFiphBs07IKuiL69nxdfd5+fzh1u7FPGZP2quo=
+github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
+github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
+github.com/go-sql-driver/mysql v1.6.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg=
+github.com/gogf/gf/v2 v2.7.0 h1:CjxhbMiE7oqf6K8ZtGuKt3dQEwK4vL6LhiI+dI7tJGU=
+github.com/gogf/gf/v2 v2.7.0/go.mod h1:Qu8nimKt9aupJQcdUL85tWF4Mfxocz97zUt8UC4abVI=
+github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
+github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
+github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
+github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
+github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
+github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
+github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
+github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8=
+github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA=
+github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs=
+github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w=
+github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0=
+github.com/golang/protobuf v1.4.1/go.mod h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QDs8UjoX8=
+github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
+github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
+github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
+github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
+github.com/gomodule/redigo v1.8.9/go.mod h1:7ArFNvsTjH8GMMzB4uy1snslv2BwmginuMs06a1uzZE=
+github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
+github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
+github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
+github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
+github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
+github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
+github.com/google/go-cmp v0.5.3/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
+github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
+github.com/google/go-cmp v0.5.7/go.mod h1:n+brtR0CgQNWTVd5ZUFpTBC8YFBDLK/h/bpaJ8/DtOE=
+github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
+github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
+github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
+github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=
+github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U=
+github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY=
+github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y=
+github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
+github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
+github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
+github.com/klauspost/compress v1.17.7 h1:ehO88t2UGzQK66LMdE8tibEd1ErmzZjNEqWkjLAKQQg=
+github.com/klauspost/compress v1.17.7/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw=
+github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
+github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
+github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
+github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0=
+github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc=
+github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe h1:iruDEfMl2E6fbMZ9s0scYfZQ84/6SPL6zC8ACM2oIL0=
+github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe/go.mod h1:wL8QJuTMNUDYhXwkmfOly8iTdp5TEcJFWZD2D7SIkUc=
+github.com/nsqio/go-nsq v1.1.0/go.mod h1:vKq36oyeVXgsS5Q8YEO7WghqidAVXQlcFxzQbQTuDEY=
+github.com/olivere/elastic/v7 v7.0.32 h1:R7CXvbu8Eq+WlsLgxmKVKPox0oOwAE/2T9Si5BnvK6E=
+github.com/olivere/elastic/v7 v7.0.32/go.mod h1:c7PVmLe3Fxq77PIfY/bZmxY/TAamBhCzZ8xDOE09a9k=
+github.com/opentracing/opentracing-go v1.2.0/go.mod h1:GxEUsuufX4nBwe+T+Wl9TAgYrxe9dPLANfrWvHYVTgc=
+github.com/paulmach/orb v0.11.1 h1:3koVegMC4X/WeiXYz9iswopaTwMem53NzTJuTF20JzU=
+github.com/paulmach/orb v0.11.1/go.mod h1:5mULz1xQfs3bmQm63QEJA6lNGujuRafwA5S/EnuLaLU=
+github.com/paulmach/protoscan v0.2.1/go.mod h1:SpcSwydNLrxUGSDvXvO0P7g7AuhJ7lcKfDlhJCDw2gY=
+github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ=
+github.com/pierrec/lz4/v4 v4.1.21/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
+github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
+github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
+github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
+github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
+github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
+github.com/robfig/cron v1.2.0 h1:ZjScXvvxeQ63Dbyxy76Fj3AT3Ut0aKsyd2/tl3DTMuQ=
+github.com/robfig/cron v1.2.0/go.mod h1:JGuDeoQd7Z6yL4zQhZ3OPEVHB7fL6Ka6skscFHfmt2k=
+github.com/segmentio/asm v1.2.0 h1:9BQrFxC+YOHJlTlHGkTrFWf59nbL3XnCoFLTwDCI7ys=
+github.com/segmentio/asm v1.2.0/go.mod h1:BqMnlJP91P8d+4ibuonYZw9mfnzI9HfxselHZr5aAcs=
+github.com/shopspring/decimal v1.3.1 h1:2Usl1nmF/WZucqkFZhnfFYxxxu8LG21F6nPQBE5gKV8=
+github.com/shopspring/decimal v1.3.1/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFRcu2hWCYk4o=
+github.com/smartystreets/assertions v1.1.1/go.mod h1:tcbTF8ujkAEcZ8TElKY+i30BzYlVhC/LOxJk7iOWnoo=
+github.com/smartystreets/go-aws-auth v0.0.0-20180515143844-0c1422d1fdb9/go.mod h1:SnhjPscd9TpLiy1LpzGSKh3bXCfxxXuqd9xmQJy3slM=
+github.com/smartystreets/gunit v1.4.2/go.mod h1:ZjM1ozSIMJlAz/ay4SG8PeKF00ckUp+zMHZXV9/bvak=
+github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
+github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
+github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
+github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
+github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk=
+github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c=
+github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI=
+github.com/xdg-go/scram v1.1.1 h1:VOMT+81stJgXW3CpHyqHN3AXDYIMsx56mEFrB37Mb/E=
+github.com/xdg-go/scram v1.1.1/go.mod h1:RaEWvsqvNKKvBPvcKeFjrG2cJqOkHTiyTpzz23ni57g=
+github.com/xdg-go/stringprep v1.0.3 h1:kdwGpVNwPFtjs98xCGkHjQtGKh86rDcRZN17QEMCOIs=
+github.com/xdg-go/stringprep v1.0.3/go.mod h1:W3f5j4i+9rC0kuIEJL0ky1VpHXQU3ocBgklLGvcBnW8=
+github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d h1:splanxYIlg+5LfHAM6xpdFEAYOk8iySO56hMFq6uLyA=
+github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d/go.mod h1:rHwXgn7JulP+udvsHwJoVG1YGAP6VLg4y9I5dyZdqmA=
+github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
+github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
+github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
+go.mongodb.org/mongo-driver v1.10.1/go.mod h1:z4XpeoU6w+9Vht+jAFyLgVrD+jGSQQe0+CBWFHNiHt8=
+go.mongodb.org/mongo-driver v1.11.4 h1:4ayjakA013OdpGyL2K3ZqylTac/rMjrJOMZ1EHizXas=
+go.mongodb.org/mongo-driver v1.11.4/go.mod h1:PTSz5yu21bkT/wXpkS7WR5f0ddqw5quethTUn9WM+2g=
+go.opencensus.io v0.23.0/go.mod h1:XItmlyltB5F7CS4xOC1DcqMoFqwtC6OG2xF7mCv7P7E=
+go.opentelemetry.io/otel v1.5.0/go.mod h1:Jm/m+rNp/z0eqJc74H7LPwQ3G87qkU/AnnAydAjSAHk=
+go.opentelemetry.io/otel v1.24.0 h1:0LAOdjNmQeSTzGBzduGe/rU4tZhMwL5rWgtp9Ku5Jfo=
+go.opentelemetry.io/otel v1.24.0/go.mod h1:W7b9Ozg4nkF5tWI5zsXkaKKDjdVjpD4oAt9Qi/MArHo=
+go.opentelemetry.io/otel/trace v1.5.0/go.mod h1:sq55kfhjXYr1zVSyexg0w1mpa03AYXR5eyTkB9NPPdE=
+go.opentelemetry.io/otel/trace v1.24.0 h1:CsKnnL4dUAr/0llH9FKuc698G04IrpWV0MQA/Y1YELI=
+go.opentelemetry.io/otel/trace v1.24.0/go.mod h1:HPc3Xr/cOApsBI154IU0OI0HJexz+aw5uPdbs3UCjNU=
+go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
+go.uber.org/goleak v1.1.11/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ=
+go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU=
+go.uber.org/zap v1.22.0/go.mod h1:H4siCOZOrAolnUPJEkfaSjDqyP+BDS0DdDWzwcgt3+U=
+golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
+golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
+golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
+golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
+golang.org/x/crypto v0.21.0 h1:X31++rzVUdKhX5sWmSOFZxx8UW/ldWx55cbf08iNAMA=
+golang.org/x/crypto v0.21.0/go.mod h1:0BP7YvVV9gBbVKyeTG0Gyn+gZm94bibOW5BjDEYAOMs=
+golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
+golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
+golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU=
+golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
+golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
+golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
+golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
+golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
+golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
+golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
+golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
+golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
+golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
+golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
+golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
+golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
+golang.org/x/net v0.0.0-20201110031124-69a78807bb2b/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
+golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
+golang.org/x/net v0.0.0-20210916014120-12bc252f5db8/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
+golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
+golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk=
+golang.org/x/net v0.22.0 h1:9sGLhx7iRIHEiX0oAJ3MRZMUCElJgy7Br1nO+AMN3Tc=
+golang.org/x/net v0.22.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg=
+golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
+golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.6.0 h1:5BMeUDZ7vkXGfEr1x9B4bRcTH4lpkTkpdh0T/J+qjbQ=
+golang.org/x/sync v0.6.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
+golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
+golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
+golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4=
+golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
+golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
+golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
+golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
+golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
+golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
+golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
+golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ=
+golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
+golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
+golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
+golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY=
+golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
+golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q=
+golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
+golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
+golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
+golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
+golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
+golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
+golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
+golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
+google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
+google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
+google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
+google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc=
+google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo=
+google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
+google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg=
+google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY=
+google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk=
+google.golang.org/grpc v1.33.2/go.mod h1:JMHMWHQWaTccqQQlmk3MJZS+GWXOdAesneDmEnv2fbc=
+google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
+google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
+google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=
+google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE=
+google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo=
+google.golang.org/protobuf v1.22.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
+google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
+google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
+google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c=
+google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
+google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
+gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+gopkg.in/mgo.v2 v2.0.0-20190816093944-a6b53ec6cb22 h1:VpOs+IwYnYBaFnrNAeB8UUWtL3vEUnzSCL1nVjPhqrw=
+gopkg.in/mgo.v2 v2.0.0-20190816093944-a6b53ec6cb22/go.mod h1:yeKp02qBN3iKW1OzL3MGk2IdtZzaj7SFntXj72NppTA=
+gopkg.in/natefinch/lumberjack.v2 v2.0.0/go.mod h1:l0ndWWf7gzL7RNwBG7wST/UCcT4T24xpD6X8LsfU/+k=
+gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
+gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
+gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
+gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
+gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
+honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
+honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
+jygit.jydev.jianyu360.cn/data_processing/common_utils v0.0.0-20240412074219-927f3f682cb3 h1:mTokQIoOu/oZ2oCSAPayIFfnglIHP0qbOw1Ez6biKDo=
+jygit.jydev.jianyu360.cn/data_processing/common_utils v0.0.0-20240412074219-927f3f682cb3/go.mod h1:1Rp0ioZBhikjXHYYXmnzL6RNfvTDM/2XvRB+vuPLurI=

+ 390 - 0
data_project_wy_all/history.go

@@ -0,0 +1,390 @@
+package main
+
+import (
+	"fmt"
+	"github.com/gogf/gf/v2/util/gconv"
+	"jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
+	"strings"
+	"sync"
+)
+
+// HisTransactionDataFromBid 历史bidding(指定截止comeintime,采购意向)
+func HisTransactionDataFromBid() {
+	sess := MgoB.GetMgoConn()
+	defer MgoB.DestoryMongoConn(sess)
+	ch := make(chan bool, 10)
+	wg := &sync.WaitGroup{}
+	lock := &sync.Mutex{}
+	query := map[string]interface{}{
+		"toptype": "采购意向",
+	}
+	fields := map[string]interface{}{
+		"projectname":   1,
+		"budget":        1,
+		"bidamount":     1,
+		"buyer":         1,
+		"s_winner":      1,
+		"agency":        1,
+		"property_form": 1,
+		"multipackage":  1,
+		"area":          1,
+		"city":          1,
+		"district":      1,
+		//
+		"publishtime":           1,
+		"comeintime":            1,
+		"extracttype":           1,
+		"tag_subinformation":    1,
+		"tag_subinformation_ai": 1,
+		"tag_topinformation":    1,
+		"tag_topinformation_ai": 1,
+	}
+	arr := []map[string]interface{}{}
+	it := sess.DB(MgoB.DbName).C("bidding").Find(&query).Select(&fields).Iter()
+	n := 0
+	for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
+		ch <- true
+		wg.Add(1)
+		go func(tmp map[string]interface{}) {
+			defer func() {
+				<-ch
+				wg.Done()
+			}()
+			if gconv.Int64(tmp["comeintime"]) >= 1713196800 { //截止时间1713196800
+				return
+			}
+			if gconv.Int(tmp["extracttype"]) == -1 { //重复数据过滤
+				return
+			}
+			if tmp["tag_topinformation"] == nil && tmp["tag_topinformation_ai"] == nil { //无效数据过滤
+				return
+			}
+			result := DealTransactionForBid(tmp, "采购意向", 3)
+			lock.Lock()
+			if len(result) > 0 {
+				arr = append(arr, result)
+			}
+			if len(arr) > 50 {
+				MgoPro.SaveBulk("projectset_wy", arr...)
+				arr = []map[string]interface{}{}
+			}
+			lock.Unlock()
+		}(tmp)
+		if n%10000 == 0 {
+			fmt.Println("current:", n)
+		}
+		tmp = map[string]interface{}{}
+	}
+	wg.Wait()
+	if len(arr) > 0 {
+		MgoPro.SaveBulk("projectset_wy", arr...)
+		arr = []map[string]interface{}{}
+	}
+	fmt.Println("结束")
+}
+
+// HisTransactionDataFromBid2 历史bidding(指定截止comeintime,新增物业项目)
+func HisTransactionDataFromBid2() {
+	sess := MgoB.GetMgoConn()
+	defer MgoB.DestoryMongoConn(sess)
+	ch := make(chan bool, 20)
+	wg := &sync.WaitGroup{}
+	lock := &sync.Mutex{}
+	query := map[string]interface{}{
+		"comeintime": map[string]interface{}{
+			"$gte": 1713715200,
+			"$lt":  1713801600,
+		},
+		"toptype": "拟建",
+	}
+	fields := map[string]interface{}{
+		"projectname":   1,
+		"budget":        1,
+		"bidamount":     1,
+		"buyer":         1,
+		"s_winner":      1,
+		"agency":        1,
+		"property_form": 1,
+		"multipackage":  1,
+		"area":          1,
+		"city":          1,
+		"district":      1,
+		//
+		"owner":                 1,
+		"s_topscopeclass":       1,
+		"publishtime":           1,
+		"toptype":               1,
+		"comeintime":            1,
+		"extracttype":           1,
+		"tag_subinformation":    1,
+		"tag_subinformation_ai": 1,
+		"tag_topinformation":    1,
+		"tag_topinformation_ai": 1,
+	}
+	arr := []map[string]interface{}{}
+	it := sess.DB(MgoB.DbName).C("bidding").Find(&query).Select(&fields).Iter()
+	n := 0
+	for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
+		ch <- true
+		wg.Add(1)
+		go func(tmp map[string]interface{}) {
+			defer func() {
+				<-ch
+				wg.Done()
+			}()
+			//comeintime := gconv.Int64(tmp["comeintime"])
+			//if comeintime < 1609430400 || comeintime >= 1713715200 {
+			//	return
+			//}
+			if gconv.Int(tmp["extracttype"]) == -1 { //重复数据过滤
+				return
+			}
+			if s_topscopeclass := gconv.String(tmp["s_topscopeclass"]); !strings.Contains(s_topscopeclass, "建筑工程") { //排除非建筑工程
+				return
+			}
+			if tag_topinformation := gconv.String(tmp["tag_topinformation"]); strings.Contains(tag_topinformation, "物业") { //排除物业
+				return
+			} else if tag_topinformation_ai := gconv.String(tmp["tag_topinformation_ai"]); strings.Contains(tag_topinformation_ai, "物业") {
+				return
+			}
+			//if tmp["tag_topinformation"] != nil || tmp["tag_topinformation_ai"] != nil { //不包含物业
+			//	return
+			//}
+			project_bidstatus := 4 //拟建
+			business_type := "新增物业项目"
+			result := DealTransactionForBid(tmp, business_type, project_bidstatus)
+			lock.Lock()
+			if len(result) > 0 {
+				arr = append(arr, result)
+			}
+			if len(arr) > 50 {
+				MgoPro.SaveBulk("projectset_wy_nj", arr...)
+				arr = []map[string]interface{}{}
+			}
+			lock.Unlock()
+		}(tmp)
+		if n%10000 == 0 {
+			fmt.Println("current:", n)
+		}
+		tmp = map[string]interface{}{}
+	}
+	wg.Wait()
+	if len(arr) > 0 {
+		MgoPro.SaveBulk("projectset_wy_nj", arr...)
+		arr = []map[string]interface{}{}
+	}
+	fmt.Println("结束")
+}
+
+// HisTransactionDataFromProject 历史project(指定截止pici:1713196800)
+func HisTransactionDataFromProject() {
+	sess := MgoPro.GetMgoConn()
+	defer MgoPro.DestoryMongoConn(sess)
+	ch := make(chan bool, 20)
+	wg := &sync.WaitGroup{}
+	lock := &sync.Mutex{}
+	query := map[string]interface{}{
+		"pici": map[string]interface{}{
+			"$lt": 1713196800,
+			//"$gt": 1711900800,
+		},
+	}
+	fields := map[string]interface{}{
+		"projectname":   1,
+		"budget":        1,
+		"bidamount":     1,
+		"buyer":         1,
+		"s_winner":      1,
+		"agency":        1,
+		"property_form": 1,
+		"multipackage":  1,
+		"area":          1,
+		"city":          1,
+		"district":      1,
+		"zbtime":        1,
+		"jgtime":        1,
+		"bidstatus":     1,
+		//
+		"firsttime":             1,
+		"ids":                   1,
+		"pici":                  1,
+		"sourceinfoid":          1,
+		"tag_subinformation":    1,
+		"tag_subinformation_ai": 1,
+		"tag_topinformation":    1,
+		"tag_topinformation_ai": 1,
+	}
+	arr := []map[string]interface{}{}
+	it := sess.DB(MgoPro.DbName).C("projectset_20230904").Find(&query).Select(&fields).Iter()
+	n := 0
+	for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
+		ch <- true
+		wg.Add(1)
+		go func(tmp map[string]interface{}) {
+			defer func() {
+				<-ch
+				wg.Done()
+			}()
+			if tmp["tag_topinformation"] == nil && tmp["tag_topinformation_ai"] == nil { //无效数据过滤
+				return
+			}
+			result := DealTransactionForPro(tmp)
+			lock.Lock()
+			if len(result) > 0 {
+				arr = append(arr, result)
+			}
+			if len(arr) > 50 {
+				MgoPro.SaveBulk("projectset_wy_newback", arr...)
+				arr = []map[string]interface{}{}
+			}
+			lock.Unlock()
+		}(tmp)
+		if n%10000 == 0 {
+			fmt.Println("current:", n)
+		}
+		tmp = map[string]interface{}{}
+	}
+	wg.Wait()
+	if len(arr) > 0 {
+		MgoPro.SaveBulk("projectset_wy_newback", arr...)
+		arr = []map[string]interface{}{}
+	}
+	fmt.Println("结束")
+}
+
+// HisTransactionDataAddInformation 补充字段信息
+func HisTransactionDataAddInformation() {
+	sess := MgoPro.GetMgoConn()
+	defer MgoPro.DestoryMongoConn(sess)
+	ch := make(chan bool, 1)
+	wg := &sync.WaitGroup{}
+	lock := &sync.Mutex{}
+	query := map[string]interface{}{
+		//"_id": mongodb.StringTOBsonId("662f01d8397fa006e2e75e6c"),
+		//项目
+		//"_id": map[string]interface{}{
+		//	"$gte": mongodb.StringTOBsonId("66308fa06f6c86a3960ae83f"),
+		//	"$lte": mongodb.StringTOBsonId("66308feb6f6c86a3960b0f4e"),
+		//},
+		//拟建
+		//"project_bidstatus": 4,
+		//"_id": map[string]interface{}{
+		//	"$lte": mongodb.StringTOBsonId("6627227819c5408c474c3802"),
+		//},
+		//采购意向
+		//"project_bidstatus": 3,
+		//"_id": map[string]interface{}{
+		//	"$lte": mongodb.StringTOBsonId("661f798b5a4e6cc01349dad0"),
+		//},
+
+		//历史projectset_wy
+		//"project_id": map[string]interface{}{
+		//	//"$gt":  "662143800000000000000000",
+		//	"$gt": "667c3b5166cf0db42ae965e6",
+		//},
+
+		"project_id": "6637ae0866cf0db42aeeb5d4",
+		//历史projectset_wy_back
+		//"update_time": map[string]interface{}{
+		//	"$gte": 1714959573,
+		//	"$lte": 1719795791,
+		//},
+	}
+	count := MgoPro.Count("projectset_wy_back", query)
+	fmt.Println("count:", count)
+	it := sess.DB(MgoPro.DbName).C("projectset_wy_back").Find(&query).Iter()
+	n := 0
+	arr := [][]map[string]interface{}{}
+	for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
+		ch <- true
+		wg.Add(1)
+		go func(tmp map[string]interface{}) {
+			defer func() {
+				<-ch
+				wg.Done()
+			}()
+			id := mongodb.BsonIdToSId(tmp["_id"])
+			update := []map[string]interface{}{
+				{"_id": mongodb.StringTOBsonId(id)},
+			}
+			set := map[string]interface{}{}
+			//法人信息
+			buyer_id, agency_id, winner_ids := FindEntInfoData(id, gconv.String(tmp["buyer"]), gconv.String(tmp["agency"]), gconv.Strings(tmp["winner"]))
+			//更新
+			set["buyer_id"] = buyer_id
+			set["agency_id"] = agency_id
+			set["winner_id"] = winner_ids
+			//保存
+			tmp["buyer_id"] = buyer_id
+			tmp["agency_id"] = agency_id
+			tmp["winner_id"] = winner_ids
+
+			if from := gconv.String(tmp["from"]); from == "project" {
+				//项目信息补充业态
+				//project_id := gconv.String(tmp["project_id"])
+				//pro, _ := MgoPro.FindById("projectset_20230904", project_id, map[string]interface{}{"property_form": 1})
+				//if len(*pro) > 0 && (*pro)["property_form"] != nil {
+				//	//更新
+				//	set["property_form"] = (*pro)["property_form"]
+				//	//保存
+				//	tmp["property_form"] = (*pro)["property_form"]
+				//}
+
+				//查询情报信息
+				ids := gconv.Strings(tmp["info_ids"])
+				info := FindInfomationData(ids...) //情报信息查询
+				//更新
+				set["information_id"] = info.Id
+				set["starttime"] = info.Starttime
+				set["endtime"] = info.Endtime
+				//保存
+				tmp["information_id"] = info.Id
+				tmp["starttime"] = info.Starttime
+				tmp["endtime"] = info.Endtime
+			} else {
+				if project_bidstatus := gconv.Int(tmp["project_bidstatus"]); project_bidstatus == 4 { //拟建新增物业项目,补充情报信息
+					//查询情报信息
+					id := gconv.String(tmp["info_id"])
+					info := FindInfomationData(id) //情报信息查询
+					//更新
+					set["information_id"] = info.Id
+					set["starttime"] = info.Starttime
+					set["endtime"] = info.Endtime
+					//保存
+					tmp["information_id"] = info.Id
+					tmp["starttime"] = info.Starttime
+					tmp["endtime"] = info.Endtime
+				}
+			}
+			delete(tmp, "from")     //无用字段删除
+			delete(tmp, "_id")      //无用字段删除
+			if !SaveDataToEs(tmp) { //保存、更新es
+				fmt.Println("数据保存es失败,数据类型  项目project_id", tmp["project_id"])
+			}
+			var err error
+			err = UpdateOrSaveDataToClickHouse(tmp) //保存、更新clickhouse
+			if err != nil {
+				fmt.Println("数据迁移失败,数据类型 项目project_id", tmp["project_id"], err)
+			}
+			//更新
+			update = append(update, map[string]interface{}{"$set": set})
+			lock.Lock()
+			arr = append(arr, update)
+			if len(arr) > 100 {
+				MgoPro.UpdateBulk("projectset_wy_back", arr...)
+				arr = [][]map[string]interface{}{}
+			}
+			lock.Unlock()
+		}(tmp)
+		if n%100 == 0 {
+			fmt.Println("current:", n)
+		}
+		tmp = map[string]interface{}{}
+	}
+	wg.Wait()
+	if len(arr) > 0 {
+		MgoPro.UpdateBulk("projectset_wy_back", arr...)
+		arr = [][]map[string]interface{}{}
+	}
+	fmt.Println("迁移结束...")
+}

+ 105 - 0
data_project_wy_all/init.go

@@ -0,0 +1,105 @@
+package main
+
+import (
+	"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
+	"jygit.jydev.jianyu360.cn/data_processing/common_utils/elastic"
+	"jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
+)
+
+type conf struct {
+	MgoB   db `json:"mgob"`
+	MgoPro db `json:"mgopro"`
+	//MysqlDb    db  `json:"mysqldb"`
+	ClickHouse   ckh    `json:"clickhouse"`
+	Es           db     `json:"es"`
+	BidStartTime int64  `json:"bidstarttime"` //bidding增量起始id
+	ProStartTime int64  `json:"prostarttime"` //project增量起始id
+	StartCron    string `json:"startcron"`
+}
+type db struct {
+	Addr     string `json:"addr"`
+	DbName   string `json:"dbname"`
+	Size     int    `json:"size"`
+	Username string `json:"username"`
+	Password string `json:"password"`
+	Index    string `json:"index"`
+}
+type ckh struct {
+	Addr     []string `json:"addr"`
+	DataBase string   `json:"database"`
+	Size     int      `json:"size"`
+	Username string   `json:"username"`
+	Password string   `json:"password"`
+}
+
+var (
+	Config  conf
+	MgoB    *mongodb.MongodbSim //bidding
+	MgoPro  *mongodb.MongodbSim //project
+	CkhTool driver.Conn         //
+	Es      *elastic.Elastic
+	//MysqlTool *mysqldb.Mysql
+	BidStartTime int64
+	ProStartTime int64
+)
+
+//var (
+//	TransactionSaveCache = make(chan map[string]interface{}, 1000) //
+//	Transaction_Ch       = make(chan bool, 5)
+//)
+
+func InitMgo() {
+	//bidding
+	MgoB = &mongodb.MongodbSim{
+		MongodbAddr: Config.MgoB.Addr,
+		DbName:      Config.MgoB.DbName,
+		Size:        Config.MgoB.Size,
+		UserName:    Config.MgoB.Username,
+		Password:    Config.MgoB.Password,
+	}
+	MgoB.InitPool()
+	//project
+	MgoPro = &mongodb.MongodbSim{
+		MongodbAddr: Config.MgoPro.Addr,
+		DbName:      Config.MgoPro.DbName,
+		Size:        Config.MgoPro.Size,
+		UserName:    Config.MgoPro.Username,
+		Password:    Config.MgoPro.Password,
+	}
+	MgoPro.InitPool()
+}
+
+//func InitMysql() {
+//	MysqlTool = &mysqldb.Mysql{
+//		Address:  Config.MysqlDb.Addr,
+//		DBName:   Config.MysqlDb.DbName,
+//		UserName: Config.MysqlDb.Username,
+//		PassWord: Config.MysqlDb.Password,
+//	}
+//	MysqlTool.Init()
+//}
+
+func InitCkh() {
+	CkhTool, _ = InitClickHouse(
+		Config.ClickHouse.Addr,
+		Config.ClickHouse.Size,
+		Config.ClickHouse.DataBase,
+		Config.ClickHouse.Username,
+		Config.ClickHouse.Password,
+	)
+}
+
+func InitEs() {
+	Es = &elastic.Elastic{
+		S_esurl:  Config.Es.Addr,
+		I_size:   Config.Es.Size,
+		Username: Config.Es.Username,
+		Password: Config.Es.Password,
+	}
+	Es.InitElasticSize()
+}
+
+func InitOther() {
+	BidStartTime = Config.BidStartTime
+	ProStartTime = Config.ProStartTime
+}

+ 290 - 0
data_project_wy_all/main.go

@@ -0,0 +1,290 @@
+package main
+
+import (
+	"context"
+	"encoding/json"
+	"fmt"
+	"io"
+	"log"
+	"sync"
+
+	"github.com/gogf/gf/v2/util/gconv"
+	"github.com/olivere/elastic/v7"
+	"github.com/robfig/cron"
+)
+
+func init() {
+	ReadConfig(&Config) //初始化
+	InitMgo()           //mgo
+	InitCkh()           //clickhouse
+	InitEs()            //es
+	InitOther()
+}
+
+func main() {
+	c := cron.New()
+	c.AddFunc("0 0 4 ? * *", IncTransactionDataFromBid) //增量bidding和项目数据
+	c.Start()
+	d := cron.New()
+	d.AddFunc("0 0 5 ? * *", IncTransactionDataFromPro) //增量bidding和项目数据
+	d.Start()
+	//历史
+	//HisTransactionDataFromBid() //历史招标(bidding)数据,截止时间1713196800采购意向 TODO 待补充法人信息
+	//HisTransactionDataFromBid2() //历史招标(bidding)数据,截止时间1713628800新增项目 TODO 待补充情报信息、法人信息
+	//HisTransactionDataFromProject() //历史项目数据(projectset_20230904)TODO 待补充业态、情报信息、法人信息
+	//临时处理(信息补充)
+	//HisTransactionDataAddInformation() //历史信息补充法人库信息,项目信息补充业态property_form
+	//IncTransactionDataMgoToCkhAndEs()  //数据迁移
+	ch := make(chan bool)
+	<-ch
+}
+
+func tmp() {
+	sess := MgoPro.GetMgoConn()
+	defer MgoPro.DestoryMongoConn(sess)
+	ch := make(chan bool, 5)
+	wg := &sync.WaitGroup{}
+	lock := &sync.Mutex{}
+	query := map[string]interface{}{
+		//"project_bidstatus": 4,
+		//"_id": map[string]interface{}{
+		//	"$gte": mongodb.StringTOBsonId("66213b290f6ba3eb160617ad"),
+		//},
+		//"update_time": map[string]interface{}{
+		//	"$lt": 1714959573,
+		//},
+		//"_id": mongodb.StringTOBsonId("6630eae76f6c86a3962f3a07"),
+		//"repeat": true,
+		"update_time": map[string]interface{}{
+			"$gte": 1714959573,
+			"$lte": 1719795791,
+		},
+	}
+	repeat := map[string]bool{}
+	count := MgoPro.Count("projectset_wy_back", query)
+	fmt.Println("count:", count)
+	it := sess.DB(MgoPro.DbName).C("projectset_wy_back").Find(&query).Iter()
+	n := 0
+	arr := [][]map[string]interface{}{}
+	for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
+		ch <- true
+		wg.Add(1)
+		go func(tmp map[string]interface{}) {
+			defer func() {
+				<-ch
+				wg.Done()
+			}()
+			update := []map[string]interface{}{}
+			project_id := gconv.String(tmp["project_id"])
+			//lock.Lock()
+			//if !repeat[project_id] {
+			//Es.DelById(Config.Es.Index, project_id)
+			//CkhTool.Exec(context.Background(), "ALTER TABLE information.transaction_info_copy DELETE WHERE project_id = ?", project_id)
+			//repeat[project_id] = true
+			//}
+			//lock.Unlock()
+			//err, result := Es.GetById(Config.Es.Index, project_id)
+			//Es.DelById()
+			//if err != nil || len(result) == 0 {
+			//	fmt.Println(project_id)
+			//	update = append(update, map[string]interface{}{"_id": tmp["_id"]})
+			//	update = append(update, map[string]interface{}{"$set": map[string]interface{}{"es": false}})
+			//} else {
+			//	if gconv.Int(result["project_bidstatus"]) != 0 {
+			//		fmt.Println("11", project_id)
+			//	}
+			//}
+
+			tt := map[string]bool{}
+			err, result := Es.GetById(Config.Es.Index, gconv.String(tmp["_id"]))
+			if err != nil || len(result) == 0 {
+				tt["es"] = true
+			}
+			if FindClickHouseByProjectId(project_id) == 0 {
+				tt["click"] = true
+			}
+			if len(tt) > 0 {
+				update = append(update, map[string]interface{}{"_id": tmp["_id"]})
+				update = append(update, map[string]interface{}{"$set": map[string]interface{}{"repeat": tt}})
+			}
+			//if MgoPro.Count("projectset_wy_back", map[string]interface{}{"project_id": tmp["project_id"]}) > 1 {
+			//	fmt.Println("project_id")
+			//	update = append(update, map[string]interface{}{"_id": tmp["_id"]})
+			//	update = append(update, map[string]interface{}{"$set": map[string]interface{}{"repeat": "project"}})
+			//} else if MgoB.Count("projectset_wy", map[string]interface{}{"project_id": tmp["project_id"]}) > 0 {
+			//	update = append(update, map[string]interface{}{"_id": tmp["_id"]})
+			//	update = append(update, map[string]interface{}{"$set": map[string]interface{}{"repeat": "bidding"}})
+			//}
+			if len(update) > 0 {
+				lock.Lock()
+				arr = append(arr, update)
+				if len(arr) > 500 {
+					MgoPro.UpdateBulk("projectset_wy_back", arr...)
+					arr = [][]map[string]interface{}{}
+				}
+				lock.Unlock()
+			}
+		}(tmp)
+		if n%1000 == 0 {
+			fmt.Println("current:", n)
+		}
+		tmp = map[string]interface{}{}
+	}
+	wg.Wait()
+	if len(arr) > 0 {
+		MgoPro.UpdateBulk("projectset_wy_back", arr...)
+		arr = [][]map[string]interface{}{}
+	}
+	fmt.Println("迁移结束...", len(repeat))
+}
+
+func getBiddingData() {
+	url := "http://172.17.4.184:19908"
+	//url := "http://127.0.0.1:19908"
+	//url := "http://192.168.3.149:9200"
+	username := "jybid"
+	password := "Top2023_JEB01i@31"
+	index := "transaction_info" //索引名称
+	// 创建 Elasticsearch 客户端
+	client, err := elastic.NewClient(
+		elastic.SetURL(url),
+		elastic.SetBasicAuth(username, password),
+		elastic.SetSniff(false),
+	)
+	if err != nil {
+		log.Fatalf("创建 Elasticsearch 客户端失败:%s", err)
+	}
+
+	//rangeQuery := elastic.NewRangeQuery("project_bidstatus").Lt(3)
+	query := elastic.NewBoolQuery().
+		Must(elastic.NewTermQuery("id", "64d7146cb44bf08751e3c133")) //
+	//Must(rangeQuery)
+	//Must(elastic.NewTermQuery("subtype", "招标"))
+
+	//query := elastic.NewBoolQuery().
+	//        //北京,天津,河北,上海,江苏,浙江,安徽
+	//        //Must(elastic.NewTermQuery("area", "北京市")).
+	//        Must(elastic.NewTermsQuery("subtype", "中标", "单一", "成交", "合同")).
+	//        Must(elastic.NewTermsQuery("area", "北京", "上海", "江苏", "浙江", "广东")).
+	//        Must(rangeQuery)
+
+	ctx := context.Background()
+	//开始滚动搜索
+	scrollID := ""
+	scroll := "10m"
+	searchSource := elastic.NewSearchSource().
+		Query(query).
+		Size(500)
+	//Sort("_doc", true) //升序排序
+	//Sort("_doc", false) //降序排序
+
+	searchService := client.Scroll(index).
+		Size(500).
+		Scroll(scroll).
+		SearchSource(searchSource)
+
+	res, err := searchService.Do(ctx)
+
+	if err != nil {
+		if err == io.EOF {
+			fmt.Println("没有数据")
+		} else {
+			panic(err)
+		}
+
+	}
+	//defer client.ClearScroll().ScrollId(scrollID).Do(ctx) // 在退出时清理资源
+	fmt.Println("总数是:", res.TotalHits())
+
+	total := 0
+	for len(res.Hits.Hits) > 0 {
+		for _, hit := range res.Hits.Hits {
+			var doc map[string]interface{}
+			err := json.Unmarshal(hit.Source, &doc)
+			if err != nil {
+				log.Printf("解析文档失败:%s", err)
+				continue
+			}
+			set := map[string]interface{}{}
+			id := gconv.String(doc["id"])
+			//情报
+			information_id := gconv.String(doc["information_id"])
+			info_ids := gconv.Strings(doc["info_ids"])
+			info := FindInfomationData(info_ids...)
+			if information_id != info.Id {
+				doc["information_id"] = info.Id
+				doc["starttime"] = info.Starttime
+				doc["endtime"] = info.Endtime
+				set["information_id"] = info.Id
+				set["starttime"] = info.Starttime
+				set["endtime"] = info.Endtime
+			}
+
+			//法人
+			buyer, agency := "", ""
+			winners := []string{}
+			if gconv.String(doc["buyer_id"]) == "" {
+				buyer = gconv.String(doc["buyer"])
+			}
+			if gconv.String(doc["agency_id"]) == "" {
+				agency = gconv.String(doc["agency"])
+			}
+			if len(gconv.Strings(doc["winner_id"])) == 0 {
+				if winnersTmp := gconv.Strings(doc["winner"]); len(winnersTmp) > 0 {
+					winners = winnersTmp
+				}
+			}
+			buyer_id, agency_id, winner_ids := FindEntInfoData(id, buyer, agency, winners)
+			if buyer_id != "" {
+				doc["buyer_id"] = buyer_id
+				set["buyer_id"] = buyer_id
+			}
+			if agency_id != "" {
+				doc["agency_id"] = agency_id
+				set["agency_id"] = agency_id
+			}
+			if len(winner_ids) > 0 {
+				doc["winner_id"] = winner_ids
+				set["winner_id"] = winner_ids
+			}
+			//
+			coll := "projectset_wy_back"
+			business_type := gconv.String(doc["business_type"])
+			if business_type == "新增项目" {
+				coll = "projectset_wy"
+				doc["business_type"] = "新增物业项目"
+				set["business_type"] = "新增物业项目"
+			} else if business_type == "采购意向" {
+				coll = "projectset_wy"
+			}
+			fmt.Println(set)
+			if len(set) > 0 {
+				//更新es
+				client.Update().Index(index).Id(id).Doc(doc).Do(context.Background())
+				//更新clickhouse
+				err := UpdateDataToClickHouse(set, map[string]interface{}{"project_id": id})
+				fmt.Println("11", err)
+				//更新mgo
+				MgoPro.Update(coll, map[string]interface{}{"project_id": id}, map[string]interface{}{"$set": set}, false, false)
+			}
+		}
+		total = total + len(res.Hits.Hits)
+		scrollID = res.ScrollId
+		res, err = client.Scroll().ScrollId(scrollID).Scroll(scroll).Do(ctx)
+		log.Println("current count:", total)
+		if err != nil {
+			if err == io.EOF {
+				// 滚动到最后一批数据,退出循环
+				break
+			}
+			log.Println("滚动搜索失败:", err, res)
+			break // 处理错误时退出循环
+		}
+	}
+	// 在循环外调用 ClearScroll
+	_, err = client.ClearScroll().ScrollId(scrollID).Do(ctx)
+	if err != nil {
+		log.Printf("清理滚动搜索失败:%s", err)
+	}
+	fmt.Println("结束~~~~~~~~~~~~~~~")
+}

+ 702 - 0
data_project_wy_all/task.go

@@ -0,0 +1,702 @@
+package main
+
+import (
+	"context"
+	"fmt"
+	"regexp"
+	"strings"
+	"sync"
+	"time"
+
+	"github.com/gogf/gf/v2/util/gconv"
+	"go.mongodb.org/mongo-driver/bson"
+	"jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
+)
+
+type Transaction struct {
+	Project_Id        string   `bson:"project_id"`
+	Project_Name      string   `bson:"project_name"`
+	Project_Budget    float64  `bson:"project_budget"`
+	Project_Bidamount float64  `bson:"project_bidamount"`
+	Project_Money     float64  `bson:"project_money"`
+	Business_Type     string   `bson:"business_type"`
+	Project_Bidstatus int      `bson:"project_bidstatus"`
+	Info_Id           string   `bson:"info_id"`
+	Info_Ids          []string `bson:"info_ids"`
+	Information_Id    string   `bson:"information_id"`
+	BuyerClass        string   `bson:"buyerclass"`
+	Buyer             string   `bson:"buyer"`
+	Buyer_Id          string   `bson:"buyer_id"`
+	Winner            []string `bson:"winner"`
+	Winner_Id         []string `bson:"winner_id"`
+	Agency            string   `bson:"agency"`
+	Agency_Id         string   `bson:"agency_id"`
+	Property_Form     []string `bson:"property_form"`
+	SubClass          []string `bson:"subclass"`
+	MultiPackage      int      `bson:"multipackage"`
+	Topscopeclass     []string `bson:"topscopeclass"`
+	Area              string   `bson:"area"`
+	City              string   `bson:"city"`
+	District          string   `bson:"district"`
+	ZbTime            int64    `bson:"zbtime"`
+	JgTime            int64    `bson:"jgtime"`
+	StartTime         int64    `bson:"starttime"`
+	EndTime           int64    `bson:"endtime"`
+	Create_Time       int64    `bson:"create_time"`
+	Update_Time       int64    `bson:"update_time"`
+	//
+	// From string `bson:"from"`
+}
+
+var regLetter = regexp.MustCompile("[a-z]*")
+
+func IncTransactionDataFromBidAndPro() {
+	// IncTransactionDataFromBid() //bidding
+	IncTransactionDataFromPro() //project
+	// IncTransactionDataMgoToCkhAndEs() //mongodb迁移至clickhouse
+}
+
+// IncTransactionDataFromBid 增量bidding
+func IncTransactionDataFromBid() {
+	stime := time.Now().AddDate(0, 0, -1)
+	BidStartTime := time.Date(stime.Year(), stime.Month(), stime.Day(), 0, 0, 0, 0, stime.Location()).Unix()
+	fmt.Println("开始执行增量采购意向、拟建信息")
+	query := map[string]interface{}{
+		"pici": map[string]interface{}{
+			"$gte": BidStartTime,
+			"$lt":  BidStartTime + 86400,
+		},
+	}
+	fmt.Println("增量bidding采购意向query:", query)
+	sess := MgoB.GetMgoConn()
+	defer MgoB.DestoryMongoConn(sess)
+	ch := make(chan bool, 1)
+	wg := &sync.WaitGroup{}
+	// lock := &sync.Mutex{}
+	fields := map[string]interface{}{
+		"projectname":   1,
+		"budget":        1,
+		"bidamount":     1,
+		"buyer":         1,
+		"s_winner":      1,
+		"agency":        1,
+		"property_form": 1,
+		"multipackage":  1,
+		"area":          1,
+		"city":          1,
+		"district":      1,
+		"buyerclass":    1,
+		//
+		"owner":                 1,
+		"s_topscopeclass":       1,
+		"publishtime":           1,
+		"toptype":               1,
+		"extracttype":           1,
+		"tag_subinformation":    1,
+		"tag_subinformation_ai": 1,
+		"tag_topinformation":    1,
+		"tag_topinformation_ai": 1,
+	}
+	// arr := []map[string]interface{}{}
+	it := sess.DB(MgoB.DbName).C("bidding").Find(&query).Select(&fields).Sort("-_id").Iter()
+	n := 0
+	// count := 0
+	for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
+		ch <- true
+		wg.Add(1)
+		go func(tmp map[string]interface{}) {
+			defer func() {
+				<-ch
+				wg.Done()
+			}()
+			if gconv.Int(tmp["extracttype"]) == -1 { //重复数据过滤
+				return
+			}
+			toptype := gconv.String(tmp["toptype"])
+			// tag_topinformation := gconv.String(tmp["tag_topinformation"])
+			// tag_topinformation_ai := gconv.String(tmp["tag_topinformation_ai"])
+			var business_type string
+			var project_bidstatus int
+			if toptype == "采购意向" { //采购意向数据
+				// if !strings.Contains(tag_topinformation, "物业") && !strings.Contains(tag_topinformation_ai, "物业") {
+				// 	return
+				// }
+				business_type = "采购意向"
+				project_bidstatus = 3
+			} else if toptype == "拟建" {
+				s_topscopeclass := gconv.String(tmp["s_topscopeclass"])
+				// if !strings.Contains(s_topscopeclass, "建筑工程") || strings.Contains(tag_topinformation, "物业") || strings.Contains(tag_topinformation_ai, "物业") {
+				if !strings.Contains(s_topscopeclass, "建筑工程") {
+					return
+				}
+				business_type = "新增物业项目"
+				project_bidstatus = 4
+			} else {
+				return
+			}
+			result := DealTransactionForBid(tmp, business_type, project_bidstatus)
+			if !SaveDataToEs(result) { //保存、更新es
+				fmt.Println("数据保存es失败,项目project_id", result["project_id"])
+			}
+			SaveDataToClickHouse(result)
+		}(tmp)
+		if n%1000 == 0 {
+			fmt.Println("current:", n)
+		}
+		tmp = map[string]interface{}{}
+	}
+	wg.Wait()
+	fmt.Println("增量采购意向、拟建信息结束")
+}
+
+// DealTransactionForBid bidding采购意向、拟建数据处理
+func DealTransactionForBid(tmp map[string]interface{}, business_type string, project_bidstatus int) map[string]interface{} {
+	//基本信息封装
+	id := mongodb.BsonIdToSId(tmp["_id"])
+	buyerclass := gconv.String(tmp["buyerclass"])
+	buyer := gconv.String(tmp["buyer"])
+	if buyer == "" {
+		buyer = gconv.String(tmp["owner"])
+	}
+	winner := gconv.String(tmp["s_winner"])
+	agency := gconv.String(tmp["agency"])
+	property_form := []string{}
+	if tmp["property_form"] != nil {
+		property_form = gconv.Strings(tmp["property_form"])
+	}
+	bidamount := gconv.Float64(tmp["bidamount"])
+	budget := gconv.Float64(tmp["budget"])
+	money := bidamount
+	if money <= 0 {
+		money = budget
+	}
+
+	//物业分类
+	subclass := []string{}
+	if tag_subinformation := tmp["tag_subinformation"]; tag_subinformation != nil {
+		subclass = gconv.Strings(tag_subinformation)
+	} else if tag_subinformation_ai := tmp["tag_subinformation_ai"]; tag_subinformation_ai != nil {
+		subclass = gconv.Strings(tag_subinformation_ai)
+	}
+
+	//情报信息查询
+	// info := FindInfomationData(id)
+	topscopeclass := []string{}
+	s_topscopeclass := gconv.String(tmp["s_topscopeclass"])
+	if s_topscopeclass != "" {
+		topscopeclass = strings.Split(s_topscopeclass, ",")
+	}
+	//法人信息
+	winners := []string{}
+	if winner != "" {
+		winners = strings.Split(winner, ",")
+	}
+	buyer_id, agency_id, winner_ids := FindEntInfoData(id, buyer, agency, winners)
+	//物业信息
+	t := &Transaction{
+		Project_Id:        id,
+		Project_Name:      gconv.String(tmp["projectname"]),
+		Project_Budget:    budget,
+		Project_Bidamount: bidamount,
+		Project_Money:     money,
+		Business_Type:     business_type,
+		Project_Bidstatus: project_bidstatus,
+		Info_Id:           id,
+		Info_Ids:          []string{id},
+		// Information_Id:    info.Id,
+		BuyerClass:    buyerclass,
+		Buyer:         buyer,
+		Topscopeclass: topscopeclass,
+		Winner:        winners,
+		Agency:        agency,
+		Buyer_Id:      buyer_id,
+		Winner_Id:     winner_ids,
+		Agency_Id:     agency_id,
+		Property_Form: property_form,
+		SubClass:      subclass,
+		MultiPackage:  gconv.Int(tmp["multipackage"]),
+		Area:          gconv.String(tmp["area"]),
+		City:          gconv.String(tmp["city"]),
+		District:      gconv.String(tmp["district"]),
+		ZbTime:        gconv.Int64(tmp["publishtime"]),
+		JgTime:        int64(0),
+		// StartTime:         info.Starttime,
+		// EndTime:           info.Endtime,
+		Create_Time: time.Now().Unix(),
+		Update_Time: time.Now().Unix(),
+		//
+		// From: "bidding",
+	}
+	result := map[string]interface{}{}
+	infomation, _ := bson.Marshal(t)
+	bson.Unmarshal(infomation, &result)
+	return result
+}
+
+// IncTransactionDataFromProject 增量project
+func IncTransactionDataFromPro() {
+	stime := time.Now().AddDate(0, 0, -1)
+	BidStartTime := time.Date(stime.Year(), stime.Month(), stime.Day(), 0, 0, 0, 0, stime.Location()).Unix()
+	fmt.Println("开始执行增量项目信息")
+	query := map[string]interface{}{
+		"pici": map[string]interface{}{
+			"$gte": BidStartTime,
+			"$lt":  BidStartTime + 86400,
+		},
+	}
+	fmt.Println("增量项目查询query:", query)
+	sess := MgoPro.GetMgoConn()
+	defer MgoPro.DestoryMongoConn(sess)
+	ch := make(chan bool, 1)
+	wg := &sync.WaitGroup{}
+	// lock := &sync.Mutex{}
+	fields := map[string]interface{}{
+		"projectname":   1,
+		"budget":        1,
+		"bidamount":     1,
+		"buyer":         1,
+		"s_winner":      1,
+		"agency":        1,
+		"property_form": 1,
+		"multipackage":  1,
+		"area":          1,
+		"city":          1,
+		"district":      1,
+		"zbtime":        1,
+		"jgtime":        1,
+		"bidstatus":     1,
+		"buyerclass":    1,
+		"topscopeclass": 1,
+		//
+		"firsttime":             1,
+		"pici":                  1,
+		"ids":                   1,
+		"sourceinfoid":          1,
+		"tag_subinformation":    1,
+		"tag_subinformation_ai": 1,
+		"tag_topinformation":    1,
+		"tag_topinformation_ai": 1,
+	}
+	it := sess.DB(MgoPro.DbName).C("projectset_20230904").Find(&query).Select(&fields).Iter()
+	n := 0
+	for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
+		ch <- true
+		wg.Add(1)
+		go func(tmp map[string]interface{}) {
+			defer func() {
+				<-ch
+				wg.Done()
+			}()
+			bidstatus := gconv.String(tmp["bidstatus"])
+			if bidstatus == "中标" || bidstatus == "成交" || bidstatus == "合同" || bidstatus == "招标" {
+				result := DealTransactionForPro(tmp)
+				project_id := gconv.String(result["project_id"])
+				if !SaveDataToEs(result) { //保存、更新es
+					fmt.Println("数据保存es失败,项目project_id", result["project_id"])
+				}
+				count := FindClickHouseByProjectId(project_id) //查询
+				if count > 0 {                                 //更新
+					delete(result, "create_time") //不更新创建时间
+					delete(result, "project_id")  //不更新项目id(主键)
+					err := UpdateDataToClickHouse(result, map[string]interface{}{"project_id": project_id})
+					if err != nil {
+						fmt.Println("clickhouse更新失败", project_id, result)
+					}
+				} else { //插入
+					err := SaveDataToClickHouse(result)
+					if err != nil {
+						fmt.Println("clickhouse保存失败", project_id, result)
+					}
+				}
+			}
+		}(tmp)
+		if n%1000 == 0 {
+			fmt.Println("current:", n)
+		}
+		tmp = map[string]interface{}{}
+	}
+	wg.Wait()
+	fmt.Println("增量项目信息结束")
+}
+
+// DealTransactionForPro project数据处理
+func DealTransactionForPro(data map[string]interface{}) map[string]interface{} {
+	//基本信息封装
+	id := mongodb.BsonIdToSId(data["_id"])
+	buyerclass := gconv.String(data["buyerclass"])
+	buyer := gconv.String(data["buyer"])
+	winner := gconv.String(data["s_winner"])
+	agency := gconv.String(data["agency"])
+	zbtime := gconv.Int64(data["zbtime"])
+	if zbtime == 0 {
+		zbtime = gconv.Int64(data["firsttime"])
+	}
+	property_form := []string{}
+	if data["property_form"] != nil {
+		property_form = gconv.Strings(data["property_form"])
+	}
+	bidamount := gconv.Float64(data["bidamount"])
+	budget := gconv.Float64(data["budget"])
+	money := bidamount
+	if money <= 0 {
+		money = budget
+	}
+
+	//物业分类
+	subclass := []string{}
+	if tag_subinformation := data["tag_subinformation"]; tag_subinformation != nil {
+		subclass = gconv.Strings(tag_subinformation)
+	} else if tag_subinformation_ai := data["tag_subinformation_ai"]; tag_subinformation_ai != nil {
+		subclass = gconv.Strings(tag_subinformation_ai)
+	}
+
+	//项目状态、商机类型
+	business_type := ""
+	project_bidstatus := 2
+	bidstatus := gconv.String(data["bidstatus"])
+	if bidstatus == "中标" || bidstatus == "成交" || bidstatus == "合同" {
+		project_bidstatus = 1
+		business_type = "合约到期项目"
+	} else if bidstatus == "废标" || bidstatus == "流标" {
+		project_bidstatus = 0
+	} else if bidstatus == "拟建" {
+		project_bidstatus = 4
+	} else if bidstatus == "招标" {
+		business_type = "招标项目"
+	}
+	//查询情报信息
+	ids := gconv.Strings(data["ids"])
+	// info := FindInfomationData(ids...) //情报信息查询
+	topscopeclass := []string{}
+	// s_topscopeclass := gconv.String(data["s_topscopeclass"])
+	// if s_topscopeclass != "" {
+	// 	topscopeclass = strings.Split(s_topscopeclass, ",")
+	// }
+	if data["topscopeclass"] != nil {
+		if topscopeclasss, ok := data["topscopeclass"].([]interface{}); ok {
+			for _, v := range topscopeclasss {
+				tclass := regLetter.ReplaceAllString(gconv.String(v), "") // 去除字母
+				topscopeclass = append(topscopeclass, tclass)
+			}
+		}
+	}
+	//查询法人信息
+	winners := []string{}
+	if winner != "" {
+		winners = strings.Split(winner, ",")
+	}
+	buyer_id, agency_id, winner_ids := FindEntInfoData(id, buyer, agency, winners)
+	//物业信息
+	t := &Transaction{
+		Project_Id:        id,
+		Project_Name:      gconv.String(data["projectname"]),
+		Project_Budget:    budget,
+		Project_Bidamount: bidamount,
+		Project_Money:     money,
+		Business_Type:     business_type,
+		Project_Bidstatus: project_bidstatus,
+		Info_Id:           gconv.String(data["sourceinfoid"]),
+		Info_Ids:          ids,
+		// Information_Id:    info.Id,
+		BuyerClass:    buyerclass,
+		Buyer:         buyer,
+		Topscopeclass: topscopeclass,
+		Winner:        winners,
+		Agency:        agency,
+		Buyer_Id:      buyer_id,
+		Winner_Id:     winner_ids,
+		Agency_Id:     agency_id,
+		Property_Form: property_form,
+		SubClass:      subclass,
+		MultiPackage:  gconv.Int(data["multipackage"]),
+		Area:          gconv.String(data["area"]),
+		City:          gconv.String(data["city"]),
+		District:      gconv.String(data["district"]),
+		ZbTime:        zbtime,
+		JgTime:        gconv.Int64(data["jgtime"]),
+		// StartTime:         info.Starttime,
+		// EndTime:           info.Endtime,
+		Create_Time: time.Now().Unix(),
+		Update_Time: time.Now().Unix(),
+		//
+		// From: "project",
+	}
+	result := map[string]interface{}{}
+	infomation, _ := bson.Marshal(t)
+	bson.Unmarshal(infomation, &result)
+	return result
+}
+
+// IncTransactionDataMgoToCkhAndEs 数据迁移
+func IncTransactionDataMgoToCkhAndEs() {
+	/*
+		数据根据update_time查询
+		1、采购意向数据(from=bidding)只插入
+		2、项目信息先查,有则更新,无则插入
+	*/
+	fmt.Println("开始执行迁移...")
+	sess := MgoPro.GetMgoConn()
+	defer MgoPro.DestoryMongoConn(sess)
+	ch := make(chan bool, 1)
+	wg := &sync.WaitGroup{}
+	query := map[string]interface{}{
+		"update_time": map[string]interface{}{
+			"$gte": GetTime(0),
+		},
+	}
+	it := sess.DB(MgoPro.DbName).C("projectset_wy").Find(&query).Iter()
+	n := 0
+	for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
+		ch <- true
+		wg.Add(1)
+		go func(tmp map[string]interface{}) {
+			defer func() {
+				<-ch
+				wg.Done()
+			}()
+			from := gconv.String(tmp["from"])
+			delete(tmp, "from")     //无用字段删除
+			delete(tmp, "_id")      //无用字段删除
+			if !SaveDataToEs(tmp) { //保存、更新es
+				fmt.Println("数据保存es失败,项目project_id", tmp["project_id"])
+			}
+			if from == "bidding" { //采购意向、拟建,插入
+				SaveDataToClickHouse(tmp)
+			} else { //项目信息,更新,插入
+				UpdateOrSaveDataToClickHouse(tmp)
+			}
+		}(tmp)
+		if n%100 == 0 {
+			fmt.Println("current:", n)
+		}
+		tmp = map[string]interface{}{}
+	}
+	wg.Wait()
+	fmt.Println("迁移结束...")
+}
+
+type Infomation struct {
+	Id        string
+	Starttime int64
+	Endtime   int64
+}
+
+// FindInfomationData 情报信息查询
+func FindInfomationData(ids ...string) (info Infomation) {
+	for _, id := range ids {
+		query := fmt.Sprintf(`SELECT id,starttime,endtime FROM %s WHERE datajson_id = ?`, Config.ClickHouse.DataBase+".information")
+		rows, err := CkhTool.Query(context.Background(), query, id)
+		if err != nil {
+			continue
+		}
+		for rows.Next() {
+			info = Infomation{}
+			if err := rows.Scan(&info.Id, &info.Starttime, &info.Endtime); err != nil {
+				fmt.Println("查询情报信息异常:", id, err)
+			}
+			if info.Id != "" {
+				return
+			}
+			//break //目前只有一条结果
+		}
+	}
+	return
+}
+
+// FindEntInfoData 法人信息查询
+func FindEntInfoData(bid, buyer, agency string, winners []string) (buyer_id, agency_id string, winner_ids []string) {
+	winner_ids = []string{}
+	winnerMap := map[string]bool{} //记录所有中标单位
+	values := []interface{}{}
+	placeholders := []string{}
+	if buyer != "" {
+		placeholders = append(placeholders, "?")
+		values = append(values, buyer)
+	}
+	if len(winners) > 0 {
+		for _, w := range winners {
+			winnerMap[w] = true
+			placeholders = append(placeholders, "?")
+			values = append(values, w)
+		}
+	}
+	if agency != "" {
+		placeholders = append(placeholders, "?")
+		values = append(values, agency)
+	}
+	if len(values) == 0 {
+		return
+	}
+	query := fmt.Sprintf(`SELECT id,company_name FROM %s WHERE company_name IN (%s)`, Config.ClickHouse.DataBase+".ent_info", strings.Join(placeholders, ","))
+	rows, err := CkhTool.Query(context.Background(), query, values...)
+	if err != nil {
+		return
+	}
+	for rows.Next() {
+		var id, company_name string
+		if err := rows.Scan(&id, &company_name); err == nil {
+			if company_name == buyer {
+				buyer_id = id
+			} else if company_name == agency {
+				agency_id = id
+			} else if winnerMap[company_name] {
+				winner_ids = append(winner_ids, id)
+			}
+		} else {
+			fmt.Println("查询法人信息异常:", err, bid)
+		}
+	}
+	return
+}
+
+// UpdateOrSaveDataToClickHouse 判断clickhouse更新or保存
+func UpdateOrSaveDataToClickHouse(data map[string]interface{}) (err error) {
+	project_id := gconv.String(data["project_id"])
+	count := FindClickHouseByProjectId(project_id) //查询
+	if count > 0 {                                 //更新
+		delete(data, "create_time") //不更新创建时间
+		delete(data, "project_id")  //不更新项目id(主键)
+		err = UpdateDataToClickHouse(data, map[string]interface{}{"project_id": project_id})
+		if err != nil {
+			fmt.Println("clickhouse更新失败", project_id, data)
+		}
+	} else { //插入
+		err = SaveDataToClickHouse(data)
+		if err != nil {
+			fmt.Println("clickhouse保存失败", project_id, data)
+		}
+	}
+	return
+}
+
+// SaveDataToClickHouse 数据保存clickhouse
+func SaveDataToClickHouse(data map[string]interface{}) error {
+	fields, placeholders := []string{}, []string{}
+	values := []interface{}{}
+	for k, v := range data {
+		fields = append(fields, k)
+		values = append(values, v)
+		placeholders = append(placeholders, "?")
+	}
+	query := fmt.Sprintf("INSERT INTO %s (%s) VALUES (%s)", Config.ClickHouse.DataBase+".transaction_info_all", strings.Join(fields, ","), strings.Join(placeholders, ","))
+	return CkhTool.Exec(context.Background(), query, values...)
+}
+
+// FindClickHouseByProjectId 根据条件count clickhouse
+func FindClickHouseByProjectId(project_id string) int {
+	query := fmt.Sprintf(`SELECT COUNT(1) FROM %s WHERE project_id = ?`, Config.ClickHouse.DataBase+".transaction_info_all")
+	row := CkhTool.QueryRow(context.Background(), query, project_id)
+	var count uint64
+	row.Scan(&count)
+	return gconv.Int(count)
+}
+
+// UpdateDataToClickHouse 数据更新clickhouse
+func UpdateDataToClickHouse(data, querys map[string]interface{}) error {
+	sets := []string{}
+	values := []interface{}{}
+	for k, v := range data {
+		sets = append(sets, fmt.Sprintf("%s=?", k))
+		values = append(values, v)
+	}
+	qs := []string{}
+	for k, v := range querys {
+		qs = append(qs, fmt.Sprintf("%s=?", k))
+		values = append(values, v)
+	}
+	query := fmt.Sprintf("ALTER TABLE %s UPDATE %s WHERE %s", Config.ClickHouse.DataBase+".transaction_info_all", strings.Join(sets, ","), strings.Join(qs, ","))
+	//query := `ALTER TABLE information.transaction_info UPDATE update_time = ? WHERE project_id = '5c9ee78ca5cb26b9b7fd0b57'`
+	return CkhTool.Exec(context.Background(), query, values...)
+}
+
+// SaveDataToEs es存储
+func SaveDataToEs(data map[string]interface{}) bool {
+	tmp := map[string]interface{}{}
+	for k, v := range data {
+		if k == "project_id" {
+			k = "_id"
+		} else if k == "winner" || k == "winner_id" { //winner和winner_id无值不进es
+			if len(gconv.Strings(v)) == 0 {
+				continue
+			}
+		}
+		tmp[k] = v
+	}
+	err, result := Es.GetById(Config.Es.Index, gconv.String(tmp["_id"]))
+	if err == nil && len(result) > 0 { //存在,更新
+		tmp["create_time"] = result["create_time"] //不更新create_time
+	}
+	return Es.Save(Config.Es.Index, tmp)
+}
+
+func FindEntInfoData2(bid, buyer, agency string, winners []string) (buyer_id, agency_id string, winner_ids []string) {
+	query := fmt.Sprintf(`SELECT id FROM %s WHERE company_name = ?`, Config.ClickHouse.DataBase+".ent_info")
+	if buyer != "" {
+		buyer_id = GetClickHouseData(bid, query, buyer)
+	}
+	if agency != "" {
+		agency_id = GetClickHouseData(bid, query, agency)
+	}
+	if len(winners) > 0 {
+		for _, w := range winners {
+			winner_id := GetClickHouseData(bid, query, w)
+			if winner_id != "" {
+				winner_ids = append(winner_ids, winner_id)
+			}
+		}
+	}
+	return
+}
+
+func GetClickHouseData(bid, query, value string) string {
+	rows, err := CkhTool.Query(context.Background(), query, value)
+	if err != nil {
+		return ""
+	}
+	for rows.Next() {
+		var id string
+		if err := rows.Scan(&id); err == nil {
+			return id
+		} else {
+			fmt.Println("查询情报信息异常:", err, bid)
+		}
+	}
+	return ""
+}
+
+/*// SaveTransactionData 保存增量物业信息
+func SaveTransactionData() {
+	fmt.Println("save projectset_wy...")
+	savearr := make([]map[string]interface{}, 100)
+	indexdb := 0
+	for {
+		select {
+		case v := <-TransactionSaveCache:
+			savearr[indexdb] = v
+			indexdb++
+			if indexdb == 100 {
+				Transaction_Ch <- true
+				go func(tmp []map[string]interface{}) {
+					defer func() {
+						<-Transaction_Ch
+					}()
+					MgoPro.SaveBulk("projectset_wy", tmp...)
+				}(savearr)
+				savearr = make([]map[string]interface{}, 100)
+				indexdb = 0
+			}
+		case <-time.After(30 * time.Second):
+			if indexdb > 0 {
+				Transaction_Ch <- true
+				go func(tmp []map[string]interface{}) {
+					defer func() {
+						<-Transaction_Ch
+					}()
+					MgoPro.SaveBulk("projectset_wy", tmp...)
+				}(savearr[:indexdb])
+				savearr = make([]map[string]interface{}, 100)
+				indexdb = 0
+			}
+		}
+	}
+}*/

+ 64 - 0
data_project_wy_all/util.go

@@ -0,0 +1,64 @@
+package main
+
+import (
+	"encoding/json"
+	"fmt"
+	"io/ioutil"
+	"os"
+	"time"
+)
+
+const (
+	Date_Full_Layout    = "2006-01-02 15:04:05"
+	Date_Short_Layout   = "2006-01-02"
+	Date_Small_Layout   = "01-02"
+	Date_Time_Layout    = "15:04"
+	Date_yyyyMMdd       = "20060102"
+	Date_yyyyMMdd_Point = "2006.01.02"
+)
+
+func ReadConfig(config ...interface{}) {
+	var r *os.File
+	if len(config) > 1 {
+		filepath, _ := config[0].(string)
+		r, _ = os.Open(filepath)
+		defer r.Close()
+		bs, _ := ioutil.ReadAll(r)
+		json.Unmarshal(bs, config[1])
+	} else {
+		r, _ = os.Open("./config.json")
+		defer r.Close()
+		bs, _ := ioutil.ReadAll(r)
+		json.Unmarshal(bs, config[0])
+	}
+}
+
+func GetTime(day int) int64 {
+	nowTime := time.Now().AddDate(0, 0, day)
+	timeStr := FormatDate(&nowTime, Date_Short_Layout)
+	t, _ := time.ParseInLocation(Date_Short_Layout, timeStr, time.Local)
+	return t.Unix()
+}
+func FormatDateByInt64(src *int64, layout string) string {
+	var tmp int64
+	if *src > 0 {
+		if len(fmt.Sprint(*src)) >= 12 {
+			tmp = (*src) / 1000
+		} else {
+			tmp = (*src)
+		}
+	} else {
+		if len(fmt.Sprint(*src)) >= 13 {
+			tmp = (*src) / 1000
+		} else {
+			tmp = (*src)
+		}
+	}
+	date := time.Unix(tmp, 0)
+	return FormatDate(&date, layout)
+}
+
+// 日期格式化
+func FormatDate(src *time.Time, layout string) string {
+	return (*src).Local().Format(layout)
+}