|
@@ -1,419 +1,16 @@
|
|
|
-{
|
|
|
- "port": "9090",
|
|
|
- "mgodb": "127.0.0.1:27017",
|
|
|
- "dbsize": 3,
|
|
|
- "dbname": "extract_local",
|
|
|
- "dbname_addrs": "extract_local",
|
|
|
- "dbname_addrs_c": "address_new_2020",
|
|
|
- "redis": "qyk_redis=192.168.3.207:6379",
|
|
|
- "elasticsearch": "http://127.0.0.1:9800",
|
|
|
- "elasticsearch_index": "winner_enterprise_tmp",
|
|
|
- "elasticsearch_type": "winnerent",
|
|
|
- "elasticsearch_db": "winner_enterprise",
|
|
|
- "elasticsearch_buyer_index": "buyer_enterprise_tmp",
|
|
|
- "elasticsearch_buyer_type": "buyerent",
|
|
|
- "elasticsearch_buyer_db": "buyer_enterprise",
|
|
|
- "elasticsearch_agency_index": "agency_enterprise_tmp",
|
|
|
- "elasticsearch_agency_type": "agencyent",
|
|
|
- "elasticsearch_agency_db": "agency_enterprise",
|
|
|
- "redis_qyk": "qyk_redis",
|
|
|
- "redis_winner_db": "1",
|
|
|
- "redis_buyer_db": "2",
|
|
|
- "redis_agency_db": "3",
|
|
|
- "elasticPoolSize": 1,
|
|
|
- "mergetable": "projectset",
|
|
|
- "mergetablealias": "projectset_v1",
|
|
|
- "ffield": true,
|
|
|
- "saveresult": false,
|
|
|
- "fieldsfind": false,
|
|
|
- "qualityaudit": false,
|
|
|
- "saveblock": false,
|
|
|
- "filelength": 150000,
|
|
|
- "iscltlog": false,
|
|
|
- "brandgoods": false,
|
|
|
- "pricenumber":true,
|
|
|
- "udptaskid": "60b493c2e138234cb4adb640",
|
|
|
- "udpport": "1484",
|
|
|
- "nextNode": [
|
|
|
- {
|
|
|
- "addr": "127.0.0.1",
|
|
|
- "port": 1485,
|
|
|
- "memo": "抽取城市"
|
|
|
- }
|
|
|
- ],
|
|
|
- "esconfig": {
|
|
|
- "available": false,
|
|
|
- "AccessID": "",
|
|
|
- "AccessSecret": "",
|
|
|
- "ZoneIds": [
|
|
|
- {
|
|
|
- "zoneid": "cn-beijing-f",
|
|
|
- "LaunchTemplateId4": "lt-2zejb8ayql48hn0hcjpy",
|
|
|
- "LaunchTemplateId8": "lt-2zegx87hj07phcdtoh61",
|
|
|
- "vswitchid": "vsw-2zei6snkgmqxcnnx6g04d"
|
|
|
- },
|
|
|
- {
|
|
|
- "zoneid": "cn-beijing-g",
|
|
|
- "LaunchTemplateId4": "lt-2ze5ktfgopayi48ok0hu",
|
|
|
- "LaunchTemplateId8": "lt-2ze0qfrxdnkuwldj9s0u",
|
|
|
- "vswitchid": "vsw-2ze586sxfwsaov4s5w88d"
|
|
|
- },
|
|
|
- {
|
|
|
- "zoneid": "cn-beijing-h",
|
|
|
- "LaunchTemplateId4": "lt-2ze5ir54gy4ui8okr71f",
|
|
|
- "LaunchTemplateId8": "lt-2ze5fzxwgt8jcqczvmjy",
|
|
|
- "vswitchid": "vsw-2ze1n1k3mo3fv2irsfdps"
|
|
|
- }
|
|
|
- ]
|
|
|
- },
|
|
|
- "istest": true,
|
|
|
- "isSaveTag": false,
|
|
|
- "tomail": "zhengkun@topnet.net.cn",
|
|
|
- "api": "http://10.171.112.160:19281/_send/_mail",
|
|
|
- "deleteInstanceTimeHour": 1,
|
|
|
- "jsondata_extweight": 1
|
|
|
-}
|
|
|
-
|
|
|
+//初始化redis
|
|
|
+redis.InitRedisBySize(qu.ObjToString(u.Config["redis"]), 50, 30, 240)
|
|
|
+//初始化elastic连接
|
|
|
+elastic.InitElasticSize(qu.ObjToString(u.Config["elasticsearch"]), qu.IntAllDef(3, 30))
|
|
|
+u.WinnerDB = qu.IntAll(u.Config["redis_winner_db"])
|
|
|
+u.BuyerDB = qu.IntAll(u.Config["redis_buyer_db"])
|
|
|
+u.AgencyDB = qu.IntAll(u.Config["redis_agency_db"])
|
|
|
|
|
|
|
|
|
|
|
|
-// extractudp
|
|
|
-package extract
|
|
|
-
|
|
|
-import (
|
|
|
- "encoding/json"
|
|
|
- "fmt"
|
|
|
- db "jy/mongodbutil"
|
|
|
- ju "jy/util"
|
|
|
- mu "mfw/util"
|
|
|
- "net"
|
|
|
- qu "qfw/util"
|
|
|
- "sync"
|
|
|
- "time"
|
|
|
-
|
|
|
- log "github.com/donnie4w/go-logger/logger"
|
|
|
- "gopkg.in/mgo.v2/bson"
|
|
|
-)
|
|
|
-
|
|
|
-var Udpclient mu.UdpClient //udp对象
|
|
|
-var nextNodes []map[string]interface{}
|
|
|
-
|
|
|
-//udp通知抽取
|
|
|
-func ExtractUdp() {
|
|
|
- nextNodes = qu.ObjArrToMapArr(ju.Config["nextNode"].([]interface{}))
|
|
|
- Udpclient = mu.UdpClient{Local: ":" + qu.ObjToString(ju.Config["udpport"]), BufSize: 1024}
|
|
|
- Udpclient.Listen(processUdpMsg)
|
|
|
-}
|
|
|
-
|
|
|
-func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
|
|
|
- switch act {
|
|
|
- case mu.OP_TYPE_DATA:
|
|
|
- var rep map[string]interface{}
|
|
|
- err := json.Unmarshal(data, &rep)
|
|
|
- if err != nil {
|
|
|
- log.Debug(err)
|
|
|
- } else {
|
|
|
- stype, _ := rep["stype"].(string)
|
|
|
- if stype == "distributed" { //分布式抽取分支
|
|
|
- go Udpclient.WriteUdp([]byte("发送分布式抽取分支"+qu.ObjToString(rep["ip"])+"udpok"), mu.OP_NOOP, ra)
|
|
|
- InstanceId := qu.ObjToString(rep["InstanceId"])
|
|
|
- db.Mgo.Update("ecs", `{"InstanceId":"`+InstanceId+`"}`,
|
|
|
- map[string]interface{}{
|
|
|
- "$set": map[string]interface{}{
|
|
|
- "extstatus": "running",
|
|
|
- },
|
|
|
- }, true, false)
|
|
|
- ExtractByUdp("", "", ra, qu.ObjToString(rep["InstanceId"]), qu.ObjToString(rep["ip"]))
|
|
|
- db.Mgo.Update("ecs", `{"InstanceId":"`+InstanceId+`"}`,
|
|
|
- map[string]interface{}{
|
|
|
- "$set": map[string]interface{}{
|
|
|
- "extstatus": "ok",
|
|
|
- },
|
|
|
- }, true, false)
|
|
|
- log.Debug("分布式抽取完成,可以释放esc实例", qu.ObjToString(rep["ip"]))
|
|
|
- } else {
|
|
|
- sid, _ := rep["gtid"].(string)
|
|
|
- eid, _ := rep["lteid"].(string)
|
|
|
- if sid == "" || eid == "" {
|
|
|
- log.Debug("err", "sid=", sid, ",eid=", eid)
|
|
|
- } else {
|
|
|
- udpinfo, _ := rep["key"].(string)
|
|
|
- if udpinfo == "" {
|
|
|
- udpinfo = "udpok"
|
|
|
- }
|
|
|
- go Udpclient.WriteUdp([]byte(udpinfo), mu.OP_NOOP, ra)
|
|
|
- log.Debug("udp通知抽取id段", sid, " ", eid)
|
|
|
- ExtractByUdp(sid, eid, ra)
|
|
|
- for _, m := range nextNodes {
|
|
|
- by, _ := json.Marshal(map[string]interface{}{
|
|
|
- "gtid": sid,
|
|
|
- "lteid": eid,
|
|
|
- "stype": qu.ObjToString(m["stype"]),
|
|
|
- })
|
|
|
- err := Udpclient.WriteUdp(by, mu.OP_TYPE_DATA, &net.UDPAddr{
|
|
|
- IP: net.ParseIP(m["addr"].(string)),
|
|
|
- Port: qu.IntAll(m["port"]),
|
|
|
- })
|
|
|
- if err != nil {
|
|
|
- log.Debug(err)
|
|
|
- }
|
|
|
- }
|
|
|
- log.Debug("udp通知抽取完成,eid=", eid)
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- case mu.OP_NOOP: //下个节点回应
|
|
|
- log.Debug(string(data))
|
|
|
- }
|
|
|
-}
|
|
|
-
|
|
|
-var ext *ExtractTask
|
|
|
-
|
|
|
-//根据id区间抽取-udp模式
|
|
|
-func ExtractByUdp(sid, eid string, ra *net.UDPAddr, instanceId ...string) {
|
|
|
- defer qu.Catch()
|
|
|
- if ext == nil {
|
|
|
- ext = &ExtractTask{}
|
|
|
- ext.Id = qu.ObjToString(ju.Config["udptaskid"])
|
|
|
- ext.InitTaskInfo()
|
|
|
- ext.TaskInfo.FDB = db.MgoFactory(3, 5, 600, ext.TaskInfo.FromDbAddr, ext.TaskInfo.FromDB)
|
|
|
- ext.TaskInfo.TDB = db.MgoFactory(3, 5, 600, ext.TaskInfo.ToDbAddr, ext.TaskInfo.ToDB)
|
|
|
- ext.InitSite()
|
|
|
- ext.InitRulePres()
|
|
|
- ext.InitRuleBacks(false)
|
|
|
- ext.InitRuleBacks(true)
|
|
|
- ext.InitRuleCore(false)
|
|
|
- ext.InitRuleCore(true)
|
|
|
- ext.InitBlockRule()
|
|
|
- ext.InitPkgCore()
|
|
|
- ext.InitTag(false)
|
|
|
- ext.InitTag(true)
|
|
|
- ext.InitClearFn(false)
|
|
|
- ext.InitClearFn(true)
|
|
|
- ext.Lock()
|
|
|
- //ext.IsExtractCity = false
|
|
|
- if ext.IsExtractCity { //版本上控制是否开始城市抽取
|
|
|
- //初始化城市DFA信息
|
|
|
- //ext.InitCityDFA()
|
|
|
- ext.InitCityInfo()
|
|
|
- ext.InitAreaCode()
|
|
|
- ext.InitPostCode()
|
|
|
- }
|
|
|
- ext.Unlock()
|
|
|
- //质量审核
|
|
|
- ext.InitAuditFields()
|
|
|
- ext.InitAuditRule()
|
|
|
- ext.InitAuditClass()
|
|
|
- ext.InitAuditRecogField()
|
|
|
|
|
|
- //品牌抽取是否开启
|
|
|
- ju.IsBrandGoods, _ = ju.Config["brandgoods"].(bool)
|
|
|
-
|
|
|
- ext.ResultSave(true)
|
|
|
- ext.BidSave(true)
|
|
|
- ext.IsRun = true
|
|
|
- ext.InitFile()
|
|
|
- } else {
|
|
|
- ext.BidTotal = 0
|
|
|
- }
|
|
|
- index := 0
|
|
|
- if len(instanceId) > 0 { //分布式抽取进度
|
|
|
- go Udpclient.WriteUdp([]byte(fmt.Sprintf("IP=%s,数据接收成功", instanceId[1])), mu.OP_NOOP, ra)
|
|
|
- for {
|
|
|
- tsk, b := db.Mgo.FindOne("esctask", `{"state":{"$lt":1}}`)
|
|
|
- if tsk != nil && !b {
|
|
|
- break
|
|
|
- }
|
|
|
- db.Mgo.UpdateById("esctask", (*tsk)["_id"], map[string]interface{}{
|
|
|
- "$set": map[string]interface{}{
|
|
|
- "InstanceId": instanceId[0],
|
|
|
- "state": 1,
|
|
|
- "runtime": time.Now().Format(qu.Date_Full_Layout),
|
|
|
- },
|
|
|
- })
|
|
|
- query := bson.M{"_id": bson.M{"$gte": bson.ObjectIdHex(qu.ObjToString((*tsk)["sid"])), "$lte": bson.ObjectIdHex(qu.ObjToString((*tsk)["eid"]))}}
|
|
|
- count1 := ext.TaskInfo.FDB.Count(ext.TaskInfo.FromColl, query)
|
|
|
- count2 := ext.TaskInfo.FDB.Count(ext.TaskInfo.FromColl+"_back", query)
|
|
|
- log.Debug("timestr", (*tsk)["timestr"], "count", count1+count2)
|
|
|
- list, _ := ext.TaskInfo.FDB.Find(ext.TaskInfo.FromColl, query, nil, Fields, false, -1, -1)
|
|
|
- for _, v := range *list {
|
|
|
- //if qu.ObjToString(v["sensitive"]) != "" { //去除含敏感词数据
|
|
|
- // log.Debug(index, qu.BsonIdToSId(v["_id"]), "//去除含敏感词数据")
|
|
|
- // continue
|
|
|
- //}
|
|
|
- if spidercode[qu.ObjToString(v["spidercode"])] { //临时开标记录
|
|
|
- log.Debug(index, qu.BsonIdToSId(v["_id"]), "//开标记录")
|
|
|
- continue
|
|
|
- }
|
|
|
- var j, jf *ju.Job
|
|
|
- var isSite bool
|
|
|
- if ext.IsFileField && (v["projectinfo"] != nil || v["attach_text"] != nil) {
|
|
|
- v["isextFile"] = true
|
|
|
- j, jf, isSite = ext.PreInfo(v)
|
|
|
- } else {
|
|
|
- j, _, isSite = ext.PreInfo(v)
|
|
|
- }
|
|
|
- go ext.ExtractProcess(j, jf, isSite)
|
|
|
- index++
|
|
|
- ext.TaskInfo.ProcessPool <- true
|
|
|
- }
|
|
|
- list2, _ := ext.TaskInfo.FDB.Find(ext.TaskInfo.FromColl+"_back", query, nil, Fields, false, -1, -1)
|
|
|
- for _, v := range *list2 {
|
|
|
- if qu.ObjToString(v["sensitive"]) != "" { //去除含敏感词数据
|
|
|
- continue
|
|
|
- }
|
|
|
- var j, jf *ju.Job
|
|
|
- var isSite bool
|
|
|
- if ext.IsFileField && (v["projectinfo"] != nil || v["attach_text"] != nil) {
|
|
|
- v["isextFile"] = true
|
|
|
- j, jf, isSite = ext.PreInfo(v)
|
|
|
- } else {
|
|
|
- j, _, isSite = ext.PreInfo(v)
|
|
|
- }
|
|
|
- go ext.ExtractProcess(j, jf, isSite)
|
|
|
- index++
|
|
|
- ext.TaskInfo.ProcessPool <- true
|
|
|
- }
|
|
|
- db.Mgo.UpdateById("esctask", (*tsk)["_id"], map[string]interface{}{
|
|
|
- "$set": map[string]interface{}{
|
|
|
- "InstanceId": instanceId[0],
|
|
|
- "oktime": time.Now().Format(qu.Date_Full_Layout),
|
|
|
- "state": 1,
|
|
|
- },
|
|
|
- })
|
|
|
- db.Mgo.Update("ecs", `{"InstanceId":"`+instanceId[0]+`"}`,
|
|
|
- map[string]interface{}{
|
|
|
- "$inc": map[string]interface{}{
|
|
|
- "totalnum": count1 + count2,
|
|
|
- "step": 1,
|
|
|
- },
|
|
|
- }, true, false)
|
|
|
- }
|
|
|
- log.Debug("抽取完成", "index:", index, "bidtotal:", ext.BidTotal)
|
|
|
- } else {
|
|
|
- //普通抽取
|
|
|
- query := bson.M{"_id": bson.M{"$gt": bson.ObjectIdHex(sid), "$lte": bson.ObjectIdHex(eid)}}
|
|
|
- count := ext.TaskInfo.FDB.Count(ext.TaskInfo.FromColl, query)
|
|
|
- log.Debug("查询条件为:", query, "查询条数:", count)
|
|
|
- pageNum := (count + PageSize - 1) / PageSize
|
|
|
- limit := PageSize
|
|
|
- if count < PageSize {
|
|
|
- limit = count
|
|
|
- }
|
|
|
- wg := sync.WaitGroup{}
|
|
|
- for i := 0; i < pageNum; i++ {
|
|
|
- query = bson.M{"_id": bson.M{"$gt": bson.ObjectIdHex(sid)}}
|
|
|
- fmt.Printf("page=%d,query=%v\n", i+1, query)
|
|
|
- list, _ := ext.TaskInfo.FDB.Find(ext.TaskInfo.FromColl, query, `{"_id":1}`, Fields, false, 0, limit)
|
|
|
- for _, v := range *list {
|
|
|
- //if qu.ObjToString(v["sensitive"]) != "" { //去除含敏感词数据
|
|
|
- // log.Debug(index, qu.BsonIdToSId(v["_id"]), "//去除含敏感词数据")
|
|
|
- // continue
|
|
|
- //}
|
|
|
- if spidercode[qu.ObjToString(v["spidercode"])] { //临时开标记录
|
|
|
- log.Debug(index, qu.BsonIdToSId(v["_id"]), "//开标记录")
|
|
|
- continue
|
|
|
- }
|
|
|
- _id := qu.BsonIdToSId(v["_id"])
|
|
|
- var j, jf *ju.Job
|
|
|
- var isSite bool
|
|
|
- if ext.IsFileField && (v["projectinfo"] != nil || v["attach_text"] != nil) {
|
|
|
- v["isextFile"] = true
|
|
|
- j, jf, isSite = ext.PreInfo(v)
|
|
|
- } else {
|
|
|
- j, _, isSite = ext.PreInfo(v)
|
|
|
- }
|
|
|
- ext.TaskInfo.ProcessPool <- true
|
|
|
- wg.Add(1)
|
|
|
- go func(wg *sync.WaitGroup, j, jf *ju.Job) {
|
|
|
- defer wg.Done()
|
|
|
- //log.Debug(index,j.SourceMid,)
|
|
|
- ext.ExtractProcess(j, jf, isSite)
|
|
|
- }(&wg, j, jf)
|
|
|
- index++
|
|
|
- if index%1000 == 0 {
|
|
|
- log.Debug("index:", index, ",页码:", i+1, ",_id:", _id)
|
|
|
- }
|
|
|
- sid = _id
|
|
|
- if sid >= eid {
|
|
|
- break
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- wg.Wait()
|
|
|
- ext.BidSave(false)
|
|
|
- log.Debug("抽取完成,", "count:", count, ",index:", index, ",bidtotal:", ext.BidTotal, ",eid:", eid)
|
|
|
- }
|
|
|
-}
|
|
|
-
|
|
|
-//中标预测信息抽取,ossid为附件识别后的id
|
|
|
-var exF *ExtractTask
|
|
|
-
|
|
|
-func ExtractByBidForecast(infoid string, ossid ...string) map[string]interface{} {
|
|
|
- defer qu.Catch()
|
|
|
- if exF == nil {
|
|
|
- exF = &ExtractTask{}
|
|
|
- exF.Id = qu.ObjToString(ju.Config["udptaskid"])
|
|
|
- exF.InitTaskInfo()
|
|
|
- exF.TaskInfo.FDB = db.MgoFactory(1, 2, 600, exF.TaskInfo.FromDbAddr, exF.TaskInfo.FromDB)
|
|
|
- exF.TaskInfo.TDB = db.MgoFactory(1, 2, 600, exF.TaskInfo.ToDbAddr, exF.TaskInfo.ToDB)
|
|
|
- exF.InitSite()
|
|
|
- exF.InitRulePres()
|
|
|
- exF.InitRuleBacks(false)
|
|
|
- exF.InitRuleBacks(true)
|
|
|
- exF.InitRuleCore(false)
|
|
|
- exF.InitRuleCore(true)
|
|
|
- exF.InitBlockRule()
|
|
|
- exF.InitPkgCore()
|
|
|
- exF.InitTag(false)
|
|
|
- exF.InitTag(true)
|
|
|
- exF.InitClearFn(false)
|
|
|
- exF.InitClearFn(true)
|
|
|
-
|
|
|
- if exF.IsExtractCity { //版本上控制是否开始城市抽取
|
|
|
- //初始化城市DFA信息
|
|
|
- //exF.InitCityDFA()
|
|
|
- exF.InitCityInfo()
|
|
|
- exF.InitAreaCode()
|
|
|
- exF.InitPostCode()
|
|
|
- }
|
|
|
- //质量审核
|
|
|
- exF.InitAuditFields()
|
|
|
- exF.InitAuditRule()
|
|
|
- exF.InitAuditClass()
|
|
|
- exF.InitAuditRecogField()
|
|
|
-
|
|
|
- //品牌抽取是否开启
|
|
|
- ju.IsBrandGoods, _ = ju.Config["brandgoods"].(bool)
|
|
|
-
|
|
|
- exF.ResultSave(true)
|
|
|
- exF.BidSave(true)
|
|
|
- exF.IsRun = true
|
|
|
- exF.InitFile()
|
|
|
- }
|
|
|
- tmp, _ := exF.TaskInfo.FDB.FindById(exF.TaskInfo.FromColl, infoid, nil)
|
|
|
- if exF.IsFileField && ((*tmp)["projectinfo"] != nil || (*tmp)["attach_text"] != nil) {
|
|
|
- (*tmp)["isextFile"] = true
|
|
|
- }
|
|
|
- exF.TaskInfo.ProcessPool <- true
|
|
|
- j, jf, _ := exF.PreInfo(*tmp)
|
|
|
- wg := sync.WaitGroup{}
|
|
|
- wg.Add(1)
|
|
|
- go func(wg *sync.WaitGroup, j, jf *ju.Job) {
|
|
|
- defer wg.Done()
|
|
|
- exF.ExtractProcess(j, jf, false)
|
|
|
- }(&wg, j, jf)
|
|
|
- wg.Wait()
|
|
|
- exF.BidSave(false)
|
|
|
-
|
|
|
- return nil
|
|
|
-}
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-//网页版抽取
|
|
|
|
|
|
+//目前线上
|
|
|
{
|
|
|
"port": "9080",
|
|
|
"mgodb": "SJZY_RWExt_Other:SJZY%40E3X4t5O8th@172.17.145.163:27083",
|
|
@@ -447,7 +44,7 @@ func ExtractByBidForecast(infoid string, ossid ...string) map[string]interface{}
|
|
|
"iscltlog": false,
|
|
|
"brandgoods": false,
|
|
|
"pricenumber":true,
|
|
|
- "udptaskid": "60b49fab23119b54547d9a11",
|
|
|
+ "udptaskid": "6275d34223119b206c86182e",
|
|
|
"udpport": "1177",
|
|
|
"nextNode": [ ],
|
|
|
"esconfig": {
|
|
@@ -476,10 +73,3 @@ func ExtractByBidForecast(infoid string, ossid ...string) map[string]interface{}
|
|
|
"deleteInstanceTimeHour": 1,
|
|
|
"jsondata_extweight": 1
|
|
|
}
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
- //go extract.Export() //导出任务
|
|
|
- //extract.ClearUdp() //udp通知清理
|
|
|
- //go heart.HeartMonitor() //心跳监测
|