task.go 9.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341
  1. package main
  2. import (
  3. "fmt"
  4. "go.mongodb.org/mongo-driver/bson"
  5. util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  6. "log"
  7. "regexp"
  8. "strings"
  9. "sync"
  10. "time"
  11. )
  var (
	// MgoSaveCache queues complete company documents; SaveData drains it and
	// writes them to CollSave in bulk.
	MgoSaveCache = make(chan map[string]interface{}, 5000)
	// SP bounds how many SaveBulk goroutines SaveData runs at once.
	SP = make(chan bool, 5)
	// updatePool queues upsert entries; updateMethod drains it and passes them
	// to UpSertBulk in batches.
	updatePool = make(chan []map[string]interface{}, 5000)
	// updateSp bounds how many UpSertBulk goroutines updateMethod runs at once.
	updateSp = make(chan bool, 5)
)
  16. //GetData 增量数据
  17. func GetData() {
  18. defer util.Catch()
  19. sess := Mgo.GetMgoConn()
  20. defer Mgo.DestoryMongoConn(sess)
  21. pool := make(chan bool, 10)
  22. wg := &sync.WaitGroup{}
  23. q := bson.M{"_id": bson.M{"$gt": lastId}}
  24. //q := bson.M{"_id": 368454367}
  25. var zid int
  26. it := sess.DB("mixdata").C("company_change").Find(q).Select(nil).Iter()
  27. count := 0
  28. for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
  29. if count%20000 == 0 {
  30. log.Println("current:", count)
  31. }
  32. zid = util.IntAll(tmp["_id"])
  33. pool <- true
  34. wg.Add(1)
  35. fmt.Println("zid =>", zid)
  36. if util.Int64All(zid) > lastId {
  37. lastId = util.Int64All(zid)
  38. }
  39. go func(tmp map[string]interface{}) {
  40. defer func() {
  41. <-pool
  42. wg.Done()
  43. }()
  44. currentTime := time.Now().Unix()
  45. query := bson.M{"company_id": tmp["company_id"]}
  46. info, b := MgoMix.FindOneByField(CollSave, query, bson.M{"changes": 1})
  47. if b && len(*info) > 0 {
  48. if util.ObjToString(tmp["_operation_type"]) == "insert" {
  49. update := make(map[string]interface{})
  50. item := make(map[string]interface{})
  51. item["change_field"] = tmp["change_field"]
  52. item["content_before"] = tmp["content_before"]
  53. item["content_after"] = tmp["content_after"]
  54. item["change_date"] = tmp["change_date"]
  55. setMark(item) //change_name_new
  56. //update["changes"] = changes
  57. update["update_time"] = currentTime
  58. saveInfo := map[string]interface{}{"$set": update, "$push": map[string]interface{}{"changes": item}}
  59. //updatePool <- saveInfo
  60. MgoMix.UpdateById(CollSave, tmp["company_id"], saveInfo)
  61. }
  62. } else {
  63. query := bson.M{"_id": tmp["company_id"]}
  64. qyxy, b1 := MgoMix.FindOne("qyxy_std", query)
  65. if b1 && len(*qyxy) > 0 {
  66. save := make(map[string]interface{})
  67. var changes []map[string]interface{}
  68. item := make(map[string]interface{})
  69. item["change_field"] = tmp["change_field"]
  70. item["content_before"] = tmp["content_before"]
  71. item["content_after"] = tmp["content_after"]
  72. item["change_date"] = tmp["change_date"]
  73. setMark(item) //change_name_new
  74. changes = append(changes, item)
  75. save["company_name"] = (*qyxy)["company_name"]
  76. save["company_id"] = (*qyxy)["_id"]
  77. save["changes"] = changes
  78. save["create_time"] = currentTime
  79. save["update_time"] = currentTime
  80. //save["_id"] = primitive.NewObjectID()
  81. saveInfo := map[string]interface{}{"$set": save}
  82. //updatePool <- saveInfo
  83. MgoMix.Save(CollSave, saveInfo)
  84. }
  85. }
  86. }(tmp)
  87. }
  88. util.Debug("over---", count, zid)
  89. }
  90. //IncData 增量处理数据
  91. func IncData() {
  92. defer util.Catch()
  93. sess := Mgo.GetMgoConn()
  94. defer Mgo.DestoryMongoConn(sess)
  95. q := bson.M{"_id": bson.M{"$gt": lastId}}
  96. //q := bson.M{"_id": 368454367}
  97. var zid int
  98. it := sess.DB("mixdata").C("company_change").Find(q).Select(nil).Iter()
  99. count := 0
  100. for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
  101. if count%2000 == 0 {
  102. log.Println("current:", count)
  103. }
  104. zid = util.IntAll(tmp["_id"])
  105. currentTime := time.Now().Unix()
  106. query := bson.M{"company_id": tmp["company_id"]}
  107. info, b := MgoMix.FindOneByField(CollSave, query, bson.M{"changes": 1})
  108. //原来数据有changes 字段就更新,追加数据
  109. if b && len(*info) > 0 {
  110. if util.ObjToString(tmp["_operation_type"]) == "insert" {
  111. update := make(map[string]interface{})
  112. item := make(map[string]interface{})
  113. item["change_field"] = tmp["change_field"]
  114. item["content_before"] = tmp["content_before"]
  115. item["content_after"] = tmp["content_after"]
  116. item["change_date"] = tmp["change_date"]
  117. setMark(item) //change_name_new
  118. //update["changes"] = changes
  119. update["update_time"] = currentTime
  120. saveInfo := map[string]interface{}{"$set": update, "$push": map[string]interface{}{"changes": item}}
  121. MgoMix.UpdateById(CollSave, tmp["company_id"], saveInfo)
  122. }
  123. } else {
  124. //没有的直接写入
  125. query := bson.M{"_id": tmp["company_id"]}
  126. qyxy, b1 := MgoMix.FindOne("qyxy_std", query)
  127. if b1 && len(*qyxy) > 0 {
  128. save := make(map[string]interface{})
  129. var changes []map[string]interface{}
  130. item := make(map[string]interface{})
  131. item["change_field"] = tmp["change_field"]
  132. item["content_before"] = tmp["content_before"]
  133. item["content_after"] = tmp["content_after"]
  134. item["change_date"] = tmp["change_date"]
  135. setMark(item) //change_name_new
  136. changes = append(changes, item)
  137. save["company_name"] = (*qyxy)["company_name"]
  138. save["company_id"] = (*qyxy)["_id"]
  139. save["changes"] = changes
  140. save["create_time"] = currentTime
  141. save["update_time"] = currentTime
  142. saveInfo := map[string]interface{}{"$set": save}
  143. MgoMix.Save(CollSave, saveInfo)
  144. }
  145. }
  146. }
  147. util.Debug("over---", count, zid)
  148. }
  149. func setMark(tmp map[string]interface{}) {
  150. for _, v := range ChangeMap {
  151. str := util.ObjToString(tmp["change_field"])
  152. regArr := v["change_key_reg"].([]string)
  153. for _, v1 := range regArr {
  154. matched, _ := regexp.MatchString(v1, str)
  155. if matched {
  156. tmp["change_name_new"] = v["change_name"]
  157. return
  158. }
  159. }
  160. }
  161. }
  // clearRepeat collapses each run of adjacent duplicate elements in list into a
