Jianghan пре 2 година
родитељ
комит
b4cef30271

+ 9 - 0
README.md

@@ -38,6 +38,15 @@
 2. 数据进行项目合并,
 3. tidb库数据迁移
 
+####  拟在建关联
+1. 建立关联关系,拟建项目与在建项目
+2. 关联数据更新到tidb库
+   + 基本信息表的跟进数量、项目阶段
+   + 跟进记录表的信息
+3. 关联数据增量处理
+   + 拟建项目增量处理
+   + 在建项目增量数据处理
+
 
 ### medical_project(医疗领域数据合并)
 

+ 9 - 3
proposed_project/common.toml

@@ -2,8 +2,7 @@
 [serve]
 pici = 1669942800
 thread = 1
-tagrule = "nijian_rule"
-procoll = "projectset_proposed"
+tagrule = "nzj_rule"
 jyhref = "https://www.jianyu360.cn/article/content/%s.html"
 
 [udp]
@@ -33,11 +32,18 @@ password = ""
 [db.mongoP]
 addr = "192.168.3.207:29099"
 dbname = "wjh"
-coll = "projectset_proposed"
+proposedColl = "projectset_proposed"
+combColl = "projectset_comb"
+projectColl = "projectset_20221103"
 size = 15
 user = ""
 password = ""
 
+[redis]
+addr = "192.168.3.207:2679"
+pcode = "project"
+db = 1
+
 [db.es]
 addr = "http://127.0.0.1:19800"
 size = 5

+ 16 - 7
proposed_project/config/conf.go

@@ -31,9 +31,9 @@ type conf struct {
 }
 
 type serve struct {
+	Pici    int64
 	Thread  int
 	TagRule string
-	ProColl string
 	JyHref  string
 }
 
