123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138 |
- package main
- /**
- 修复没有合并的抽取数据到bidding表中,并删除缓存和生成索引
- ./sendtask -ip 127.0.0.1 -p 14899 -stype bidding -gtid 5b582c12a5cb26b9b77fcce6 -lteid 5b582c12a5cb26b9b77fcd01
- ./sendtask -ip 127.0.0.1 -p 14899 -stype bidding -q "{'comeintime':{'\$gte':1537246800,'\$lte':1537254000}}"
- **/
- import (
- "encoding/json"
- "flag"
- "log"
- mu "mfw/util"
- "net"
- "qfw/util"
- elastic "qfw/util/elastic"
- "qfw/util/mongodb"
- "qfw/util/redis"
- "strings"
- "time"
- )
- var (
- Sysconfig map[string]interface{} //配置文件
- mgo *mongodb.MongodbSim //mongodb操作对象
- udpclient mu.UdpClient //udp对象
- updport string
- winner, bidding, biddingback, project, buyer map[string]interface{}
- savesizei = 500
- biddingIndexFields = []string{"_id", "buyerclass", "s_winner", "title", "detail", "detail_bak", "area", "areaval", "site", "type", "amount", "bidopendate", "bidopentime", "buyer", "channel", "city", "comeintime", "contenthtml", "descript", "description", "extracttype", "href", "infoformat", "keywords", "projectcode", "projectname", "publishtime", "s_sha", "spidercode", "subtype", "summary", "toptype", "urltop", "winner", "agency", "budget", "bidamount", "s_subscopeclass", "projectscope", "bidstatus"}
- projectinfoFields []string
- force = 0
- )
- func init() {
- util.ReadConfig(&Sysconfig)
- inits()
- updport, _ = Sysconfig["updport"].(string)
- bidding, _ = Sysconfig["bidding"].(map[string]interface{})
- index, _ = bidding["index"].(string)
- itype, _ = bidding["type"].(string)
- c, _ = bidding["collect"].(string)
- extractc, _ = bidding["extractcollect"].(string)
- db, _ = bidding["db"].(string)
- //extractdb, _ := bidding["extractdb"].(string)
- fields = strings.Split(bidding["fields"].(string), ",")
- biddingback, _ = Sysconfig["biddingback"].(map[string]interface{})
- mconf, _ := Sysconfig["mongodb"].(map[string]interface{})
- mgo = &mongodb.MongodbSim{
- MongodbAddr: mconf["addr"].(string),
- Size: util.IntAllDef(mconf["pool"], 5),
- DbName: mconf["db"].(string),
- }
- mgo.InitPool()
- econf := Sysconfig["elastic"].(map[string]interface{})
- elastic.InitElasticSize(econf["addr"].(string), util.IntAllDef(econf["pool"], 5))
- redisaddr, _ := Sysconfig["redisaddr"].(string)
- redis.InitRedisBySize(redisaddr, 10, 5, 240)
- if bidding["indexfields"] != nil {
- biddingIndexFields = util.ObjArrToStringArr(bidding["indexfields"].([]interface{}))
- }
- if bidding["projectinfo"] != nil {
- pf := util.ObjToString(bidding["projectinfo"])
- if pf != "" {
- projectinfoFields = strings.Split(pf, ",")
- }
- }
- log.Println(projectinfoFields)
- }
- var lastId = ""
- //func CheckMgoTask() {
- // if lastId != "" {
- // res, bres := mgo.Find(extractc, `{}`, `{"_id":-1}`, `{"_id":1}`, true, -1, -1)
- // if bres && res != nil && len(*res) == 1 {
- // id := util.BsonIdToSId(((*res)[0])["_id"])
- // if id > lastId {
- // mapInfo := map[string]interface{}{
- // "gtid": lastId,
- // "lteid": id,
- // }
- // biddingTask(nil, mapInfo)
- // log.Println("task over!", lastId, id)
- // lastId = id
- // }
- // }
- // }
- // time.AfterFunc(5*time.Minute, CheckMgoTask)
- //}
- var (
- index, itype, c, extractc, db string
- fields []string
- )
- func main() {
- flag.IntVar(&force, "f", 0, "是否强制执行")
- flag.StringVar(&lastId, "id", "", "上次执行id")
- flag.Parse()
- updport := Sysconfig["udpport"].(string)
- udpclient = mu.UdpClient{Local: updport, BufSize: 1024}
- udpclient.Listen(processUdpMsg)
- log.Println("Udp服务监听", updport)
- go delEsTask()
- time.Sleep(99999 * time.Hour)
- }
- var pool = make(chan bool, 20)
- func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
- switch act {
- case mu.OP_TYPE_DATA: //上个节点的数据
- //从表中开始处理生成企业数据
- var mapInfo map[string]interface{}
- err := json.Unmarshal(data, &mapInfo)
- log.Println("err:", err, "mapInfo:", mapInfo, string(data))
- if err != nil {
- udpclient.WriteUdp([]byte("err:"+err.Error()), mu.OP_NOOP, ra)
- } else if mapInfo != nil {
- udpclient.WriteUdp([]byte("ok,run"), mu.OP_NOOP, ra)
- tasktype, _ := mapInfo["stype"].(string)
- log.Println("tasktype:", tasktype)
- switch tasktype {
- case "bidding":
- pool <- true
- go func() {
- defer func() {
- <-pool
- }()
- biddingTask(data, mapInfo)
- }()
- }
- }
- case mu.OP_NOOP: //下个节点回应
- log.Println("发送成功", string(data))
- }
- }
|