main.go 9.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374
  1. package main
  2. import (
  3. util "app.yhyue.com/data_processing/common_utils"
  4. "app.yhyue.com/data_processing/common_utils/log"
  5. "app.yhyue.com/data_processing/common_utils/mongodb"
  6. "app.yhyue.com/data_processing/common_utils/redis"
  7. "app.yhyue.com/data_processing/common_utils/udp"
  8. "encoding/json"
  9. "fmt"
  10. "github.com/robfig/cron"
  11. "github.com/spf13/cobra"
  12. "go.mongodb.org/mongo-driver/bson"
  13. "go.uber.org/zap"
  14. "net"
  15. "os"
  16. "proposed_project/config"
  17. "sync"
  18. "time"
  19. )
  20. var (
  21. UdpClient udp.UdpClient
  22. )
  23. func main() {
  24. rootCmd := &cobra.Command{Use: "my cmd"}
  25. rootCmd.AddCommand(tags())
  26. rootCmd.AddCommand(project())
  27. rootCmd.AddCommand(nzjData())
  28. rootCmd.AddCommand(projectAdd())
  29. rootCmd.AddCommand(tidbSave())
  30. rootCmd.AddCommand(tidbAddSave())
  31. rootCmd.AddCommand(redisSave())
  32. if err := rootCmd.Execute(); err != nil {
  33. fmt.Println("rootCmd.Execute failed", err.Error())
  34. }
  35. }
  36. func tags() *cobra.Command {
  37. cmdClient := &cobra.Command{
  38. Use: "tags",
  39. Short: "Start processing tags data",
  40. Run: func(cmd *cobra.Command, args []string) {
  41. InitRule()
  42. go UpdateMethod()
  43. taskRun()
  44. c := make(chan bool, 1)
  45. <-c
  46. },
  47. }
  48. return cmdClient
  49. }
  50. func project() *cobra.Command {
  51. cmdClient := &cobra.Command{
  52. Use: "project",
  53. Short: "Start processing project data",
  54. Run: func(cmd *cobra.Command, args []string) {
  55. loadProject()
  56. go updateAllQueue()
  57. UdpClient = udp.UdpClient{Local: config.Conf.Udp.LocPort, BufSize: 1024}
  58. UdpClient.Listen(processUdpMsg)
  59. log.Info("Udp服务监听", zap.String("port:", config.Conf.Udp.LocPort))
  60. c := make(chan bool, 1)
  61. <-c
  62. },
  63. }
  64. return cmdClient
  65. }
  66. func nzjData() *cobra.Command {
  67. cmdClient := &cobra.Command{
  68. Use: "nzj-data",
  69. Short: "Start processing project data",
  70. Run: func(cmd *cobra.Command, args []string) {
  71. if LastId == "" {
  72. _ = cmd.Help()
  73. os.Exit(0)
  74. }
  75. go saveMethod()
  76. sid, eid := taskQ()
  77. if sid != "" && eid != "" {
  78. doTask1(sid, eid)
  79. LastId = eid
  80. }
  81. crn := cron.New()
  82. cronstr := "0 */10 * * * ?" // 每10min执行一次
  83. _ = crn.AddFunc(cronstr, func() {
  84. if TaskSingle {
  85. sid, eid := taskQ()
  86. if sid != "" && eid != "" {
  87. TaskSingle = false
  88. doTask1(sid, eid)
  89. LastId = eid
  90. }
  91. } else {
  92. log.Info("上次任务未执行完成")
  93. }
  94. })
  95. crn.Start()
  96. c := make(chan bool, 1)
  97. <-c
  98. },
  99. }
  100. cmdClient.Flags().StringVarP(&LastId, "lastid", "s", "", "data start id")
  101. return cmdClient
  102. }
  103. func projectAdd() *cobra.Command {
  104. cmdClient := &cobra.Command{
  105. Use: "project-add",
  106. Short: "Start processing project data",
  107. Run: func(cmd *cobra.Command, args []string) {
  108. if LastId == "" {
  109. _ = cmd.Help()
  110. os.Exit(0)
  111. }
  112. InitRule()
  113. loadProject()
  114. go updateAllQueue()
  115. info, _ := MgoBid.Find("nzj_bidding", nil, `{"publishtime": -1}`, nil, true, -1, 1)
  116. doTask(LastId, mongodb.BsonIdToSId((*info)[0]["_id"]))
  117. LastId = mongodb.BsonIdToSId((*info)[0]["_id"])
  118. crn := cron.New()
  119. cronstr := "0 */30 * * * ?" // 每30min执行一次
  120. _ = crn.AddFunc(cronstr, func() {
  121. if TaskSingle {
  122. info, _ := MgoBid.Find("nzj_bidding", nil, `{"publishtime": -1}`, nil, true, -1, 1)
  123. TaskSingle = false
  124. doTask(LastId, mongodb.BsonIdToSId((*info)[0]["_id"]))
  125. LastId = mongodb.BsonIdToSId((*info)[0]["_id"])
  126. } else {
  127. log.Info("上次任务未执行完成")
  128. }
  129. })
  130. crn.Start()
  131. c := make(chan bool, 1)
  132. <-c
  133. },
  134. }
  135. cmdClient.Flags().StringVarP(&LastId, "lastid", "s", "", "data start id")
  136. return cmdClient
  137. }
  138. func tidbSave() *cobra.Command {
  139. cmdClient := &cobra.Command{
  140. Use: "tidb",
  141. Short: "Start processing project save to tidb",
  142. Run: func(cmd *cobra.Command, args []string) {
  143. InitMysql()
  144. InitArea()
  145. //go SaveFunc("dwd_f_nzj_baseinfo_new", BaseField)
  146. //go SaveRFunc("dwd_f_nzj_follw_record_new", RecordField)
  147. //go SaveCFunc("dwd_f_nzj_contact_new", ContactField)
  148. //go SaveCyFunc("dwd_f_nzj_category_tags_new", CategoryField)
  149. go SaveEntFunc("dwd_f_nzj_ent", EntField)
  150. redis.InitRedis1("ent_id=172.17.4.189:8379", 6) // name_id
  151. taskTidb(nil)
  152. c := make(chan bool, 1)
  153. <-c
  154. },
  155. }
  156. return cmdClient
  157. }
  158. func tidbAddSave() *cobra.Command {
  159. cmdClient := &cobra.Command{
  160. Use: "tidb-add",
  161. Short: "Start processing project save to tidb",
  162. Run: func(cmd *cobra.Command, args []string) {
  163. InitMysql()
  164. InitArea()
  165. //go SaveFunc("dwd_f_nzj_baseinfo", BaseField)
  166. //go SaveRFunc("dwd_f_nzj_follw_record", RecordField)
  167. //go SaveCFunc("dwd_f_nzj_contact", ContactField)
  168. //go SaveCyFunc("dwd_f_nzj_category_tags", CategoryField)
  169. redis.InitRedis1("ent_id=172.17.4.189:8379", 6) // name_id
  170. //taskTidb_add(bson.M{"pici": bson.M{"$gt": pici}})
  171. crn := cron.New()
  172. cronstr := "0 */30 * * * ?" // 每30min执行一次
  173. _ = crn.AddFunc(cronstr, func() {
  174. if TaskSingle {
  175. TaskSingle = false
  176. taskTidb_add(bson.M{"pici": bson.M{"$gt": pici}})
  177. } else {
  178. log.Info("上次任务未执行完成")
  179. }
  180. })
  181. crn.Start()
  182. c := make(chan bool, 1)
  183. <-c
  184. },
  185. }
  186. cmdClient.Flags().Int64VarP(&pici, "pici", "c", 0, "pici time")
  187. return cmdClient
  188. }
  189. func redisSave() *cobra.Command {
  190. cmdClient := &cobra.Command{
  191. Use: "redis",
  192. Short: "Start processing project save to tidb",
  193. Run: func(cmd *cobra.Command, args []string) {
  194. InitMysql()
  195. redis.InitRedis1("ent_id=172.17.4.189:8379", 6) // name_id
  196. redisDisp()
  197. },
  198. }
  199. return cmdClient
  200. }
  201. func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
  202. switch act {
  203. case udp.OP_TYPE_DATA:
  204. var mapInfo map[string]interface{}
  205. err := json.Unmarshal(data, &mapInfo)
  206. log.Info("processUdpMsg", zap.Any("mapInfo:", mapInfo))
  207. gtid, _ := mapInfo["gtid"].(string)
  208. lteid, _ := mapInfo["lteid"].(string)
  209. if err != nil || gtid == "" || lteid == "" {
  210. UdpClient.WriteUdp([]byte("tidb udp error"), udp.OP_NOOP, ra)
  211. } else {
  212. //udp成功回写
  213. if k := util.ObjToString(mapInfo["key"]); k != "" {
  214. UdpClient.WriteUdp([]byte(k), udp.OP_NOOP, ra)
  215. } else {
  216. k = fmt.Sprintf("%s-%s-%s", gtid, lteid, util.ObjToString(mapInfo["stype"]))
  217. UdpClient.WriteUdp([]byte(k), udp.OP_NOOP, ra)
  218. }
  219. log.Info("start merge ...")
  220. doTask(gtid, lteid)
  221. }
  222. case udp.OP_NOOP: //下个节点回应
  223. }
  224. }
  225. func taskQ() (string, string) {
  226. log.Info("taskQ", zap.String("lastid", LastId))
  227. query := bson.M{"_id": bson.M{"$gt": mongodb.StringTOBsonId(LastId)}, "dataprocess": 8}
  228. info, _ := MgoBid.Find("bidding_processing_ids", query, `{"_id": -1}`, nil, false, -1, 1)
  229. if len(*info) > 0 {
  230. newid := util.ObjToString((*info)[0]["lteid"])
  231. log.Info("taskQ", zap.String("新lastid", newid))
  232. return LastId, newid
  233. } else {
  234. log.Info("taskQ 未发现新数据")
  235. return "", ""
  236. }
  237. }
  238. func redisDisp() {
  239. pool := make(chan bool, 5) //控制线程数
  240. wg := &sync.WaitGroup{}
  241. finalId := 0
  242. lastInfo := MysqlTool1.SelectBySql(fmt.Sprintf("SELECT id FROM %s ORDER BY id DESC LIMIT 1", "dws_f_ent_baseinfo"))
  243. if len(*lastInfo) > 0 {
  244. finalId = util.IntAll((*lastInfo)[0]["id"])
  245. }
  246. util.Debug("taskIterateSql---", "finally id", finalId)
  247. lastid, count := 0, 0
  248. for {
  249. util.Debug("重新查询,lastid---", lastid)
  250. q := fmt.Sprintf("SELECT id, name, name_id, area_code, city_code FROM %s WHERE id > %d ORDER BY id ASC limit 100000", "dws_f_ent_baseinfo", lastid)
  251. rows, err := MysqlTool1.DB.Query(q)
  252. if err != nil {
  253. util.Debug("taskIterateSql---", err)
  254. }
  255. columns, err := rows.Columns()
  256. if finalId == lastid {
  257. util.Debug("----finish----------", count)
  258. break
  259. }
  260. for rows.Next() {
  261. scanArgs := make([]interface{}, len(columns))
  262. values := make([]interface{}, len(columns))
  263. ret := make(map[string]interface{})
  264. for k := range values {
  265. scanArgs[k] = &values[k]
  266. }
  267. err = rows.Scan(scanArgs...)
  268. if err != nil {
  269. util.Debug("taskIterateSql---", err)
  270. break
  271. }
  272. for i, col := range values {
  273. if v, ok := col.([]uint8); ok {
  274. ret[columns[i]] = string(v)
  275. } else {
  276. ret[columns[i]] = col
  277. }
  278. }
  279. lastid = util.IntAll(ret["id"])
  280. count++
  281. if count%20000 == 0 {
  282. util.Debug("current-------", count, lastid)
  283. }
  284. pool <- true
  285. wg.Add(1)
  286. go func(tmp map[string]interface{}) {
  287. defer func() {
  288. <-pool
  289. wg.Done()
  290. }()
  291. redis.PutCKV("ent_id", util.ObjToString(tmp["name"]),
  292. fmt.Sprintf("%s_%s_%s", util.ObjToString(tmp["name_id"]), util.ObjToString(tmp["area_code"]), util.ObjToString(tmp["city_code"])))
  293. }(ret)
  294. ret = make(map[string]interface{})
  295. }
  296. _ = rows.Close()
  297. wg.Wait()
  298. }
  299. }
  300. func saveMethod() {
  301. arru := make([]map[string]interface{}, saveSize)
  302. indexu := 0
  303. for {
  304. select {
  305. case v := <-savePool:
  306. arru[indexu] = v
  307. indexu++
  308. if indexu == saveSize {
  309. saveSp <- true
  310. go func(arru []map[string]interface{}) {
  311. defer func() {
  312. <-saveSp
  313. }()
  314. MgoBid.SaveBulk("nzj_bidding", arru...)
  315. }(arru)
  316. arru = make([]map[string]interface{}, saveSize)
  317. indexu = 0
  318. }
  319. case <-time.After(1000 * time.Millisecond):
  320. if indexu > 0 {
  321. saveSp <- true
  322. go func(arru []map[string]interface{}) {
  323. defer func() {
  324. <-saveSp
  325. }()
  326. MgoBid.SaveBulk("nzj_bidding", arru...)
  327. }(arru[:indexu])
  328. arru = make([]map[string]interface{}, saveSize)
  329. indexu = 0
  330. }
  331. }
  332. }
  333. }