123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518 |
- package main
- import (
- "encoding/json"
- "fmt"
- "io/ioutil"
- "log"
- mu "mfw/util"
- "mongodb"
- "net"
- "net/http"
- _ "net/http/pprof"
- "qfw/util"
- elastic "qfw/util/elastic"
- "qfw/util/redis"
- "strings"
- "time"
- u "util"
- )
// Area dictionary entries loaded from the standard area collection by
// initCheckCity. Field order matters: entries are built with positional
// composite literals (e.g. District{province, city, district}).
type (
	// Province is a province-level area entry.
	Province struct {
		P_Name string
	}
	// City is a city-level area entry together with its province.
	City struct {
		P_Name, C_Name string
	}
	// District is a district/county-level area entry together with its
	// province and city.
	District struct {
		P_Name, C_Name, D_Name string
	}
)
- var (
- Sysconfig map[string]interface{} //配置文件
- mgo *mongodb.MongodbSim //mongodb操作对象
- extractmgo *mongodb.MongodbSim //mongodb操作对象
- project2db *mongodb.MongodbSim //mongodb操作对象
- mgostandard *mongodb.MongodbSim //mongodb操作对象
- qyxydb *mongodb.MongodbSim //mongodb操作对象
- udpclient mu.UdpClient //udp对象
- updport string
- savesizei = 500
- biddingIndexFields = []string{"_id", "buyerclass", "s_winner", "title", "detail", "detail_bak", "area", "areaval", "site", "type", "amount", "bidopendate", "bidopentime", "buyer", "channel", "city", "comeintime", "contenthtml", "descript", "description", "extracttype", "href", "infoformat", "keywords", "projectcode", "projectname", "publishtime", "s_sha", "spidercode", "subtype", "summary", "toptype", "urltop", "winner", "agency", "budget", "bidamount", "s_subscopeclass", "projectscope", "bidstatus"}
- biddingIndexFieldsMap = map[string]string{}
- projectinfoFields []string
- projectinfoFieldsMap = map[string]string{}
- multiIndex []string
- purchasinglistFields []string
- winnerorderlistFields []string
- purchasinglistFieldsMap = map[string]string{}
- winnerorderlistFieldsMap = map[string]string{}
- BulkSize = 400
- detailLength = 50000
- fileLength = 50000
- //bidding_other连接信息
- bidding_other_es *elastic.Elastic
- other_index string
- other_itype string
- esAddr string
- esNode string
- FilterKeyword []string //正文竟品关键词过滤
- ProvinceDict map[string][]Province //省份-map
- CityDict map[string][]City //城市-map
- DistrictDict map[string][]District //区县-map
- winner, bidding, biddingback, project, project2, buyer, standard, qyxy_ent map[string]interface{}
- )
// Package-level queues and semaphores.
var (
	// UpdataMgoCache buffers up to 1000 document batches; presumably drained by
	// a MongoDB update worker defined elsewhere — TODO confirm.
	UpdataMgoCache = make(chan []map[string]interface{}, 1000)
	// SP is a 5-slot semaphore; its users are outside this view.
	SP = make(chan bool, 5)
	// SaveLogChan receives log-document batches consumed by saveLog.
	SaveLogChan = make(chan []map[string]interface{}, 1000)
	// SaveSp limits saveLog to 5 concurrent createIndex_log upserts.
	SaveSp = make(chan bool, 5)
	// StopFlag is the stop flag for index generation; inspectQuery raises it
	// while Elasticsearch thread-pool queues are non-empty.
	// NOTE(review): written and read from different goroutines without
	// synchronisation (data race); consider atomic.Bool.
	StopFlag = false
)
- func init() {
- util.ReadConfig(&Sysconfig)
- // company_id
- redis.InitRedis1("qyxy_id=172.17.4.189:8379", 4)
- inits()
- //go checkMapJob()
- detailLength = util.IntAllDef(Sysconfig["detaillength"], 50000)
- fileLength = util.IntAllDef(Sysconfig["filelength"], 50000)
- updport, _ = Sysconfig["updport"].(string)
- winner, _ = Sysconfig["winner"].(map[string]interface{})
- standard, _ = Sysconfig["standard"].(map[string]interface{})
- buyer, _ = Sysconfig["buyer"].(map[string]interface{})
- bidding, _ = Sysconfig["bidding"].(map[string]interface{})
- biddingback, _ = Sysconfig["biddingback"].(map[string]interface{})
- project, _ = Sysconfig["project"].(map[string]interface{})
- project2, _ = Sysconfig["project2"].(map[string]interface{})
- qyxy_ent, _ = Sysconfig["qyxy_ent"].(map[string]interface{})
- mconf, _ := Sysconfig["mongodb"].(map[string]interface{})
- mgo = &mongodb.MongodbSim{ //mongodb为binding连接
- MongodbAddr: mconf["addr"].(string),
- Size: util.IntAllDef(mconf["pool"], 5),
- DbName: mconf["db"].(string),
- UserName: Sysconfig["uname"].(string),
- Password: Sysconfig["upwd"].(string),
- }
- mgo.InitPool()
- project2db = &mongodb.MongodbSim{
- MongodbAddr: project2["addr"].(string),
- Size: util.IntAllDef(project2["pool"], 5),
- DbName: project2["db"].(string),
- }
- project2db.InitPool()
- //企业数据
- qyxydb = &mongodb.MongodbSim{
- MongodbAddr: qyxy_ent["addr"].(string),
- Size: util.IntAllDef(qyxy_ent["pool"], 5),
- DbName: qyxy_ent["db"].(string),
- }
- qyxydb.InitPool()
- savedb, _ := Sysconfig["savedb"].(map[string]interface{})
- if savedb == nil {
- log.Println("未设置保存数据库,默认使用招标库")
- extractmgo = mgo
- } else { //savedb为抽取连接
- addr, _ := savedb["addr"].(string)
- size := util.IntAllDef(savedb["size"], 5)
- db, _ := savedb["db"].(string)
- extractmgo = &mongodb.MongodbSim{
- MongodbAddr: addr,
- Size: size,
- DbName: db,
- }
- extractmgo.InitPool()
- }
- mgostandard = &mongodb.MongodbSim{
- MongodbAddr: standard["addr"].(string),
- Size: util.IntAllDef(standard["pool"], 5),
- DbName: standard["db"].(string),
- UserName: Sysconfig["uname"].(string),
- Password: Sysconfig["upwd"].(string),
- }
- mgostandard.InitPool()
- //初始化es
- //bidding
- econf := Sysconfig["elastic"].(map[string]interface{})
- esAddr = econf["addr"].(string)
- esNode = econf["node"].(string)
- elastic.InitElasticSize(econf["addr"].(string), util.IntAllDef(econf["pool"], 5))
- //bidding_other
- if Sysconfig["elastic_other"] != nil {
- econf_other := Sysconfig["elastic_other"].(map[string]interface{})
- other_index = econf_other["index"].(string)
- other_itype = econf_other["type"].(string)
- bidding_other_es = &elastic.Elastic{
- S_esurl: econf_other["addr"].(string),
- I_size: util.IntAllDef(econf_other["pool"], 5),
- }
- bidding_other_es.InitElasticSize()
- }
- //
- if bidding["indexfields"] != nil {
- biddingIndexFields = util.ObjArrToStringArr(bidding["indexfields"].([]interface{}))
- }
- if bidding["projectinfo"] != nil {
- pf := util.ObjToString(bidding["projectinfo"])
- if pf != "" {
- projectinfoFields = strings.Split(pf, ",")
- }
- }
- if bidding["purchasinglist"] != nil {
- pcl := util.ObjToString(bidding["purchasinglist"])
- if pcl != "" {
- purchasinglistFields = strings.Split(pcl, ",")
- }
- }
- if bidding["winnerorder"] != nil {
- winnerorder := util.ObjToString(bidding["winnerorder"])
- if winnerorder != "" {
- winnerorderlistFields = strings.Split(winnerorder, ",")
- }
- }
- if bidding["multiIndex"] != nil {
- mi := util.ObjToString(bidding["multiIndex"])
- if mi != "" {
- multiIndex = strings.Split(mi, ",")
- }
- }
- //
- if bidding["indexfieldsmap"] != nil {
- for k, v := range bidding["indexfieldsmap"].(map[string]interface{}) {
- biddingIndexFieldsMap[k] = util.ObjToString(v)
- }
- log.Println(biddingIndexFieldsMap)
- }
- if bidding["projectinfomap"] != nil {
- for k, v := range bidding["projectinfomap"].(map[string]interface{}) {
- projectinfoFieldsMap[k] = util.ObjToString(v)
- }
- log.Println(projectinfoFieldsMap)
- }
- if bidding["purchasinglistmap"] != nil {
- for k, v := range bidding["purchasinglistmap"].(map[string]interface{}) {
- purchasinglistFieldsMap[k] = util.ObjToString(v)
- }
- log.Println(purchasinglistFieldsMap)
- }
- if bidding["winnerordermap"] != nil {
- for k, v := range bidding["winnerordermap"].(map[string]interface{}) {
- winnerorderlistFieldsMap[k] = util.ObjToString(v)
- }
- log.Println(winnerorderlistFieldsMap)
- }
- log.Println(projectinfoFields)
- log.Println(purchasinglistFields)
- initCheckCity()
- FilterKeyword = util.ObjArrToStringArr(Sysconfig["filter-keyword"].([]interface{}))
- //初始化oss
- u.InitOss()
- }
- func main() {
- //go inspectQuery()
- //go task_index()
- go UpdateExtract() //抽取表中新增entidlist字段
- updport := Sysconfig["udpport"].(string)
- udpclient = mu.UdpClient{Local: updport, BufSize: 1024}
- udpclient.Listen(processUdpMsg)
- log.Println("Udp服务监听", updport)
- ch := make(chan bool, 1)
- <-ch
- }
- /**
- 检查es查询队列 10s查询一次
- */
- func inspectQuery() {
- ticker := time.NewTicker(time.Second * 10)
- url := esAddr + "/_nodes/stats/thread_pool"
- for range ticker.C {
- resp, _ := http.Get(url)
- if resp != nil && resp.Body != nil {
- defer resp.Body.Close()
- }
- body, _ := ioutil.ReadAll(resp.Body)
- respMap := make(map[string]interface{})
- err := json.Unmarshal(body, &respMap)
- if err == nil {
- if data, o1 := respMap["nodes"].(map[string]interface{}); o1 {
- if nodes, o2 := data[esNode].(map[string]interface{}); o2 {
- if pool, o3 := nodes["thread_pool"].(map[string]interface{}); o3 {
- index, _ := pool["index"].(map[string]interface{})
- search, _ := pool["search"].(map[string]interface{})
- bulk, _ := pool["bulk"].(map[string]interface{})
- if util.IntAll(index["queue"]) > 0 || util.IntAll(search["queue"]) > 0 || util.IntAll(bulk["queue"]) > 0 {
- util.Debug("es thread_pool index queue---", index["queue"])
- util.Debug("es thread_pool search queue---", search["queue"])
- util.Debug("es thread_pool bulk queue---", bulk["queue"])
- StopFlag = true
- } else {
- StopFlag = false
- }
- }
- }
- }
- }
- }
- }
- var pool = make(chan bool, 20)
- func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
- switch act {
- case mu.OP_TYPE_DATA: //上个节点的数据
- //从表中开始处理生成企业数据
- var mapInfo map[string]interface{}
- err := json.Unmarshal(data, &mapInfo)
- log.Println("err:", err, "mapInfo:", mapInfo, string(data))
- if err != nil {
- udpclient.WriteUdp([]byte("err:"+err.Error()), mu.OP_NOOP, ra)
- } else if mapInfo != nil {
- key, _ := mapInfo["key"].(string)
- if key == "" {
- key = "udpok"
- }
- go udpclient.WriteUdp([]byte(key), mu.OP_NOOP, ra)
- tasktype, _ := mapInfo["stype"].(string)
- log.Println("tasktype:", tasktype)
- switch tasktype {
- case "winner":
- pool <- true
- go func() {
- defer func() {
- <-pool
- }()
- winnerTask(data, mapInfo)
- }()
- case "bidding": //实时+udp调用,可选择是否生成关键词, 一次性最大20万
- pool <- true
- go func() {
- defer func() {
- <-pool
- }()
- biddingTask(data, mapInfo, tasktype)
- }()
- case "bidding_history": //增量id段历史数据
- pool <- true
- go func() {
- defer func() {
- <-pool
- }()
- biddingTask(data, mapInfo, tasktype)
- }()
- case "project":
- pool <- true
- go func() {
- defer func() {
- <-pool
- }()
- projectTask(data, project, mapInfo)
- }()
- case "biddingback": //不联表,使用bidding表直接调用mongo库生成索引
- pool <- true
- go func() {
- defer func() {
- <-pool
- }()
- biddingBackTask(data, mapInfo)
- }()
- case "biddingall": //合并并重新生成索引,不生成关键词
- pool <- true
- go func() {
- defer func() {
- <-pool
- }()
- biddingAllTask(data, mapInfo)
- }()
- case "biddingdata": //bidding全量数据
- pool <- true
- go func() {
- defer func() {
- <-pool
- }()
- biddingDataTask(data, mapInfo)
- }()
- case "biddingmerge": //重新合并但不生成索引,不生成关键词
- pool <- true
- go func() {
- defer func() {
- <-pool
- }()
- biddingMergeTask(data, mapInfo)
- }()
- case "buyer":
- pool <- true
- go func() {
- defer func() {
- <-pool
- }()
- buyerTask(data, mapInfo)
- }()
- case "winnerent": //标准库
- pool <- true
- go func() {
- defer func() {
- <-pool
- }()
- standardTask("winnerent", mapInfo)
- }()
- case "buyerent": //标准库
- pool <- true
- go func() {
- defer func() {
- <-pool
- }()
- standardTask("buyerent", mapInfo)
- }()
- case "agencyent": //标准库
- pool <- true
- go func() {
- defer func() {
- <-pool
- }()
- standardTask("agencyent", mapInfo)
- }()
- case "biddingdelbyextract": //根据repeat删除es
- pool <- true
- go func() {
- defer func() {
- <-pool
- }()
- biddingDelByExtract(data, mapInfo)
- }()
- case "biddingdelbyextracttype": //根据extracttype删除es
- pool <- true
- go func() {
- defer func() {
- <-pool
- }()
- biddingDelByExtracttype(data, mapInfo)
- }()
- default:
- pool <- true
- go func() {
- defer func() {
- <-pool
- }()
- defaultFunc(data, mapInfo)
- }()
- }
- }
- case mu.OP_NOOP: //下个节点回应
- log.Println("发送成功", string(data))
- }
- }
- //初始化城市
- func initCheckCity() {
- //初始化-城市配置
- ProvinceDict = make(map[string][]Province, 0)
- CityDict = make(map[string][]City, 0)
- DistrictDict = make(map[string][]District, 0)
- q := map[string]interface{}{
- "town_code": map[string]interface{}{
- "$exists": 0,
- },
- }
- sess := mgostandard.GetMgoConn()
- defer mgostandard.DestoryMongoConn(sess)
- it := sess.DB("mixdata").C(util.ObjToString(standard["coll_area"])).Find(&q).Iter()
- total := 0
- for tmp := make(map[string]interface{}); it.Next(&tmp); total++ {
- if total%1000 == 0 {
- log.Println("当前数量:", total)
- }
- district_code := util.IntAll(tmp["district_code"])
- city_code := util.IntAll(tmp["city_code"])
- if district_code > 0 {
- province := util.ObjToString(tmp["province"])
- city := util.ObjToString(tmp["city"])
- district := util.ObjToString(tmp["district"])
- data := District{province, city, district}
- if DistrictDict[district] == nil {
- DistrictDict[district] = []District{data}
- } else {
- arr := DistrictDict[district]
- arr = append(arr, data)
- DistrictDict[district] = arr
- }
- } else {
- if city_code > 0 {
- province := util.ObjToString(tmp["province"])
- city := util.ObjToString(tmp["city"])
- data := City{province, city}
- if CityDict[city] == nil {
- CityDict[city] = []City{data}
- } else {
- arr := CityDict[city]
- arr = append(arr, data)
- CityDict[city] = arr
- }
- } else {
- province := util.ObjToString(tmp["province"])
- data := Province{province}
- if ProvinceDict[province] == nil {
- ProvinceDict[province] = []Province{data}
- } else {
- arr := ProvinceDict[province]
- arr = append(arr, data)
- ProvinceDict[province] = arr
- }
- }
- }
- tmp = make(map[string]interface{})
- }
- util.Debug(fmt.Sprintf("城市配置加载完毕...省~%d 市~%d 区~%d", len(ProvinceDict), len(CityDict), len(DistrictDict)))
- }
- func saveLog() {
- arru := make([][]map[string]interface{}, 200)
- indexu := 0
- for {
- select {
- case v := <-SaveLogChan:
- arru[indexu] = v
- indexu++
- if indexu == 200 {
- SaveSp <- true
- go func(arru [][]map[string]interface{}) {
- defer func() {
- <-SaveSp
- }()
- extractmgo.UpSertBulk("createIndex_log", arru...)
- }(arru)
- arru = make([][]map[string]interface{}, 200)
- indexu = 0
- }
- case <-time.After(1000 * time.Millisecond):
- if indexu > 0 {
- SaveSp <- true
- go func(arru [][]map[string]interface{}) {
- defer func() {
- <-SaveSp
- }()
- extractmgo.UpSertBulk("createIndex_log", arru...)
- }(arru[:indexu])
- arru = make([][]map[string]interface{}, 200)
- indexu = 0
- }
- }
- }
- }
|