package main

import (
	"encoding/json"
	"fmt"
	"io/ioutil"
	"log"
	mu "mfw/util"
	"mongodb"
	"net"
	"net/http"
	_ "net/http/pprof"
	"qfw/util"
	elastic "qfw/util/elastic"
	"qfw/util/redis"
	"strings"
	"time"
	u "util"
)

// Province is one entry of the province lookup table built by initCheckCity.
type Province struct {
	P_Name string
}

// City is one entry of the city lookup table built by initCheckCity.
type City struct {
	P_Name string
	C_Name string
}

// District is one entry of the district lookup table built by initCheckCity.
type District struct {
	P_Name string
	C_Name string
	D_Name string
}

var (
	Sysconfig   map[string]interface{} // configuration file contents
	mgo         *mongodb.MongodbSim    // MongoDB handle (bidding connection)
	extractmgo  *mongodb.MongodbSim    // MongoDB handle (extraction connection, or mgo when "savedb" is not configured)
	project2db  *mongodb.MongodbSim    // MongoDB handle (project2)
	mgostandard *mongodb.MongodbSim    // MongoDB handle (standard library)
	qyxydb      *mongodb.MongodbSim    // MongoDB handle (enterprise data)
	udpclient   mu.UdpClient           // UDP client
	// NOTE(review): init fills updport from config key "updport" while main
	// reads "udpport"; one of the two spellings is probably a typo — confirm
	// against the deployed config before changing either.
	updport            string
	savesizei          = 500
	biddingIndexFields = []string{"_id", "buyerclass", "s_winner", "title", "detail", "detail_bak", "area", "areaval", "site", "type", "amount", "bidopendate", "bidopentime", "buyer", "channel", "city", "comeintime", "contenthtml", "descript", "description", "extracttype", "href", "infoformat", "keywords", "projectcode", "projectname", "publishtime", "s_sha", "spidercode", "subtype", "summary", "toptype", "urltop", "winner", "agency", "budget", "bidamount", "s_subscopeclass", "projectscope", "bidstatus"}

	biddingIndexFieldsMap    = map[string]string{}
	projectinfoFields        []string
	projectinfoFieldsMap     = map[string]string{}
	multiIndex               []string
	purchasinglistFields     []string
	winnerorderlistFields    []string
	purchasinglistFieldsMap  = map[string]string{}
	winnerorderlistFieldsMap = map[string]string{}
	BulkSize                 = 400
	detailLength             = 50000
	fileLength               = 50000

	// bidding_other connection info
	bidding_other_es *elastic.Elastic
	other_index      string
	other_itype      string

	esAddr string
	esNode string

	FilterKeyword []string              // keywords used to filter competitor products in body text
	ProvinceDict  map[string][]Province // province name -> entries
	CityDict      map[string][]City     // city name -> entries
	DistrictDict  map[string][]District // district name -> entries

	// Per-task configuration sections read from Sysconfig in init.
	winner, bidding, biddingback, project, project2, buyer, standard, qyxy_ent map[string]interface{}
)

var UpdataMgoCache = make(chan []map[string]interface{}, 1000)
var SP = make(chan bool, 5)

// SaveLogChan feeds saveLog; SaveSp caps the number of concurrent bulk writes
// that saveLog starts.
var SaveLogChan = make(chan []map[string]interface{}, 1000)
var SaveSp = make(chan bool, 5)

// StopFlag is the "stop generating indexes" flag. inspectQuery sets it while
// any Elasticsearch thread-pool queue is non-empty.
// NOTE(review): it is written from a goroutine without synchronization; the
// plain bool is kept so code in other files that reads it still compiles.
var StopFlag = false

