Jianghan 2 éve
szülő
commit
67ea62fbdb

+ 4 - 4
proposed_project/common.toml

@@ -1,7 +1,7 @@
 
 [serve]
 pici = 1669942800
-thread = 10
+thread = 1
 tagrule = "nijian_rule"
 procoll = "projectset_proposed"
 jyhref = "https://www.jianyu360.cn/article/content/%s.html"
@@ -23,7 +23,7 @@ maxconn = 20
 maxquerytime = "10s"
 
 [db.mongoB]
-addr = "127.0.0.1:27092"
+addr = "192.168.3.207:27092"
 dbname = "wjh"
 coll = "bidding"
 size = 15
@@ -31,9 +31,9 @@ user = ""
 password = ""
 
 [db.mongoP]
-addr = "127.0.0.1:27092"
+addr = "192.168.3.207:27092"
 dbname = "wjh"
-coll = "bidding"
+coll = "projectset_proposed"
 size = 15
 user = ""
 password = ""

+ 11 - 3
proposed_project/init.go

@@ -11,9 +11,9 @@ import (
 )
 
 var (
-	MgoBid, MgoPro *mongodb.MongodbSim //mongodb连接
-	MysqlTool      *mysqldb.Mysql
-	saveSize       int
+	MgoBid, MgoPro        *mongodb.MongodbSim //mongodb连接
+	MysqlTool, MysqlTool1 *mysqldb.Mysql
+	saveSize              int
 
 	LastId     string //增量拟建数据最后id
 	TaskSingle bool
@@ -98,6 +98,14 @@ func InitMysql() {
 		PassWord: dbcfg.Password,
 	}
 	MysqlTool.Init()
+
+	MysqlTool1 = &mysqldb.Mysql{
+		Address:  dbcfg.Addr,
+		DBName:   dbcfg.DbnameBasic,
+		UserName: dbcfg.User,
+		PassWord: dbcfg.Password,
+	}
+	MysqlTool1.Init()
 }
 
 type Pname struct {

+ 85 - 2
proposed_project/main.go

@@ -15,6 +15,7 @@ import (
 	"net"
 	"os"
 	"proposed_project/config"
+	"sync"
 	"time"
 )
 
@@ -31,6 +32,7 @@ func main() {
 	rootCmd.AddCommand(projectAdd())
 	rootCmd.AddCommand(tidbSave())
 	rootCmd.AddCommand(tidbAddSave())
+	rootCmd.AddCommand(redisSave())
 
 	if err := rootCmd.Execute(); err != nil {
 		fmt.Println("rootCmd.Execute failed", err.Error())
@@ -166,11 +168,13 @@ func tidbSave() *cobra.Command {
 			InitMysql()
 			InitArea()
 			//go SaveFunc("dwd_f_nzj_baseinfo_new", BaseField)
-			go SaveRFunc("dwd_f_nzj_follw_record_new", RecordField)
+			//go SaveRFunc("dwd_f_nzj_follw_record_new", RecordField)
 			//go SaveCFunc("dwd_f_nzj_contact_new", ContactField)
-			go SaveCyFunc("dwd_f_nzj_category_tags_new", CategoryField)
+			//go SaveCyFunc("dwd_f_nzj_category_tags_new", CategoryField)
+			go SaveEntFunc("dwd_f_nzj_ent", EntField)
 
 			redis.InitRedis1("ent_id=172.17.4.189:8379", 6) // name_id
+
 			taskTidb(nil)
 			c := make(chan bool, 1)
 			<-c
@@ -216,6 +220,20 @@ func tidbAddSave() *cobra.Command {
 	return cmdClient
 }
 
+func redisSave() *cobra.Command {
+	cmdClient := &cobra.Command{
+		Use:   "redis",
+		Short: "Start processing project save to tidb",
+		Run: func(cmd *cobra.Command, args []string) {
+			InitMysql()
+			redis.InitRedis1("ent_id=172.17.4.189:8379", 6) // name_id
+
+			redisDisp()
+		},
+	}
+	return cmdClient
+}
+
 func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 	switch act {
 	case udp.OP_TYPE_DATA:
@@ -255,6 +273,71 @@ func taskQ() (string, string) {
 	}
 }
 
+func redisDisp() {
+	pool := make(chan bool, 5) //控制线程数
+	wg := &sync.WaitGroup{}
+
+	finalId := 0
+	lastInfo := MysqlTool1.SelectBySql(fmt.Sprintf("SELECT id FROM %s ORDER BY id DESC LIMIT 1", "dws_f_ent_baseinfo"))
+	if len(*lastInfo) > 0 {
+		finalId = util.IntAll((*lastInfo)[0]["id"])
+	}
+	util.Debug("taskIterateSql---", "finally id", finalId)
+	lastid, count := 0, 0
+	for {
+		util.Debug("重新查询,lastid---", lastid)
+		q := fmt.Sprintf("SELECT id, name, name_id, area_code, city_code FROM %s WHERE id > %d ORDER BY id ASC limit 100000", "dws_f_ent_baseinfo", lastid)
+		rows, err := MysqlTool1.DB.Query(q)
+		if err != nil {
+			util.Debug("taskIterateSql---", err)
+		}
+		columns, err := rows.Columns()
+		if finalId == lastid {
+			util.Debug("----finish----------", count)
+			break
+		}
+		for rows.Next() {
+			scanArgs := make([]interface{}, len(columns))
+			values := make([]interface{}, len(columns))
+			ret := make(map[string]interface{})
+			for k := range values {
+				scanArgs[k] = &values[k]
+			}
+			err = rows.Scan(scanArgs...)
+			if err != nil {
+				util.Debug("taskIterateSql---", err)
+				break
+			}
+			for i, col := range values {
+				if v, ok := col.([]uint8); ok {
+					ret[columns[i]] = string(v)
+				} else {
+					ret[columns[i]] = col
+				}
+			}
+			lastid = util.IntAll(ret["id"])
+			count++
+			if count%20000 == 0 {
+				util.Debug("current-------", count, lastid)
+			}
+			pool <- true
+			wg.Add(1)
+			go func(tmp map[string]interface{}) {
+				defer func() {
+					<-pool
+					wg.Done()
+				}()
+				redis.PutCKV("ent_id", util.ObjToString(tmp["name"]),
+					fmt.Sprintf("%s_%s_%s", util.ObjToString(tmp["name_id"]), util.ObjToString(tmp["area_code"]), util.ObjToString(tmp["city_code"])))
+
+			}(ret)
+			ret = make(map[string]interface{})
+		}
+		_ = rows.Close()
+		wg.Wait()
+	}
+}
+
 func saveMethod() {
 	arru := make([]map[string]interface{}, saveSize)
 	indexu := 0

+ 8 - 4
proposed_project/merge.go

@@ -10,6 +10,7 @@ import (
 	"reflect"
 	"sort"
 	"strconv"
+	"sync"
 	"time"
 	"unicode/utf8"
 )
@@ -42,14 +43,18 @@ func startProjectMerge(tmp map[string]interface{}, info *Info) {
 		AllPidMap[info.Id] = &ID{P: pinfo}
 		AllPidMapLock.Unlock()
 		AllPnMapLock.Lock()
-		res := AllPnMap[info.ProjectName]
+		res := AllPnMap[info.Site]
 		if res != nil {
-			res.Id[info.Id] = info.ProjectName
+			res.Lock.Lock()
+			res.Id[info.ProjectName] = info.Id
+			res.Lock.Unlock()
 		} else {
 			res = &Pname{
-				Id: map[string]string{info.Id: info.ProjectName},
+				Id:   map[string]string{info.ProjectName: info.Id},
+				Lock: sync.Mutex{},
 			}
 		}
+		AllPnMap[info.Site] = res
 		AllPnMapLock.Unlock()
 	}
 }
@@ -75,7 +80,6 @@ func getCompareIds(pname, site string) (IDArr []*ID) {
 			IDArr = append(IDArr, ID)
 		}
 	}
-
 	return IDArr
 
 }

+ 1 - 1
proposed_project/proTask.go

@@ -66,7 +66,7 @@ func doTask(gtid, lteid string) {
 		"kvtext":       0,
 	}
 	log.Info("doTask", zap.Any("q", q))
-	query := sess.DB(config.Conf.DB.MongoB.Dbname).C("nzj_bidding").Find(q).Select(f).Sort("publishtime").Iter()
+	query := sess.DB(config.Conf.DB.MongoB.Dbname).C(config.Conf.DB.MongoB.Coll).Find(q).Select(f).Sort("publishtime").Iter()
 	count := 0
 	for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
 		if count%2000 == 0 {

+ 125 - 46
proposed_project/tidbTask.go

@@ -25,6 +25,8 @@ var (
 	saveCtSp     = make(chan bool, 1)
 	saveCyPool   = make(chan map[string]interface{}, 5000)
 	saveCySp     = make(chan bool, 1)
+	saveEntPool  = make(chan map[string]interface{}, 5000)
+	saveEntSp    = make(chan bool, 1)
 
 	BaseField = []string{"lasttime", "firsttime", "proposed_number", "proposed_id", "follow_num", "title", "projectname", "approvecode",
 		"approvenumber", "project_stage_code", "total_investment", "funds", "owner", "name_id", "ownerclass_code", "projecttype_code", "projectaddr",
@@ -33,6 +35,7 @@ var (
 	RecordField   = []string{"proposed_id", "infoid", "follow_num", "project_stage_code", "title", "project_scale", "publishtime", "jybxhref", "createtime"}
 	ContactField  = []string{"proposed_id", "infoid", "follow_num", "name_id", "name", "contact_name", "contact_tel", "contact_addr", "createtime"}
 	CategoryField = []string{"proposed_id", "labelcode", "labelvalues", "labelweight", "createtime"}
+	EntField      = []string{"proposed_id", "name_id", "name", "area_code", "city_code", "createtime"}
 	AreaCode      = make(map[string]string, 5000)
 )
 
@@ -153,54 +156,72 @@ func taskTidb(q map[string]interface{}) {
 			//	}
 			//}
 			//saveBasePool <- saveM
-			saveCy := make(map[string]interface{})
-			saveCy["proposed_id"] = mongodb.BsonIdToSId(tmp["_id"])
-			saveCy["labelcode"] = "category_code"
-			saveCy["labelvalues"] = util.ObjToString(tmp["category_code"])
-			saveCy["createtime"] = time.Now().Format(util.Date_Full_Layout)
-			saveCyPool <- saveCy
-
-			for _, v := range tmp["list"].([]interface{}) {
-				saveRc := make(map[string]interface{})
-				v1 := v.(map[string]interface{})
-				saveRc["proposed_id"] = mongodb.BsonIdToSId(tmp["_id"])
-				infoid := util.ObjToString(v1["infoid"])
-				saveRc["infoid"] = infoid
-				saveRc["jybxhref"] = fmt.Sprintf(config.Conf.Serve.JyHref, util.CommonEncodeArticle("content", infoid))
-				saveRc["follow_num"] = v1["follow_num"]
-				saveRc["project_scale"] = util.ObjToString(v1["project_scale"])
-				saveRc["project_stage_code"] = util.ObjToString(v1["project_stage_code"])
-				saveRc["title"] = util.ObjToString(v1["title"])
-				if t := util.Int64All(v1["publishtime"]); t > 0 {
-					saveRc["publishtime"] = util.FormatDateByInt64(&t, util.Date_Full_Layout)
+			//saveCy := make(map[string]interface{})
+			//saveCy["proposed_id"] = mongodb.BsonIdToSId(tmp["_id"])
+			//saveCy["labelcode"] = "category_code"
+			//saveCy["labelvalues"] = util.ObjToString(tmp["category_code"])
+			//saveCy["createtime"] = time.Now().Format(util.Date_Full_Layout)
+			//saveCyPool <- saveCy
+			if ow := util.ObjToString(tmp["owner"]); ow != "" {
+				saveEnt := make(map[string]interface{})
+				saveEnt["proposed_id"] = mongodb.BsonIdToSId(tmp["_id"])
+				saveEnt["createtime"] = time.Now().Format(util.Date_Full_Layout)
+				saveEnt["name"] = ow
+				if eid := redis.GetStr("ent_id", ow); eid != "" {
+					arr := strings.Split(eid, "_")
+					saveEnt["name_id"] = arr[0]
+					if len(arr) == 2 {
+						saveEnt["area_code"] = arr[1]
+					} else if len(arr) == 3 {
+						saveEnt["city_code"] = arr[2]
+					}
 				}
-				saveRc["createtime"] = time.Now().Format(util.Date_Full_Layout)
-				saveRcPool <- saveRc
-
-				//if util.ObjToString(v1["project_person"]) != "" || util.ObjToString(v1["project_phone"]) != "" {
-				//	saveCt := make(map[string]interface{})
-				//	saveCt["proposed_id"] = mongodb.BsonIdToSId(tmp["_id"])
-				//	saveCt["infoid"] = infoid
-				//	saveCt["follow_num"] = tmp["follow_num"]
-				//	if b := util.ObjToString(tmp["owner"]); b != "" {
-				//		saveCt["name"] = util.ObjToString(tmp["owner"])
-				//		if eid := redis.GetStr("ent_id", b); eid != "" {
-				//			saveCt["name_id"] = strings.Split(eid, "_")[0]
-				//		}
-				//	}
-				//	if p := util.ObjToString(v1["project_person"]); p != "" {
-				//		saveCt["contact_name"] = p
-				//	}
-				//	if p := util.ObjToString(v1["project_phone"]); p != "" {
-				//		saveCt["contact_tel"] = p
-				//	}
-				//	if p := util.ObjToString(v1["projectaddr"]); p != "" {
-				//		saveCt["contact_addr"] = p
-				//	}
-				//	saveCt["createtime"] = time.Now().Format(util.Date_Full_Layout)
-				//	saveCtPool <- saveCt
-				//}
+				saveEnt["identify_type"] = 1
+				saveEnt["area_code"] = AreaCode[util.ObjToString(tmp["area"])]
+				saveEntPool <- saveEnt
 			}
+
+			//for _, v := range tmp["list"].([]interface{}) {
+			//	saveRc := make(map[string]interface{})
+			//	v1 := v.(map[string]interface{})
+			//	saveRc["proposed_id"] = mongodb.BsonIdToSId(tmp["_id"])
+			//	infoid := util.ObjToString(v1["infoid"])
+			//	saveRc["infoid"] = infoid
+			//	saveRc["jybxhref"] = fmt.Sprintf(config.Conf.Serve.JyHref, util.CommonEncodeArticle("content", infoid))
+			//	saveRc["follow_num"] = v1["follow_num"]
+			//	saveRc["project_scale"] = util.ObjToString(v1["project_scale"])
+			//	saveRc["project_stage_code"] = util.ObjToString(v1["project_stage_code"])
+			//	saveRc["title"] = util.ObjToString(v1["title"])
+			//	if t := util.Int64All(v1["publishtime"]); t > 0 {
+			//		saveRc["publishtime"] = util.FormatDateByInt64(&t, util.Date_Full_Layout)
+			//	}
+			//	saveRc["createtime"] = time.Now().Format(util.Date_Full_Layout)
+			//	saveRcPool <- saveRc
+			//
+			//	//if util.ObjToString(v1["project_person"]) != "" || util.ObjToString(v1["project_phone"]) != "" {
+			//	//	saveCt := make(map[string]interface{})
+			//	//	saveCt["proposed_id"] = mongodb.BsonIdToSId(tmp["_id"])
+			//	//	saveCt["infoid"] = infoid
+			//	//	saveCt["follow_num"] = tmp["follow_num"]
+			//	//	if b := util.ObjToString(tmp["owner"]); b != "" {
+			//	//		saveCt["name"] = util.ObjToString(tmp["owner"])
+			//	//		if eid := redis.GetStr("ent_id", b); eid != "" {
+			//	//			saveCt["name_id"] = strings.Split(eid, "_")[0]
+			//	//		}
+			//	//	}
+			//	//	if p := util.ObjToString(v1["project_person"]); p != "" {
+			//	//		saveCt["contact_name"] = p
+			//	//	}
+			//	//	if p := util.ObjToString(v1["project_phone"]); p != "" {
+			//	//		saveCt["contact_tel"] = p
+			//	//	}
+			//	//	if p := util.ObjToString(v1["projectaddr"]); p != "" {
+			//	//		saveCt["contact_addr"] = p
+			//	//	}
+			//	//	saveCt["createtime"] = time.Now().Format(util.Date_Full_Layout)
+			//	//	saveCtPool <- saveCt
+			//	//}
+			//}
 		}(tmp)
 		tmp = make(map[string]interface{})
 	}
@@ -333,6 +354,29 @@ func taskB(tmp map[string]interface{}) {
 		saveCy["createtime"] = time.Now().Format(util.Date_Full_Layout)
 		MysqlTool.Insert("dwd_f_nzj_category_tags", saveCy)
 	}
+	info2 := MysqlTool.FindOne("dwd_f_nzj_ent", map[string]interface{}{"proposed_id": mongodb.BsonIdToSId(tmp["_id"])}, "", "")
+	if info2 != nil && len(*info2) > 0 {
+
+	} else {
+		if ow := util.ObjToString(tmp["owner"]); ow != "" {
+			saveEnt := make(map[string]interface{})
+			saveEnt["proposed_id"] = mongodb.BsonIdToSId(tmp["_id"])
+			saveEnt["createtime"] = time.Now().Format(util.Date_Full_Layout)
+			saveEnt["name"] = ow
+			if eid := redis.GetStr("ent_id", ow); eid != "" {
+				arr := strings.Split(eid, "_")
+				saveEnt["name_id"] = arr[0]
+				if len(arr) == 2 {
+					saveEnt["area_code"] = arr[1]
+				} else if len(arr) == 3 {
+					saveEnt["city_code"] = arr[2]
+				}
+			}
+			saveEnt["identify_type"] = 1
+			saveEnt["area_code"] = AreaCode[util.ObjToString(tmp["area"])]
+			MysqlTool.Insert("dwd_f_nzj_ent", saveEnt)
+		}
+	}
 }
 
 func taskE(tmp map[string]interface{}) {
@@ -525,3 +569,38 @@ func SaveCyFunc(table string, arr []string) {
 		}
 	}
 }
+
+func SaveEntFunc(table string, arr []string) {
+	arru := make([]map[string]interface{}, saveSize)
+	indexu := 0
+	for {
+		select {
+		case v := <-saveEntPool:
+			arru[indexu] = v
+			indexu++
+			if indexu == saveSize {
+				saveEntSp <- true
+				go func(arru []map[string]interface{}) {
+					defer func() {
+						<-saveEntSp
+					}()
+					MysqlTool.InsertBulk(table, arr, arru...)
+				}(arru)
+				arru = make([]map[string]interface{}, saveSize)
+				indexu = 0
+			}
+		case <-time.After(1000 * time.Millisecond):
+			if indexu > 0 {
+				saveEntSp <- true
+				go func(arru []map[string]interface{}) {
+					defer func() {
+						<-saveEntSp
+					}()
+					MysqlTool.InsertBulk(table, arr, arru...)
+				}(arru[:indexu])
+				arru = make([]map[string]interface{}, saveSize)
+				indexu = 0
+			}
+		}
+	}
+}