package main import ( util "app.yhyue.com/data_processing/common_utils" "app.yhyue.com/data_processing/common_utils/log" "app.yhyue.com/data_processing/common_utils/mongodb" "app.yhyue.com/data_processing/common_utils/redis" "fmt" "go.mongodb.org/mongo-driver/bson" "go.uber.org/zap" "math/rand" "proposed_project/config" "strconv" "strings" "sync" "time" "unicode/utf8" ) var ( saveBasePool = make(chan map[string]interface{}, 5000) saveBaseSp = make(chan bool, 1) saveRcPool = make(chan map[string]interface{}, 5000) saveRcSp = make(chan bool, 1) saveCtPool = make(chan map[string]interface{}, 5000) saveCtSp = make(chan bool, 1) saveCyPool = make(chan map[string]interface{}, 5000) saveCySp = make(chan bool, 1) saveEntPool = make(chan map[string]interface{}, 5000) saveEntSp = make(chan bool, 1) BaseField = []string{"lasttime", "firsttime", "proposed_number", "proposed_id", "follow_num", "title", "projectname", "approvecode", "approvedept", "approvenumber", "project_stage_code", "total_investment", "funds", "owner", "name_id", "ownerclass_code", "projecttype_code", "projectaddr", "projectperiod", "project_startdate", "project_completedate", "industry_code", "approvestatus", "project_scale", "category_code", "nature_code", "construction_area", "floor_area", "area_code", "city_code", "createtime"} RecordField = []string{"proposed_id", "infoid", "follow_num", "project_stage_code", "title", "project_scale", "publishtime", "jybxhref", "createtime"} ContactField = []string{"proposed_id", "infoid", "follow_num", "name_id", "name", "contact_name", "contact_tel", "contact_addr", "createtime"} CategoryField = []string{"proposed_id", "labelcode", "labelvalues", "labelweight", "createtime"} EntField = []string{"proposed_id", "name_id", "name", "area_code", "city_code", "address", "createtime", "identity_type"} AreaCode = make(map[string]string, 5000) TagCode = make(map[string]interface{}, 100) ) func InitArea() { info := MysqlTool.Find("d_area_code_back", nil, "", "", -1, -1) for _, m := range *info { var key string for i, v := range []string{"area", "city", "district"} { if i == 0 && util.ObjToString(m[v]) != "" { key = util.ObjToString(m[v]) } else if util.ObjToString(m[v]) != "" { key += "," + util.ObjToString(m[v]) } } AreaCode[key] = util.ObjToString(m["code"]) } log.Info("InitField", zap.Int("AreaCode", len(AreaCode))) } func InitTagCode() { info, _ := MgoBid.Find("nzj_rule", nil, nil, bson.M{"label_name": 1, "label": 1, "code": 1}, false, -1, -1) for _, m := range *info { lname := util.ObjToString(m["label_name"]) lb := util.ObjToString(m["label"]) code := util.ObjToString(m["code"]) if lname == "sub_category" || lname == "top_category" { lname = "category" } if TagCode[lname] != nil { m1 := TagCode[lname].(map[string]interface{}) m1[code] = lb TagCode[lname] = m1 } else { m1 := make(map[string]interface{}) m1[code] = lb TagCode[lname] = m1 } } TagCode["nature"].(map[string]interface{})["00"] = "其它" TagCode["project_stage"].(map[string]interface{})["00"] = "其它" TagCode["category"].(map[string]interface{})["04"] = "其它工程" log.Info("InitTagCode", zap.Any("TagCode", TagCode)) } func taskTidb(q map[string]interface{}) { sess := MgoPro.GetMgoConn() defer MgoPro.DestoryMongoConn(sess) ch := make(chan bool, config.Conf.Serve.Thread) wg := &sync.WaitGroup{} var query *mongodb.MgoIter if q != nil && len(q) > 0 { query = sess.DB(config.Conf.DB.MongoP.Dbname).C(config.Conf.DB.MongoP.ProposedColl).Find(q).Select(SelectF).Iter() } else { query = sess.DB(config.Conf.DB.MongoP.Dbname).C(config.Conf.DB.MongoP.ProposedColl).Find(nil).Select(SelectF).Iter() } count := 0 for tmp := make(map[string]interface{}); query.Next(tmp); count++ { if count%20000 == 0 { log.Info(fmt.Sprintf("current --- %d", count)) } if t := util.Int64All(tmp["pici"]); t > pici { pici = t } ch <- true wg.Add(1) go func(tmp map[string]interface{}) { defer func() { <-ch wg.Done() }() saveM := make(map[string]interface{}) for _, f := range BaseField { if f == "lasttime" || f == "firsttime" { if t := util.Int64All(tmp[f]); t > 0 { saveM[f] = util.FormatDateByInt64(&t, util.Date_Full_Layout) } } else if f == "proposed_id" { saveM[f] = mongodb.BsonIdToSId(tmp["_id"]) } else if f == "area_code" { if tmp["area"] != nil { saveM[f] = AreaCode[util.ObjToString(tmp["area"])] } } else if f == "city_code" { if tmp["area"] != nil && tmp["city"] != nil { c := util.ObjToString(tmp["area"]) + "," + util.ObjToString(tmp["city"]) saveM[f] = AreaCode[c] } } else if f == "owner" { if v := util.ObjToString(tmp[f]); v != "" { if utf8.RuneCountInString(v) < 100 { saveM[f] = v } } } else if f == "name_id" { if b := util.ObjToString(tmp["owner"]); b != "" { if eid := redis.GetStr("ent_id", b); eid != "" { saveM["name_id"] = strings.Split(eid, "_")[0] } } } else if f == "lasttime" || f == "firsttime" || f == "project_startdate" || f == "project_completedate" { if tmp[f] != nil && util.IntAll(tmp[f]) > 0 { t := util.Int64All(tmp[f]) saveM[f] = util.FormatDateByInt64(&t, util.Date_Full_Layout) } } else if f == "createtime" { saveM[f] = time.Now().Format(util.Date_Full_Layout) } else if f == "total_investment" { text := util.ObjToString(tmp[f]) capital := ObjToMoney(text) capital = capital / 10000 if capital != 0 { capital, _ = util.FormatFloat(capital, 6) saveM[f] = capital } } else if f == "approvestatus" { if util.ObjToString(tmp[f]) != "" && utf8.RuneCountInString(util.ObjToString(tmp[f])) < 8 { saveM[f] = tmp[f] } } else if f == "proposed_number" { if tmp[f] == nil { now := time.Now().Unix() st := util.FormatDateByInt64(&now, util.Date_yyyyMMdd) parseSt := strconv.FormatInt(util.Int64All(st), 8) // 转8进制 rd := fmt.Sprintf("%04v", rand.New(rand.NewSource(time.Now().UnixNano())).Int63n(10000)) // 4位随机数 saveM[f] = fmt.Sprintf("NZJ%s%s", parseSt, rd) } else { saveM[f] = tmp[f] } } else if f == "approvecode" { if util.ObjToString(tmp[f]) != "" && utf8.RuneCountInString(util.ObjToString(tmp[f])) < 200 { saveM[f] = tmp[f] } } else if f == "floor_area" { if util.ObjToString(tmp[f]) != "" && utf8.RuneCountInString(util.ObjToString(tmp[f])) < 255 { saveM[f] = tmp[f] } } else { if tmp[f] != nil { saveM[f] = tmp[f] } } } saveBasePool <- saveM saveCy := make(map[string]interface{}) saveCy["proposed_id"] = mongodb.BsonIdToSId(tmp["_id"]) saveCy["labelcode"] = "category_code" saveCy["labelvalues"] = util.ObjToString(tmp["category_code"]) saveCy["createtime"] = time.Now().Format(util.Date_Full_Layout) saveCyPool <- saveCy if ow := util.ObjToString(tmp["owner"]); ow != "" { saveEnt := make(map[string]interface{}) saveEnt["proposed_id"] = mongodb.BsonIdToSId(tmp["_id"]) saveEnt["createtime"] = time.Now().Format(util.Date_Full_Layout) saveEnt["name"] = ow if eid := redis.GetStr("ent_id", ow); eid != "" { arr := strings.Split(eid, "_") saveEnt["name_id"] = arr[0] if len(arr) == 2 { saveEnt["area_code"] = arr[1] } else if len(arr) == 3 { saveEnt["city_code"] = arr[2] } info := MysqlTool1.Find("dws_f_ent_baseinfo", bson.M{"name_id": arr[0]}, "address", "", -1, -1) if info != nil && len(*info) > 0 { saveEnt["address"] = (*info)[0]["address"] } } saveEnt["identity_type"] = 1 saveEnt["area_code"] = AreaCode[util.ObjToString(tmp["area"])] saveEntPool <- saveEnt } for _, v := range tmp["list"].([]interface{}) { saveRc := make(map[string]interface{}) v1 := v.(map[string]interface{}) saveRc["proposed_id"] = mongodb.BsonIdToSId(tmp["_id"]) infoid := util.ObjToString(v1["infoid"]) saveRc["infoid"] = infoid saveRc["jybxhref"] = fmt.Sprintf(config.Conf.Serve.JyHref, util.CommonEncodeArticle("content", infoid)) saveRc["follow_num"] = v1["follow_num"] saveRc["project_scale"] = util.ObjToString(v1["project_scale"]) saveRc["project_stage_code"] = util.ObjToString(v1["project_stage_code"]) saveRc["title"] = util.ObjToString(v1["title"]) if t := util.Int64All(v1["publishtime"]); t > 0 { saveRc["publishtime"] = util.FormatDateByInt64(&t, util.Date_Full_Layout) } saveRc["createtime"] = time.Now().Format(util.Date_Full_Layout) saveRcPool <- saveRc if util.ObjToString(v1["project_person"]) != "" || util.ObjToString(v1["project_phone"]) != "" { saveCt := make(map[string]interface{}) saveCt["proposed_id"] = mongodb.BsonIdToSId(tmp["_id"]) saveCt["infoid"] = infoid saveCt["follow_num"] = tmp["follow_num"] if b := util.ObjToString(tmp["owner"]); b != "" { saveCt["name"] = util.ObjToString(tmp["owner"]) if eid := redis.GetStr("ent_id", b); eid != "" { saveCt["name_id"] = strings.Split(eid, "_")[0] } } if p := util.ObjToString(v1["project_person"]); p != "" { saveCt["contact_name"] = p } if p := util.ObjToString(v1["project_phone"]); p != "" { saveCt["contact_tel"] = p } if p := util.ObjToString(v1["projectaddr"]); p != "" { saveCt["contact_addr"] = p } saveCt["createtime"] = time.Now().Format(util.Date_Full_Layout) saveCtPool <- saveCt } } }(tmp) tmp = make(map[string]interface{}) } wg.Wait() log.Info(fmt.Sprintf("over --- %d", count), zap.Int64("pici", pici)) } func taskTidb_add(q map[string]interface{}) { sess := MgoPro.GetMgoConn() defer MgoPro.DestoryMongoConn(sess) ch := make(chan bool, config.Conf.Serve.Thread) wg := &sync.WaitGroup{} log.Info("taskTidb_add", zap.Any("q: ", q)) query := sess.DB(config.Conf.DB.MongoP.Dbname).C(config.Conf.DB.MongoP.ProposedColl).Find(q).Select(nil).Iter() count := 0 for tmp := make(map[string]interface{}); query.Next(tmp); count++ { if count%200 == 0 { log.Info(fmt.Sprintf("current --- %d", count)) } if t := util.Int64All(tmp["pici"]); t > pici { pici = t } ch <- true wg.Add(1) go func(tmp map[string]interface{}) { go func() { <-ch wg.Done() }() taskB(tmp) taskE(tmp) }(tmp) tmp = make(map[string]interface{}) } wg.Wait() TaskSingle = true log.Info(fmt.Sprintf("over --- %d", count), zap.Int64("pici", pici)) } func taskB(tmp map[string]interface{}) { saveM := make(map[string]interface{}) for _, f := range BaseField { if f == "lasttime" || f == "firsttime" { if t := util.Int64All(tmp[f]); t > 0 { saveM[f] = util.FormatDateByInt64(&t, util.Date_Full_Layout) } } else if f == "proposed_id" { saveM[f] = mongodb.BsonIdToSId(tmp["_id"]) } else if f == "area_code" { if tmp["area"] != nil { saveM[f] = AreaCode[util.ObjToString(tmp["area"])] } } else if f == "city_code" { if tmp["area"] != nil && tmp["city"] != nil { c := util.ObjToString(tmp["area"]) + "," + util.ObjToString(tmp["city"]) saveM[f] = AreaCode[c] } } else if f == "owner" { if v := util.ObjToString(tmp[f]); v != "" { if utf8.RuneCountInString(v) < 100 { saveM[f] = v } } } else if f == "name_id" { if b := util.ObjToString(tmp["owner"]); b != "" { if eid := redis.GetStr("ent_id", b); eid != "" { saveM["name_id"] = strings.Split(eid, "_")[0] } } } else if f == "lasttime" || f == "firsttime" || f == "project_startdate" || f == "project_completedate" { if tmp[f] != nil && util.IntAll(tmp[f]) > 0 { t := util.Int64All(tmp[f]) saveM[f] = util.FormatDateByInt64(&t, util.Date_Full_Layout) } } else if f == "createtime" { saveM[f] = time.Now().Format(util.Date_Full_Layout) } else if f == "total_investment" { text := util.ObjToString(tmp[f]) capital := ObjToMoney(text) capital = capital / 10000 if capital != 0 { capital, _ = util.FormatFloat(capital, 6) saveM[f] = capital } } else if f == "approvestatus" { if util.ObjToString(tmp[f]) != "" && utf8.RuneCountInString(util.ObjToString(tmp[f])) < 8 { saveM[f] = tmp[f] } } else if f == "proposed_number" { if tmp[f] == nil { now := time.Now().Unix() st := util.FormatDateByInt64(&now, util.Date_yyyyMMdd) parseSt := strconv.FormatInt(util.Int64All(st), 8) // 转8进制 rd := fmt.Sprintf("%04v", rand.New(rand.NewSource(time.Now().UnixNano())).Int63n(10000)) // 4位随机数 saveM[f] = fmt.Sprintf("NZJ%s%s", parseSt, rd) } else { saveM[f] = tmp[f] } } else if f == "approvecode" { if util.ObjToString(tmp[f]) != "" && utf8.RuneCountInString(util.ObjToString(tmp[f])) < 200 { saveM[f] = tmp[f] } } else if f == "floor_area" { if util.ObjToString(tmp[f]) != "" && utf8.RuneCountInString(util.ObjToString(tmp[f])) < 255 { saveM[f] = tmp[f] } } else { if tmp[f] != nil { saveM[f] = tmp[f] } } } info := MysqlTool.FindOne("dwd_f_nzj_baseinfo", map[string]interface{}{"proposed_id": mongodb.BsonIdToSId(tmp["_id"])}, "", "") if info != nil && len(*info) > 0 { saveM["updatetime"] = time.Now().Format(util.Date_Full_Layout) MysqlTool.Update("dwd_f_nzj_baseinfo", map[string]interface{}{"proposed_id": mongodb.BsonIdToSId(tmp["_id"])}, saveM) } else { MysqlTool.Insert("dwd_f_nzj_baseinfo", saveM) } info1 := MysqlTool.FindOne("dwd_f_nzj_category_tags", map[string]interface{}{"proposed_id": mongodb.BsonIdToSId(tmp["_id"])}, "", "") if info1 != nil && len(*info1) > 0 { } else { saveCy := make(map[string]interface{}) saveCy["proposed_id"] = mongodb.BsonIdToSId(tmp["_id"]) saveCy["labelcode"] = "category_code" saveCy["labelvalues"] = util.ObjToString(tmp["category_code"]) saveCy["createtime"] = time.Now().Format(util.Date_Full_Layout) MysqlTool.Insert("dwd_f_nzj_category_tags", saveCy) } info2 := MysqlTool.FindOne("dwd_f_nzj_ent", map[string]interface{}{"proposed_id": mongodb.BsonIdToSId(tmp["_id"])}, "", "") if info2 != nil && len(*info2) > 0 { } else { if ow := util.ObjToString(tmp["owner"]); ow != "" { saveEnt := make(map[string]interface{}) saveEnt["proposed_id"] = mongodb.BsonIdToSId(tmp["_id"]) saveEnt["createtime"] = time.Now().Format(util.Date_Full_Layout) saveEnt["name"] = ow if eid := redis.GetStr("ent_id", ow); eid != "" { arr := strings.Split(eid, "_") saveEnt["name_id"] = arr[0] if len(arr) == 2 { saveEnt["area_code"] = arr[1] } else if len(arr) == 3 { saveEnt["city_code"] = arr[2] } info := MysqlTool1.Find("dws_f_ent_baseinfo", bson.M{"name_id": arr[0]}, "address", "", -1, -1) if info != nil && len(*info) > 0 { saveEnt["address"] = (*info)[0]["address"] } } saveEnt["identity_type"] = 1 saveEnt["area_code"] = AreaCode[util.ObjToString(tmp["area"])] MysqlTool.Insert("dwd_f_nzj_ent", saveEnt) } } } func taskE(tmp map[string]interface{}) { for _, v := range tmp["list"].([]interface{}) { v1 := v.(map[string]interface{}) infoid := util.ObjToString(v1["infoid"]) info := MysqlTool.FindOne("dwd_f_nzj_follw_record", map[string]interface{}{"proposed_id": mongodb.BsonIdToSId(tmp["_id"]), "infoid": infoid}, "", "") if info == nil || len(*info) == 0 { saveRc := make(map[string]interface{}) saveRc["proposed_id"] = mongodb.BsonIdToSId(tmp["_id"]) saveRc["infoid"] = infoid saveRc["jybxhref"] = fmt.Sprintf(config.Conf.Serve.JyHref, util.CommonEncodeArticle("content", infoid)) saveRc["follow_num"] = v1["follow_num"] saveRc["project_scale"] = util.ObjToString(v1["project_scale"]) saveRc["project_stage_code"] = util.ObjToString(v1["project_stage_code"]) saveRc["title"] = util.ObjToString(v1["title"]) if t := util.Int64All(v1["publishtime"]); t > 0 { saveRc["publishtime"] = util.FormatDateByInt64(&t, util.Date_Full_Layout) } saveRc["createtime"] = time.Now().Format(util.Date_Full_Layout) MysqlTool.Insert("dwd_f_nzj_follw_record", saveRc) } info1 := MysqlTool.FindOne("dwd_f_nzj_contact", map[string]interface{}{"proposed_id": mongodb.BsonIdToSId(tmp["_id"]), "infoid": infoid}, "", "") if info1 == nil || len(*info1) == 0 { if util.ObjToString(v1["project_person"]) != "" || util.ObjToString(v1["project_phone"]) != "" { saveCt := make(map[string]interface{}) saveCt["proposed_id"] = mongodb.BsonIdToSId(tmp["_id"]) saveCt["infoid"] = infoid saveCt["follow_num"] = tmp["follow_num"] if b := util.ObjToString(tmp["owner"]); b != "" { saveCt["name"] = util.ObjToString(tmp["owner"]) if eid := redis.GetStr("ent_id", b); eid != "" { saveCt["name_id"] = strings.Split(eid, "_")[0] } } if p := util.ObjToString(v1["project_person"]); p != "" { saveCt["contact_name"] = p } if p := util.ObjToString(v1["project_phone"]); p != "" { saveCt["contact_tel"] = p } if p := util.ObjToString(v1["projectaddr"]); p != "" { saveCt["contact_addr"] = p } saveCt["createtime"] = time.Now().Format(util.Date_Full_Layout) MysqlTool.Insert("dwd_f_nzj_contact", saveCt) } } } } func SaveFunc(table string, arr []string) { arru := make([]map[string]interface{}, saveSize) indexu := 0 for { select { case v := <-saveBasePool: arru[indexu] = v indexu++ if indexu == saveSize { saveBaseSp <- true go func(arru []map[string]interface{}) { defer func() { <-saveBaseSp }() MysqlTool.InsertBulk(table, arr, arru...) }(arru) arru = make([]map[string]interface{}, saveSize) indexu = 0 } case <-time.After(1000 * time.Millisecond): if indexu > 0 { saveBaseSp <- true go func(arru []map[string]interface{}) { defer func() { <-saveBaseSp }() MysqlTool.InsertBulk(table, arr, arru...) }(arru[:indexu]) arru = make([]map[string]interface{}, saveSize) indexu = 0 } } } } func SaveRFunc(table string, arr []string) { arru := make([]map[string]interface{}, saveSize) indexu := 0 for { select { case v := <-saveRcPool: arru[indexu] = v indexu++ if indexu == saveSize { saveRcSp <- true go func(arru []map[string]interface{}) { defer func() { <-saveRcSp }() MysqlTool.InsertBulk(table, arr, arru...) }(arru) arru = make([]map[string]interface{}, saveSize) indexu = 0 } case <-time.After(1000 * time.Millisecond): if indexu > 0 { saveRcSp <- true go func(arru []map[string]interface{}) { defer func() { <-saveRcSp }() MysqlTool.InsertBulk(table, arr, arru...) }(arru[:indexu]) arru = make([]map[string]interface{}, saveSize) indexu = 0 } } } } func SaveCFunc(table string, arr []string) { arru := make([]map[string]interface{}, saveSize) indexu := 0 for { select { case v := <-saveCtPool: arru[indexu] = v indexu++ if indexu == saveSize { saveCtSp <- true go func(arru []map[string]interface{}) { defer func() { <-saveCtSp }() MysqlTool.InsertBulk(table, arr, arru...) }(arru) arru = make([]map[string]interface{}, saveSize) indexu = 0 } case <-time.After(1000 * time.Millisecond): if indexu > 0 { saveCtSp <- true go func(arru []map[string]interface{}) { defer func() { <-saveCtSp }() MysqlTool.InsertBulk(table, arr, arru...) }(arru[:indexu]) arru = make([]map[string]interface{}, saveSize) indexu = 0 } } } } func SaveCyFunc(table string, arr []string) { arru := make([]map[string]interface{}, saveSize) indexu := 0 for { select { case v := <-saveCyPool: arru[indexu] = v indexu++ if indexu == saveSize { saveCySp <- true go func(arru []map[string]interface{}) { defer func() { <-saveCySp }() MysqlTool.InsertBulk(table, arr, arru...) }(arru) arru = make([]map[string]interface{}, saveSize) indexu = 0 } case <-time.After(1000 * time.Millisecond): if indexu > 0 { saveCySp <- true go func(arru []map[string]interface{}) { defer func() { <-saveCySp }() MysqlTool.InsertBulk(table, arr, arru...) }(arru[:indexu]) arru = make([]map[string]interface{}, saveSize) indexu = 0 } } } } func SaveEntFunc(table string, arr []string) { arru := make([]map[string]interface{}, saveSize) indexu := 0 for { select { case v := <-saveEntPool: arru[indexu] = v indexu++ if indexu == saveSize { saveEntSp <- true go func(arru []map[string]interface{}) { defer func() { <-saveEntSp }() MysqlTool.InsertBulk(table, arr, arru...) }(arru) arru = make([]map[string]interface{}, saveSize) indexu = 0 } case <-time.After(1000 * time.Millisecond): if indexu > 0 { saveEntSp <- true go func(arru []map[string]interface{}) { defer func() { <-saveEntSp }() MysqlTool.InsertBulk(table, arr, arru...) }(arru[:indexu]) arru = make([]map[string]interface{}, saveSize) indexu = 0 } } } }