// Command main loads its configuration, connects to MongoDB and Elasticsearch,
// then listens for UDP messages and runs the task named by each message's
// "stype" field.
package main

import (
	"encoding/json"
	"log"
	"net"
	"strings"
	"time"

	mu "mfw/util"
	"qfw/util"
	elastic "qfw/util/elastic"
	"qfw/util/mongodb"
)

var (
	// Sysconfig holds the configuration filled in by util.ReadConfig.
	Sysconfig map[string]interface{}
	// mgo is the MongoDB handle built from the "mongodb" config section.
	mgo *mongodb.MongodbSim
	// extractmgo is built from the "savedb" config section; it aliases mgo
	// when that section is absent.
	extractmgo *mongodb.MongodbSim
	// mgostandard is the MongoDB handle built from the "standard" config section.
	mgostandard *mongodb.MongodbSim
	// udpclient is the UDP endpoint created in main.
	udpclient mu.UdpClient
	// updport is read in init from the "updport" config key.
	// NOTE(review): main listens on the "udpport" key instead; the two
	// spellings differ, so this value is probably empty in practice — confirm
	// which key is intended before relying on it.
	updport   string
	savesizei = 500
	// biddingIndexFields is the default field list; init replaces it when the
	// "bidding" section provides "indexfields".
	biddingIndexFields = []string{
		"_id", "buyerclass", "s_winner", "title", "detail", "detail_bak",
		"area", "areaval", "site", "type", "amount", "bidopendate",
		"bidopentime", "buyer", "channel", "city", "comeintime",
		"contenthtml", "descript", "description", "extracttype", "href",
		"infoformat", "keywords", "projectcode", "projectname",
		"publishtime", "s_sha", "spidercode", "subtype", "summary",
		"toptype", "urltop", "winner", "agency", "budget", "bidamount",
		"s_subscopeclass", "projectscope", "bidstatus",
	}
	// projectinfoFields and multiIndex are the comma-separated
	// "projectinfo" / "multiIndex" values of the "bidding" section, split.
	projectinfoFields []string
	multiIndex        []string
	BulkSize          = 400
	// Per-task config sections, copied out of Sysconfig in init.
	winner, bidding, biddingback, project, buyer, standard map[string]interface{}
)

// init reads the configuration and builds the MongoDB pools and the
// Elasticsearch client. A required string setting that is missing or has the
// wrong type terminates the process with a message naming the setting.
func init() {
	util.ReadConfig(&Sysconfig)
	// NOTE(review): inits and checkMapJob start before the section maps and
	// database handles below are assigned — confirm they do not depend on them.
	inits()
	go checkMapJob()

	// mustString returns conf[key] as a string, or exits with a readable
	// error instead of an interface-conversion panic.
	mustString := func(section string, conf map[string]interface{}, key string) string {
		v, ok := conf[key].(string)
		if !ok {
			log.Fatalf("config: %q.%q is missing or not a string", section, key)
		}
		return v
	}

	updport, _ = Sysconfig["updport"].(string)
	winner, _ = Sysconfig["winner"].(map[string]interface{})
	standard, _ = Sysconfig["standard"].(map[string]interface{})
	buyer, _ = Sysconfig["buyer"].(map[string]interface{})
	bidding, _ = Sysconfig["bidding"].(map[string]interface{})
	biddingback, _ = Sysconfig["biddingback"].(map[string]interface{})
	project, _ = Sysconfig["project"].(map[string]interface{})

	mconf, _ := Sysconfig["mongodb"].(map[string]interface{})
	mgo = &mongodb.MongodbSim{
		MongodbAddr: mustString("mongodb", mconf, "addr"),
		Size:        util.IntAllDef(mconf["pool"], 5),
		DbName:      mustString("mongodb", mconf, "db"),
	}
	mgo.InitPool()

	savedb, _ := Sysconfig["savedb"].(map[string]interface{})
	if savedb == nil {
		log.Println("未设置保存数据库,默认使用招标库")
		extractmgo = mgo
	} else {
		addr, _ := savedb["addr"].(string)
		size := util.IntAllDef(savedb["size"], 5)
		db, _ := savedb["db"].(string)
		extractmgo = &mongodb.MongodbSim{
			MongodbAddr: addr,
			Size:        size,
			DbName:      db,
		}
		extractmgo.InitPool()
	}

	standardAddr := mustString("standard", standard, "addr")
	mgostandard = &mongodb.MongodbSim{
		MongodbAddr: standardAddr,
		Size:        util.IntAllDef(standard["pool"], 5),
		DbName:      mustString("standard", standard, "db"),
	}
	mgostandard.InitPool()
	log.Println(standardAddr)

	econf, _ := Sysconfig["elastic"].(map[string]interface{})
	elastic.InitElasticSize(mustString("elastic", econf, "addr"), util.IntAllDef(econf["pool"], 5))

	if raw := bidding["indexfields"]; raw != nil {
		fields, ok := raw.([]interface{})
		if !ok {
			log.Fatalf("config: %q.%q must be an array", "bidding", "indexfields")
		}
		biddingIndexFields = util.ObjArrToStringArr(fields)
	}
	if bidding["projectinfo"] != nil {
		if pf := util.ObjToString(bidding["projectinfo"]); pf != "" {
			projectinfoFields = strings.Split(pf, ",")
		}
	}
	if bidding["multiIndex"] != nil {
		if mi := util.ObjToString(bidding["multiIndex"]); mi != "" {
			multiIndex = strings.Split(mi, ",")
		}
	}
	log.Println(projectinfoFields)
}

