#! /usr/bin/env python3
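"""A small search-engine spider.

It pops URLs from a database-backed queue, fetches and analyzes each page,
stores the page text and its keyword scores in an inverted index, and enqueues
the links it discovers for later crawling.
"""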
import logging
import requests
from datetime import datetime, timedelta
from typing import Union, Optional, Set, List, Dict
from sqlalchemy import func
from sqlalchemy.orm import load_only
import config
from analyzer import WebPageAnalyzer
from schema import WebPage
from model import DBSession, InvertedIndexModel, WebPageModel, SpiderQueueModel
from utils import keyword_scores_to_dict
logger = logging.getLogger("spider")
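
# The imported `config` module is defined elsewhere in the project. The sketch
# below lists the attributes this file relies on; the values are illustrative
# assumptions only, not the project's actual configuration.
#
#   sql_index_field_length_limit = 191      # longest URL stored in indexed columns
#   spider_queue_max_size = 10000           # upper bound on queued URLs
#   proxy = None                            # e.g. {"http": "...", "https": "..."}
#   timeout = 10                            # request timeout in seconds
#   user_agent = "Mozilla/5.0 (compatible)" # User-Agent header sent with requests
#   cd = 86400                              # per-URL cool-down, in seconds
#   entry_urls = ["https://example.com/"]   # seed URLs for the crawl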


class SpiderQueue(object):
    def __init__(self, max:int=0) -> None:
        # Maximum queue size; 0 means unlimited.
        self.__max:int = max

    def count(self) -> int:
        session = DBSession()
        return session.query(func.count(SpiderQueueModel.id)).scalar()

    def empty(self) -> bool:
        return self.count() == 0

    def full(self) -> bool:
        return self.__max > 0 and self.count() >= self.__max

    def pop_url(self) -> Optional[str]:
        # Take the oldest queued URL, remove it from the queue, and return it;
        # return None if the queue is empty.
        session = DBSession()
        item = session.query(SpiderQueueModel).first()
        if item is None:
            return None
        url = item.url
        session.delete(item)
        session.commit()
        return url

    def push_url(self, url:str) -> bool:
        # Enqueue one URL, skipping it if it is too long, the queue is full,
        # or it is already queued.
        if len(url) > config.sql_index_field_length_limit:
            logger.warning(f"{url} is too long, skip.")
            return False
        if self.full():
            return False
        session = DBSession()
        item = session.query(SpiderQueueModel).filter_by(url=url).first()
        if item is not None:
            return False
        item = SpiderQueueModel(url=url)
        session.add(item)
        session.commit()
        return True

    def push_urls(self, urls:List[str]) -> int:
        # Enqueue a batch of URLs in a single transaction and return the
        # resulting queue size.
        count:int = self.count()
        session = DBSession()
        for url in urls:
            if len(url) > config.sql_index_field_length_limit:
                logger.warning(f"{url} is too long, skip.")
                continue
            if count >= config.spider_queue_max_size:
                break
            item = session.query(SpiderQueueModel).filter_by(url=url).first()
            if item is not None:
                continue
            item = SpiderQueueModel(url=url)
            session.add(item)
            count += 1
        session.commit()
        return count


class Spider(object):
    def __init__(self) -> None:
        self.__url_queue = SpiderQueue(config.spider_queue_max_size)
        self.__proxy = config.proxy
        self.__timeout:int = config.timeout
        self.__headers:Dict[str, str] = {"User-Agent": config.user_agent}
        # Default start time so expired() is safe to call before start().
        self.__start_time:datetime = datetime.now()

    def push_url(self, url:str) -> bool:
        return self.__url_queue.push_url(url)

    def pop_url(self) -> Optional[str]:
        return self.__url_queue.pop_url()

    def push_urls(self, urls:Union[Set[str], List[str]]) -> None:
        self.__url_queue.push_urls(urls)

    def update_webpage(self, web_page_analyzer:WebPageAnalyzer) -> int:
        # Insert the crawled page or refresh an existing row, then return its id.
        session = DBSession()
        page_model = session.query(WebPageModel).filter_by(url=web_page_analyzer.url()).options(load_only("id")).first()
        if page_model is None:
            page_model = WebPageModel(url=web_page_analyzer.url(),
                                      update_time=datetime.now(),
                                      title=web_page_analyzer.title(),
                                      content=web_page_analyzer.content())
        else:
            page_model.update_time = datetime.now()
            page_model.title = web_page_analyzer.title()
            page_model.content = web_page_analyzer.content()
        session.add(page_model)
        session.commit()
        return page_model.id

    def guess_charset(self, response:requests.Response) -> str:
        # Prefer GB2312/GBK when either the declared or detected encoding says so;
        # otherwise fall back to UTF-8.
        try:
            encoding:str = response.encoding.upper()
            apparent_encoding:str = response.apparent_encoding.upper()
            info:List[str] = [encoding, apparent_encoding]
            if "GB2312" in info:
                return "GB2312"
            if "GBK" in info:
                return "GBK"
        except Exception as e:
            logger.warning(e)
        return "UTF-8"

    def request(self, url:str) -> Optional[WebPageAnalyzer]:
        # Fetch the URL and wrap the HTML in a WebPageAnalyzer; return None on
        # network errors, non-200 responses, or non-HTML content.
        try:
            response:requests.Response = requests.get(url,
                                                      stream=True,
                                                      proxies=self.__proxy,
                                                      timeout=self.__timeout,
                                                      headers=self.__headers)
        except Exception as e:
            logger.warning(e)
            return None
        if response.status_code != 200:
            logger.warning(f"{url} - status code:{response.status_code}")
            response.close()
            return None
        if "content-type" not in response.headers:
            logger.warning(f"{url} - response.headers without content-type")
            response.close()
            return None
        if "text/html" not in response.headers["content-type"]:
            logger.warning(f"{url} - content-type {response.headers['content-type']} is not text/html")
            response.close()
            return None
        response.encoding = self.guess_charset(response)
        web_page_analyzer = WebPageAnalyzer(url=url, html=response.text)
        response.close()
        return web_page_analyzer

    def expired(self, url:str, cd:timedelta) -> bool:
        # A URL is due for (re)crawling if it has never been stored, was last
        # crawled before this run started, or its cool-down period has elapsed.
        session = DBSession()
        page = session.query(WebPageModel).filter_by(url=url).first()
        if page is None:
            return True
        if page.update_time < self.__start_time:
            return True
        return datetime.now() - page.update_time > cd

    def update_inverted_index(self, web_page_id:int, web_page_analyzer:WebPageAnalyzer) -> None:
        # Merge this page's keyword scores into the inverted index, creating a
        # row per keyword on first sight.
        session = DBSession()
        scores:Dict[str, float] = web_page_analyzer.keyword_scores()
        for keyword in scores:
            index = session.query(InvertedIndexModel).filter_by(keyword=keyword).first()
            if index is None:
                web_page_id_scores:Dict[int, float] = {web_page_id:scores[keyword]}
                index = InvertedIndexModel(keyword=keyword, web_page_id_scores=f"{web_page_id_scores}")
                session.add(index)
            else:
                web_page_id_scores = keyword_scores_to_dict(index.web_page_id_scores)
                web_page_id_scores[web_page_id] = scores[keyword]
                index.web_page_id_scores = f"{web_page_id_scores}"
        session.commit()
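
    # Note: web_page_id_scores is persisted as the str() form of a {page_id: score}
    # dict, and keyword_scores_to_dict (imported from utils, not shown in this file)
    # is assumed to reverse that mapping. A minimal sketch under that assumption:
    #
    #   import ast
    #
    #   def keyword_scores_to_dict(text: str) -> Dict[int, float]:
    #       return ast.literal_eval(text)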

    def start(self) -> None:
        self.__start_time = datetime.now()
        while True:
            url:Optional[str] = self.pop_url()
            if url is None:
                break
            web_page_analyzer:Optional[WebPageAnalyzer] = self.request(url)
            if web_page_analyzer is None:
                continue
            id = self.update_webpage(web_page_analyzer)
            self.update_inverted_index(id, web_page_analyzer)
            # Use a distinct loop variable so the current page URL is not shadowed
            # by the links extracted from it.
            urls:List[str] = web_page_analyzer.urls()
            for link in urls:
                if not self.expired(link, timedelta(seconds=config.cd)):
                    logger.info(f"{link} is cooling down, skip.")
                    continue
                if not self.push_url(link):
                    break
            logger.info(f"{web_page_analyzer.title()} {url} OK.")


if __name__ == "__main__":
    logger.setLevel(level=logging.DEBUG)
    console = logging.StreamHandler()
    logger.addHandler(console)
    spider = Spider()
    spider.push_urls(config.entry_urls)
    spider.start()