# Project-1
# Answer questions about a folder of PDFs: embed each page with OpenAI,
# retrieve the pages most similar to a query, and have a chat model
# summarise them.
import os
import re
import requests
import sys
from num2words import num2words
import pandas as pd
import numpy as np
import tiktoken
import openai
from PyPDF2 import PdfReader
from typing import Any, Dict, List, Tuple
# Set your OpenAI API key as the environment variable OPENAI_API_KEY.
# The bare lookup below discards the value; its only effect is to raise
# KeyError when this module is loaded if the variable is not set.
os.environ["OPENAI_API_KEY"]
# Function to normalize text (s is input text)
def normalize_text(s, sep_token=" \n "):
    """Collapse whitespace and remove common punctuation artifacts from *s*.

    Args:
        s: Text to clean.
        sep_token: Unused; kept so callers that pass it keep working.

    Returns:
        The cleaned text, with no leading or trailing whitespace.
    """
    # Collapse every whitespace run (newlines and tabs included) to one space.
    s = re.sub(r"\s+", " ", s).strip()
    # Drop literal ". ," artifacts. The dot must be escaped: unescaped it
    # matches any character, which deleted the last letter of any word that
    # happened to be followed by " ,".
    s = re.sub(r"\. ,", "", s)
    s = s.replace("..", ".")
    s = s.replace(". .", ".")
    # No separate newline removal is needed: the whitespace collapse above
    # has already turned every newline into a space.
    return s.strip()
# Function to compute the cosine similarity between two embedding vectors
def cosine_similarity(a, b):
    """Return the cosine similarity of vectors *a* and *b*.

    The result is the dot product divided by the product of the two
    Euclidean norms. There is no guard for zero-norm inputs: the division
    is performed as-is.
    """
    dot_product = np.dot(a, b)
    norm_product = np.linalg.norm(a) * np.linalg.norm(b)
    return dot_product / norm_product
#function to extract text from pdf
def extract_text_from_pdf(pdf_path):
    """Return the extracted text of each page of the PDF at *pdf_path*.

    Despite the singular name, the result is a list with one entry per
    page, in page order; each entry is whatever ``page.extract_text()``
    returned for that page.
    """
    pdf = PdfReader(pdf_path)
    return [pdf_page.extract_text() for pdf_page in pdf.pages]
#function to get the embedding
def get_embedding(text, model="text-embedding-3-small"):
    """Return the embedding of *text* produced by the OpenAI model *model*.

    *text* is sent as a single-element batch, and the embedding of that
    one element is returned.
    """
    response = openai.embeddings.create(input=[text], model=model)
    only_item = response.data[0]
    return only_item.embedding
#def generate_embeddings(text, model="text-embedding-3-small"):
#return openai.embeddings.create(input=text, model=model).data[0].embedding
#function to search the embedded documents
def search_docs(df, user_query, top_n=8, to_print=True):
    """Return the *top_n* rows of *df* most similar to *user_query*.

    Args:
        df: DataFrame with an ``ada_v2`` column holding one embedding per
            row. A ``similarity`` column is written onto this same object.
        user_query: Query text; embedded with ``text-embedding-3-small``.
        top_n: Number of best-scoring rows to keep.
        to_print: When true, print the result before returning it.

    Returns:
        A new DataFrame of the best-scoring rows, highest similarity
        first, with a fresh 0-based index.
    """
    # Must be the same embedding model that produced the "ada_v2" column.
    query_vector = get_embedding(user_query, model="text-embedding-3-small")

    def score(row_vector):
        return cosine_similarity(row_vector, query_vector)

    # Note: this adds/overwrites the column on the caller's DataFrame.
    df["similarity"] = df["ada_v2"].apply(score)

    ranked = df.sort_values("similarity", ascending=False)
    best = ranked.head(top_n)
    # Reset the index so row labels reflect rank rather than original position.
    res = best.reset_index(drop=True)

    if to_print:
        print(res)
    return res