// main starts the UDP listener on the address stored under the "udpport"
// config key and then keeps the process alive.
func main() {
	// A distinct local name avoids shadowing the package-level updport,
	// which init fills from a differently spelled key.
	listenAddr, ok := Sysconfig["udpport"].(string)
	if !ok {
		log.Fatalf("config: %q is missing or not a string", "udpport")
	}
	udpclient = mu.UdpClient{Local: listenAddr, BufSize: 1024}
	udpclient.Listen(processUdpMsg)
	log.Println("Udp服务监听", listenAddr)
	time.Sleep(99999 * time.Hour)
}

// pool caps the number of task goroutines running at once at 20.
var pool = make(chan bool, 20)

// processUdpMsg handles one UDP message.
//
// For mu.OP_TYPE_DATA the payload is decoded as a JSON object. A decode
// error is reported back to the sender. Otherwise the message's "key"
// ("udpok" when empty) is acknowledged to the sender, and the task selected
// by the "stype" field runs in its own goroutine once a pool slot is free.
// Acquiring the slot blocks this call while all 20 slots are taken.
//
// For mu.OP_NOOP the payload is only logged.
func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
	switch act {
	case mu.OP_TYPE_DATA:
		// Original author's note: data from the previous node; processing
		// starts from the table to generate enterprise data.
		var mapInfo map[string]interface{}
		err := json.Unmarshal(data, &mapInfo)
		log.Println("err:", err, "mapInfo:", mapInfo, string(data))
		if err != nil {
			udpclient.WriteUdp([]byte("err:"+err.Error()), mu.OP_NOOP, ra)
			return
		}
		if mapInfo == nil {
			// Valid JSON that is not an object (e.g. null): nothing to do.
			return
		}

		key, _ := mapInfo["key"].(string)
		if key == "" {
			key = "udpok"
		}
		go udpclient.WriteUdp([]byte(key), mu.OP_NOOP, ra)

		tasktype, _ := mapInfo["stype"].(string)
		log.Println("tasktype:", tasktype)

		// Pick the work first so the pool handling is written only once.
		// The per-case notes are translated from the original comments.
		var task func()
		switch tasktype {
		case "winner":
			task = func() { winnerTask(data, mapInfo) }
		case "bidding":
			// Real-time + UDP invocation; keyword generation is optional;
			// at most 200,000 records per run.
			task = func() { biddingTask(data, mapInfo) }
		case "project":
			task = func() { projectTask(data, mapInfo) }
		case "biddingback":
			// No table join: builds the index straight from the bidding
			// table through mongo.
			task = func() { biddingBackTask(data, mapInfo) }
		case "biddingall":
			// Merge and regenerate the index; no keywords.
			task = func() { biddingAllTask(data, mapInfo) }
		case "biddingdata":
			// Join tables to build the index without merging; no keywords.
			task = func() { biddingDataTask(data, mapInfo) }
		case "biddingmerge":
			// Re-merge without building the index; no keywords.
			task = func() { biddingMergeTask(data, mapInfo) }
		case "buyer":
			task = func() { buyerTask(data, mapInfo) }
		case "winnerent":
			// Standard database.
			task = func() { standardTask("winnerent", mapInfo) }
		case "buyerent":
			// Standard database.
			task = func() { standardTask("buyerent", mapInfo) }
		case "agencyent":
			// Standard database.
			task = func() { standardTask("agencyent", mapInfo) }
		default:
			task = func() { defaultFunc(data, mapInfo) }
		}

		// Take a slot before spawning so at most cap(pool) tasks run at once;
		// the slot is returned when the task finishes.
		pool <- true
		go func() {
			defer func() { <-pool }()
			task()
		}()
	case mu.OP_NOOP:
		// Original author's note: reply from the next node.
		log.Println("发送成功", string(data))
	}
}