Ver Fonte

中标单位从tidb 获取数据

wcc há 1 ano atrás
pai
commit
11c529557d

+ 4 - 0
createEsIndex/buyertask.go

@@ -49,6 +49,7 @@ func buyerOnce() {
 		query := fmt.Sprintf(`
  SELECT
                b.name, 
+			   b.seo_id,
                t.id,
                t.name_id, 
 			   b.company_id,
@@ -141,6 +142,9 @@ func buyerOnce() {
 				data := make(map[string]interface{}, 0)
 				data["name"] = name
 				data["name_id"] = ret["name_id"]
+				if ret["seo_id"] != nil {
+					data["seo_id"] = ret["seo_id"]
+				}
 				data["id"] = ret["name_id"]
 				data["buyer_name"] = name
 				data["province"] = ret["area"]

+ 5 - 5
createEsIndex/common.toml

@@ -2,8 +2,8 @@
     locport = ":17834"
     jyaddr = "127.0.0.1"
     jyport = 11118
-    neaddr = "127.0.0.1" ## 转发下个 es地址,
-    neport = 1784
+#    neaddr = "127.0.0.1" ## 转发下个 es地址,
+#    neport = 1784
 
 [db]
 [db.mongoB] ## bidding标讯数据
@@ -57,8 +57,8 @@
     bucketname = "topjy"
     filesize = 500000  ## 单位字节,附件总字节长度限制;超过就不再读取
 [db.es]
-    addr = "http://127.0.0.1:19805"      ## 正常bidding 链接
-#    addr = "http://192.168.3.149:9201"      ## 正常bidding 链接
+#    addr = "http://127.0.0.1:19805"      ## 正常bidding 链接
+    addr = "http://192.168.3.149:9201"      ## 正常bidding 链接
 #    addrp = "http://192.168.3.149:9201"   ## 采集使用的单机版地址
 username = "es_all"
 password = "TopJkO2E_d1x"
@@ -66,7 +66,7 @@ password = "TopJkO2E_d1x"
     indexb = "bidding"
 #    indextmp = "bidding_temporary"       ## 临时索引,其他程序需要
     indexp = "projectset_v1"
-    indexwinner = "winner"
+    indexwinner = "winner_v1"
     indexbuyer = "buyer_v2"
 detailfilter = ["(招标网|千里马|采招网|招标采购导航网|招标与采购网|中国招投标网|中国采购与招标网|中国采购与招标|优质采)[\\w\\W]{0,15}[http|https|htpps]?[a-z0-9:\\/\\/.]{0,20}(qianlima|zhaobiao|okcis|zbytb|infobidding|bidcenter|youzhicai|chinabidding|Chinabidding|CHINABIDDING)[a-z0-9.\\/\\/]{0,40}",
     "招标网[\\w\\W]{0,15}[http|https|htpps]?[a-z0-9:\\/\\/.]{0,20}zhaobiao[a-z0-9.\\/\\/]{0,40}",

+ 22 - 13
createEsIndex/init.go

@@ -94,18 +94,7 @@ func InitMgo() {
 		log.Error("InitMgo", zap.String("MongoQ", "地址或者数据库为空"))
 	}
 	log.Info("InitMgo", zap.Any("MgoQ duration", time.Since(now).Seconds()))
-	//采购单位
-	Mysql = &mysqldb.Mysql{
-		Address:  config.Conf.DB.MysqlB.Addr,
-		DBName:   config.Conf.DB.MysqlB.Dbname,
-		UserName: config.Conf.DB.MysqlB.Username,
-		PassWord: config.Conf.DB.MysqlB.Password,
-	}
-	Mysql.Init()
-	if config.Conf.DB.MysqlB.Addr == "" || config.Conf.DB.MysqlB.Dbname == "" {
-		log.Error("InitMgo", zap.String("Mysql", "地址或者数据库为空"))
-	}
-	log.Info("InitMgo", zap.Any("MysqlB duration", time.Since(now).Seconds()))
+
 	//181 特殊企业,采购单位验证
 	MgoS = &mongodb.MongodbSim{
 		MongodbAddr: config.Conf.DB.MongoS.Addr,
@@ -121,6 +110,22 @@ func InitMgo() {
 	log.Info("InitMgo", zap.Any("MgoS duration", time.Since(now).Seconds()))
 }
 
+func InitMysql() {
+	//采购单位
+	now := time.Now()
+	Mysql = &mysqldb.Mysql{
+		Address:  config.Conf.DB.MysqlB.Addr,
+		DBName:   config.Conf.DB.MysqlB.Dbname,
+		UserName: config.Conf.DB.MysqlB.Username,
+		PassWord: config.Conf.DB.MysqlB.Password,
+	}
+	Mysql.Init()
+	if config.Conf.DB.MysqlB.Addr == "" || config.Conf.DB.MysqlB.Dbname == "" {
+		log.Error("InitMysql", zap.String("Mysql", "地址或者数据库为空"))
+	}
+	log.Info("InitMysql", zap.Any("MysqlB duration", time.Since(now).Seconds()))
+}
+
 func InitEs() {
 	now := time.Now()
 	Es = &elastic.Elastic{
@@ -150,9 +155,13 @@ func InitEs() {
 	}
 	if config.Conf.DB.Es.IndexWinner == "" {
 		log.Error("InitEs", zap.String("IndexWinner", "中标单位 索引为空,请检查"))
+	} else {
+		log.Debug("InitEs", zap.String("IndexWinner", config.Conf.DB.Es.IndexWinner))
 	}
 	if config.Conf.DB.Es.IndexBuyer == "" {
 		log.Error("InitEs", zap.String("IndexBuyer", "采购单位 索引为空,请检查"))
+	} else {
+		log.Debug("InitEs", zap.String("IndexBuyer", config.Conf.DB.Es.IndexBuyer))
 	}
 
 	Es1 = &elastic.Elastic{
@@ -171,7 +180,7 @@ func InitEs() {
 			Password: config.Conf.DB.Es.Password2,
 		}
 		Es2.InitElasticSize()
-		fmt.Println("es2", Es2)
+		log.Info("InitEs", zap.String("Addr2", config.Conf.DB.Es.Addr2))
 	}
 
 	log.Info("InitEs", zap.Any("duration", time.Since(now).Seconds()))

+ 17 - 0
createEsIndex/main.go

@@ -59,6 +59,7 @@ var (
 func init() {
 	config.Init("./common.toml")
 	InitLog()
+	InitMysql()
 	InitMgo()
 	InitEs()
 	InitField()
@@ -220,6 +221,22 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 					}()
 					buyerOnce()
 				}()
+			case "winner_once": // 中标单位昨天增量数据
+				pool <- true
+				go func() {
+					defer func() {
+						<-pool
+					}()
+					winnerEsTaskOnce()
+				}()
+			case "winner_all": // 中标单位存量数据
+				pool <- true
+				go func() {
+					defer func() {
+						<-pool
+					}()
+					winnerEsAll()
+				}()
 			default:
 				pool <- true
 				go func() {

+ 7 - 1
createEsIndex/projectall.toml

@@ -23,7 +23,13 @@ lteid = "62d9519d4d0d9b2bc2b402fa" ##
 [all.04]
 coll = "projectset_20230407"
 gtid = "62d9519d4d0d9b2bc2b402fa"
-lteid = "6476e4b7eb01e8efa62a676e" ## mongo表最新ID
+lteid = "6476e4b7eb01e8efa62a676e" ## mongo表最新ID   21722434
+
+[all.05]
+coll = "projectset_20230407"
+gtid = "6476e4b7eb01e8efa62a676e"
+lteid = "6526c6800000000000000000" ## 2023-10-12:00:00:00
+
 
 
 

+ 345 - 0
createEsIndex/winnertask.go

@@ -1,7 +1,9 @@
 package main
 
 import (
+	"context"
 	"esindex/config"
+	"fmt"
 	"go.mongodb.org/mongo-driver/bson/primitive"
 	"go.uber.org/zap"
 	util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
@@ -11,7 +13,350 @@ import (
 	"time"
 )
 
+//winnerEsTaskOnce 中标单位每天新增数据
 func winnerEsTaskOnce() {
+	defer util.Catch()
+	arrEs := []map[string]interface{}{}
+	winerEsLock := &sync.Mutex{}
+	defer util.Catch()
+
+	now := time.Now()
+	tarTime := time.Date(now.Year(), now.Month(), now.Day()-1, 00, 00, 00, 00, time.Local)
+	curTime := tarTime.Format("2006-01-02")
+
+	pool := make(chan bool, 15) //控制线程数
+	wg := &sync.WaitGroup{}
+	rowsPerPage := 10000
+
+	finalId := 0
+	lastSql := fmt.Sprintf(`
+	 SELECT
+             id
+
+         FROM
+            
+			dws_f_ent_baseinfo 
+
+           WHERE  ( createtime >= '%v' OR updatetime >= '%v') && (identity_type&(1<<1))>0  && company_id !='' 
+
+         ORDER BY id DESC  LIMIT 1
+`, curTime, curTime)
+
+	lastInfo := Mysql.SelectBySql(lastSql)
+	if len(*lastInfo) > 0 {
+		finalId = util.IntAll((*lastInfo)[0]["id"])
+	}
+
+	log.Info("winnerEsTaskOnce", zap.Int("finalId", finalId))
+	lastid, total := 0, 0
+
+	for {
+		query := fmt.Sprintf(`
+SELECT
+              b.name,
+			  b.name_id,
+			  b.id,
+              b.company_id,
+			  b.seo_id,
+              c.area,
+              c.city,
+              b.createtime,
+              b.updatetime
+
+
+          FROM
+
+          dws_f_ent_baseinfo AS b 
+              LEFT JOIN code_area AS c ON b.city_code = c.code
+
+         WHERE b.id > %d && (identity_type&(1<<1))>0  && b.company_id !='' &&  ( b.createtime >= '%v' OR b.updatetime >= '%v')
+
+         ORDER BY b.id ASC
+
+         LIMIT %d;
+     `, lastid, curTime, curTime, rowsPerPage)
+
+		ctx := context.Background()
+		rows, err := Mysql.DB.QueryContext(ctx, query)
+		if err != nil {
+			log.Info("winnerEsTaskOnce", zap.Any("QueryContext err", err))
+
+		}
+
+		if finalId == lastid {
+			log.Info("winnerEsTaskOnce over", zap.Any("total", total), zap.Any("lastid", lastid))
+			break
+		}
+
+		columns, err := rows.Columns()
+		if err != nil {
+			log.Info("winnerEsTaskOnce", zap.Any("rows.Columns", err))
+		}
+
+		for rows.Next() {
+			scanArgs := make([]interface{}, len(columns))
+			values := make([]interface{}, len(columns))
+			ret := make(map[string]interface{})
+
+			for k := range values {
+				scanArgs[k] = &values[k]
+			}
+
+			err = rows.Scan(scanArgs...)
+			if err != nil {
+				log.Info("winnerEsTaskOnce", zap.Any("rows.Scan", err))
+				break
+			}
+
+			for i, col := range values {
+				if v, ok := col.([]uint8); ok {
+					ret[columns[i]] = string(v)
+				} else {
+					ret[columns[i]] = col
+				}
+			}
+
+			total++
+			if total%1000 == 0 {
+				log.Info("winnerEsTaskOnce", zap.Int("current total", total), zap.Any("lastid", lastid))
+			}
+			lastid = util.IntAll(ret["id"])
+
+			pool <- true
+			wg.Add(1)
+
+			go func(tmp map[string]interface{}) {
+				defer func() {
+					<-pool
+					wg.Done()
+				}()
+				savetmp := map[string]interface{}{}
+				tmp_id := tmp["name_id"]
+				savetmp["_id"] = tmp_id
+				savetmp["id"] = tmp_id
+				savetmp["name"] = tmp["name"]
+				savetmp["winner_name"] = tmp["name"]
+				if tmp["seo_id"] != nil {
+					savetmp["seo_id"] = tmp["seo_id"]
+				}
+				createtime := tmp["createtime"].(time.Time)
+				savetmp["l_createtime"] = createtime.Unix()
+				savetmp["pici"] = createtime.Unix()
+
+				if tmp["updatetime"] != nil {
+					updatetime := tmp["updatetime"].(time.Time)
+					savetmp["pici"] = updatetime.Unix()
+				}
+
+				if province := util.ObjToString(tmp["area"]); province != "" {
+					savetmp["province"] = province
+				}
+				if city := util.ObjToString(tmp["city"]); city != "" {
+					savetmp["city"] = city
+				}
+
+				winerEsLock.Lock()
+				arrEs = append(arrEs, savetmp)
+				if len(arrEs) >= EsBulkSize {
+					tmps := arrEs
+					Es.BulkSave(config.Conf.DB.Es.IndexWinner, tmps)
+					arrEs = []map[string]interface{}{}
+				}
+				winerEsLock.Unlock()
+
+			}(ret)
+
+			ret = make(map[string]interface{})
+		}
+
+		rows.Close()
+		wg.Wait()
+
+		if err := rows.Err(); err != nil {
+			log.Info("winnerEsTaskOnce", zap.Any("err", err))
+		}
+
+	}
+
+	if len(arrEs) > 0 {
+		tmps := arrEs
+		Es.BulkSave(config.Conf.DB.Es.IndexWinner, tmps)
+		arrEs = []map[string]interface{}{}
+	}
+
+	log.Info("winnerEsTaskOnce", zap.Int("结束,total:", total))
+
+}
+
+//winnerEsAll 存量中标单位
+func winnerEsAll() {
+	arrEs := []map[string]interface{}{}
+	winerEsLock := &sync.Mutex{}
+	defer util.Catch()
+	pool := make(chan bool, 15) //控制线程数
+	wg := &sync.WaitGroup{}
+	rowsPerPage := 10000
+
+	finalId := 0
+	lastSql := fmt.Sprintf(`
+	 SELECT
+             id
+
+         FROM
+            
+			dws_f_ent_baseinfo 
+
+           WHERE (identity_type&(1<<1))>0  && company_id !=''
+
+         ORDER BY id DESC  LIMIT 1
+`)
+
+	lastInfo := Mysql.SelectBySql(lastSql)
+	if len(*lastInfo) > 0 {
+		finalId = util.IntAll((*lastInfo)[0]["id"])
+	}
+
+	log.Info("winnerEsAll", zap.Int("finalId", finalId))
+	lastid, total := 0, 0
+
+	for {
+		query := fmt.Sprintf(`
+SELECT
+              b.name,
+			  b.name_id,
+			  b.id,
+              b.company_id,
+			  b.seo_id,
+              c.area,
+              c.city,
+              b.createtime,
+              b.updatetime
+
+
+          FROM
+
+          dws_f_ent_baseinfo AS b 
+              LEFT JOIN code_area AS c ON b.city_code = c.code
+
+         WHERE b.id > %d && (identity_type&(1<<1))>0  && b.company_id !=''
+
+         ORDER BY b.id ASC
+
+         LIMIT %d;
+     `, lastid, rowsPerPage)
+
+		ctx := context.Background()
+		rows, err := Mysql.DB.QueryContext(ctx, query)
+		if err != nil {
+			log.Info("winnerEsAll", zap.Any("QueryContext err", err))
+
+		}
+
+		if finalId == lastid {
+			log.Info("winnerEsAll over", zap.Any("total", total), zap.Any("lastid", lastid))
+			break
+		}
+
+		columns, err := rows.Columns()
+		if err != nil {
+			log.Info("winnerEsAll", zap.Any("rows.Columns", err))
+		}
+
+		for rows.Next() {
+			scanArgs := make([]interface{}, len(columns))
+			values := make([]interface{}, len(columns))
+			ret := make(map[string]interface{})
+
+			for k := range values {
+				scanArgs[k] = &values[k]
+			}
+
+			err = rows.Scan(scanArgs...)
+			if err != nil {
+				log.Info("winnerEsAll", zap.Any("rows.Scan", err))
+				break
+			}
+
+			for i, col := range values {
+				if v, ok := col.([]uint8); ok {
+					ret[columns[i]] = string(v)
+				} else {
+					ret[columns[i]] = col
+				}
+			}
+
+			total++
+			if total%1000 == 0 {
+				log.Info("winnerEsAll", zap.Int("current total", total), zap.Any("lastid", lastid))
+			}
+			lastid = util.IntAll(ret["id"])
+
+			pool <- true
+			wg.Add(1)
+
+			go func(tmp map[string]interface{}) {
+				defer func() {
+					<-pool
+					wg.Done()
+				}()
+				savetmp := map[string]interface{}{}
+				tmp_id := tmp["name_id"]
+				savetmp["_id"] = tmp_id
+				savetmp["id"] = tmp_id
+				savetmp["name"] = tmp["name"]
+				savetmp["winner_name"] = tmp["name"]
+				if tmp["seo_id"] != nil {
+					savetmp["seo_id"] = tmp["seo_id"]
+				}
+				createtime := tmp["createtime"].(time.Time)
+				savetmp["l_createtime"] = createtime.Unix()
+				savetmp["pici"] = createtime.Unix()
+
+				if tmp["updatetime"] != nil {
+					updatetime := tmp["updatetime"].(time.Time)
+					savetmp["pici"] = updatetime.Unix()
+				}
+
+				if province := util.ObjToString(tmp["area"]); province != "" {
+					savetmp["province"] = province
+				}
+				if city := util.ObjToString(tmp["city"]); city != "" {
+					savetmp["city"] = city
+				}
+
+				winerEsLock.Lock()
+				arrEs = append(arrEs, savetmp)
+				if len(arrEs) >= EsBulkSize {
+					tmps := arrEs
+					Es.BulkSave(config.Conf.DB.Es.IndexWinner, tmps)
+					arrEs = []map[string]interface{}{}
+				}
+				winerEsLock.Unlock()
+
+			}(ret)
+
+			ret = make(map[string]interface{})
+		}
+
+		rows.Close()
+		wg.Wait()
+
+		if err := rows.Err(); err != nil {
+			log.Info("winnerEsAll", zap.Any("err", err))
+		}
+
+	}
+
+	if len(arrEs) > 0 {
+		tmps := arrEs
+		Es.BulkSave(config.Conf.DB.Es.IndexWinner, tmps)
+		arrEs = []map[string]interface{}{}
+	}
+
+	log.Info("winnerEsAll", zap.Int("结束,total:", total))
+}
+
+func winnerEsTaskOnceOld() {
 	defer util.Catch()
 	arrEs := []map[string]interface{}{}
 	winerEsLock := &sync.Mutex{}