package main

import (
	"encoding/json"
	"fmt"
	"sort"
	"strings"
	"sync"
	"time"

	util "app.yhyue.com/data_processing/common_utils"
	"app.yhyue.com/data_processing/common_utils/log"
	"app.yhyue.com/data_processing/common_utils/mongodb"
	"go.uber.org/zap"
	"proposed_project/config"
)

// updateAllQueue batches entries received on updatePool and writes them to
// config.Conf.Serve.ProColl via MgoPro.UpSertBulk.
//
// A batch is flushed when it reaches saveSize entries, or when no entry has
// arrived for one second and the batch is non-empty. Every flush runs in its
// own goroutine; updateSp bounds how many flushes may be in flight at once.
// The loop never returns.
func updateAllQueue() {
	// flush hands a batch to a background writer. The send on updateSp blocks
	// while the semaphore is full, which back-pressures this loop.
	flush := func(batch [][]map[string]interface{}) {
		updateSp <- true
		go func(batch [][]map[string]interface{}) {
			defer func() {
				<-updateSp
			}()
			MgoPro.UpSertBulk(config.Conf.Serve.ProColl, batch...)
		}(batch)
	}

	arru := make([][]map[string]interface{}, saveSize)
	indexu := 0
	for {
		select {
		case v := <-updatePool:
			arru[indexu] = v
			indexu++
			if indexu == saveSize {
				flush(arru)
				arru = make([][]map[string]interface{}, saveSize)
				indexu = 0
			}
		case <-time.After(1 * time.Second):
			// time.After is re-armed on every iteration, so this case fires only
			// after one second without a new entry.
			if indexu > 0 {
				flush(arru[:indexu])
				arru = make([][]map[string]interface{}, saveSize)
				indexu = 0
			}
		}
	}
}

// doTask streams nzj_bidding documents with gtid < _id <= lteid, sorted by
// publishtime. For each document it runs taskTag, parses the result with
// ParseInfo and passes it to MergeTask when a project name is present. At most
// config.Conf.Serve.Thread documents are processed concurrently. After all
// workers finish, TaskSingle is set to true.
func doTask(gtid, lteid string) {
	defer util.Catch()
	sess := MgoBid.GetMgoConn()
	defer MgoBid.DestoryMongoConn(sess)

	ch := make(chan bool, config.Conf.Serve.Thread)
	wg := &sync.WaitGroup{}
	q := map[string]interface{}{
		"_id": map[string]interface{}{
			"$gt":  mongodb.StringTOBsonId(gtid),
			"$lte": mongodb.StringTOBsonId(lteid),
		},
	}
	// Fields excluded from the projection.
	f := map[string]interface{}{
		"contenthtml":  0,
		"field_source": 0,
		"nj_record":    0,
		"kvtext":       0,
	}
	log.Info("doTask", zap.Any("q", q))
	query := sess.DB(config.Conf.DB.MongoB.Dbname).C("nzj_bidding").Find(q).Select(f).Sort("publishtime").Iter()
	count := 0
	for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
		if count%5000 == 0 {
			log.Info(fmt.Sprintf("current --- %d", count))
		}
		//if util.ObjToString(tmp["toptype"]) != "拟建" {
		//	continue
		//}
		ch <- true
		wg.Add(1)
		go func(temp map[string]interface{}) {
			defer func() {
				<-ch
				wg.Done()
			}()
			taskTag(temp) // tag incremental data
			info := ParseInfo(temp)
			// ParseInfo returns nil for undecodable documents. A nil dereference
			// here would panic in this goroutine, which the util.Catch deferred
			// in doTask cannot recover (recover only works in its own goroutine).
			if info != nil && info.ProjectName != "" {
				MergeTask(temp, info)
			} else {
				log.Error("MergeTask", zap.Any("infoid", temp["_id"]))
			}
		}(tmp)
		tmp = make(map[string]interface{})
	}
	// Next returns false both at the end of the cursor and on error; Close
	// releases the cursor and reports any iteration error.
	if err := query.Close(); err != nil {
		log.Error("doTask", zap.Error(err))
	}
	wg.Wait()
	log.Info(fmt.Sprintf("over --- %d", count), zap.String("LastId: ", LastId))
	TaskSingle = true
}

