task.go 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192
  1. package main
  2. import (
  3. "go.mongodb.org/mongo-driver/bson"
  4. "log"
  5. "mongodb"
  6. qu "qfw/util"
  7. "strconv"
  8. "time"
  9. )
  10. var queryClose = make(chan bool)
  11. var queryCloseOver = make(chan bool)
  12. func SaveMgo() {
  13. log.Println("Mgo Save...")
  14. arru := make([]map[string]interface{}, 200)
  15. indexu := 0
  16. for {
  17. select {
  18. case v := <-MgoSaveCache:
  19. arru[indexu] = v
  20. indexu++
  21. if indexu == 200 {
  22. SP <- true
  23. go func(arru []map[string]interface{}) {
  24. defer func() {
  25. <-SP
  26. }()
  27. MongoTool.SaveBulk(CollSave, arru...)
  28. }(arru)
  29. arru = make([]map[string]interface{}, 200)
  30. indexu = 0
  31. }
  32. case <-time.After(1000 * time.Millisecond):
  33. if indexu > 0 {
  34. SP <- true
  35. go func(arru []map[string]interface{}) {
  36. defer func() {
  37. <-SP
  38. }()
  39. MongoTool.SaveBulk(CollSave, arru...)
  40. }(arru[:indexu])
  41. arru = make([]map[string]interface{}, 200)
  42. indexu = 0
  43. }
  44. }
  45. }
  46. }
  47. //项目数据
  48. func GetProjectData(t string) {
  49. defer qu.Catch()
  50. count, taskcount := 0, 0
  51. sess := MongoTool.GetMgoConn()
  52. defer MongoTool.DestoryMongoConn(sess)
  53. dataPool := make(chan map[string]interface{}, 2000)
  54. over := make(chan bool)
  55. pool := make(chan bool, 4)
  56. go func() {
  57. L:
  58. for {
  59. select {
  60. case tmp := <-dataPool:
  61. pool <- true
  62. taskcount++
  63. go func(tmp map[string]interface{}) {
  64. defer func() {
  65. <-pool
  66. }()
  67. ForecastMethod(tmp)
  68. }(tmp)
  69. case <-over:
  70. break L
  71. }
  72. }
  73. }()
  74. uptime, err := strconv.ParseInt(t, 10, 64)
  75. if err == nil {
  76. qu.Debug(err)
  77. }
  78. query := bson.M{
  79. "updatetime": bson.M{"$gt": uptime},
  80. "o_projectinfo.nature": bson.M{"$in": Nature},
  81. "spidercode": bson.M{"$in": SpiderCodes},
  82. "$or": []bson.M{
  83. {"category_buyer": bson.M{"$in": Category}},
  84. {"category_purpose": bson.M{"$in": Category}},
  85. },
  86. }
  87. qu.Debug("query-----", CollPro, query["updatetime"])
  88. filed := map[string]interface{}{"area": 1, "city": 1, "buyer": 1, "projectname": 1, "category": 1, "nature": 1, "category_buyer": 1, "category_purpose": 1, "stage": 1, "o_projectinfo": 1, "title": 1}
  89. it := sess.DB(Dbname).C(CollPro).Select(filed).Find(query).Iter()
  90. var lastid interface{}
  91. L:
  92. for {
  93. select {
  94. case <-queryClose:
  95. log.Println("receive interrupt sign")
  96. log.Println("close iter..", lastid, it.Cursor.Close(nil))
  97. queryCloseOver <- true
  98. break L
  99. default:
  100. tmp := make(map[string]interface{})
  101. if it.Next(&tmp) {
  102. lastid = tmp["_id"]
  103. if count%1000 == 0 {
  104. log.Println("current", count, lastid)
  105. }
  106. dataPool <- tmp
  107. count++
  108. } else {
  109. break L
  110. }
  111. }
  112. }
  113. time.Sleep(5 * time.Second)
  114. over <- true
  115. //阻塞
  116. for n := 0; n < 4; n++ {
  117. pool <- true
  118. }
  119. }
  120. func ForecastMethod(pro map[string]interface{}) {
  121. pro["infoid"] = mongodb.BsonIdToSId(pro["_id"])
  122. pro["yucetime"] = time.Now().Unix()
  123. pro["nature"] = (*qu.ObjToMap(pro["o_projectinfo"]))["nature"]
  124. delete(pro, "_id")
  125. delete(pro, "o_projectinfo")
  126. category := GetCategory(pro)
  127. stage := qu.ObjToString(pro["stage"])
  128. q := bson.M{
  129. "category": category,
  130. "stage": bson.M{"$in": Forecast[stage]},
  131. }
  132. var maps []map[string]interface{}
  133. ent, _ := MongoTool.FindOne(CollEnt, bson.M{"buyer_name": pro["buyer"]})
  134. if len(*ent) > 0 && (*ent)["buyerclass"] != nil {
  135. arr := qu.ObjArrToStringArr((*ent)["buyerclass"].([]interface{}))
  136. if len(arr) == 1 {
  137. pro["buyerclass"] = arr
  138. }else {
  139. var arrTmp []string
  140. for _, v := range arr {
  141. if v != "其它" {
  142. arrTmp = append(arrTmp, v)
  143. }
  144. }
  145. pro["buyerclass"] = arrTmp
  146. }
  147. }
  148. result, _ := MongoTool.Find(CollTag, q, nil, nil, false, -1, -1)
  149. for _, t := range *result {
  150. if len(t) == 0 {
  151. continue
  152. }
  153. tmp := make(map[string]interface{})
  154. tmp["stage"] = t["stage"]
  155. tmp["purchase_classify"] = t["purchase_classify"]
  156. tmp["purchasing"] = t["purchasing"]
  157. tmp["p_rate"] = Rate
  158. tmp["time"] = ""
  159. //tmp["p_projects"] = "" 暂无该字段
  160. maps = append(maps, tmp)
  161. }
  162. if len(maps) > 0 {
  163. pro["results"] = maps
  164. }
  165. MgoSaveCache <- pro
  166. }
  167. func GetCategory(tmp map[string]interface{}) string {
  168. categoryBuyerIndex := -1
  169. categoryPurposeIndex := -1
  170. for k, v := range Category {
  171. if tmp["category_buyer"] != nil {
  172. if qu.ObjToString(tmp["category_buyer"]) == qu.ObjToString(v) {
  173. categoryBuyerIndex = k
  174. }
  175. }
  176. if tmp["category_purpose"] != nil {
  177. categoryPurposeIndex = k
  178. }
  179. }
  180. if categoryBuyerIndex >= categoryPurposeIndex {
  181. return qu.ObjToString(Category[categoryBuyerIndex])
  182. } else {
  183. return qu.ObjToString(Category[categoryPurposeIndex])
  184. }
  185. }