# Function to split text into chunks by tokens
def chunk_text_by_tokens(tokenized_text, tokenizer, max_tokens_per_chunk=8000):
    """Group text pieces into space-joined chunks bounded by a token budget.

    Each piece is round-tripped through ``tokenizer.encode`` /
    ``tokenizer.decode``. Pieces are appended to the current chunk until
    adding the next one would push the sum of per-piece token counts past
    *max_tokens_per_chunk*; then a new chunk is started.

    Args:
        tokenized_text: Iterable of text pieces (strings).
        tokenizer: Object providing ``encode(str)`` and ``decode(ids)``.
        max_tokens_per_chunk: Budget per chunk, counted as the sum of the
            token counts of the individual pieces (the joining spaces are
            not re-tokenized, so this is an approximation of the chunk's
            own token count).

    Returns:
        A list of chunk strings. A single piece larger than the budget is
        kept whole as its own chunk rather than being split.
    """
    chunks = []
    current_pieces = []
    current_tokens = 0

    for piece in tokenized_text:
        try:
            token_ids = tokenizer.encode(piece)  # Convert piece to token IDs
            piece_text = tokenizer.decode(token_ids)  # And back to a string
        except Exception as e:
            # Best effort: report the piece that could not be processed and
            # skip it, instead of inserting an empty string (and a stray
            # space) into the chunk.
            print(f"Error decoding token: {e}")
            continue
        if not piece_text:
            continue

        piece_tokens = len(token_ids)
        # Budget is measured in tokens, not characters. Only flush when
        # something has been collected, so an oversized first piece does not
        # produce an empty chunk.
        if current_pieces and current_tokens + piece_tokens > max_tokens_per_chunk:
            chunks.append(" ".join(current_pieces).strip())
            current_pieces = []
            current_tokens = 0

        current_pieces.append(piece_text)
        current_tokens += piece_tokens

    if current_pieces:
        chunks.append(" ".join(current_pieces).strip())
    return chunks
# Function to write PDF text to a text file
def write_pdf_to_text(pdf_path):
    """Append the text of every page of the PDF at *pdf_path* to ``output.txt``.

    Each page's extracted text is followed by a newline. The file is opened
    in append mode, so repeated calls accumulate output.
    """
    # Open the PDF first so an unreadable PDF does not create output.txt.
    reader = PdfReader(pdf_path)
    # Explicit UTF-8: the default encoding is locale-dependent and may be
    # unable to represent characters extracted from the PDF.
    with open('output.txt', 'a', encoding='utf-8') as f:  # 'a' mode for appending
        for page in reader.pages:
            f.write(page.extract_text())
            f.write("\n")
def main():
    """Embed the pages of the PDFs in ``Sample-PDFs`` and summarise them.

    Steps: read every page of every ``.pdf`` file in the folder, normalize
    the text, embed each page, retrieve the pages most similar to a fixed
    query, and ask a chat model to answer the query from those pages. The
    answer is printed. If no page yields embeddable text, a message is
    printed and the function returns without calling the OpenAI API.
    """
    # Folder containing PDF files
    pdf_folder = "Sample-PDFs"

    # List all PDF files in the folder. Sorted so that page indices are
    # reproducible between runs; os.listdir returns entries in arbitrary order.
    pdf_files = sorted(
        os.path.join(pdf_folder, filename)
        for filename in os.listdir(pdf_folder)
        if filename.endswith(".pdf")
    )

    # One entry per page, across all PDFs
    text = []
    for pdf_file in pdf_files:
        text.extend(extract_text_from_pdf(pdf_file))

    # Create a DataFrame from the unchanged pdf data
    df = pd.DataFrame(
        {
            "index": list(range(len(text))),
            "text": text,
            "text_len": [len(t) for t in text],
        }
    )

    # Normalize the text
    df["text"] = df["text"].apply(normalize_text)

    # Tokenize the text
    tokenizer = tiktoken.get_encoding("cl100k_base")
    df["n_tokens"] = df["text"].apply(lambda x: len(tokenizer.encode(x)))

    # Keep only pages the embeddings endpoint can accept: it rejects empty
    # input (e.g. pages with no extractable text) and over-long input.
    # .copy() makes the filtered frame independent, so adding columns below
    # is a plain assignment and no pandas warning needs to be silenced.
    df = df[(df.n_tokens > 0) & (df.n_tokens < 8192)].copy()
    if df.empty:
        print(f"No embeddable text found in {pdf_folder}")
        return

    # Get embeddings
    df["ada_v2"] = df["text"].apply(get_embedding)

    # Set display options to show all columns of df
    pd.set_option('display.max_columns', None)
    pd.set_option('display.width', None)

    # Make a query on the database
    query = "What is being studied? What are the differences between the papers?"
    res = search_docs(df, query, top_n=26)

    # The label used here is the row's position in `res` (its similarity
    # rank), not the value of the "index" column.
    refs = "".join(
        f"Document {index}:\n{row['text']}\n\n" for index, row in res.iterrows()
    )
    query_text = refs + "Based on the references above, " + query

    response = openai.chat.completions.create(
        model="gpt-3.5-turbo",
        messages=[
            {"role": "system", "content": "You are a helpful assistant that summarizes the contents of a database. Make one summary for astronomy related things. Make a seperate summary from things that do not pertain to astronomy."},
            {"role": "user", "content": query_text},
        ]
    )

    # print results
    print(response.choices[0].message.content)
# Run the pipeline only when this file is executed as a script.
if __name__ == "__main__":
    main()