// MergeTask merges one parsed document (tmp / info) into the project set.
//
// When info.ApproveCode is usable (not "", "无" or "暂无项目代码"), AllCodeMap
// is consulted for a project id:
//   - id found and present in AllPidMap: updateProject merges into that project;
//   - id found but absent from AllPidMap: startProjectMerge is called;
//   - id not found: startProjectMerge is called and its returned id is stored
//     in AllCodeMap under the approval code.
//
// Without a usable code, startProjectMerge is called directly. A nil info is
// ignored.
func MergeTask(tmp map[string]interface{}, info *Info) {
	if info == nil {
		return
	}
	code := info.ApproveCode
	if code == "" || code == "无" || code == "暂无项目代码" {
		startProjectMerge(tmp, info)
		return
	}

	AllCodeMapLock.Lock()
	pid := AllCodeMap[code]
	AllCodeMapLock.Unlock()
	if pid == "" {
		// NOTE(review): the lookup above and the insert below are not atomic, so
		// two goroutines carrying the same new code may each start a project and
		// the last write wins. Confirm whether startProjectMerge tolerates this.
		pid = startProjectMerge(tmp, info)
		AllCodeMapLock.Lock()
		AllCodeMap[code] = pid
		AllCodeMapLock.Unlock()
		return
	}

	AllPidMapLock.Lock()
	res := AllPidMap[pid]
	AllPidMapLock.Unlock()
	if res == nil {
		// NOTE(review): AllCodeMap still points at pid after a new project is
		// started here — confirm this is intended.
		startProjectMerge(tmp, info)
		return
	}
	comparePro := res.P
	updateProject(tmp, *info, comparePro)
}

// ParseInfo converts a raw document into an *Info by round-tripping it
// through JSON. It returns nil when the document cannot be marshalled or
// decodes to nothing. Empty Topscopeclass/Subscopeclass are normalised to
// non-nil empty slices.
func ParseInfo(tmp map[string]interface{}) *Info {
	bys, err := json.Marshal(tmp)
	if err != nil {
		return nil
	}
	var thisinfo *Info
	// The error is deliberately ignored: on a per-field type mismatch
	// json.Unmarshal still decodes the remaining fields, and that partial
	// result is used.
	_ = json.Unmarshal(bys, &thisinfo)
	if thisinfo == nil {
		return nil
	}
	if len(thisinfo.Topscopeclass) == 0 {
		thisinfo.Topscopeclass = []string{}
	}
	if len(thisinfo.Subscopeclass) == 0 {
		thisinfo.Subscopeclass = []string{}
	}
	return thisinfo
}

// taskTag runs taskFuc on tmp and writes the resulting codes back into tmp:
//   - nature        -> nature_code        (else "00")
//   - project_stage -> project_stage_code (else "00")
//   - owner         -> ownerclass_code    (else "00")
//   - sub_category, otherwise top_category -> category_code; if category_code
//     is still empty afterwards it is set to "04".
func taskTag(tmp map[string]interface{}) {
	tag := taskFuc(tmp)
	if tag["nature"] != "" {
		tmp["nature_code"] = tag["nature"]
	} else {
		tmp["nature_code"] = "00"
	}
	if tag["project_stage"] != "" {
		tmp["project_stage_code"] = tag["project_stage"]
	} else {
		tmp["project_stage_code"] = "00"
	}
	if tag["owner"] != "" {
		tmp["ownerclass_code"] = tag["owner"]
	} else {
		tmp["ownerclass_code"] = "00"
	}
	switch {
	case tag["sub_category"] != "":
		tmp["category_code"] = tag["sub_category"]
	case tag["top_category"] != "":
		tmp["category_code"] = tag["top_category"]
	}
	if util.ObjToString(tmp["category_code"]) == "" {
		tmp["category_code"] = "04"
	}
}