@@ -70,15 +70,24 @@ type db struct {
 	MongoP mgo
 	Mysql  mysql
 	Es     es
+	Redis  redis
 }
 
 type mgo struct {
-	Addr     string
-	Dbname   string
-	Coll     string
-	Size     int
-	User     string
-	Password string
+	Addr         string
+	Dbname       string
+	ProposedColl string
+	CombColl     string
+	ProjectColl  string
+	Size         int
+	User         string
+	Password     string
+}
+
+type redis struct {
+	Addr  string
+	Pcode string
+	Db    int
 }
 
 type mysql struct {

+ 5 - 124
proposed_project/config/conf_test.go

@@ -49,24 +49,10 @@ size = 15
 user = ""
 password = ""
 
-[db.es]
-addr = "http://192.168.3.206:9800"
-size = 5
-indexb = "bidding"
-typeb = "bidding"
-indexp = "projectset"
-typep = "projectset"
-indexwinner = "winner"
-typewinner = "winner"
-indexbuyer = "buyer"
-typebuyer = "buyer"
-detailfilter = ["(招标网|千里马|采招网|招标采购导航网|招标与采购网|中国招投标网|中国采购与招标网|中国采购与招标|优质采)[\\w\\W]{0,15}[http|https|htpps]?[a-z0-9:\\/\\/.]{0,20}(qianlima|zhaobiao|okcis|zbytb|infobidding|bidcenter|youzhicai|chinabidding|Chinabidding|CHINABIDDING)[a-z0-9.\\/\\/]{0,40}",
-    "招标网[\\w\\W]{0,15}[http|https|htpps]?[a-z0-9:\\/\\/.]{0,20}zhaobiao[a-z0-9.\\/\\/]{0,40}",
-    "千里马[\\w\\W]{0,15}[a-z0-9:\\/\\/.]{0,20}qianlima[a-z0-9.\\/\\/]{0,10}",
-    "[\\((]?(网址)?[::;;]?(http|https|htpps)*[::]?(\\/\\/)?(www|jinan|WWW)?.(zhaobiao|chinabidding|Chinabidding|CHINABIDDING|infobidding|zbytb|okcis|qianlima|youzhicai).(com|cn|COM|CN)?(.cn|.CN)?\\/?[\\))]?",
-    "[\\((]?(网址)?(::)?(http|https|htpps)*(:|:)?\\/\\/www.bidcenter.com.cn\\/",
-    "千里马(平台|网站)+", "[“\"]?优质采(平台|电子交易平台|云采购平台|交易平台)?[”\"]?", "《?(中国采购与|中国)?招(投)?标(与采购|采购导航)?网》?",
-    "《?元博网(采购与招标网)?》?", "《?(中国)?招标采购导航网》?", "中\\W{0,3}国采\\W{0,3}招\\W{0,3}网\\W*[((]?(bidcenter.com.cn)?[))]?", "已方宝", "中国招标与采购"]
+[db.redis]
+addr = "192.168.3.207:2679"
+pcode = "project"
+db = 1
 
 [mail]
 send = false
@@ -90,117 +76,12 @@ loglevel  = "debug"
 # text or json output
 format = "text"
 
-[db.es.fieldes]
-"_id" = ""
-"buyerzipcode" = "string"
-"winnertel" = "string"
-"winnerperson" = "string"
-"contractcode" = "string"
-"winneraddr" = "string"
-"agencyaddr" = "string"
-"buyeraddr" = "string"
-"signaturedate" = "int64"
-"projectperiod" = "string"
-"projectaddr" = "string"
-"agencytel" = "string"
-"agencyperson" = "string"
-"buyerperson" = "string"
-"agency" = "string"
-"projectscope" = "string"
-"projectcode" = "string"
-"bidopentime" = "int64"
-"supervisorrate" = "float64"
-"buyertel" = "string"
-"bidamount" = "float64"
-"winner" = "string"
-"buyer" = "string"
-"budget" = "float64"
-"projectname" = "string"
-"bidstatus" = "string"
-"buyerclass" = "string"
-"topscopeclass" = ""
-"s_topscopeclass" = "string"
-"s_subscopeclass" = "string"
-"area" = "string"
-"city" = "string"
-"district" = "string"
-"s_winner" = "string"
-"title" = "string"
-"detail" = "string"
-"site" = "string"
-"comeintime" = "int64"
-"href" = "string"
-"infoformat" = "int32"
-"publishtime" = "int64"
-"s_sha" = "string"
-"spidercode" = "string"
-"subtype" = "string"
-"toptype" = "string"
-"projectinfo" = ""
-"purchasing" = "string"
-"purchasinglist" = ""
-"channel" = "string"
-"winnerorder" = ""
-"project_scale" = "string"
-"project_duration" = "int32"
-"project_timeunit" = "string"
-"project_startdate" = "int64"
-"project_completedate" = "int64"
-"payway" = "string"
-"contract_guarantee" = "bool"
-"bid_guarantee" = "bool"
-"qualifies" = ""
-"entidlist" = ""
-"funds" = "string"
-"review_experts" = "string"
-"bidmethod" = "string"
-"bidendtime" = "int64"
-"bidopenaddress" = "string"
-"docamount" = "float64"
-"agencyrate" = "float64"
-"agencyfee" = "float64"
-"bidway" = "string"
-"getdocmethod" = "string"
-"china_bidding" = "string"
-"purchasing_tag" = "string"
-"multipackage" = "int32"
-"isValidFile" = "bool"
-"bid_field" = "string"
-[db.es.fieldprojectinfo]
-"approvecode" = "string"
-"approvecontent" = "string"
-"approvestatus" = "string"
-"approvetime" = "string"
-"approvedept" = "string"
-"approvenumber" = "string"
-"projecttype" = "string"
-"approvecity" = "string"
-[db.es.fieldpurchasinglist]
-"itemname" = "string"
-"item" = "string"
-"brandname" = "string"
-"model" = "string"
-"unitname" = "string"
-"number" = "float64"
-"unitprice" = "float64"
-"totalprice" = "float64"
-[db.es.fieldprocurementlist]
-"projectname" = "string"
-"buyer" = "string"
-"item" = "string"
-"projectscope" = "string"
-"expurasingtime" = "string"
-"totalprice" = "float64"
-[db.es.fieldwinnerorder]
-"sort" = "int"
-"sortstr" = "string"
-"entname" = "string"
 `
 
 func TestInit(t *testing.T) {
 	testfile := "/tmp/crocodile.toml"
 	ioutil.WriteFile(testfile, []byte(confs), 0644)
 	Init(testfile)
-	t.Logf("%+v", Conf.DB.Mysql.DbnameMedical)
+	t.Logf("%+v", Conf.DB.Redis.Addr)
 	os.Remove(testfile)
 }

+ 2 - 0
proposed_project/init.go

@@ -197,3 +197,5 @@ type Info struct {
 	ProjectPhone        string   `json:"project_phone"`
 	ProjectPerson       string   `json:"project_person"`
 }
+
+var Project_Stage = []string{"可研", "初设", "环评", "审批备案", "设计", "施工准备", "施工", "竣工验收"}

+ 66 - 2
proposed_project/main.go

@@ -41,6 +41,8 @@ func main() {
 	rootCmd.AddCommand(redisSave())
 	rootCmd.AddCommand(esSave())
 	rootCmd.AddCommand(projectComb())
+	rootCmd.AddCommand(projectCombTidb())
+	rootCmd.AddCommand(projectCombAdd())
 
 	if err := rootCmd.Execute(); err != nil {
 		fmt.Println("rootCmd.Execute failed", err.Error())
@@ -261,6 +263,7 @@ func esSave() *cobra.Command {
 			cronstr := "0 */30 * * * ?" // 每30min执行一次
 			_ = crn.AddFunc(cronstr, func() {
 				if TaskSingle {
+					TaskSingle = false
 					esDisp()
 				} else {
 					log.Info("上次任务未执行完成")
@@ -277,6 +280,8 @@ func esSave() *cobra.Command {
 	return cmdClient
 }
 
+// @Description 拟在建项目关联
+// @Author J 2023/4/17 09:05
 func projectComb() *cobra.Command {
 	cmdClient := &cobra.Command{
 		Use:   "project-comb",
@@ -286,6 +291,8 @@ func projectComb() *cobra.Command {
 			initSeg()
 			InitEs()
 
+			redis.InitRedis1(fmt.Sprintf("%s=%s", config.Conf.DB.Redis.Pcode, config.Conf.DB.Redis.Addr), config.Conf.DB.Redis.Db)
+
 			go SavePpMethod()
 			taskC()
 			c := make(chan bool, 1)
@@ -295,6 +302,62 @@ func projectComb() *cobra.Command {
 	return cmdClient
 }
 
+// @Description 拟在建项目关联后入tidb处理
+// @Author J 2023/4/17 09:06
+func projectCombTidb() *cobra.Command {
+	cmdClient := &cobra.Command{
+		Use:   "project-comb-tidb",
+		Short: "Start processing combined project",
+		Run: func(cmd *cobra.Command, args []string) {
+
+			InitMysql()
+			initStage()
+
+			redis.InitRedis1(fmt.Sprintf("%s=%s", config.Conf.DB.Redis.Pcode, config.Conf.DB.Redis.Addr), config.Conf.DB.Redis.Db)
+			go SaveEntFunc1("dwd_f_nzj_ent", EntField)
+			taskD()
+			c := make(chan bool, 1)
+			<-c
+		},
+	}
+	return cmdClient
+}
+
+// @Description 拟在建关联数据 增量数据处理
+// @Author J 2023/4/24 13:51
+func projectCombAdd() *cobra.Command {
+	cmdClient := &cobra.Command{
+		Use:   "project-comb-add",
+		Short: "Start processing combined project add",
+		Run: func(cmd *cobra.Command, args []string) {
+
+			InitMysql()
+			initStage()
+
+			redis.InitRedis1(fmt.Sprintf("%s=%s", config.Conf.DB.Redis.Pcode, config.Conf.DB.Redis.Addr), config.Conf.DB.Redis.Db)
+			go SaveEntFunc1("dwd_f_nzj_ent", EntField)
+
+			crn := cron.New()
+			cronstr := "0 */30 * * * ?" // 每30min执行一次
+			_ = crn.AddFunc(cronstr, func() {
+				if TaskSingle {
+					TaskSingle = false
+					go taskAA()
+					go taskBB()
+				} else {
+					log.Info("上次任务未执行完成")
+				}
+
+			})
+			crn.Start()
+
+			c := make(chan bool, 1)
+			<-c
+		},
+	}
+	return cmdClient
+}
+
 func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 	switch act {
 	case udp.OP_TYPE_DATA:
@@ -322,8 +385,8 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 
 func taskQ() (string, string) {
 	log.Info("taskQ", zap.String("lastid", LastId))
-	query := bson.M{"gtid": bson.M{"$gt": LastId}, "dataprocess": 8}
-	info, _ := MgoBid.Find("bidding_processing_ids", query, `{"_id": -1}`, nil, false, -1, 1)
+	query := bson.M{"gtid": bson.M{"$gt": LastId}}
+	info, _ := MgoBid.Find("field_data_record", query, `{"_id": -1}`, nil, false, -1, 1)
 	if len(*info) > 0 {
 		newid := util.ObjToString((*info)[0]["lteid"])
 		log.Info("taskQ", zap.String("新lastid", newid))
@@ -480,6 +543,7 @@ func esDisp() {
 	}
 	wg.Wait()
 	log.Info(fmt.Sprintf("over --- %d", count))
+	TaskSingle = true
 }
 
 func saveMethod() {

+ 1 - 1
proposed_project/proTask.go

@@ -183,7 +183,7 @@ func doTask1(gtid, lteid string) {
 		"$lte": mongodb.StringTOBsonId(lteid)}}
 
 	log.Info("doTask", zap.Any("q", q))
-	query := sess.DB(config.Conf.DB.MongoB.Dbname).C(config.Conf.DB.MongoB.Coll).Find(q).Select(nil).Iter()
+	query := sess.DB(config.Conf.DB.MongoB.Dbname).C("bidding").Find(q).Select(nil).Iter()
 	count := 0
 	for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
 		if count%20000 == 0 {

+ 395 - 16
proposed_project/projectTask.go

@@ -3,9 +3,13 @@ package main
 import (
 	util "app.yhyue.com/data_processing/common_utils"
 	"app.yhyue.com/data_processing/common_utils/log"
+	"app.yhyue.com/data_processing/common_utils/mongodb"
+	"app.yhyue.com/data_processing/common_utils/redis"
 	"encoding/json"
 	"fmt"
 	"github.com/go-ego/gse"
+	"go.mongodb.org/mongo-driver/bson"
+	"go.uber.org/zap"
 	"io/ioutil"
 	"net/http"
 	"net/url"
@@ -35,8 +39,8 @@ var (
 )
 
 func initSeg() {
-	//_ = seg.LoadDict("./t_1.txt")
-	_ = seg.LoadDict()
+	_ = seg.LoadDict("./t_1.txt")
+	//_ = seg.LoadDict()
 	seg.AddToken("渼陂", 3, "")
 	seg.LoadStop("./stopwords.txt")
 
@@ -66,7 +70,7 @@ func taskC() {
 		"district":      1,
 	}
 
-	query := sess.DB(config.Conf.DB.MongoP.Dbname).C(config.Conf.Serve.ProColl).Find(nil).Select(f).Iter()
+	query := sess.DB(config.Conf.DB.MongoP.Dbname).C(config.Conf.DB.MongoP.ProposedColl).Find(nil).Select(f).Iter()
 	count := 0
 	for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
 		if count%2000 == 0 {
@@ -89,23 +93,26 @@ func taskC() {
 			var eArr []map[string]interface{}
 			n1, n2 := 0, 0
 			// approvecode、approvenumber
-			q := Method2(util.ObjToString(tmp["approvecode"]), util.ObjToString(tmp["approvenumber"]))
-			if q != "" {
-				binfo := Es.Get("projectset", q)
-				if binfo != nil && len(*binfo) > 0 {
-					for _, m := range *binfo {
-						n1 = len(*binfo)
-						mArr = append(mArr, map[string]interface{}{"pid": util.ObjToString(m["_id"]), "projectname": m["projectname"], "source": 1})
-					}
-				}
-			}
+			//q := Method2(util.ObjToString(tmp["approvecode"]), util.ObjToString(tmp["approvenumber"]))
+			//if q != "" {
+			//	binfo := Es.Get("projectset", q)
+			//	if binfo != nil && len(*binfo) > 0 {
+			//		for _, m := range *binfo {
+			//			n1 = len(*binfo)
+			//			mArr = append(mArr, map[string]interface{}{"pid": util.ObjToString(m["_id"]), "projectname": m["projectname"], "source": 1})
+			//		}
+			//	}
+			//}
 
 			wds, q := Method1(util.ObjToString(tmp["projectname"]))
 			if q != "" {
-				binfo := Es.Get("projectset", q)
+				binfo := Es.Get("projectset_v1", q)
 				if binfo != nil && len(*binfo) > 0 {
 					n2 = len(*binfo)
 					for _, m := range *binfo {
+						if b, _ := redis.Exists(config.Conf.DB.Redis.Pcode, util.ObjToString(m["_id"])); b {
+							continue
+						}
 						if util.ObjToString(m["bidstatus"]) == "拟建" {
 							eArr = append(eArr, map[string]interface{}{"pid": util.ObjToString(m["_id"]), "projectname": m["projectname"], "bidstatus": m["bidstatus"]})
 							continue
@@ -114,7 +121,10 @@ func taskC() {
 							eArr = append(eArr, map[string]interface{}{"pid": util.ObjToString(m["_id"]), "projectname": m["projectname"], "bidstatus": m["bidstatus"]})
 							continue
 						}
+
+						redis.PutCKV(config.Conf.DB.Redis.Pcode, util.ObjToString(m["_id"]), mongodb.BsonIdToSId(tmp["_id"]))
 						mArr = append(mArr, map[string]interface{}{"pid": util.ObjToString(m["_id"]), "projectname": m["projectname"], "source": 2})
+
 					}
 				}
 			}
@@ -128,6 +138,7 @@ func taskC() {
 			save["esearch"] = q
 			save["size_1"] = n1
 			save["size_2"] = n2
+			save["createtime"] = time.Now().Unix()
 			savePpPool <- save
 			//}
 		}(tmp)
@@ -379,7 +390,7 @@ func SavePpMethod() {
 					defer func() {
 						<-savePpSp
 					}()
-					MgoPro.SaveBulk("projectset_comb", arru...)
+					MgoPro.SaveBulk(config.Conf.DB.MongoP.CombColl, arru...)
 				}(arru)
 				arru = make([]map[string]interface{}, saveSize)
 				indexu = 0
@@ -391,7 +402,375 @@ func SavePpMethod() {
 					defer func() {
 						<-savePpSp
 					}()
-					MgoPro.SaveBulk("projectset_comb", arru...)
+					MgoPro.SaveBulk(config.Conf.DB.MongoP.CombColl, arru...)
+				}(arru[:indexu])
+				arru = make([]map[string]interface{}, saveSize)
+				indexu = 0
+			}
+		}
+	}
+}
+
+var StageCode []TagMatching
+
+func initStage() {
+	info, _ := MgoBid.Find(config.Conf.Serve.TagRule, bson.M{"label_name": "project_stage"}, `{"_id": 1}`, nil, false, -1, -1)
+	for _, m := range *info {
+		tag := TagMatching{}
+		tag.tagName = util.ObjToString(m["label_name"])
+		tag.tagCode = util.ObjToString(m["code"])
+		// 关键词
+		tag.matchField = []string{"title", "project"}
+		if v := util.ObjToString(m["keyword"]); v != "" {
+			tag.matchKey = util.ObjToString(m["keyword"])
+			tag.matchKeyReg = GetRegex(util.ObjToString(m["keyword"]))
+		}
+		// 附件词
+		if f := util.ObjToString(m["match_fjword"]); f != "" {
+			tag.addField = strings.Split(f, ",")
+			for _, s := range tag.addField {
+				SelectF[s] = 1
+			}
+			if v := util.ObjToString(m["fjword"]); v != "" {
+				tag.addKey = util.ObjToString(m["fjword"])
+				tag.addKeyReg = GetRegex(util.ObjToString(m["fjword"]))
+			}
+		}
+		// 排除词
+		if f := util.ObjToString(m["match_pcword"]); f != "" {
+			tag.excludeField = strings.Split(f, ",")
+			for _, s := range tag.excludeField {
+				SelectF[s] = 1
+			}
+			if v := util.ObjToString(m["pcword"]); v != "" {
+				tag.excludeKey = util.ObjToString(m["pcword"])
+				tag.excludeKeyReg = GetRegex(util.ObjToString(m["pcword"]))
+			}
+		}
+		// 清理词
+		if v := util.ObjToString(m["qlword"]); v != "" {
+			tag.clearKey = strings.Split(util.ObjToString(m["qlword"]), ",")
+		}
+		StageCode = append(StageCode, tag)
+	}
+	log.Info("initStage", zap.Int("StageCode", len(StageCode)))
+}
+
+func taskD() {
+	sess := MgoPro.GetMgoConn()
+	defer MgoPro.DestoryMongoConn(sess)
+
+	ch := make(chan bool, config.Conf.Serve.Thread)
+	wg := &sync.WaitGroup{}
+
+	//q := bson.M{"_id": mongodb.StringTOBsonId("60a2995b8a2adb30a57172ec")}
+	query := sess.DB(config.Conf.DB.MongoP.Dbname).C(config.Conf.DB.MongoP.CombColl).Find(nil).Iter()
+	count := 0
+	for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
+		if count%2000 == 0 {
+			log.Info(fmt.Sprintf("current --- %d", count))
+		}
+
+		ch <- true
+		wg.Add(1)
+		go func(tmp map[string]interface{}) {
+			defer func() {
+				<-ch
+				wg.Done()
+			}()
+
+			if ids, ok := tmp["ids"].([]interface{}); ok {
+				//id := mongodb.BsonIdToSId(tmp["_id"])
+
+				for _, p := range ids {
+					p1 := p.(map[string]interface{})
+					info, _ := MgoPro.FindById(config.Conf.DB.MongoP.ProjectColl, util.ObjToString(p1["pid"]), nil)
+					if list, ok1 := (*info)["list"].([]interface{}); ok1 {
+						for _, l := range list {
+							l1 := l.(map[string]interface{})
+							m := make(map[string]interface{})
+							m["project_stage_code"] = tagFunc(l1)
+							m["proposed_id"] = mongodb.BsonIdToSId(tmp["_id"])
+							m["title"] = util.ObjToString(l1["title"])
+							if t := util.Int64All(l1["publishtime"]); t > 0 {
+								m["publishtime"] = util.FormatDateByInt64(&t, util.Date_Full_Layout)
+							}
+							m["infoid"] = util.ObjToString(l1["infoid"])
+							m["jybxhref"] = fmt.Sprintf(config.Conf.Serve.JyHref, util.CommonEncodeArticle("content", util.ObjToString(l1["infoid"])))
+							m["createtime"] = time.Now().Format(util.Date_Full_Layout)
+							MgoPro.Save("projectset_comb_temp1", m)
+							//MysqlTool.Insert("dwd_f_nzj_follw_record", m)
+						}
+					}
+					//if buyer := util.ObjToString((*info)["buyer"]); buyer != "" {
+					//	s := MysqlTool.Count("dwd_f_nzj_ent", bson.M{"proposed_id": id, "name": buyer})
+					//	if s <= 0 {
+					//		saveEnt := make(map[string]interface{})
+					//		saveEnt["proposed_id"] = id
+					//		saveEnt["createtime"] = time.Now().Format(util.Date_Full_Layout)
+					//		saveEnt["name"] = buyer
+					//		if eid := redis.GetStr("ent_id", buyer); eid != "" {
+					//			arr := strings.Split(eid, "_")
+					//			saveEnt["name_id"] = arr[0]
+					//			if len(arr) == 2 {
+					//				saveEnt["area_code"] = arr[1]
+					//			} else if len(arr) == 3 {
+					//				saveEnt["city_code"] = arr[2]
+					//			}
+					//			info := MysqlTool1.Find("dws_f_ent_baseinfo", bson.M{"name_id": arr[0]}, "address", "", -1, -1)
+					//			if info != nil && len(*info) > 0 {
+					//				saveEnt["address"] = (*info)[0]["address"]
+					//			}
+					//		}
+					//		saveEnt["identity_type"] = 2
+					//		saveEnt["area_code"] = AreaCode[util.ObjToString(tmp["area"])]
+					//		saveEntPool1 <- saveEnt
+					//	}
+					//}
+					//if winner := util.ObjToString((*info)["buyer"]); winner != "" {
+					//	for _, w := range strings.Split(winner, ",") {
+					//		s := MysqlTool.Count("dwd_f_nzj_ent", bson.M{"proposed_id": id, "name": w})
+					//		if s <= 0 {
+					//			saveEnt := make(map[string]interface{})
+					//			saveEnt["proposed_id"] = id
+					//			saveEnt["createtime"] = time.Now().Format(util.Date_Full_Layout)
+					//			saveEnt["name"] = w
+					//			if eid := redis.GetStr("ent_id", w); eid != "" {
+					//				arr := strings.Split(eid, "_")
+					//				saveEnt["name_id"] = arr[0]
+					//				if len(arr) == 2 {
+					//					saveEnt["area_code"] = arr[1]
+					//				} else if len(arr) == 3 {
+					//					saveEnt["city_code"] = arr[2]
+					//				}
+					//				info := MysqlTool1.Find("dws_f_ent_baseinfo", bson.M{"name_id": arr[0]}, "address", "", -1, -1)
+					//				if info != nil && len(*info) > 0 {
+					//					saveEnt["address"] = (*info)[0]["address"]
+					//				}
+					//			}
+					//			saveEnt["identity_type"] = 3
+					//			saveEnt["area_code"] = AreaCode[util.ObjToString(tmp["area"])]
+					//			saveEntPool1 <- saveEnt
+					//		}
+					//	}
+					//}
+				}
+
+				//size := MysqlTool.Count("dwd_f_nzj_follw_record", bson.M{"proposed_id": id})
+				//info := MysqlTool.FindOne("dwd_f_nzj_follw_record", bson.M{"proposed_id": id}, "project_stage_code", "publishtime desc")
+				//MysqlTool.Update("dwd_f_nzj_baseinfo", bson.M{"proposed_id": id}, bson.M{"follow_num": size, "project_stage_code": (*info)["project_stage_code"], "updatetime": time.Now().Format(util.Date_Full_Layout)})
+			}
+
+		}(tmp)
+		tmp = make(map[string]interface{})
+	}
+	wg.Wait()
+	log.Info(fmt.Sprintf("over --- %d", count))
+}
+
+// @Description 施工准备(06)、施工(07)、设计(05)
+// @Author J 2023/4/21 14:45
+func tagFunc(info map[string]interface{}) string {
+	tag := taskFuc1(info)
+	if tag["project_stage"] != "" {
+		return util.ObjToString(tag["project_stage"])
+	}
+	if util.ObjToString(info["toptype"]) == "招标" || util.ObjToString(info["toptype"]) == "预告" {
+		return "06"
+	}
+	return "00"
+}
+
+// @Description 在建项目增量
+// @Author J 2023/4/24 13:58
+func taskAA() {
+	sess := MgoPro.GetMgoConn()
+	defer MgoPro.DestoryMongoConn(sess)
+
+	ch := make(chan bool, config.Conf.Serve.Thread)
+	wg := &sync.WaitGroup{}
+
+	q := bson.M{"pici": bson.M{"$gte": config.Conf.Serve.Pici}}
+	query := sess.DB(config.Conf.DB.MongoP.Dbname).C(config.Conf.DB.MongoP.ProjectColl).Find(q).Iter()
+	count := 0
+	for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
+		if count%2000 == 0 {
+			log.Info(fmt.Sprintf("current --- %d", count))
+		}
+		if pc := util.Int64All(tmp["pici"]); pc > config.Conf.Serve.Pici {
+			config.Conf.Serve.Pici = pc
+		}
+		ch <- true
+		wg.Add(1)
+		go func(tmp map[string]interface{}) {
+			defer func() {
+				<-ch
+				wg.Done()
+			}()
+			id := mongodb.BsonIdToSId(tmp["_id"])
+			if str := redis.GetStr(config.Conf.DB.Redis.Pcode, id); str != "" {
+				strs := strings.Split(str, "-")
+				if len(tmp["list"].([]interface{})) != util.IntAll(strs[1]) {
+					for _, info := range tmp["list"].([]interface{}) {
+						info1 := info.(map[string]interface{})
+
+					}
+				}
+			} else {
+
+			}
+
+		}(tmp)
+		tmp = make(map[string]interface{})
+	}
+	wg.Wait()
+	log.Info(fmt.Sprintf("over --- %d, pici ---%d", count, config.Conf.Serve.Pici))
+}
+
+// @Description 拟建项目增量
+// @Author J 2023/4/24 13:59
+func taskBB() {
+	sess := MgoPro.GetMgoConn()
+	defer MgoPro.DestoryMongoConn(sess)
+
+	ch := make(chan bool, config.Conf.Serve.Thread)
+	wg := &sync.WaitGroup{}
+
+	//q := bson.M{"_id": mongodb.StringTOBsonId("60a2995b8a2adb30a57172ec")}
+	query := sess.DB(config.Conf.DB.MongoP.Dbname).C(config.Conf.DB.MongoP.CombColl).Find(nil).Iter()
+	count := 0
+	for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
+		if count%2000 == 0 {
+			log.Info(fmt.Sprintf("current --- %d", count))
+		}
+
+		ch <- true
+		wg.Add(1)
+		go func(tmp map[string]interface{}) {
+			defer func() {
+				<-ch
+				wg.Done()
+			}()
+
+			if ids, ok := tmp["ids"].([]interface{}); ok {
+				//id := mongodb.BsonIdToSId(tmp["_id"])
+
+				for _, p := range ids {
+					p1 := p.(map[string]interface{})
+					info, _ := MgoPro.FindById(config.Conf.DB.MongoP.ProjectColl, util.ObjToString(p1["pid"]), nil)
+					if list, ok1 := (*info)["list"].([]interface{}); ok1 {
+						for _, l := range list {
+							l1 := l.(map[string]interface{})
+							m := make(map[string]interface{})
+							m["project_stage_code"] = tagFunc(l1)
+							m["proposed_id"] = mongodb.BsonIdToSId(tmp["_id"])
+							m["title"] = util.ObjToString(l1["title"])
+							if t := util.Int64All(l1["publishtime"]); t > 0 {
+								m["publishtime"] = util.FormatDateByInt64(&t, util.Date_Full_Layout)
+							}
+							m["infoid"] = util.ObjToString(l1["infoid"])
+							m["jybxhref"] = fmt.Sprintf(config.Conf.Serve.JyHref, util.CommonEncodeArticle("content", util.ObjToString(l1["infoid"])))
+							m["createtime"] = time.Now().Format(util.Date_Full_Layout)
+							MgoPro.Save("projectset_comb_temp1", m)
+							//MysqlTool.Insert("dwd_f_nzj_follw_record", m)
+						}
+					}
+					//if buyer := util.ObjToString((*info)["buyer"]); buyer != "" {
+					//	s := MysqlTool.Count("dwd_f_nzj_ent", bson.M{"proposed_id": id, "name": buyer})
+					//	if s <= 0 {
+					//		saveEnt := make(map[string]interface{})
+					//		saveEnt["proposed_id"] = id
+					//		saveEnt["createtime"] = time.Now().Format(util.Date_Full_Layout)
+					//		saveEnt["name"] = buyer
+					//		if eid := redis.GetStr("ent_id", buyer); eid != "" {
+					//			arr := strings.Split(eid, "_")
+					//			saveEnt["name_id"] = arr[0]
+					//			if len(arr) == 2 {
+					//				saveEnt["area_code"] = arr[1]
+					//			} else if len(arr) == 3 {
+					//				saveEnt["city_code"] = arr[2]
+					//			}
+					//			info := MysqlTool1.Find("dws_f_ent_baseinfo", bson.M{"name_id": arr[0]}, "address", "", -1, -1)
+					//			if info != nil && len(*info) > 0 {
+					//				saveEnt["address"] = (*info)[0]["address"]
+					//			}
+					//		}
+					//		saveEnt["identity_type"] = 2
+					//		saveEnt["area_code"] = AreaCode[util.ObjToString(tmp["area"])]
+					//		saveEntPool1 <- saveEnt
+					//	}
+					//}
+					//if winner := util.ObjToString((*info)["buyer"]); winner != "" {
+					//	for _, w := range strings.Split(winner, ",") {
+					//		s := MysqlTool.Count("dwd_f_nzj_ent", bson.M{"proposed_id": id, "name": w})
+					//		if s <= 0 {
+					//			saveEnt := make(map[string]interface{})
+					//			saveEnt["proposed_id"] = id
+					//			saveEnt["createtime"] = time.Now().Format(util.Date_Full_Layout)
+					//			saveEnt["name"] = w
+					//			if eid := redis.GetStr("ent_id", w); eid != "" {
+					//				arr := strings.Split(eid, "_")
+					//				saveEnt["name_id"] = arr[0]
+					//				if len(arr) == 2 {
+					//					saveEnt["area_code"] = arr[1]
+					//				} else if len(arr) == 3 {
+					//					saveEnt["city_code"] = arr[2]
+					//				}
+					//				info := MysqlTool1.Find("dws_f_ent_baseinfo", bson.M{"name_id": arr[0]}, "address", "", -1, -1)
+					//				if info != nil && len(*info) > 0 {
+					//					saveEnt["address"] = (*info)[0]["address"]
+					//				}
+					//			}
+					//			saveEnt["identity_type"] = 3
+					//			saveEnt["area_code"] = AreaCode[util.ObjToString(tmp["area"])]
+					//			saveEntPool1 <- saveEnt
+					//		}
+					//	}
+					//}
+				}
+
+				//size := MysqlTool.Count("dwd_f_nzj_follw_record", bson.M{"proposed_id": id})
+				//info := MysqlTool.FindOne("dwd_f_nzj_follw_record", bson.M{"proposed_id": id}, "project_stage_code", "publishtime desc")
+				//MysqlTool.Update("dwd_f_nzj_baseinfo", bson.M{"proposed_id": id}, bson.M{"follow_num": size, "project_stage_code": (*info)["project_stage_code"], "updatetime": time.Now().Format(util.Date_Full_Layout)})
+			}
+
+		}(tmp)
+		tmp = make(map[string]interface{})
+	}
+	wg.Wait()
+	log.Info(fmt.Sprintf("over --- %d", count))
+}
+
+var saveEntPool1 = make(chan map[string]interface{}, 5000)
+var saveEntSp1 = make(chan bool, 1)
+
+func SaveEntFunc1(table string, arr []string) {
+	arru := make([]map[string]interface{}, saveSize)
+	indexu := 0
+	for {
+		select {
+		case v := <-saveEntPool1:
+			arru[indexu] = v
+			indexu++
+			if indexu == saveSize {
+				saveEntSp1 <- true
+				go func(arru []map[string]interface{}) {
+					defer func() {
+						<-saveEntSp1
+					}()
+					MysqlTool.InsertBulk(table, arr, arru...)
+				}(arru)
+				arru = make([]map[string]interface{}, saveSize)
+				indexu = 0
+			}
+		case <-time.After(1000 * time.Millisecond):
+			if indexu > 0 {
+				saveEntSp1 <- true
+				go func(arru []map[string]interface{}) {
+					defer func() {
+						<-saveEntSp1
+					}()
+					MysqlTool.InsertBulk(table, arr, arru...)
 				}(arru[:indexu])
 				arru = make([]map[string]interface{}, saveSize)
 				indexu = 0

BIN
proposed_project/proposed_project


+ 95 - 3
proposed_project/tagTask.go

@@ -89,7 +89,7 @@ func taskRun() {
 	ch := make(chan bool, config.Conf.Serve.Thread)
 	wg := &sync.WaitGroup{}
 
-	query := sess.DB(config.Conf.DB.MongoP.Dbname).C(config.Conf.DB.MongoP.Coll).Find(nil).Select(SelectF).Iter()
+	query := sess.DB(config.Conf.DB.MongoP.Dbname).C(config.Conf.DB.MongoP.ProposedColl).Find(nil).Select(SelectF).Iter()
 	count := 0
 	for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
 		if count%20000 == 0 {
@@ -250,6 +250,98 @@ func taskFuc(tmp map[string]interface{}) map[string]string {
 	return tag
 }
 
+func taskFuc1(tmp map[string]interface{}) map[string]string {
+	tag := make(map[string]string) // 打上的标签
+
+	for _, v := range StageCode {
+		// 同个类型的标签如果存在,就不需要再打
+		if tag[v.tagName] != "" {
+			continue
+		}
+		// 排除词
+		if len(v.excludeField) > 0 && len(v.excludeKeyReg) > 0 {
+			for _, f := range v.excludeField {
+				if val := util.ObjToString(tmp[f]); val != "" {
+					for _, e1 := range v.excludeKeyReg {
+						flag := false
+						if e1.regs != nil && e1.regs.MatchString(val) {
+							flag = true
+						} else {
+							// && 特殊处理
+							if strings.Contains(e1.keyStr, "&&") {
+								for _, s := range strings.Split(e1.keyStr, "&&") {
+									if strings.Contains(val, s) {
+										flag = true
+										break
+									}
+								}
+							}
+						}
+						if flag {
+							goto L
+						}
+					}
+				}
+			}
+		}
+		// 清理词
+		if len(v.clearKey) > 0 && len(v.matchField) > 0 {
+			for _, s := range v.clearKey {
+				for _, f := range v.matchField {
+					if val := util.ObjToString(tmp[f]); val != "" {
+						tmp[f] = strings.ReplaceAll(val, s, "")
+					}
+				}
+			}
+		}
+		// 关键词
+		if len(v.matchField) > 0 && len(v.matchKeyReg) > 0 {
+			for _, f := range v.matchField {
+				if val := util.ObjToString(tmp[f]); val != "" {
+					for _, r1 := range v.matchKeyReg {
+						if r1.regs.MatchString(val) {
+							if len(v.addField) > 0 && len(v.addKeyReg) > 0 {
+								// 匹配附加词
+								isCt := false
+								for _, f1 := range v.addField {
+									if v1 := util.ObjToString(tmp[f1]); v1 != "" {
+										for _, r2 := range v.addKeyReg {
+											if r2.regs != nil && r2.regs.MatchString(v1) {
+												isCt = true
+											} else {
+												// && 特殊处理
+												if strings.Contains(r2.keyStr, "&&") {
+													flag := true
+													for _, s := range strings.Split(r2.keyStr, "&&") {
+														if !strings.Contains(v1, s) {
+															flag = false
+															break
+														}
+													}
+													if flag {
+														isCt = true
+													}
+												}
+											}
+										}
+									}
+								}
+								if isCt {
+									tag[v.tagName] = v.tagCode
+								}
+							} else {
+								tag[v.tagName] = v.tagCode
+							}
+						}
+					}
+				}
+			}
+		}
+	L:
+	}
+	return tag
+}
+
 func UpdateMethod() {
 	arru := make([][]map[string]interface{}, saveSize)
 	indexu := 0
@@ -264,7 +356,7 @@ func UpdateMethod() {
 					defer func() {
 						<-updateSp
 					}()
-					MgoPro.UpdateBulk(config.Conf.DB.MongoP.Coll, arru...)
+					MgoPro.UpdateBulk(config.Conf.DB.MongoP.ProposedColl, arru...)
 				}(arru)
 				arru = make([][]map[string]interface{}, saveSize)
 				indexu = 0
@@ -276,7 +368,7 @@ func UpdateMethod() {
 					defer func() {
 						<-updateSp
 					}()
-					MgoPro.UpdateBulk(config.Conf.DB.MongoP.Coll, arru...)
+					MgoPro.UpdateBulk(config.Conf.DB.MongoP.ProposedColl, arru...)
 				}(arru[:indexu])
 				arru = make([][]map[string]interface{}, saveSize)
 				indexu = 0

+ 1 - 0
proposed_project/tidbTask.go

@@ -79,6 +79,7 @@ func InitTagCode() {
 	}
 	TagCode["nature"].(map[string]interface{})["00"] = "其它"
 	TagCode["project_stage"].(map[string]interface{})["00"] = "其它"
+	TagCode["category"].(map[string]interface{})["04"] = "其它工程"
 	log.Info("InitTagCode", zap.Any("TagCode", TagCode))
 }
 

+ 17 - 0
proposed_project/util.go

@@ -353,3 +353,20 @@ func diffNatureDays(t1, t2 int64) int {
 
 	return diffDays
 }
+
+type FollwRecord struct {
+	Proposed_id        string
+	Infoid             string
+	Title              string
+	Project_stage_code string
+	Jybxhref           string
+	Project_scale      string
+	Publishtime        int64
+	Createtime         int64
+}
+
+type ByPtime []FollwRecord
+
+func (a ByPtime) Len() int           { return len(a) }
+func (a ByPtime) Less(i, j int) bool { return a[i].Publishtime > a[j].Publishtime }
+func (a ByPtime) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }