task.go 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260
  1. package main
  2. import (
  3. "fmt"
  4. "github.com/cron"
  5. "go.mongodb.org/mongo-driver/bson"
  6. "go.mongodb.org/mongo-driver/bson/primitive"
  7. "log"
  8. "qfw/util"
  9. "regexp"
  10. "strings"
  11. "sync"
  12. "time"
  13. )
  14. var MgoSaveCache = make(chan map[string]interface{}, 5000)
  15. var SP = make(chan bool, 5)
  16. var updatePool = make(chan []map[string]interface{}, 5000)
  17. var updateSp = make(chan bool, 5)
  18. func TimeTask() {
  19. c := cron.New()
  20. //cronstr := "0 0 " + fmt.Sprint(TaskTime) + " * * ?" //每天TaskTime跑一次
  21. cronstrPa := "0 0 15 ? * WED" //凭安增量数据每周二跑一次
  22. _ = c.AddFunc(cronstrPa, func() { GetData() })
  23. c.Start()
  24. }
  25. func GetData() {
  26. defer util.Catch()
  27. sess := Mgo.GetMgoConn()
  28. defer Mgo.DestoryMongoConn(sess)
  29. pool := make(chan bool, 10)
  30. wg := &sync.WaitGroup{}
  31. q := bson.M{"_id": bson.M{"$gt": lastId}}
  32. //q := bson.M{"_id": 368454367}
  33. var zid int
  34. it := sess.DB("mixdata").C("company_change").Find(q).Select(nil).Iter()
  35. count := 0
  36. for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
  37. if count%20000 == 0 {
  38. log.Println("current:", count)
  39. }
  40. zid = util.IntAll(tmp["_id"])
  41. pool <- true
  42. wg.Add(1)
  43. go func(tmp map[string]interface{}) {
  44. defer func() {
  45. <-pool
  46. wg.Done()
  47. }()
  48. currentTime := time.Now().Unix()
  49. query := bson.M{"company_id": tmp["company_id"]}
  50. info, b := MgoMix.FindOneByField(CollSave, query, bson.M{"changes": 1})
  51. if b && len(*info) > 0 {
  52. update := make(map[string]interface{})
  53. item := make(map[string]interface{})
  54. item["change_field"] = tmp["change_field"]
  55. item["content_before"] = tmp["content_before"]
  56. item["content_after"] = tmp["content_after"]
  57. item["change_date"] = tmp["change_date"]
  58. setMark(item) //change_name_new
  59. //update["changes"] = changes
  60. update["update_time"] = currentTime
  61. saveInfo := []map[string]interface{}{
  62. {"_id": (*info)["_id"]},
  63. {"$set": update, "$push": map[string]interface{}{"changes": item}},
  64. }
  65. updatePool <- saveInfo
  66. } else {
  67. query := bson.M{"_id": tmp["company_id"]}
  68. qyxy, b1 := MgoMix.FindOne("qyxy_std", query)
  69. if b1 && len(*qyxy) > 0 {
  70. save := make(map[string]interface{})
  71. var changes []map[string]interface{}
  72. item := make(map[string]interface{})
  73. item["change_field"] = tmp["change_field"]
  74. item["content_before"] = tmp["content_before"]
  75. item["content_after"] = tmp["content_after"]
  76. item["change_date"] = tmp["change_date"]
  77. setMark(item) //change_name_new
  78. changes = append(changes, item)
  79. save["company_name"] = (*qyxy)["company_name"]
  80. save["company_id"] = (*qyxy)["_id"]
  81. save["changes"] = changes
  82. save["create_time"] = currentTime
  83. save["update_time"] = currentTime
  84. save["_id"] = primitive.NewObjectID()
  85. saveInfo := []map[string]interface{}{
  86. {"_id": save["_id"]},
  87. {"$set": save},
  88. }
  89. updatePool <- saveInfo
  90. }
  91. }
  92. }(tmp)
  93. }
  94. util.Debug("over---", count, zid)
  95. }
  96. func setMark(tmp map[string]interface{}) {
  97. for _, v := range ChangeMap {
  98. str := util.ObjToString(tmp["change_field"])
  99. regArr := v["change_key_reg"].([]string)
  100. for _, v1 := range regArr {
  101. matched, _ := regexp.MatchString(v1, str)
  102. if matched {
  103. tmp["change_name_new"] = v["change_name"]
  104. return
  105. }
  106. }
  107. }
  108. }
  109. func clearRepeat(list []interface{}) []interface{} {
  110. var tmp []interface{}
  111. if len(list) > 1 {
  112. for k, v := range list {
  113. if k < len(list)-1 {
  114. if fmt.Sprint(list[k]) != fmt.Sprint(list[k+1]) {
  115. tmp = append(tmp, v)
  116. }
  117. } else {
  118. tmp = append(tmp, v)
  119. }
  120. }
  121. return tmp
  122. } else {
  123. return list
  124. }
  125. }
  126. // TaskAll 存量数据
  127. func TaskAll() {
  128. defer util.Catch()
  129. sess := MgoMix.GetMgoConn()
  130. defer MgoMix.DestoryMongoConn(sess)
  131. pool := make(chan bool, 10)
  132. wg := &sync.WaitGroup{}
  133. field := bson.M{"company_name": 1}
  134. it := sess.DB("mixdata").C(CollQy).Find(nil).Select(field).Iter()
  135. count := 0
  136. for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
  137. if count%20000 == 0 {
  138. log.Println("current:", count, tmp["_id"])
  139. }
  140. if strings.Contains(util.ObjToString(tmp["company_type"]), "个体") {
  141. continue
  142. }
  143. pool <- true
  144. wg.Add(1)
  145. go func(tmp map[string]interface{}) {
  146. defer func() {
  147. <-pool
  148. wg.Done()
  149. }()
  150. tmp["company_id"] = tmp["_id"]
  151. delete(tmp, "_id")
  152. query := bson.M{"company_id": tmp["company_id"]}
  153. info, b := Mgo.Find("company_change", query, nil, nil, false, -1, -1)
  154. if b && len(*info) > 0 {
  155. disposeFuc(*info, tmp)
  156. }
  157. }(tmp)
  158. }
  159. }
  160. func disposeFuc(maps []map[string]interface{}, tmp map[string]interface{}) {
  161. var changes []map[string]interface{}
  162. currentTime := time.Now().Unix()
  163. for _, v := range maps {
  164. item := make(map[string]interface{})
  165. item["change_field"] = v["change_field"]
  166. item["content_before"] = v["content_before"]
  167. item["content_after"] = v["content_after"]
  168. item["change_date"] = v["change_date"]
  169. setMark(item) //change_name_new
  170. changes = append(changes, item)
  171. }
  172. tmp["changes"] = changes
  173. tmp["create_time"] = currentTime
  174. tmp["update_time"] = currentTime
  175. MgoSaveCache <- tmp
  176. }
  177. func SaveData() {
  178. log.Println("Mgo Save...")
  179. arru := make([]map[string]interface{}, 200)
  180. indexu := 0
  181. for {
  182. select {
  183. case v := <-MgoSaveCache:
  184. arru[indexu] = v
  185. indexu++
  186. if indexu == 200 {
  187. SP <- true
  188. go func(arru []map[string]interface{}) {
  189. defer func() {
  190. <-SP
  191. }()
  192. MgoMix.SaveBulk(CollSave, arru...)
  193. }(arru)
  194. arru = make([]map[string]interface{}, 200)
  195. indexu = 0
  196. }
  197. case <-time.After(1000 * time.Millisecond):
  198. if indexu > 0 {
  199. SP <- true
  200. go func(arru []map[string]interface{}) {
  201. defer func() {
  202. <-SP
  203. }()
  204. MgoMix.SaveBulk(CollSave, arru...)
  205. }(arru[:indexu])
  206. arru = make([]map[string]interface{}, 200)
  207. indexu = 0
  208. }
  209. }
  210. }
  211. }
  212. func updateMethod() {
  213. arru := make([][]map[string]interface{}, 200)
  214. indexu := 0
  215. for {
  216. select {
  217. case v := <-updatePool:
  218. arru[indexu] = v
  219. indexu++
  220. if indexu == 200 {
  221. updateSp <- true
  222. go func(arru [][]map[string]interface{}) {
  223. defer func() {
  224. <-updateSp
  225. }()
  226. MgoMix.UpSertBulk(CollSave, arru...)
  227. }(arru)
  228. arru = make([][]map[string]interface{}, 200)
  229. indexu = 0
  230. }
  231. case <-time.After(1000 * time.Millisecond):
  232. if indexu > 0 {
  233. updateSp <- true
  234. go func(arru [][]map[string]interface{}) {
  235. defer func() {
  236. <-updateSp
  237. }()
  238. MgoMix.UpSertBulk(CollSave, arru...)
  239. }(arru[:indexu])
  240. arru = make([][]map[string]interface{}, 200)
  241. indexu = 0
  242. }
  243. }
  244. }
  245. }