extract.go 27 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988
  1. package extract
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "jy/clear"
  6. db "jy/mongodbutil"
  7. "jy/pretreated"
  8. ju "jy/util"
  9. "log"
  10. qu "qfw/util"
  11. redis "qfw/util/redis"
  12. "reflect"
  13. "regexp"
  14. "strconv"
  15. "strings"
  16. "sync"
  17. "time"
  18. "gopkg.in/mgo.v2/bson"
  19. )
  20. var (
  21. lock sync.RWMutex
  22. cut = ju.NewCut() //获取正文并清理
  23. ExtLogs map[*TaskInfo][]map[string]interface{} //抽取日志
  24. TaskList map[string]*ExtractTask //任务列表
  25. saveLimit = 200 //抽取日志批量保存
  26. PageSize = 5000 //查询分页
  27. Fields = `{"title":1,"detail":1,"contenthtml":1,"href":1,"site":1,"spidercode":1,"toptype":1,"subtype":1,"area":1,"city":1,"comeintime":1,"publishtime":1}`
  28. )
  29. //启动测试抽取
  30. func StartExtractTestTask(taskId, startId, num, resultcoll, trackcoll string) bool {
  31. defer qu.Catch()
  32. ext := &ExtractTask{}
  33. ext.Id = taskId
  34. ext.IsRun = true
  35. ext.InitTestTaskInfo(resultcoll, trackcoll)
  36. ext.TaskInfo.DB = db.MgoFactory(1, 3, 120, ext.TaskInfo.FromDbAddr, ext.TaskInfo.FromDB)
  37. ext.InitRulePres()
  38. ext.InitRuleBacks()
  39. ext.InitRuleCore()
  40. ext.InitTag()
  41. ext.InitClearFn()
  42. if ext.IsExtractCity { //版本上控制是否开始城市抽取
  43. //初始化城市DFA信息
  44. ext.InitDFA()
  45. }
  46. //质量审核
  47. ext.InitAuditFields()
  48. ext.InitAuditRule()
  49. ext.InitAuditClass()
  50. ext.InitAuditRecogField()
  51. return RunExtractTestTask(ext, startId, num)
  52. }
  53. func IdTrans(startId string) bson.ObjectId {
  54. defer qu.Catch()
  55. return bson.ObjectIdHex(startId)
  56. }
  57. //开始测试任务抽取
  58. func RunExtractTestTask(ext *ExtractTask, startId, num string) bool {
  59. n, _ := strconv.Atoi(num)
  60. id := IdTrans(startId)
  61. if id.Valid() {
  62. query := bson.M{"_id": bson.M{"$gte": bson.ObjectIdHex(startId)}}
  63. list, _ := ext.TaskInfo.DB.Find(ext.TaskInfo.FromColl, query, nil, Fields, false, 0, n)
  64. for _, v := range *list {
  65. //log.Println(v["_id"])
  66. j := PreInfo(v)
  67. ext.TaskInfo.ProcessPool <- true
  68. go ext.ExtractProcess(j)
  69. }
  70. return true
  71. } else {
  72. return false
  73. }
  74. }
  75. //启动抽取
  76. func StartExtractTaskId(taskId string) bool {
  77. isgo := false
  78. ext := TaskList[taskId]
  79. if ext == nil {
  80. ext = &ExtractTask{}
  81. ext.Id = taskId
  82. ext.InitTaskInfo()
  83. isgo = true
  84. } else {
  85. ext.Id = taskId
  86. ext.InitTaskInfo()
  87. }
  88. ext.TaskInfo.DB = db.MgoFactory(2, 3, 120, ext.TaskInfo.FromDbAddr, ext.TaskInfo.FromDB)
  89. ext.InitRulePres()
  90. ext.InitRuleBacks()
  91. ext.InitRuleCore()
  92. ext.InitTag()
  93. ext.InitClearFn()
  94. if ext.IsExtractCity { //版本上控制是否开始城市抽取
  95. //初始化城市DFA信息
  96. ext.InitDFA()
  97. }
  98. //质量审核
  99. ext.InitAuditFields()
  100. ext.InitAuditRule()
  101. ext.InitAuditClass()
  102. ext.InitAuditRecogField()
  103. ext.IsRun = true
  104. go ext.ResultSave()
  105. go ext.BidSave()
  106. if isgo {
  107. go RunExtractTask(taskId)
  108. }
  109. TaskList[taskId] = ext
  110. return true
  111. }
  112. //停止抽取
  113. func StopExtractTaskId(taskId string) bool {
  114. ext := TaskList[taskId]
  115. if ext != nil {
  116. ext.IsRun = false
  117. TaskList[taskId] = ext
  118. }
  119. //更新task.s_extlastid
  120. db.Mgo.UpdateById("task", taskId, `{"$set":{"s_extlastid":"`+ext.TaskInfo.LastExtId+`"}}`)
  121. return true
  122. }
  123. //开始抽取
  124. func RunExtractTask(taskId string) {
  125. ext := TaskList[taskId]
  126. query := bson.M{"_id": bson.M{"$gte": bson.ObjectIdHex(ext.TaskInfo.LastExtId)}}
  127. count := ext.TaskInfo.DB.Count(ext.TaskInfo.FromColl, query)
  128. pageNum := (count + PageSize - 1) / PageSize
  129. limit := PageSize
  130. if count < PageSize {
  131. limit = count
  132. }
  133. log.Printf("count=%d,pageNum=%d,query=%v", count, pageNum, query)
  134. for i := 0; i < pageNum; i++ {
  135. query = bson.M{"_id": bson.M{"$gte": bson.ObjectIdHex(ext.TaskInfo.LastExtId)}}
  136. log.Printf("page=%d,query=%v", i+1, query)
  137. list, _ := ext.TaskInfo.DB.Find(ext.TaskInfo.FromColl, query, nil, Fields, false, 0, limit)
  138. for _, v := range *list {
  139. //log.Println(v["_id"])
  140. if !ext.IsRun {
  141. break
  142. }
  143. j := PreInfo(v)
  144. ext.TaskInfo.ProcessPool <- true
  145. go ext.ExtractProcess(j)
  146. ext.TaskInfo.LastExtId = qu.BsonIdToSId(v["_id"])
  147. }
  148. db.Mgo.UpdateById("task", ext.Id, `{"$set":{"s_extlastid":"`+ext.TaskInfo.LastExtId+`"}}`)
  149. if !ext.IsRun {
  150. break
  151. }
  152. }
  153. //更新task.s_extlastid
  154. time.AfterFunc(1*time.Minute, func() { RunExtractTask(taskId) })
  155. }
  156. //信息预处理
  157. func PreInfo(doc map[string]interface{}) *ju.Job {
  158. detail := ""
  159. d1, _ := doc["detail"].(string)
  160. d2, _ := doc["contenthtml"].(string)
  161. if len(d1) >= len(d2) || d2 == "" {
  162. detail = d1
  163. } else {
  164. detail = d2
  165. }
  166. detail = ju.CutLableStr(detail)
  167. detail = cut.ClearHtml(detail)
  168. doc["detail"] = detail
  169. href := qu.ObjToString(doc["href"])
  170. if strings.HasPrefix(href, "http://") {
  171. href = href[7:]
  172. } else if strings.HasPrefix(href, "https://") {
  173. href = href[8:]
  174. }
  175. pos := strings.Index(href, "/")
  176. if pos > 0 {
  177. href = href[:pos]
  178. }
  179. doc["domain"] = href
  180. toptype := qu.ObjToString(doc["toptype"])
  181. if qu.ObjToString(doc["type"]) == "bid" {
  182. toptype = "结果"
  183. }
  184. if toptype == "" {
  185. toptype = "*"
  186. }
  187. j := &ju.Job{
  188. SourceMid: qu.BsonIdToSId(doc["_id"]),
  189. Category: toptype,
  190. Content: qu.ObjToString(doc["detail"]),
  191. SpiderCode: qu.ObjToString(doc["spidercode"]),
  192. Domain: qu.ObjToString(doc["domain"]),
  193. Href: qu.ObjToString(doc["href"]),
  194. Title: qu.ObjToString(doc["title"]),
  195. Data: &doc,
  196. City: qu.ObjToString(doc["city"]),
  197. Province: qu.ObjToString(doc["area"]),
  198. Result: map[string][]*ju.ExtField{},
  199. BuyerAddr: qu.ObjToString(doc["buyeraddr"]),
  200. }
  201. qu.Try(func() {
  202. pretreated.AnalyStart(j)
  203. }, func(err interface{}) {
  204. log.Println("pretreated.AnalyStart", err)
  205. })
  206. return j
  207. }
  208. //抽取
  209. func (e *ExtractTask) ExtractProcess(j *ju.Job) {
  210. qu.Try(func() {
  211. doc := *j.Data
  212. //全局前置规则,结果覆盖doc属性
  213. for _, v := range e.RulePres {
  214. doc = ExtRegPre(doc, j, v, e.TaskInfo)
  215. }
  216. //抽取规则
  217. for _, vc := range e.RuleCores {
  218. tmp := ju.DeepCopy(doc).(map[string]interface{})
  219. //是否进入逻辑
  220. if !ju.Logic(vc.LuaLogic, tmp) {
  221. continue
  222. }
  223. //抽取-前置规则
  224. for _, v := range vc.RulePres {
  225. tmp = ExtRegPre(tmp, j, v, e.TaskInfo)
  226. }
  227. //log.Println("抽取-前置规则", tmp)
  228. //抽取-规则
  229. for _, v := range vc.RuleCores {
  230. ExtRegCore(vc.ExtFrom, tmp, j, v, e)
  231. }
  232. //log.Println("抽取-规则", tmp)
  233. //抽取-后置规则
  234. for _, v := range vc.RuleBacks {
  235. ExtRegBack(j, v, e.TaskInfo)
  236. }
  237. //log.Println("抽取-后置规则", tmp)
  238. }
  239. //全局后置规则
  240. for _, v := range e.RuleBacks {
  241. ExtRegBack(j, v, e.TaskInfo)
  242. }
  243. //候选人加入
  244. if len(j.Winnerorder) > 0 {
  245. winner := &ju.ExtField{
  246. Field: "winner",
  247. Code: "",
  248. RuleText: "",
  249. Type: "winnerorder",
  250. MatchType: "winnerorder",
  251. ExtFrom: "",
  252. Value: j.Winnerorder[0]["entname"],
  253. Score: 0,
  254. }
  255. if len([]rune(qu.ObjToString(j.Winnerorder[0]["entname"]))) < 4 {
  256. winner.Score = -5
  257. }
  258. winners := j.Result["winner"]
  259. if winners != nil {
  260. winners = append(winners, winner)
  261. } else {
  262. winners = []*ju.ExtField{}
  263. winners = append(winners, winner)
  264. }
  265. j.Result["winner"] = winners
  266. }
  267. //函数清理
  268. for key, val := range j.Result {
  269. for _, v := range val {
  270. lock.Lock()
  271. cfn := e.ClearFn[key]
  272. lock.Unlock()
  273. data := clear.DoClearFn(cfn, []interface{}{v.Value, j.Content})
  274. v.Value = data[0]
  275. //清理特殊符号
  276. if clear.AsyField[key] != nil || clear.SymField[key] != nil ||
  277. clear.MesField[key] != nil {
  278. text := qu.ObjToString(v.Value)
  279. text = clear.OtherClean(key, text)
  280. v.Value = text
  281. }
  282. }
  283. }
  284. PackageDetail(j, e) //处理分包信息
  285. // bs, _ := json.Marshal(j.Result)
  286. // log.Println("抽取结果", j.Title, j.SourceMid, string(bs))
  287. //分析抽取结果并保存 todo
  288. AnalysisSaveResult(j, e)
  289. }, func(err interface{}) {
  290. log.Println((*j.Data)["_id"], err)
  291. <-e.TaskInfo.ProcessPool
  292. })
  293. <-e.TaskInfo.ProcessPool
  294. }
  295. //前置过滤
  296. func ExtRegPre(doc map[string]interface{}, j *ju.Job, in *RegLuaInfo, t *TaskInfo) map[string]interface{} {
  297. before := ju.DeepCopy(doc).(map[string]interface{})
  298. extinfo := map[string]interface{}{}
  299. if in.IsLua {
  300. lua := ju.LuaScript{Code: in.Code, Name: in.Name, Doc: doc, Script: in.RuleText}
  301. if j != nil {
  302. lua.Block = j.Block
  303. }
  304. extinfo = lua.RunScript("pre")
  305. for k, v := range extinfo { //结果覆盖原doc
  306. doc[k] = v
  307. }
  308. AddExtLog("prereplace", j.SourceMid, before, extinfo, in, t) //抽取日志
  309. } else {
  310. key := qu.If(in.Field == "", "detail", in.Field).(string)
  311. text := qu.ObjToString(doc[key])
  312. extinfo[key] = in.RegPreBac.Reg.ReplaceAllString(text, "")
  313. doc[key] = extinfo[key] //结果覆盖原doc
  314. AddExtLog("prereplace", j.SourceMid, before, extinfo, in, t) //抽取日志
  315. }
  316. return doc
  317. }
  318. //抽取-规则
  319. func ExtRegCore(extfrom string, doc map[string]interface{}, j *ju.Job, in *RegLuaInfo, et *ExtractTask) {
  320. if in.IsLua {
  321. lua := ju.LuaScript{Code: in.Code, Name: in.Name, Doc: doc, Script: in.RuleText}
  322. lua.KvMap = getKvByLuaFields(extfrom, j, in, et.Tag)
  323. lua.Block = j.Block
  324. extinfo := lua.RunScript("core")
  325. for k, v := range extinfo {
  326. if k == in.Field {
  327. if j.Result[k] == nil {
  328. j.Result[k] = [](*ju.ExtField){}
  329. }
  330. if tmps, ok := v.([]map[string]interface{}); ok {
  331. for _, tmp := range tmps {
  332. j.Result[k] = append(j.Result[k],
  333. &ju.ExtField{k, qu.ObjToString(tmp["code"]), qu.ObjToString(tmp["ruletext"]), qu.ObjToString(tmp["type"]), qu.ObjToString(tmp["matchtype"]), extfrom, tmp["value"], 0})
  334. }
  335. }
  336. }
  337. }
  338. if len(extinfo) > 0 {
  339. AddExtLog("extract", j.SourceMid, nil, extinfo, in, et.TaskInfo) //抽取日志
  340. }
  341. } else {
  342. //全文正则
  343. text := qu.ObjToString(doc[extfrom])
  344. if in.Field != "" {
  345. extinfo := extRegCoreToResult(extfrom, text, j, in)
  346. if len(extinfo) > 0 {
  347. AddExtLog("extract", j.SourceMid, nil, extinfo, in, et.TaskInfo) //抽取日志
  348. }
  349. }
  350. }
  351. }
  352. //lua脚本根据属性设置提取kv值
  353. func getKvByLuaFields(extfrom string, j *ju.Job, in *RegLuaInfo, t map[string][]*Tag) map[string][]map[string]interface{} {
  354. kvmap := map[string][]map[string]interface{}{}
  355. for fieldname, field := range in.LFields {
  356. lock.Lock()
  357. tags := t[field] //获取对应标签库
  358. lock.Unlock()
  359. for _, bl := range j.Block {
  360. //冒号kv
  361. if bl.ColonKV != nil {
  362. kvs := bl.ColonKV.Kvs
  363. kvs2 := bl.ColonKV.Kvs_2
  364. //log.Println("ColonKV1", kvs)
  365. //log.Println("ColonKV2", kvs2)
  366. for _, tag := range tags {
  367. for _, kv := range kvs {
  368. if tag.Type == "string" {
  369. if kv.Key == tag.Key {
  370. text := ju.TrimLRSpace(kv.Value, "")
  371. if text != "" {
  372. kvmap[field] = append(kvmap[field], map[string]interface{}{
  373. "field": field,
  374. "code": in.Code,
  375. "ruletext": tag.Key,
  376. "extfrom": extfrom,
  377. "value": text,
  378. "type": "colon1",
  379. "matchtype": "tag_string",
  380. })
  381. }
  382. break
  383. }
  384. } else if tag.Type == "regexp" {
  385. if tag.Reg.MatchString(kv.Key) {
  386. text := ju.TrimLRSpace(kv.Value, "")
  387. if text != "" {
  388. kvmap[field] = append(kvmap[field], map[string]interface{}{
  389. "field": field,
  390. "code": in.Code,
  391. "ruletext": tag.Key,
  392. "extfrom": extfrom,
  393. "value": text,
  394. "type": "colon1",
  395. "matchtype": "tag_regexp",
  396. })
  397. }
  398. break
  399. }
  400. }
  401. }
  402. for _, kv := range kvs2 {
  403. if tag.Type == "string" {
  404. if kv.Key == tag.Key {
  405. text := ju.TrimLRSpace(kv.Value, "")
  406. if text != "" {
  407. kvmap[field] = append(kvmap[field], map[string]interface{}{
  408. "field": field,
  409. "code": in.Code,
  410. "ruletext": tag.Key,
  411. "extfrom": extfrom,
  412. "value": text,
  413. "type": "colon2",
  414. "matchtype": "tag_string",
  415. })
  416. }
  417. break
  418. }
  419. } else if tag.Type == "regexp" {
  420. if tag.Reg.MatchString(kv.Key) {
  421. text := ju.TrimLRSpace(kv.Value, "")
  422. if text != "" {
  423. kvmap[field] = append(kvmap[field], map[string]interface{}{
  424. "field": field,
  425. "code": in.Code,
  426. "ruletext": tag.Key,
  427. "extfrom": extfrom,
  428. "value": text,
  429. "type": "colon2",
  430. "matchtype": "tag_regexp",
  431. })
  432. }
  433. break
  434. }
  435. }
  436. }
  437. }
  438. }
  439. //空格kv
  440. if bl.SpaceKV != nil {
  441. kvs := bl.SpaceKV.Kvs
  442. //log.Println("SpaceKV", kvs)
  443. for _, tag := range tags {
  444. for _, kv := range kvs {
  445. if tag.Type == "string" {
  446. if kv.Key == tag.Key {
  447. text := ju.TrimLRSpace(kv.Value, "")
  448. if text != "" {
  449. kvmap[field] = append(kvmap[field], map[string]interface{}{
  450. "field": field,
  451. "code": in.Code,
  452. "ruletext": tag.Key,
  453. "extfrom": extfrom,
  454. "value": text,
  455. "type": "space",
  456. "matchtype": "tag_string",
  457. })
  458. }
  459. break
  460. }
  461. } else if tag.Type == "regexp" {
  462. if tag.Reg.MatchString(kv.Key) {
  463. text := ju.TrimLRSpace(kv.Value, "")
  464. if text != "" {
  465. kvmap[field] = append(kvmap[field], map[string]interface{}{
  466. "field": field,
  467. "code": in.Code,
  468. "ruletext": tag.Key,
  469. "extfrom": extfrom,
  470. "value": text,
  471. "type": "space",
  472. "matchtype": "tag_regexp",
  473. })
  474. }
  475. break
  476. }
  477. }
  478. }
  479. }
  480. }
  481. //表格kv
  482. if bl.TableKV != nil {
  483. tkv := bl.TableKV
  484. //log.Println("tkv", tkv)
  485. for k, v := range tkv.Kv {
  486. if k == fieldname {
  487. if len(tags) > -tkv.KvIndex[fieldname] {
  488. ruletext := ""
  489. if fieldname == "项目名称" && -tkv.KvIndex[fieldname] == -100 {
  490. ruletext = "项目名称"
  491. } else {
  492. ruletext = tags[-tkv.KvIndex[fieldname]].Key
  493. }
  494. kvmap[field] = append(kvmap[field], map[string]interface{}{
  495. "field": field,
  496. "code": in.Code,
  497. "ruletext": ruletext,
  498. "extfrom": "table",
  499. "value": v,
  500. "type": "table",
  501. "matchtype": "tag_string",
  502. })
  503. } else { //涉及其他待处理
  504. //log.Println(tags)
  505. }
  506. }
  507. }
  508. }
  509. }
  510. }
  511. return kvmap
  512. }
  513. //正则提取结果
  514. func extRegCoreToResult(extfrom, text string, j *ju.Job, v *RegLuaInfo) map[string][]map[string]interface{} {
  515. extinfo := map[string][]map[string]interface{}{}
  516. if v.RegCore.Bextract { //正则是两部分的,可以直接抽取的(含下划线)
  517. apos := v.RegCore.Reg.FindAllStringSubmatchIndex(text, -1)
  518. if len(apos) > 0 {
  519. pos := apos[0]
  520. for k, p := range v.RegCore.ExtractPos {
  521. if len(pos) > p {
  522. if pos[p] == -1 || pos[p+1] == -1 {
  523. continue
  524. }
  525. val := text[pos[p]:pos[p+1]]
  526. tmps := []map[string]interface{}{}
  527. tmp := map[string]interface{}{
  528. "field": v.Field,
  529. "code": v.Code,
  530. "ruletext": v.RuleText,
  531. "extfrom": extfrom,
  532. "value": val,
  533. "type": "regexp",
  534. "matchtype": "regcontent",
  535. }
  536. tmps = append(tmps, tmp)
  537. extinfo[k] = tmps
  538. if val != "" {
  539. if j.Result[v.Field] == nil {
  540. j.Result[k] = [](*ju.ExtField){}
  541. }
  542. j.Result[k] = append(j.Result[k], &ju.ExtField{k, v.Code, v.RuleText, "regexp", "regcontent", extfrom, val, 0})
  543. }
  544. }
  545. }
  546. }
  547. } else {
  548. pos := v.RegCore.Reg.FindStringIndex(text)
  549. val := ""
  550. if len(pos) == 2 {
  551. text = text[pos[1]:]
  552. rs := regexp.MustCompile("[^\r\n\t]+")
  553. tmp := rs.FindAllString(text, -1)
  554. if len(tmp) > 0 {
  555. val = tmp[0]
  556. }
  557. }
  558. if val != "" {
  559. tmps := []map[string]interface{}{}
  560. tmp := map[string]interface{}{
  561. "field": v.Field,
  562. "code": v.Code,
  563. "ruletext": v.RuleText,
  564. "extfrom": extfrom,
  565. "value": val,
  566. "type": "regexp",
  567. "matchtype": "regcontent",
  568. }
  569. tmps = append(tmps, tmp)
  570. extinfo[v.Field] = tmps
  571. if j.Result[v.Field] == nil {
  572. j.Result[v.Field] = [](*ju.ExtField){}
  573. }
  574. j.Result[v.Field] = append(j.Result[v.Field], &ju.ExtField{v.Field, v.Code, v.RuleText, "regexp", "regcontent", extfrom, val, 0})
  575. }
  576. }
  577. return extinfo
  578. }
  579. //后置过滤
  580. func ExtRegBack(j *ju.Job, in *RegLuaInfo, t *TaskInfo) {
  581. if in.IsLua {
  582. result := GetResultMapForLua(j)
  583. lua := ju.LuaScript{Code: in.Code, Name: in.Name, Result: result, Script: in.RuleText}
  584. if j != nil {
  585. lua.Block = j.Block
  586. }
  587. extinfo := lua.RunScript("back")
  588. for k, v := range extinfo {
  589. if tmps, ok := v.([]map[string]interface{}); ok {
  590. j.Result[k] = [](*ju.ExtField){}
  591. for _, tmp := range tmps {
  592. j.Result[k] = append(j.Result[k], &ju.ExtField{k, qu.ObjToString(tmp["code"]), qu.ObjToString(tmp["ruletext"]), qu.ObjToString(tmp["type"]), qu.ObjToString(tmp["matchtype"]), qu.ObjToString(tmp["extfrom"]), tmp["value"], 0})
  593. }
  594. }
  595. }
  596. if len(extinfo) > 0 {
  597. AddExtLog("clear", j.SourceMid, result, extinfo, in, t) //抽取日志
  598. }
  599. } else {
  600. extinfo := map[string]interface{}{}
  601. if in.Field != "" {
  602. if j.Result[in.Field] != nil {
  603. tmp := j.Result[in.Field]
  604. exts := []interface{}{}
  605. for k, v := range tmp {
  606. if v.Type == "table" { //table抽取到的数据不清理
  607. continue
  608. }
  609. text := qu.ObjToString(v.Value)
  610. if text != "" {
  611. text = in.RegPreBac.Reg.ReplaceAllString(text, in.RegPreBac.Replace)
  612. }
  613. j.Result[in.Field][k].Value = text
  614. exts = append(exts, map[string]interface{}{
  615. "field": v.Field,
  616. "code": v.Code,
  617. "ruletext": v.RuleText,
  618. "type": v.Type,
  619. "matchtype": v.MatchType,
  620. "extfrom": v.ExtFrom,
  621. "value": text,
  622. })
  623. }
  624. extinfo[in.Field] = exts
  625. if len(extinfo) > 0 {
  626. AddExtLog("clear", j.SourceMid, tmp, extinfo, in, t) //抽取日志
  627. }
  628. }
  629. } else {
  630. for key, tmp := range j.Result {
  631. exts := []interface{}{}
  632. for k, v := range tmp {
  633. if v.Type == "table" { //table抽取到的数据不清理
  634. continue
  635. }
  636. text := qu.ObjToString(v.Value)
  637. if text != "" {
  638. text = in.RegPreBac.Reg.ReplaceAllString(text, in.RegPreBac.Replace)
  639. }
  640. j.Result[key][k].Value = text
  641. exts = append(exts, map[string]interface{}{
  642. "field": v.Field,
  643. "code": v.Code,
  644. "ruletext": v.RuleText,
  645. "type": v.Type,
  646. "matchtype": v.MatchType,
  647. "extfrom": v.ExtFrom,
  648. "value": text,
  649. })
  650. }
  651. extinfo[key] = exts
  652. }
  653. if len(extinfo) > 0 {
  654. AddExtLog("clear", j.SourceMid, j.Result, extinfo, in, t) //抽取日志
  655. }
  656. }
  657. }
  658. }
  659. //获取抽取结果map[string][]interface{},lua脚本使用
  660. func GetResultMapForLua(j *ju.Job) map[string][]map[string]interface{} {
  661. result := map[string][]map[string]interface{}{}
  662. for key, val := range j.Result {
  663. if result[key] == nil {
  664. result[key] = []map[string]interface{}{}
  665. }
  666. for _, v := range val {
  667. tmp := map[string]interface{}{
  668. "field": v.Field,
  669. "code": v.Code,
  670. "ruletext": v.RuleText,
  671. "value": v.Value,
  672. "type": v.Type,
  673. "matchtype": v.MatchType,
  674. "extfrom": v.ExtFrom,
  675. }
  676. result[key] = append(result[key], tmp)
  677. }
  678. }
  679. return result
  680. }
  681. //抽取日志
  682. func AddExtLog(ftype, sid string, before interface{}, extinfo interface{}, v *RegLuaInfo, t *TaskInfo) {
  683. if !t.IsEtxLog {
  684. return
  685. }
  686. logdata := map[string]interface{}{
  687. "code": v.Code,
  688. "name": v.Name,
  689. "type": ftype,
  690. "ruletext": v.RuleText,
  691. "islua": v.IsLua,
  692. "field": v.Field,
  693. "version": t.Version,
  694. "taskname": t.Name,
  695. "before": before,
  696. "extinfo": extinfo,
  697. "sid": sid,
  698. "comeintime": time.Now().Unix(),
  699. }
  700. lock.Lock()
  701. ExtLogs[t] = append(ExtLogs[t], logdata)
  702. lock.Unlock()
  703. }
  704. //保存抽取日志
  705. func SaveExtLog() {
  706. tmpLogs := map[*TaskInfo][]map[string]interface{}{}
  707. lock.Lock()
  708. tmpLogs = ExtLogs
  709. ExtLogs = map[*TaskInfo][]map[string]interface{}{}
  710. lock.Unlock()
  711. for k, v := range tmpLogs {
  712. if len(v) < saveLimit {
  713. db.Mgo.SaveBulk(k.TrackColl, v...)
  714. } else {
  715. for {
  716. if len(v) > saveLimit {
  717. tmp := v[:saveLimit]
  718. db.Mgo.SaveBulk(k.TrackColl, tmp...)
  719. v = v[saveLimit:]
  720. } else {
  721. db.Mgo.SaveBulk(k.TrackColl, v...)
  722. break
  723. }
  724. }
  725. }
  726. }
  727. time.AfterFunc(10*time.Second, SaveExtLog)
  728. }
  // FieldValue pairs a value with a counter.
  type FieldValue struct {
  	// Value is the stored value; any type is accepted.
  	Value interface{}
  	// Count is the integer counter kept alongside Value.
  	Count int
  }
  733. //分析抽取结果并保存
  734. func AnalysisSaveResult(j *ju.Job, e *ExtractTask) {
  735. doc := j.Data
  736. result := j.Result
  737. _id := qu.BsonIdToSId((*doc)["_id"])
  738. iscore, _ := ju.Config["fieldscore"].(bool)
  739. if iscore { //打分
  740. result = ScoreFields(j)
  741. }
  742. //结果排序
  743. values := map[string][]*ju.SortObject{}
  744. for key, val := range result {
  745. fieldValue := map[string][]interface{}{}
  746. if iscore { //走打分
  747. for _, v := range val {
  748. if len(fmt.Sprint(v.Value)) < 1 {
  749. continue //去除空串
  750. }
  751. fieldValue[fmt.Sprint(v.Value)+v.Type] = []interface{}{v.Score, v.Value}
  752. }
  753. } else { //不走打分,按出现频次
  754. for _, v := range val {
  755. if len(fmt.Sprint(v.Value)) < 1 {
  756. continue //去除空串
  757. }
  758. if fieldValue[fmt.Sprint(v.Value)] == nil {
  759. fieldValue[fmt.Sprint(v.Value)] = []interface{}{0, v.Value}
  760. } else {
  761. fieldValue[fmt.Sprint(v.Value)][0] = qu.IntAll(fieldValue[fmt.Sprint(v.Value)][0]) + 1
  762. }
  763. }
  764. }
  765. objects := []*ju.SortObject{}
  766. for k, v := range fieldValue {
  767. ValueStr := "" //第二排序
  768. if reflect.TypeOf(v[1]).String() == "string" {
  769. ValueStr = qu.ObjToString(v[1])
  770. }
  771. tmp := &ju.SortObject{
  772. Key: k,
  773. Value: qu.IntAll(v[0]),
  774. Object: v[1],
  775. ValueStr: ValueStr,
  776. }
  777. objects = append(objects, tmp)
  778. }
  779. values[key] = ju.ExtSort(objects)
  780. }
  781. //从排序结果中取值
  782. tmp := map[string]interface{}{} //抽取值
  783. for key, val := range values {
  784. for _, v := range val { //取第一个非负数
  785. if v.Key != "" && v.Value > -1 {
  786. tmp[key] = v.Object
  787. break
  788. }
  789. }
  790. }
  791. if len(j.PackageInfo) > 0 { //分包信息
  792. tmp["package"] = j.PackageInfo
  793. }
  794. if len(j.Winnerorder) > 0 { //候选人信息
  795. tmp["winnerorder"] = j.Winnerorder
  796. }
  797. for k, v := range *doc {
  798. if k == "detail" || k == "contenthtml" {
  799. continue
  800. }
  801. if tmp[k] == nil {
  802. tmp[k] = v
  803. }
  804. }
  805. //质量审核
  806. if ju.Config["qualityaudit"].(bool) {
  807. e.QualityAudit(tmp)
  808. }
  809. if e.IsExtractCity { //城市抽取
  810. b, p, c, d := e.TransmitData(tmp, _id) //抽取省份城市
  811. //log.Println("省份---", p, "城市---", c, "区---", d)
  812. tmp["district"] = d
  813. if b {
  814. tmp["city"] = c
  815. tmp["area"] = p
  816. }
  817. }
  818. if e.TaskInfo.TestColl == "" {
  819. if len(tmp) > 0 { //保存抽取结果
  820. tmparr := []map[string]interface{}{
  821. map[string]interface{}{
  822. "_id": qu.StringTOBsonId(_id),
  823. },
  824. map[string]interface{}{"$set": tmp},
  825. }
  826. e.BidArr = append(e.BidArr, tmparr)
  827. }
  828. if b, ok := ju.Config["saveresult"].(bool); ok && b {
  829. id := tmp["_id"]
  830. tmp["result"] = result
  831. delete(tmp, "_id")
  832. tmparr := []map[string]interface{}{
  833. map[string]interface{}{
  834. "_id": id,
  835. },
  836. map[string]interface{}{"$set": tmp},
  837. }
  838. e.ResultArr = append(e.ResultArr, tmparr)
  839. }
  840. } else { //测试结果
  841. delete(tmp, "_id")
  842. if len(j.BlockPackage) > 0 { //分包详情
  843. bs, _ := json.Marshal(j.BlockPackage)
  844. tmp["epackage"] = string(bs)
  845. }
  846. tmp["result"] = result
  847. b := db.Mgo.Update(e.TaskInfo.TestColl, `{"_id":"`+_id+`"}`, map[string]interface{}{"$set": tmp}, true, false)
  848. if !b {
  849. log.Println(e.TaskInfo.TestColl, _id)
  850. }
  851. }
  852. }
  853. func (e *ExtractTask) QualityAudit(resulttmp map[string]interface{}) {
  854. defer qu.Catch()
  855. //获取审核字段
  856. for _, field := range e.AuditFields {
  857. //1.分包
  858. if resulttmp["package"] != nil {
  859. packagedata := resulttmp["package"].(map[string]map[string]interface{})
  860. for _, val := range packagedata {
  861. if val[field] != nil {
  862. fv := qu.ObjToString(val[field])
  863. if fv != "" {
  864. if field == "buyer" || field == "winner" { //field为buyer和winner时特殊处理,先从Redis中查,有直接通过,没有走匹配规则
  865. e.RedisMatch(field, fv, val) //redis匹配
  866. } else { //除了buyer和winner,其他字段走规则匹配
  867. fv := qu.ObjToString(resulttmp[field])
  868. //resulttmp[field+"_isredis"] = false
  869. e.RuleMatch(field, fv, resulttmp)
  870. }
  871. }
  872. }
  873. }
  874. }
  875. //2.外围
  876. if resulttmp[field] != nil {
  877. fv := qu.ObjToString(resulttmp[field])
  878. if fv != "" {
  879. if field == "buyer" || field == "winner" { //field为buyer和winner时特殊处理,先从Redis中查,有直接通过,没有走匹配规则
  880. e.RedisMatch(field, fv, resulttmp) //redis匹配
  881. } else { //除了buyer和winner,其他字段走规则匹配
  882. fv := qu.ObjToString(resulttmp[field])
  883. //resulttmp[field+"_isredis"] = false
  884. e.RuleMatch(field, fv, resulttmp)
  885. }
  886. }
  887. }
  888. }
  889. }
  890. //Redis匹配
  891. func (e *ExtractTask) RedisMatch(field, fv string, val map[string]interface{}) {
  892. defer qu.Catch()
  893. i := redis.GetInt(field, field+"_"+fv) //查找redis
  894. if i == 0 { //reids未找到,执行规则匹配
  895. val[field+"_isredis"] = false
  896. e.RuleMatch(field, fv, val) //规则匹配
  897. } else { //redis找到,打标识存库
  898. val[field+"_isredis"] = true
  899. }
  900. }
  901. //规则匹配
  902. func (e *ExtractTask) RuleMatch(field, fieldval string, tmpMap map[string]interface{}) {
  903. defer qu.Catch()
  904. if fieldval != "" {
  905. SMap := e.StartMatch(field, fieldval)
  906. //SMap.AddKey(field+"_isaudit", false)
  907. for _, k := range SMap.Keys {
  908. tmpMap[k] = SMap.Map[k]
  909. }
  910. tmpMap[field+"_isaudit"] = false //添加字段未审核信息
  911. }
  912. }
  913. //开始规则匹配
  914. func (e *ExtractTask) StartMatch(field, text string) *pretreated.SortMap {
  915. defer qu.Catch()
  916. SMap := pretreated.NewSortMap()
  917. lock.Lock()
  918. f := e.RecogFieldMap[field]
  919. lock.Unlock()
  920. if len(f) > 0 {
  921. fid := qu.BsonIdToSId(f["_id"])
  922. recogFieldPreRule := qu.ObjToString(f["s_recogfield_prerule"])
  923. textAfterRecogFieldPrerule := ju.PreFilter(text, recogFieldPreRule) //识别字段的前置过滤
  924. if textAfterRecogFieldPrerule != "" {
  925. lock.Lock()
  926. classMap := e.FidClassMap[fid]
  927. lock.Unlock()
  928. L:
  929. for _, c := range classMap { //class
  930. classid := qu.BsonIdToSId(c["_id"])
  931. classPrerule := qu.ObjToString(c["s_class_prerule"])
  932. savefield := qu.ObjToString(c["s_savefield"]) //保存字段
  933. textAfterClassPrerule := ju.PreFilter(textAfterRecogFieldPrerule, classPrerule) //class的前置过滤
  934. if textAfterClassPrerule != "" {
  935. lock.Lock()
  936. ruleMap := e.CidRuleMap[classid]
  937. lock.Unlock()
  938. for _, r := range ruleMap { //rule
  939. rulePrerule := qu.ObjToString(r["s_rule_prerule"])
  940. s_code := qu.ObjToString(r["s_code"])
  941. rule := r["rule"].([]interface{})
  942. textAfterRulePrerule := ju.PreFilter(textAfterClassPrerule, rulePrerule) //class的前置过滤
  943. if textAfterRulePrerule != "" {
  944. b, _ := ju.RecogAnalyRules(textAfterRulePrerule, rule)
  945. if b { //匹配到一个分类下某个规则时,不再继续匹配
  946. if savefield != "" { //保存字段不为空,存储代码信息
  947. SMap.AddKey(field+"_"+savefield, s_code)
  948. }
  949. break L
  950. }
  951. }
  952. }
  953. }
  954. }
  955. }
  956. }
  957. return SMap
  958. }