Jianghan пре 2 година
родитељ
комит
ee7057d25f

+ 1 - 3
data_clear_sync/go.mod

@@ -1,4 +1,4 @@
-module data_sync
+module data_clear_sync
 
 go 1.16
 
@@ -8,5 +8,3 @@ require (
 	go.mongodb.org/mongo-driver v1.10.1
 	go.uber.org/zap v1.24.0
 )
-
-replace app.yhyue.com/data_processing/common_utils => ../../common_utils

+ 2 - 0
data_clear_sync/go.sum

@@ -1,3 +1,5 @@
+app.yhyue.com/data_processing/common_utils v0.0.0-20230510082025-6d175201278b h1:LuxfO97Gi4BzZlhNcZC9S3tk+5Z/0IMXpiyHMY4RcGk=
+app.yhyue.com/data_processing/common_utils v0.0.0-20230510082025-6d175201278b/go.mod h1:XMSY6tIzDnO/YQFjSb0OrOKl93ViGE0ejqcSCTlyHUs=
 cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
 github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
 github.com/BurntSushi/toml v1.2.0/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ=

+ 78 - 0
data_project/common.toml

@@ -0,0 +1,78 @@
+[serve]
+udp = ":1782"
+thread = 3
+loadStart = 0
+validdays = 150
+statusdays = 15
+backupFlag = true
+siteColl = "site"
+
+[db]
+[db.mongoB]
+addr = "192.168.3.71:29099"
+dbname = "wjh"
+coll = "bidding"
+size = 15
+user = ""
+password = ""
+[db.mongoP]
+addr = "192.168.3.71:29099"
+dbname = "wjh"
+coll = "bidding"
+size = 15
+user = ""
+password = ""
+[db.mongoS]
+addr = "192.168.3.71:29099"
+dbname = "wjh"
+coll = "spider_compete"
+size = 15
+user = ""
+password = ""
+[db.redis]
+addr = "project=192.168.3.207:1679"
+addrQb = "qyxy_buyer=192.168.3.207:1679"
+dbQb = 3
+
+[db.es]
+addr = "http://192.168.3.241:9205"
+index = "projectset"
+itype = "projectset"
+pool = 10
+
+[nsq]
+addr = "192.168.3.166:4150"
+topic = "project-id"
+channel = "projectset"
+concurrent = 1
+
+[[udpNode]]
+addr = "192.168.20.104"
+port = 1783
+memo = "创建项目索引"
+[[udpNode]]
+addr = "127.0.0.1"
+port = 17833
+memo = "修改项目创建"
+
+[mail]
+send = false
+to = "wangjianghan@topnet.net.cn"
+api = "http://172.17.145.179:19281/_send/_mail"
+
+# 日志
+[log]
+# 日志路径,为空将输出控制台
+logpath = ""
+# log size (M)
+maxsize = 10
+# compress log
+compress = true
+# log save  time (day)
+maxage =  7
+# save total log file total
+maxbackups = 10
+# log level
+loglevel  = "debug"
+# text or json output
+format = "text"

+ 0 - 60
data_project/config.json

