package main import ( log "github.com/donnie4w/go-logger/logger" "github.com/nats-io/nats.go" "go.mongodb.org/mongo-driver/bson" _ "jy/admin" _ "jy/admin/audit" _ "jy/admin/distribution" _ "jy/admin/task" "jy/extract" _ "jy/front" . "jy/router" u "jy/util" "jynats/jnats" "net/http" _ "net/http/pprof" qu "qfw/util" "sync" ) func init() { log.SetConsole(false) log.SetLevel(log.DEBUG) log.SetRollingDaily("./", "out.log") qu.ReadConfig(&u.Config) qu.ReadConfig("./res/regions.json", &u.RegionsConfig) //抽取price和number相关 qu.ReadConfig("./res/pricenumber.json", &u.PriceNumberConfig) //初始化util u.UtilInit() } // 流式... func mainT() { go RunFlowSystem() lock := make(chan bool) <-lock } func main() { extract.ExtractUdpUpdateMachine() //节点上传~构建 extract.ExtractUdp() //udp通知抽取 go Router.Run(":" + qu.ObjToString(u.Config["port"])) go log.Debug("启动..", qu.ObjToString(u.Config["port"])) go func() { http.ListenAndServe("localhost:10000", nil) }() lock := make(chan bool) <-lock } func RunFlowSystem() { addr := qu.ObjToString(u.Config["flowaddr"]) jn := jnats.NewJnats(addr) extract.InitExtractFlowTask() wg_mgo := &sync.WaitGroup{} jn.SubZip("dataprocess.extract", func(msg *nats.Msg) { msgInfo := &u.MsgInfo{} err := bson.Unmarshal(msg.Data, &msgInfo) if err != nil { msgInfo.Err = err.Error() bs, _ := bson.Marshal(msgInfo) msg.Respond(bs) } else { extract.ExtFlow.TaskInfo.ProcessPool <- true wg_mgo.Add(1) go func(msgInfo *u.MsgInfo, msg *nats.Msg) { defer func() { <-extract.ExtFlow.TaskInfo.ProcessPool wg_mgo.Done() }() res := extract.ExtractByExtFlow(msgInfo.Data) msgInfo.Data["ext"] = res bs, _ := bson.Marshal(msgInfo) msg.Respond(bs) }(msgInfo, msg) } }) } // 验证规则 func testMain() { //http://extcity.spdata.jianyu360.com/service/entity/test?text=我是正文开滦(集团)有限责任公司 con := `2134576` text := con[1:2] log.Debug(text) }