Browse Source

物业项目交易信息

mxs 1 year ago
parent
commit
3738edddbe

+ 2 - 0
data_project_wy/README.md

@@ -0,0 +1,2 @@
+一、基于项目数据(project),生成物业项目信息
+二、基于增量招标信息(bidding),生成物业项目信息

+ 45 - 0
data_project_wy/clickhouse.go

@@ -0,0 +1,45 @@
+package main
+
+import (
+	"context"
+	"fmt"
+	"github.com/ClickHouse/clickhouse-go/v2"
+	"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
+	"time"
+)
+
+// 创建clickhouse连接
+func InitClickHouse(addr []string, size int, database, username, password string) (driver.Conn, error) {
+	var (
+		ctx       = context.Background()
+		conn, err = clickhouse.Open(&clickhouse.Options{
+			//Addr: []string{"cc-2ze9tv451wov14w9e.clickhouse.ads.aliyuncs.com:9000"}, //内网
+			//Addr: []string{"cc-2ze9tv451wov14w9e.public.clickhouse.ads.aliyuncs.com:9000"}, //外网
+			Addr:         addr,
+			DialTimeout:  10 * time.Second,
+			MaxIdleConns: 3,
+			MaxOpenConns: size,
+			Auth: clickhouse.Auth{
+				//Database: "information",
+				//Username: "biservice",
+				//Password: "Bi_top95215#",
+				Database: database,
+				Username: username,
+				Password: password,
+			},
+			Debugf: func(format string, v ...interface{}) {
+				fmt.Printf(format, v)
+			},
+		})
+	)
+	if err != nil {
+		return nil, err
+	}
+	if err := conn.Ping(ctx); err != nil {
+		if exception, ok := err.(*clickhouse.Exception); ok {
+			fmt.Printf("Exception [%d] %s \n%s\n", exception.Code, exception.Message, exception.StackTrace)
+		}
+		return nil, err
+	}
+	return conn, nil
+}

+ 26 - 0
data_project_wy/config.json

@@ -0,0 +1,26 @@
+{
+  "mgob": {
+    "addr": "192.168.3.166:27082",
+    "dbname": "qfw",
+    "size": 5,
+    "username": "",
+    "password": ""
+  },
+  "mgopro": {
+    "addr": "192.168.3.166:27082",
+    "dbname": "qfw",
+    "size": 5,
+    "username": "",
+    "password": ""
+  },
+  "clickhouse": {
+    "addr": ["192.168.3.207:19000"],
+    "database": "information",
+    "size": 20,
+    "username": "jytop",
+    "password": "pwdTopJy123"
+  },
+  "bidstarttime": 1713196800,
+  "prostarttime": 1713196800,
+  "startcron": "0 0 9 ? * *"
+}

+ 218 - 0
data_project_wy/history.go

