extract.go 26 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932
  1. package extract
  2. import (
  3. "fmt"
  4. "jy/clear"
  5. db "jy/mongodbutil"
  6. "jy/pretreated"
  7. ju "jy/util"
  8. "log"
  9. qu "qfw/util"
  10. redis "qfw/util/redis"
  11. "regexp"
  12. "strconv"
  13. "strings"
  14. "sync"
  15. "time"
  16. "gopkg.in/mgo.v2/bson"
  17. )
var (
	// lock is held around every read/replace of ExtLogs in this file.
	lock sync.RWMutex
	// cut is used by PreInfo to clean the selected document body.
	cut = ju.NewCut()
	// ExtLogs buffers extraction log entries per task until SaveExtLog flushes them.
	ExtLogs map[*TaskInfo][]map[string]interface{}
	// TaskList holds the known extraction tasks keyed by task id.
	TaskList map[string]*ExtractTask
	// saveLimit is the maximum number of log entries written per bulk save.
	saveLimit = 200
	// PageSize is the number of source documents fetched per query page.
	PageSize = 5000
	// Fields is the projection used when reading source documents.
	Fields = `{"title":1,"detail":1,"contenthtml":1,"href":1,"site":1,"spidercode":1,"toptype":1,"area":1,"city":1}`
	// AuditFields lists the names of the fields that require a quality audit.
	AuditFields = []string{}
)
  28. //启动测试抽取
  29. func StartExtractTestTask(taskId, startId, num, resultcoll, trackcoll string) bool {
  30. defer qu.Catch()
  31. ext := &ExtractTask{}
  32. ext.Id = taskId
  33. ext.IsRun = true
  34. ext.InitTestTaskInfo(resultcoll, trackcoll)
  35. ext.TaskInfo.DB = db.MgoFactory(1, 3, 120, ext.TaskInfo.FromDbAddr, ext.TaskInfo.FromDB)
  36. ext.InitRulePres()
  37. ext.InitRuleBacks()
  38. ext.InitRuleCore()
  39. ext.InitTag()
  40. ext.InitClearFn()
  41. //城市
  42. ext.InitProvince()
  43. ext.InitCityAll()
  44. ext.InitCitySim()
  45. InitDFA()
  46. //质量审核
  47. InitAuditRule()
  48. InitAuditClass()
  49. InitAuditRecogField()
  50. return RunExtractTestTask(ext, startId, num)
  51. }
  52. func IdTrans(startId string) bson.ObjectId {
  53. defer qu.Catch()
  54. return bson.ObjectIdHex(startId)
  55. }
  56. //开始测试任务抽取
  57. func RunExtractTestTask(ext *ExtractTask, startId, num string) bool {
  58. n, _ := strconv.Atoi(num)
  59. id := IdTrans(startId)
  60. if id.Valid() {
  61. query := bson.M{"_id": bson.M{"$gte": bson.ObjectIdHex(startId)}}
  62. list, _ := ext.TaskInfo.DB.Find(ext.TaskInfo.FromColl, query, nil, Fields, false, 0, n)
  63. for _, v := range *list {
  64. j := PreInfo(v)
  65. ext.TaskInfo.ProcessPool <- true
  66. go ext.ExtractProcess(j)
  67. }
  68. return true
  69. } else {
  70. return false
  71. }
  72. }
  73. //启动抽取
  74. func StartExtractTaskId(taskId string) bool {
  75. isgo := false
  76. ext := TaskList[taskId]
  77. if ext == nil {
  78. ext = &ExtractTask{}
  79. ext.Id = taskId
  80. ext.InitTaskInfo()
  81. isgo = true
  82. } else {
  83. ext.Id = taskId
  84. ext.InitTaskInfo()
  85. }
  86. ext.TaskInfo.DB = db.MgoFactory(10, 30, 120, ext.TaskInfo.FromDbAddr, ext.TaskInfo.FromDB)
  87. ext.InitRulePres()
  88. ext.InitRuleBacks()
  89. ext.InitRuleCore()
  90. ext.InitTag()
  91. ext.InitClearFn()
  92. //城市
  93. ext.InitProvince()
  94. ext.InitCityAll()
  95. ext.InitCitySim()
  96. InitDFA()
  97. //质量审核
  98. InitAuditRule()
  99. InitAuditClass()
  100. InitAuditRecogField()
  101. ext.IsRun = true
  102. if isgo {
  103. go RunExtractTask(taskId)
  104. }
  105. TaskList[taskId] = ext
  106. return true
  107. }
  108. //停止抽取
  109. func StopExtractTaskId(taskId string) bool {
  110. ext := TaskList[taskId]
  111. if ext != nil {
  112. ext.IsRun = false
  113. TaskList[taskId] = ext
  114. }
  115. //更新task.s_extlastid
  116. db.Mgo.UpdateById("task", taskId, `{"$set":{"s_extlastid":"`+ext.TaskInfo.LastExtId+`"}}`)
  117. return true
  118. }
  119. //开始抽取
  120. func RunExtractTask(taskId string) {
  121. ext := TaskList[taskId]
  122. query := bson.M{"_id": bson.M{"$gte": bson.ObjectIdHex(ext.TaskInfo.LastExtId)}}
  123. count := ext.TaskInfo.DB.Count(ext.TaskInfo.FromColl, query)
  124. pageNum := (count + PageSize - 1) / PageSize
  125. log.Printf("count=%d,pageNum=%d,query=%v", count, pageNum, query)
  126. for i := 0; i < pageNum; i++ {
  127. query = bson.M{"_id": bson.M{"$gte": bson.ObjectIdHex(ext.TaskInfo.LastExtId)}}
  128. log.Printf("page=%d,query=%v", i+1, query)
  129. list, _ := ext.TaskInfo.DB.Find(ext.TaskInfo.FromColl, query, nil, Fields, false, 0, PageSize)
  130. for _, v := range *list {
  131. log.Println(v["_id"])
  132. if !ext.IsRun {
  133. break
  134. }
  135. j := PreInfo(v)
  136. ext.TaskInfo.ProcessPool <- true
  137. go ext.ExtractProcess(j)
  138. ext.TaskInfo.LastExtId = qu.BsonIdToSId(v["_id"])
  139. }
  140. db.Mgo.UpdateById("task", ext.Id, `{"$set":{"s_extlastid":"`+ext.TaskInfo.LastExtId+`"}}`)
  141. if !ext.IsRun {
  142. break
  143. }
  144. }
  145. //更新task.s_extlastid
  146. time.AfterFunc(1*time.Minute, func() { RunExtractTask(taskId) })
  147. }
  148. //信息预处理
  149. func PreInfo(doc map[string]interface{}) *ju.Job {
  150. detail := ""
  151. d1, _ := doc["detail"].(string)
  152. d2, _ := doc["contenthtml"].(string)
  153. if len(d1) >= len(d2) || d2 == "" {
  154. detail = d1
  155. } else {
  156. detail = d2
  157. }
  158. detail = ju.CutLableStr(detail)
  159. detail = cut.ClearHtml(detail)
  160. doc["detail"] = detail
  161. href := qu.ObjToString(doc["href"])
  162. if strings.HasPrefix(href, "http://") {
  163. href = href[7:]
  164. } else if strings.HasPrefix(href, "https://") {
  165. href = href[8:]
  166. }
  167. pos := strings.Index(href, "/")
  168. if pos > 0 {
  169. href = href[:pos]
  170. }
  171. doc["domain"] = href
  172. toptype := qu.ObjToString(doc["toptype"])
  173. if qu.ObjToString(doc["type"]) == "bid" {
  174. toptype = "结果"
  175. }
  176. if toptype == "" {
  177. toptype = "*"
  178. }
  179. j := &ju.Job{
  180. SourceMid: qu.BsonIdToSId(doc["_id"]),
  181. Category: toptype,
  182. Content: qu.ObjToString(doc["detail"]),
  183. SpiderCode: qu.ObjToString(doc["spidercode"]),
  184. Domain: qu.ObjToString(doc["domain"]),
  185. Href: qu.ObjToString(doc["href"]),
  186. Title: qu.ObjToString(doc["title"]),
  187. Data: &doc,
  188. City: qu.ObjToString(doc["city"]),
  189. Province: qu.ObjToString(doc["area"]),
  190. Result: map[string][]*ju.ExtField{},
  191. BuyerAddr: qu.ObjToString(doc["buyeraddr"]),
  192. }
  193. pretreated.AnalyStart(j)
  194. return j
  195. }
  196. //抽取
  197. func (e *ExtractTask) ExtractProcess(j *ju.Job) {
  198. // for _, bl := range j.Block {
  199. // log.Println(bl.ColonKV.Kv)
  200. // }
  201. // for k, v := range j.BlockPackage {
  202. // //bs, _ := json.Marshal(v.TableKV)
  203. // log.Println(k, v.WinnerOrder)
  204. // }
  205. //log.Println("Winnerorder", j.Winnerorder)
  206. qu.Try(func() {
  207. doc := *j.Data
  208. //全局前置规则,结果覆盖doc属性
  209. for _, v := range e.RulePres {
  210. doc = ExtRegPre(doc, j, v, e.TaskInfo)
  211. }
  212. //抽取规则
  213. for _, vc := range e.RuleCores {
  214. tmp := ju.DeepCopy(doc).(map[string]interface{})
  215. //是否进入逻辑
  216. if !ju.Logic(vc.LuaLogic, tmp) {
  217. continue
  218. }
  219. //抽取-前置规则
  220. for _, v := range vc.RulePres {
  221. tmp = ExtRegPre(tmp, j, v, e.TaskInfo)
  222. }
  223. //log.Println("抽取-前置规则", tmp)
  224. //抽取-规则
  225. for _, v := range vc.RuleCores {
  226. ExtRegCore(vc.ExtFrom, tmp, j, v, e)
  227. }
  228. //log.Println("抽取-规则", tmp)
  229. //抽取-后置规则
  230. for _, v := range vc.RuleBacks {
  231. ExtRegBack(j, v, e.TaskInfo)
  232. }
  233. //log.Println("抽取-后置规则", tmp)
  234. }
  235. //全局后置规则
  236. for _, v := range e.RuleBacks {
  237. ExtRegBack(j, v, e.TaskInfo)
  238. }
  239. //函数清理
  240. for key, val := range j.Result {
  241. for _, v := range val {
  242. data := clear.DoClearFn(e.ClearFn[key], []interface{}{v.Value, j.Content})
  243. v.Value = data[0]
  244. }
  245. }
  246. PackageDetail(j, e) //处理分包信息
  247. // bs, _ := json.Marshal(j.Result)
  248. // log.Println("抽取结果", j.Title, j.SourceMid, string(bs))
  249. //分析抽取结果并保存 todo
  250. AnalysisSaveResult(j, e.TaskInfo)
  251. }, func(err interface{}) {
  252. log.Println(err)
  253. <-e.TaskInfo.ProcessPool
  254. })
  255. <-e.TaskInfo.ProcessPool
  256. }
  257. //前置过滤
  258. func ExtRegPre(doc map[string]interface{}, j *ju.Job, in *RegLuaInfo, t *TaskInfo) map[string]interface{} {
  259. before := ju.DeepCopy(doc).(map[string]interface{})
  260. extinfo := map[string]interface{}{}
  261. if in.IsLua {
  262. lua := ju.LuaScript{Code: in.Code, Name: in.Name, Doc: doc, Script: in.RuleText}
  263. if j != nil {
  264. lua.Block = j.Block
  265. }
  266. extinfo = lua.RunScript("pre")
  267. for k, v := range extinfo { //结果覆盖原doc
  268. doc[k] = v
  269. }
  270. AddExtLog("prereplace", j.SourceMid, before, extinfo, in, t) //抽取日志
  271. } else {
  272. key := qu.If(in.Field == "", "detail", in.Field).(string)
  273. text := qu.ObjToString(doc[key])
  274. extinfo[key] = in.RegPreBac.Reg.ReplaceAllString(text, "")
  275. doc[key] = extinfo[key] //结果覆盖原doc
  276. AddExtLog("prereplace", j.SourceMid, before, extinfo, in, t) //抽取日志
  277. }
  278. return doc
  279. }
  280. //抽取-规则
  281. func ExtRegCore(extfrom string, doc map[string]interface{}, j *ju.Job, in *RegLuaInfo, et *ExtractTask) {
  282. if in.IsLua {
  283. lua := ju.LuaScript{Code: in.Code, Name: in.Name, Doc: doc, Script: in.RuleText}
  284. lua.KvMap = getKvByLuaFields(extfrom, j, in, et.Tag)
  285. lua.Block = j.Block
  286. extinfo := lua.RunScript("core")
  287. for k, v := range extinfo {
  288. if k == in.Field {
  289. if j.Result[k] == nil {
  290. j.Result[k] = [](*ju.ExtField){}
  291. }
  292. if tmps, ok := v.([]map[string]interface{}); ok {
  293. for _, tmp := range tmps {
  294. j.Result[k] = append(j.Result[k],
  295. &ju.ExtField{k, qu.ObjToString(tmp["code"]), qu.ObjToString(tmp["ruletext"]), qu.ObjToString(tmp["type"]), qu.ObjToString(tmp["matchtype"]), extfrom, tmp["value"], 0})
  296. }
  297. }
  298. }
  299. }
  300. if len(extinfo) > 0 {
  301. AddExtLog("extract", j.SourceMid, nil, extinfo, in, et.TaskInfo) //抽取日志
  302. }
  303. } else {
  304. //全文正则
  305. text := qu.ObjToString(doc[extfrom])
  306. if in.Field != "" {
  307. extinfo := extRegCoreToResult(extfrom, text, j, in)
  308. if len(extinfo) > 0 {
  309. AddExtLog("extract", j.SourceMid, nil, extinfo, in, et.TaskInfo) //抽取日志
  310. }
  311. }
  312. }
  313. }
  314. //lua脚本根据属性设置提取kv值
  315. func getKvByLuaFields(extfrom string, j *ju.Job, in *RegLuaInfo, t map[string][]*Tag) map[string][]map[string]interface{} {
  316. kvmap := map[string][]map[string]interface{}{}
  317. for fieldname, field := range in.LFields {
  318. tags := t[field] //获取对应标签库
  319. for _, bl := range j.Block {
  320. //冒号kv
  321. if bl.ColonKV != nil {
  322. kvs := bl.ColonKV.Kvs
  323. kvs2 := bl.ColonKV.Kvs_2
  324. //log.Println("ColonKV1", kvs)
  325. //log.Println("ColonKV2", kvs2)
  326. for _, tag := range tags {
  327. for _, kv := range kvs {
  328. if tag.Type == "string" {
  329. if kv.Key == tag.Key {
  330. text := ju.TrimLRSpace(kv.Value, "")
  331. if text != "" {
  332. kvmap[field] = append(kvmap[field], map[string]interface{}{
  333. "field": field,
  334. "code": in.Code,
  335. "ruletext": tag.Key,
  336. "extfrom": extfrom,
  337. "value": text,
  338. "type": "colon1",
  339. "matchtype": "tag_string",
  340. })
  341. }
  342. break
  343. }
  344. } else if tag.Type == "regexp" {
  345. if tag.Reg.MatchString(kv.Key) {
  346. text := ju.TrimLRSpace(kv.Value, "")
  347. if text != "" {
  348. kvmap[field] = append(kvmap[field], map[string]interface{}{
  349. "field": field,
  350. "code": in.Code,
  351. "ruletext": tag.Key,
  352. "extfrom": extfrom,
  353. "value": text,
  354. "type": "colon1",
  355. "matchtype": "tag_regexp",
  356. })
  357. }
  358. break
  359. }
  360. }
  361. }
  362. for _, kv := range kvs2 {
  363. if tag.Type == "string" {
  364. if kv.Key == tag.Key {
  365. text := ju.TrimLRSpace(kv.Value, "")
  366. if text != "" {
  367. kvmap[field] = append(kvmap[field], map[string]interface{}{
  368. "field": field,
  369. "code": in.Code,
  370. "ruletext": tag.Key,
  371. "extfrom": extfrom,
  372. "value": text,
  373. "type": "colon2",
  374. "matchtype": "tag_string",
  375. })
  376. }
  377. break
  378. }
  379. } else if tag.Type == "regexp" {
  380. if tag.Reg.MatchString(kv.Key) {
  381. text := ju.TrimLRSpace(kv.Value, "")
  382. if text != "" {
  383. kvmap[field] = append(kvmap[field], map[string]interface{}{
  384. "field": field,
  385. "code": in.Code,
  386. "ruletext": tag.Key,
  387. "extfrom": extfrom,
  388. "value": text,
  389. "type": "colon2",
  390. "matchtype": "tag_regexp",
  391. })
  392. }
  393. break
  394. }
  395. }
  396. }
  397. }
  398. }
  399. //空格kv
  400. if bl.SpaceKV != nil {
  401. kvs := bl.SpaceKV.Kvs
  402. //log.Println("SpaceKV", kvs)
  403. for _, tag := range tags {
  404. for _, kv := range kvs {
  405. if tag.Type == "string" {
  406. if kv.Key == tag.Key {
  407. text := ju.TrimLRSpace(kv.Value, "")
  408. if text != "" {
  409. kvmap[field] = append(kvmap[field], map[string]interface{}{
  410. "field": field,
  411. "code": in.Code,
  412. "ruletext": tag.Key,
  413. "extfrom": extfrom,
  414. "value": text,
  415. "type": "space",
  416. "matchtype": "tag_string",
  417. })
  418. }
  419. break
  420. }
  421. } else if tag.Type == "regexp" {
  422. if tag.Reg.MatchString(kv.Key) {
  423. text := ju.TrimLRSpace(kv.Value, "")
  424. if text != "" {
  425. kvmap[field] = append(kvmap[field], map[string]interface{}{
  426. "field": field,
  427. "code": in.Code,
  428. "ruletext": tag.Key,
  429. "extfrom": extfrom,
  430. "value": text,
  431. "type": "space",
  432. "matchtype": "tag_regexp",
  433. })
  434. }
  435. break
  436. }
  437. }
  438. }
  439. }
  440. }
  441. //表格kv
  442. if bl.TableKV != nil {
  443. tkv := bl.TableKV
  444. //log.Println("tkv", tkv)
  445. for k, v := range tkv.Kv {
  446. if k == fieldname {
  447. if len(tags) > -tkv.KvIndex[fieldname] {
  448. kvmap[field] = append(kvmap[field], map[string]interface{}{
  449. "field": field,
  450. "code": in.Code,
  451. "ruletext": tags[-tkv.KvIndex[fieldname]].Key,
  452. "extfrom": "table",
  453. "value": v,
  454. "type": "table",
  455. "matchtype": "tag_string",
  456. })
  457. } else { //涉及其他待处理
  458. //log.Println(tags)
  459. }
  460. }
  461. }
  462. }
  463. }
  464. }
  465. return kvmap
  466. }
  467. //正则提取结果
  468. func extRegCoreToResult(extfrom, text string, j *ju.Job, v *RegLuaInfo) map[string][]map[string]interface{} {
  469. extinfo := map[string][]map[string]interface{}{}
  470. if v.RegCore.Bextract { //正则是两部分的,可以直接抽取的(含下划线)
  471. apos := v.RegCore.Reg.FindAllStringSubmatchIndex(text, -1)
  472. if len(apos) > 0 {
  473. pos := apos[0]
  474. for k, p := range v.RegCore.ExtractPos {
  475. if len(pos) > p {
  476. if pos[p] == -1 || pos[p+1] == -1 {
  477. continue
  478. }
  479. val := text[pos[p]:pos[p+1]]
  480. tmps := []map[string]interface{}{}
  481. tmp := map[string]interface{}{
  482. "field": v.Field,
  483. "code": v.Code,
  484. "ruletext": v.RuleText,
  485. "extfrom": extfrom,
  486. "value": val,
  487. "type": "regexp",
  488. "matchtype": "regcontent",
  489. }
  490. tmps = append(tmps, tmp)
  491. extinfo[k] = tmps
  492. if val != "" {
  493. if j.Result[v.Field] == nil {
  494. j.Result[k] = [](*ju.ExtField){}
  495. }
  496. j.Result[k] = append(j.Result[k], &ju.ExtField{k, v.Code, v.RuleText, "regexp", "regcontent", extfrom, val, 0})
  497. }
  498. }
  499. }
  500. }
  501. } else {
  502. pos := v.RegCore.Reg.FindStringIndex(text)
  503. val := ""
  504. if len(pos) == 2 {
  505. text = text[pos[1]:]
  506. rs := regexp.MustCompile("[^\r\n\t]+")
  507. tmp := rs.FindAllString(text, -1)
  508. if len(tmp) > 0 {
  509. val = tmp[0]
  510. }
  511. }
  512. if val != "" {
  513. tmps := []map[string]interface{}{}
  514. tmp := map[string]interface{}{
  515. "field": v.Field,
  516. "code": v.Code,
  517. "ruletext": v.RuleText,
  518. "extfrom": extfrom,
  519. "value": val,
  520. "type": "regexp",
  521. "matchtype": "regcontent",
  522. }
  523. tmps = append(tmps, tmp)
  524. extinfo[v.Field] = tmps
  525. if j.Result[v.Field] == nil {
  526. j.Result[v.Field] = [](*ju.ExtField){}
  527. }
  528. j.Result[v.Field] = append(j.Result[v.Field], &ju.ExtField{v.Field, v.Code, v.RuleText, "regexp", "regcontent", extfrom, val, 0})
  529. }
  530. }
  531. return extinfo
  532. }
  533. //后置过滤
  534. func ExtRegBack(j *ju.Job, in *RegLuaInfo, t *TaskInfo) {
  535. if in.IsLua {
  536. result := GetResultMapForLua(j)
  537. lua := ju.LuaScript{Code: in.Code, Name: in.Name, Result: result, Script: in.RuleText}
  538. if j != nil {
  539. lua.Block = j.Block
  540. }
  541. extinfo := lua.RunScript("back")
  542. for k, v := range extinfo {
  543. if tmps, ok := v.([]map[string]interface{}); ok {
  544. j.Result[k] = [](*ju.ExtField){}
  545. for _, tmp := range tmps {
  546. j.Result[k] = append(j.Result[k], &ju.ExtField{k, qu.ObjToString(tmp["code"]), qu.ObjToString(tmp["ruletext"]), qu.ObjToString(tmp["type"]), qu.ObjToString(tmp["matchtype"]), qu.ObjToString(tmp["extfrom"]), tmp["value"], 0})
  547. }
  548. }
  549. }
  550. if len(extinfo) > 0 {
  551. AddExtLog("clear", j.SourceMid, result, extinfo, in, t) //抽取日志
  552. }
  553. } else {
  554. extinfo := map[string]interface{}{}
  555. if in.Field != "" {
  556. if j.Result[in.Field] != nil {
  557. tmp := j.Result[in.Field]
  558. exts := []interface{}{}
  559. for k, v := range tmp {
  560. if v.Type == "table" { //table抽取到的数据不清理
  561. continue
  562. }
  563. text := qu.ObjToString(v.Value)
  564. if text != "" {
  565. text = in.RegPreBac.Reg.ReplaceAllString(text, in.RegPreBac.Replace)
  566. }
  567. j.Result[in.Field][k].Value = text
  568. exts = append(exts, map[string]interface{}{
  569. "field": v.Field,
  570. "code": v.Code,
  571. "ruletext": v.RuleText,
  572. "type": v.Type,
  573. "matchtype": v.MatchType,
  574. "extfrom": v.ExtFrom,
  575. "value": text,
  576. })
  577. }
  578. extinfo[in.Field] = exts
  579. if len(extinfo) > 0 {
  580. AddExtLog("clear", j.SourceMid, tmp, extinfo, in, t) //抽取日志
  581. }
  582. }
  583. } else {
  584. for key, tmp := range j.Result {
  585. exts := []interface{}{}
  586. for k, v := range tmp {
  587. if v.Type == "table" { //table抽取到的数据不清理
  588. continue
  589. }
  590. text := qu.ObjToString(v.Value)
  591. if text != "" {
  592. text = in.RegPreBac.Reg.ReplaceAllString(text, in.RegPreBac.Replace)
  593. }
  594. j.Result[key][k].Value = text
  595. exts = append(exts, map[string]interface{}{
  596. "field": v.Field,
  597. "code": v.Code,
  598. "ruletext": v.RuleText,
  599. "type": v.Type,
  600. "matchtype": v.MatchType,
  601. "extfrom": v.ExtFrom,
  602. "value": text,
  603. })
  604. }
  605. extinfo[key] = exts
  606. }
  607. if len(extinfo) > 0 {
  608. AddExtLog("clear", j.SourceMid, j.Result, extinfo, in, t) //抽取日志
  609. }
  610. }
  611. }
  612. }
  613. //获取抽取结果map[string][]interface{},lua脚本使用
  614. func GetResultMapForLua(j *ju.Job) map[string][]map[string]interface{} {
  615. result := map[string][]map[string]interface{}{}
  616. for key, val := range j.Result {
  617. if result[key] == nil {
  618. result[key] = []map[string]interface{}{}
  619. }
  620. for _, v := range val {
  621. tmp := map[string]interface{}{
  622. "field": v.Field,
  623. "code": v.Code,
  624. "ruletext": v.RuleText,
  625. "value": v.Value,
  626. "type": v.Type,
  627. "matchtype": v.MatchType,
  628. "extfrom": v.ExtFrom,
  629. }
  630. result[key] = append(result[key], tmp)
  631. }
  632. }
  633. return result
  634. }
  635. //抽取日志
  636. func AddExtLog(ftype, sid string, before interface{}, extinfo interface{}, v *RegLuaInfo, t *TaskInfo) {
  637. if !t.IsEtxLog {
  638. return
  639. }
  640. logdata := map[string]interface{}{
  641. "code": v.Code,
  642. "name": v.Name,
  643. "type": ftype,
  644. "ruletext": v.RuleText,
  645. "islua": v.IsLua,
  646. "field": v.Field,
  647. "version": t.Version,
  648. "taskname": t.Name,
  649. "before": before,
  650. "extinfo": extinfo,
  651. "sid": sid,
  652. "comeintime": time.Now().Unix(),
  653. }
  654. lock.Lock()
  655. ExtLogs[t] = append(ExtLogs[t], logdata)
  656. lock.Unlock()
  657. }
  658. //保存抽取日志
  659. func SaveExtLog() {
  660. tmpLogs := map[*TaskInfo][]map[string]interface{}{}
  661. lock.Lock()
  662. tmpLogs = ExtLogs
  663. ExtLogs = map[*TaskInfo][]map[string]interface{}{}
  664. lock.Unlock()
  665. for k, v := range tmpLogs {
  666. if len(v) < saveLimit {
  667. db.Mgo.SaveBulk(k.TrackColl, v...)
  668. } else {
  669. for {
  670. if len(v) > saveLimit {
  671. tmp := v[:saveLimit]
  672. db.Mgo.SaveBulk(k.TrackColl, tmp...)
  673. v = v[saveLimit:]
  674. } else {
  675. db.Mgo.SaveBulk(k.TrackColl, v...)
  676. break
  677. }
  678. }
  679. }
  680. }
  681. time.AfterFunc(10*time.Second, SaveExtLog)
  682. }