// init loads the configuration, opens the MongoDB / Elasticsearch / Redis /
// OSS connections and fills the field lists and lookup tables used by the
// task handlers. Missing mandatory config keys make the unchecked type
// assertions below panic, which aborts start-up.
func init() {
	util.ReadConfig(&Sysconfig)
	// company_id
	redis.InitRedis1("qyxy_id=172.17.4.189:8379", 4)
	inits()
	//go checkMapJob()
	detailLength = util.IntAllDef(Sysconfig["detaillength"], 50000)
	fileLength = util.IntAllDef(Sysconfig["filelength"], 50000)
	updport, _ = Sysconfig["updport"].(string)
	winner, _ = Sysconfig["winner"].(map[string]interface{})
	standard, _ = Sysconfig["standard"].(map[string]interface{})
	buyer, _ = Sysconfig["buyer"].(map[string]interface{})
	bidding, _ = Sysconfig["bidding"].(map[string]interface{})
	biddingback, _ = Sysconfig["biddingback"].(map[string]interface{})
	project, _ = Sysconfig["project"].(map[string]interface{})
	project2, _ = Sysconfig["project2"].(map[string]interface{})
	qyxy_ent, _ = Sysconfig["qyxy_ent"].(map[string]interface{})

	// "mongodb" is the bidding connection.
	mconf, _ := Sysconfig["mongodb"].(map[string]interface{})
	mgo = &mongodb.MongodbSim{
		MongodbAddr: mconf["addr"].(string),
		Size:        util.IntAllDef(mconf["pool"], 5),
		DbName:      mconf["db"].(string),
		UserName:    Sysconfig["uname"].(string),
		Password:    Sysconfig["upwd"].(string),
	}
	mgo.InitPool()

	project2db = &mongodb.MongodbSim{
		MongodbAddr: project2["addr"].(string),
		Size:        util.IntAllDef(project2["pool"], 5),
		DbName:      project2["db"].(string),
	}
	project2db.InitPool()

	// Enterprise data.
	qyxydb = &mongodb.MongodbSim{
		MongodbAddr: qyxy_ent["addr"].(string),
		Size:        util.IntAllDef(qyxy_ent["pool"], 5),
		DbName:      qyxy_ent["db"].(string),
	}
	qyxydb.InitPool()

	// "savedb" is the extraction connection; fall back to the bidding
	// connection when it is not configured.
	savedb, _ := Sysconfig["savedb"].(map[string]interface{})
	if savedb == nil {
		log.Println("未设置保存数据库,默认使用招标库")
		extractmgo = mgo
	} else {
		addr, _ := savedb["addr"].(string)
		db, _ := savedb["db"].(string)
		extractmgo = &mongodb.MongodbSim{
			MongodbAddr: addr,
			Size:        util.IntAllDef(savedb["size"], 5),
			DbName:      db,
		}
		extractmgo.InitPool()
	}

	mgostandard = &mongodb.MongodbSim{
		MongodbAddr: standard["addr"].(string),
		Size:        util.IntAllDef(standard["pool"], 5),
		DbName:      standard["db"].(string),
		UserName:    Sysconfig["uname"].(string),
		Password:    Sysconfig["upwd"].(string),
	}
	mgostandard.InitPool()

	// Initialize Elasticsearch: bidding.
	econf := Sysconfig["elastic"].(map[string]interface{})
	esAddr = econf["addr"].(string)
	esNode = econf["node"].(string)
	elastic.InitElasticSize(esAddr, util.IntAllDef(econf["pool"], 5))

	// Initialize Elasticsearch: bidding_other (optional).
	if Sysconfig["elastic_other"] != nil {
		econfOther := Sysconfig["elastic_other"].(map[string]interface{})
		other_index = econfOther["index"].(string)
		other_itype = econfOther["type"].(string)
		bidding_other_es = &elastic.Elastic{
			S_esurl: econfOther["addr"].(string),
			I_size:  util.IntAllDef(econfOther["pool"], 5),
		}
		bidding_other_es.InitElasticSize()
	}

	// Field lists: each one keeps its default unless the config overrides it.
	if bidding["indexfields"] != nil {
		biddingIndexFields = util.ObjArrToStringArr(bidding["indexfields"].([]interface{}))
	}
	if fields, ok := splitConfigFields(bidding["projectinfo"]); ok {
		projectinfoFields = fields
	}
	if fields, ok := splitConfigFields(bidding["purchasinglist"]); ok {
		purchasinglistFields = fields
	}
	if fields, ok := splitConfigFields(bidding["winnerorder"]); ok {
		winnerorderlistFields = fields
	}
	if fields, ok := splitConfigFields(bidding["multiIndex"]); ok {
		multiIndex = fields
	}

	// Field-name maps.
	fillFieldMap(biddingIndexFieldsMap, bidding["indexfieldsmap"])
	fillFieldMap(projectinfoFieldsMap, bidding["projectinfomap"])
	fillFieldMap(purchasinglistFieldsMap, bidding["purchasinglistmap"])
	fillFieldMap(winnerorderlistFieldsMap, bidding["winnerordermap"])

	log.Println(projectinfoFields)
	log.Println(purchasinglistFields)
	initCheckCity()
	FilterKeyword = util.ObjArrToStringArr(Sysconfig["filter-keyword"].([]interface{}))
	// Initialize OSS.
	u.InitOss()
}

// splitConfigFields splits a comma-separated config value into its parts.
// ok is false when the value is absent or converts to the empty string, in
// which case the caller keeps its current setting.
func splitConfigFields(raw interface{}) (fields []string, ok bool) {
	if raw == nil {
		return nil, false
	}
	joined := util.ObjToString(raw)
	if joined == "" {
		return nil, false
	}
	return strings.Split(joined, ","), true
}

