service.go 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683
  1. package service
  2. import (
  3. . "app.yhyue.com/moapp/jybase/common"
  4. . "app.yhyue.com/moapp/jybase/mongodb"
  5. "dataPrefer/db"
  6. "dataPrefer/entity"
  7. "github.com/gogf/gf/v2/frame/g"
  8. "github.com/gogf/gf/v2/os/gcfg"
  9. "github.com/gogf/gf/v2/os/gctx"
  10. "github.com/gogf/gf/v2/util/gconv"
  11. "golang.org/x/net/context"
  12. "log"
  13. "strings"
  14. "sync"
  15. "time"
  16. )
  17. const (
  18. Result_20220219 = "result_20220219"
  19. Result_20220218 = "result_20220218"
  20. Bidding = "bidding"
  21. Bidding_master = "bidding_master"
  22. )
  23. var (
  24. allSite = map[string]*entity.SiteInfo{}
  25. loginSite = map[string]bool{}
  26. delaySite = map[string]bool{}
  27. ruleConf *gcfg.Config
  28. competitorSite = map[string]bool{}
  29. gatherRules, structRules map[string][]*entity.Rule
  30. allExistsFields = map[string]string{}
  31. partExistsFields = map[string]string{}
  32. govSiteYes map[string]*g.Var
  33. extractSelectFields = map[string]interface{}{"_id": 1, "agency": 1, "area": 1, "bidamount": 1, "bidendtime": 1, "bidopentime": 1, "budget": 1, "buyer": 1, "buyertel": 1, "city": 1, "detail": 1, "isValidFile": 1, "procurementlist": 1, "projectcode": 1, "projectname": 1, "projectperiod": 1, "purchasinglist": 1, "title": 1, "winner": 1, "s_winner": 1, "winnertel": 1, "prefer_score": 1, "repeat": 1, "site": 1, "toptype": 1, "repeat_id": 1, "ai_zhipu": 1, "ext_ai_record": 1, "dataging": 1}
  34. biddingSelectFields = map[string]interface{}{"_id": 1, "detail": 1, "procurementlist": 1, "purchasinglist": 1, "isValidFile": 1, "pici": 1}
  35. )
  36. func reLoadConf() {
  37. ctx := gctx.New()
  38. ruleConf = g.Config("rule.yaml")
  39. for _, v := range ruleConf.MustGet(ctx, "站点画像.竞品网站.list").Strings() {
  40. competitorSite[v] = true
  41. }
  42. if err := ruleConf.MustGet(ctx, "采集字段").Scan(&gatherRules); err != nil {
  43. log.Fatalln("采集字段 scan error", err)
  44. }
  45. if err := ruleConf.MustGet(ctx, "结构化字段").Scan(&structRules); err != nil {
  46. log.Fatalln("结构化字段 scan error", err)
  47. }
  48. for _, v := range ruleConf.MustGet(ctx, "对比大模型.allExistsAndSame.fields").Strings() {
  49. vs := strings.Split(v, ":")
  50. allExistsFields[vs[0]] = vs[1]
  51. }
  52. for _, v := range ruleConf.MustGet(ctx, "对比大模型.partExistsAndSame.fields").Strings() {
  53. vs := strings.Split(v, ":")
  54. partExistsFields[vs[0]] = vs[1]
  55. }
  56. govSiteYes = ruleConf.MustGet(ctx, "站点画像.政府采购网站.yes").MapStrVar()
  57. }
  58. func initSite() {
  59. log.Println("开始初始化网站数据。。。")
  60. wait := &sync.WaitGroup{}
  61. wait.Add(1)
  62. go func() {
  63. sess := db.Mgo_Site.GetMgoConn()
  64. defer func() {
  65. db.Mgo_Site.DestoryMongoConn(sess)
  66. wait.Done()
  67. }()
  68. siteIt := sess.DB("editor").C("site").Find(nil).Select(map[string]interface{}{
  69. "site": 1,
  70. "site_subtype": 1,
  71. "site_toptype": 1,
  72. "type_plate": 1,
  73. }).Iter()
  74. for m := make(map[string]interface{}); siteIt.Next(&m); {
  75. allSite[gconv.String(m["site"])] = &entity.SiteInfo{
  76. Subtype: gconv.String(m["site_subtype"]),
  77. Toptype: gconv.String(m["site_toptype"]),
  78. Site: gconv.String(m["site"]),
  79. TypePlate: gconv.String(m["type_plate"]),
  80. }
  81. m = make(map[string]interface{})
  82. }
  83. log.Println("site表数据初始化结束。。。", len(allSite))
  84. }()
  85. //
  86. wait.Add(1)
  87. go func() {
  88. sess := db.Mgo_Site.GetMgoConn()
  89. defer func() {
  90. db.Mgo_Site.DestoryMongoConn(sess)
  91. wait.Done()
  92. }()
  93. loginSiteIt := sess.DB("editor").C("site_login").Find(nil).Select(map[string]interface{}{
  94. "site": 1,
  95. }).Iter()
  96. for m := make(map[string]interface{}); loginSiteIt.Next(&m); {
  97. loginSite[gconv.String(m["site"])] = true
  98. m = make(map[string]interface{})
  99. }
  100. log.Println("site_login表数据初始化结束。。。", len(loginSite))
  101. }()
  102. //
  103. wait.Add(1)
  104. go func() {
  105. sess := db.Mgo_Site.GetMgoConn()
  106. defer func() {
  107. db.Mgo_Site.DestoryMongoConn(sess)
  108. wait.Done()
  109. }()
  110. delaySiteIt := sess.DB("spider").C("spider_compete").Find(map[string]interface{}{"compete": true}).Select(map[string]interface{}{
  111. "site": 1,
  112. }).Iter()
  113. for m := make(map[string]interface{}); delaySiteIt.Next(&m); {
  114. delaySite[gconv.String(m["site"])] = true
  115. m = make(map[string]interface{})
  116. }
  117. log.Println("spider_compete表数据初始化结束。。。", len(delaySite))
  118. }()
  119. wait.Wait()
  120. }
  121. func IncDataByTime() {
  122. //{comeintime:{$gt:1747886400,$lt:1747888200}}
  123. query := map[string]interface{}{
  124. "comeintime": map[string]interface{}{
  125. "$gte": 1747756800,
  126. "$lt": 1747843200,
  127. },
  128. }
  129. //query := map[string]interface{}{
  130. // "$or": []map[string]interface{}{
  131. // map[string]interface{}{
  132. // "_id": StringTOBsonId("682bf3325f834436f023b091"),
  133. // },
  134. // map[string]interface{}{
  135. // "repeat_id": "682bf3325f834436f023b091",
  136. // },
  137. // },
  138. //}
  139. //query := map[string]interface{}{
  140. // "_id": StringTOBsonId("682cd75d5f834436f02824ee"),
  141. //}
  142. IncData(query)
  143. }
  144. func IncDataById(sid, eid string) {
  145. query := map[string]interface{}{
  146. "_id": map[string]interface{}{
  147. "$gte": StringTOBsonId(sid),
  148. "$lte": StringTOBsonId(eid),
  149. },
  150. }
  151. if sid == eid {
  152. query = map[string]interface{}{
  153. "_id": StringTOBsonId(sid),
  154. }
  155. }
  156. IncData(query)
  157. }
  158. func IncData(query map[string]interface{}) {
  159. defer Catch()
  160. log.Println("start。。。", query)
  161. reLoadConf()
  162. initSite()
  163. //
  164. sess := db.Mgo_Main.GetMgoConn()
  165. defer db.Mgo_Main.DestoryMongoConn(sess)
  166. ctx := gctx.New()
  167. it := sess.DB(db.Mgo_Main.DbName).C(Bidding).Find(query).Select(biddingSelectFields).Sort("_id").Iter()
  168. index := 0
  169. pool := make(chan bool, g.Config().MustGet(ctx, "scoringPoolSize").Int())
  170. wait := &sync.WaitGroup{}
  171. for m := make(map[string]interface{}); it.Next(&m); {
  172. index++
  173. if index%5000 == 0 {
  174. log.Println("处理数据", index)
  175. }
  176. pool <- true
  177. wait.Add(1)
  178. go func(nm map[string]interface{}) {
  179. defer func() {
  180. <-pool
  181. wait.Done()
  182. }()
  183. bidding, ok := db.Mgo_Extract.FindOneByField(Result_20220219, map[string]interface{}{"_id": nm["_id"]}, extractSelectFields)
  184. if !ok || bidding == nil || len(*bidding) == 0 {
  185. bidding, ok = db.Mgo_Extract.FindOneByField(Result_20220218, map[string]interface{}{"_id": nm["_id"]}, extractSelectFields)
  186. }
  187. _id := BsonIdToSId(nm["_id"])
  188. if !ok || bidding == nil || len(*bidding) == 0 {
  189. log.Println("抽取表中没有找到数据", _id)
  190. return
  191. } else if gconv.Int((*bidding)["dataging"]) == 1 { //抽取表extract中dataging=1跳过
  192. log.Println("抽取表中dataging是1跳过", _id)
  193. return
  194. } else if gconv.Int((*bidding)["repeat"]) == 1 && gconv.String((*bidding)["repeat_id"]) == "" {
  195. log.Println("repeat=1并且repeat_id不存在,过滤掉", _id)
  196. return
  197. }
  198. for k, _ := range *bidding {
  199. if nm[k] != nil {
  200. continue
  201. }
  202. nm[k] = (*bidding)[k]
  203. }
  204. score := MakeScore(ctx, nm)
  205. if score == -1 {
  206. return
  207. }
  208. Preferred(ctx, nm, score)
  209. }(m)
  210. m = make(map[string]interface{})
  211. }
  212. wait.Wait()
  213. log.Println("over。。。", index)
  214. }
  215. // 去打分
  216. func MakeScore(ctx context.Context, nm map[string]interface{}) int {
  217. totalScore := Scoring(ctx, nm)
  218. log.Println("标讯", BsonIdToSId(nm["_id"]), "分数", totalScore)
  219. nm["prefer_score"] = totalScore
  220. db.Mgo_Main.UpdateById(Bidding, nm["_id"], map[string]interface{}{
  221. "$set": map[string]interface{}{
  222. "prefer_score": totalScore,
  223. },
  224. })
  225. return totalScore
  226. }
  227. // 打分
  228. func Scoring(ctx context.Context, data map[string]interface{}) int {
  229. _id := BsonIdToSId(data["_id"])
  230. //
  231. var countScore = func(field string, rule *entity.Rule) int {
  232. var totalScore int
  233. switch rule.Type {
  234. case "exists":
  235. for _, vv := range gconv.Strings(rule.Value) {
  236. if data[vv] != nil {
  237. continue
  238. }
  239. totalScore += rule.Score
  240. log.Println(_id, vv, "字段不存在", rule.Score)
  241. }
  242. case "length":
  243. valueLen := len([]rune(gconv.String(data[field])))
  244. if rule.Min >= 0 && rule.Max > 0 {
  245. if valueLen >= rule.Min && valueLen <= rule.Max {
  246. totalScore += rule.Score
  247. log.Println(_id, field, "字符串长度大于", rule.Min, "小于", rule.Max, rule.Score)
  248. }
  249. } else if rule.Min >= 0 {
  250. if valueLen > rule.Min {
  251. totalScore += rule.Score
  252. log.Println(_id, field, "字符串长度大于", rule.Min, rule.Score)
  253. }
  254. }
  255. case "equal":
  256. var isDeduct bool
  257. switch value := rule.Value.(type) {
  258. case int, int32, int64:
  259. isDeduct = gconv.Int64(value) == gconv.Int64(data[field])
  260. case float32, float64:
  261. isDeduct = gconv.Float64(value) == gconv.Float64(data[field])
  262. case bool:
  263. isDeduct = gconv.Bool(value) == gconv.Bool(data[field])
  264. default:
  265. isDeduct = rule.Value == data[field]
  266. }
  267. if isDeduct {
  268. totalScore += rule.Score
  269. log.Println(_id, field, "值等于", rule.Value, rule.Score)
  270. }
  271. }
  272. return totalScore
  273. }
  274. totalScore := 100
  275. sts := structRules[gconv.String(data["toptype"])]
  276. if sts == nil {
  277. sts = structRules["其他"]
  278. }
  279. //结构化字段
  280. for _, v := range sts {
  281. totalScore += countScore("", v)
  282. }
  283. //采集字段
  284. for k, v := range gatherRules {
  285. for _, vv := range v {
  286. if thisScore := countScore(k, vv); thisScore != 0 {
  287. totalScore += thisScore
  288. break
  289. }
  290. }
  291. }
  292. site := gconv.String(data["site"])
  293. if allSite[site] != nil {
  294. score := 0
  295. if topType := ruleConf.MustGet(ctx, "站点画像.政府采购网站.topType").String(); allSite[site].Toptype == topType {
  296. if govSiteYes[allSite[site].Subtype].IsNil() {
  297. score = govSiteYes["默认"].Int()
  298. log.Println(_id, "地方政府网站", score)
  299. } else {
  300. score = govSiteYes[allSite[site].Subtype].Int()
  301. log.Println(_id, allSite[site].Subtype, "类网站", score)
  302. }
  303. } else {
  304. score = ruleConf.MustGet(ctx, "站点画像.政府采购网站.not.score").Int()
  305. log.Println(_id, "非", topType, score)
  306. }
  307. totalScore += score
  308. }
  309. //
  310. if loginSite[site] {
  311. score := ruleConf.MustGet(ctx, "站点画像.需登录网站.score").Int()
  312. totalScore += score
  313. log.Println(_id, "需登录网站", score)
  314. }
  315. //
  316. if competitorSite[site] {
  317. score := ruleConf.MustGet(ctx, "站点画像.竞品网站.score").Int()
  318. totalScore += score
  319. log.Println(_id, "竞品网站", score)
  320. }
  321. //
  322. if delaySite[site] {
  323. score := ruleConf.MustGet(ctx, "站点画像.延时采集网站.score").Int()
  324. totalScore += score
  325. log.Println(_id, "延时采集网站", score)
  326. }
  327. //
  328. if data["ai_zhipu"] != nil {
  329. aiZhipu, _ := data["ai_zhipu"].(map[string]interface{})
  330. compare := map[string]interface{}{}
  331. extAiRecord, _ := data["ext_ai_record"].(map[string]interface{})
  332. for _, v := range []map[string]string{allExistsFields, partExistsFields} {
  333. for kk, _ := range v {
  334. if extAiRecord[kk] != nil {
  335. compare[kk] = extAiRecord[kk]
  336. } else if data[kk] != nil {
  337. compare[kk] = data[kk]
  338. }
  339. }
  340. }
  341. //全部字段存在,且不一致
  342. allFieldExists := 0
  343. for k, v := range allExistsFields {
  344. if compare[k] == nil || aiZhipu[v] == nil {
  345. break
  346. }
  347. allFieldExists++
  348. }
  349. if allFieldExists > 0 && allFieldExists == len(allExistsFields) {
  350. score := ruleConf.MustGet(ctx, "对比大模型.allExistsAndSame.score").Int()
  351. for k, v := range allExistsFields {
  352. if gconv.String(compare[k]) == gconv.String(aiZhipu[v]) {
  353. continue
  354. }
  355. totalScore += score
  356. log.Println(_id, "对比大模型,字段都存在,", v, "的值不一致", score)
  357. }
  358. }
  359. //部分字段存在,且不一致
  360. partFieldExists := false
  361. for k, v := range partExistsFields {
  362. if compare[k] == nil || aiZhipu[v] == nil {
  363. continue
  364. }
  365. partFieldExists = true
  366. break
  367. }
  368. if partFieldExists {
  369. score := ruleConf.MustGet(ctx, "对比大模型.partExistsAndSame.score").Int()
  370. for k, v := range partExistsFields {
  371. if compare[k] == nil || aiZhipu[v] == nil || gconv.String(compare[k]) == gconv.String(aiZhipu[v]) {
  372. continue
  373. }
  374. totalScore += score
  375. log.Println(_id, "对比大模型,部分字段存在,", v, "的值不一致", score)
  376. }
  377. }
  378. }
  379. return totalScore
  380. }
  381. // 数据优选
  382. func Preferred(ctx context.Context, mainData map[string]interface{}, score int) {
  383. _id := BsonIdToSId(mainData["_id"])
  384. rid := gconv.String(mainData["repeat_id"])
  385. isRepeat := gconv.Int(mainData["repeat"])
  386. var miniPici, repeatPici int64
  387. var bestId string = _id
  388. var bestScore int = score
  389. //找被我判重掉的数据
  390. var allDatas []map[string]interface{}
  391. wait := &sync.WaitGroup{}
  392. lock := &sync.Mutex{}
  393. for _, v := range []string{Result_20220219, Result_20220218} {
  394. wait.Add(1)
  395. go func(collection string) {
  396. defer wait.Done()
  397. if isRepeat == 1 && rid == "" {
  398. return
  399. }
  400. findId := _id
  401. if isRepeat == 1 {
  402. findId = rid
  403. }
  404. datas, ok := db.Mgo_Extract.Find(collection, map[string]interface{}{"repeat_id": findId}, nil, extractSelectFields, false, -1, -1)
  405. if ok && *datas != nil && len(*datas) > 0 {
  406. lock.Lock()
  407. allDatas = append(allDatas, *datas...)
  408. lock.Unlock()
  409. }
  410. }(v)
  411. if isRepeat == 1 && rid != "" {
  412. wait.Add(1)
  413. go func(collection string) {
  414. defer wait.Done()
  415. data, ok := db.Mgo_Extract.FindById(collection, rid, extractSelectFields)
  416. if ok && *data != nil && len(*data) > 0 {
  417. lock.Lock()
  418. allDatas = append(allDatas, *data)
  419. lock.Unlock()
  420. }
  421. }(v)
  422. }
  423. }
  424. wait.Wait()
  425. pool := make(chan bool, g.Config().MustGet(ctx, "preferPoolSize").Int())
  426. repeatMap := map[string]bool{}
  427. findIds := []interface{}{StringTOBsonId(_id)}
  428. for _, vv := range allDatas {
  429. thisId := BsonIdToSId(vv["_id"])
  430. if thisId == _id || repeatMap[thisId] {
  431. continue
  432. }
  433. repeatMap[thisId] = true
  434. pool <- true
  435. wait.Add(1)
  436. go func(vm map[string]interface{}) {
  437. defer func() {
  438. <-pool
  439. wait.Done()
  440. }()
  441. bidObj, _ := db.Mgo_Main.FindOneByField(Bidding, map[string]interface{}{"_id": vm["_id"]}, biddingSelectFields)
  442. thisId := BsonIdToSId(vm["_id"])
  443. if bidObj == nil || len(*bidObj) == 0 {
  444. log.Println("bidding中没有找到", thisId)
  445. return
  446. }
  447. for k, v := range *bidObj {
  448. vm[k] = v
  449. }
  450. if pici := gconv.Int64(vm["pici"]); pici > 0 {
  451. if miniPici == 0 || miniPici > pici {
  452. miniPici = pici
  453. }
  454. if thisId == rid {
  455. repeatPici = pici
  456. }
  457. }
  458. preferScore := MakeScore(ctx, vm)
  459. lock.Lock()
  460. findIds = append(findIds, vm["_id"])
  461. if preferScore > bestScore || (preferScore == bestScore && thisId > bestId) {
  462. bestId = thisId
  463. bestScore = preferScore
  464. }
  465. lock.Unlock()
  466. }(vv)
  467. }
  468. wait.Wait()
  469. updateIds := []interface{}{}
  470. for _, v := range findIds {
  471. if BsonIdToSId(v) == bestId {
  472. continue
  473. }
  474. updateIds = append(updateIds, v)
  475. }
  476. prevBests, ok := db.Mgo_Main.Find(Bidding, map[string]interface{}{
  477. "_id": map[string]interface{}{
  478. "$in": findIds,
  479. },
  480. "isprefer": 1,
  481. }, `{"prefer_score":-1,"_id":1}`, `{"_id":1,"pici":1}`, false, 0, 1)
  482. if !ok {
  483. log.Println("查询上次优选id失败", findIds)
  484. return
  485. }
  486. prevBestId := ""
  487. var prevPici int64
  488. if prevBests != nil && len(*prevBests) == 1 {
  489. prevBestId = BsonIdToSId((*prevBests)[0]["_id"])
  490. prevPici = gconv.Int64((*prevBests)[0]["pici"])
  491. }
  492. if prevBestId == "" {
  493. prevBestId = rid
  494. }
  495. if prevPici == 0 {
  496. prevPici = repeatPici
  497. }
  498. if prevPici == 0 {
  499. prevPici = miniPici
  500. }
  501. //优选
  502. if prevBestId == "" || prevBestId != bestId {
  503. set := map[string]interface{}{
  504. "isprefer": 1,
  505. "prefertime": time.Now().Unix(),
  506. "extracttype": 1,
  507. }
  508. if prevPici > 0 {
  509. set["pici"] = prevPici
  510. }
  511. if prevBestId != "" && prevBestId != bestId {
  512. set["old_preferid"] = prevBestId
  513. }
  514. db.Mgo_Main.UpdateById(Bidding, bestId, map[string]interface{}{
  515. "$set": set,
  516. })
  517. db.Mgo_Main.Save("bidding_prefer_log", map[string]interface{}{
  518. "create_time": time.Now().Unix(),
  519. "old_preferid": prevBestId,
  520. "preferid": bestId,
  521. "ids": findIds,
  522. })
  523. }
  524. //取消优选
  525. db.Mgo_Main.Update(Bidding, map[string]interface{}{
  526. "_id": map[string]interface{}{
  527. "$in": updateIds,
  528. },
  529. "isprefer": 1,
  530. }, map[string]interface{}{
  531. "$set": map[string]interface{}{
  532. "isprefer": -1,
  533. "extracttype": -1,
  534. "prefertime": time.Now().Unix(),
  535. },
  536. "$unset": map[string]interface{}{"old_preferid": ""},
  537. }, false, true)
  538. //
  539. db.Mgo_Main.Update(Bidding, map[string]interface{}{
  540. "_id": map[string]interface{}{
  541. "$in": updateIds,
  542. },
  543. "extracttype": 1,
  544. }, map[string]interface{}{
  545. "$set": map[string]interface{}{
  546. "extracttype": -1,
  547. },
  548. }, false, true)
  549. }
  550. // 数据优选
  551. func PreferredTest(ctx context.Context, mainData map[string]interface{}, score int) {
  552. _id := BsonIdToSId(mainData["_id"])
  553. rid := gconv.String(mainData["repeat_id"])
  554. isRepeat := gconv.Int(mainData["repeat"])
  555. var bestId string = _id
  556. var bestScore int = score
  557. //找被我判重掉的数据
  558. var allDatas []map[string]interface{}
  559. wait := &sync.WaitGroup{}
  560. lock := &sync.Mutex{}
  561. for _, v := range []string{Result_20220219, Result_20220218} {
  562. wait.Add(1)
  563. go func(collection string) {
  564. defer wait.Done()
  565. if isRepeat == 1 && rid == "" {
  566. return
  567. }
  568. findId := _id
  569. if isRepeat == 1 {
  570. findId = rid
  571. }
  572. datas, ok := db.Mgo_Extract.Find(collection, map[string]interface{}{"repeat_id": findId}, nil, extractSelectFields, false, -1, -1)
  573. if ok && *datas != nil && len(*datas) > 0 {
  574. lock.Lock()
  575. allDatas = append(allDatas, *datas...)
  576. lock.Unlock()
  577. }
  578. }(v)
  579. if isRepeat == 1 && rid != "" {
  580. wait.Add(1)
  581. go func(collection string) {
  582. defer wait.Done()
  583. data, ok := db.Mgo_Extract.FindById(collection, rid, extractSelectFields)
  584. if ok && *data != nil && len(*data) > 0 {
  585. lock.Lock()
  586. allDatas = append(allDatas, *data)
  587. lock.Unlock()
  588. }
  589. }(v)
  590. }
  591. }
  592. wait.Wait()
  593. pool := make(chan bool, g.Config().MustGet(ctx, "preferPoolSize").Int())
  594. repeatMap := map[string]bool{}
  595. findIds := []interface{}{StringTOBsonId(_id)}
  596. groupMap := map[string]map[string]interface{}{_id: mainData}
  597. for _, vv := range allDatas {
  598. thisId := BsonIdToSId(vv["_id"])
  599. if thisId == _id || repeatMap[thisId] {
  600. continue
  601. }
  602. repeatMap[thisId] = true
  603. pool <- true
  604. wait.Add(1)
  605. go func(vm map[string]interface{}) {
  606. defer func() {
  607. <-pool
  608. wait.Done()
  609. }()
  610. preferScore := MakeScore(ctx, vm)
  611. lock.Lock()
  612. thisId := BsonIdToSId(vm["_id"])
  613. groupMap[thisId] = vm
  614. findIds = append(findIds, vm["_id"])
  615. if preferScore > bestScore || (preferScore == bestScore && thisId > bestId) {
  616. bestId = thisId
  617. bestScore = preferScore
  618. }
  619. lock.Unlock()
  620. }(vv)
  621. }
  622. wait.Wait()
  623. //找master_id
  624. //datas, ok := db.Mgo_Main.Find(Bidding, map[string]interface{}{
  625. // "_id": map[string]interface{}{
  626. // "$in": findIds,
  627. // },
  628. //}, map[string]interface{}{"prefer_score": -1, "_id": 1}, map[string]interface{}{"_id": 0, "master_id": 1}, false, -1, -1)
  629. //if !ok {
  630. // log.Println("bidding中找master_id出错", findIds)
  631. // return
  632. //}
  633. updateIds := []interface{}{}
  634. existsMasterIds := map[string]bool{}
  635. //if datas != nil && len(*datas) > 0 {
  636. // for _, v := range *datas {
  637. // if v["master_id"] == nil {
  638. // continue
  639. // }
  640. // if masterId == "" {
  641. // masterId = gconv.String(v["master_id"])
  642. // }
  643. // existsMasterIds[BsonIdToSId(v["_id"])] = true
  644. // }
  645. //}
  646. for _, v := range findIds {
  647. if existsMasterIds[BsonIdToSId(v)] {
  648. continue
  649. }
  650. updateIds = append(updateIds, v)
  651. }
  652. //保存最优的数据
  653. data, ok := db.Mgo_Main.FindById(Bidding, bestId, nil)
  654. if ok && data != nil && len(*data) > 0 {
  655. delete(*data, "detail")
  656. delete(*data, "contenthtml")
  657. delete(*data, "master_id")
  658. if oldObj := groupMap[bestId]; oldObj != nil {
  659. if repeatId := gconv.String(oldObj["repeat_id"]); repeatId != "" {
  660. (*data)["old_id"] = repeatId
  661. if oldObj = groupMap[repeatId]; oldObj != nil {
  662. if siteObj := allSite[gconv.String(oldObj["site"])]; siteObj != nil {
  663. (*data)["old_site_toptype"] = siteObj.Toptype
  664. }
  665. (*data)["old_prefer_score"] = oldObj["prefer_score"]
  666. }
  667. }
  668. }
  669. if allSite[gconv.String((*data)["site"])] != nil {
  670. (*data)["site_toptype"] = allSite[gconv.String((*data)["site"])].Toptype
  671. }
  672. (*data)["id"] = bestId
  673. db.Mgo_Main.Update(Bidding_master, map[string]interface{}{"_id": (*data)["_id"]}, map[string]interface{}{"$set": *data}, true, false)
  674. }
  675. }