// FieldValue pairs a value with a count.
// NOTE(review): not referenced in the visible part of this file; presumably
// used for frequency ranking elsewhere — confirm before removing.
type FieldValue struct {
	Value interface{}
	Count int
}
  687. //分析抽取结果并保存
  688. func AnalysisSaveResult(j *ju.Job, task *TaskInfo) {
  689. doc := j.Data
  690. result := j.Result
  691. _id := qu.BsonIdToSId((*doc)["_id"])
  692. iscore, _ := ju.Config["fieldscore"].(bool)
  693. if iscore { //打分
  694. result = ScoreFields(result)
  695. }
  696. //结果排序
  697. values := map[string][]*ju.SortObject{}
  698. for key, val := range result {
  699. fieldValue := map[string][]interface{}{}
  700. if iscore { //走打分
  701. for _, v := range val {
  702. if len(fmt.Sprint(v.Value)) < 1 {
  703. continue //去除空串
  704. }
  705. fieldValue[fmt.Sprint(v.Value)+v.Type] = []interface{}{v.Score, v.Value}
  706. }
  707. } else { //不走打分,按出现频次
  708. for _, v := range val {
  709. if len(fmt.Sprint(v.Value)) < 1 {
  710. continue //去除空串
  711. }
  712. if fieldValue[fmt.Sprint(v.Value)] == nil {
  713. fieldValue[fmt.Sprint(v.Value)] = []interface{}{0, v.Value}
  714. } else {
  715. fieldValue[fmt.Sprint(v.Value)][0] = qu.IntAll(fieldValue[fmt.Sprint(v.Value)][0]) + 1
  716. }
  717. }
  718. }
  719. objects := []*ju.SortObject{}
  720. for k, v := range fieldValue {
  721. tmp := &ju.SortObject{
  722. Key: k,
  723. Value: qu.IntAll(v[0]),
  724. Object: v[1],
  725. }
  726. objects = append(objects, tmp)
  727. }
  728. values[key] = ju.ExtSort(objects)
  729. }
  730. //从排序结果中取值
  731. tmp := map[string]interface{}{} //抽取值
  732. for key, val := range values {
  733. for _, v := range val { //取第一个
  734. if v.Key != "" {
  735. tmp[key] = v.Object
  736. break
  737. }
  738. }
  739. }
  740. //resulttmp := tmp
  741. resulttmp, _ := ju.DeepCopy(tmp).(map[string]interface{}) //保存结果
  742. resulttmp["result"] = result
  743. if len(j.PackageInfo) > 0 { //分包信息
  744. resulttmp["package"] = j.PackageInfo
  745. }
  746. if len(j.Winnerorder) > 0 { //候选人信息
  747. resulttmp["winnerorder"] = j.Winnerorder
  748. }
  749. for k, v := range *doc {
  750. if resulttmp[k] == nil { //&& (k != "detail" || k != "contenthtml") {
  751. resulttmp[k] = v
  752. }
  753. }
  754. //质量审核
  755. if ju.Config["qualityaudit"].(bool) {
  756. QualityAudit(resulttmp)
  757. }
  758. b, p, c, d := TransmitData(resulttmp, _id) //抽取省份城市
  759. //log.Println("抽取省份,城市,县结果=====", b, p, c, d)
  760. resulttmp["district"] = d
  761. if b {
  762. resulttmp["city"] = c
  763. resulttmp["area"] = p
  764. }
  765. if task.TestColl == "" {
  766. if len(tmp) > 0 { //保存抽取结果
  767. b := task.DB.Update(task.SaveColl, `{"_id":"`+_id+`"}`, map[string]interface{}{"$set": tmp}, true, false)
  768. if !b {
  769. log.Println(task.SaveColl, _id)
  770. }
  771. }
  772. if b, ok := ju.Config["saveresult"].(bool); ok && b {
  773. b := db.Mgo.Update("extract_result", `{"_id":"`+_id+`"}`, map[string]interface{}{"$set": resulttmp}, true, false)
  774. if !b {
  775. log.Println("extract_result", _id)
  776. }
  777. }
  778. } else { //测试结果
  779. if len(j.BlockPackage) > 0 { //分包详情
  780. resulttmp["epackage"] = j.BlockPackage
  781. }
  782. b := db.Mgo.Update(task.TestColl, `{"_id":"`+_id+`"}`, map[string]interface{}{"$set": resulttmp}, true, false)
  783. if !b {
  784. log.Println(task.TestColl, _id)
  785. }
  786. }
  787. }
  788. func QualityAudit(resulttmp map[string]interface{}) {
  789. //获取审核字段
  790. //log.Println("需要审核的字段-----", AuditFields)
  791. if len(AuditFields) == 0 {
  792. v, _ := db.Mgo.FindOne("version", `{"isuse":true,"delete":false}`) //查找当前使用版本
  793. if len(*v) > 0 { //查找当前使用版本中属性配置需要审核的字段
  794. vid := qu.BsonIdToSId((*v)["_id"])
  795. query := map[string]interface{}{
  796. "isaudit": true,
  797. "delete": false,
  798. "vid": vid,
  799. }
  800. data, _ := db.Mgo.Find("versioninfo", query, `{"_id":-1}`, `{"s_field":1}`, false, -1, -1)
  801. for _, d := range *data {
  802. field := qu.ObjToString(d["s_field"])
  803. AuditFields = append(AuditFields, field)
  804. }
  805. }
  806. }
  807. for _, field := range AuditFields {
  808. //1.分包
  809. if resulttmp["package"] != nil {
  810. packagedata := resulttmp["package"].(map[string]map[string]interface{})
  811. for _, val := range packagedata {
  812. if val[field] != nil {
  813. fv := qu.ObjToString(val[field])
  814. if fv != "" {
  815. if field == "buyer" || field == "winner" { //field为buyer和winner时特殊处理,先从Redis中查,有直接通过,没有走匹配规则
  816. RedisMatch(field, fv, val) //redis匹配
  817. } else { //除了buyer和winner,其他字段走规则匹配
  818. fv := qu.ObjToString(resulttmp[field])
  819. //resulttmp[field+"_isredis"] = false
  820. RuleMatch(field, fv, resulttmp)
  821. }
  822. }
  823. }
  824. }
  825. }
  826. //2.外围
  827. if resulttmp[field] != nil {
  828. fv := qu.ObjToString(resulttmp[field])
  829. if fv != "" {
  830. if field == "buyer" || field == "winner" { //field为buyer和winner时特殊处理,先从Redis中查,有直接通过,没有走匹配规则
  831. RedisMatch(field, fv, resulttmp) //redis匹配
  832. } else { //除了buyer和winner,其他字段走规则匹配
  833. fv := qu.ObjToString(resulttmp[field])
  834. //resulttmp[field+"_isredis"] = false
  835. RuleMatch(field, fv, resulttmp)
  836. }
  837. }
  838. }
  839. }
  840. }
  841. //Redis匹配
  842. func RedisMatch(field, fv string, val map[string]interface{}) {
  843. i := redis.GetInt(field, field+"_"+fv) //查找redis
  844. if i == 0 { //reids未找到,执行规则匹配
  845. val[field+"_isredis"] = false
  846. RuleMatch(field, fv, val) //规则匹配
  847. } else { //redis找到,打标识存库
  848. val[field+"_isredis"] = true
  849. }
  850. }
  851. //规则匹配
  852. func RuleMatch(field, fieldval string, tmpMap map[string]interface{}) {
  853. if fieldval != "" {
  854. SMap := StartMatch(field, fieldval)
  855. //SMap.AddKey(field+"_isaudit", false)
  856. for _, k := range SMap.Keys {
  857. tmpMap[k] = SMap.Map[k]
  858. }
  859. tmpMap[field+"_isaudit"] = false
  860. }
  861. }
  862. //开始规则匹配
  863. func StartMatch(field, text string) *pretreated.SortMap {
  864. SMap := pretreated.NewSortMap()
  865. f := RecogFieldMap[field]
  866. if len(f) > 0 {
  867. fid := qu.BsonIdToSId(f["_id"])
  868. recogFieldPreRule := qu.ObjToString(f["s_recogfield_prerule"])
  869. textAfterRecogFieldPrerule := ju.PreFilter(text, recogFieldPreRule) //识别字段的前置过滤
  870. if textAfterRecogFieldPrerule != "" {
  871. classMap := FidClassMap[fid]
  872. L:
  873. for _, c := range classMap { //class
  874. classid := qu.BsonIdToSId(c["_id"])
  875. classPrerule := qu.ObjToString(c["s_class_prerule"])
  876. savefield := qu.ObjToString(c["s_savefield"]) //保存字段
  877. textAfterClassPrerule := ju.PreFilter(textAfterRecogFieldPrerule, classPrerule) //class的前置过滤
  878. if textAfterClassPrerule != "" {
  879. ruleMap := CidRuleMap[classid]
  880. for _, r := range ruleMap { //rule
  881. rulePrerule := qu.ObjToString(r["s_rule_prerule"])
  882. s_code := qu.ObjToString(r["s_code"])
  883. rule := r["rule"].([]interface{})
  884. textAfterRulePrerule := ju.PreFilter(textAfterClassPrerule, rulePrerule) //class的前置过滤
  885. if textAfterRulePrerule != "" {
  886. b, _ := ju.RecogAnalyRules(textAfterRulePrerule, rule)
  887. if b { //匹配到一个分类下某个规则时,不再继续匹配
  888. if savefield != "" { //保存字段不为空,存储代码信息
  889. SMap.AddKey(field+"_"+savefield, s_code)
  890. }
  891. break L
  892. }
  893. }
  894. }
  895. }
  896. }
  897. }
  898. }
  899. return SMap
  900. }