@@ -1,64 +1,4 @@
 {
-    "udpport": ":1782",
-    "loadStart": 0,
-	"validdays":150,
-    "statusdays": 15,
-    "redis-addr": "project=192.168.3.207:1679",
-	"mongodbServers": "192.168.3.207:29099",
-    "mongodbPoolSize": 10,
-    "mongodbName": "wjh",
-	"hints":"_id_1_publishtime_1",
-    "extractColl": "extract",
-    "extractColl1": "extract",
-    "projectColl": "projectset",
-    "backupFlag": true,
-    "siteColl": "site",
-    "thread": 1,
-    "jkmail": {
-        "to": "wangjianghan@topnet.net.cn",
-        "api": "http://172.17.145.179:19281/_send/_mail"
-    },
-    "redis": {
-        "dbname": "qyxy_buyer",
-        "addr": "192.168.3.207:1679",
-        "db": "3"
-    },
-    "bidding": {
-        "addr": "192.168.3.207:29099",
-        "dbname": "qfw",
-        "dbsize": 5,
-        "uname": "",
-        "upwd": ""
-    },
-    "spider": {
-        "addr": "192.168.3.207:29099",
-        "dbname": "wjh",
-        "dbsize": 2
-    },
-    "es": {
-        "addr": "http://127.0.0.1:9801",
-        "index": "projectset",
-        "itype": "projectset",
-        "pool": 10
-    },
-    "nsq_id": {
-        "addr": "192.168.3.166:4150",
-        "topic": "project-id",
-        "channel": "projectset",
-        "concurrent": 1
-    },
-    "nextNode": [
-        {
-            "addr": "192.168.20.104",
-            "port": 1483,
-            "memo": "创建项目索引new"
-        },
-        {
-            "addr": "127.0.0.1",
-            "port": 17833,
-            "memo": "修改项目创建new"
-        }
-    ],
     "rp_blacklist": ["采购项目", "项目", "工程", "开标", "建设项目", "改造", "扩容", "中标", "公告", "中标公告", "成交", "入围", "招标", "变更", "开标结果", "入围招标变更", "成交信息", "PPP", "资格预审", "资格", "预审", "采购", "供应商", "工程",
         "结果", "购置合同", "购置", "五标段", "六标段", "合同", "竞争性谈判", "竞争性", "谈判", "公示时间", "采购", "公开", "招标公告", "废标", "流标", "废标公告", "流标公告", "磋商", "询价", "项目联系人", "中标工期", "随机抽取", "全过程",
         "中标信息", "定标日期", "单价", "中标标的名称", "单位名称", "经济类", "法律类", "云南", "贵州", "其他事项", "其它事项", "法律", "行政法规", "单位", "学历", "修改", "澄清", "无法律", "法规", "成员", "说明", "公示期", "时至", "时止)",

+ 101 - 0
data_project/config/conf.go

@@ -0,0 +1,101 @@
+package config
+
+import (
+	"fmt"
+	"github.com/BurntSushi/toml"
+	"os"
+)
+
+var (
+	// Conf crocodile conf
+	Conf *conf
+)
+
+// Init Config
+func Init(conf string) {
+	_, err := toml.DecodeFile(conf, &Conf)
+	if err != nil {
+		fmt.Printf("Err %v", err)
+		os.Exit(1)
+	}
+}
+
+type conf struct {
+	Serve   serve
+	DB      db
+	Nsq     nsq
+	UdpNode []node
+	Mail    mail
+	Log     log
+}
+
+type serve struct {
+	Udp        string
+	LoadStart  int64
+	Thread     int
+	SiteColl   string
+	ValidDays  int
+	StatusDays int
+	BackupFlag bool
+}
+
+type node struct {
+	Addr string
+	Port int
+	Memo string
+}
+
+type mail struct {
+	Send bool
+	To   string
+	Api  string
+}
+
+// Log Config
+type log struct {
+	LogPath    string
+	MaxSize    int
+	Compress   bool
+	MaxAge     int
+	MaxBackups int
+	LogLevel   string
+	Format     string
+}
+
+type db struct {
+	MongoP mgo
+	MongoB mgo
+	MongoS mgo
+	Es     es
+	Redis  redis
+}
+
+type mgo struct {
+	Addr     string
+	Dbname   string
+	Coll     string
+	Size     int
+	User     string
+	Password string
+}
+
+type nsq struct {
+	Addr       string
+	Topic      string
+	Channel    string
+	Concurrent int
+}
+
+type redis struct {
+	Addr   string
+	AddrQb string
+	DbQb   int
+}
+
+type es struct {
+	Addr     string
+	Size     int
+	Index    string
+	User     string
+	Password string
+}

+ 33 - 0
data_project/config/conf_test.go

@@ -0,0 +1,33 @@
+package config
+
+import (
+	"io/ioutil"
+	"os"
+	"testing"
+)
+
+var confs = `
+
+[serve]
+
+[db]
+
+[udp]
+
+[[udpNode]]
+addr = "192.168.20.104"
+port = 1783
+memo = "创建项目索引"
+[[udpNode]]
+addr = "127.0.0.1"
+port = 17833
+memo = "修改项目创建"
+`
+
+func TestInit(t *testing.T) {
+	testfile := "/tmp/crocodile.toml"
+	ioutil.WriteFile(testfile, []byte(confs), 0644)
+	Init(testfile)
+	t.Logf("%+v", Conf.UdpNode)
+	os.Remove(testfile)
+}

+ 6 - 3
data_project/go.mod

@@ -3,7 +3,10 @@ module project
 go 1.16
 
 require (
-	app.yhyue.com/data_processing/common_utils v0.0.0-20230519053026-75421af90e41 // indirect
-	github.com/goinggo/mapstructure v0.0.0-20140717182941-194205d9b4a9 // indirect
-	github.com/robfig/cron v1.2.0 // indirect
+	app.yhyue.com/data_processing/common_utils v0.0.0-20230519053026-75421af90e41
+	github.com/BurntSushi/toml v1.2.0
+	github.com/goinggo/mapstructure v0.0.0-20140717182941-194205d9b4a9
+	github.com/robfig/cron v1.2.0
+	go.mongodb.org/mongo-driver v1.11.6
+	go.uber.org/zap v1.24.0 // indirect
 )

+ 43 - 0
data_project/go.sum

@@ -2,17 +2,20 @@ app.yhyue.com/data_processing/common_utils v0.0.0-20230519053026-75421af90e41 h1
 app.yhyue.com/data_processing/common_utils v0.0.0-20230519053026-75421af90e41/go.mod h1:XMSY6tIzDnO/YQFjSb0OrOKl93ViGE0ejqcSCTlyHUs=
 cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
 github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
+github.com/BurntSushi/toml v1.2.0 h1:Rt8g24XnyGTyglgET/PRUNlrUeu9F5L+7FilkXfZgs0=
 github.com/BurntSushi/toml v1.2.0/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ=
 github.com/PuerkitoBio/goquery v1.8.0 h1:PJTF7AmFCFKk1N6V6jmKfrNH9tV5pNE6lZMkG0gta/U=
 github.com/PuerkitoBio/goquery v1.8.0/go.mod h1:ypIiRMtY7COPGk+I/YbZLbxsxn9g5ejnI2HSMtkjZvI=
 github.com/andybalholm/cascadia v1.3.1 h1:nhxRkql1kdYCc8Snf7D5/D3spOX+dBgjA6u8x004T2c=
 github.com/andybalholm/cascadia v1.3.1/go.mod h1:R4bJ1UQfqADjvDa4P6HZHLh/3OxWWEqc0Sk8XGwHqvA=
 github.com/aws/aws-sdk-go v1.43.21/go.mod h1:y4AeaBuwd2Lk+GepC1E9v0qOiTws0MIWAX4oIKwKHZo=
+github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8=
 github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
 github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
 github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
 github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
 github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
 github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
 github.com/dchest/captcha v1.0.0 h1:vw+bm/qMFvTgcjQlYVTuQBJkarm5R0YSsDKhm1HZI2o=
 github.com/dchest/captcha v1.0.0/go.mod h1:7zoElIawLp7GUMLcj54K9kbw+jEyvz2K0FDdRRYhvWo=
@@ -20,6 +23,7 @@ github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymF
 github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
 github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98=
 github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
+github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw=
 github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g=
 github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
 github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
@@ -39,7 +43,9 @@ github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:W
 github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0=
 github.com/golang/protobuf v1.4.1/go.mod h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QDs8UjoX8=
 github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
+github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
 github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
+github.com/gomodule/redigo v1.8.9 h1:Sl3u+2BI/kk+VEatbj0scLdrFhjPmbxOc1myhDP41ws=
 github.com/gomodule/redigo v1.8.9/go.mod h1:7ArFNvsTjH8GMMzB4uy1snslv2BwmginuMs06a1uzZE=
 github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
 github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
@@ -48,22 +54,33 @@ github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
 github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
 github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
 github.com/google/go-cmp v0.5.3/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
+github.com/google/go-cmp v0.5.7 h1:81/ik6ipDQS2aGcBfIN5dHDB36BwrStyeAQquSYCV4o=
 github.com/google/go-cmp v0.5.7/go.mod h1:n+brtR0CgQNWTVd5ZUFpTBC8YFBDLK/h/bpaJ8/DtOE=
 github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
 github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=
 github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U=
+github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY=
 github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y=
+github.com/klauspost/compress v1.13.6 h1:P76CopJELS0TiO2mebmnzgWaajssP/EszplttgQxcgc=
 github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
+github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
 github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
 github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
+github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
 github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
+github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0=
 github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc=
+github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe h1:iruDEfMl2E6fbMZ9s0scYfZQ84/6SPL6zC8ACM2oIL0=
 github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe/go.mod h1:wL8QJuTMNUDYhXwkmfOly8iTdp5TEcJFWZD2D7SIkUc=
+github.com/nsqio/go-nsq v1.1.0 h1:PQg+xxiUjA7V+TLdXw7nVrJ5Jbl3sN86EhGCQj4+FYE=
 github.com/nsqio/go-nsq v1.1.0/go.mod h1:vKq36oyeVXgsS5Q8YEO7WghqidAVXQlcFxzQbQTuDEY=
+github.com/olivere/elastic/v7 v7.0.32 h1:R7CXvbu8Eq+WlsLgxmKVKPox0oOwAE/2T9Si5BnvK6E=
 github.com/olivere/elastic/v7 v7.0.32/go.mod h1:c7PVmLe3Fxq77PIfY/bZmxY/TAamBhCzZ8xDOE09a9k=
 github.com/opentracing/opentracing-go v1.2.0/go.mod h1:GxEUsuufX4nBwe+T+Wl9TAgYrxe9dPLANfrWvHYVTgc=
 github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
+github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
 github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
+github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
 github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
 github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
 github.com/robfig/cron v1.2.0 h1:ZjScXvvxeQ63Dbyxy76Fj3AT3Ut0aKsyd2/tl3DTMuQ=
@@ -72,26 +89,45 @@ github.com/smartystreets/assertions v1.1.1/go.mod h1:tcbTF8ujkAEcZ8TElKY+i30BzYl
 github.com/smartystreets/go-aws-auth v0.0.0-20180515143844-0c1422d1fdb9/go.mod h1:SnhjPscd9TpLiy1LpzGSKh3bXCfxxXuqd9xmQJy3slM=
 github.com/smartystreets/gunit v1.4.2/go.mod h1:ZjM1ozSIMJlAz/ay4SG8PeKF00ckUp+zMHZXV9/bvak=
 github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
+github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
 github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
 github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
+github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
 github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
+github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
+github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk=
+github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
+github.com/tidwall/pretty v1.0.0 h1:HsD+QiTn7sK6flMKIvNmpqz1qrpP3Ps6jOKIKMooyg4=
 github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk=
+github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c=
 github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI=
+github.com/xdg-go/scram v1.1.1 h1:VOMT+81stJgXW3CpHyqHN3AXDYIMsx56mEFrB37Mb/E=
 github.com/xdg-go/scram v1.1.1/go.mod h1:RaEWvsqvNKKvBPvcKeFjrG2cJqOkHTiyTpzz23ni57g=
+github.com/xdg-go/stringprep v1.0.3 h1:kdwGpVNwPFtjs98xCGkHjQtGKh86rDcRZN17QEMCOIs=
 github.com/xdg-go/stringprep v1.0.3/go.mod h1:W3f5j4i+9rC0kuIEJL0ky1VpHXQU3ocBgklLGvcBnW8=
+github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d h1:splanxYIlg+5LfHAM6xpdFEAYOk8iySO56hMFq6uLyA=
 github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d/go.mod h1:rHwXgn7JulP+udvsHwJoVG1YGAP6VLg4y9I5dyZdqmA=
 github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
 go.mongodb.org/mongo-driver v1.10.1/go.mod h1:z4XpeoU6w+9Vht+jAFyLgVrD+jGSQQe0+CBWFHNiHt8=
+go.mongodb.org/mongo-driver v1.11.6 h1:XM7G6PjiGAO5betLF13BIa5TlLUUE3uJ/2Ox3Lz1K+o=
+go.mongodb.org/mongo-driver v1.11.6/go.mod h1:G9TgswdsWjX4tmDA5zfs2+6AEPpYJwqblyjsfuh8oXY=
 go.opencensus.io v0.23.0/go.mod h1:XItmlyltB5F7CS4xOC1DcqMoFqwtC6OG2xF7mCv7P7E=
 go.opentelemetry.io/otel v1.5.0/go.mod h1:Jm/m+rNp/z0eqJc74H7LPwQ3G87qkU/AnnAydAjSAHk=
 go.opentelemetry.io/otel/trace v1.5.0/go.mod h1:sq55kfhjXYr1zVSyexg0w1mpa03AYXR5eyTkB9NPPdE=
+go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw=
 go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
+go.uber.org/goleak v1.1.11 h1:wy28qYRKZgnJTxGxvye5/wgWr1EKjmUDGYox5mGlRlI=
 go.uber.org/goleak v1.1.11/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ=
+go.uber.org/multierr v1.6.0 h1:y6IPFStTAIT5Ytl7/XYmHvzXQ7S3g/IeZW9hyZ5thw4=
 go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU=
+go.uber.org/zap v1.22.0 h1:Zcye5DUgBloQ9BaT4qc9BnjOFog5TvBSAGkJ3Nf70c0=
 go.uber.org/zap v1.22.0/go.mod h1:H4siCOZOrAolnUPJEkfaSjDqyP+BDS0DdDWzwcgt3+U=
+go.uber.org/zap v1.24.0 h1:FiJd5l1UOLj0wCgbSE0rwwXHzEdAZS6hiiSnxJN/D60=
+go.uber.org/zap v1.24.0/go.mod h1:2kMP+WWQ8aoFoedH3T2sq6iJ2yDWpHbP0f6MQbS9Gkg=
 golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
 golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
 golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
+golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d h1:sK3txAijHtOK88l68nt020reeT1ZdKLIYetKl95FzVY=
 golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
 golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
 golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
@@ -115,6 +151,7 @@ golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAG
 golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ=
 golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
 golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
@@ -131,6 +168,7 @@ golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuX
 golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
 golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
 golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
+golang.org/x/text v0.3.7 h1:olpwvP2KacW1ZWvsR7uQhoyTYvKAupfQrRGBFM352Gk=
 golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
 golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
 golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
@@ -142,6 +180,7 @@ golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
 golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
 golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
 golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
+golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE=
 golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
 google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
 google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
@@ -163,13 +202,17 @@ google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2
 google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
 google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c=
 gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
 gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
 gopkg.in/mgo.v2 v2.0.0-20190816093944-a6b53ec6cb22 h1:VpOs+IwYnYBaFnrNAeB8UUWtL3vEUnzSCL1nVjPhqrw=
 gopkg.in/mgo.v2 v2.0.0-20190816093944-a6b53ec6cb22/go.mod h1:yeKp02qBN3iKW1OzL3MGk2IdtZzaj7SFntXj72NppTA=
+gopkg.in/natefinch/lumberjack.v2 v2.0.0 h1:1Lc07Kr7qY4U2YPouBjpCLxpiyxIVoxqXgkXLknAOE8=
 gopkg.in/natefinch/lumberjack.v2 v2.0.0/go.mod h1:l0ndWWf7gzL7RNwBG7wST/UCcT4T24xpD6X8LsfU/+k=
 gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
+gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
 gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
 gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
+gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
 gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
 honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
 honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=

+ 70 - 57
data_project/init.go

@@ -2,13 +2,15 @@ package main
 
 import (
 	util "app.yhyue.com/data_processing/common_utils"
+	"app.yhyue.com/data_processing/common_utils/log"
 	"app.yhyue.com/data_processing/common_utils/mongodb"
 	"app.yhyue.com/data_processing/common_utils/redis"
 	"app.yhyue.com/data_processing/common_utils/udp"
 	"fmt"
 	"go.mongodb.org/mongo-driver/bson/primitive"
-	"log"
 	"math"
+	"os"
+	"project/config"
 	"reflect"
 	"regexp"
 	"sort"
@@ -17,11 +19,9 @@ import (
 )
 
 var (
-	Sysconfig                                      map[string]interface{} //读取配置文件
-	MongoTool, MgoBidding, MgoSpider               *mongodb.MongodbSim    //mongodb连接
-	ExtractColl, ProjectColl, BackupColl, SiteColl string                 //抽取表、项目表、项目快照表、站点表
-	ExtractColl1                                   string
-	Thread                                         int //配置项线程数
+	MgoP, MgoB, MgoS                               *mongodb.MongodbSim //mongodb连接
+	BiddingColl, ProjectColl, BackupColl, SiteColl string              //抽取表、项目表、项目快照表、站点表
+	Thread                                         int                 //配置项线程数
 	BlackList                                      []interface{}
 	BlaskListMap                                   map[string]bool
 
@@ -56,60 +56,55 @@ var (
 )
 
 func init() {
-	util.ReadConfig(&Sysconfig)
-	MongoTool = &mongodb.MongodbSim{
-		MongodbAddr: Sysconfig["mongodbServers"].(string),
-		Size:        util.IntAll(Sysconfig["mongodbPoolSize"]),
-		DbName:      Sysconfig["mongodbName"].(string),
+
+	config.Init("./common.toml")
+	InitLog()
+
+	MgoP = &mongodb.MongodbSim{
+		MongodbAddr: config.Conf.DB.MongoP.Addr,
+		Size:        config.Conf.DB.MongoP.Size,
+		DbName:      config.Conf.DB.MongoP.Dbname,
 	}
-	MongoTool.InitPool()
-	bidding, _ := Sysconfig["bidding"].(map[string]interface{})
-	MgoBidding = &mongodb.MongodbSim{
-		MongodbAddr: bidding["addr"].(string),
-		Size:        util.IntAll(bidding["dbsize"]),
-		DbName:      bidding["dbname"].(string),
-		UserName:    bidding["uname"].(string),
-		Password:    bidding["upwd"].(string),
+	MgoP.InitPool()
+	MgoB = &mongodb.MongodbSim{
+		MongodbAddr: config.Conf.DB.MongoB.Addr,
+		Size:        config.Conf.DB.MongoB.Size,
+		DbName:      config.Conf.DB.MongoB.Dbname,
+		UserName:    config.Conf.DB.MongoB.User,
+		Password:    config.Conf.DB.MongoB.Password,
 	}
-	MgoBidding.InitPool()
-	spider, _ := Sysconfig["spider"].(map[string]interface{})
-	MgoSpider = &mongodb.MongodbSim{
-		MongodbAddr: spider["addr"].(string),
-		Size:        util.IntAll(spider["dbsize"]),
-		DbName:      spider["dbname"].(string),
+	MgoB.InitPool()
+	MgoS = &mongodb.MongodbSim{
+		MongodbAddr: config.Conf.DB.MongoS.Addr,
+		Size:        config.Conf.DB.MongoS.Size,
+		DbName:      config.Conf.DB.MongoS.Dbname,
 	}
-	MgoSpider.InitPool()
-
-	ExtractColl = Sysconfig["extractColl"].(string)
-	ExtractColl1 = Sysconfig["extractColl1"].(string)
-	ProjectColl = Sysconfig["projectColl"].(string)
-	BackupColl = Sysconfig["projectColl"].(string) + "_back"
-	SiteColl = Sysconfig["siteColl"].(string)
-	Thread = util.IntAll(Sysconfig["thread"])
-	//NextNode = Sysconfig["nextNode"].([]interface{})
-	udpport, _ := Sysconfig["udpport"].(string)
-	udpclient = udp.UdpClient{Local: udpport, BufSize: 1024}
+	MgoS.InitPool()
+
+	BiddingColl = config.Conf.DB.MongoB.Coll
+	ProjectColl = config.Conf.DB.MongoP.Coll
+	BackupColl = config.Conf.DB.MongoP.Coll + "_back"
+	SiteColl = config.Conf.Serve.SiteColl
+	Thread = config.Conf.Serve.Thread
+	udpclient = udp.UdpClient{Local: config.Conf.Serve.Udp, BufSize: 1024}
 	udpclient.Listen(processUdpMsg)
-	log.Println("Udp服务监听", udpport)
+	log.Info("udp init port:" + udpclient.Local)
 
-	BlackList = Sysconfig["rp_blacklist"].([]interface{})
+	//"qyxy_buyer=172.17.4.189:8379"
+	RedisCode = "qyxy_buyer"
+	redis.InitRedis1(config.Conf.DB.Redis.AddrQb, config.Conf.DB.Redis.DbQb) // 采购单位与中标单位初次合作项目
+	redis.InitRedis1(config.Conf.DB.Redis.Addr, 0)
+
+	cof := make(map[string]interface{})
+	util.ReadConfig(&cof)
+	BlackList = cof["rp_blacklist"].([]interface{})
 	BlaskListMap = make(map[string]bool)
 	for _, v := range BlackList {
 		BlaskListMap[util.ObjToString(v)] = true
 	}
-
-	redisC := Sysconfig["redis-addr"].(string)
-	redisCon := Sysconfig["redis"].(map[string]interface{})
-	RedisCode = util.ObjToString(redisCon["dbname"])
-	redis_addr := util.ObjToString(redisCon["addr"])
-	redis_db := util.IntAll(redisCon["db"])
-	//"qyxy_buyer=172.17.4.189:8379"
-	redis.InitRedis1(fmt.Sprintf("%s=%s", RedisCode, redis_addr), redis_db) // 采购单位与中标单位初次合作项目
-	redis.InitRedis1(redisC, 0)
-
-	initWinnerRegexp()
-	initBuyerRegexp()
-	initAgencyRegexp()
+	initWinnerRegexp(cof)
+	initBuyerRegexp(cof)
+	initAgencyRegexp(cof)
 
 	//加载项目数据
 	//---不能通过
@@ -500,8 +495,8 @@ func CosineSimilar(srcWords1, dstWords1 string) float64 {
 	return v1
 }
 
-func initWinnerRegexp() {
-	winRegMap := Sysconfig["winner"].(map[string]interface{})
+func initWinnerRegexp(cof map[string]interface{}) {
+	winRegMap := cof["winner"].(map[string]interface{})
 	//preRegexps := winRegMap["pre_regexp"].([]interface{})
 	//backRegexps := winRegMap["back_regexp"].([]interface{})
 	//backRepRegexps := winRegMap["back_rep_regexp"].([]interface{})
@@ -541,8 +536,8 @@ func initWinnerRegexp() {
 	BlackRegexp["winner"] = winBlackRegexps
 }
 
-func initBuyerRegexp() {
-	buyRegMap := Sysconfig["buyer"].(map[string]interface{})
+func initBuyerRegexp(cof map[string]interface{}) {
+	buyRegMap := cof["buyer"].(map[string]interface{})
 	//preRegexps := buyRegMap["pre_regexp"].([]interface{})
 	//backRegexps := buyRegMap["back_regexp"].([]interface{})
 	//backRepRegexps := buyRegMap["back_rep_regexp"].([]interface{})
@@ -582,8 +577,8 @@ func initBuyerRegexp() {
 	BlackRegexp["buyer"] = winBlackRegexps
 }
 
-func initAgencyRegexp() {
-	buyRegMap := Sysconfig["agency"].(map[string]interface{})
+func initAgencyRegexp(cof map[string]interface{}) {
+	buyRegMap := cof["agency"].(map[string]interface{})
 	//preRegexps := buyRegMap["pre_regexp"].([]interface{})
 	//backRegexps := buyRegMap["back_regexp"].([]interface{})
 	//backRepRegexps := buyRegMap["back_rep_regexp"].([]interface{})
@@ -622,3 +617,21 @@ func initAgencyRegexp() {
 	}
 	BlackRegexp["agency"] = winBlackRegexps
 }
+
+func InitLog() {
+	logcfg := config.Conf.Log
+
+	err := log.InitLog(
+		log.Path(logcfg.LogPath),
+		log.Level(logcfg.LogLevel),
+		log.Compress(logcfg.Compress),
+		log.MaxSize(logcfg.MaxSize),
+		log.MaxBackups(logcfg.MaxBackups),
+		log.MaxAge(logcfg.MaxAge),
+		log.Format(logcfg.Format),
+	)
+	if err != nil {
+		fmt.Printf("InitLog failed: %v\n", err)
+		os.Exit(1)
+	}
+}

+ 18 - 17
data_project/load_data.go

@@ -2,9 +2,10 @@ package main
 
 import (
 	util "app.yhyue.com/data_processing/common_utils"
+	"app.yhyue.com/data_processing/common_utils/log"
 	"app.yhyue.com/data_processing/common_utils/redis"
 	"encoding/json"
-	"log"
+	"go.uber.org/zap"
 	"strings"
 	"sync"
 	"time"
@@ -12,17 +13,17 @@ import (
 
 //  初始加载数据,默认加载最近6个月的数据
 func (p *ProjectTask) loadData(starttime int64) {
-	log.Println("load project start..", starttime)
+	log.Info("load project start..", zap.Int64("starttime", starttime))
 	p.findLock.Lock()
 	defer p.findLock.Unlock()
-	sess := MongoTool.GetMgoConn()
-	defer MongoTool.DestoryMongoConn(sess)
+	sess := MgoP.GetMgoConn()
+	defer MgoP.DestoryMongoConn(sess)
 	loadOver := make(chan bool)
 	q := map[string]interface{}{
 		"lasttime": map[string]interface{}{"$gte": starttime},
 	}
 	field := map[string]interface{}{"list": 0}
-	it := sess.DB(MongoTool.DbName).C(p.coll).Find(&q).Select(field).Iter()
+	it := sess.DB(MgoP.DbName).C(p.coll).Find(&q).Select(field).Iter()
 	n, count := 0, 0
 	pool := make(chan *ProjectCache, 1000)
 	go func() {
@@ -31,7 +32,7 @@ func (p *ProjectTask) loadData(starttime int64) {
 			case tmp := <-pool:
 				n++
 				if n%10000 == 0 {
-					log.Println("current", n, "\n", tmp.Id, len(p.mapPn), len(p.mapPc), len(p.mapPb), len(p.mapHref)) //, tmp.ProjectName, tmp.MPN, tmp.ProjectCode, tmp.MPC, tmp.Buyer)
+					util.Debug("current", n, "mapPn", len(p.mapPn), "mapPc", len(p.mapPc), "mapPb", len(p.mapPb), "mapHref", len(p.mapHref)) //, tmp.ProjectName, tmp.MPN, tmp.ProjectCode, tmp.MPC, tmp.Buyer)
 				}
 				if tmp != nil {
 					id := tmp.Id.Hex()
@@ -108,19 +109,19 @@ func (p *ProjectTask) loadData(starttime int64) {
 	wg.Wait()
 	time.Sleep(2 * time.Second)
 	loadOver <- true
-	log.Println("load project over..", n)
+	log.Info("load project over..", zap.Int("n", n))
 }
 
 func (p *ProjectTask) loadSite() {
-	log.Println("load site start..")
+	log.Info("load site start..")
 	p.findLock.Lock()
 	defer p.findLock.Unlock()
 	p.mapSiteLock.Lock()
 	defer p.mapSiteLock.Unlock()
-	sess := MongoTool.GetMgoConn()
-	defer MongoTool.DestoryMongoConn(sess)
+	sess := MgoS.GetMgoConn()
+	defer MgoS.DestoryMongoConn(sess)
 	q := map[string]interface{}{}
-	it := sess.DB(MongoTool.DbName).C(SiteColl).Find(&q).Iter()
+	it := sess.DB(MgoS.DbName).C(SiteColl).Find(&q).Iter()
 	n := 0
 	pool := make(chan *Site, 100)
 	over := make(chan bool)
@@ -153,7 +154,7 @@ func (p *ProjectTask) loadSite() {
 	}
 	time.Sleep(2 * time.Second)
 	over <- true
-	log.Println("load site over..", n)
+	log.Info("load site over..", zap.Int("n", n))
 
 }
 
@@ -194,16 +195,16 @@ func saveFiled(p *ProjectTask, res map[string]interface{}, tmp *ProjectCache) {
 
 //  加载spidercode数据,isflow字段
 func (p *ProjectTask) loadSpiderCode() {
-	log.Println("load spider code start..")
+	log.Info("load spider code start..")
 	p.findLock.Lock()
 	defer p.findLock.Unlock()
 	p.mapSpiderLock.Lock()
 	defer p.mapSpiderLock.Unlock()
-	sess := MgoSpider.GetMgoConn()
-	defer MgoSpider.DestoryMongoConn(sess)
+	sess := MgoS.GetMgoConn()
+	defer MgoS.DestoryMongoConn(sess)
 	q := map[string]interface{}{}
 	field := map[string]interface{}{"code": 1, "isflow": 1}
-	it := sess.DB(MgoSpider.DbName).C("luaconfig").Find(&q).Select(field).Iter()
+	it := sess.DB(MgoS.DbName).C("luaconfig").Find(&q).Select(field).Iter()
 	n := 0
 	pool := make(chan map[string]interface{}, 100)
 	over := make(chan bool)
@@ -231,6 +232,6 @@ func (p *ProjectTask) loadSpiderCode() {
 	}
 	time.Sleep(2 * time.Second)
 	over <- true
-	log.Println("load spider over..", n)
+	log.Info("load spider over..", zap.Int("n", n))
 
 }

+ 29 - 38
data_project/main.go

@@ -3,15 +3,17 @@ package main
 import (
 	util "app.yhyue.com/data_processing/common_utils"
 	"app.yhyue.com/data_processing/common_utils/elastic"
+	"app.yhyue.com/data_processing/common_utils/log"
 	"app.yhyue.com/data_processing/common_utils/mongodb"
 	"app.yhyue.com/data_processing/common_utils/nsq"
 	"app.yhyue.com/data_processing/common_utils/udp"
 	"encoding/json"
 	"fmt"
-	"log"
+	"go.uber.org/zap"
 	"net"
 	"os"
 	"os/signal"
+	"project/config"
 	"strings"
 	"syscall"
 	"time"
@@ -24,8 +26,6 @@ var (
 	toaddr       = []*net.UDPAddr{} //下节点对象
 	ChSign       = make(chan os.Signal)
 	Es           *elastic.Elastic
-	Index        string
-	Itype        string
 	Mcmer        *gonsq.Consumer
 
 	sid, eid string //测试使用
@@ -36,20 +36,19 @@ var (
 func init() {
 	signal.Notify(ChSign)
 	go DealSign()
-	nextNode := util.ObjArrToMapArr(Sysconfig["nextNode"].([]interface{}))
-	for _, m := range nextNode {
+
+	for _, m := range config.Conf.UdpNode {
 		toaddr = append(toaddr, &net.UDPAddr{
-			IP:   net.ParseIP(m["addr"].(string)),
-			Port: util.IntAll(m["port"]),
+			IP:   net.ParseIP(m.Addr),
+			Port: util.IntAll(m.Port),
 		})
 	}
-	es := Sysconfig["es"].(map[string]interface{})
 	Es = &elastic.Elastic{
-		S_esurl: util.ObjToString(es["addr"]),
-		I_size:  util.IntAllDef(es["pool"], 10),
+		S_esurl:  config.Conf.DB.Es.Addr,
+		I_size:   config.Conf.DB.Es.Size,
+		Username: config.Conf.DB.Es.User,
+		Password: config.Conf.DB.Es.Password,
 	}
-	Index = util.ObjToString(es["index"])
-	Itype = util.ObjToString(es["itype"])
 	Es.InitElasticSize()
 
 	P_QL = NewPT()
@@ -67,7 +66,7 @@ func DealSign() {
 		case sign := <-ChSign:
 			//log.Println("receive:", sign)
 			if v, ok := sign.(syscall.Signal); ok && v == os.Interrupt {
-				log.Println("receice signal..,start close iter")
+				log.Info("receice signal..,start close iter")
 				if P_QL.Brun {
 					queryClose <- true
 					select {
@@ -75,8 +74,7 @@ func DealSign() {
 					case <-time.After(30 * time.Second):
 					}
 				}
-				util.ReadConfig(&Sysconfig)
-				log.Println("signal deal over")
+				log.Info("signal deal over")
 			}
 		}
 	}
@@ -86,13 +84,10 @@ func main() {
 
 	P_QL.loadSpiderCode()
 	P_QL.loadSite()
-	if Sysconfig["loadStart"] != nil {
-		loadStart := util.Int64All(Sysconfig["loadStart"])
-		if loadStart > -1 {
-			P_QL.loadData(loadStart)
-		}
+	if config.Conf.Serve.LoadStart > 0 {
+		P_QL.loadData(config.Conf.Serve.LoadStart)
 	}
-	//go checkMapJob()
+	go checkMapJob()
 	go P_QL.nsqMethod()
 
 	for {
@@ -185,22 +180,19 @@ func mainT() {
 
 	mapinfo := map[string]interface{}{}
 	if sid == "" || eid == "" {
-		log.Println("sid, eid参数不能为空")
+		log.Info("sid, eid参数不能为空")
 		os.Exit(0)
 	}
 	mapinfo["gtid"] = sid
 	mapinfo["lteid"] = eid
 	mapinfo["stype"] = "ql"
 	mapinfo["ip"] = "127.0.0.1"
-	mapinfo["port"] = Sysconfig["udpport"]
+	mapinfo["port"] = "1782"
 
 	P_QL.loadSpiderCode()
 	P_QL.loadSite()
-	if Sysconfig["loadStart"] != nil {
-		loadStart := util.Int64All(Sysconfig["loadStart"])
-		if loadStart > -1 {
-			P_QL.loadData(loadStart)
-		}
+	if config.Conf.Serve.LoadStart > 0 {
+		P_QL.loadData(config.Conf.Serve.LoadStart)
 	}
 	P_QL.loadSite()
 	P_QL.currentType = mapinfo["stype"].(string)
@@ -215,7 +207,7 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 	case udp.OP_TYPE_DATA: //上个节点的数据
 		var mapInfo map[string]interface{}
 		err := json.Unmarshal(data, &mapInfo)
-		log.Println("err:", err, "mapInfo:", mapInfo)
+		log.Info("udp---", zap.Any("mapInfo:", mapInfo))
 		if err != nil {
 			_ = udpclient.WriteUdp([]byte("err:"+err.Error()), udp.OP_NOOP, ra)
 		} else if mapInfo != nil {
@@ -230,7 +222,7 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 		ok := string(data)
 		if ok != "" {
 			udptaskmap.Delete(ok)
-			log.Println("ok:", ok)
+			log.Info("ok:" + ok)
 		}
 	}
 }
@@ -238,28 +230,27 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 // @Description nsq处理id不变,内容替换的竞品数据
 // @Author J 2022/8/10 11:40
 func (p *ProjectTask) nsqMethod() {
-	cof := Sysconfig["nsq_id"].(map[string]interface{})
 	var err error
 	Mcmer, err = gonsq.NewConsumer(&gonsq.Cconfig{
 		IsJsonEncode: true, //与生产者配置对应,设为true会取第1个字节进行类型判断
-		Addr:         util.ObjToString(cof["addr"]),
+		Addr:         config.Conf.Nsq.Addr,
 		ConnectType:  0, //默认连接nsqd
-		Topic:        util.ObjToString(cof["topic"]),
-		Channel:      util.ObjToString(cof["channel"]),
-		Concurrent:   util.IntAllDef(cof["concurrent"], 1), //并发数
+		Topic:        config.Conf.Nsq.Topic,
+		Channel:      config.Conf.Nsq.Channel,
+		Concurrent:   config.Conf.Nsq.Concurrent, //并发数
 	})
 	if err != nil {
-		util.Debug("nsqMethod err: ", err.Error())
+		log.Info("nsqMethod err: " + err.Error())
 	}
 	for {
 		select {
 		case obj := <-Mcmer.Ch: //从通道读取即可
-			util.Debug("project nsq: " + fmt.Sprint(obj))
+			log.Info("project nsq: " + fmt.Sprint(obj))
 			id := strings.Split(util.ObjToString(obj), "=")
 			if mongodb.IsObjectIdHex(id[1]) {
 				p.taskinfo(id[1])
 			} else {
-				util.Debug("jy nsq id err: ", id[1])
+				log.Info("jy nsq id err: " + id[1])
 			}
 		}
 	}

+ 4 - 3
data_project/project.go

@@ -2,6 +2,7 @@ package main
 
 import (
 	util "app.yhyue.com/data_processing/common_utils"
+	"app.yhyue.com/data_processing/common_utils/log"
 	"app.yhyue.com/data_processing/common_utils/mongodb"
 	"app.yhyue.com/data_processing/common_utils/redis"
 	"encoding/json"
@@ -688,13 +689,13 @@ func (p *ProjectTask) NewCachePinfo(id primitive.ObjectID, thisinfo *Info, bidty
 func (p *ProjectTask) UpdateProject(tmp map[string]interface{}, thisinfo *Info, pInfo *ProjectCache, weight int, comStr string, ex int) {
 	if p.currentType != "updateInfo" {
 		if BinarySearch(pInfo.Ids, thisinfo.Id) > -1 {
-			util.Debug("repeat", thisinfo.Id, ",pid", pInfo.Id)
+			log.Info("repeat" + thisinfo.Id + ",pid" + pInfo.Id.Hex())
 			return
 		}
 	}
 	pdata := redis.Get("project", pInfo.Id.Hex())
 	if pdata == nil {
-		util.Debug("redis err, not find project ", pInfo.Id.Hex())
+		log.Info("redis err, not find project " + pInfo.Id.Hex())
 		return
 	}
 	projectMap := pdata.(map[string]interface{})
@@ -702,7 +703,7 @@ func (p *ProjectTask) UpdateProject(tmp map[string]interface{}, thisinfo *Info,
 	var project *Project
 	err := json.Unmarshal(bys, &project)
 	if err != nil {
-		util.Debug("project Unmarshal err,", err)
+		log.Info("project Unmarshal err," + err.Error())
 		return
 	}
 	set := map[string]interface{}{}

+ 49 - 52
data_project/task.go

@@ -2,6 +2,7 @@ package main
 
 import (
 	util "app.yhyue.com/data_processing/common_utils"
+	"app.yhyue.com/data_processing/common_utils/log"
 	"app.yhyue.com/data_processing/common_utils/mongodb"
 	"app.yhyue.com/data_processing/common_utils/redis"
 	"app.yhyue.com/data_processing/common_utils/udp"
@@ -10,7 +11,8 @@ import (
 	"github.com/goinggo/mapstructure"
 	"github.com/robfig/cron"
 	"go.mongodb.org/mongo-driver/bson"
-	"log"
+	"go.uber.org/zap"
+	"project/config"
 	"regexp"
 	"strings"
 	"sync"
@@ -117,8 +119,8 @@ func NewPT() *ProjectTask {
 		//saveSign:   make(chan bool, 1),
 		//updateSign: make(chan bool, 1),
 		coll:       ProjectColl,
-		validTime:  int64(util.IntAllDef(Sysconfig["validdays"], 150) * 86400),
-		statusTime: int64(util.IntAllDef(Sysconfig["statusdays"], 15) * 86400),
+		validTime:  int64(util.IntAllDef(config.Conf.Serve.ValidDays, 150)) * 86400,
+		statusTime: int64(util.IntAllDef(config.Conf.Serve.StatusDays, 15) * 86400),
 		jgTime:     int64(util.IntAllDef(7, 7) * 86400),
 	}
 	return p
@@ -141,7 +143,7 @@ func (p *ProjectTask) updateAllQueue() {
 					defer func() {
 						<-sp
 					}()
-					MongoTool.UpSertBulk(p.coll, arru...)
+					MgoP.UpSertBulk(p.coll, arru...)
 				}(arru)
 				arru = make([][]map[string]interface{}, p.saveSize)
 				indexu = 0
@@ -153,7 +155,7 @@ func (p *ProjectTask) updateAllQueue() {
 					defer func() {
 						<-sp
 					}()
-					MongoTool.UpSertBulk(p.coll, arru...)
+					MgoP.UpSertBulk(p.coll, arru...)
 				}(arru[:indexu])
 				arru = make([][]map[string]interface{}, p.saveSize)
 				indexu = 0
@@ -182,7 +184,7 @@ func (p *ProjectTask) clearMem() {
 			//所有内存中的项目信息
 			p.AllIdsMapLock.Lock()
 			p.mapHrefLock.Lock()
-			log.Println("清除开始")
+			log.Info("清除开始")
 			//清除计数
 			clearNum := 0
 			for kHref, pid := range p.mapHref { //删除mapHref,p.AllIdsMap删除之前执行
@@ -244,7 +246,8 @@ func (p *ProjectTask) clearMem() {
 			p.AllIdsMapLock.Unlock()
 			p.findLock.Unlock()
 			SingleClear = 0
-			log.Println("清除完成:", clearNum, len(p.AllIdsMap), len(p.mapPn), len(p.mapPc), len(p.mapPb), len(p.mapHref))
+			log.Info("清除完成:", zap.Int("clearNum", clearNum), zap.Int("AllIdsMap:", len(p.AllIdsMap)), zap.Int("mapPn:", len(p.mapPn)),
+				zap.Int("mapPc:", len(p.mapPc)), zap.Int("mapPb:", len(p.mapPb)), zap.Int("mapHref:", len(p.mapHref)))
 		} else {
 			p.clearContimes++
 		}
@@ -277,9 +280,13 @@ func (p *ProjectTask) taskQl(udpInfo map[string]interface{}) {
 			q["_id"] = idmap
 		}
 	}
+	if util.ObjToString(udpInfo["coll"]) != "" {
+		BiddingColl = "bidding_back"
+	} else {
+		BiddingColl = "bidding"
+	}
 	//生成查询语句执行
-	log.Println("查询语句:", q)
-	p.enter(MongoTool.DbName, ExtractColl, q)
+	p.enter(MgoB.DbName, BiddingColl, q)
 
 }
 
@@ -289,11 +296,11 @@ func (p *ProjectTask) taskZl(udpInfo map[string]interface{}) {
 	//1、检查pubilshtime索引
 	db, _ := udpInfo["db"].(string)
 	if db == "" {
-		db = MongoTool.DbName
+		db = MgoB.DbName
 	}
 	coll, _ := udpInfo["coll"].(string)
 	if coll == "" {
-		coll = ExtractColl
+		BiddingColl = "bidding"
 	}
 	thread := util.IntAllDef(Thread, 1)
 	if thread > 0 {
@@ -309,7 +316,7 @@ func (p *ProjectTask) taskZl(udpInfo map[string]interface{}) {
 			"$lte": mongodb.StringTOBsonId(lteid),
 		},
 	}
-	p.enter(db, coll, q)
+	p.enter(db, BiddingColl, q)
 	if udpInfo["stop"] == nil {
 		for i := 0; i < 1; i++ {
 			sp <- true
@@ -318,7 +325,7 @@ func (p *ProjectTask) taskZl(udpInfo map[string]interface{}) {
 			<-sp
 		}
 	}
-	log.Println("保存完成,生索引", p.pici)
+	log.Info("保存完成,生索引", zap.Int64("pici:", p.pici))
 	time.Sleep(5 * time.Second)
 	nextNode(udpInfo, p.pici)
 }
@@ -327,33 +334,32 @@ func (p *ProjectTask) taskZl(udpInfo map[string]interface{}) {
 func (p *ProjectTask) taskUpdateInfo(udpInfo map[string]interface{}) {
 	defer util.Catch()
 	infoid := udpInfo["infoid"].(string)
-	infoMap, _ := MgoBidding.FindById(util.ObjToString(udpInfo["coll"]), infoid, nil)
+	infoMap, _ := MgoB.FindById(util.ObjToString(udpInfo["coll"]), infoid, nil)
 	if (*infoMap)["modifyinfo"] == nil {
-		util.Debug("does not exist modifyinfo ---,", infoid)
+		log.Info("does not exist modifyinfo ---," + infoid)
 		return
 	}
 	client := Es.GetEsConn()
 	defer Es.DestoryEsConn(client)
 	esquery := `{"query": {"bool": {"must": [{"match": {"ids": "` + infoid + `"}}]}}}`
-	data := Es.Get(Index, esquery)
+	data := Es.Get(config.Conf.DB.Es.Index, esquery)
 	if len(*data) > 0 {
 		pid := util.ObjToString(((*data)[0])["_id"])
 		p.updateJudge(*infoMap, pid)
 	} else {
-		util.Debug("not find project---,", infoid)
+		log.Info("not find project---," + infoid)
 	}
 }
 
 func (p *ProjectTask) taskUpdatePro(udpInfo map[string]interface{}) {
 	defer util.Catch()
-	util.Debug(udpInfo)
 	pid := util.ObjToString(udpInfo["pid"])
 	updateMap := util.ObjToMap(udpInfo["updateField"])
 	if pid == "" || len(*updateMap) == 0 {
-		util.Debug("参数有误")
+		log.Error("参数有误")
 		return
 	}
-	proMap, _ := MongoTool.FindById(ProjectColl, pid, nil)
+	proMap, _ := MgoP.FindById(ProjectColl, pid, nil)
 	if len(*proMap) > 1 {
 		(*proMap)["reason"] = "直接修改项目字段信息"
 		backupPro(*proMap)
@@ -369,8 +375,7 @@ func (p *ProjectTask) taskUpdatePro(udpInfo map[string]interface{}) {
 			modifyInfo[k] = true
 		}
 		updataMap["modifyinfo"] = modifyInfo
-		util.Debug(updataMap)
-		bol := MongoTool.UpdateById(ProjectColl, pid, map[string]interface{}{"$set": updataMap})
+		bol := MgoP.UpdateById(ProjectColl, pid, map[string]interface{}{"$set": updataMap})
 		if bol {
 			//es索引
 			by, _ := json.Marshal(map[string]interface{}{
@@ -381,14 +386,13 @@ func (p *ProjectTask) taskUpdatePro(udpInfo map[string]interface{}) {
 					}},
 				"stype": "project",
 			})
-			util.Debug(string(by))
 			_ = udpclient.WriteUdp(by, udp.OP_TYPE_DATA, toaddr[1])
 		}
 		// 内存
 		var pro ProjectCache
 		err := mapstructure.Decode(proMap, &pro)
 		if err != nil {
-			util.Debug(err)
+			log.Error(err.Error())
 		}
 		p.AllIdsMapLock.Lock()
 		if v, ok := p.AllIdsMap[pid]; ok {
@@ -396,27 +400,25 @@ func (p *ProjectTask) taskUpdatePro(udpInfo map[string]interface{}) {
 		}
 		p.AllIdsMapLock.Unlock()
 	} else {
-		util.Debug("Not find project---", pid)
+		log.Info("Not find project---" + pid)
 	}
 }
 
 func (p *ProjectTask) delInfoPro(udpInfo map[string]interface{}) {
 	defer util.Catch()
-	util.Debug(udpInfo)
 	infoid := util.ObjToString(udpInfo["infoid"])
 	if infoid == "" {
-		util.Debug("参数有误")
 		return
 	}
 	client := Es.GetEsConn()
 	defer Es.DestoryEsConn(client)
 	esquery := `{"query": {"bool": {"must": [{"term": {"ids": "` + infoid + `"}}]}}}`
-	data := Es.Get(Index, esquery)
+	data := Es.Get(config.Conf.DB.Es.Index, esquery)
 	if len(*data) > 0 {
 		pid := util.ObjToString(((*data)[0])["_id"])
 		p.delJudge(infoid, pid)
 	} else {
-		util.Debug("not find project---,", infoid)
+		log.Info("not find project---," + infoid)
 	}
 }
 
@@ -444,9 +446,9 @@ func (p *ProjectTask) enter(db, coll string, q map[string]interface{}) {
 	countRepeat := 0
 
 	pool := make(chan bool, p.thread)
-	util.Debug("start project", q, p.pici)
-	sess := MongoTool.GetMgoConn()
-	defer MongoTool.DestoryMongoConn(sess)
+	log.Info("start project", zap.Any("q:", q), zap.String("coll:", coll), zap.Int64("pici", p.pici))
+	sess := MgoB.GetMgoConn()
+	defer MgoB.DestoryMongoConn(sess)
 	infoPool := make(chan map[string]interface{}, 2000)
 	over := make(chan bool)
 	go func() {
@@ -489,7 +491,7 @@ func (p *ProjectTask) enter(db, coll string, q map[string]interface{}) {
 	fields := map[string]interface{}{"detail": 0, "contenthtml": 0, "jsondata": 0, "regions_log": 0, "field_source": 0}
 	if p.currentType == "project" || p.currentType == "project_history" {
 		c, _ := sess.DB(db).C(coll).Find(q).Count()
-		util.Debug("共查询:", c, "条")
+		log.Info(fmt.Sprintf("共查询: %d条", c))
 	}
 	ms := sess.DB(db).C(coll).Find(q).Select(fields).Sort("publishtime")
 	query := ms.Iter()
@@ -498,8 +500,8 @@ L:
 	for {
 		select {
 		case <-queryClose:
-			log.Println("receive interrupt sign")
-			log.Println("close iter..", lastid, query.Cursor.Close(nil))
+			log.Info("receive interrupt sign")
+			log.Info("close iter..", zap.Any("lastid:", lastid), zap.Any("err:", query.Cursor.Close(nil)))
 			queryCloseOver <- true
 			break L
 		default:
@@ -508,11 +510,11 @@ L:
 				lastid = tmp["_id"]
 				if P_QL.currentType == "ql" {
 					if count%50000 == 0 {
-						util.Debug("current", count, lastid)
+						log.Info("current---", zap.Int("count", count), zap.String("lastid", mongodb.BsonIdToSId(lastid)))
 					}
 				} else {
 					if count%2000 == 0 {
-						util.Debug("current", count, lastid)
+						log.Info("current---", zap.Int("count", count), zap.String("lastid", mongodb.BsonIdToSId(lastid)))
 					}
 				}
 				//extracttype -1: 重复,1: 不重复
@@ -531,7 +533,7 @@ L:
 				} else {
 					countRepeat++
 					//if P_QL.currentType == "project" {
-					//	util.Debug("repeat err---", tmp["_id"])
+					//	log.Info("repeat err---", tmp["_id"])
 					//}
 				}
 				count++
@@ -546,7 +548,7 @@ L:
 	for n := 0; n < p.thread; n++ {
 		pool <- true
 	}
-	log.Println("所有线程执行完成...", count, countRepeat)
+	log.Info("所有线程执行完成...", zap.Int("count:", count), zap.Int("countRepeat", countRepeat))
 
 }
 
@@ -729,7 +731,7 @@ func ParseInfo(tmp map[string]interface{}) (info *Info) {
 }
 
 func (p *ProjectTask) updateJudge(infoMap map[string]interface{}, pid string) {
-	tmpPro, _ := MongoTool.FindById(ProjectColl, pid, nil)
+	tmpPro, _ := MgoB.FindById(ProjectColl, pid, nil)
 	modifyProMap := make(map[string]interface{}) // 修改项目的字段
 	if modifyMap, ok := infoMap["modifyinfo"].(map[string]interface{}); ok {
 		for k := range modifyMap {
@@ -739,7 +741,7 @@ func (p *ProjectTask) updateJudge(infoMap map[string]interface{}, pid string) {
 		}
 	}
 	if len(modifyProMap) == 0 {
-		util.Debug("修改招标公告信息不需要修改项目信息字段", infoMap["_id"])
+		log.Info("修改招标公告信息不需要修改项目信息字段", zap.Any("id", infoMap["_id"]))
 		return
 	}
 	p.AllIdsMapLock.Lock()
@@ -772,17 +774,17 @@ func (p *ProjectTask) updateJudge(infoMap map[string]interface{}, pid string) {
 				if pid == tempId {
 					p.modifyUpdate(pid, index, position, *tmpPro, modifyProMap)
 				} else {
-					util.Debug("projecthref data id err---pid="+pid, "---"+tempId)
+					log.Info("projecthref data id err---pid=" + pid + "---" + tempId)
 				}
 			} else {
 				f := modifyEle(modifyProMap)
 				if f {
 					//合并、修改
-					util.Debug("合并修改更新", "----------------------------")
+					log.Info("合并修改更新" + "----------------------------")
 					p.mergeAndModify(pid, index, position, infoMap, *tmpPro, modifyProMap)
 				} else {
 					//修改
-					util.Debug("修改更新", "----------------------------")
+					log.Info("修改更新" + "----------------------------")
 					p.modifyUpdate(pid, index, position, *tmpPro, modifyProMap)
 				}
 			}
@@ -790,17 +792,17 @@ func (p *ProjectTask) updateJudge(infoMap map[string]interface{}, pid string) {
 			f := modifyEle(modifyProMap)
 			if f {
 				//合并、修改
-				util.Debug("合并修改更新", "----------------------------")
+				log.Info("合并修改更新" + "----------------------------")
 				p.mergeAndModify(pid, index, position, infoMap, *tmpPro, modifyProMap)
 			} else {
 				//修改
-				util.Debug("修改更新", "----------------------------")
+				log.Info("修改更新" + "----------------------------")
 				p.modifyUpdate(pid, index, position, *tmpPro, modifyProMap)
 			}
 		}
 	} else {
 		// 周期外
-		util.Debug("周期外数据直接修改", "----------------------------")
+		log.Info("周期外数据直接修改" + "----------------------------")
 		p.modifyUpdate(pid, index, position, *tmpPro, modifyProMap)
 	}
 }
@@ -877,12 +879,7 @@ func (p *ProjectTask) fillInPlace(tmp map[string]interface{}) {
 func deleteSlice(arr []string, v, stype string) []string {
 	for k, v1 := range arr {
 		if v1 == v {
-			ts := time.Now().Unix()
 			arr = append(arr[:k], arr[k+1:]...)
-			rt := time.Now().Unix() - ts
-			if rt > 0 {
-				log.Println("deleteSlice", stype, rt, v, len(arr))
-			}
 			return arr
 		}
 	}

+ 3 - 8
data_project/udptaskmap.go

@@ -7,13 +7,12 @@ import (
 	"log"
 	"net"
 	"net/http"
+	"project/config"
 	"sync"
 	"time"
 )
 
 var udptaskmap = &sync.Map{}
-var tomail string
-var api string
 
 type udpNode struct {
 	data      []byte
@@ -24,11 +23,7 @@ type udpNode struct {
 
 func checkMapJob() {
 	//阿里云内网无法发送邮件
-	jkmail, _ := Sysconfig["jkmail"].(map[string]interface{})
-	if jkmail != nil {
-		tomail, _ = jkmail["to"].(string)
-		api, _ = jkmail["api"].(string)
-		log.Println("start checkMapJob", tomail, Sysconfig["jkmail"])
+	if config.Conf.Mail.Send {
 		for {
 			udptaskmap.Range(func(k, v interface{}) bool {
 				now := time.Now().Unix()
@@ -38,7 +33,7 @@ func checkMapJob() {
 					if node.retry > 5 {
 						log.Println("udp重试失败", k)
 						udptaskmap.Delete(k)
-						res, err := http.Get(fmt.Sprintf("%s?to=%s&title=%s&body=%s", api, tomail, "project-send-fail", k.(string)))
+						res, err := http.Get(fmt.Sprintf("%s?to=%s&title=%s&body=%s", config.Conf.Mail.Api, config.Conf.Mail.To, "project-send-fail", k.(string)))
 						if err == nil {
 							defer res.Body.Close()
 							read, err := ioutil.ReadAll(res.Body)

+ 66 - 58
data_project/update.go

@@ -2,6 +2,7 @@ package main
 
 import (
 	util "app.yhyue.com/data_processing/common_utils"
+	"app.yhyue.com/data_processing/common_utils/log"
 	"app.yhyue.com/data_processing/common_utils/mongodb"
 	"app.yhyue.com/data_processing/common_utils/redis"
 	"app.yhyue.com/data_processing/common_utils/udp"
@@ -10,8 +11,9 @@ import (
 	"github.com/goinggo/mapstructure"
 	"go.mongodb.org/mongo-driver/bson"
 	"go.mongodb.org/mongo-driver/bson/primitive"
-	"log"
+	"go.uber.org/zap"
 	"math"
+	"project/config"
 	"reflect"
 	"sort"
 	"strings"
@@ -30,7 +32,7 @@ func (p *ProjectTask) modifyUpdate(pid string, index, position int, tmpPro, modi
 	updateSet := UpdateValue(tmpPro, index, position, modifyProMap)
 	updateSet["list"] = tmpPro["list"]
 	// 1-2 修改mgo
-	bol := MongoTool.UpdateById(ProjectColl, pid, map[string]interface{}{"$set": updateSet})
+	bol := MgoP.UpdateById(ProjectColl, pid, map[string]interface{}{"$set": updateSet})
 	if bol {
 		// 1-3 修改es
 		by, _ := json.Marshal(map[string]interface{}{
@@ -41,7 +43,7 @@ func (p *ProjectTask) modifyUpdate(pid string, index, position int, tmpPro, modi
 				}},
 			"stype": "project",
 		})
-		util.Debug(string(by))
+		log.Info(string(by))
 		_ = udpclient.WriteUdp(by, udp.OP_TYPE_DATA, toaddr[1])
 	}
 	// 修改内存
@@ -53,7 +55,7 @@ func (p *ProjectTask) mergeAndModify(pInfoId string, index, position int, tmp ma
 	info := ParseInfo(tmp)
 	//项目中list大小等于1
 	if len(proList) == 1 {
-		log.Println("list大小等于1    合并", "------1------")
+		log.Info("list大小等于1    合并" + "------1------")
 		// 2.1 重新合并
 		merge := p.ReMerge(info, tmp, tmpPro)
 		if merge {
@@ -89,11 +91,11 @@ func (p *ProjectTask) mergeAndModify(pInfoId string, index, position int, tmp ma
 			flag := true // 标记是否需要删除原有项目信息
 			for _, v := range proList {
 				v1 := v.(map[string]interface{})
-				temp, _ := MongoTool.FindById(ExtractColl, util.ObjToString(v1["infoid"]), nil)
+				temp, _ := MgoB.FindById(BiddingColl, util.ObjToString(v1["infoid"]), nil)
 				if len(*temp) == 0 {
-					temp, _ = MongoTool.FindById(ExtractColl1, util.ObjToString(v1["infoid"]), nil)
+					temp, _ = MgoB.FindById("bidding_back", util.ObjToString(v1["infoid"]), nil)
 					if len(*temp) == 0 {
-						util.Debug("extract not find id...", v1["infoid"])
+						log.Info("extract not find id...", zap.Any("id", v1["infoid"]))
 						continue
 					}
 				}
@@ -111,7 +113,7 @@ func (p *ProjectTask) mergeAndModify(pInfoId string, index, position int, tmp ma
 			} else {
 				//合并到原有项目中, 不用继续重新合并, 原有项目内部合并即可
 				p.innerMerge(proList, tmpPro)
-				bol := MongoTool.UpdateById(ProjectColl, pInfoId, map[string]interface{}{"$set": tmpPro})
+				bol := MgoP.UpdateById(ProjectColl, pInfoId, map[string]interface{}{"$set": tmpPro})
 				if bol {
 					by, _ := json.Marshal(map[string]interface{}{
 						"query": map[string]interface{}{
@@ -121,7 +123,7 @@ func (p *ProjectTask) mergeAndModify(pInfoId string, index, position int, tmp ma
 							}},
 						"stype": "project",
 					})
-					util.Debug(string(by))
+					log.Info(string(by))
 					_ = udpclient.WriteUdp(by, udp.OP_TYPE_DATA, toaddr[1])
 				}
 				// 修改内存
@@ -145,7 +147,7 @@ func (p *ProjectTask) mergeAndModify(pInfoId string, index, position int, tmp ma
 			backupPro(tmpPro)
 			delete(tmpPro, "reason")
 			p.innerMerge1(proList, util.ObjToString(tmp["infoid"]), tmpPro)
-			bol := MongoTool.UpdateById(ProjectColl, pInfoId, map[string]interface{}{"$set": tmpPro})
+			bol := MgoP.UpdateById(ProjectColl, pInfoId, map[string]interface{}{"$set": tmpPro})
 			if bol {
 				by, _ := json.Marshal(map[string]interface{}{
 					"query": map[string]interface{}{
@@ -155,7 +157,7 @@ func (p *ProjectTask) mergeAndModify(pInfoId string, index, position int, tmp ma
 						}},
 					"stype": "project",
 				})
-				util.Debug(string(by))
+				log.Info(string(by))
 				_ = udpclient.WriteUdp(by, udp.OP_TYPE_DATA, toaddr[1])
 			}
 		} else {
@@ -174,7 +176,7 @@ func (p *ProjectTask) mergeAndModify(pInfoId string, index, position int, tmp ma
 			w := len(proList) - 1
 			proList = deleteSlice1(proList, proList[w])
 			p.innerMerge(proList, tmpPro)
-			bol := MongoTool.UpdateById(ProjectColl, pInfoId, map[string]interface{}{"$set": tmpPro})
+			bol := MgoP.UpdateById(ProjectColl, pInfoId, map[string]interface{}{"$set": tmpPro})
 			if bol {
 				by, _ := json.Marshal(map[string]interface{}{
 					"query": map[string]interface{}{
@@ -184,7 +186,7 @@ func (p *ProjectTask) mergeAndModify(pInfoId string, index, position int, tmp ma
 						}},
 					"stype": "project",
 				})
-				util.Debug(string(by))
+				log.Info(string(by))
 				_ = udpclient.WriteUdp(by, udp.OP_TYPE_DATA, toaddr[1])
 			}
 		} else {
@@ -198,17 +200,17 @@ func (p *ProjectTask) mergeAndModify(pInfoId string, index, position int, tmp ma
 
 // 删除
 func (p *ProjectTask) delJudge(infoid, pid string) {
-	tmpPro, _ := MongoTool.FindById(ProjectColl, pid, nil)
+	tmpPro, _ := MgoP.FindById(ProjectColl, pid, nil)
 	ids := (*tmpPro)["ids"].([]interface{})
 	proList := (*tmpPro)["list"].([]interface{})
 	if len(ids) == 1 {
 		(*tmpPro)["reason"] = "删除项目信息"
 		backupPro(*tmpPro)
-		c := MongoTool.Delete(ProjectColl, map[string]interface{}{"_id": mongodb.StringTOBsonId(pid)})
+		c := MgoP.Delete(ProjectColl, map[string]interface{}{"_id": mongodb.StringTOBsonId(pid)})
 		if c > 0 {
 			client := Es.GetEsConn()
 			defer Es.DestoryEsConn(client)
-			Es.DelById(Itype, pid)
+			Es.DelById(config.Conf.DB.Es.Index, pid)
 		}
 		return
 	}
@@ -233,11 +235,11 @@ func (p *ProjectTask) delJudge(infoid, pid string) {
 		flag := true // 标记是否需要删除原有项目信息
 		for _, v := range proList {
 			v1 := v.(map[string]interface{})
-			temp, _ := MongoTool.FindById(ExtractColl, util.ObjToString(v1["infoid"]), nil)
+			temp, _ := MgoB.FindById(BiddingColl, util.ObjToString(v1["infoid"]), nil)
 			if len(*temp) == 0 {
-				temp, _ = MongoTool.FindById(ExtractColl1, util.ObjToString(v1["infoid"]), nil)
+				temp, _ = MgoB.FindById("bidding_back", util.ObjToString(v1["infoid"]), nil)
 				if len(*temp) == 0 {
-					util.Debug("extract not find id...", v1["infoid"])
+					log.Info("extract not find id...", zap.Any("id", v1["infoid"]))
 					continue
 				}
 			}
@@ -254,7 +256,7 @@ func (p *ProjectTask) delJudge(infoid, pid string) {
 			delOldPro(pid)
 		} else {
 			*tmpPro, pro = p.innerMerge(proList, *tmpPro)
-			bol := MongoTool.UpdateById(ProjectColl, pid, map[string]interface{}{"$set": tmpPro})
+			bol := MgoP.UpdateById(ProjectColl, pid, map[string]interface{}{"$set": tmpPro})
 			if bol {
 				by, _ := json.Marshal(map[string]interface{}{
 					"query": map[string]interface{}{
@@ -264,7 +266,6 @@ func (p *ProjectTask) delJudge(infoid, pid string) {
 						}},
 					"stype": "project",
 				})
-				util.Debug(string(by))
 				_ = udpclient.WriteUdp(by, udp.OP_TYPE_DATA, toaddr[1])
 			}
 			// 修改内存
@@ -280,7 +281,7 @@ func (p *ProjectTask) delJudge(infoid, pid string) {
 		delete(*tmpPro, "reason")
 		var pro *ProjectCache
 		*tmpPro, pro = p.innerMerge1(proList, infoid, *tmpPro)
-		bol := MongoTool.UpdateById(ProjectColl, pid, map[string]interface{}{"$set": tmpPro})
+		bol := MgoP.UpdateById(ProjectColl, pid, map[string]interface{}{"$set": tmpPro})
 		if bol {
 			by, _ := json.Marshal(map[string]interface{}{
 				"query": map[string]interface{}{
@@ -290,7 +291,6 @@ func (p *ProjectTask) delJudge(infoid, pid string) {
 					}},
 				"stype": "project",
 			})
-			util.Debug(string(by))
 			_ = udpclient.WriteUdp(by, udp.OP_TYPE_DATA, toaddr[1])
 		}
 		// 内存
@@ -307,7 +307,7 @@ func (p *ProjectTask) delJudge(infoid, pid string) {
 		proList = deleteSlice1(proList, proList[w])
 		var pro *ProjectCache
 		*tmpPro, pro = p.innerMerge(proList, *tmpPro)
-		bol := MongoTool.UpdateById(ProjectColl, pid, map[string]interface{}{"$set": tmpPro})
+		bol := MgoP.UpdateById(ProjectColl, pid, map[string]interface{}{"$set": tmpPro})
 		if bol {
 			by, _ := json.Marshal(map[string]interface{}{
 				"query": map[string]interface{}{
@@ -317,7 +317,6 @@ func (p *ProjectTask) delJudge(infoid, pid string) {
 					}},
 				"stype": "project",
 			})
-			util.Debug(string(by))
 			_ = udpclient.WriteUdp(by, udp.OP_TYPE_DATA, toaddr[1])
 		}
 		// 内存
@@ -462,11 +461,10 @@ func (p *ProjectTask) innerMerge(infoList []interface{}, tmpPro map[string]inter
 	var p1 *ProjectCache
 	for k, m := range infoList {
 		m1 := m.(map[string]interface{})
-		temp, _ := MongoTool.FindById(ExtractColl, util.ObjToString(m1["infoid"]), nil)
+		temp, _ := MgoB.FindById(BiddingColl, util.ObjToString(m1["infoid"]), nil)
 		if len(*temp) == 0 {
-			temp, _ = MongoTool.FindById(ExtractColl1, util.ObjToString(m1["infoid"]), nil)
+			temp, _ = MgoB.FindById("bidding_back", util.ObjToString(m1["infoid"]), nil)
 			if len(*temp) == 0 {
-				util.Debug("extract not find id...", m1["infoid"])
 				continue
 			}
 		}
@@ -489,11 +487,11 @@ func (p *ProjectTask) innerMerge1(infoList []interface{}, infoid string, tmpPro
 		if m1["infoid"] == infoid {
 			continue
 		}
-		temp, _ := MongoTool.FindById(ExtractColl, util.ObjToString(m1["infoid"]), nil)
+		temp, _ := MgoB.FindById(BiddingColl, util.ObjToString(m1["infoid"]), nil)
 		if len(*temp) == 0 {
-			temp, _ = MongoTool.FindById(ExtractColl1, util.ObjToString(m1["infoid"]), nil)
+			temp, _ = MgoB.FindById("bidding_back", util.ObjToString(m1["infoid"]), nil)
 			if len(*temp) == 0 {
-				util.Debug("extract not find id...", m1["infoid"])
+				log.Info("extract not find id...", zap.Any("id", m1["infoid"]))
 				continue
 			}
 		}
@@ -515,7 +513,7 @@ func (p *ProjectTask) updateProFiled(tmp map[string]interface{}, thisinfo *Info,
 	var project *Project
 	err := json.Unmarshal(bys, &project)
 	if err != nil {
-		util.Debug("project Unmarshal err,", err)
+		log.Info("project Unmarshal err," + err.Error())
 		return
 	}
 	set := map[string]interface{}{}
@@ -564,9 +562,13 @@ func (p *ProjectTask) updateProFiled(tmp map[string]interface{}, thisinfo *Info,
 			project.Jgtime = thisinfo.Publishtime
 		}
 	}
-	if thisinfo.Bidopentime > project.Bidopentime {
-		project.Bidopentime = thisinfo.Bidopentime
-		set["bidopentime"] = thisinfo.Bidopentime
+	if thisinfo.BidOpenTime > project.BidOpenTime {
+		project.BidOpenTime = thisinfo.BidOpenTime
+		set["bidopentime"] = thisinfo.BidOpenTime
+	}
+	if thisinfo.BidEndTime > 0 {
+		project.BidEndTime = thisinfo.BidEndTime
+		set["bidendtime"] = project.BidEndTime
 	}
 	// bidtype、bidstatus
 	pInfo.Bidtype, pInfo.Bidstatus = p.GetBidTypeAndBidStatus(thisinfo)
@@ -775,7 +777,7 @@ func (p *ProjectTask) updateProFiled(tmp map[string]interface{}, thisinfo *Info,
 		"list": push,
 		"ids":  thisinfo.Id,
 	}
-	bol := MongoTool.UpdateById(ProjectColl, pInfo.Id.Hex(), update)
+	bol := MgoP.UpdateById(ProjectColl, pInfo.Id.Hex(), update)
 	if bol {
 		by, _ := json.Marshal(map[string]interface{}{
 			"query": map[string]interface{}{
@@ -785,7 +787,7 @@ func (p *ProjectTask) updateProFiled(tmp map[string]interface{}, thisinfo *Info,
 				}},
 			"stype": "project",
 		})
-		util.Debug(string(by))
+		log.Info(string(by))
 		_ = udpclient.WriteUdp(by, udp.OP_TYPE_DATA, toaddr[1])
 	}
 }
@@ -911,7 +913,7 @@ func (p *ProjectTask) updateOldProField(pInfo *ProjectCache, thisinfo *Info, tmp
 	var project *Project
 	err := json.Unmarshal(bys, &project)
 	if err != nil {
-		util.Debug("project Unmarshal err,", err)
+		log.Error("project Unmarshal err," + err.Error())
 		return
 	}
 	if len(pInfo.Ids) > 30 {
@@ -957,9 +959,13 @@ func (p *ProjectTask) updateOldProField(pInfo *ProjectCache, thisinfo *Info, tmp
 			project.Jgtime = thisinfo.Publishtime
 		}
 	}
-	if thisinfo.Bidopentime > project.Bidopentime {
-		project.Bidopentime = thisinfo.Bidopentime
-		tmpPro["bidopentime"] = thisinfo.Bidopentime
+	if thisinfo.BidOpenTime > project.BidOpenTime {
+		project.BidOpenTime = thisinfo.BidOpenTime
+		tmpPro["bidopentime"] = thisinfo.BidOpenTime
+	}
+	if thisinfo.BidEndTime > 0 {
+		project.BidEndTime = thisinfo.BidEndTime
+		tmpPro["bidendtime"] = thisinfo.BidEndTime
 	}
 	// bidtype、bidstatus
 	pInfo.Bidtype, pInfo.Bidstatus = p.GetBidTypeAndBidStatus(thisinfo)
@@ -1212,20 +1218,18 @@ func backupPro(tmp map[string]interface{}) {
 	for k, v := range tmp {
 		tmp1[k] = v
 	}
-	if Sysconfig["backupFlag"].(bool) {
+	if config.Conf.Serve.BackupFlag {
 		tmp1["sourceprojectid"] = mongodb.BsonIdToSId(tmp1["_id"])
 		delete(tmp1, "_id")
-		MongoTool.Save(BackupColl, tmp1)
+		MgoP.Save(BackupColl, tmp1)
 	}
 }
 
 // 删除原有项目数据
 func delOldPro(pid string) {
-	t := MongoTool.Delete(ProjectColl, map[string]interface{}{"_id": mongodb.StringTOBsonId(pid)})
+	t := MgoP.Delete(ProjectColl, map[string]interface{}{"_id": mongodb.StringTOBsonId(pid)})
 	if t >= 0 {
-		client := Es.GetEsConn()
-		defer Es.DestoryEsConn(client)
-		Es.DelById(Index, pid)
+		Es.DelById(config.Conf.DB.Es.Index, pid)
 	}
 }
 
@@ -1235,7 +1239,7 @@ func (p *ProjectTask) modifyMem(tmpPro map[string]interface{}) {
 	var pro ProjectCache
 	err := mapstructure.Decode(tmpPro, &pro)
 	if err != nil {
-		util.Debug(err)
+		log.Info(err.Error())
 	}
 	pro.Id = mongodb.StringTOBsonId(pid)
 	tmpMap := make(map[string]InfoField)
@@ -1258,7 +1262,7 @@ func (p *ProjectTask) modifyMem(tmpPro map[string]interface{}) {
 func (p *ProjectTask) printMemPro(pid string) {
 	p.AllIdsMapLock.Lock()
 	if v, ok := p.AllIdsMap[pid]; ok {
-		util.Debug("mem pro ----------", *v.P)
+		log.Info("mem pro ----------", zap.Any("pro", *v.P))
 	}
 	p.AllIdsMapLock.Unlock()
 }
@@ -1266,9 +1270,9 @@ func (p *ProjectTask) printMemPro(pid string) {
 // @Description id不变,内容变化 重新进行项目合并
 // @Author J 2022/8/10 14:51
 func (p *ProjectTask) taskinfo(id string) {
-	tmpPro, _ := MongoTool.FindOneByField(ProjectColl, map[string]interface{}{"ids": id}, nil)
+	tmpPro, _ := MgoP.FindOneByField(ProjectColl, map[string]interface{}{"ids": id}, nil)
 	if tmpPro == nil || len(*tmpPro) == 0 {
-		util.Debug(fmt.Sprintf("taskinfo bidding id=%s 未查询到项目数据", id))
+		log.Info(fmt.Sprintf("taskinfo bidding id=%s 未查询到项目数据", id))
 		return
 	}
 	pid := mongodb.BsonIdToSId((*tmpPro)["_id"])
@@ -1278,11 +1282,11 @@ func (p *ProjectTask) taskinfo(id string) {
 	var newP map[string]interface{}
 	var p1 *ProjectCache
 	for i, s := range ids {
-		temp, _ := MongoTool.FindById(ExtractColl, s, nil)
+		temp, _ := MgoB.FindById(BiddingColl, s, nil)
 		if temp == nil || len(*temp) == 0 {
-			temp, _ = MongoTool.FindById(ExtractColl1, s, nil)
+			temp, _ = MgoB.FindById("bidding_back", s, nil)
 			if temp == nil || len(*temp) == 0 {
-				util.Debug("extract not find id...", s)
+				log.Info("extract not find id..." + s)
 				return
 			}
 		}
@@ -1529,7 +1533,7 @@ func (p *ProjectTask) updatePro(set, tmp map[string]interface{}, pInfo *ProjectC
 	var project *Project
 	err := json.Unmarshal(bys, &project)
 	if err != nil {
-		util.Debug("project Unmarshal err,", err)
+		log.Info("project Unmarshal err," + err.Error())
 		return
 	}
 
@@ -1576,9 +1580,13 @@ func (p *ProjectTask) updatePro(set, tmp map[string]interface{}, pInfo *ProjectC
 			set["jgtime"] = tmp["publishtime"]
 		}
 	}
-	if thisinfo.Bidopentime > project.Bidopentime {
-		project.Bidopentime = thisinfo.Bidopentime
-		set["bidopentime"] = project.Bidopentime
+	if thisinfo.BidOpenTime > project.BidOpenTime {
+		project.BidOpenTime = thisinfo.BidOpenTime
+		set["bidopentime"] = project.BidOpenTime
+	}
+	if thisinfo.BidEndTime > 0 {
+		project.BidEndTime = thisinfo.BidEndTime
+		set["bidendtime"] = project.BidEndTime
 	}
 
 	bt := util.ObjToString(tmp["toptype"])

+ 76 - 0
field_sync/task.go

@@ -15,6 +15,7 @@ import (
 	"net"
 	"reflect"
 	"regexp"
+	"sort"
 	"strings"
 	"time"
 )
@@ -308,6 +309,12 @@ func doIndex(infos []map[string]interface{}, eMap map[string]map[string]interfac
 		//处理分类
 		if compare != nil { //extract
 			fieldFun(compare, update)
+			// publishtime 20230523
+			if util.IntAll(tmp["publishtime"]) == -1 {
+				if pb := methodPb(compare); pb > 0 {
+					update["publishtime"] = pb
+				}
+			}
 			compare = nil
 		}
 		//------------------对比结束
@@ -326,6 +333,7 @@ func doIndex(infos []map[string]interface{}, eMap map[string]map[string]interfac
 				extractMap["entidlist"] = cid
 			}
 		}
+
 		// 6.10 剑鱼发布信息分类处理, 写在这里是为了修改抽取表
 		typeFunc(tmp, update, extractMap)
 		if len(extractMap) > 0 {
@@ -390,6 +398,7 @@ func doIndex(infos []map[string]interface{}, eMap map[string]map[string]interfac
 }
 
 // @Description subscopeclass、topscopeclass、package
+// 20230523 多包处理 subpackage = 1
 // @Author J 2022/6/7 5:54 PM
 func fieldFun(compare, update map[string]interface{}) {
 	subscopeclass, _ := compare["subscopeclass"].([]interface{}) //subscopeclass
@@ -435,6 +444,41 @@ func fieldFun(compare, update map[string]interface{}) {
 	} else {
 		update["multipackage"] = 0
 	}
+
+	// subpackage
+	if compare["package"] != nil && compare["s_winner"] != nil && compare["bidamount"] != nil {
+		pg := compare["package"].(map[string]interface{})
+		if len(pg) > 1 {
+			var bmt []float64
+			var swn []string
+			for _, p := range pg {
+				p1 := p.(map[string]interface{})
+				if p1["bidamount"] != nil {
+					bmt = append(bmt, util.Float64All(p1["bidamount"]))
+				}
+				if w := util.ObjToString(p1["winner"]); w != "" {
+					swn = append(swn)
+				}
+			}
+
+			if len(bmt) > 1 && len(swn) > 1 {
+				sn := strings.Split(util.ObjToString(compare["s_winner"]), ",")
+				sort.Strings(sn)
+				sort.Strings(swn)
+				swn1 := util.ObjArrToStringArr(Duplicate(swn)) // 去重
+				if strings.Join(swn1, ",") == strings.Join(sn, ",") {
+					bidamount := 0.0
+					for _, f := range bmt {
+						bidamount += f
+					}
+					if bidamount == util.Float64All(compare["bidamount"]) {
+						update["subpackage"] = 1
+					}
+				}
+			}
+
+		}
+	}
 }
 
 // @Description entidlist
@@ -652,3 +696,35 @@ func taskinfo(id string) {
 	log.Info("nsq data over", zap.Any("es", next), zap.String("mapinfo", string(datas)))
 	_ = UdpClient.WriteUdp(datas, udp.OP_TYPE_DATA, next)
 }
+
+var DateTimeSelect = []string{"bidopentime", "bidendtime", "signaturedate", "comeintime"}
+
+// @Description 发布时间处理
+// @Author J 2023/5/23 14:32
+func methodPb(tmp map[string]interface{}) int64 {
+	if tmp["ext_publishtime"] != nil {
+		if newPb := util.Int64All(tmp["ext_publishtime"]); newPb < time.Now().Unix() && newPb > 1420041600 {
+			return newPb
+		}
+	}
+	for _, d := range DateTimeSelect {
+		if tmp[d] != nil {
+			return util.Int64All(tmp[d])
+		}
+	}
+	return 0
+}
+
+// Duplicate
+//@Description 去重
+// @Author J 2023/5/24 09:53
+func Duplicate(a interface{}) (ret []interface{}) {
+	va := reflect.ValueOf(a)
+	for i := 0; i < va.Len(); i++ {
+		if i > 0 && reflect.DeepEqual(va.Index(i-1).Interface(), va.Index(i).Interface()) {
+			continue
+		}
+		ret = append(ret, va.Index(i).Interface())
+	}
+	return ret
+}