task.go 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191
  1. package main
  2. import (
  3. "go.mongodb.org/mongo-driver/bson"
  4. "log"
  5. "mongodb"
  6. qu "qfw/util"
  7. "time"
  8. )
  9. var queryClose = make(chan bool)
  10. var queryCloseOver = make(chan bool)
  11. func SaveMgo() {
  12. log.Println("Mgo Save...")
  13. arru := make([]map[string]interface{}, 200)
  14. indexu := 0
  15. for {
  16. select {
  17. case v := <-MgoSaveCache:
  18. arru[indexu] = v
  19. indexu++
  20. if indexu == 200 {
  21. SP <- true
  22. go func(arru []map[string]interface{}) {
  23. defer func() {
  24. <-SP
  25. }()
  26. MongoTool.SaveBulk(CollSave, arru...)
  27. }(arru)
  28. arru = make([]map[string]interface{}, 200)
  29. indexu = 0
  30. }
  31. case <-time.After(1000 * time.Millisecond):
  32. if indexu > 0 {
  33. SP <- true
  34. go func(arru []map[string]interface{}) {
  35. defer func() {
  36. <-SP
  37. }()
  38. MongoTool.SaveBulk(CollSave, arru...)
  39. }(arru[:indexu])
  40. arru = make([]map[string]interface{}, 200)
  41. indexu = 0
  42. }
  43. }
  44. }
  45. }
  46. //项目数据
  47. func GetProjectData(t string) {
  48. defer qu.Catch()
  49. count, taskcount := 0, 0
  50. sess := MongoTool.GetMgoConn()
  51. defer MongoTool.DestoryMongoConn(sess)
  52. dataPool := make(chan map[string]interface{}, 2000)
  53. over := make(chan bool)
  54. pool := make(chan bool, 4)
  55. go func() {
  56. L:
  57. for {
  58. select {
  59. case tmp := <-dataPool:
  60. pool <- true
  61. taskcount++
  62. go func(tmp map[string]interface{}) {
  63. defer func() {
  64. <-pool
  65. }()
  66. ForecastMethod(tmp)
  67. }(tmp)
  68. case <-over:
  69. break L
  70. }
  71. }
  72. }()
  73. //uptime, err := strconv.ParseInt(t, 10, 64)
  74. //if err == nil {
  75. // qu.Debug(err)
  76. //}
  77. query := bson.M{
  78. //"updatetime": bson.M{"$gt": uptime},
  79. "o_projectinfo.nature": bson.M{"$in": Nature},
  80. "spidercode": bson.M{"$in": SpiderCodes},
  81. "$or": []bson.M{
  82. {"category_buyer": bson.M{"$in": Category}},
  83. {"category_purpose": bson.M{"$in": Category}},
  84. },
  85. }
  86. //qu.Debug("query-----", CollPro, query["updatetime"])
  87. filed := map[string]interface{}{"area": 1, "city": 1, "buyer": 1, "projectname": 1, "nature": 1, "category_buyer": 1, "category_purpose": 1, "stage": 1, "o_projectinfo": 1, "title": 1}
  88. it := sess.DB(Dbname).C(CollPro).Select(filed).Find(query).Iter()
  89. var lastid interface{}
  90. L:
  91. for {
  92. select {
  93. case <-queryClose:
  94. log.Println("receive interrupt sign")
  95. log.Println("close iter..", lastid, it.Cursor.Close(nil))
  96. queryCloseOver <- true
  97. break L
  98. default:
  99. tmp := make(map[string]interface{})
  100. if it.Next(&tmp) {
  101. lastid = tmp["_id"]
  102. if count%1000 == 0 {
  103. log.Println("current", count, lastid)
  104. }
  105. dataPool <- tmp
  106. count++
  107. } else {
  108. break L
  109. }
  110. }
  111. }
  112. time.Sleep(5 * time.Second)
  113. over <- true
  114. //阻塞
  115. for n := 0; n < 4; n++ {
  116. pool <- true
  117. }
  118. }
  119. func ForecastMethod(pro map[string]interface{}) {
  120. pro["infoid"] = mongodb.BsonIdToSId(pro["_id"])
  121. pro["yucetime"] = time.Now().Unix()
  122. pro["nature"] = (*qu.ObjToMap(pro["o_projectinfo"]))["nature"]
  123. delete(pro, "_id")
  124. delete(pro, "o_projectinfo")
  125. category := GetCategory(pro)
  126. stage := qu.ObjToString(pro["stage"])
  127. q := bson.M{
  128. "category": category,
  129. "stage": bson.M{"$in": Forecast[stage]},
  130. }
  131. var maps []map[string]interface{}
  132. ent, _ := MongoTool.FindOne(CollEnt, bson.M{"buyer_name": pro["buyer"]})
  133. if len(*ent) > 0 && (*ent)["buyerclass"] != nil {
  134. arr := qu.ObjArrToStringArr((*ent)["buyerclass"].([]interface{}))
  135. if len(arr) == 1 {
  136. pro["buyerclass"] = arr
  137. }else {
  138. var arrTmp []string
  139. for _, v := range arr {
  140. if v != "其它" {
  141. arrTmp = append(arrTmp, v)
  142. }
  143. }
  144. pro["buyerclass"] = arrTmp
  145. }
  146. }
  147. result, _ := MongoTool.Find(CollTag, q, nil, nil, false, -1, -1)
  148. for _, t := range *result {
  149. if len(t) == 0 {
  150. continue
  151. }
  152. tmp := make(map[string]interface{})
  153. tmp["stage"] = t["stage"]
  154. tmp["purchase_classify"] = t["purchase_classify"]
  155. tmp["purchasing"] = t["purchasing"]
  156. tmp["p_rate"] = Rate
  157. tmp["time"] = ""
  158. //tmp["p_projects"] = "" 暂无该字段
  159. maps = append(maps, tmp)
  160. }
  161. if len(maps) > 0 {
  162. pro["results"] = maps
  163. }
  164. MgoSaveCache <- pro
  165. }
  166. func GetCategory(tmp map[string]interface{}) string {
  167. categoryBuyerIndex := -1
  168. categoryPurposeIndex := -1
  169. for k, v := range Category {
  170. if tmp["category_buyer"] != nil {
  171. if qu.ObjToString(tmp["category_buyer"]) == qu.ObjToString(v) {
  172. categoryBuyerIndex = k
  173. }
  174. }
  175. if tmp["category_purpose"] != nil {
  176. categoryPurposeIndex = k
  177. }
  178. }
  179. if categoryBuyerIndex >= categoryPurposeIndex {
  180. return qu.ObjToString(Category[categoryBuyerIndex])
  181. } else {
  182. return qu.ObjToString(Category[categoryPurposeIndex])
  183. }
  184. }