@@ -0,0 +1,218 @@
+package main
+
+import (
+	"fmt"
+	"github.com/gogf/gf/v2/util/gconv"
+	"jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
+	"sync"
+)
+
+// HisTransactionDataFromBid 历史bidding(指定截止comeintime)
+func HisTransactionDataFromBid() {
+	sess := MgoB.GetMgoConn()
+	defer MgoB.DestoryMongoConn(sess)
+	ch := make(chan bool, 10)
+	wg := &sync.WaitGroup{}
+	lock := &sync.Mutex{}
+	query := map[string]interface{}{
+		"toptype": "采购意向",
+	}
+	fields := map[string]interface{}{
+		"projectname":   1,
+		"budget":        1,
+		"bidamount":     1,
+		"buyer":         1,
+		"s_winner":      1,
+		"agency":        1,
+		"property_form": 1,
+		"multipackage":  1,
+		"area":          1,
+		"city":          1,
+		"district":      1,
+		//
+		"publishtime":           1,
+		"comeintime":            1,
+		"extracttype":           1,
+		"tag_subinformation":    1,
+		"tag_subinformation_ai": 1,
+		"tag_topinformation":    1,
+		"tag_topinformation_ai": 1,
+	}
+	arr := []map[string]interface{}{}
+	it := sess.DB(MgoB.DbName).C("bidding").Find(&query).Select(&fields).Iter()
+	n := 0
+	for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
+		ch <- true
+		wg.Add(1)
+		go func(tmp map[string]interface{}) {
+			defer func() {
+				<-ch
+				wg.Done()
+			}()
+			if gconv.Int64(tmp["comeintime"]) >= 1713196800 { //截止时间1713196800
+				return
+			}
+			if gconv.Int(tmp["extracttype"]) == -1 { //重复数据过滤
+				return
+			}
+			if tmp["tag_topinformation"] == nil && tmp["tag_topinformation_ai"] == nil { //无效数据过滤
+				return
+			}
+			result := DealTransactionForBid(tmp)
+			lock.Lock()
+			if len(result) > 0 {
+				arr = append(arr, result)
+			}
+			if len(arr) > 50 {
+				MgoPro.SaveBulk("projectset_wy", arr...)
+				arr = []map[string]interface{}{}
+			}
+			lock.Unlock()
+		}(tmp)
+		if n%10000 == 0 {
+			fmt.Println("current:", n)
+		}
+		tmp = map[string]interface{}{}
+	}
+	wg.Wait()
+	if len(arr) > 0 {
+		MgoPro.SaveBulk("projectset_wy", arr...)
+		arr = []map[string]interface{}{}
+	}
+	fmt.Println("结束")
+}
+
+// HisTransactionDataFromProject 历史project(指定截止pici:1713196800)
+func HisTransactionDataFromProject() {
+	sess := MgoPro.GetMgoConn()
+	defer MgoPro.DestoryMongoConn(sess)
+	ch := make(chan bool, 20)
+	wg := &sync.WaitGroup{}
+	lock := &sync.Mutex{}
+	query := map[string]interface{}{
+		"pici": map[string]interface{}{
+			"$lt": 1713196800,
+			//"$gt": 1711900800,
+		},
+	}
+	fields := map[string]interface{}{
+		"projectname":   1,
+		"budget":        1,
+		"bidamount":     1,
+		"buyer":         1,
+		"s_winner":      1,
+		"agency":        1,
+		"property_form": 1,
+		"multipackage":  1,
+		"area":          1,
+		"city":          1,
+		"district":      1,
+		"zbtime":        1,
+		"jgtime":        1,
+		"bidstatus":     1,
+		//
+		"firsttime":             1,
+		"ids":                   1,
+		"pici":                  1,
+		"sourceinfoid":          1,
+		"tag_subinformation":    1,
+		"tag_subinformation_ai": 1,
+		"tag_topinformation":    1,
+		"tag_topinformation_ai": 1,
+	}
+	arr := []map[string]interface{}{}
+	it := sess.DB(MgoPro.DbName).C("projectset_20230904").Find(&query).Select(&fields).Iter()
+	n := 0
+	for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
+		ch <- true
+		wg.Add(1)
+		go func(tmp map[string]interface{}) {
+			defer func() {
+				<-ch
+				wg.Done()
+			}()
+			if tmp["tag_topinformation"] == nil && tmp["tag_topinformation_ai"] == nil { //无效数据过滤
+				return
+			}
+			result := DealTransactionForPro(tmp)
+			lock.Lock()
+			if len(result) > 0 {
+				arr = append(arr, result)
+			}
+			if len(arr) > 50 {
+				MgoPro.SaveBulk("projectset_wy_back", arr...)
+				arr = []map[string]interface{}{}
+			}
+			lock.Unlock()
+		}(tmp)
+		if n%10000 == 0 {
+			fmt.Println("current:", n)
+		}
+		tmp = map[string]interface{}{}
+	}
+	wg.Wait()
+	if len(arr) > 0 {
+		MgoPro.SaveBulk("projectset_wy_back", arr...)
+		arr = []map[string]interface{}{}
+	}
+	fmt.Println("结束")
+}
+
+// HisTransactionDataAddInformation 补充字段信息
+func HisTransactionDataAddInformation() {
+	sess := MgoPro.GetMgoConn()
+	defer MgoPro.DestoryMongoConn(sess)
+	ch := make(chan bool, 20)
+	wg := &sync.WaitGroup{}
+	lock := &sync.Mutex{}
+	query := map[string]interface{}{}
+	it := sess.DB(MgoPro.DbName).C("projectset_wy_back").Find(&query).Iter()
+	n := 0
+	arr := [][]map[string]interface{}{}
+	for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
+		ch <- true
+		wg.Add(1)
+		go func(tmp map[string]interface{}) {
+			defer func() {
+				<-ch
+				wg.Done()
+			}()
+			id := mongodb.BsonIdToSId(tmp["_id"])
+			update := []map[string]interface{}{
+				{"_id": tmp["_id"]},
+			}
+			set := map[string]interface{}{}
+			//法人信息
+			buyer_id, agency_id, winner_ids := FindEntInfoData(id, gconv.String(tmp["buyer"]), gconv.String(tmp["agency"]), gconv.Strings(tmp["winner"]))
+			set["buyer_id"] = buyer_id
+			set["agency_id"] = agency_id
+			set["winner_ids"] = winner_ids
+			//项目信息补充业态
+			if from := gconv.String(tmp["from"]); from == "project" {
+				project_id := gconv.String(tmp["project_id"])
+				pro, _ := MgoPro.FindById("projectset_20230904", project_id, map[string]interface{}{"property_form": 1})
+				if len(*pro) > 0 && (*pro)["property_form"] != nil {
+					set["property_form"] = (*pro)["property_form"]
+				}
+			}
+			update = append(update, map[string]interface{}{"$set": set})
+			lock.Lock()
+			arr = append(arr, update)
+			if len(arr) > 100 {
+				MgoPro.UpdateBulk("projectset_wy_back", arr...)
+				arr = [][]map[string]interface{}{}
+			}
+			lock.Unlock()
+		}(tmp)
+		if n%100 == 0 {
+			fmt.Println("current:", n)
+		}
+		tmp = map[string]interface{}{}
+	}
+	wg.Wait()
+	if len(arr) > 0 {
+		MgoPro.UpdateBulk("projectset_wy_back", arr...)
+		arr = [][]map[string]interface{}{}
+	}
+	fmt.Println("迁移结束...")
+}