// fillFieldMap copies a config object (map[string]interface{}) into dst,
// converting each value with util.ObjToString, and logs the result. It does
// nothing when raw is nil and panics when raw has any other type.
func fillFieldMap(dst map[string]string, raw interface{}) {
	if raw == nil {
		return
	}
	for k, v := range raw.(map[string]interface{}) {
		dst[k] = util.ObjToString(v)
	}
	log.Println(dst)
}

// main starts the background extract updater and the UDP listener, then
// blocks forever.
func main() {
	//go inspectQuery()
	//go task_index()
	go UpdateExtract() // adds the entidlist field to the extract collection
	port := Sysconfig["udpport"].(string)
	udpclient = mu.UdpClient{Local: port, BufSize: 1024}
	udpclient.Listen(processUdpMsg)
	log.Println("Udp服务监听", port)
	select {} // keep the process alive; work happens in goroutines
}

// inspectQuery polls the Elasticsearch thread-pool statistics every 10
// seconds and sets StopFlag while any of the index/search/bulk queues of
// node esNode is non-empty. A failed or unparsable poll leaves StopFlag
// unchanged.
func inspectQuery() {
	ticker := time.NewTicker(time.Second * 10)
	defer ticker.Stop()
	url := esAddr + "/_nodes/stats/thread_pool"
	for range ticker.C {
		if busy, ok := esQueuesBusy(url); ok {
			StopFlag = busy
		}
	}
}

// esQueuesBusy performs one thread-pool stats request. ok reports whether
// the response could be fetched and contained the thread_pool section for
// esNode; busy reports whether any of the index, search or bulk queues is
// non-empty.
//
// It lives in its own function so the deferred Body.Close runs once per
// poll; deferring inside inspectQuery's endless loop would never release the
// response bodies.
func esQueuesBusy(url string) (busy, ok bool) {
	// Bound the request so that a hung node cannot stall polling forever.
	client := http.Client{Timeout: 10 * time.Second}
	resp, err := client.Get(url)
	if err != nil {
		log.Println("inspectQuery: request failed:", err)
		return false, false
	}
	defer resp.Body.Close()

	body, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		log.Println("inspectQuery: read failed:", err)
		return false, false
	}
	respMap := make(map[string]interface{})
	if err := json.Unmarshal(body, &respMap); err != nil {
		return false, false
	}
	nodes, isMap := respMap["nodes"].(map[string]interface{})
	if !isMap {
		return false, false
	}
	node, isMap := nodes[esNode].(map[string]interface{})
	if !isMap {
		return false, false
	}
	threadPool, isMap := node["thread_pool"].(map[string]interface{})
	if !isMap {
		return false, false
	}

	index, _ := threadPool["index"].(map[string]interface{})
	search, _ := threadPool["search"].(map[string]interface{})
	bulk, _ := threadPool["bulk"].(map[string]interface{})
	if util.IntAll(index["queue"]) > 0 || util.IntAll(search["queue"]) > 0 || util.IntAll(bulk["queue"]) > 0 {
		util.Debug("es thread_pool index queue---", index["queue"])
		util.Debug("es thread_pool search queue---", search["queue"])
		util.Debug("es thread_pool bulk queue---", bulk["queue"])
		return true, true
	}
	return false, true
}

// pool caps the number of task goroutines started by processUdpMsg at 20.
var pool = make(chan bool, 20)

// runPooledTask takes a slot from pool (blocking the caller while all slots
// are in use), runs task in a new goroutine and releases the slot when task
// returns.
func runPooledTask(task func()) {
	pool <- true
	go func() {
		defer func() { <-pool }()
		task()
	}()
}

