#!/usr/bin/python3.6
# -*- coding: utf-8 -*-
# @File : ac_sensitive.py
# @Software: PyCharm
# Sensitive-word lookup with an Aho-Corasick automaton
# (a trie augmented with failure links).
import time
from collections import deque
from typing import Dict, Iterator, List, Optional, Tuple


class Node(object):
    """A single trie node of the automaton."""

    def __init__(self):
        # Outgoing edges: character -> child node.
        self.next: Dict[str, "Node"] = {}
        # Failure link: node for the longest proper suffix of this node's
        # path that is also a path in the trie. None until make_fail() runs
        # (and always None on the root).
        self.fail: Optional["Node"] = None
        # True when the path from the root to this node spells a full word.
        self.isWord = False
        # The word ending at this node ("" when isWord is False).
        self.word = ""


class ACAutomation(object):
    """Aho-Corasick automaton for finding and masking sensitive words."""

    def __init__(self):
        self.root = Node()
        # Failure links must be (re)built after any word has been added.
        self._fail_stale = True

    def add_word(self, word):
        """Insert *word* into the trie.

        Empty words are ignored: storing one would mark the root as a word
        and make every search report empty matches.
        """
        if not word:
            return
        node = self.root
        for char in word:
            child = node.next.get(char)
            if child is None:
                child = node.next[char] = Node()
            node = child
        node.isWord = True
        node.word = word
        self._fail_stale = True

    def make_fail(self):
        """Build the failure links with a breadth-first walk over the trie.

        Calling this explicitly is optional; searching rebuilds the links
        automatically whenever words were added since the last build.
        """
        root = self.root
        root.fail = None
        queue = deque()
        # Depth-1 nodes have no proper non-empty suffix: they fall back to root.
        for child in root.next.values():
            child.fail = root
            queue.append(child)
        while queue:
            node = queue.popleft()
            for char, child in node.next.items():
                # Climb the parent's failure chain until some node has an
                # outgoing edge for `char`; the target of that edge is the
                # longest proper suffix of child's path present in the trie.
                fallback = node.fail
                while fallback is not None and char not in fallback.next:
                    fallback = fallback.fail
                child.fail = root if fallback is None else fallback.next[char]
                queue.append(child)
        self._fail_stale = False

    def _iter_matches(self, content) -> Iterator[Tuple[int, str]]:
        """Yield ``(end_index, word)`` for every occurrence of a stored word.

        ``end_index`` is the index in *content* of the word's last character.
        """
        if self._fail_stale:
            self.make_fail()
        root = self.root
        node = root
        for index, char in enumerate(content):
            # Follow failure links until the character can be consumed.
            while node is not root and char not in node.next:
                node = node.fail
            node = node.next.get(char, root)
            # Every word node on the failure chain is a suffix of the text
            # read so far, i.e. a word ending at this position.
            hit = node
            while hit is not root:
                if hit.isWord:
                    yield index, hit.word
                hit = hit.fail

    def search(self, content) -> List[str]:
        """Return the stored words found in *content*.

        Words are listed in order of their end position (longest first when
        several end at the same position). A word occurring several times is
        listed once per occurrence; overlapping and nested words are all
        reported.
        """
        return [word for _, word in self._iter_matches(content)]

    def parse(self, words_path: str):
        """Load words from a UTF-8 text file, one word per line.

        Surrounding whitespace is stripped and blank lines are skipped.
        """
        with open(words_path, encoding='utf-8') as f:
            for line in f:
                keyword = line.strip()
                if keyword:
                    self.add_word(keyword)
        self.make_fail()

    def words_replace(self, text):
        """Return *text* with every sensitive word masked by ``*``.

        Each matched character is replaced by one ``*``, so the result has
        the same length as the input. Masking is done by match position, so
        overlapping words are all covered and the result does not depend on
        the order in which the words are processed.

        Side effect: prints the distinct words found and the elapsed time in
        seconds (debug output kept from the original implementation).

        :param text: the text to filter
        :return: the text with sensitive words masked
        """
        start = time.time()
        masked = list(text)
        found = {}  # used as an insertion-ordered set of distinct words
        for end, word in self._iter_matches(text):
            found[word] = None
            begin = end - len(word) + 1
            masked[begin:end + 1] = '*' * len(word)
        print(list(found))
        filtered = ''.join(masked)
        print(time.time() - start)
        return filtered


if __name__ == "__main__":
    gfw = ACAutomation()
    path = "../data/sensitive_words.txt"
    gfw.parse(path)
    text = "苹果新品发布会"
    result = gfw.words_replace(text)
    print(result)