main.go 2.0 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788
  1. package main
  2. import (
  3. log "github.com/donnie4w/go-logger/logger"
  4. "github.com/nats-io/nats.go"
  5. "go.mongodb.org/mongo-driver/bson"
  6. _ "jy/admin"
  7. _ "jy/admin/audit"
  8. _ "jy/admin/distribution"
  9. _ "jy/admin/task"
  10. "jy/extract"
  11. _ "jy/front"
  12. . "jy/router"
  13. u "jy/util"
  14. "jynats/jnats"
  15. "net/http"
  16. _ "net/http/pprof"
  17. qu "qfw/util"
  18. "sync"
  19. )
  20. func init() {
  21. log.SetConsole(false)
  22. log.SetLevel(log.DEBUG)
  23. log.SetRollingDaily("./", "out.log")
  24. qu.ReadConfig(&u.Config)
  25. qu.ReadConfig("./res/regions.json", &u.RegionsConfig)
  26. //抽取price和number相关
  27. qu.ReadConfig("./res/pricenumber.json", &u.PriceNumberConfig)
  28. //初始化util
  29. u.UtilInit()
  30. }
  31. // 流式...
  32. func mainT() {
  33. go RunFlowSystem()
  34. lock := make(chan bool)
  35. <-lock
  36. }
  37. func main() {
  38. extract.ExtractUdpUpdateMachine() //节点上传~构建
  39. extract.ExtractUdp() //udp通知抽取
  40. go Router.Run(":" + qu.ObjToString(u.Config["port"]))
  41. go log.Debug("启动..", qu.ObjToString(u.Config["port"]))
  42. go func() {
  43. http.ListenAndServe("localhost:10000", nil)
  44. }()
  45. lock := make(chan bool)
  46. <-lock
  47. }
  48. func RunFlowSystem() {
  49. addr := qu.ObjToString(u.Config["flowaddr"])
  50. jn := jnats.NewJnats(addr)
  51. extract.InitExtractFlowTask()
  52. wg_mgo := &sync.WaitGroup{}
  53. jn.SubZip("dataprocess.extract", func(msg *nats.Msg) {
  54. msgInfo := &u.MsgInfo{}
  55. err := bson.Unmarshal(msg.Data, &msgInfo)
  56. if err != nil {
  57. msgInfo.Err = err.Error()
  58. bs, _ := bson.Marshal(msgInfo)
  59. msg.Respond(bs)
  60. } else {
  61. extract.ExtFlow.TaskInfo.ProcessPool <- true
  62. wg_mgo.Add(1)
  63. go func(msgInfo *u.MsgInfo, msg *nats.Msg) {
  64. defer func() {
  65. <-extract.ExtFlow.TaskInfo.ProcessPool
  66. wg_mgo.Done()
  67. }()
  68. res := extract.ExtractByExtFlow(msgInfo.Data)
  69. msgInfo.Data["ext"] = res
  70. bs, _ := bson.Marshal(msgInfo)
  71. msg.Respond(bs)
  72. }(msgInfo, msg)
  73. }
  74. })
  75. }
  76. // 验证规则
  77. func testMain() {
  78. //http://extcity.spdata.jianyu360.com/service/entity/test?text=我是正文开滦(集团)有限责任公司
  79. con := `2134576`
  80. text := con[1:2]
  81. log.Debug(text)
  82. }