|
@@ -3,6 +3,9 @@ package ent_contact
|
|
|
import (
|
|
|
"context"
|
|
|
ul "data_ent_wuye/ent_util"
|
|
|
+ "database/sql"
|
|
|
+ "fmt"
|
|
|
+ "sync"
|
|
|
"time"
|
|
|
|
|
|
"log"
|
|
@@ -10,6 +13,90 @@ import (
|
|
|
qu "jygit.jydev.jianyu360.cn/data_processing/common_utils"
|
|
|
)
|
|
|
|
|
|
+func InjectContactTidbFullInfo() {
|
|
|
+ pool := make(chan bool, 5) //控制线程数
|
|
|
+ wg := &sync.WaitGroup{}
|
|
|
+ finalId := 0
|
|
|
+ lastInfo := ul.MysqlGlobalTool.SelectBySql(fmt.Sprintf("SELECT id FROM %s ORDER BY id DESC LIMIT 1", ul.G_Units_Contact))
|
|
|
+ if len(*lastInfo) > 0 {
|
|
|
+ finalId = qu.IntAll((*lastInfo)[0]["id"])
|
|
|
+ }
|
|
|
+ log.Println("last id", finalId)
|
|
|
+ lastid, count := 0, 0
|
|
|
+ for {
|
|
|
+ log.Println("重新查询,lastid---", lastid)
|
|
|
+ q := fmt.Sprintf("SELECT id,name_id,contact_name,contact_tel,contact_email,source_type FROM %s WHERE id > %d ORDER BY id ASC limit 10000", ul.G_Units_Contact, lastid)
|
|
|
+ var stmtOut *sql.Stmt
|
|
|
+ var tx *sql.Tx
|
|
|
+ var err error
|
|
|
+ if tx == nil {
|
|
|
+ stmtOut, err = ul.MysqlGlobalTool.DB.Prepare(q)
|
|
|
+ } else {
|
|
|
+ stmtOut, err = tx.Prepare(q)
|
|
|
+ }
|
|
|
+ rows, err := stmtOut.Query()
|
|
|
+ if err != nil {
|
|
|
+ log.Println("err : ", err)
|
|
|
+ }
|
|
|
+ columns, err := rows.Columns()
|
|
|
+ if finalId == lastid {
|
|
|
+ log.Println("is over ...", count)
|
|
|
+ break
|
|
|
+ }
|
|
|
+ for rows.Next() {
|
|
|
+ scanArgs := make([]interface{}, len(columns))
|
|
|
+ values := make([]interface{}, len(columns))
|
|
|
+ ret := make(map[string]interface{})
|
|
|
+ for k := range values {
|
|
|
+ scanArgs[k] = &values[k]
|
|
|
+ }
|
|
|
+ err = rows.Scan(scanArgs...)
|
|
|
+ if err != nil {
|
|
|
+ log.Println("---", err)
|
|
|
+ break
|
|
|
+ }
|
|
|
+ for i, col := range values {
|
|
|
+ if v, ok := col.([]uint8); ok {
|
|
|
+ ret[columns[i]] = string(v)
|
|
|
+ } else {
|
|
|
+ ret[columns[i]] = col
|
|
|
+ }
|
|
|
+ }
|
|
|
+ lastid = qu.IntAll(ret["id"])
|
|
|
+ count++
|
|
|
+ if count%5000 == 0 {
|
|
|
+ log.Println("cur index", count, lastid)
|
|
|
+ }
|
|
|
+ pool <- true
|
|
|
+ wg.Add(1)
|
|
|
+ go func(tmp map[string]interface{}) {
|
|
|
+ defer func() {
|
|
|
+ <-pool
|
|
|
+ wg.Done()
|
|
|
+ }()
|
|
|
+ name_id := qu.ObjToString(tmp["name_id"])
|
|
|
+ name := qu.ObjToString(tmp["contact_name"])
|
|
|
+ phone := qu.ObjToString(tmp["contact_tel"])
|
|
|
+ email := qu.ObjToString(tmp["contact_email"])
|
|
|
+ source_type := qu.IntAll(tmp["source_type"])
|
|
|
+ createtime := time.Now().Unix()
|
|
|
+ query := "INSERT INTO information.ent_contact(id,phone,name,email,source_type,create_time,update_time) VALUES(?,?,?,?,?,?,?)"
|
|
|
+ //插入数据
|
|
|
+ err := ul.ClickHouseConn.Exec(context.Background(), query, name_id, phone, name, email, source_type, createtime, createtime)
|
|
|
+ if err != nil {
|
|
|
+ log.Println(err)
|
|
|
+ }
|
|
|
+
|
|
|
+ }(ret)
|
|
|
+ ret = make(map[string]interface{})
|
|
|
+ }
|
|
|
+ _ = rows.Close()
|
|
|
+ stmtOut.Close()
|
|
|
+ wg.Wait()
|
|
|
+ }
|
|
|
+ log.Println("is over ...")
|
|
|
+}
|
|
|
+
|
|
|
// from tidb
|
|
|
func InjectContactTidbInfo() {
|
|
|
index, total := 0, 0
|
|
@@ -18,6 +105,7 @@ func InjectContactTidbInfo() {
|
|
|
"lt": "2024-04-19 00:00:00",
|
|
|
},
|
|
|
}
|
|
|
+ log.Println("tidb...query", q)
|
|
|
L:
|
|
|
for {
|
|
|
dataArr := ul.MysqlGlobalTool.Find(ul.G_Units_Contact, q, "name_id,contact_name,contact_tel,contact_email,source_type", "id", index*50000, 50000)
|
|
@@ -25,7 +113,6 @@ L:
|
|
|
if len(*dataArr) == 0 {
|
|
|
break
|
|
|
}
|
|
|
-
|
|
|
for _, v := range *dataArr {
|
|
|
name_id := qu.ObjToString(v["name_id"])
|
|
|
name := qu.ObjToString(v["contact_name"])
|
|
@@ -45,7 +132,7 @@ L:
|
|
|
}
|
|
|
index++
|
|
|
total += len(*dataArr)
|
|
|
- if index%5 == 0 {
|
|
|
+ if index%1 == 0 {
|
|
|
log.Println("cur idx ", total)
|
|
|
}
|
|
|
}
|