+ 91 - 0
data_project_wy/init.go

@@ -0,0 +1,91 @@
+package main
+
+import (
+	"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
+	"jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
+)
+
+type conf struct {
+	MgoB   db `json:"mgob"`
+	MgoPro db `json:"mgopro"`
+	//MysqlDb    db  `json:"mysqldb"`
+	ClickHouse   ckh    `json:"clickhouse"`
+	BidStartTime int64  `json:"bidstarttime"` //bidding增量起始id
+	ProStartTime int64  `json:"prostarttime"` //project增量起始id
+	StartCron    string `json:"startcron"`
+}
+type db struct {
+	Addr     string `json:"addr"`
+	DbName   string `json:"dbname"`
+	Size     int    `json:"size"`
+	Username string `json:"username"`
+	Password string `json:"password"`
+}
+type ckh struct {
+	Addr     []string `json:"addr"`
+	DataBase string   `json:"database"`
+	Size     int      `json:"size"`
+	Username string   `json:"username"`
+	Password string   `json:"password"`
+}
+
+var (
+	Config  conf
+	MgoB    *mongodb.MongodbSim //bidding
+	MgoPro  *mongodb.MongodbSim //project
+	CkhTool driver.Conn         //
+	//MysqlTool *mysqldb.Mysql
+	BidStartTime int64
+	ProStartTime int64
+)
+
+//var (
+//	TransactionSaveCache = make(chan map[string]interface{}, 1000) //
+//	Transaction_Ch       = make(chan bool, 5)
+//)
+
+func InitMgo() {
+	//bidding
+	MgoB = &mongodb.MongodbSim{
+		MongodbAddr: Config.MgoB.Addr,
+		DbName:      Config.MgoB.DbName,
+		Size:        Config.MgoB.Size,
+		UserName:    Config.MgoB.Username,
+		Password:    Config.MgoB.Password,
+	}
+	MgoB.InitPool()
+	//project
+	MgoPro = &mongodb.MongodbSim{
+		MongodbAddr: Config.MgoPro.Addr,
+		DbName:      Config.MgoPro.DbName,
+		Size:        Config.MgoPro.Size,
+		UserName:    Config.MgoPro.Username,
+		Password:    Config.MgoPro.Password,
+	}
+	MgoPro.InitPool()
+}
+
+//func InitMysql() {
+//	MysqlTool = &mysqldb.Mysql{
+//		Address:  Config.MysqlDb.Addr,
+//		DBName:   Config.MysqlDb.DbName,
+//		UserName: Config.MysqlDb.Username,
+//		PassWord: Config.MysqlDb.Password,
+//	}
+//	MysqlTool.Init()
+//}
+
+func InitCkh() {
+	CkhTool, _ = InitClickHouse(
+		Config.ClickHouse.Addr,
+		Config.ClickHouse.Size,
+		Config.ClickHouse.DataBase,
+		Config.ClickHouse.Username,
+		Config.ClickHouse.Password,
+	)
+}
+
+func InitOther() {
+	BidStartTime = Config.BidStartTime
+	ProStartTime = Config.ProStartTime
+}

+ 27 - 0
data_project_wy/main.go

@@ -0,0 +1,27 @@
+package main
+
+import (
+	"github.com/robfig/cron"
+)
+
+func init() {
+	ReadConfig(&Config) //初始化
+	InitMgo()           //mgo
+	InitCkh()           //clickhouse
+	InitOther()
+}
+
+func main() {
+	//go SaveTransactionData() //保存增量物业信息
+	c := cron.New()
+	//增量
+	c.AddFunc(Config.StartCron, IncTransactionDataFromBidAndPro) //增量bidding和项目数据
+	c.Start()
+	//历史
+	//HisTransactionDataFromBid() //历史招标(bidding)数据,截止时间1713196800
+	//HisTransactionDataFromProject() //历史项目数据(projectset_20230904)
+	//HisTransactionDataAddInformation() //历史信息补充法人库信息,项目信息补充业态property_form
+	//IncTransactionDataMgoToCkh()//数据迁移
+	ch := make(chan bool)
+	<-ch
+}

+ 605 - 0
data_project_wy/task.go

