Эх сурвалжийг харах

Merge branch 'master' of https://jygit.jydev.jianyu360.cn/data_processing/data_ent_wuye

xuzhiheng 1 жил өмнө
parent
commit
314f92c9d8

+ 89 - 2
ent_contact/contact_full.go

@@ -3,6 +3,9 @@ package ent_contact
 import (
 	"context"
 	ul "data_ent_wuye/ent_util"
+	"database/sql"
+	"fmt"
+	"sync"
 	"time"
 
 	"log"
@@ -10,6 +13,90 @@ import (
 	qu "jygit.jydev.jianyu360.cn/data_processing/common_utils"
 )
 
+func InjectContactTidbFullInfo() {
+	pool := make(chan bool, 5) //控制线程数
+	wg := &sync.WaitGroup{}
+	finalId := 0
+	lastInfo := ul.MysqlGlobalTool.SelectBySql(fmt.Sprintf("SELECT id FROM %s ORDER BY id DESC LIMIT 1", ul.G_Units_Contact))
+	if len(*lastInfo) > 0 {
+		finalId = qu.IntAll((*lastInfo)[0]["id"])
+	}
+	log.Println("last id", finalId)
+	lastid, count := 0, 0
+	for {
+		log.Println("重新查询,lastid---", lastid)
+		q := fmt.Sprintf("SELECT id,name_id,contact_name,contact_tel,contact_email,source_type FROM %s WHERE id > %d ORDER BY id ASC limit 10000", ul.G_Units_Contact, lastid)
+		var stmtOut *sql.Stmt
+		var tx *sql.Tx
+		var err error
+		if tx == nil {
+			stmtOut, err = ul.MysqlGlobalTool.DB.Prepare(q)
+		} else {
+			stmtOut, err = tx.Prepare(q)
+		}
+		rows, err := stmtOut.Query()
+		if err != nil {
+			log.Println("err : ", err)
+		}
+		columns, err := rows.Columns()
+		if finalId == lastid {
+			log.Println("is over ...", count)
+			break
+		}
+		for rows.Next() {
+			scanArgs := make([]interface{}, len(columns))
+			values := make([]interface{}, len(columns))
+			ret := make(map[string]interface{})
+			for k := range values {
+				scanArgs[k] = &values[k]
+			}
+			err = rows.Scan(scanArgs...)
+			if err != nil {
+				log.Println("---", err)
+				break
+			}
+			for i, col := range values {
+				if v, ok := col.([]uint8); ok {
+					ret[columns[i]] = string(v)
+				} else {
+					ret[columns[i]] = col
+				}
+			}
+			lastid = qu.IntAll(ret["id"])
+			count++
+			if count%5000 == 0 {
+				log.Println("cur index", count, lastid)
+			}
+			pool <- true
+			wg.Add(1)
+			go func(tmp map[string]interface{}) {
+				defer func() {
+					<-pool
+					wg.Done()
+				}()
+				name_id := qu.ObjToString(tmp["name_id"])
+				name := qu.ObjToString(tmp["contact_name"])
+				phone := qu.ObjToString(tmp["contact_tel"])
+				email := qu.ObjToString(tmp["contact_email"])
+				source_type := qu.IntAll(tmp["source_type"])
+				createtime := time.Now().Unix()
+				query := "INSERT INTO information.ent_contact(id,phone,name,email,source_type,create_time,update_time) VALUES(?,?,?,?,?,?,?)"
+				//插入数据
+				err := ul.ClickHouseConn.Exec(context.Background(), query, name_id, phone, name, email, source_type, createtime, createtime)
+				if err != nil {
+					log.Println(err)
+				}
+
+			}(ret)
+			ret = make(map[string]interface{})
+		}
+		_ = rows.Close()
+		stmtOut.Close()
+		wg.Wait()
+	}
+	log.Println("is over ...")
+}
+
 // from tidb
 func InjectContactTidbInfo() {
 	index, total := 0, 0
@@ -18,6 +105,7 @@ func InjectContactTidbInfo() {
 			"lt": "2024-04-19 00:00:00",
 		},
 	}
+	log.Println("tidb...query", q)
 L:
 	for {
 		dataArr := ul.MysqlGlobalTool.Find(ul.G_Units_Contact, q, "name_id,contact_name,contact_tel,contact_email,source_type", "id", index*50000, 50000)
@@ -25,7 +113,6 @@ L:
 			if len(*dataArr) == 0 {
 				break
 			}
-
 			for _, v := range *dataArr {
 				name_id := qu.ObjToString(v["name_id"])
 				name := qu.ObjToString(v["contact_name"])
@@ -45,7 +132,7 @@ L:
 		}
 		index++
 		total += len(*dataArr)
-		if index%5 == 0 {
+		if index%1 == 0 {
 			log.Println("cur idx ", total)
 		}
 	}

+ 3 - 3
ent_util/init.go

@@ -39,11 +39,11 @@ const (
 func InitGlobalVar() {
 	IsLocal = false
 	qu.ReadConfig(&SysConfig) //加载配置文件
-	initMgo()
+	//initMgo()
 	initMysql()
-	initVCode()
 	initClickHouse()
-	initEs()
+	//initVCode()
+	//initEs()
 }
 
 // 初始化mgo