"""
coding:UTF-8
author:LemontreeN
date:2022-05-08

Inverted-index construction and tf-idf retrieval over a segmented
Chinese corpus (LTP word segmentation).
"""
import json
import math

import tqdm
from ltp import LTP

# Shared LTP segmenter used by all PreProcessed methods.
# path="base" presumably selects the pretrained base model —
# TODO(review): confirm against the ltp package documentation.
ltp = LTP(path="base")
class PreProcessed:
    """Builds an inverted index over segmented documents and answers queries."""

    def __init__(self):
        # Posting lists: word -> [[pid, term_frequency], ...]
        self.word_dict = {}
        # Token count per document, in processing order.
        self.cnts = []
        # Stop words are loaded on demand by read_stop_words().
        self.stop_words = None
        self.cnt = 0
        # Where the serialized inverted index is persisted.
        self.index_path = 'data/inverted_index.txt'
def read_stop_words(self, file_path: str): with open(file_path, 'r', encoding='utf-8') as fp: self.stop_words = set(fp.read().split('\n'))
def generate_index(self, input_path: str): progress_read_index = tqdm.tqdm(range(14768), f'建立索引中,目前进度') with open(input_path, 'r', encoding='utf-8') as js: for line in js.readlines(): data = json.loads(line) pid = data.get('pid') seg_list = data.get('document') seg = ltp.seg(seg_list) data['document'] = [' '.join(item) for item in seg[0]] word_cnt = 0 for item in seg[0]: word_cnt += len(item) for word in item: flag = 0 if word in self.stop_words: pass elif word not in self.word_dict: self.word_dict[word] = [] self.word_dict[word].append([pid, 1]) else: index_list = self.word_dict[word] for index in index_list: if index[0] == pid: index[1] += 1 flag = 1 if flag == 0: self.word_dict[word].append([pid, 1]) progress_read_index.update(1) self.cnts.append(word_cnt) with open(self.index_path, 'w', encoding='utf-8') as index_output: for key, value in self.word_dict.items(): index_output.write(str(key) + ';;;') for i in value: index_output.write(str(i) + '.') index_output.write('\n') with open('data/words.txt', 'w', encoding='utf-8') as fp: for item in self.cnts: fp.write('%d\n' % item) exit(0)
def read_index(self): progress_read_index = tqdm.tqdm(range(355109), f'读取索引中,目前进度') with open('data/words.txt', 'r', encoding='utf-8') as fp: self.cnts = fp.read().split('\n') with open(self.index_path, 'r', encoding='utf-8') as fp: for line in fp.readlines(): line = line.split(';;;') if len(line) != 2: print('error') pass word, index = line[0], line[1] pid_list = index[:-2].split('.') self.word_dict[word] = pid_list progress_read_index.update(1) print('-----***索引读取完毕***-----')
def search(self, conds: str) -> list: """ 检索TOP3相关文档 :param conds: 查询条件 :return: 可能的文档列表: list """ seg, hidden = ltp.seg([conds]) conds = '||'.join(seg[0]) if conds != '!quit': if '&&' in conds: conds = conds.split('&&') pid_list = self.word_dict.get(conds[0]) for i in range(1, len(conds)): merge_list = self.word_dict.get(conds[i]) temp_list = [] for item in merge_list: if item in pid_list: temp_list.append(item) pid_list = temp_list elif '||' in conds: conds = conds.split('||') pid_list = [] weight = [] for cond in conds: merge_list = self.word_dict.get(cond) if merge_list is not None: df = len(merge_list) idf = 1 / df for item in merge_list: item = item.split(',') pid = int(item[0][1:]) fre = int(item[1][:-1]) tf=math.log(fre+3) tf_idf = tf * idf if pid not in pid_list: pid_list.append(pid) weight.append(tf_idf) else: for i in range(len(pid_list)): if pid_list[i] == pid: weight[i] += tf_idf break else: pid_list = self.word_dict.get(conds) if not pid_list: print('None!') return [-1] else: pid_weight = sorted([(w, p) for w, p in zip(weight, pid_list)], reverse=True) print(pid_weight[0]) if len(pid_weight)==1: return [pid_weight[0][1]] elif len(pid_weight)==2: return [pid_weight[0][1], pid_weight[1][1]] else: return [pid_weight[0][1],pid_weight[1][1],pid_weight[2][1]]
if __name__ == "__main__":
    pre = PreProcessed()
    choice = input('\n****Inverted Index IR System****\nChoice:\n1. 读取文件建立索引\n2. 使用已有索引查询')
    if choice == '1':
        # Build the index from scratch.
        pre.read_stop_words('data/stopwords.txt')
        pre.generate_index('data/passages_multi_sentences.json')
    elif choice == '2':
        # Load the persisted index and run a demo query.
        pre.read_index()
        pre.search('家||中国')
    else:
        # The original fell through silently on any other input.
        print('Invalid choice: %s' % choice)
    # NOTE: the trailing exit(0) was removed — it is redundant at the end
    # of a script and 'exit' is only injected by the site module.