package main /** 修复没有合并的抽取数据到bidding表中,并删除缓存和生成索引 ./sendtask -ip 127.0.0.1 -p 14899 -stype bidding -gtid 5b582c12a5cb26b9b77fcce6 -lteid 5b582c12a5cb26b9b77fcd01 ./sendtask -ip 127.0.0.1 -p 14899 -stype bidding -q "{'comeintime':{'\$gte':1537246800,'\$lte':1537254000}}" **/ import ( "encoding/json" "flag" "log" mu "mfw/util" "net" "qfw/util" elastic "qfw/util/elastic" "qfw/util/mongodb" "qfw/util/redis" "strings" "time" ) var ( Sysconfig map[string]interface{} //配置文件 mgo *mongodb.MongodbSim //mongodb操作对象 udpclient mu.UdpClient //udp对象 updport string winner, bidding, biddingback, project, buyer map[string]interface{} savesizei = 500 biddingIndexFields = []string{"_id", "buyerclass", "s_winner", "title", "detail", "detail_bak", "area", "areaval", "site", "type", "amount", "bidopendate", "bidopentime", "buyer", "channel", "city", "comeintime", "contenthtml", "descript", "description", "extracttype", "href", "infoformat", "keywords", "projectcode", "projectname", "publishtime", "s_sha", "spidercode", "subtype", "summary", "toptype", "urltop", "winner", "agency", "budget", "bidamount", "s_subscopeclass", "projectscope", "bidstatus"} projectinfoFields []string force = 0 ) func init() { util.ReadConfig(&Sysconfig) inits() updport, _ = Sysconfig["updport"].(string) bidding, _ = Sysconfig["bidding"].(map[string]interface{}) index, _ = bidding["index"].(string) itype, _ = bidding["type"].(string) c, _ = bidding["collect"].(string) extractc, _ = bidding["extractcollect"].(string) db, _ = bidding["db"].(string) //extractdb, _ := bidding["extractdb"].(string) fields = strings.Split(bidding["fields"].(string), ",") biddingback, _ = Sysconfig["biddingback"].(map[string]interface{}) mconf, _ := Sysconfig["mongodb"].(map[string]interface{}) mgo = &mongodb.MongodbSim{ MongodbAddr: mconf["addr"].(string), Size: util.IntAllDef(mconf["pool"], 5), DbName: mconf["db"].(string), } mgo.InitPool() econf := Sysconfig["elastic"].(map[string]interface{}) elastic.InitElasticSize(econf["addr"].(string), util.IntAllDef(econf["pool"], 5)) redisaddr, _ := Sysconfig["redisaddr"].(string) redis.InitRedisBySize(redisaddr, 10, 5, 240) if bidding["indexfields"] != nil { biddingIndexFields = util.ObjArrToStringArr(bidding["indexfields"].([]interface{})) } if bidding["projectinfo"] != nil { pf := util.ObjToString(bidding["projectinfo"]) if pf != "" { projectinfoFields = strings.Split(pf, ",") } } log.Println(projectinfoFields) } var lastId = "" //func CheckMgoTask() { // if lastId != "" { // res, bres := mgo.Find(extractc, `{}`, `{"_id":-1}`, `{"_id":1}`, true, -1, -1) // if bres && res != nil && len(*res) == 1 { // id := util.BsonIdToSId(((*res)[0])["_id"]) // if id > lastId { // mapInfo := map[string]interface{}{ // "gtid": lastId, // "lteid": id, // } // biddingTask(nil, mapInfo) // log.Println("task over!", lastId, id) // lastId = id // } // } // } // time.AfterFunc(5*time.Minute, CheckMgoTask) //} var ( index, itype, c, extractc, db string fields []string ) func main() { flag.IntVar(&force, "f", 0, "是否强制执行") flag.StringVar(&lastId, "id", "", "上次执行id") flag.Parse() updport := Sysconfig["udpport"].(string) udpclient = mu.UdpClient{Local: updport, BufSize: 1024} udpclient.Listen(processUdpMsg) log.Println("Udp服务监听", updport) go delEsTask() time.Sleep(99999 * time.Hour) } var pool = make(chan bool, 20) func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) { switch act { case mu.OP_TYPE_DATA: //上个节点的数据 //从表中开始处理生成企业数据 var mapInfo map[string]interface{} err := json.Unmarshal(data, &mapInfo) log.Println("err:", err, "mapInfo:", mapInfo, string(data)) if err != nil { udpclient.WriteUdp([]byte("err:"+err.Error()), mu.OP_NOOP, ra) } else if mapInfo != nil { udpclient.WriteUdp([]byte("ok,run"), mu.OP_NOOP, ra) tasktype, _ := mapInfo["stype"].(string) log.Println("tasktype:", tasktype) switch tasktype { case "bidding": pool <- true go func() { defer func() { <-pool }() biddingTask(data, mapInfo) }() } } case mu.OP_NOOP: //下个节点回应 log.Println("发送成功", string(data)) } }