package main import ( "encoding/json" "log" mu "mfw/util" "net" "qfw/util" elastic "qfw/util/elastic" "qfw/util/mongodb" "strings" "time" u "util" ) var ( Sysconfig map[string]interface{} //配置文件 mgo *mongodb.MongodbSim //mongodb操作对象 extractmgo *mongodb.MongodbSim //mongodb操作对象 project2db *mongodb.MongodbSim //mongodb操作对象 mgostandard *mongodb.MongodbSim //mongodb操作对象 qyxydb *mongodb.MongodbSim //mongodb操作对象 udpclient mu.UdpClient //udp对象 updport string savesizei = 500 biddingIndexFields = []string{"_id", "buyerclass", "s_winner", "title", "detail", "detail_bak", "area", "areaval", "site", "type", "amount", "bidopendate", "bidopentime", "buyer", "channel", "city", "comeintime", "contenthtml", "descript", "description", "extracttype", "href", "infoformat", "keywords", "projectcode", "projectname", "publishtime", "s_sha", "spidercode", "subtype", "summary", "toptype", "urltop", "winner", "agency", "budget", "bidamount", "s_subscopeclass", "projectscope", "bidstatus"} biddingIndexFieldsMap = map[string]string{} projectinfoFields []string projectinfoFieldsMap = map[string]string{} multiIndex []string purchasinglistFields []string winnerorderlistFields []string purchasinglistFieldsMap = map[string]string{} winnerorderlistFieldsMap = map[string]string{} BulkSize = 400 detailLength = 50000 fileLength = 50000 //bidding_other连接信息 bidding_other_es *elastic.Elastic other_index string other_itype string winner, bidding, biddingback, project, project2, buyer, standard, qyxy_ent map[string]interface{} ) func init() { util.ReadConfig(&Sysconfig) inits() go checkMapJob() detailLength = util.IntAllDef(Sysconfig["detaillength"], 50000) fileLength = util.IntAllDef(Sysconfig["filelength"], 50000) updport, _ = Sysconfig["updport"].(string) winner, _ = Sysconfig["winner"].(map[string]interface{}) standard, _ = Sysconfig["standard"].(map[string]interface{}) buyer, _ = Sysconfig["buyer"].(map[string]interface{}) bidding, _ = Sysconfig["bidding"].(map[string]interface{}) biddingback, _ = Sysconfig["biddingback"].(map[string]interface{}) project, _ = Sysconfig["project"].(map[string]interface{}) project2, _ = Sysconfig["project2"].(map[string]interface{}) qyxy_ent, _ = Sysconfig["qyxy_ent"].(map[string]interface{}) mconf, _ := Sysconfig["mongodb"].(map[string]interface{}) mgo = &mongodb.MongodbSim{ //mongodb为binding连接 MongodbAddr: mconf["addr"].(string), Size: util.IntAllDef(mconf["pool"], 5), DbName: mconf["db"].(string), } mgo.InitPool() project2db = &mongodb.MongodbSim{ MongodbAddr: project2["addr"].(string), Size: util.IntAllDef(project2["pool"], 5), DbName: project2["db"].(string), } project2db.InitPool() //企业信用 qyxydb = &mongodb.MongodbSim{ MongodbAddr: qyxy_ent["addr"].(string), Size: util.IntAllDef(qyxy_ent["pool"], 5), DbName: qyxy_ent["db"].(string), } qyxydb.InitPool() savedb, _ := Sysconfig["savedb"].(map[string]interface{}) if savedb == nil { log.Println("未设置保存数据库,默认使用招标库") extractmgo = mgo } else { //savedb为抽取连接 addr, _ := savedb["addr"].(string) size := util.IntAllDef(savedb["size"], 5) db, _ := savedb["db"].(string) extractmgo = &mongodb.MongodbSim{ MongodbAddr: addr, Size: size, DbName: db, } extractmgo.InitPool() } mgostandard = &mongodb.MongodbSim{ MongodbAddr: standard["addr"].(string), Size: util.IntAllDef(standard["pool"], 5), DbName: standard["db"].(string), } mgostandard.InitPool() //初始化es //bidding econf := Sysconfig["elastic"].(map[string]interface{}) elastic.InitElasticSize(econf["addr"].(string), util.IntAllDef(econf["pool"], 5)) //bidding_other if Sysconfig["elastic_other"] != nil { econf_other := Sysconfig["elastic_other"].(map[string]interface{}) other_index = econf_other["index"].(string) other_itype = econf_other["type"].(string) bidding_other_es = &elastic.Elastic{ S_esurl: econf_other["addr"].(string), I_size: util.IntAllDef(econf_other["pool"], 5), } bidding_other_es.InitElasticSize() } // if bidding["indexfields"] != nil { biddingIndexFields = util.ObjArrToStringArr(bidding["indexfields"].([]interface{})) } if bidding["projectinfo"] != nil { pf := util.ObjToString(bidding["projectinfo"]) if pf != "" { projectinfoFields = strings.Split(pf, ",") } } if bidding["purchasinglist"] != nil { pcl := util.ObjToString(bidding["purchasinglist"]) if pcl != "" { purchasinglistFields = strings.Split(pcl, ",") } } if bidding["winnerorder"] != nil { winnerorder := util.ObjToString(bidding["winnerorder"]) if winnerorder != "" { winnerorderlistFields = strings.Split(winnerorder, ",") } } if bidding["multiIndex"] != nil { mi := util.ObjToString(bidding["multiIndex"]) if mi != "" { multiIndex = strings.Split(mi, ",") } } // if bidding["indexfieldsmap"] != nil { for k, v := range bidding["indexfieldsmap"].(map[string]interface{}) { biddingIndexFieldsMap[k] = util.ObjToString(v) } log.Println(biddingIndexFieldsMap) } if bidding["projectinfomap"] != nil { for k, v := range bidding["projectinfomap"].(map[string]interface{}) { projectinfoFieldsMap[k] = util.ObjToString(v) } log.Println(projectinfoFieldsMap) } if bidding["purchasinglistmap"] != nil { for k, v := range bidding["purchasinglistmap"].(map[string]interface{}) { purchasinglistFieldsMap[k] = util.ObjToString(v) } log.Println(purchasinglistFieldsMap) } if bidding["winnerordermap"] != nil { for k, v := range bidding["winnerordermap"].(map[string]interface{}) { winnerorderlistFieldsMap[k] = util.ObjToString(v) } log.Println(winnerorderlistFieldsMap) } log.Println(projectinfoFields) log.Println(purchasinglistFields) //初始化oss u.InitOss() } func main() { go task_index() updport := Sysconfig["udpport"].(string) udpclient = mu.UdpClient{Local: updport, BufSize: 1024} udpclient.Listen(processUdpMsg) log.Println("Udp服务监听", updport) // time.Sleep(99999 * time.Hour) ch := make(chan bool, 1) <-ch } var pool = make(chan bool, 20) func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) { switch act { case mu.OP_TYPE_DATA: //上个节点的数据 //从表中开始处理生成企业数据 var mapInfo map[string]interface{} err := json.Unmarshal(data, &mapInfo) log.Println("err:", err, "mapInfo:", mapInfo, string(data)) if err != nil { udpclient.WriteUdp([]byte("err:"+err.Error()), mu.OP_NOOP, ra) } else if mapInfo != nil { key, _ := mapInfo["key"].(string) if key == "" { key = "udpok" } go udpclient.WriteUdp([]byte(key), mu.OP_NOOP, ra) tasktype, _ := mapInfo["stype"].(string) log.Println("tasktype:", tasktype) switch tasktype { case "winner": pool <- true go func() { defer func() { <-pool }() winnerTask(data, mapInfo) }() case "bidding": //实时+udp调用,可选择是否生成关键词, 一次性最大20万 pool <- true go func() { defer func() { <-pool }() biddingTask(data, mapInfo) }() case "project": pool <- true go func() { defer func() { <-pool }() projectTask(data, project, mapInfo) }() case "biddingback": //不联表,使用bidding表直接调用mongo库生成索引 pool <- true go func() { defer func() { <-pool }() biddingBackTask(data, mapInfo) }() case "biddingall": //合并并重新生成索引,不生成关键词 pool <- true go func() { defer func() { <-pool }() biddingAllTask(data, mapInfo) }() case "biddingdata": //联表生成索引不合并,不生成关键词 pool <- true go func() { defer func() { <-pool }() biddingDataTask(data, mapInfo) }() case "biddingmerge": //重新合并但不生成索引,不生成关键词 pool <- true go func() { defer func() { <-pool }() biddingMergeTask(data, mapInfo) }() case "buyer": pool <- true go func() { defer func() { <-pool }() buyerTask(data, mapInfo) }() case "winnerent": //标准库 pool <- true go func() { defer func() { <-pool }() standardTask("winnerent", mapInfo) }() case "buyerent": //标准库 pool <- true go func() { defer func() { <-pool }() standardTask("buyerent", mapInfo) }() case "agencyent": //标准库 pool <- true go func() { defer func() { <-pool }() standardTask("agencyent", mapInfo) }() case "biddingdelbyextract": //根据repeat删除es pool <- true go func() { defer func() { <-pool }() biddingDelByExtract(data, mapInfo) }() case "biddingdelbyextracttype": //根据extracttype删除es pool <- true go func() { defer func() { <-pool }() biddingDelByExtracttype(data, mapInfo) }() default: pool <- true go func() { defer func() { <-pool }() defaultFunc(data, mapInfo) }() } } case mu.OP_NOOP: //下个节点回应 log.Println("发送成功", string(data)) } }