@@ -0,0 +1,605 @@
+package main
+
+import (
+	"context"
+	"fmt"
+	"github.com/gogf/gf/v2/util/gconv"
+	"go.mongodb.org/mongo-driver/bson"
+	"jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
+	"strings"
+	"sync"
+	"time"
+)
+
+type Transaction struct {
+	Project_Id        string   `bson:"project_id"`
+	Project_Name      string   `bson:"project_name"`
+	Project_Budget    float64  `bson:"project_budget"`
+	Project_Bidamount float64  `bson:"project_bidamount"`
+	Project_Money     float64  `bson:"project_money"`
+	Business_Type     string   `bson:"business_type"`
+	Project_Bidstatus int      `bson:"project_bidstatus"`
+	Info_Id           string   `bson:"info_id"`
+	Information_Id    string   `bson:"information_id"`
+	Buyer             string   `bson:"buyer"`
+	Buyer_Id          string   `bson:"buyer_id"`
+	Winner            []string `bson:"winner"`
+	Winner_Id         []string `bson:"winner_id"`
+	Agency            string   `bson:"agency"`
+	Agency_Id         string   `bson:"agency_id"`
+	Property_Form     []string `bson:"property_form"`
+	SubClass          []string `bson:"subclass"`
+	MultiPackage      int      `bson:"multipackage"`
+	Area              string   `bson:"area"`
+	City              string   `bson:"city"`
+	District          string   `bson:"district"`
+	ZbTime            int64    `bson:"zbtime"`
+	JgTime            int64    `bson:"jgtime"`
+	StartTime         int64    `bson:"starttime"`
+	EndTime           int64    `bson:"endtime"`
+	Create_Time       int64    `bson:"create_time"`
+	Update_Time       int64    `bson:"update_time"`
+	//
+	From string `bson:"from"`
+}
+
+func IncTransactionDataFromBidAndPro() {
+	IncTransactionDataFromBid() //bidding
+	return
+	IncTransactionDataFromPro()  //project
+	IncTransactionDataMgoToCkh() //mongodb迁移至clickhouse
+}
+
+// IncTransactionDataFromBid 增量bidding
+func IncTransactionDataFromBid() {
+	endTime := GetTime(-1) //前一天凌晨
+	fmt.Println("开始执行增量采购意向信息", BidStartTime, endTime)
+	if BidStartTime >= endTime {
+		fmt.Println("增量bidding采购意向查询异常:", BidStartTime, endTime)
+		return
+	}
+	query := map[string]interface{}{
+		"comeintime": map[string]interface{}{
+			"$gte": BidStartTime,
+			"$lt":  endTime,
+		},
+	}
+	fmt.Println("增量bidding采购意向query:", query)
+	sess := MgoB.GetMgoConn()
+	defer MgoB.DestoryMongoConn(sess)
+	ch := make(chan bool, 5)
+	wg := &sync.WaitGroup{}
+	lock := &sync.Mutex{}
+	fields := map[string]interface{}{
+		"projectname":   1,
+		"budget":        1,
+		"bidamount":     1,
+		"buyer":         1,
+		"s_winner":      1,
+		"agency":        1,
+		"property_form": 1,
+		"multipackage":  1,
+		"area":          1,
+		"city":          1,
+		"district":      1,
+		//
+		"publishtime":           1,
+		"toptype":               1,
+		"extracttype":           1,
+		"tag_subinformation":    1,
+		"tag_subinformation_ai": 1,
+		"tag_topinformation":    1,
+		"tag_topinformation_ai": 1,
+	}
+	arr := []map[string]interface{}{}
+	it := sess.DB(MgoB.DbName).C("bidding").Find(&query).Select(&fields).Iter()
+	n := 0
+	for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
+		ch <- true
+		wg.Add(1)
+		go func(tmp map[string]interface{}) {
+			defer func() {
+				<-ch
+				wg.Done()
+			}()
+			if gconv.String(tmp["toptype"]) != "采购意向" { //非采购意向数据过滤
+				return
+			}
+			if gconv.Int(tmp["extracttype"]) == -1 { //重复数据过滤
+				return
+			}
+			if tmp["tag_topinformation"] == nil && tmp["tag_topinformation_ai"] == nil { //无效数据过滤
+				return
+			}
+			result := DealTransactionForBid(tmp)
+			lock.Lock()
+			if len(result) > 0 {
+				arr = append(arr, result)
+			}
+			if len(arr) > 50 {
+				MgoPro.SaveBulk("projectset_wy", arr...)
+				arr = []map[string]interface{}{}
+			}
+			lock.Unlock()
+		}(tmp)
+		if n%1000 == 0 {
+			fmt.Println("current:", n)
+		}
+		tmp = map[string]interface{}{}
+	}
+	wg.Wait()
+	if len(arr) > 0 {
+		MgoPro.SaveBulk("projectset_wy", arr...)
+		arr = []map[string]interface{}{}
+	}
+	fmt.Println("执行增量采购意向信息完毕", BidStartTime, endTime)
+	BidStartTime = endTime //替换
+}
+
+// DealTransactionForBid bidding采购意向数据处理
+func DealTransactionForBid(tmp map[string]interface{}) map[string]interface{} {
+	//基本信息封装
+	id := mongodb.BsonIdToSId(tmp["_id"])
+	buyer := gconv.String(tmp["buyer"])
+	winner := gconv.String(tmp["s_winner"])
+	agency := gconv.String(tmp["agency"])
+	property_form := []string{}
+	if tmp["property_form"] != nil {
+		property_form = gconv.Strings(tmp["property_form"])
+	}
+	bidamount := gconv.Float64(tmp["bidamount"])
+	budget := gconv.Float64(tmp["budget"])
+	money := bidamount
+	if money <= 0 {
+		money = budget
+	}
+
+	//物业分类
+	subclass := []string{}
+	if tag_subinformation := tmp["tag_subinformation"]; tag_subinformation != nil {
+		subclass = gconv.Strings(tag_subinformation)
+	} else if tag_subinformation_ai := tmp["tag_subinformation_ai"]; tag_subinformation_ai != nil {
+		subclass = gconv.Strings(tag_subinformation_ai)
+	}
+
+	//TODO 查询法人库信息(待补充)
+	winners := []string{}
+	if winner != "" {
+		winners = strings.Split(winner, ",")
+	}
+	buyer_id, agency_id := "", ""
+	winner_ids := []string{}
+	//buyer_id, agency_id, winner_ids := FindEntInfoData(id, buyer, agency, winners)
+	//物业信息
+	t := &Transaction{
+		Project_Id:        id,
+		Project_Name:      gconv.String(tmp["projectname"]),
+		Project_Budget:    budget,
+		Project_Bidamount: bidamount,
+		Project_Money:     money,
+		Business_Type:     "采购意向",
+		Project_Bidstatus: 3,
+		Info_Id:           id,
+		Information_Id:    "",
+		Buyer:             buyer,
+		Winner:            winners,
+		Agency:            agency,
+		Buyer_Id:          buyer_id,
+		Winner_Id:         winner_ids,
+		Agency_Id:         agency_id,
+		Property_Form:     property_form,
+		SubClass:          subclass,
+		MultiPackage:      gconv.Int(tmp["multipackage"]),
+		Area:              gconv.String(tmp["area"]),
+		City:              gconv.String(tmp["city"]),
+		District:          gconv.String(tmp["district"]),
+		ZbTime:            gconv.Int64(tmp["publishtime"]),
+		JgTime:            int64(0),
+		StartTime:         int64(0),
+		EndTime:           int64(0),
+		Create_Time:       time.Now().Unix(),
+		Update_Time:       time.Now().Unix(),
+		//
+		From: "bidding",
+	}
+	result := map[string]interface{}{}
+	infomation, _ := bson.Marshal(t)
+	bson.Unmarshal(infomation, &result)
+	return result
+}
+
+// IncTransactionDataFromProject 增量project
+func IncTransactionDataFromPro() {
+	endTime := GetTime(-1) //前一天凌晨
+	fmt.Println("开始执行增量项目信息", ProStartTime, endTime)
+	if ProStartTime >= endTime {
+		fmt.Println("增量项目信息查询异常:", ProStartTime, endTime)
+		return
+	}
+	query := map[string]interface{}{
+		"pici": map[string]interface{}{
+			"$gte": ProStartTime,
+			"$lt":  endTime,
+		},
+	}
+	fmt.Println("增量项目查询query:", query)
+	sess := MgoB.GetMgoConn()
+	defer MgoB.DestoryMongoConn(sess)
+	ch := make(chan bool, 5)
+	wg := &sync.WaitGroup{}
+	lock := &sync.Mutex{}
+	fields := map[string]interface{}{
+		"projectname":   1,
+		"budget":        1,
+		"bidamount":     1,
+		"buyer":         1,
+		"s_winner":      1,
+		"agency":        1,
+		"property_form": 1,
+		"multipackage":  1,
+		"area":          1,
+		"city":          1,
+		"district":      1,
+		"zbtime":        1,
+		"jgtime":        1,
+		"bidstatus":     1,
+		//
+		"firsttime":             1,
+		"pici":                  1,
+		"ids":                   1,
+		"sourceinfoid":          1,
+		"tag_subinformation":    1,
+		"tag_subinformation_ai": 1,
+		"tag_topinformation":    1,
+		"tag_topinformation_ai": 1,
+	}
+	arr := [][]map[string]interface{}{}
+	it := sess.DB(MgoB.DbName).C("projectset_20230904").Find(&query).Select(&fields).Iter()
+	n := 0
+	for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
+		ch <- true
+		wg.Add(1)
+		go func(tmp map[string]interface{}) {
+			defer func() {
+				<-ch
+				wg.Done()
+			}()
+			if tmp["tag_topinformation"] == nil && tmp["tag_topinformation_ai"] == nil { //无效数据过滤
+				return
+			}
+			result := DealTransactionForPro(tmp)
+			lock.Lock()
+			if len(result) > 0 {
+				update := []map[string]interface{}{
+					{"project_id": tmp["_id"]},
+					{"$set": result},
+				}
+				arr = append(arr, update)
+			}
+			if len(arr) > 50 {
+				MgoPro.UpSertBulk("projectset_wy", arr...)
+				arr = [][]map[string]interface{}{}
+			}
+			lock.Unlock()
+		}(tmp)
+		if n%100 == 0 {
+			fmt.Println("current:", n)
+		}
+		tmp = map[string]interface{}{}
+	}
+	wg.Wait()
+	if len(arr) > 0 {
+		MgoPro.UpSertBulk("projectset_wy", arr...)
+		arr = [][]map[string]interface{}{}
+	}
+	fmt.Println("执行增量项目信息完毕", ProStartTime, endTime)
+	ProStartTime = endTime //替换
+}
+
+// DealTransactionForPro project数据处理
+func DealTransactionForPro(data map[string]interface{}) map[string]interface{} {
+	//基本信息封装
+	id := mongodb.BsonIdToSId(data["_id"])
+	buyer := gconv.String(data["buyer"])
+	winner := gconv.String(data["s_winner"])
+	agency := gconv.String(data["agency"])
+	zbtime := gconv.Int64(data["zbtime"])
+	property_form := []string{}
+	if data["property_form"] != nil {
+		property_form = gconv.Strings(data["property_form"])
+	}
+	bidamount := gconv.Float64(data["bidamount"])
+	budget := gconv.Float64(data["budget"])
+	money := bidamount
+	if money <= 0 {
+		money = budget
+	}
+
+	//物业分类
+	subclass := []string{}
+	if tag_subinformation := data["tag_subinformation"]; tag_subinformation != nil {
+		subclass = gconv.Strings(tag_subinformation)
+	} else if tag_subinformation_ai := data["tag_subinformation_ai"]; tag_subinformation_ai != nil {
+		subclass = gconv.Strings(tag_subinformation_ai)
+	}
+
+	//项目状态、商机类型
+	business_type := ""
+	project_bidstatus := 2
+	bidstatus := gconv.String(data["bidstatus"])
+	if bidstatus == "中标" || bidstatus == "成交" || bidstatus == "合同" {
+		project_bidstatus = 1
+		business_type = "存量项目"
+	} else if bidstatus == "废标" || bidstatus == "流标" {
+		project_bidstatus = 0
+	} else if bidstatus == "招标" {
+		business_type = "招标项目"
+		if zbtime == 0 {
+			zbtime = gconv.Int64(data["firsttime"])
+		}
+	}
+
+	//查询情报信息
+	//bidId = "65fbf3f566cf0db42a2a99d2"
+	ids := gconv.Strings(data["ids"])
+	info := FindInfomationData(ids...) //情报信息查询
+
+	//TODO 查询法人库信息(待补充)
+	winners := []string{}
+	if winner != "" {
+		winners = strings.Split(winner, ",")
+	}
+	buyer_id, agency_id := "", ""
+	winner_ids := []string{}
+	//buyer_id, agency_id, winner_ids = FindEntInfoData(id, buyer, agency, winners)
+	//物业信息
+	t := &Transaction{
+		Project_Id:        id,
+		Project_Name:      gconv.String(data["projectname"]),
+		Project_Budget:    budget,
+		Project_Bidamount: bidamount,
+		Project_Money:     money,
+		Business_Type:     business_type,
+		Project_Bidstatus: project_bidstatus,
+		Info_Id:           gconv.String(data["sourceinfoid"]),
+		Information_Id:    info.Id,
+		Buyer:             buyer,
+		Winner:            winners,
+		Agency:            agency,
+		Buyer_Id:          buyer_id,
+		Winner_Id:         winner_ids,
+		Agency_Id:         agency_id,
+		Property_Form:     property_form,
+		SubClass:          subclass,
+		MultiPackage:      gconv.Int(data["multipackage"]),
+		Area:              gconv.String(data["area"]),
+		City:              gconv.String(data["city"]),
+		District:          gconv.String(data["district"]),
+		ZbTime:            zbtime,
+		JgTime:            gconv.Int64(data["jgtime"]),
+		StartTime:         info.Starttime,
+		EndTime:           info.Endtime,
+		Create_Time:       time.Now().Unix(),
+		Update_Time:       time.Now().Unix(),
+		//
+		From: "project",
+	}
+	result := map[string]interface{}{}
+	infomation, _ := bson.Marshal(t)
+	bson.Unmarshal(infomation, &result)
+	return result
+}
+
+// IncTransactionDataMgoToCkh 数据迁移
+func IncTransactionDataMgoToCkh() {
+	/*
+		数据根据update_time查询
+		1、采购意向数据(from=bidding)只插入
+		2、项目信息先查,有则更新,无则插入
+	*/
+	fmt.Println("开始执行迁移...")
+	sess := MgoPro.GetMgoConn()
+	defer MgoPro.DestoryMongoConn(sess)
+	ch := make(chan bool, 10)
+	wg := &sync.WaitGroup{}
+	query := map[string]interface{}{
+		"update_time": map[string]interface{}{
+			"$gte": GetTime(0),
+		},
+	}
+	it := sess.DB(MgoPro.DbName).C("projectset_wy").Find(&query).Iter()
+	n := 0
+	for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
+		ch <- true
+		wg.Add(1)
+		go func(tmp map[string]interface{}) {
+			defer func() {
+				<-ch
+				wg.Done()
+			}()
+			var err error
+			from := gconv.String(tmp["from"])
+			delete(tmp, "from")    //无用字段删除
+			delete(tmp, "_id")     //无用字段删除
+			if from == "bidding" { //采购意向,插入
+				err = SaveDataToClickHouse(tmp)
+			} else { //项目信息,更新,插入
+				project_id := gconv.String(tmp["project_id"])
+				err = UpdateOrSaveDataToClickHouse(project_id, tmp)
+			}
+			if err != nil {
+				fmt.Println("数据迁移失败,数据类型", from, "  项目project_id", tmp["project_id"], err)
+			}
+		}(tmp)
+		if n%100 == 0 {
+			fmt.Println("current:", n)
+		}
+		tmp = map[string]interface{}{}
+	}
+	wg.Wait()
+	fmt.Println("迁移结束...")
+}
+
+type Infomation struct {
+	Id        string
+	Starttime int64
+	Endtime   int64
+}
+
+// 情报信息查询
+func FindInfomationData(ids ...string) (info Infomation) {
+	for _, id := range ids {
+		query := fmt.Sprintf(`SELECT id,starttime,endtime FROM %s WHERE datajson_id = ?`, Config.ClickHouse.DataBase+".information")
+		rows, err := CkhTool.Query(context.Background(), query, id)
+		if err != nil {
+			continue
+		}
+		for rows.Next() {
+			info = Infomation{}
+			if err := rows.Scan(&info.Id, &info.Starttime, &info.Endtime); err != nil {
+				fmt.Println("查询情报信息异常:", id, err)
+			}
+			if info.Id != "" {
+				return
+			}
+			//break //目前只有一条结果
+		}
+	}
+	return
+}
+
+// 法人信息查询
+func FindEntInfoData(bid, buyer, agency string, winners []string) (buyer_id, agency_id string, winner_ids []string) {
+	winner_ids = []string{}
+	winnerMap := map[string]bool{} //记录所有中标单位
+	values := []interface{}{}
+	placeholders := []string{}
+	if buyer != "" {
+		placeholders = append(placeholders, "?")
+		values = append(values, buyer)
+	}
+	if len(winners) > 0 {
+		for _, w := range winners {
+			winnerMap[w] = true
+			placeholders = append(placeholders, "?")
+			values = append(values, w)
+		}
+	}
+	if agency != "" {
+		placeholders = append(placeholders, "?")
+		values = append(values, agency)
+	}
+	if len(values) == 0 {
+		return
+	}
+	query := fmt.Sprintf(`SELECT id,company_name FROM %s WHERE company_name IN (%s)`, Config.ClickHouse.DataBase+".ent_info", strings.Join(placeholders, ","))
+	rows, err := CkhTool.Query(context.Background(), query, values...)
+	if err != nil {
+		return
+	}
+	for rows.Next() {
+		var id, company_name string
+		if err := rows.Scan(&id, &company_name); err == nil {
+			if company_name == buyer {
+				buyer_id = id
+			} else if company_name == agency {
+				agency_id = id
+			} else if winnerMap[company_name] {
+				winner_ids = append(winner_ids, id)
+			}
+		} else {
+			fmt.Println("查询情报信息异常:", err, bid)
+		}
+	}
+	return
+}
+
+// UpdateOrSaveDataToClickHouse 判断clickhouse更新or保存
+func UpdateOrSaveDataToClickHouse(project_id string, data map[string]interface{}) (err error) {
+	count := FindClickHouseByProjectId(project_id) //查询
+	if count > 0 {                                 //更新
+		delete(data, "create_time") //不更新创建时间
+		delete(data, "project_id")  //不更新项目id(主键)
+		err = UpdateDataToClickHouse(data, map[string]interface{}{"project_id": project_id})
+	} else { //插入
+		err = SaveDataToClickHouse(data)
+	}
+	return
+}
+
+// SaveDataToClickHouse 数据保存clickhouse
+func SaveDataToClickHouse(data map[string]interface{}) error {
+	fields, placeholders := []string{}, []string{}
+	values := []interface{}{}
+	for k, v := range data {
+		fields = append(fields, k)
+		values = append(values, v)
+		placeholders = append(placeholders, "?")
+	}
+	query := fmt.Sprintf("INSERT INTO %s (%s) VALUES (%s)", Config.ClickHouse.DataBase+".transaction_info", strings.Join(fields, ","), strings.Join(placeholders, ","))
+	return CkhTool.Exec(context.Background(), query, values...)
+}
+
+// FindClickHouseByProjectId 根据条件count clickhouse
+func FindClickHouseByProjectId(project_id string) int {
+	query := fmt.Sprintf(`SELECT COUNT(1) FROM %s WHERE project_id = ?`, Config.ClickHouse.DataBase+".transaction_info")
+	row := CkhTool.QueryRow(context.Background(), query, project_id)
+	var count uint64
+	row.Scan(&count)
+	return gconv.Int(count)
+}
+
+// UpdateDataToClickHouse 数据更新clickhouse
+func UpdateDataToClickHouse(data, querys map[string]interface{}) error {
+	sets := []string{}
+	values := []interface{}{}
+	for k, v := range data {
+		sets = append(sets, fmt.Sprintf("%s=?", k))
+		values = append(values, v)
+	}
+	qs := []string{}
+	for k, v := range querys {
+		qs = append(qs, fmt.Sprintf("%s=?", k))
+		values = append(values, v)
+	}
+	query := fmt.Sprintf("ALTER TABLE %s UPDATE %s WHERE %s", Config.ClickHouse.DataBase+".transaction_info", strings.Join(sets, ","), strings.Join(qs, ","))
+	//query := `ALTER TABLE information.transaction_info UPDATE update_time = ? WHERE project_id = '5c9ee78ca5cb26b9b7fd0b57'`
+	return CkhTool.Exec(context.Background(), query, values...)
+}
+
+/*// SaveTransactionData 保存增量物业信息
+func SaveTransactionData() {
+	fmt.Println("save projectset_wy...")
+	savearr := make([]map[string]interface{}, 100)
+	indexdb := 0
+	for {
+		select {
+		case v := <-TransactionSaveCache:
+			savearr[indexdb] = v
+			indexdb++
+			if indexdb == 100 {
+				Transaction_Ch <- true
+				go func(tmp []map[string]interface{}) {
+					defer func() {
+						<-Transaction_Ch
+					}()
+					MgoPro.SaveBulk("projectset_wy", tmp...)
+				}(savearr)
+				savearr = make([]map[string]interface{}, 100)
+				indexdb = 0
+			}
+		case <-time.After(30 * time.Second):
+			if indexdb > 0 {
+				Transaction_Ch <- true
+				go func(tmp []map[string]interface{}) {
+					defer func() {
+						<-Transaction_Ch
+					}()
+					MgoPro.SaveBulk("projectset_wy", tmp...)
+				}(savearr[:indexdb])
+				savearr = make([]map[string]interface{}, 100)
+				indexdb = 0
+			}
+		}
+	}
+}*/

