123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329 |
- package main
- import (
- "encoding/json"
- "log"
- mu "mfw/util"
- "net"
- "qfw/util"
- elastic "qfw/util/elastic"
- "qfw/util/mongodb"
- "strings"
- "time"
- u "util"
- )
- var (
- Sysconfig map[string]interface{} //配置文件
- mgo *mongodb.MongodbSim //mongodb操作对象
- extractmgo *mongodb.MongodbSim //mongodb操作对象
- project2db *mongodb.MongodbSim //mongodb操作对象
- mgostandard *mongodb.MongodbSim //mongodb操作对象
- qyxydb *mongodb.MongodbSim //mongodb操作对象
- udpclient mu.UdpClient //udp对象
- updport string
- savesizei = 500
- biddingIndexFields = []string{"_id", "buyerclass", "s_winner", "title", "detail", "detail_bak", "area", "areaval", "site", "type", "amount", "bidopendate", "bidopentime", "buyer", "channel", "city", "comeintime", "contenthtml", "descript", "description", "extracttype", "href", "infoformat", "keywords", "projectcode", "projectname", "publishtime", "s_sha", "spidercode", "subtype", "summary", "toptype", "urltop", "winner", "agency", "budget", "bidamount", "s_subscopeclass", "projectscope", "bidstatus"}
- biddingIndexFieldsMap = map[string]string{}
- projectinfoFields []string
- projectinfoFieldsMap = map[string]string{}
- multiIndex []string
- purchasinglistFields []string
- winnerorderlistFields []string
- purchasinglistFieldsMap = map[string]string{}
- winnerorderlistFieldsMap = map[string]string{}
- BulkSize = 400
- detailLength = 50000
- fileLength = 50000
- //bidding_other连接信息
- bidding_other_es *elastic.Elastic
- other_index string
- other_itype string
- winner, bidding, biddingback, project, project2, buyer, standard, qyxy_ent map[string]interface{}
- )
- func init() {
- util.ReadConfig(&Sysconfig)
- inits()
- go checkMapJob()
- detailLength = util.IntAllDef(Sysconfig["detaillength"], 50000)
- fileLength = util.IntAllDef(Sysconfig["filelength"], 50000)
- updport, _ = Sysconfig["updport"].(string)
- winner, _ = Sysconfig["winner"].(map[string]interface{})
- standard, _ = Sysconfig["standard"].(map[string]interface{})
- buyer, _ = Sysconfig["buyer"].(map[string]interface{})
- bidding, _ = Sysconfig["bidding"].(map[string]interface{})
- biddingback, _ = Sysconfig["biddingback"].(map[string]interface{})
- project, _ = Sysconfig["project"].(map[string]interface{})
- project2, _ = Sysconfig["project2"].(map[string]interface{})
- qyxy_ent, _ = Sysconfig["qyxy_ent"].(map[string]interface{})
- mconf, _ := Sysconfig["mongodb"].(map[string]interface{})
- mgo = &mongodb.MongodbSim{ //mongodb为binding连接
- MongodbAddr: mconf["addr"].(string),
- Size: util.IntAllDef(mconf["pool"], 5),
- DbName: mconf["db"].(string),
- }
- mgo.InitPool()
- project2db = &mongodb.MongodbSim{
- MongodbAddr: project2["addr"].(string),
- Size: util.IntAllDef(project2["pool"], 5),
- DbName: project2["db"].(string),
- }
- project2db.InitPool()
- //企业信用
- qyxydb = &mongodb.MongodbSim{
- MongodbAddr: qyxy_ent["addr"].(string),
- Size: util.IntAllDef(qyxy_ent["pool"], 5),
- DbName: qyxy_ent["db"].(string),
- }
- qyxydb.InitPool()
- savedb, _ := Sysconfig["savedb"].(map[string]interface{})
- if savedb == nil {
- log.Println("未设置保存数据库,默认使用招标库")
- extractmgo = mgo
- } else { //savedb为抽取连接
- addr, _ := savedb["addr"].(string)
- size := util.IntAllDef(savedb["size"], 5)
- db, _ := savedb["db"].(string)
- extractmgo = &mongodb.MongodbSim{
- MongodbAddr: addr,
- Size: size,
- DbName: db,
- }
- extractmgo.InitPool()
- }
- mgostandard = &mongodb.MongodbSim{
- MongodbAddr: standard["addr"].(string),
- Size: util.IntAllDef(standard["pool"], 5),
- DbName: standard["db"].(string),
- }
- mgostandard.InitPool()
- //初始化es
- //bidding
- econf := Sysconfig["elastic"].(map[string]interface{})
- elastic.InitElasticSize(econf["addr"].(string), util.IntAllDef(econf["pool"], 5))
- //bidding_other
- if Sysconfig["elastic_other"] != nil {
- econf_other := Sysconfig["elastic_other"].(map[string]interface{})
- other_index = econf_other["index"].(string)
- other_itype = econf_other["type"].(string)
- bidding_other_es = &elastic.Elastic{
- S_esurl: econf_other["addr"].(string),
- I_size: util.IntAllDef(econf_other["pool"], 5),
- }
- bidding_other_es.InitElasticSize()
- }
- //
- if bidding["indexfields"] != nil {
- biddingIndexFields = util.ObjArrToStringArr(bidding["indexfields"].([]interface{}))
- }
- if bidding["projectinfo"] != nil {
- pf := util.ObjToString(bidding["projectinfo"])
- if pf != "" {
- projectinfoFields = strings.Split(pf, ",")
- }
- }
- if bidding["purchasinglist"] != nil {
- pcl := util.ObjToString(bidding["purchasinglist"])
- if pcl != "" {
- purchasinglistFields = strings.Split(pcl, ",")
- }
- }
- if bidding["winnerorder"] != nil {
- winnerorder := util.ObjToString(bidding["winnerorder"])
- if winnerorder != "" {
- winnerorderlistFields = strings.Split(winnerorder, ",")
- }
- }
- if bidding["multiIndex"] != nil {
- mi := util.ObjToString(bidding["multiIndex"])
- if mi != "" {
- multiIndex = strings.Split(mi, ",")
- }
- }
- //
- if bidding["indexfieldsmap"] != nil {
- for k, v := range bidding["indexfieldsmap"].(map[string]interface{}) {
- biddingIndexFieldsMap[k] = util.ObjToString(v)
- }
- log.Println(biddingIndexFieldsMap)
- }
- if bidding["projectinfomap"] != nil {
- for k, v := range bidding["projectinfomap"].(map[string]interface{}) {
- projectinfoFieldsMap[k] = util.ObjToString(v)
- }
- log.Println(projectinfoFieldsMap)
- }
- if bidding["purchasinglistmap"] != nil {
- for k, v := range bidding["purchasinglistmap"].(map[string]interface{}) {
- purchasinglistFieldsMap[k] = util.ObjToString(v)
- }
- log.Println(purchasinglistFieldsMap)
- }
- if bidding["winnerordermap"] != nil {
- for k, v := range bidding["winnerordermap"].(map[string]interface{}) {
- winnerorderlistFieldsMap[k] = util.ObjToString(v)
- }
- log.Println(winnerorderlistFieldsMap)
- }
- log.Println(projectinfoFields)
- log.Println(purchasinglistFields)
- //初始化oss
- u.InitOss()
- }
- func main() {
- go task_index()
- updport := Sysconfig["udpport"].(string)
- udpclient = mu.UdpClient{Local: updport, BufSize: 1024}
- udpclient.Listen(processUdpMsg)
- log.Println("Udp服务监听", updport)
- // time.Sleep(99999 * time.Hour)
- ch := make(chan bool, 1)
- <-ch
- }
- var pool = make(chan bool, 20)
- func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
- switch act {
- case mu.OP_TYPE_DATA: //上个节点的数据
- //从表中开始处理生成企业数据
- var mapInfo map[string]interface{}
- err := json.Unmarshal(data, &mapInfo)
- log.Println("err:", err, "mapInfo:", mapInfo, string(data))
- if err != nil {
- udpclient.WriteUdp([]byte("err:"+err.Error()), mu.OP_NOOP, ra)
- } else if mapInfo != nil {
- key, _ := mapInfo["key"].(string)
- if key == "" {
- key = "udpok"
- }
- go udpclient.WriteUdp([]byte(key), mu.OP_NOOP, ra)
- tasktype, _ := mapInfo["stype"].(string)
- log.Println("tasktype:", tasktype)
- switch tasktype {
- case "winner":
- pool <- true
- go func() {
- defer func() {
- <-pool
- }()
- winnerTask(data, mapInfo)
- }()
- case "bidding": //实时+udp调用,可选择是否生成关键词, 一次性最大20万
- pool <- true
- go func() {
- defer func() {
- <-pool
- }()
- biddingTask(data, mapInfo)
- }()
- case "project":
- pool <- true
- go func() {
- defer func() {
- <-pool
- }()
- projectTask(data, project, mapInfo)
- }()
- case "biddingback": //不联表,使用bidding表直接调用mongo库生成索引
- pool <- true
- go func() {
- defer func() {
- <-pool
- }()
- biddingBackTask(data, mapInfo)
- }()
- case "biddingall": //合并并重新生成索引,不生成关键词
- pool <- true
- go func() {
- defer func() {
- <-pool
- }()
- biddingAllTask(data, mapInfo)
- }()
- case "biddingdata": //联表生成索引不合并,不生成关键词
- pool <- true
- go func() {
- defer func() {
- <-pool
- }()
- biddingDataTask(data, mapInfo)
- }()
- case "biddingmerge": //重新合并但不生成索引,不生成关键词
- pool <- true
- go func() {
- defer func() {
- <-pool
- }()
- biddingMergeTask(data, mapInfo)
- }()
- case "buyer":
- pool <- true
- go func() {
- defer func() {
- <-pool
- }()
- buyerTask(data, mapInfo)
- }()
- case "winnerent": //标准库
- pool <- true
- go func() {
- defer func() {
- <-pool
- }()
- standardTask("winnerent", mapInfo)
- }()
- case "buyerent": //标准库
- pool <- true
- go func() {
- defer func() {
- <-pool
- }()
- standardTask("buyerent", mapInfo)
- }()
- case "agencyent": //标准库
- pool <- true
- go func() {
- defer func() {
- <-pool
- }()
- standardTask("agencyent", mapInfo)
- }()
- case "biddingdelbyextract": //根据repeat删除es
- pool <- true
- go func() {
- defer func() {
- <-pool
- }()
- biddingDelByExtract(data, mapInfo)
- }()
- case "biddingdelbyextracttype": //根据extracttype删除es
- pool <- true
- go func() {
- defer func() {
- <-pool
- }()
- biddingDelByExtracttype(data, mapInfo)
- }()
- default:
- pool <- true
- go func() {
- defer func() {
- <-pool
- }()
- defaultFunc(data, mapInfo)
- }()
- }
- }
- case mu.OP_NOOP: //下个节点回应
- log.Println("发送成功", string(data))
- }
- }
|