test.go 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212
  1. package main
  2. import (
  3. "fmt"
  4. "go.mongodb.org/mongo-driver/bson"
  5. "go.uber.org/zap"
  6. util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  7. "jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
  8. "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  9. "jygit.jydev.jianyu360.cn/data_processing/common_utils/redis"
  10. "sync"
  11. "time"
  12. )
  13. func taskMysql() {
  14. pool := make(chan bool, 5) //控制线程数
  15. wg := &sync.WaitGroup{}
  16. finalId := 0
  17. lastInfo := MysqlTool.SelectBySql(fmt.Sprintf("SELECT * FROM %s ORDER BY id DESC LIMIT 1", "dws_f_bpmc_relation"))
  18. //lastInfo := MysqlTool.SelectBySql(fmt.Sprintf("SELECT id, projectid, infoid, name_id, identity_type+0 FROM %s ORDER BY id DESC LIMIT 1", "dws_f_bpmc_relation_new"))
  19. if len(*lastInfo) > 0 {
  20. finalId = util.IntAll((*lastInfo)[0]["id"])
  21. }
  22. log.Info("查询最后id---", zap.Int("finally id: ", finalId))
  23. lastid, count := 0, 0
  24. for {
  25. log.Info("重新查询,lastid---", zap.Int("lastid: ", lastid))
  26. q := fmt.Sprintf("SELECT * FROM %s WHERE id > %d ORDER BY id ASC limit 100000", "dws_f_bpmc_relation", lastid)
  27. //q := fmt.Sprintf("SELECT id, projectid, infoid, name_id, identity_type+0 FROM %s WHERE id=61771536 ORDER BY id ASC limit 1000000", "dws_f_bpmc_relation")
  28. //q := fmt.Sprintf("SELECT id, name, name_id FROM %s WHERE id>%d ORDER BY id ASC limit 1000000", "dws_f_ent_baseinfo", lastid)
  29. rows, err := MysqlTool.DB.Query(q)
  30. if err != nil {
  31. log.Error("mysql query err ", zap.Error(err))
  32. }
  33. columns, err := rows.Columns()
  34. if finalId == lastid {
  35. log.Info("----finish-----", zap.Int("count: ", count))
  36. break
  37. }
  38. for rows.Next() {
  39. scanArgs := make([]interface{}, len(columns))
  40. values := make([]interface{}, len(columns))
  41. ret := make(map[string]interface{})
  42. for k := range values {
  43. scanArgs[k] = &values[k]
  44. }
  45. err = rows.Scan(scanArgs...)
  46. if err != nil {
  47. log.Error("mysql scan err ", zap.Error(err))
  48. break
  49. }
  50. for i, col := range values {
  51. if col == nil {
  52. ret[columns[i]] = nil
  53. } else {
  54. switch val := (*scanArgs[i].(*interface{})).(type) {
  55. case byte:
  56. ret[columns[i]] = val
  57. break
  58. case []byte:
  59. v := string(val)
  60. switch v {
  61. case "\x00": // 处理数据类型为bit的情况
  62. ret[columns[i]] = 0
  63. case "\x01": // 处理数据类型为bit的情况
  64. ret[columns[i]] = 1
  65. default:
  66. ret[columns[i]] = v
  67. break
  68. }
  69. break
  70. case time.Time:
  71. if val.IsZero() {
  72. ret[columns[i]] = nil
  73. } else {
  74. ret[columns[i]] = val.Format("2006-01-02 15:04:05")
  75. }
  76. break
  77. default:
  78. ret[columns[i]] = val
  79. }
  80. }
  81. }
  82. lastid = util.IntAll(ret["id"])
  83. count++
  84. if count%20000 == 0 {
  85. log.Info("current----", zap.Int("count: ", count), zap.Int("lastid: ", lastid))
  86. }
  87. pool <- true
  88. wg.Add(1)
  89. go func(tmp map[string]interface{}) {
  90. defer func() {
  91. <-pool
  92. wg.Done()
  93. }()
  94. //cid := util.Int64All(tmp["id"])
  95. //iid := util.ObjToString(tmp["infoid"])
  96. //name_id := util.ObjToString(tmp["name_id"])
  97. //identity_type := util.Int64All(tmp["identity_type+0"])
  98. //if name_id != "" {
  99. // coll := "bidding"
  100. // if iid > "5a862e7040d2d9bbe88e3b1f" {
  101. // coll = "bidding"
  102. // } else {
  103. // coll = "bidding_back"
  104. // }
  105. // info, _ := MongoB.FindById(coll, iid, bson.M{"agencytel": 1, "agencyperson": 1, "buyertel": 1, "buyerperson": 1, "winnertel": 1, "winnerperson": 1})
  106. // if len(*info) > 0 {
  107. // if identity_type == 1 {
  108. // if util.ObjToString((*info)["buyertel"]) != "" {
  109. // q := make(map[string]interface{})
  110. // q["name_id"] = name_id
  111. // q["identity_type"] = identity_type
  112. // q["contact_tel"] = util.ObjToString((*info)["buyertel"])
  113. // if util.ObjToString((*info)["buyerperson"]) != "" {
  114. // q["contact_name"] = util.ObjToString((*info)["buyerperson"])
  115. // }
  116. // cinfo := MysqlTool.FindOne("dws_f_ent_contact", q, "", "")
  117. // if cinfo != nil && len(*cinfo) > 0 {
  118. // MysqlTool.Update("dws_f_bpmc_relation_new", bson.M{"id": cid}, bson.M{"contact_id": (*cinfo)["id"]})
  119. // }
  120. // }
  121. // } else if identity_type == 2 {
  122. // if util.ObjToString((*info)["winnertel"]) != "" {
  123. // q := make(map[string]interface{})
  124. // q["name_id"] = name_id
  125. // q["identity_type"] = identity_type
  126. // q["contact_tel"] = util.ObjToString((*info)["winnertel"])
  127. // if util.ObjToString((*info)["winnerperson"]) != "" {
  128. // q["contact_name"] = util.ObjToString((*info)["winnerperson"])
  129. // }
  130. // cinfo := MysqlTool.FindOne("dws_f_ent_contact", q, "", "")
  131. // if cinfo != nil && len(*cinfo) > 0 {
  132. // MysqlTool.Update("dws_f_bpmc_relation_new", bson.M{"id": cid}, bson.M{"contact_id": (*cinfo)["id"]})
  133. // }
  134. // }
  135. // } else if identity_type == 4 {
  136. // if util.ObjToString((*info)["agencytel"]) != "" {
  137. // q := make(map[string]interface{})
  138. // q["name_id"] = name_id
  139. // q["identity_type"] = identity_type
  140. // q["contact_tel"] = util.ObjToString((*info)["agencytel"])
  141. // if util.ObjToString((*info)["agencyperson"]) != "" {
  142. // q["contact_name"] = util.ObjToString((*info)["agencyperson"])
  143. // }
  144. // cinfo := MysqlTool.FindOne("dws_f_ent_contact", q, "", "")
  145. // if cinfo != nil && len(*cinfo) > 0 {
  146. // MysqlTool.Update("dws_f_bpmc_relation_new", bson.M{"id": cid}, bson.M{"contact_id": (*cinfo)["id"]})
  147. // }
  148. // }
  149. // }
  150. // }
  151. //}
  152. //redis.PutCKV("qyxy_id", util.ObjToString(tmp["name"]), util.ObjToString(tmp["name_id"]))
  153. saveM := make(map[string]interface{})
  154. if util.ObjToString(tmp["name_id"]) != "" {
  155. saveM["name_id"] = util.ObjToString(tmp["name_id"])
  156. } else {
  157. return
  158. }
  159. if util.ObjToString(tmp["contact_id"]) != "" {
  160. saveM["contact_id"] = util.IntAll(tmp["contact_id"])
  161. } else {
  162. return
  163. }
  164. saveM["projectid"] = util.ObjToString(tmp["projectid"])
  165. saveM["infoid"] = util.ObjToString(tmp["infoid"])
  166. saveM["identity_type"] = tmp["identity_type"]
  167. saveRelationPool <- saveM
  168. }(ret)
  169. ret = make(map[string]interface{})
  170. }
  171. _ = rows.Close()
  172. wg.Wait()
  173. }
  174. }
  175. func taskMgo() {
  176. sess := MongoP.GetMgoConn()
  177. defer MongoP.DestoryMongoConn(sess)
  178. ch := make(chan bool, 3)
  179. wg := &sync.WaitGroup{}
  180. q := bson.M{"_id": bson.M{"$lte": mongodb.StringTOBsonId("63411488911e1eb3459fb87e")}}
  181. field := map[string]interface{}{"ids": 1}
  182. query := sess.DB("qfw").C("projectset_20220721").Find(q).Select(field).Iter()
  183. count := 0
  184. for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
  185. if count%20000 == 0 {
  186. util.Debug("current ---", count, tmp["_id"])
  187. }
  188. ch <- true
  189. wg.Add(1)
  190. go func(tmp map[string]interface{}) {
  191. defer func() {
  192. <-ch
  193. wg.Done()
  194. }()
  195. id := mongodb.BsonIdToSId(tmp["_id"])
  196. for _, i := range util.ObjArrToStringArr(tmp["ids"].([]interface{})) {
  197. redis.PutCKV("s_id", i, id)
  198. }
  199. }(tmp)
  200. tmp = make(map[string]interface{})
  201. }
  202. wg.Wait()
  203. util.Debug("over ---", count)
  204. }