123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215 |
- package main
import (
	"fmt"
	"math"
	"strconv"
	"strings"
	"sync"
	"time"

	"mongodb"
	"qfw/util"
	es "qfw/util/elastic"
)
- var (
- MongoTool *mongodb.MongodbSim
- Es *es.Elastic
- EsSaveCache chan map[string]interface{}
- SP chan bool
- EsField = []string{"_id", "company_name", "history_name", "updatetime", "legal_person", "legal_person_certno", "credit_no", "org_code", "tax_code",
- "company_code", "area_code", "company_area", "company_city", "company_district", "currency", "capital", "company_type", "company_status",
- "establish_date", "issue_date", "lastupdatetime", "operation_startdate", "operation_enddate", "cancel_date", "revoke_date", "authority",
- "company_address", "business_scope", "partners", "employees", "stock_name", "company_phone", "company_email", "website_url", "search_type",
- "employee_name", "company_type_old"}
- TypeMap = map[string]string{
- "采购单位": "1",
- "投标企业": "2",
- "代理机构": "3",
- "厂商": "4",
- }
- TypeMap1 = map[string]string{
- "固定电话": "1",
- "手机号": "2",
- "邮箱": "3",
- "不存在": "4",
- }
- )
- func init() {
- MongoTool = &mongodb.MongodbSim{
- MongodbAddr: "172.17.145.163:27083,172.17.4.187:27082",
- Size: 10,
- DbName: "mixdata",
- UserName: "SJZY_RWESBid_Other",
- Password: "SJZY@O17t8herB3B",
- //MongodbAddr: "192.168.3.207:27092",
- //Size: 10,
- //DbName: "wjh",
- }
- MongoTool.InitPool()
- Es = &es.Elastic{
- S_esurl: "http://172.17.145.170:9800", // http://172.17.145.170:9800
- I_size: 10,
- }
- Es.InitElasticSize()
- EsSaveCache = make(chan map[string]interface{}, 1000)
- SP = make(chan bool, 5)
- }
- func main() {
- go SaveEs()
- sess := MongoTool.GetMgoConn()
- defer MongoTool.DestoryMongoConn(sess)
- ch := make(chan bool, 1)
- wg := &sync.WaitGroup{}
- query := sess.DB("mixdata").C("qyxy_std").Find(nil).Select(nil).Skip(6110000).Iter()
- count := 0
- for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
- if count%5000 == 0 {
- util.Debug("current ---", count)
- }
- ch <- true
- wg.Add(1)
- go func(tmp map[string]interface{}) {
- defer func() {
- <-ch
- wg.Done()
- }()
- if p1, ok := tmp["partners"].([]interface{}); ok {
- p2 := p1[0].(map[string]interface{})
- if p2["stock_capital"] != "" {
- taskinfo(tmp)
- }
- }
- }(tmp)
- tmp = make(map[string]interface{})
- }
- wg.Wait()
- util.Debug("over ---", count)
- c := make(chan bool, 1)
- <-c
- }
- func taskinfo(tmp map[string]interface{}) {
- esMap := make(map[string]interface{})
- for _, v := range EsField {
- if tmp[v] != nil {
- esMap[v] = tmp[v]
- }
- }
- esMap["name"] = tmp["company_name"]
- company_type := util.ObjToString(tmp["company_type"])
- company_name := util.ObjToString(tmp["company_name"])
- if company_type == "个体工商户" {
- if len([]rune(company_name)) >= 5 {
- esMap["company_type_int"] = 31
- } else {
- esMap["company_type_int"] = 32
- }
- } else if company_type == "其他" || company_type == "" {
- if len([]rune(company_name)) >= 4 {
- esMap["company_type_int"] = 21
- } else {
- esMap["company_type_int"] = 22
- }
- } else {
- if company_type == "内资分公司" {
- esMap["company_type_int"] = 12
- } else if len([]rune(company_name)) >= 4 {
- esMap["company_type_int"] = 11
- } else {
- esMap["company_type_int"] = 13
- }
- }
- if pname, ok := tmp["bid_projectname"].([]interface{}); ok {
- p1 := util.ObjArrToStringArr(pname)
- esMap["bid_projectname"] = strings.Join(p1, ",")
- }
- if pur, ok := tmp["bid_purchasing"].([]interface{}); ok {
- p1 := util.ObjArrToStringArr(pur)
- esMap["bid_purchasing"] = strings.Join(p1, ",")
- }
- if areas, ok := tmp["bid_area"].([]interface{}); ok {
- p1 := util.ObjArrToStringArr(areas)
- esMap["bid_area"] = strings.Join(p1, ",")
- }
- if t1, ok := tmp["bid_unittype"].([]interface{}); ok {
- var arr []string
- for _, v := range util.ObjArrToStringArr(t1) {
- arr = append(arr, TypeMap[v])
- }
- esMap["bid_unittype"] = strings.Join(arr, ",")
- }
- if t2, ok := tmp["bid_contracttype"].([]interface{}); ok {
- var arr []string
- for _, v := range util.ObjArrToStringArr(t2) {
- arr = append(arr, TypeMap1[v])
- }
- esMap["bid_contracttype"] = strings.Join(arr, ",")
- }
- if p1, ok := tmp["partners"].([]interface{}); ok {
- for _, v := range p1 {
- v1 := v.(map[string]interface{})
- if text := util.ObjToString(v1["stock_capital"]); text != "" {
- if c1, err := strconv.ParseFloat(text, 64); err == nil {
- c1, _ = strconv.ParseFloat(fmt.Sprintf("%.2f", c1), 64) //保留小数点两位
- v1["stock_capital"] = c1
- } else {
- delete(v1, "stock_capital")
- }
- }
- if text := util.ObjToString(v1["stock_realcapital"]); text != "" {
- if c2, err := strconv.ParseFloat(text, 64); err == nil {
- c2, _ = strconv.ParseFloat(fmt.Sprintf("%.2f", c2), 64) //保留小数点两位
- v1["stock_realcapital"] = c2
- } else {
- delete(v1, "stock_realcapital")
- }
- }
- }
- }
- EsSaveCache <- esMap
- }
- func SaveEs() {
- util.Debug("Es Save...")
- arru := make([]map[string]interface{}, 500)
- indexu := 0
- for {
- select {
- case v := <-EsSaveCache:
- arru[indexu] = v
- indexu++
- if indexu == 500 {
- SP <- true
- go func(arru []map[string]interface{}) {
- defer func() {
- <-SP
- }()
- Es.BulkSave("qyxy_v2", "qyxy", &arru, false)
- }(arru)
- arru = make([]map[string]interface{}, 500)
- indexu = 0
- }
- case <-time.After(1000 * time.Millisecond):
- if indexu > 0 {
- SP <- true
- go func(arru []map[string]interface{}) {
- defer func() {
- <-SP
- }()
- Es.BulkSave("qyxy_v2", "qyxy", &arru, false)
- }(arru[:indexu])
- arru = make([]map[string]interface{}, 500)
- indexu = 0
- }
- }
- }
- }
|