extract.go 27 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986
  1. package extract
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "jy/clear"
  6. db "jy/mongodbutil"
  7. "jy/pretreated"
  8. ju "jy/util"
  9. "log"
  10. qu "qfw/util"
  11. redis "qfw/util/redis"
  12. "reflect"
  13. "regexp"
  14. "strconv"
  15. "strings"
  16. "sync"
  17. "time"
  18. "gopkg.in/mgo.v2/bson"
  19. )
  20. var (
  21. lock sync.RWMutex
  22. cut = ju.NewCut() //获取正文并清理
  23. ExtLogs map[*TaskInfo][]map[string]interface{} //抽取日志
  24. TaskList map[string]*ExtractTask //任务列表
  25. saveLimit = 200 //抽取日志批量保存
  26. PageSize = 5000 //查询分页
  27. Fields = `{"title":1,"detail":1,"contenthtml":1,"href":1,"site":1,"spidercode":1,"toptype":1,"subtype":1,"area":1,"city":1,"comeintime":1,"publishtime":1}`
  28. )
  29. //启动测试抽取
  30. func StartExtractTestTask(taskId, startId, num, resultcoll, trackcoll string) bool {
  31. defer qu.Catch()
  32. ext := &ExtractTask{}
  33. ext.Id = taskId
  34. ext.IsRun = true
  35. ext.InitTestTaskInfo(resultcoll, trackcoll)
  36. ext.TaskInfo.DB = db.MgoFactory(1, 3, 120, ext.TaskInfo.FromDbAddr, ext.TaskInfo.FromDB)
  37. ext.InitRulePres()
  38. ext.InitRuleBacks()
  39. ext.InitRuleCore()
  40. ext.InitPkgCore()
  41. ext.InitTag()
  42. ext.InitClearFn()
  43. if ext.IsExtractCity { //版本上控制是否开始城市抽取
  44. //初始化城市DFA信息
  45. ext.InitDFA()
  46. }
  47. //质量审核
  48. ext.InitAuditFields()
  49. ext.InitAuditRule()
  50. ext.InitAuditClass()
  51. ext.InitAuditRecogField()
  52. return RunExtractTestTask(ext, startId, num)
  53. }
  54. func IdTrans(startId string) bson.ObjectId {
  55. defer qu.Catch()
  56. return bson.ObjectIdHex(startId)
  57. }
  58. //开始测试任务抽取
  59. func RunExtractTestTask(ext *ExtractTask, startId, num string) bool {
  60. n, _ := strconv.Atoi(num)
  61. id := IdTrans(startId)
  62. if id.Valid() {
  63. query := bson.M{"_id": bson.M{"$gte": bson.ObjectIdHex(startId)}}
  64. list, _ := ext.TaskInfo.DB.Find(ext.TaskInfo.FromColl, query, nil, Fields, false, 0, n)
  65. for _, v := range *list {
  66. //log.Println(v["_id"])
  67. j := PreInfo(v)
  68. ext.TaskInfo.ProcessPool <- true
  69. go ext.ExtractProcess(j)
  70. }
  71. return true
  72. } else {
  73. return false
  74. }
  75. }
  76. //启动抽取
  77. func StartExtractTaskId(taskId string) bool {
  78. isgo := false
  79. ext := TaskList[taskId]
  80. if ext == nil {
  81. ext = &ExtractTask{}
  82. ext.Id = taskId
  83. ext.InitTaskInfo()
  84. isgo = true
  85. } else {
  86. ext.Id = taskId
  87. ext.InitTaskInfo()
  88. }
  89. ext.TaskInfo.DB = db.MgoFactory(2, 3, 120, ext.TaskInfo.FromDbAddr, ext.TaskInfo.FromDB)
  90. ext.InitRulePres()
  91. ext.InitRuleBacks()
  92. ext.InitRuleCore()
  93. ext.InitPkgCore()
  94. ext.InitTag()
  95. ext.InitClearFn()
  96. if ext.IsExtractCity { //版本上控制是否开始城市抽取
  97. //初始化城市DFA信息
  98. ext.InitDFA()
  99. }
  100. //质量审核
  101. ext.InitAuditFields()
  102. ext.InitAuditRule()
  103. ext.InitAuditClass()
  104. ext.InitAuditRecogField()
  105. ext.IsRun = true
  106. go ext.ResultSave()
  107. go ext.BidSave()
  108. if isgo {
  109. go RunExtractTask(taskId)
  110. }
  111. TaskList[taskId] = ext
  112. return true
  113. }
  114. //停止抽取
  115. func StopExtractTaskId(taskId string) bool {
  116. ext := TaskList[taskId]
  117. if ext != nil {
  118. ext.IsRun = false
  119. TaskList[taskId] = ext
  120. }
  121. //更新task.s_extlastid
  122. db.Mgo.UpdateById("task", taskId, `{"$set":{"s_extlastid":"`+ext.TaskInfo.LastExtId+`"}}`)
  123. return true
  124. }
  125. //开始抽取
  126. func RunExtractTask(taskId string) {
  127. ext := TaskList[taskId]
  128. query := bson.M{"_id": bson.M{"$gte": bson.ObjectIdHex(ext.TaskInfo.LastExtId)}}
  129. count := ext.TaskInfo.DB.Count(ext.TaskInfo.FromColl, query)
  130. pageNum := (count + PageSize - 1) / PageSize
  131. limit := PageSize
  132. if count < PageSize {
  133. limit = count
  134. }
  135. log.Printf("count=%d,pageNum=%d,query=%v", count, pageNum, query)
  136. for i := 0; i < pageNum; i++ {
  137. query = bson.M{"_id": bson.M{"$gte": bson.ObjectIdHex(ext.TaskInfo.LastExtId)}}
  138. log.Printf("page=%d,query=%v", i+1, query)
  139. list, _ := ext.TaskInfo.DB.Find(ext.TaskInfo.FromColl, query, nil, Fields, false, 0, limit)
  140. for _, v := range *list {
  141. //log.Println(v["_id"])
  142. if !ext.IsRun {
  143. break
  144. }
  145. j := PreInfo(v)
  146. ext.TaskInfo.ProcessPool <- true
  147. go ext.ExtractProcess(j)
  148. ext.TaskInfo.LastExtId = qu.BsonIdToSId(v["_id"])
  149. }
  150. db.Mgo.UpdateById("task", ext.Id, `{"$set":{"s_extlastid":"`+ext.TaskInfo.LastExtId+`"}}`)
  151. if !ext.IsRun {
  152. break
  153. }
  154. }
  155. //更新task.s_extlastid
  156. time.AfterFunc(1*time.Minute, func() { RunExtractTask(taskId) })
  157. }
  158. //信息预处理
  159. func PreInfo(doc map[string]interface{}) *ju.Job {
  160. detail := ""
  161. d1, _ := doc["detail"].(string)
  162. d2, _ := doc["contenthtml"].(string)
  163. if len(d1) >= len(d2) || d2 == "" {
  164. detail = d1
  165. } else {
  166. detail = d2
  167. }
  168. detail = ju.CutLableStr(detail)
  169. detail = cut.ClearHtml(detail)
  170. doc["detail"] = detail
  171. href := qu.ObjToString(doc["href"])
  172. if strings.HasPrefix(href, "http://") {
  173. href = href[7:]
  174. } else if strings.HasPrefix(href, "https://") {
  175. href = href[8:]
  176. }
  177. pos := strings.Index(href, "/")
  178. if pos > 0 {
  179. href = href[:pos]
  180. }
  181. doc["domain"] = href
  182. toptype := qu.ObjToString(doc["toptype"])
  183. if qu.ObjToString(doc["type"]) == "bid" {
  184. toptype = "结果"
  185. }
  186. if toptype == "" {
  187. toptype = "*"
  188. }
  189. j := &ju.Job{
  190. SourceMid: qu.BsonIdToSId(doc["_id"]),
  191. Category: toptype,
  192. Content: qu.ObjToString(doc["detail"]),
  193. SpiderCode: qu.ObjToString(doc["spidercode"]),
  194. Domain: qu.ObjToString(doc["domain"]),
  195. Href: qu.ObjToString(doc["href"]),
  196. Title: qu.ObjToString(doc["title"]),
  197. Data: &doc,
  198. City: qu.ObjToString(doc["city"]),
  199. Province: qu.ObjToString(doc["area"]),
  200. Result: map[string][]*ju.ExtField{},
  201. BuyerAddr: qu.ObjToString(doc["buyeraddr"]),
  202. }
  203. qu.Try(func() {
  204. pretreated.AnalyStart(j)
  205. }, func(err interface{}) {
  206. log.Println("pretreated.AnalyStart", err)
  207. })
  208. return j
  209. }
  210. //抽取
  211. func (e *ExtractTask) ExtractProcess(j *ju.Job) {
  212. qu.Try(func() {
  213. doc := *j.Data
  214. //全局前置规则,结果覆盖doc属性
  215. for _, v := range e.RulePres {
  216. doc = ExtRegPre(doc, j, v, e.TaskInfo)
  217. }
  218. //抽取规则
  219. for _, vc := range e.RuleCores {
  220. tmp := ju.DeepCopy(doc).(map[string]interface{})
  221. //是否进入逻辑
  222. if !ju.Logic(vc.LuaLogic, tmp) {
  223. continue
  224. }
  225. //抽取-前置规则
  226. for _, v := range vc.RulePres {
  227. tmp = ExtRegPre(tmp, j, v, e.TaskInfo)
  228. }
  229. //log.Println("抽取-前置规则", tmp)
  230. //抽取-规则
  231. for _, v := range vc.RuleCores {
  232. ExtRegCore(vc.ExtFrom, tmp, j, v, e)
  233. }
  234. //log.Println("抽取-规则", tmp)
  235. //抽取-后置规则
  236. for _, v := range vc.RuleBacks {
  237. ExtRegBack(j, v, e.TaskInfo)
  238. }
  239. //log.Println("抽取-后置规则", tmp)
  240. }
  241. //全局后置规则
  242. for _, v := range e.RuleBacks {
  243. ExtRegBack(j, v, e.TaskInfo)
  244. }
  245. //候选人加入
  246. if len(j.Winnerorder) > 0 {
  247. winner := &ju.ExtField{
  248. Field: "winner",
  249. Code: "",
  250. RuleText: "",
  251. Type: "winnerorder",
  252. MatchType: "winnerorder",
  253. ExtFrom: "",
  254. Value: j.Winnerorder[0]["entname"],
  255. Score: 0,
  256. }
  257. if len([]rune(qu.ObjToString(j.Winnerorder[0]["entname"]))) < 4 {
  258. winner.Score = -5
  259. }
  260. winners := j.Result["winner"]
  261. if winners != nil {
  262. winners = append(winners, winner)
  263. } else {
  264. winners = []*ju.ExtField{}
  265. winners = append(winners, winner)
  266. }
  267. j.Result["winner"] = winners
  268. }
  269. //函数清理
  270. for key, val := range j.Result {
  271. for _, v := range val {
  272. lock.Lock()
  273. cfn := e.ClearFn[key]
  274. lock.Unlock()
  275. data := clear.DoClearFn(cfn, []interface{}{v.Value, j.Content})
  276. v.Value = data[0]
  277. //清理特殊符号
  278. if clear.AsyField[key] != nil || clear.SymField[key] != nil ||
  279. clear.MesField[key] != nil {
  280. text := qu.ObjToString(v.Value)
  281. text = clear.OtherClean(key, text)
  282. v.Value = text
  283. }
  284. }
  285. }
  286. PackageDetail(j, e) //处理分包信息
  287. // bs, _ := json.Marshal(j.Result)
  288. // log.Println("抽取结果", j.Title, j.SourceMid, string(bs))
  289. //分析抽取结果并保存 todo
  290. AnalysisSaveResult(j, e)
  291. }, func(err interface{}) {
  292. log.Println((*j.Data)["_id"], err)
  293. <-e.TaskInfo.ProcessPool
  294. })
  295. <-e.TaskInfo.ProcessPool
  296. }
  297. //前置过滤
  298. func ExtRegPre(doc map[string]interface{}, j *ju.Job, in *RegLuaInfo, t *TaskInfo) map[string]interface{} {
  299. before := ju.DeepCopy(doc).(map[string]interface{})
  300. extinfo := map[string]interface{}{}
  301. if in.IsLua {
  302. lua := ju.LuaScript{Code: in.Code, Name: in.Name, Doc: doc, Script: in.RuleText}
  303. if j != nil {
  304. lua.Block = j.Block
  305. }
  306. extinfo = lua.RunScript("pre")
  307. for k, v := range extinfo { //结果覆盖原doc
  308. doc[k] = v
  309. }
  310. AddExtLog("prereplace", j.SourceMid, before, extinfo, in, t) //抽取日志
  311. } else {
  312. key := qu.If(in.Field == "", "detail", in.Field).(string)
  313. text := qu.ObjToString(doc[key])
  314. extinfo[key] = in.RegPreBac.Reg.ReplaceAllString(text, "")
  315. doc[key] = extinfo[key] //结果覆盖原doc
  316. AddExtLog("prereplace", j.SourceMid, before, extinfo, in, t) //抽取日志
  317. }
  318. return doc
  319. }
  320. //抽取-规则
  321. func ExtRegCore(extfrom string, doc map[string]interface{}, j *ju.Job, in *RegLuaInfo, et *ExtractTask) {
  322. if in.IsLua {
  323. lua := ju.LuaScript{Code: in.Code, Name: in.Name, Doc: doc, Script: in.RuleText}
  324. lua.KvMap = getKvByLuaFields(extfrom, j, in, et.Tag)
  325. lua.Block = j.Block
  326. extinfo := lua.RunScript("core")
  327. for k, v := range extinfo {
  328. if k == in.Field {
  329. if j.Result[k] == nil {
  330. j.Result[k] = [](*ju.ExtField){}
  331. }
  332. if tmps, ok := v.([]map[string]interface{}); ok {
  333. for _, tmp := range tmps {
  334. j.Result[k] = append(j.Result[k],
  335. &ju.ExtField{k, qu.ObjToString(tmp["code"]), qu.ObjToString(tmp["ruletext"]), qu.ObjToString(tmp["type"]), qu.ObjToString(tmp["matchtype"]), extfrom, tmp["value"], 0})
  336. }
  337. }
  338. }
  339. }
  340. if len(extinfo) > 0 {
  341. AddExtLog("extract", j.SourceMid, nil, extinfo, in, et.TaskInfo) //抽取日志
  342. }
  343. } else {
  344. //全文正则
  345. text := qu.ObjToString(doc[extfrom])
  346. if in.Field != "" {
  347. extinfo := extRegCoreToResult(extfrom, text, j, in)
  348. if len(extinfo) > 0 {
  349. AddExtLog("extract", j.SourceMid, nil, extinfo, in, et.TaskInfo) //抽取日志
  350. }
  351. }
  352. }
  353. }
  354. //lua脚本根据属性设置提取kv值
  355. func getKvByLuaFields(extfrom string, j *ju.Job, in *RegLuaInfo, t map[string][]*Tag) map[string][]map[string]interface{} {
  356. kvmap := map[string][]map[string]interface{}{}
  357. for fieldname, field := range in.LFields {
  358. lock.Lock()
  359. tags := t[field] //获取对应标签库
  360. lock.Unlock()
  361. for _, bl := range j.Block {
  362. //冒号kv
  363. if bl.ColonKV != nil {
  364. kvs := bl.ColonKV.Kvs
  365. kvs2 := bl.ColonKV.Kvs_2
  366. //log.Println("ColonKV1", kvs)
  367. //log.Println("ColonKV2", kvs2)
  368. for _, tag := range tags {
  369. for _, kv := range kvs {
  370. if tag.Type == "string" {
  371. if kv.Key == tag.Key {
  372. text := ju.TrimLRSpace(kv.Value, "")
  373. if text != "" {
  374. kvmap[field] = append(kvmap[field], map[string]interface{}{
  375. "field": field,
  376. "code": in.Code,
  377. "ruletext": tag.Key,
  378. "extfrom": extfrom,
  379. "value": text,
  380. "type": "colon1",
  381. "matchtype": "tag_string",
  382. })
  383. }
  384. break
  385. }
  386. } else if tag.Type == "regexp" {
  387. if tag.Reg.MatchString(kv.Key) {
  388. text := ju.TrimLRSpace(kv.Value, "")
  389. if text != "" {
  390. kvmap[field] = append(kvmap[field], map[string]interface{}{
  391. "field": field,
  392. "code": in.Code,
  393. "ruletext": tag.Key,
  394. "extfrom": extfrom,
  395. "value": text,
  396. "type": "colon1",
  397. "matchtype": "tag_regexp",
  398. })
  399. }
  400. break
  401. }
  402. }
  403. }
  404. for _, kv := range kvs2 {
  405. if tag.Type == "string" {
  406. if kv.Key == tag.Key {
  407. text := ju.TrimLRSpace(kv.Value, "")
  408. if text != "" {
  409. kvmap[field] = append(kvmap[field], map[string]interface{}{
  410. "field": field,
  411. "code": in.Code,
  412. "ruletext": tag.Key,
  413. "extfrom": extfrom,
  414. "value": text,
  415. "type": "colon2",
  416. "matchtype": "tag_string",
  417. })
  418. }
  419. break
  420. }
  421. } else if tag.Type == "regexp" {
  422. if tag.Reg.MatchString(kv.Key) {
  423. text := ju.TrimLRSpace(kv.Value, "")
  424. if text != "" {
  425. kvmap[field] = append(kvmap[field], map[string]interface{}{
  426. "field": field,
  427. "code": in.Code,
  428. "ruletext": tag.Key,
  429. "extfrom": extfrom,
  430. "value": text,
  431. "type": "colon2",
  432. "matchtype": "tag_regexp",
  433. })
  434. }
  435. break
  436. }
  437. }
  438. }
  439. }
  440. }
  441. //空格kv
  442. if bl.SpaceKV != nil {
  443. kvs := bl.SpaceKV.Kvs
  444. //log.Println("SpaceKV", kvs)
  445. for _, tag := range tags {
  446. for _, kv := range kvs {
  447. if tag.Type == "string" {
  448. if kv.Key == tag.Key {
  449. text := ju.TrimLRSpace(kv.Value, "")
  450. if text != "" {
  451. kvmap[field] = append(kvmap[field], map[string]interface{}{
  452. "field": field,
  453. "code": in.Code,
  454. "ruletext": tag.Key,
  455. "extfrom": extfrom,
  456. "value": text,
  457. "type": "space",
  458. "matchtype": "tag_string",
  459. })
  460. }
  461. break
  462. }
  463. } else if tag.Type == "regexp" {
  464. if tag.Reg.MatchString(kv.Key) {
  465. text := ju.TrimLRSpace(kv.Value, "")
  466. if text != "" {
  467. kvmap[field] = append(kvmap[field], map[string]interface{}{
  468. "field": field,
  469. "code": in.Code,
  470. "ruletext": tag.Key,
  471. "extfrom": extfrom,
  472. "value": text,
  473. "type": "space",
  474. "matchtype": "tag_regexp",
  475. })
  476. }
  477. break
  478. }
  479. }
  480. }
  481. }
  482. }
  483. //表格kv
  484. if bl.TableKV != nil {
  485. tkv := bl.TableKV
  486. //log.Println("tkv", tkv)
  487. for k, v := range tkv.Kv {
  488. if k == fieldname {
  489. if len(tags) > -tkv.KvIndex[fieldname] {
  490. ruletext := ""
  491. if fieldname == "项目名称" && -tkv.KvIndex[fieldname] == -100 {
  492. ruletext = "项目名称"
  493. } else {
  494. ruletext = tags[-tkv.KvIndex[fieldname]].Key
  495. }
  496. kvmap[field] = append(kvmap[field], map[string]interface{}{
  497. "field": field,
  498. "code": in.Code,
  499. "ruletext": ruletext,
  500. "extfrom": "table",
  501. "value": v,
  502. "type": "table",
  503. "matchtype": "tag_string",
  504. })
  505. } else { //涉及其他待处理
  506. //log.Println(tags)
  507. }
  508. }
  509. }
  510. }
  511. }
  512. }
  513. return kvmap
  514. }
  515. //正则提取结果
  516. func extRegCoreToResult(extfrom, text string, j *ju.Job, v *RegLuaInfo) map[string][]map[string]interface{} {
  517. extinfo := map[string][]map[string]interface{}{}
  518. if v.RegCore.Bextract { //正则是两部分的,可以直接抽取的(含下划线)
  519. apos := v.RegCore.Reg.FindAllStringSubmatchIndex(text, -1)
  520. if len(apos) > 0 {
  521. pos := apos[0]
  522. for k, p := range v.RegCore.ExtractPos {
  523. if len(pos) > p {
  524. if pos[p] == -1 || pos[p+1] == -1 {
  525. continue
  526. }
  527. val := text[pos[p]:pos[p+1]]
  528. tmps := []map[string]interface{}{}
  529. tmp := map[string]interface{}{
  530. "field": v.Field,
  531. "code": v.Code,
  532. "ruletext": v.RuleText,
  533. "extfrom": extfrom,
  534. "value": val,
  535. "type": "regexp",
  536. "matchtype": "regcontent",
  537. }
  538. tmps = append(tmps, tmp)
  539. extinfo[k] = tmps
  540. if val != "" {
  541. if j.Result[v.Field] == nil {
  542. j.Result[k] = [](*ju.ExtField){}
  543. }
  544. j.Result[k] = append(j.Result[k], &ju.ExtField{k, v.Code, v.RuleText, "regexp", "regcontent", extfrom, val, 0})
  545. }
  546. }
  547. }
  548. }
  549. } else {
  550. pos := v.RegCore.Reg.FindStringIndex(text)
  551. val := ""
  552. if len(pos) == 2 {
  553. text = text[pos[1]:]
  554. rs := regexp.MustCompile("[^\r\n\t]+")
  555. tmp := rs.FindAllString(text, -1)
  556. if len(tmp) > 0 {
  557. val = tmp[0]
  558. }
  559. }
  560. if val != "" {
  561. tmps := []map[string]interface{}{}
  562. tmp := map[string]interface{}{
  563. "field": v.Field,
  564. "code": v.Code,
  565. "ruletext": v.RuleText,
  566. "extfrom": extfrom,
  567. "value": val,
  568. "type": "regexp",
  569. "matchtype": "regcontent",
  570. }
  571. tmps = append(tmps, tmp)
  572. extinfo[v.Field] = tmps
  573. if j.Result[v.Field] == nil {
  574. j.Result[v.Field] = [](*ju.ExtField){}
  575. }
  576. j.Result[v.Field] = append(j.Result[v.Field], &ju.ExtField{v.Field, v.Code, v.RuleText, "regexp", "regcontent", extfrom, val, 0})
  577. }
  578. }
  579. return extinfo
  580. }
  581. //后置过滤
  582. func ExtRegBack(j *ju.Job, in *RegLuaInfo, t *TaskInfo) {
  583. if in.IsLua {
  584. result := GetResultMapForLua(j)
  585. lua := ju.LuaScript{Code: in.Code, Name: in.Name, Result: result, Script: in.RuleText}
  586. if j != nil {
  587. lua.Block = j.Block
  588. }
  589. extinfo := lua.RunScript("back")
  590. for k, v := range extinfo {
  591. if tmps, ok := v.([]map[string]interface{}); ok {
  592. j.Result[k] = [](*ju.ExtField){}
  593. for _, tmp := range tmps {
  594. j.Result[k] = append(j.Result[k], &ju.ExtField{k, qu.ObjToString(tmp["code"]), qu.ObjToString(tmp["ruletext"]), qu.ObjToString(tmp["type"]), qu.ObjToString(tmp["matchtype"]), qu.ObjToString(tmp["extfrom"]), tmp["value"], 0})
  595. }
  596. }
  597. }
  598. if len(extinfo) > 0 {
  599. AddExtLog("clear", j.SourceMid, result, extinfo, in, t) //抽取日志
  600. }
  601. } else {
  602. extinfo := map[string]interface{}{}
  603. if in.Field != "" {
  604. if j.Result[in.Field] != nil {
  605. tmp := j.Result[in.Field]
  606. exts := []interface{}{}
  607. for k, v := range tmp {
  608. if v.Type == "table" { //table抽取到的数据不清理
  609. continue
  610. }
  611. text := qu.ObjToString(v.Value)
  612. if text != "" {
  613. text = in.RegPreBac.Reg.ReplaceAllString(text, in.RegPreBac.Replace)
  614. }
  615. j.Result[in.Field][k].Value = text
  616. exts = append(exts, map[string]interface{}{
  617. "field": v.Field,
  618. "code": v.Code,
  619. "ruletext": v.RuleText,
  620. "type": v.Type,
  621. "matchtype": v.MatchType,
  622. "extfrom": v.ExtFrom,
  623. "value": text,
  624. })
  625. }
  626. extinfo[in.Field] = exts
  627. if len(extinfo) > 0 {
  628. AddExtLog("clear", j.SourceMid, tmp, extinfo, in, t) //抽取日志
  629. }
  630. }
  631. } else {
  632. for key, tmp := range j.Result {
  633. exts := []interface{}{}
  634. for k, v := range tmp {
  635. if v.Type == "table" { //table抽取到的数据不清理
  636. continue
  637. }
  638. text := qu.ObjToString(v.Value)
  639. if text != "" {
  640. text = in.RegPreBac.Reg.ReplaceAllString(text, in.RegPreBac.Replace)
  641. }
  642. j.Result[key][k].Value = text
  643. exts = append(exts, map[string]interface{}{
  644. "field": v.Field,
  645. "code": v.Code,
  646. "ruletext": v.RuleText,
  647. "type": v.Type,
  648. "matchtype": v.MatchType,
  649. "extfrom": v.ExtFrom,
  650. "value": text,
  651. })
  652. }
  653. extinfo[key] = exts
  654. }
  655. if len(extinfo) > 0 {
  656. AddExtLog("clear", j.SourceMid, j.Result, extinfo, in, t) //抽取日志
  657. }
  658. }
  659. }
  660. }
  661. //获取抽取结果map[string][]interface{},lua脚本使用
  662. func GetResultMapForLua(j *ju.Job) map[string][]map[string]interface{} {
  663. result := map[string][]map[string]interface{}{}
  664. for key, val := range j.Result {
  665. if result[key] == nil {
  666. result[key] = []map[string]interface{}{}
  667. }
  668. for _, v := range val {
  669. tmp := map[string]interface{}{
  670. "field": v.Field,
  671. "code": v.Code,
  672. "ruletext": v.RuleText,
  673. "value": v.Value,
  674. "type": v.Type,
  675. "matchtype": v.MatchType,
  676. "extfrom": v.ExtFrom,
  677. }
  678. result[key] = append(result[key], tmp)
  679. }
  680. }
  681. return result
  682. }
  683. //抽取日志
  684. func AddExtLog(ftype, sid string, before interface{}, extinfo interface{}, v *RegLuaInfo, t *TaskInfo) {
  685. if !t.IsEtxLog {
  686. return
  687. }
  688. logdata := map[string]interface{}{
  689. "code": v.Code,
  690. "name": v.Name,
  691. "type": ftype,
  692. "ruletext": v.RuleText,
  693. "islua": v.IsLua,
  694. "field": v.Field,
  695. "version": t.Version,
  696. "taskname": t.Name,
  697. "before": before,
  698. "extinfo": extinfo,
  699. "sid": sid,
  700. "comeintime": time.Now().Unix(),
  701. }
  702. lock.Lock()
  703. ExtLogs[t] = append(ExtLogs[t], logdata)
  704. lock.Unlock()
  705. }
  706. //保存抽取日志
  707. func SaveExtLog() {
  708. tmpLogs := map[*TaskInfo][]map[string]interface{}{}
  709. lock.Lock()
  710. tmpLogs = ExtLogs
  711. ExtLogs = map[*TaskInfo][]map[string]interface{}{}
  712. lock.Unlock()
  713. for k, v := range tmpLogs {
  714. if len(v) < saveLimit {
  715. db.Mgo.SaveBulk(k.TrackColl, v...)
  716. } else {
  717. for {
  718. if len(v) > saveLimit {
  719. tmp := v[:saveLimit]
  720. db.Mgo.SaveBulk(k.TrackColl, tmp...)
  721. v = v[saveLimit:]
  722. } else {
  723. db.Mgo.SaveBulk(k.TrackColl, v...)
  724. break
  725. }
  726. }
  727. }
  728. }
  729. time.AfterFunc(10*time.Second, SaveExtLog)
  730. }
  731. type FieldValue struct {
  732. Value interface{}
  733. Count int
  734. }
  735. //分析抽取结果并保存
  736. func AnalysisSaveResult(j *ju.Job, e *ExtractTask) {
  737. doc := j.Data
  738. result := j.Result
  739. _id := qu.BsonIdToSId((*doc)["_id"])
  740. iscore, _ := ju.Config["fieldscore"].(bool)
  741. if iscore { //打分
  742. result = ScoreFields(j)
  743. }
  744. //结果排序
  745. values := map[string][]*ju.SortObject{}
  746. for key, val := range result {
  747. fieldValue := map[string][]interface{}{}
  748. if iscore { //走打分
  749. for _, v := range val {
  750. if len(fmt.Sprint(v.Value)) < 1 {
  751. continue //去除空串
  752. }
  753. fieldValue[fmt.Sprint(v.Value)+v.Type] = []interface{}{v.Score, v.Value}
  754. }
  755. } else { //不走打分,按出现频次
  756. for _, v := range val {
  757. if len(fmt.Sprint(v.Value)) < 1 {
  758. continue //去除空串
  759. }
  760. if fieldValue[fmt.Sprint(v.Value)] == nil {
  761. fieldValue[fmt.Sprint(v.Value)] = []interface{}{0, v.Value}
  762. } else {
  763. fieldValue[fmt.Sprint(v.Value)][0] = qu.IntAll(fieldValue[fmt.Sprint(v.Value)][0]) + 1
  764. }
  765. }
  766. }
  767. objects := []*ju.SortObject{}
  768. for k, v := range fieldValue {
  769. ValueStr := "" //第二排序
  770. if reflect.TypeOf(v[1]).String() == "string" {
  771. ValueStr = qu.ObjToString(v[1])
  772. }
  773. tmp := &ju.SortObject{
  774. Key: k,
  775. Value: qu.IntAll(v[0]),
  776. Object: v[1],
  777. ValueStr: ValueStr,
  778. }
  779. objects = append(objects, tmp)
  780. }
  781. values[key] = ju.ExtSort(objects)
  782. }
  783. //从排序结果中取值
  784. tmp := map[string]interface{}{} //抽取值
  785. for key, val := range values {
  786. for _, v := range val { //取第一个非负数
  787. if v.Key != "" && v.Value > -1 {
  788. tmp[key] = v.Object
  789. break
  790. }
  791. }
  792. }
  793. if len(j.PackageInfo) > 0 { //分包信息
  794. tmp["package"] = j.PackageInfo
  795. }
  796. if len(j.Winnerorder) > 0 { //候选人信息
  797. tmp["winnerorder"] = j.Winnerorder
  798. }
  799. for k, v := range *doc {
  800. if k == "detail" || k == "contenthtml" {
  801. continue
  802. }
  803. if tmp[k] == nil {
  804. tmp[k] = v
  805. }
  806. }
  807. //质量审核
  808. if ju.Config["qualityaudit"].(bool) {
  809. e.QualityAudit(tmp)
  810. }
  811. if e.IsExtractCity { //城市抽取
  812. b, p, c, d := e.TransmitData(tmp, _id) //抽取省份城市
  813. //log.Println("省份---", p, "城市---", c, "区---", d)
  814. tmp["district"] = d
  815. if b {
  816. tmp["city"] = c
  817. tmp["area"] = p
  818. }
  819. }
  820. if e.TaskInfo.TestColl == "" {
  821. if len(tmp) > 0 { //保存抽取结果
  822. tmparr := []map[string]interface{}{
  823. map[string]interface{}{
  824. "_id": qu.StringTOBsonId(_id),
  825. },
  826. map[string]interface{}{"$set": tmp},
  827. }
  828. e.BidArr = append(e.BidArr, tmparr)
  829. }
  830. if b, ok := ju.Config["saveresult"].(bool); ok && b {
  831. id := tmp["_id"]
  832. tmp["result"] = result
  833. delete(tmp, "_id")
  834. tmparr := []map[string]interface{}{
  835. map[string]interface{}{
  836. "_id": id,
  837. },
  838. map[string]interface{}{"$set": tmp},
  839. }
  840. e.ResultArr = append(e.ResultArr, tmparr)
  841. }
  842. } else { //测试结果
  843. delete(tmp, "_id")
  844. if len(j.BlockPackage) > 0 { //分包详情
  845. bs, _ := json.Marshal(j.BlockPackage)
  846. tmp["epackage"] = string(bs)
  847. }
  848. tmp["result"] = result
  849. b := db.Mgo.Update(e.TaskInfo.TestColl, `{"_id":"`+_id+`"}`, map[string]interface{}{"$set": tmp}, true, false)
  850. if !b {
  851. log.Println(e.TaskInfo.TestColl, _id)
  852. }
  853. }
  854. }
  855. func (e *ExtractTask) QualityAudit(resulttmp map[string]interface{}) {
  856. defer qu.Catch()
  857. //获取审核字段
  858. for _, field := range e.AuditFields {
  859. //1.分包
  860. if resulttmp["package"] != nil {
  861. packagedata := resulttmp["package"].(map[string]map[string]interface{})
  862. for _, val := range packagedata {
  863. if val[field] != nil {
  864. fv := qu.ObjToString(val[field])
  865. if fv != "" {
  866. if field == "buyer" || field == "winner" { //field为buyer和winner时特殊处理,先从Redis中查,有直接通过,没有走匹配规则
  867. e.RedisMatch(field, fv, val) //redis匹配
  868. } else { //除了buyer和winner,其他字段走规则匹配
  869. e.RuleMatch(field, fv, val)
  870. }
  871. }
  872. }
  873. }
  874. }
  875. //2.外围
  876. if resulttmp[field] != nil {
  877. fv := qu.ObjToString(resulttmp[field])
  878. if fv != "" {
  879. if field == "buyer" || field == "winner" { //field为buyer和winner时特殊处理,先从Redis中查,有直接通过,没有走匹配规则
  880. e.RedisMatch(field, fv, resulttmp) //redis匹配
  881. } else { //除了buyer和winner,其他字段走规则匹配
  882. e.RuleMatch(field, fv, resulttmp)
  883. }
  884. }
  885. }
  886. }
  887. }
  888. //Redis匹配
  889. func (e *ExtractTask) RedisMatch(field, fv string, val map[string]interface{}) {
  890. defer qu.Catch()
  891. i := redis.GetInt(field, field+"_"+fv) //查找redis
  892. if i == 0 { //reids未找到,执行规则匹配
  893. val[field+"_isredis"] = false
  894. e.RuleMatch(field, fv, val) //规则匹配
  895. } else { //redis找到,打标识存库
  896. val[field+"_isredis"] = true
  897. }
  898. }
  899. //规则匹配
  900. func (e *ExtractTask) RuleMatch(field, fieldval string, tmpMap map[string]interface{}) {
  901. defer qu.Catch()
  902. if fieldval != "" {
  903. SMap := e.StartMatch(field, fieldval)
  904. //SMap.AddKey(field+"_isaudit", false)
  905. for _, k := range SMap.Keys {
  906. tmpMap[k] = SMap.Map[k]
  907. }
  908. tmpMap[field+"_isaudit"] = false //添加字段未审核信息
  909. }
  910. }
  911. //开始规则匹配
  912. func (e *ExtractTask) StartMatch(field, text string) *pretreated.SortMap {
  913. defer qu.Catch()
  914. SMap := pretreated.NewSortMap()
  915. lock.Lock()
  916. f := e.RecogFieldMap[field]
  917. lock.Unlock()
  918. if len(f) > 0 {
  919. fid := qu.BsonIdToSId(f["_id"])
  920. recogFieldPreRule := qu.ObjToString(f["s_recogfield_prerule"])
  921. textAfterRecogFieldPrerule := ju.PreFilter(text, recogFieldPreRule) //识别字段的前置过滤
  922. if textAfterRecogFieldPrerule != "" {
  923. lock.Lock()
  924. classMap := e.FidClassMap[fid]
  925. lock.Unlock()
  926. L:
  927. for _, c := range classMap { //class
  928. classid := qu.BsonIdToSId(c["_id"])
  929. classPrerule := qu.ObjToString(c["s_class_prerule"])
  930. savefield := qu.ObjToString(c["s_savefield"]) //保存字段
  931. textAfterClassPrerule := ju.PreFilter(textAfterRecogFieldPrerule, classPrerule) //class的前置过滤
  932. if textAfterClassPrerule != "" {
  933. lock.Lock()
  934. ruleMap := e.CidRuleMap[classid]
  935. lock.Unlock()
  936. for _, r := range ruleMap { //rule
  937. rulePrerule := qu.ObjToString(r["s_rule_prerule"])
  938. s_code := qu.ObjToString(r["s_code"])
  939. rule := r["rule"].([]interface{})
  940. textAfterRulePrerule := ju.PreFilter(textAfterClassPrerule, rulePrerule) //class的前置过滤
  941. if textAfterRulePrerule != "" {
  942. b, _ := ju.RecogAnalyRules(textAfterRulePrerule, rule)
  943. if b { //匹配到一个分类下某个规则时,不再继续匹配
  944. if savefield != "" { //保存字段不为空,存储代码信息
  945. SMap.AddKey(field+"_"+savefield, s_code)
  946. }
  947. break L
  948. }
  949. }
  950. }
  951. }
  952. }
  953. }
  954. }
  955. return SMap
  956. }