# coding:utf-8
"""
sensitive_word.py

Keyword lookup program based on the AC-automaton (Aho-Corasick) mechanism;
its main advantage is speed.
"""
  5. class Node(object):
  6. def __init__(self):
  7. self.next = {}
  8. self.fail = None
  9. self.isWord = False
  10. self.word = ""
  11. class AcAutomation(object):
  12. def __init__(self):
  13. self.root = Node()
  14. self.catch=set()
  15. # 添加敏感词函数
  16. def add_word(self, word):
  17. temp_root = self.root
  18. self.catch.add(word)
  19. for char in word:
  20. if char not in temp_root.next:
  21. temp_root.next[char] = Node()
  22. temp_root = temp_root.next[char]
  23. temp_root.isWord = True
  24. temp_root.word = word
  25. @staticmethod
  26. def __check_word(cache_result: list, char: str):
  27. """
  28. 获取keywords
  29. :param cache_result:
  30. :param char:
  31. :return:
  32. """
  33. update_catch = [] # 更新之后的指针列表
  34. words = [] # 获取关键词列表
  35. for cache in cache_result:
  36. if char in cache.next:
  37. next_p = cache.next[char]
  38. if next_p.isWord:
  39. words.append(next_p.word)
  40. update_catch.append(next_p)
  41. return update_catch, words
  42. @staticmethod
  43. def __check_word_position(cache_result: dict, char: str, position: int):
  44. """
  45. 获取keywords包含位置信心
  46. :param cache_result:
  47. :param char:字符
  48. :param position:位置
  49. :return:
  50. """
  51. update_catch = {} # 更新之后的指针列表
  52. words = [] # 获取关键词列表
  53. for d_key, (start, cache) in cache_result.items():
  54. if char in cache.next:
  55. next_p = cache.next[char]
  56. if next_p.isWord:
  57. words.append((next_p.word, start, position))
  58. update_catch[len(update_catch)] = (start, next_p)
  59. return update_catch, words
  60. # 查找关键词函数
  61. def search(self, content):
  62. """
  63. 关键词查找
  64. :param content:
  65. :return:
  66. """
  67. cache_result = [] # 缓存节点
  68. words_result = [] # 关键词列表
  69. position = 0 # 当前位置
  70. while position < len(content):
  71. p = self.root
  72. char = content[position]
  73. cache_result, words = self.__check_word(cache_result, char)
  74. words_result.extend(words)
  75. if char in p.next:
  76. if p.next[char].isWord:
  77. words_result.append(p.next[char].word)
  78. cache_result.append(p.next[char])
  79. position += 1
  80. return words_result
  81. # 查找关键词函数并记录位置
  82. def search_and_position(self, content):
  83. """
  84. 关键词查找包含位置
  85. :param content:
  86. :return:
  87. """
  88. cache_result = {} # 缓存节点
  89. words_result = [] # 关键词列表
  90. position = 0 # 当前位置
  91. while position < len(content):
  92. p = self.root
  93. char = content[position]
  94. cache_result, words = self.__check_word_position(cache_result, char, position)
  95. words_result.extend(words)
  96. if char in p.next:
  97. if p.next[char].isWord:
  98. words_result.append((position, p.next[char].word))
  99. cache_result[len(cache_result)] = (position, p.next[char])
  100. position += 1
  101. return words_result
  102. if __name__ == '__main__':
  103. ah = AcAutomation()
  104. for w in ["国", "国产", "产化", "国产化", "路由器123", "国产化路由器", "国产化+服务器", "国产化+防火墙", "国产化+无线"]:
  105. ah.add_word(w)
  106. result = ah.search_and_position("国产化路由器123")
  107. print(result)