// processUdpMsg is the UDP message handler. For OP_TYPE_DATA it decodes the
// JSON payload, acknowledges it to the sender with the payload's "key" (or
// "udpok"), and dispatches a task chosen by the payload's "stype". For
// OP_NOOP it only logs the acknowledgement received from the next node.
func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
	switch act {
	case mu.OP_TYPE_DATA: // data from the previous node
		var mapInfo map[string]interface{}
		err := json.Unmarshal(data, &mapInfo)
		log.Println("err:", err, "mapInfo:", mapInfo, string(data))
		if err != nil {
			udpclient.WriteUdp([]byte("err:"+err.Error()), mu.OP_NOOP, ra)
			return
		}
		if mapInfo == nil {
			return
		}

		key, _ := mapInfo["key"].(string)
		if key == "" {
			key = "udpok"
		}
		go udpclient.WriteUdp([]byte(key), mu.OP_NOOP, ra)

		tasktype, _ := mapInfo["stype"].(string)
		log.Println("tasktype:", tasktype)

		var task func()
		switch tasktype {
		case "winner":
			task = func() { winnerTask(data, mapInfo) }
		case "bidding", // real-time + UDP invocation, keyword generation optional, at most 200k per run
			"bidding_history": // incremental id-range historical data
			task = func() { biddingTask(data, mapInfo, tasktype) }
		case "project":
			task = func() { projectTask(data, project, mapInfo) }
		case "biddingback": // no join; build the index straight from the bidding collection in mongo
			task = func() { biddingBackTask(data, mapInfo) }
		case "biddingall": // merge and regenerate the index, no keywords
			task = func() { biddingAllTask(data, mapInfo) }
		case "biddingdata": // full bidding data
			task = func() { biddingDataTask(data, mapInfo) }
		case "biddingmerge": // re-merge without generating index or keywords
			task = func() { biddingMergeTask(data, mapInfo) }
		case "buyer":
			task = func() { buyerTask(data, mapInfo) }
		case "winnerent", "buyerent", "agencyent": // standard library
			task = func() { standardTask(tasktype, mapInfo) }
		case "biddingdelbyextract": // delete from ES by repeat
			task = func() { biddingDelByExtract(data, mapInfo) }
		case "biddingdelbyextracttype": // delete from ES by extracttype
			task = func() { biddingDelByExtracttype(data, mapInfo) }
		default:
			task = func() { defaultFunc(data, mapInfo) }
		}
		runPooledTask(task)
	case mu.OP_NOOP: // acknowledgement from the next node
		log.Println("发送成功", string(data))
	}
}

// initCheckCity loads the area collection (documents without a town_code)
// from the standard MongoDB and groups the rows into ProvinceDict, CityDict
// and DistrictDict, keyed by province, city and district name respectively.
// A row counts as a district when district_code > 0, otherwise as a city
// when city_code > 0, otherwise as a province.
func initCheckCity() {
	ProvinceDict = make(map[string][]Province, 0)
	CityDict = make(map[string][]City, 0)
	DistrictDict = make(map[string][]District, 0)
	q := map[string]interface{}{
		"town_code": map[string]interface{}{
			"$exists": 0,
		},
	}
	sess := mgostandard.GetMgoConn()
	defer mgostandard.DestoryMongoConn(sess)
	it := sess.DB("mixdata").C(util.ObjToString(standard["coll_area"])).Find(&q).Iter()
	total := 0
	for tmp := make(map[string]interface{}); it.Next(&tmp); total++ {
		if total%1000 == 0 {
			log.Println("当前数量:", total)
		}
		districtCode := util.IntAll(tmp["district_code"])
		cityCode := util.IntAll(tmp["city_code"])
		province := util.ObjToString(tmp["province"])
		switch {
		case districtCode > 0:
			city := util.ObjToString(tmp["city"])
			district := util.ObjToString(tmp["district"])
			// append on a missing key starts from a nil slice, so no
			// existence check is needed.
			DistrictDict[district] = append(DistrictDict[district], District{P_Name: province, C_Name: city, D_Name: district})
		case cityCode > 0:
			city := util.ObjToString(tmp["city"])
			CityDict[city] = append(CityDict[city], City{P_Name: province, C_Name: city})
		default:
			ProvinceDict[province] = append(ProvinceDict[province], Province{P_Name: province})
		}
		// Decode the next row into a fresh map so fields of this row do not
		// carry over.
		tmp = make(map[string]interface{})
	}
	util.Debug(fmt.Sprintf("城市配置加载完毕...省~%d 市~%d 区~%d", len(ProvinceDict), len(CityDict), len(DistrictDict)))
}

// saveLog drains SaveLogChan forever and writes the received entries to the
// "createIndex_log" collection in batches. A batch is flushed when it holds
// 200 entries or when no entry has arrived for one second. At most
// cap(SaveSp) bulk writes run concurrently.
func saveLog() {
	const batchSize = 200
	batch := make([][]map[string]interface{}, batchSize)
	n := 0

	// flush hands the filled part of batch to a background writer and starts
	// a new, empty batch.
	flush := func() {
		SaveSp <- true
		go func(docs [][]map[string]interface{}) {
			defer func() { <-SaveSp }()
			extractmgo.UpSertBulk("createIndex_log", docs...)
		}(batch[:n])
		batch = make([][]map[string]interface{}, batchSize)
		n = 0
	}

	for {
		select {
		case v := <-SaveLogChan:
			batch[n] = v
			n++
			if n == batchSize {
				flush()
			}
		case <-time.After(1000 * time.Millisecond):
			if n > 0 {
				flush()
			}
		}
	}
}