+ 64 - 0
data_project_wy/util.go

@@ -0,0 +1,64 @@
+package main
+
+import (
+	"encoding/json"
+	"fmt"
+	"io/ioutil"
+	"os"
+	"time"
+)
+
+const (
+	Date_Full_Layout    = "2006-01-02 15:04:05"
+	Date_Short_Layout   = "2006-01-02"
+	Date_Small_Layout   = "01-02"
+	Date_Time_Layout    = "15:04"
+	Date_yyyyMMdd       = "20060102"
+	Date_yyyyMMdd_Point = "2006.01.02"
+)
+
+func ReadConfig(config ...interface{}) {
+	var r *os.File
+	if len(config) > 1 {
+		filepath, _ := config[0].(string)
+		r, _ = os.Open(filepath)
+		defer r.Close()
+		bs, _ := ioutil.ReadAll(r)
+		json.Unmarshal(bs, config[1])
+	} else {
+		r, _ = os.Open("./config.json")
+		defer r.Close()
+		bs, _ := ioutil.ReadAll(r)
+		json.Unmarshal(bs, config[0])
+	}
+}
+
+func GetTime(day int) int64 {
+	nowTime := time.Now().AddDate(0, 0, day)
+	timeStr := FormatDate(&nowTime, Date_Short_Layout)
+	t, _ := time.ParseInLocation(Date_Short_Layout, timeStr, time.Local)
+	return t.Unix()
+}
+func FormatDateByInt64(src *int64, layout string) string {
+	var tmp int64
+	if *src > 0 {
+		if len(fmt.Sprint(*src)) >= 12 {
+			tmp = (*src) / 1000
+		} else {
+			tmp = (*src)
+		}
+	} else {
+		if len(fmt.Sprint(*src)) >= 13 {
+			tmp = (*src) / 1000
+		} else {
+			tmp = (*src)
+		}
+	}
+	date := time.Unix(tmp, 0)
+	return FormatDate(&date, layout)
+}
+
+// 日期格式化
+func FormatDate(src *time.Time, layout string) string {
+	return (*src).Local().Format(layout)
+}