package main import ( util "app.yhyue.com/data_processing/common_utils" "app.yhyue.com/data_processing/common_utils/log" "app.yhyue.com/data_processing/common_utils/mongodb" "encoding/json" "fmt" "go.uber.org/zap" "proposed_project/config" "sync" "time" ) func updateAllQueue() { arru := make([][]map[string]interface{}, saveSize) indexu := 0 for { select { case v := <-updatePool: arru[indexu] = v indexu++ if indexu == saveSize { updateSp <- true go func(arru [][]map[string]interface{}) { defer func() { <-updateSp }() MgoPro.UpSertBulk(config.Conf.Serve.ProColl, arru...) }(arru) arru = make([][]map[string]interface{}, saveSize) indexu = 0 } case <-time.After(1 * time.Second): if indexu > 0 { updateSp <- true go func(arru [][]map[string]interface{}) { defer func() { <-updateSp }() MgoPro.UpSertBulk(config.Conf.Serve.ProColl, arru...) }(arru[:indexu]) arru = make([][]map[string]interface{}, saveSize) indexu = 0 } } } } func doTask(gtid, lteid string) { defer util.Catch() sess := MgoBid.GetMgoConn() defer MgoBid.DestoryMongoConn(sess) ch := make(chan bool, config.Conf.Serve.Thread) wg := &sync.WaitGroup{} q := map[string]interface{}{"_id": map[string]interface{}{ "$gt": mongodb.StringTOBsonId(gtid), "$lte": mongodb.StringTOBsonId(lteid)}} f := map[string]interface{}{ "detail": 0, "contenthtml": 0, "field_source": 0, "nj_record": 0, "kvtext": 0, } log.Info("doTask", zap.Any("q", q)) query := sess.DB(config.Conf.DB.MongoB.Dbname).C(config.Conf.DB.MongoB.Coll).Find(q).Select(f).Sort("publishtime").Iter() count := 0 for tmp := make(map[string]interface{}); query.Next(tmp); count++ { if count%2000 == 0 { log.Info(fmt.Sprintf("current --- %d", count)) } if util.ObjToString(tmp["toptype"]) != "拟建" { continue } ch <- true wg.Add(1) go func(temp map[string]interface{}) { defer func() { <-ch wg.Done() }() taskTag(temp) // 增量数据打标签 info := ParseInfo(temp) if info.ProjectName != "" { MergeTask(temp, info) } else { log.Error("MergeTask", zap.Any("infoid", temp["_id"])) } }(tmp) tmp = make(map[string]interface{}) } wg.Wait() log.Info(fmt.Sprintf("over --- %d", count)) TaskSingle = true } func MergeTask(tmp map[string]interface{}, info *Info) { if info.ApproveCode != "" && info.ApproveCode != "无" && info.ApproveCode != "暂无项目代码" { AllCodeMapLock.Lock() pid := AllCodeMap[info.ApproveCode] AllCodeMapLock.Unlock() if pid != "" { AllPidMapLock.Lock() res := AllPidMap[pid] AllPidMapLock.Unlock() if res != nil { comparePro := res.P updateProject(tmp, *info, comparePro) } else { startProjectMerge(tmp, info) } } else { pid, pinfo := newProject(tmp, *info) AllPidMapLock.Lock() AllPidMap[info.Id] = &ID{P: pinfo} AllPidMapLock.Unlock() AllPnMapLock.Lock() res := AllPnMap[info.ProjectName] if res != nil { res.Id[info.Id] = info.ProjectName } else { res = &Pname{ Id: map[string]string{info.Id: info.ProjectName}, } } AllPnMapLock.Unlock() AllCodeMapLock.Lock() AllCodeMap[info.ApproveCode] = pid AllCodeMapLock.Unlock() } } else { startProjectMerge(tmp, info) } } func ParseInfo(tmp map[string]interface{}) (info *Info) { bys, _ := json.Marshal(tmp) var thisinfo *Info _ = json.Unmarshal(bys, &thisinfo) if thisinfo == nil { return nil } if len(thisinfo.Topscopeclass) == 0 { thisinfo.Topscopeclass = []string{} } if len(thisinfo.Subscopeclass) == 0 { thisinfo.Subscopeclass = []string{} } return thisinfo } func taskTag(tmp map[string]interface{}) { tag := taskFuc(tmp) if tag["nature"] != "" { tmp["nature_code"] = tag["nature"] } else { tmp["nature_code"] = "00" } if tag["project_stage"] != "" { tmp["project_stage_code"] = tag["project_stage"] } else { tmp["project_stage_code"] = "00" } if tag["owner"] != "" { tmp["ownerclass_code"] = tag["owner"] } else { tmp["ownerclass_code"] = "00" } if tag["sub_category"] != "" { tmp["category_code"] = tag["sub_category"] } else { if tag["top_category"] != "" { tmp["category_code"] = tag["top_category"] } } if util.ObjToString(tmp["category_code"]) == "" { tmp["category_code"] = "04" } } func doTask1(gtid, lteid string) { defer util.Catch() sess := MgoBid.GetMgoConn() defer MgoBid.DestoryMongoConn(sess) ch := make(chan bool, config.Conf.Serve.Thread) wg := &sync.WaitGroup{} q := map[string]interface{}{"_id": map[string]interface{}{ "$gt": mongodb.StringTOBsonId(gtid), "$lte": mongodb.StringTOBsonId(lteid)}} log.Info("doTask", zap.Any("q", q)) query := sess.DB(config.Conf.DB.MongoB.Dbname).C(config.Conf.DB.MongoB.Coll).Find(q).Select(nil).Iter() count := 0 for tmp := make(map[string]interface{}); query.Next(tmp); count++ { if count%20000 == 0 { log.Info(fmt.Sprintf("current --- %d", count)) } if util.ObjToString(tmp["toptype"]) != "拟建" { continue } ch <- true wg.Add(1) go func(temp map[string]interface{}) { defer func() { <-ch wg.Done() }() savePool <- temp }(tmp) tmp = make(map[string]interface{}) } wg.Wait() log.Info(fmt.Sprintf("over --- %d", count)) TaskSingle = true }