123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198 |
- package main
import (
	"os"
	"strings"
	"sync"
	"time"

	"mongodb"
	"qfw/util"
)
- var (
- MongoTool *mongodb.MongodbSim
- StatusMap map[string]string
- WordsArr []string
- updatePool chan []map[string]interface{}
- updateSp chan bool
- )
- func init() {
- MongoTool = &mongodb.MongodbSim{
- MongodbAddr: "172.17.145.163:27083,172.17.4.187:27082",
- Size: 10,
- DbName: "mixdata",
- UserName: "SJZY_RWESBid_Other",
- Password: "SJZY@O17t8herB3B",
- }
- MongoTool.InitPool()
- updatePool = make(chan []map[string]interface{}, 2000)
- updateSp = make(chan bool, 4)
- StatusMap = map[string]string{
- "正常": "存续",
- "其他": "存续",
- "未注销": "吊销",
- "个体转企业": "存续",
- }
- WordsArr = []string{"研发", "研制", "开发", "生产", "制造", "制作", "加工", "种植"}
- }
- func main() {
- go updateMethod()
- sess := MongoTool.GetMgoConn()
- defer MongoTool.DestoryMongoConn(sess)
- ch := make(chan bool, 5)
- wg := &sync.WaitGroup{}
- field := map[string]interface{}{"company_status": 1, "company_type": 1, "company_phone": 1, "company_email": 1, "business_scope": 1, "employees": 1,
- "annual_reports": 1, "company_name": 1, "company_type_old": 1}
- query := sess.DB("mixdata").C("qyxy_std").Find(nil).Select(field).Limit(3000).Sort("-updatetime").Iter()
- count := 0
- for tmp := make(map[string]interface{}); query.Next(&tmp); count++ {
- if count%5000 == 0 {
- util.Debug("current ---", count)
- }
- ch <- true
- wg.Add(1)
- go func(tmp map[string]interface{}) {
- defer func() {
- <-ch
- wg.Done()
- }()
- updata := make(map[string]interface{})
- // company_status
- text := util.ObjToString(tmp["company_status"])
- if text != "" {
- updata["company_status"] = StatusMap[text]
- }
- // employee_name
- var ename []string
- if emp, ok := tmp["employees"].([]interface{}); ok {
- for _, v := range emp {
- v1 := v.(map[string]interface{})
- if util.ObjToString(v1["employee_name"]) != "" {
- ename = append(ename, util.ObjToString(v1["employee_name"]))
- }
- }
- }
- if len(ename) > 0 {
- updata["employee_name"] = strings.Join(ename, ",")
- }
- // bid_unittype 厂商
- flag := false
- for _, v := range WordsArr {
- if strings.Contains(util.ObjToString(tmp["business_scope"]), v) {
- flag = true
- break
- }
- }
- if flag {
- updata["bid_unittype"] = []string{"厂商"}
- }
- // bid_contracttype
- var types []string
- if phone := util.ObjToString(tmp["company_phone"]); phone != "" {
- if len(phone) == 11 {
- types = append(types, "手机号")
- } else {
- types = append(types, "固定电话")
- }
- }
- if util.ObjToString(tmp["company_email"]) != "" {
- types = append(types, "邮箱")
- }
- if len(types) == 0 {
- types = append(types, "不存在")
- }
- updata["bid_contracttype"] = types
- // website_url
- if annualReports, ok := tmp["annual_reports"].([]interface{}); ok && len(annualReports) > 0 {
- L:
- for _, v := range annualReports {
- v1 := v.(map[string]interface{})
- if v1["report_websites"] != nil {
- if reportWebsites, o := tmp["report_websites"].([]interface{}); o && len(reportWebsites) > 0 {
- for _, m := range reportWebsites {
- m1 := m.(map[string]interface{})
- if util.ObjToString(m1["website_url"]) != "" {
- updata["website_url"] = util.ObjToString(m1["website_url"])
- break L
- }
- }
- }
- }
- }
- }
- // search_type
- if t := util.ObjToString(tmp["company_type"]); t != "" {
- if t != "个体工商户" && t != "其他" {
- t1 := util.ObjToString(tmp["company_type_old"])
- name := util.ObjToString(tmp["company_name"])
- if strings.Contains(t1, "有限合伙") {
- updata["search_type"] = "有限合伙"
- } else if strings.Contains(t1, "合伙") {
- updata["search_type"] = "普通合伙"
- } else if strings.Contains(name, "股份") ||
- (strings.Contains(t1, "上市") && !strings.Contains(t1, "非上市")) {
- updata["search_type"] = "股份有限公司"
- } else {
- updata["search_type"] = "有限责任公司"
- }
- }
- }
- save := []map[string]interface{}{{
- "_id": tmp["_id"],
- },
- {"$set": updata},
- }
- updatePool <- save
- }(tmp)
- tmp = make(map[string]interface{})
- }
- wg.Wait()
- util.Debug("over ---", count)
- c := make(chan bool, 1)
- <-c
- }
- func updateMethod() {
- arru := make([][]map[string]interface{}, 1000)
- indexu := 0
- for {
- select {
- case v := <-updatePool:
- arru[indexu] = v
- indexu++
- if indexu == 1000 {
- updateSp <- true
- go func(arru [][]map[string]interface{}) {
- defer func() {
- <-updateSp
- }()
- MongoTool.UpSertBulk("qyxy_std", arru...)
- }(arru)
- arru = make([][]map[string]interface{}, 1000)
- indexu = 0
- }
- case <-time.After(1000 * time.Millisecond):
- if indexu > 0 {
- updateSp <- true
- go func(arru [][]map[string]interface{}) {
- defer func() {
- <-updateSp
- }()
- MongoTool.UpSertBulk("qyxy_std", arru...)
- }(arru[:indexu])
- arru = make([][]map[string]interface{}, 1000)
- indexu = 0
- }
- }
- }
- }
|