// doTask1 streams bidding documents with gtid < _id <= lteid, keeps those
// whose toptype is "拟建", and sends each one to savePool. At most
// config.Conf.Serve.Thread sends are in flight. After all sends complete,
// TaskSingle is set to true.
func doTask1(gtid, lteid string) {
	defer util.Catch()
	sess := MgoBid.GetMgoConn()
	defer MgoBid.DestoryMongoConn(sess)

	ch := make(chan bool, config.Conf.Serve.Thread)
	wg := &sync.WaitGroup{}
	q := map[string]interface{}{
		"_id": map[string]interface{}{
			"$gt":  mongodb.StringTOBsonId(gtid),
			"$lte": mongodb.StringTOBsonId(lteid),
		},
	}
	log.Info("doTask", zap.Any("q", q))
	query := sess.DB(config.Conf.DB.MongoB.Dbname).C("bidding").Find(q).Select(nil).Iter()
	count := 0
	for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
		if count%20000 == 0 {
			log.Info(fmt.Sprintf("current --- %d", count))
		}
		if util.ObjToString(tmp["toptype"]) != "拟建" {
			continue
		}
		ch <- true
		wg.Add(1)
		go func(temp map[string]interface{}) {
			defer func() {
				<-ch
				wg.Done()
			}()
			//disField(temp)
			savePool <- temp
		}(tmp)
		tmp = make(map[string]interface{})
	}
	// Surface cursor errors instead of silently treating them as end of data.
	if err := query.Close(); err != nil {
		log.Error("doTask1", zap.Error(err))
	}
	wg.Wait()
	log.Info(fmt.Sprintf("over --- %d", count))
	TaskSingle = true
}

// disField backfills the proposed-project fields listed in njInfo onto tmp
// from its "projectinfo" sub-document. Documents whose spidercode contains
// "_njpc" are left untouched. For all other documents:
//  1. fields already present (non-nil) on tmp are never overwritten;
//  2. missing fields are copied from projectinfo's top level when any of them
//     is present there;
//  3. otherwise they are copied from the nested map (excluding "attachments")
//     with the greatest approvetime; ties go to the lexicographically greatest
//     key so the choice is deterministic;
//  4. finally buyer is copied into owner when owner is still nil.
func disField(tmp map[string]interface{}) {
	njInfo := map[string]int{
		"approvecode":        1,
		"approvedept":        1,
		"approvestatus":      1,
		"approvetime":        1,
		"approvenumber":      1,
		"approvecontent":     1,
		"projecttype":        1,
		"approvecity":        1,
		"countryprojectcode": 1,
		"total_investment":   1,
		"owner":              1,
		"projectperiod":      1,
		"project_person":     1,
		"projectaddr":        1,
		"project_scale":      1,
	}
	code := util.ObjToString(tmp["spidercode"])
	// Only process incremental records collected by Lua spiders.
	if strings.Contains(code, "_njpc") {
		return
	}
	for f := range njInfo {
		if tmp[f] != nil {
			njInfo[f] = 0 // 0 means the field is already extracted and must not be updated
		}
	}
	// projectinfo
	if projectinfo, ok := tmp["projectinfo"].(map[string]interface{}); ok && len(projectinfo) > 0 {
		flag := false
		for f, n := range njInfo {
			if n == 1 && projectinfo[f] != nil {
				tmp[f] = projectinfo[f]
				flag = true
			}
		}
		if !flag {
			// Nested form: pick the sub-map with the latest approvetime. Keys are
			// sorted so that ties are resolved deterministically rather than by
			// Go's randomised map iteration order.
			keys := make([]string, 0, len(projectinfo))
			for k := range projectinfo {
				keys = append(keys, k)
			}
			sort.Strings(keys)
			tmpTime := ""
			tmpK := "" // key of the sub-map finally selected
			for _, k := range keys {
				result, ok := projectinfo[k].(map[string]interface{})
				if !ok || len(result) == 0 || k == "attachments" {
					continue
				}
				approvetime := util.ObjToString(result["approvetime"])
				if tmpTime == "" || approvetime >= tmpTime { // keep the latest time
					tmpTime = approvetime
					tmpK = k
				}
			}
			if tmpK != "" {
				if resultTmp, ok := projectinfo[tmpK].(map[string]interface{}); ok && len(resultTmp) > 0 {
					for f, n := range njInfo {
						if n == 1 && resultTmp[f] != nil {
							tmp[f] = resultTmp[f]
						}
					}
				}
			}
		}
	}
	// buyer
	if buyer := tmp["buyer"]; buyer != nil && tmp["owner"] == nil {
		tmp["owner"] = buyer
	}
}