123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180 |
- package main
- import (
- "strconv"
- //"fmt"
- "log"
- "qfw/util"
- elastic "qfw/util/elastic"
- mgov "gopkg.in/mgo.v2"
- "gopkg.in/mgo.v2/bson"
- )
- func projectTask(data []byte, project, mapInfo map[string]interface{}) {
- defer util.Catch()
- q, _ := mapInfo["query"].(map[string]interface{})
- if q == nil {
- q = map[string]interface{}{
- "_id": bson.M{
- "$gt": util.StringTOBsonId(mapInfo["gtid"].(string)),
- "$lte": util.StringTOBsonId(mapInfo["lteid"].(string)),
- },
- }
- } else {
- if q["pici"] == nil {
- idMap, _ := q["_id"].(map[string]interface{})
- if idMap != nil {
- tmpQ := map[string]interface{}{}
- for c, id := range idMap {
- if idStr, ok := id.(string); ok && id != "" {
- tmpQ[c] = util.StringTOBsonId(idStr)
- }
- }
- q["_id"] = tmpQ
- }
- }
- }
- var session *mgov.Session
- if project["addr"] != nil {
- session = project2db.GetMgoConn(3600)
- defer project2db.DestoryMongoConn(session)
- } else {
- session = extractmgo.GetMgoConn(3600)
- defer extractmgo.DestoryMongoConn(session)
- }
- c, _ := project["collect"].(string)
- db, _ := project["db"].(string)
- index, _ := project["index"].(string)
- itype, _ := project["type"].(string)
- count, _ := session.DB(db).C(c).Find(&q).Count()
- savepool := make(chan bool, 10)
- log.Println(db, c, "查询语句:", q, "同步总数:", count, "elastic库:", index)
- query := session.DB(db).C(c).Find(q).Iter()
- arr := make([]map[string]interface{}, savesizei)
- var n int
- i := 0
- for tmp := make(map[string]interface{}); query.Next(tmp); i = i + 1 {
- pp := map[string]map[string]interface{}{}
- if packages, ok := tmp["package"].(map[string]interface{}); ok {
- for _, pks := range packages {
- if pk, ok := pks.([]interface{}); ok {
- for _, v := range pk {
- if p, ok := v.(map[string]interface{}); ok {
- winner := util.ObjToString(p["winner"])
- bidamount := util.Float64All((p["bidamount"]))
- if len(winner) > 4 && bidamount > 0 {
- p := map[string]interface{}{
- "winner": winner,
- "bidamount": bidamount,
- }
- pp[winner] = p
- }
- }
- }
- }
- }
- } else {
- winner := util.ObjToString(tmp["winner"])
- bidamount := util.Float64All(tmp["bidamount"])
- if len(winner) > 4 && bidamount > 0 {
- p := map[string]interface{}{
- "winner": winner,
- "bidamount": bidamount,
- }
- pp[winner] = p
- }
- }
- pk1 := []map[string]interface{}{}
- for _, v := range pp {
- pk1 = append(pk1, v)
- }
- if len(pk1) > 0 {
- tmp["package1"] = pk1
- }
- budget := util.Float64All(tmp["budget"])
- bidamount := util.Float64All(tmp["bidamount"])
- if float64(budget) > 0 && float64(bidamount) > 0 {
- rate := float64(1) - float64(bidamount)/float64(budget)
- f, _ := strconv.ParseFloat(strconv.FormatFloat(rate, 'f', 4, 64), 64)
- //不在0~0.6之间,不生成费率;只生成预算,中标金额舍弃,索引增加折扣率异常标识
- if f < 0 || f > 0.6 {
- delete(tmp, "bidamount")
- tmp["prate_flag"] = 1
- } else {
- tmp["project_rate"] = f
- }
- }
- if topscopeclass, ok := tmp["topscopeclass"].([]interface{}); ok {
- tc := []string{}
- m2 := map[string]bool{}
- for _, v := range topscopeclass {
- str := util.ObjToString(v)
- str = reg_letter.ReplaceAllString(str, "") // 去除字母
- if !m2[str] {
- m2[str] = true
- tc = append(tc, str)
- }
- }
- tmp["topscopeclass"] = tc
- }
- //不生索引字段
- delete(tmp, "package")
- delete(tmp, "infofield")
- list := tmp["list"].([]interface{})
- for _, m := range list {
- tmpM := m.(map[string]interface{})
- //删除purchasing,review_experts
- delete(tmpM, "purchasing")
- delete(tmpM, "review_experts")
- if bidamount, ok := tmpM["bidamount"].(string); ok && len(bidamount) > 0 { //bidamount为string类型,转成float
- tmpB := util.Float64All(tmpM["bidamount"])
- tmpM["bidamount"] = tmpB
- }
- //projectscope截断
- listProjectscopeRune := []rune(util.ObjToString(tmpM["projectscope"]))
- if len(listProjectscopeRune) > 1000 {
- tmpM["projectscope"] = string(listProjectscopeRune[:1000])
- }
- }
- //projectscope截断
- projectscopeRune := []rune(util.ObjToString(tmp["projectscope"]))
- if len(projectscopeRune) > 1000 {
- tmp["projectscope"] = string(projectscopeRune[:1000])
- }
- // if s_budget := fmt.Sprint(tmp["budget"]); s_budget == "" || s_budget == "<nil>" || s_budget == "null" {
- // tmp["budget"] = nil
- // }
- // if s_bidamount := fmt.Sprint(tmp["bidamount"]); s_bidamount == "" || s_bidamount == "<nil>" || s_bidamount == "null" {
- // tmp["bidamount"] = nil
- // }
- //go IS.Add("project")
- arr[i] = tmp
- n++
- if i == savesizei-1 {
- savepool <- true
- tmps := arr
- go func(tmpn *[]map[string]interface{}) {
- defer func() {
- <-savepool
- }()
- elastic.BulkSave(index, itype, tmpn, true)
- }(&tmps)
- i = 0
- arr = make([]map[string]interface{}, savesizei)
- }
- if n%savesizei == 0 {
- log.Println("当前:", n)
- }
- tmp = make(map[string]interface{})
- }
- if i > 0 {
- util.Debug(arr)
- elastic.BulkSave(index, itype, &arr, true)
- }
- log.Println(mapInfo, "create project index...over", n)
- }
|