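# Information Retrieval chatbot: builds an inverted index, a positional index,
# and tf-idf vectors over docs.json, then serves Boolean, proximity, and
# vector-space (cosine similarity) retrieval through a Gradio interface.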
import nltk
import json
import math
import re
import gradio as gr
from collections import defaultdict, Counter
from nltk.tokenize import word_tokenize
from nltk.stem import PorterStemmer, WordNetLemmatizer
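# NLTK resources required by word_tokenize and WordNetLemmatizer.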
nltk.download("punkt")
nltk.download("wordnet")
nltk.download("punkt_tab")
stop_words = {"a", "is", "the", "of", "all", "and", "to", "can", "be", "as", "once", "for", "at", "am", "are", "has", "have", "had", "up", "his", "her", "in", "on", "no", "we", "do"}
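# Load the document and query collections from JSON files on disk.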
with open("docs.json", "r", encoding="utf-8") as f:
docs_ds = json.load(f)
with open("queries.json", "r", encoding="utf-8") as f:
queries_ds = json.load(f)
documents = {int(doc["doc_id"]): doc["text"] for doc in docs_ds}
queries = {int(q["query_id"]): q["text"] for q in queries_ds}
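# Global index structures, populated once by process_documents().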
inverted_index = defaultdict(set)
positional_index = defaultdict(lambda: defaultdict(list))
tf_idf_vectors = defaultdict(dict)
idf_scores = {}
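# Build the retrieval structures: stemmed terms go into the inverted and
# positional indexes, while lemmatized terms are used for document frequencies,
# idf scores, and the (unnormalised) tf-idf document vectors.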
def process_documents(documents):
    stemmer = PorterStemmer()
    lemmatizer = WordNetLemmatizer()
    doc_freq = defaultdict(int)
    term_freqs = {}
    for doc_id, text in documents.items():
        words = word_tokenize(text.lower())
        filtered_words = [lemmatizer.lemmatize(w) for w in words if w.isalnum() and w not in stop_words]
        term_counts = Counter(filtered_words)
        term_freqs[doc_id] = term_counts
        for pos, word in enumerate(filtered_words):
            stemmed = stemmer.stem(word)
            inverted_index[stemmed].add(doc_id)
            positional_index[stemmed][doc_id].append(pos)
        for word in set(filtered_words):
            doc_freq[word] += 1
    total_docs = len(documents)
    for word, df in doc_freq.items():
        idf_scores[word] = math.log(total_docs / df)
    for doc_id, term_counts in term_freqs.items():
        tf_idf_vectors[doc_id] = {word: count * idf_scores[word] for word, count in term_counts.items()}
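# Evaluate a whitespace-separated Boolean query: terms are stemmed and looked up
# in the inverted index, NOT is applied first as a set complement over all
# document ids, then AND/OR are folded left to right over the posting sets.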
def execute_boolean_query(query, documents):
    query = query.lower()
    tokens = query.split()
    stemmer = PorterStemmer()
    operators = {'and', 'or', 'not'}
    term_stack = []
    operator_stack = []
    for token in tokens:
        if token in operators:
            operator_stack.append(token)
        else:
            stemmed_word = stemmer.stem(token)
            term_set = inverted_index.get(stemmed_word, set())
            term_stack.append(term_set)
    while 'not' in operator_stack:
        idx = operator_stack.index('not')
        term_stack[idx] = set(documents.keys()) - term_stack[idx]
        operator_stack.pop(idx)
    while operator_stack:
        op = operator_stack.pop(0)
        left = term_stack.pop(0)
        right = term_stack.pop(0)
        if op == 'and':
            term_stack.insert(0, left & right)
        elif op == 'or':
            term_stack.insert(0, left | right)
    return sorted(term_stack[0]) if term_stack else []
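# Proximity query of the form "word1 word2 /k": returns documents in which the
# two stemmed terms occur within k positions of each other, in either order.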
def execute_proximity_query(query):
    match = re.match(r'(\w+)\s+(\w+)\s*/\s*(\d+)', query)
    if not match:
        return []
    word1, word2, k = match.groups()
    k = int(k)
    stemmer = PorterStemmer()
    word1 = stemmer.stem(word1.lower())
    word2 = stemmer.stem(word2.lower())
    result_docs = set()
    if word1 in positional_index and word2 in positional_index:
        for doc_id in positional_index[word1]:
            if doc_id in positional_index[word2]:
                positions1 = positional_index[word1][doc_id]
                positions2 = positional_index[word2][doc_id]
                if any(0 < abs(p1 - p2) <= k for p1 in positions1 for p2 in positions2):
                    result_docs.add(doc_id)
    return sorted(result_docs)
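# Cosine similarity between two sparse term -> weight vectors.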
def evaluate_cosine_similarity_score(vec1, vec2):
    common = set(vec1.keys()) & set(vec2.keys())
    dot_product = sum(vec1[k] * vec2[k] for k in common)
    norm1 = math.sqrt(sum(v**2 for v in vec1.values()))
    norm2 = math.sqrt(sum(v**2 for v in vec2.values()))
    if norm1 == 0 or norm2 == 0:
        return 0.0
    return dot_product / (norm1 * norm2)
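# Turn a free-text query into a tf-idf vector using the same lemmatization,
# stop-word filtering, and idf scores as the indexed documents.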
def process_query(user_input_query):
    lemmatizer = WordNetLemmatizer()
    tokens = word_tokenize(user_input_query.lower())
    filtered = [lemmatizer.lemmatize(w) for w in tokens if w.isalnum() and w not in stop_words]
    query_counts = Counter(filtered)
    return {w: query_counts[w] * idf_scores.get(w, 0) for w in query_counts}
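# Vector space model retrieval: rank documents by cosine similarity to the
# query vector, keeping only scores at or above the alpha threshold.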
def execute_vsm_query(user_input_query, alpha=0.001):
    query_vector = process_query(user_input_query)
    scores = {}
    for doc_id, doc_vector in tf_idf_vectors.items():
        sim = evaluate_cosine_similarity_score(query_vector, doc_vector)
        if sim >= alpha:
            scores[doc_id] = sim
    return sorted(scores, key=scores.get, reverse=True)
process_documents(documents)
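# Gradio callback: dispatch the query to the retrieval method chosen in the UI.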
def chatbot_fn(query, method):
    if not query:
        return "Query cannot be empty"
    if method == "Boolean":
        result = execute_boolean_query(query, documents)
    elif method == "Proximity":
        result = execute_proximity_query(query)
    elif method == "Vector Space Model":
        result = execute_vsm_query(query)
    else:
        return "Please select a retrieval method"
    return f"Result-set: {result}"
iface = gr.Interface(
    fn=chatbot_fn,
    inputs=["text", gr.Radio(["Boolean", "Proximity", "Vector Space Model"], label="Method")],
    outputs="text",
    title="Information Retrieval Chatbot",
)
iface.launch()