123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780 |
- package main
- import (
- util "app.yhyue.com/data_processing/common_utils"
- "app.yhyue.com/data_processing/common_utils/log"
- "app.yhyue.com/data_processing/common_utils/mongodb"
- "app.yhyue.com/data_processing/common_utils/redis"
- "encoding/json"
- "fmt"
- "github.com/go-ego/gse"
- "go.mongodb.org/mongo-driver/bson"
- "go.uber.org/zap"
- "io/ioutil"
- "net/http"
- "net/url"
- "os"
- "proposed_project/config"
- "regexp"
- "strings"
- "sync"
- "time"
- "unicode/utf8"
- )
- var (
- seg gse.Segmenter
- stopWords []string
- regNum = regexp.MustCompile(`[\dA-Za-z]{6,30}`)
- regSymb = regexp.MustCompile("[•、,,.。??'\"“”‘’·~!@#¥$%…&*()()\\-—+=【】\\[\\]{}{}<>《》|\\/\\s]+")
- regDel = regexp.MustCompile("项目|工程|生产|中心")
- reg1 = regexp.MustCompile("分布式光伏发电|自然人|出让|国有建设用地使用权")
- field = []string{"projectname.pname"}
- sField = []string{"projectname", "bidstatus", "firsttime", "_id", "area", "city", "district"}
- savePpPool = make(chan map[string]interface{}, 5000)
- savePpSp = make(chan bool, 3)
- )
- func initSeg() {
- _ = seg.LoadDict("./t_1.txt")
- //_ = seg.LoadDict()
- seg.AddToken("渼陂", 3, "")
- seg.LoadStop("./stopwords.txt")
- //f, _ := os.Open("./stopwords.txt")
- //defer f.Close()
- //scanner := bufio.NewScanner(f)
- //for scanner.Scan() {
- // stopWords = append(stopWords, scanner.Text())
- //}
- //sort.Strings(stopWords)
- }
- func taskC() {
- sess := MgoPro.GetMgoConn()
- defer MgoPro.DestoryMongoConn(sess)
- ch := make(chan bool, config.Conf.Serve.Thread)
- wg := &sync.WaitGroup{}
- f := map[string]interface{}{
- "projectname": 1,
- "approvecode": 1,
- "approvenumber": 1,
- "approvestatus": 1,
- "area": 1,
- "city": 1,
- "district": 1,
- }
- query := sess.DB(config.Conf.DB.MongoP.Dbname).C(config.Conf.DB.MongoP.ProposedColl).Find(nil).Select(f).Iter()
- count := 0
- for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
- if count%2000 == 0 {
- log.Info(fmt.Sprintf("current --- %d", count))
- }
- ch <- true
- wg.Add(1)
- go func(tmp map[string]interface{}) {
- defer func() {
- <-ch
- wg.Done()
- }()
- if filterMethod(tmp) {
- return
- }
- var mArr []map[string]interface{}
- var eArr []map[string]interface{}
- n1, n2 := 0, 0
- // approvecode、approvenumber
- //q := Method2(util.ObjToString(tmp["approvecode"]), util.ObjToString(tmp["approvenumber"]))
- //if q != "" {
- // binfo := Es.Get("projectset", q)
- // if binfo != nil && len(*binfo) > 0 {
- // for _, m := range *binfo {
- // n1 = len(*binfo)
- // mArr = append(mArr, map[string]interface{}{"pid": util.ObjToString(m["_id"]), "projectname": m["projectname"], "source": 1})
- // }
- // }
- //}
- wds, q := Method1(util.ObjToString(tmp["projectname"]))
- if q != "" {
- binfo := Es.Get("projectset_v1", q)
- if binfo != nil && len(*binfo) > 0 {
- n2 = len(*binfo)
- for _, m := range *binfo {
- if b, _ := redis.Exists(config.Conf.DB.Redis.Pcode, util.ObjToString(m["_id"])); b {
- continue
- }
- if util.ObjToString(m["bidstatus"]) == "拟建" {
- eArr = append(eArr, map[string]interface{}{"pid": util.ObjToString(m["_id"]), "projectname": m["projectname"], "bidstatus": m["bidstatus"]})
- continue
- }
- if judgeArea(tmp, m, wds) {
- eArr = append(eArr, map[string]interface{}{"pid": util.ObjToString(m["_id"]), "projectname": m["projectname"], "bidstatus": m["bidstatus"]})
- continue
- }
- redis.PutCKV(config.Conf.DB.Redis.Pcode, util.ObjToString(m["_id"]), mongodb.BsonIdToSId(tmp["_id"]))
- mArr = append(mArr, map[string]interface{}{"pid": util.ObjToString(m["_id"]), "projectname": m["projectname"], "source": 2})
- }
- }
- }
- //if mArr != nil && len(mArr) > 0 {
- save := make(map[string]interface{})
- save["_id"] = tmp["_id"]
- save["ids"] = mArr
- save["wds"] = wds
- save["err"] = eArr
- save["projectname"] = tmp["projectname"]
- save["esearch"] = q
- save["size_1"] = n1
- save["size_2"] = n2
- save["createtime"] = time.Now().Unix()
- savePpPool <- save
- //}
- }(tmp)
- tmp = make(map[string]interface{})
- }
- wg.Wait()
- log.Info(fmt.Sprintf("over --- %d", count))
- }
- func Method(pname string) string {
- pname = regNum.ReplaceAllString(pname, "")
- pname = regSymb.ReplaceAllString(pname, "")
- //wds := seg.CutStop(pname, true)
- util.Debug(pname)
- surl := config.Conf.DB.Es.Addr + "/projectset_v2/_analyze"
- URL, _ := url.Parse(surl)
- Q := URL.Query()
- Q.Add("pretty", "1")
- Q.Add("analyzer", "my_ngram_title")
- Q.Add("text", pname)
- URL.RawQuery = Q.Encode()
- resp, err := http.Get(URL.String())
- if err != nil {
- log.Info("")
- }
- result, err := ioutil.ReadAll(resp.Body)
- if err != nil {
- log.Info("")
- }
- var resMap map[string]interface{}
- json.Unmarshal(result, &resMap)
- if resMap == nil || len(resMap["tokens"].([]interface{})) == 0 {
- log.Info("")
- }
- tokens := util.ObjArrToMapArr(resMap["tokens"].([]interface{}))
- var should []interface{}
- for _, t := range tokens {
- wd := util.ObjToString(t["token"])
- if in(wd) {
- // 删除词 跳过
- continue
- }
- mm := &ShouldObject{MultiMatch: &MultiMatch{
- Query: wd,
- Type: "phrase",
- Fields: field,
- }}
- should = append(should, mm)
- }
- query := &QueryObject{Query: &BoolObject{Bool: &MMSMObject{Must: should}}}
- jsr, err := json.Marshal(query)
- if err != nil {
- fmt.Printf("Err %v", err)
- os.Exit(1)
- }
- return string(jsr)
- }
- //@Description jieba分词查询
- // @Desc projectname
- // @Desc minimum_should_match 当should分支总数小于等于指定的数量时,则必须匹配所有should分支,当should分支总数大于指定的数量时,则应用指定的说明符
- // @Author J 2023/3/7 17:13
- func Method1(pname string) ([]string, string) {
- if pname == "" {
- return nil, ""
- }
- var wds []string
- p1 := regSymb.ReplaceAllString(pname, "")
- p1 = regNum.ReplaceAllString(p1, "")
- if utf8.RuneCountInString(p1) < 12 {
- wds = append(wds, pname)
- } else {
- wds = seg.CutStop(pname, true)
- }
- if len(wds) > 0 {
- wds = combArr(wds)
- var should []interface{}
- for _, t := range wds {
- //if utf8.RuneCountInString(t) == 1 {
- // continue
- //}
- mm := &ShouldObject{MultiMatch: &MultiMatch{
- Query: t,
- Type: "phrase",
- Fields: field,
- }}
- should = append(should, mm)
- }
- if should == nil || len(should) <= 0 {
- return nil, ""
- }
- query := &QueryObject{Query: &BoolObject{Bool: &MMSMObject{Should: should, MinSdMatch: config.Conf.DB.Es.MinSdMh}}, Source: sField, Size: 1000}
- jsr, err := json.Marshal(query)
- if err != nil {
- fmt.Printf("Err %v", err)
- os.Exit(1)
- }
- return wds, strings.Replace(string(jsr), "\\u003c", "<", -1)
- } else {
- return nil, ""
- }
- }
- // @Description
- // @Author J 2023/3/20 14:39
- func Method2(acode, anumb string) string {
- if acode == "" && anumb == "" {
- return ""
- }
- var should []interface{}
- if acode != "" {
- should = append(should, &ShouldObject{MultiMatch: &MultiMatch{
- Query: acode,
- Type: "phrase",
- Fields: []string{"detail"},
- }})
- }
- if anumb != "" {
- should = append(should, &ShouldObject{MultiMatch: &MultiMatch{
- Query: anumb,
- Type: "phrase",
- Fields: []string{"detail"},
- }})
- }
- query := &QueryObject1{Query: &BoolObject1{Bool: &MMSObject{Should: should}}, Source: sField, Size: 500}
- jsr, err := json.Marshal(query)
- if err != nil {
- fmt.Printf("Err %v", err)
- os.Exit(1)
- }
- return string(jsr)
- }
- func judgeArea(tmp, btmp map[string]interface{}, wds []string) bool {
- if util.ObjToString(tmp["area"]) == "全国" || util.ObjToString(btmp["area"]) == "全国" {
- pname := util.ObjToString(btmp["projectname"])
- for _, wd := range wds {
- if !strings.Contains(strings.ToLower(pname), strings.ToLower(wd)) {
- return true
- }
- }
- return false
- }
- if tmp["district"] != nil && btmp["district"] != nil {
- if util.ObjToString(tmp["district"]) == util.ObjToString(btmp["district"]) {
- return false
- } else {
- return true
- }
- } else if tmp["city"] != nil && btmp["city"] != nil {
- if util.ObjToString(tmp["city"]) == util.ObjToString(btmp["city"]) {
- return false
- } else {
- return true
- }
- } else if tmp["area"] != nil && btmp["area"] != nil {
- if util.ObjToString(tmp["area"]) == util.ObjToString(btmp["area"]) {
- return false
- } else {
- return true
- }
- }
- return false
- }
- var regs = []*regexp.Regexp{
- regexp.MustCompile("[\\\\u4e00-\\\\u9fa5]{0,10}私宅"),
- regexp.MustCompile("自然人|分布式光伏发电"),
- }
- // @Description 过滤数据
- // @Author J 2023/3/24 09:30
- func filterMethod(tmp map[string]interface{}) bool {
- pname := regSymb.ReplaceAllString(util.ObjToString(tmp["projectname"]), "")
- pname = regNum.ReplaceAllString(pname, "")
- p1 := regDel.ReplaceAllString(pname, "")
- p1 = regSymb.ReplaceAllString(p1, "")
- if utf8.RuneCountInString(p1) <= 5 {
- return true
- }
- if len(reg1.FindAllString(pname, -1)) > 1 {
- return true
- }
- for _, reg := range regs {
- if reg.MatchString(util.ObjToString(tmp["projectname"])) {
- return true
- }
- }
- return false
- }
- func combArr(arr []string) []string {
- var nArr []string
- for i := 0; i < len(arr); i++ {
- if i == len(arr)-1 {
- if utf8.RuneCountInString(arr[i]) == 1 {
- nArr[len(nArr)-1] += arr[i]
- } else {
- nArr = append(nArr, arr[i])
- }
- } else {
- if utf8.RuneCountInString(arr[i]) == 1 {
- if i == 0 {
- nArr = append(nArr, arr[i])
- } else {
- nArr[len(nArr)-1] += arr[i]
- //if utf8.RuneCountInString(arr[i+1]) == 1 {
- // if utf8.RuneCountInString(nArr[len(nArr)-1]) == 1 {
- // nArr[len(nArr)-1] += arr[i]
- // } else {
- // nArr = append(nArr, arr[i])
- // }
- //} else {
- // nArr[len(nArr)-1] += arr[i]
- //}
- }
- } else {
- if len(nArr) > 0 && utf8.RuneCountInString(nArr[len(nArr)-1]) == 1 {
- nArr[len(nArr)-1] += arr[i]
- } else {
- nArr = append(nArr, arr[i])
- }
- }
- }
- }
- return nArr
- }
- func SavePpMethod() {
- arru := make([]map[string]interface{}, saveSize)
- indexu := 0
- for {
- select {
- case v := <-savePpPool:
- arru[indexu] = v
- indexu++
- if indexu == saveSize {
- savePpSp <- true
- go func(arru []map[string]interface{}) {
- defer func() {
- <-savePpSp
- }()
- MgoPro.SaveBulk(config.Conf.DB.MongoP.CombColl, arru...)
- }(arru)
- arru = make([]map[string]interface{}, saveSize)
- indexu = 0
- }
- case <-time.After(1000 * time.Millisecond):
- if indexu > 0 {
- savePpSp <- true
- go func(arru []map[string]interface{}) {
- defer func() {
- <-savePpSp
- }()
- MgoPro.SaveBulk(config.Conf.DB.MongoP.CombColl, arru...)
- }(arru[:indexu])
- arru = make([]map[string]interface{}, saveSize)
- indexu = 0
- }
- }
- }
- }
- var StageCode []TagMatching
- func initStage() {
- info, _ := MgoBid.Find(config.Conf.Serve.TagRule, bson.M{"label_name": "project_stage"}, `{"_id": 1}`, nil, false, -1, -1)
- for _, m := range *info {
- tag := TagMatching{}
- tag.tagName = util.ObjToString(m["label_name"])
- tag.tagCode = util.ObjToString(m["code"])
- // 关键词
- tag.matchField = []string{"title", "project"}
- if v := util.ObjToString(m["keyword"]); v != "" {
- tag.matchKey = util.ObjToString(m["keyword"])
- tag.matchKeyReg = GetRegex(util.ObjToString(m["keyword"]))
- }
- // 附件词
- if f := util.ObjToString(m["match_fjword"]); f != "" {
- tag.addField = strings.Split(f, ",")
- for _, s := range tag.addField {
- SelectF[s] = 1
- }
- if v := util.ObjToString(m["fjword"]); v != "" {
- tag.addKey = util.ObjToString(m["fjword"])
- tag.addKeyReg = GetRegex(util.ObjToString(m["fjword"]))
- }
- }
- // 排除词
- if f := util.ObjToString(m["match_pcword"]); f != "" {
- tag.excludeField = strings.Split(f, ",")
- for _, s := range tag.excludeField {
- SelectF[s] = 1
- }
- if v := util.ObjToString(m["pcword"]); v != "" {
- tag.excludeKey = util.ObjToString(m["pcword"])
- tag.excludeKeyReg = GetRegex(util.ObjToString(m["pcword"]))
- }
- }
- // 清理词
- if v := util.ObjToString(m["qlword"]); v != "" {
- tag.clearKey = strings.Split(util.ObjToString(m["qlword"]), ",")
- }
- StageCode = append(StageCode, tag)
- }
- log.Info("initStage", zap.Int("StageCode", len(StageCode)))
- }
- func taskD() {
- sess := MgoPro.GetMgoConn()
- defer MgoPro.DestoryMongoConn(sess)
- ch := make(chan bool, config.Conf.Serve.Thread)
- wg := &sync.WaitGroup{}
- //q := bson.M{"_id": mongodb.StringTOBsonId("60a2995b8a2adb30a57172ec")}
- query := sess.DB(config.Conf.DB.MongoP.Dbname).C(config.Conf.DB.MongoP.CombColl).Find(nil).Iter()
- count := 0
- for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
- if count%2000 == 0 {
- log.Info(fmt.Sprintf("current --- %d", count))
- }
- ch <- true
- wg.Add(1)
- go func(tmp map[string]interface{}) {
- defer func() {
- <-ch
- wg.Done()
- }()
- if ids, ok := tmp["ids"].([]interface{}); ok {
- //id := mongodb.BsonIdToSId(tmp["_id"])
- for _, p := range ids {
- p1 := p.(map[string]interface{})
- info, _ := MgoPro.FindById(config.Conf.DB.MongoP.ProjectColl, util.ObjToString(p1["pid"]), nil)
- if list, ok1 := (*info)["list"].([]interface{}); ok1 {
- for _, l := range list {
- l1 := l.(map[string]interface{})
- m := make(map[string]interface{})
- m["project_stage_code"] = tagFunc(l1)
- m["proposed_id"] = mongodb.BsonIdToSId(tmp["_id"])
- m["title"] = util.ObjToString(l1["title"])
- if t := util.Int64All(l1["publishtime"]); t > 0 {
- m["publishtime"] = util.FormatDateByInt64(&t, util.Date_Full_Layout)
- }
- m["infoid"] = util.ObjToString(l1["infoid"])
- m["jybxhref"] = fmt.Sprintf(config.Conf.Serve.JyHref, util.CommonEncodeArticle("content", util.ObjToString(l1["infoid"])))
- m["createtime"] = time.Now().Format(util.Date_Full_Layout)
- MgoPro.Save("projectset_comb_temp1", m)
- //MysqlTool.Insert("dwd_f_nzj_follw_record", m)
- }
- }
- //if buyer := util.ObjToString((*info)["buyer"]); buyer != "" {
- // s := MysqlTool.Count("dwd_f_nzj_ent", bson.M{"proposed_id": id, "name": buyer})
- // if s <= 0 {
- // saveEnt := make(map[string]interface{})
- // saveEnt["proposed_id"] = id
- // saveEnt["createtime"] = time.Now().Format(util.Date_Full_Layout)
- // saveEnt["name"] = buyer
- // if eid := redis.GetStr("ent_id", buyer); eid != "" {
- // arr := strings.Split(eid, "_")
- // saveEnt["name_id"] = arr[0]
- // if len(arr) == 2 {
- // saveEnt["area_code"] = arr[1]
- // } else if len(arr) == 3 {
- // saveEnt["city_code"] = arr[2]
- // }
- // info := MysqlTool1.Find("dws_f_ent_baseinfo", bson.M{"name_id": arr[0]}, "address", "", -1, -1)
- // if info != nil && len(*info) > 0 {
- // saveEnt["address"] = (*info)[0]["address"]
- // }
- // }
- // saveEnt["identity_type"] = 2
- // saveEnt["area_code"] = AreaCode[util.ObjToString(tmp["area"])]
- // saveEntPool1 <- saveEnt
- // }
- //}
- //if winner := util.ObjToString((*info)["buyer"]); winner != "" {
- // for _, w := range strings.Split(winner, ",") {
- // s := MysqlTool.Count("dwd_f_nzj_ent", bson.M{"proposed_id": id, "name": w})
- // if s <= 0 {
- // saveEnt := make(map[string]interface{})
- // saveEnt["proposed_id"] = id
- // saveEnt["createtime"] = time.Now().Format(util.Date_Full_Layout)
- // saveEnt["name"] = w
- // if eid := redis.GetStr("ent_id", w); eid != "" {
- // arr := strings.Split(eid, "_")
- // saveEnt["name_id"] = arr[0]
- // if len(arr) == 2 {
- // saveEnt["area_code"] = arr[1]
- // } else if len(arr) == 3 {
- // saveEnt["city_code"] = arr[2]
- // }
- // info := MysqlTool1.Find("dws_f_ent_baseinfo", bson.M{"name_id": arr[0]}, "address", "", -1, -1)
- // if info != nil && len(*info) > 0 {
- // saveEnt["address"] = (*info)[0]["address"]
- // }
- // }
- // saveEnt["identity_type"] = 3
- // saveEnt["area_code"] = AreaCode[util.ObjToString(tmp["area"])]
- // saveEntPool1 <- saveEnt
- // }
- // }
- //}
- }
- //size := MysqlTool.Count("dwd_f_nzj_follw_record", bson.M{"proposed_id": id})
- //info := MysqlTool.FindOne("dwd_f_nzj_follw_record", bson.M{"proposed_id": id}, "project_stage_code", "publishtime desc")
- //MysqlTool.Update("dwd_f_nzj_baseinfo", bson.M{"proposed_id": id}, bson.M{"follow_num": size, "project_stage_code": (*info)["project_stage_code"], "updatetime": time.Now().Format(util.Date_Full_Layout)})
- }
- }(tmp)
- tmp = make(map[string]interface{})
- }
- wg.Wait()
- log.Info(fmt.Sprintf("over --- %d", count))
- }
- // @Description 施工准备(06)、施工(07)、设计(05)
- // @Author J 2023/4/21 14:45
- func tagFunc(info map[string]interface{}) string {
- tag := taskFuc1(info)
- if tag["project_stage"] != "" {
- return util.ObjToString(tag["project_stage"])
- }
- if util.ObjToString(info["toptype"]) == "招标" || util.ObjToString(info["toptype"]) == "预告" {
- return "06"
- }
- return "00"
- }
- // @Description 在建项目增量
- // @Author J 2023/4/24 13:58
- func taskAA() {
- sess := MgoPro.GetMgoConn()
- defer MgoPro.DestoryMongoConn(sess)
- ch := make(chan bool, config.Conf.Serve.Thread)
- wg := &sync.WaitGroup{}
- q := bson.M{"pici": bson.M{"$gte": config.Conf.Serve.Pici}}
- query := sess.DB(config.Conf.DB.MongoP.Dbname).C(config.Conf.DB.MongoP.ProjectColl).Find(q).Iter()
- count := 0
- for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
- if count%2000 == 0 {
- log.Info(fmt.Sprintf("current --- %d", count))
- }
- if pc := util.Int64All(tmp["pici"]); pc > config.Conf.Serve.Pici {
- config.Conf.Serve.Pici = pc
- }
- ch <- true
- wg.Add(1)
- go func(tmp map[string]interface{}) {
- defer func() {
- <-ch
- wg.Done()
- }()
- id := mongodb.BsonIdToSId(tmp["_id"])
- if str := redis.GetStr(config.Conf.DB.Redis.Pcode, id); str != "" {
- strs := strings.Split(str, "-")
- if len(tmp["list"].([]interface{})) != util.IntAll(strs[1]) {
- for _, info := range tmp["list"].([]interface{}) {
- info1 := info.(map[string]interface{})
- }
- }
- } else {
- }
- }(tmp)
- tmp = make(map[string]interface{})
- }
- wg.Wait()
- log.Info(fmt.Sprintf("over --- %d, pici ---%d", count, config.Conf.Serve.Pici))
- }
- // @Description 拟建项目增量
- // @Author J 2023/4/24 13:59
- func taskBB() {
- sess := MgoPro.GetMgoConn()
- defer MgoPro.DestoryMongoConn(sess)
- ch := make(chan bool, config.Conf.Serve.Thread)
- wg := &sync.WaitGroup{}
- //q := bson.M{"_id": mongodb.StringTOBsonId("60a2995b8a2adb30a57172ec")}
- query := sess.DB(config.Conf.DB.MongoP.Dbname).C(config.Conf.DB.MongoP.CombColl).Find(nil).Iter()
- count := 0
- for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
- if count%2000 == 0 {
- log.Info(fmt.Sprintf("current --- %d", count))
- }
- ch <- true
- wg.Add(1)
- go func(tmp map[string]interface{}) {
- defer func() {
- <-ch
- wg.Done()
- }()
- if ids, ok := tmp["ids"].([]interface{}); ok {
- //id := mongodb.BsonIdToSId(tmp["_id"])
- for _, p := range ids {
- p1 := p.(map[string]interface{})
- info, _ := MgoPro.FindById(config.Conf.DB.MongoP.ProjectColl, util.ObjToString(p1["pid"]), nil)
- if list, ok1 := (*info)["list"].([]interface{}); ok1 {
- for _, l := range list {
- l1 := l.(map[string]interface{})
- m := make(map[string]interface{})
- m["project_stage_code"] = tagFunc(l1)
- m["proposed_id"] = mongodb.BsonIdToSId(tmp["_id"])
- m["title"] = util.ObjToString(l1["title"])
- if t := util.Int64All(l1["publishtime"]); t > 0 {
- m["publishtime"] = util.FormatDateByInt64(&t, util.Date_Full_Layout)
- }
- m["infoid"] = util.ObjToString(l1["infoid"])
- m["jybxhref"] = fmt.Sprintf(config.Conf.Serve.JyHref, util.CommonEncodeArticle("content", util.ObjToString(l1["infoid"])))
- m["createtime"] = time.Now().Format(util.Date_Full_Layout)
- MgoPro.Save("projectset_comb_temp1", m)
- //MysqlTool.Insert("dwd_f_nzj_follw_record", m)
- }
- }
- //if buyer := util.ObjToString((*info)["buyer"]); buyer != "" {
- // s := MysqlTool.Count("dwd_f_nzj_ent", bson.M{"proposed_id": id, "name": buyer})
- // if s <= 0 {
- // saveEnt := make(map[string]interface{})
- // saveEnt["proposed_id"] = id
- // saveEnt["createtime"] = time.Now().Format(util.Date_Full_Layout)
- // saveEnt["name"] = buyer
- // if eid := redis.GetStr("ent_id", buyer); eid != "" {
- // arr := strings.Split(eid, "_")
- // saveEnt["name_id"] = arr[0]
- // if len(arr) == 2 {
- // saveEnt["area_code"] = arr[1]
- // } else if len(arr) == 3 {
- // saveEnt["city_code"] = arr[2]
- // }
- // info := MysqlTool1.Find("dws_f_ent_baseinfo", bson.M{"name_id": arr[0]}, "address", "", -1, -1)
- // if info != nil && len(*info) > 0 {
- // saveEnt["address"] = (*info)[0]["address"]
- // }
- // }
- // saveEnt["identity_type"] = 2
- // saveEnt["area_code"] = AreaCode[util.ObjToString(tmp["area"])]
- // saveEntPool1 <- saveEnt
- // }
- //}
- //if winner := util.ObjToString((*info)["buyer"]); winner != "" {
- // for _, w := range strings.Split(winner, ",") {
- // s := MysqlTool.Count("dwd_f_nzj_ent", bson.M{"proposed_id": id, "name": w})
- // if s <= 0 {
- // saveEnt := make(map[string]interface{})
- // saveEnt["proposed_id"] = id
- // saveEnt["createtime"] = time.Now().Format(util.Date_Full_Layout)
- // saveEnt["name"] = w
- // if eid := redis.GetStr("ent_id", w); eid != "" {
- // arr := strings.Split(eid, "_")
- // saveEnt["name_id"] = arr[0]
- // if len(arr) == 2 {
- // saveEnt["area_code"] = arr[1]
- // } else if len(arr) == 3 {
- // saveEnt["city_code"] = arr[2]
- // }
- // info := MysqlTool1.Find("dws_f_ent_baseinfo", bson.M{"name_id": arr[0]}, "address", "", -1, -1)
- // if info != nil && len(*info) > 0 {
- // saveEnt["address"] = (*info)[0]["address"]
- // }
- // }
- // saveEnt["identity_type"] = 3
- // saveEnt["area_code"] = AreaCode[util.ObjToString(tmp["area"])]
- // saveEntPool1 <- saveEnt
- // }
- // }
- //}
- }
- //size := MysqlTool.Count("dwd_f_nzj_follw_record", bson.M{"proposed_id": id})
- //info := MysqlTool.FindOne("dwd_f_nzj_follw_record", bson.M{"proposed_id": id}, "project_stage_code", "publishtime desc")
- //MysqlTool.Update("dwd_f_nzj_baseinfo", bson.M{"proposed_id": id}, bson.M{"follow_num": size, "project_stage_code": (*info)["project_stage_code"], "updatetime": time.Now().Format(util.Date_Full_Layout)})
- }
- }(tmp)
- tmp = make(map[string]interface{})
- }
- wg.Wait()
- log.Info(fmt.Sprintf("over --- %d", count))
- }
- var saveEntPool1 = make(chan map[string]interface{}, 5000)
- var saveEntSp1 = make(chan bool, 1)
- func SaveEntFunc1(table string, arr []string) {
- arru := make([]map[string]interface{}, saveSize)
- indexu := 0
- for {
- select {
- case v := <-saveEntPool1:
- arru[indexu] = v
- indexu++
- if indexu == saveSize {
- saveEntSp1 <- true
- go func(arru []map[string]interface{}) {
- defer func() {
- <-saveEntSp1
- }()
- MysqlTool.InsertBulk(table, arr, arru...)
- }(arru)
- arru = make([]map[string]interface{}, saveSize)
- indexu = 0
- }
- case <-time.After(1000 * time.Millisecond):
- if indexu > 0 {
- saveEntSp1 <- true
- go func(arru []map[string]interface{}) {
- defer func() {
- <-saveEntSp1
- }()
- MysqlTool.InsertBulk(table, arr, arru...)
- }(arru[:indexu])
- arru = make([]map[string]interface{}, saveSize)
- indexu = 0
- }
- }
- }
- }
|