// single element, keeping the last element of the run. Two elements count as
// duplicates when fmt.Sprint renders them identically (so 1 and "1" collide).
// A list with fewer than two elements is returned as-is; otherwise a new slice
// is built and list itself is not modified.
func clearRepeat(list []interface{}) []interface{} {
	if len(list) <= 1 {
		return list
	}
	var out []interface{}
	prevKey := fmt.Sprint(list[0])
	for i := 1; i < len(list); i++ {
		curKey := fmt.Sprint(list[i])
		// list[i-1] ends a run exactly when the next element differs from it.
		if curKey != prevKey {
			out = append(out, list[i-1])
		}
		prevKey = curKey
	}
	// The final element always closes the last run.
	return append(out, list[len(list)-1])
}
  179. // TaskAll 存量数据
  180. func TaskAll() {
  181. defer util.Catch()
  182. sess := MgoMix.GetMgoConn()
  183. defer MgoMix.DestoryMongoConn(sess)
  184. pool := make(chan bool, 10)
  185. wg := &sync.WaitGroup{}
  186. field := bson.M{"company_name": 1}
  187. it := sess.DB("mixdata").C(CollQy).Find(nil).Select(field).Iter()
  188. count := 0
  189. for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
  190. if count%20000 == 0 {
  191. log.Println("current:", count, tmp["_id"])
  192. }
  193. if strings.Contains(util.ObjToString(tmp["company_type"]), "个体") {
  194. continue
  195. }
  196. pool <- true
  197. wg.Add(1)
  198. go func(tmp map[string]interface{}) {
  199. defer func() {
  200. <-pool
  201. wg.Done()
  202. }()
  203. tmp["company_id"] = tmp["_id"]
  204. delete(tmp, "_id")
  205. query := bson.M{"company_id": tmp["company_id"]}
  206. info, b := Mgo.Find("company_change", query, nil, nil, false, -1, -1)
  207. if b && len(*info) > 0 {
  208. disposeFuc(*info, tmp)
  209. }
  210. }(tmp)
  211. }
  212. }
  213. func taskInfo(id string) {
  214. update := make(map[string]interface{})
  215. q := map[string]interface{}{"company_id": id}
  216. info, _ := Mgo.Find("company_change", q, nil, nil, false, -1, -1)
  217. if len(*info) > 0 {
  218. var changes []map[string]interface{}
  219. currentTime := time.Now().Unix()
  220. for _, v := range *info {
  221. item := make(map[string]interface{})
  222. item["change_field"] = v["change_field"]
  223. item["content_before"] = v["content_before"]
  224. item["content_after"] = v["content_after"]
  225. item["change_date"] = v["change_date"]
  226. setMark(item) //change_name_new
  227. changes = append(changes, item)
  228. }
  229. update["changes"] = changes
  230. update["update_time"] = currentTime
  231. }
  232. util.Debug(update)
  233. MgoMix.Update("qyxy_change", q, map[string]interface{}{"$set": update}, false, false)
  234. }
  235. func disposeFuc(maps []map[string]interface{}, tmp map[string]interface{}) {
  236. var changes []map[string]interface{}
  237. currentTime := time.Now().Unix()
  238. for _, v := range maps {
  239. item := make(map[string]interface{})
  240. item["change_field"] = v["change_field"]
  241. item["content_before"] = v["content_before"]
  242. item["content_after"] = v["content_after"]
  243. item["change_date"] = v["change_date"]
  244. setMark(item) //change_name_new
  245. changes = append(changes, item)
  246. }
  247. tmp["changes"] = changes
  248. tmp["create_time"] = currentTime
  249. tmp["update_time"] = currentTime
  250. MgoSaveCache <- tmp
  251. }
  252. func SaveData() {
  253. log.Println("Mgo Save...")
  254. arru := make([]map[string]interface{}, 200)
  255. indexu := 0
  256. for {
  257. select {
  258. case v := <-MgoSaveCache:
  259. arru[indexu] = v
  260. indexu++
  261. if indexu == 200 {
  262. SP <- true
  263. go func(arru []map[string]interface{}) {
  264. defer func() {
  265. <-SP
  266. }()
  267. MgoMix.SaveBulk(CollSave, arru...)
  268. }(arru)
  269. arru = make([]map[string]interface{}, 200)
  270. indexu = 0
  271. }
  272. case <-time.After(1000 * time.Millisecond):
  273. if indexu > 0 {
  274. SP <- true
  275. go func(arru []map[string]interface{}) {
  276. defer func() {
  277. <-SP
  278. }()
  279. MgoMix.SaveBulk(CollSave, arru...)
  280. }(arru[:indexu])
  281. arru = make([]map[string]interface{}, 200)
  282. indexu = 0
  283. }
  284. }
  285. }
  286. }
  287. func updateMethod() {
  288. arru := make([][]map[string]interface{}, 200)
  289. indexu := 0
  290. for {
  291. select {
  292. case v := <-updatePool:
  293. arru[indexu] = v
  294. indexu++
  295. if indexu == 200 {
  296. updateSp <- true
  297. go func(arru [][]map[string]interface{}) {
  298. defer func() {
  299. <-updateSp
  300. }()
  301. MgoMix.UpSertBulk(CollSave, arru...)
  302. }(arru)
  303. arru = make([][]map[string]interface{}, 200)
  304. indexu = 0
  305. }
  306. case <-time.After(1000 * time.Millisecond):
  307. if indexu > 0 {
  308. updateSp <- true
  309. go func(arru [][]map[string]interface{}) {
  310. defer func() {
  311. <-updateSp
  312. }()
  313. MgoMix.UpSertBulk(CollSave, arru...)
  314. }(arru[:indexu])
  315. arru = make([][]map[string]interface{}, 200)
  316. indexu = 0
  317. }
  318. }
  319. }
  320. }