-
Notifications
You must be signed in to change notification settings - Fork 1
/
news_api.py
158 lines (117 loc) · 4.46 KB
/
news_api.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
import os
import sys
import json
import requests
import datetime
from dotenv import load_dotenv
from pathlib import Path
# Read variables from a ".env" file located in the current working directory
# so that later os.getenv() lookups in this module can see them.
env_path = Path('.').joinpath('.env')
load_dotenv(dotenv_path=env_path)
class NewsApi():
    """Download every page of a NewsAPI v2 query and save it as one JSON file.

    The instance builds the request URL from ``endpoint``, sends the caller's
    keyword arguments (plus the ``API_KEY`` environment variable) as query
    parameters, walks through all result pages and finally writes the
    collected articles to ``output_filename``.
    """

    # Page size the API applies when the request carries no ``pageSize``
    # (per the NewsAPI endpoint documentation).
    _DEFAULT_PAGE_SIZES = {'everything': 100, 'top-headlines': 20}
    # Used for any endpoint not listed above.
    _FALLBACK_PAGE_SIZE = 20
    # Seconds to wait for the server before giving up on a request.
    _REQUEST_TIMEOUT = 30

    def __init__(self, endpoint, output_filename, **kwargs):
        """Prepare the request URL, the query parameters and the result dict.

        Args:
            endpoint: Path segment appended to ``https://newsapi.org/v2/``.
            output_filename: File the collected data is written to.
            **kwargs: Query parameters forwarded to the API unchanged.
        """
        self._endpoint = endpoint
        self._base_url = 'https://newsapi.org/v2/{}'.format(endpoint)
        self._output_filename = output_filename
        self._params = {**kwargs, 'apiKey': os.getenv('API_KEY')}
        # 'articles' exists from the start so that _save() also works when
        # the very first request fails or is interrupted.
        self._data = {'custom_params': {**kwargs},
                      'datetime': str(datetime.datetime.now()),
                      'total_articles': 0,
                      'articles': []}

    def _query(self, current_page):
        """Fetch one result page and return the decoded JSON payload.

        On any failure (network error, non-200 status, payload without an
        ``articles`` key) the data gathered so far is saved and the process
        exits with status 1, so this method only returns on success.
        """
        print('\nGrabbing page: {}...'.format(current_page))
        self._params['page'] = current_page

        try:
            response = requests.get(self._base_url,
                                    params=self._params,
                                    timeout=self._REQUEST_TIMEOUT)
        except requests.exceptions.RequestException as error:
            # Only the exception type is shown: the full message can contain
            # the request URL, which includes the API key.
            print('\nRequest failed: {}'.format(type(error).__name__))
            self._cleanup()

        if response.status_code != 200:
            print('\nRequest failed with a status code: {}'.format(response.status_code))
            print('\nDetails: ', response.text)
            self._cleanup()

        try:
            data = response.json()
        except ValueError:
            # Body was not valid JSON; treat it like a payload without articles.
            data = None

        if not isinstance(data, dict) or 'articles' not in data:
            print('Could not retrieve any articles...')
            self._cleanup()

        return data

    def _calculate_total_page(self, total_result):
        """Return how many pages are needed to fetch ``total_result`` articles.

        The page size is the caller's ``pageSize`` parameter when given,
        otherwise the default of the endpoint. The result is always an
        ``int`` and never smaller than 1.
        """
        page_size = self._params.get('pageSize')
        if page_size is None:
            page_size = self._DEFAULT_PAGE_SIZES.get(self._endpoint,
                                                     self._FALLBACK_PAGE_SIZE)
        page_size = int(page_size)

        # A non-positive page size or result count cannot be paginated;
        # the first page (already fetched by the caller) is all there is.
        if page_size < 1 or total_result < 1:
            return 1

        # Ceiling division using integers only.
        return -(-total_result // page_size)

    def _save(self):
        """Write the collected data, with the article count, to the output file."""
        total_articles = len(self._data['articles'])

        print('\nSaving the data into a file...')
        print('\nTotal number of articles obtained: ', total_articles)

        self._data['total_articles'] = total_articles

        with open(self._output_filename, 'w', encoding='utf-8') as file:
            json.dump(self._data, file, indent=4)

        print('\nData saved in {}!'.format(self._output_filename))

    def _main(self):
        """Fetch the first page, then every remaining page, then save."""
        response = self._query(1)
        self._data['articles'] = list(response['articles'])

        total_page = self._calculate_total_page(response.get('totalResults', 0))
        print('\nTotal number of pages to paginate: {}'.format(total_page))

        for current_page in range(2, total_page + 1):
            response = self._query(current_page)
            self._data['articles'].extend(response['articles'])

        self._save()

    def _cleanup(self):
        """Save whatever has been collected and exit with status 1."""
        self._save()
        sys.exit(1)

    def execute(self):
        """Run the download; on Ctrl-C save the partial result and exit."""
        try:
            self._main()
        except KeyboardInterrupt:
            print('\nDetected SIGINT!!!')
            self._cleanup()
def create_params_from_input(keys):
    """Prompt for a value for each key and return the answers as a dict.

    An empty answer skips the key, so it is absent from the result. The
    answer for ``pageSize`` is converted to ``int``; if it is not a positive
    whole number the prompt is repeated instead of aborting the program.

    Args:
        keys: Iterable of parameter names to ask for, in prompt order.

    Returns:
        Dict mapping each answered key to its value (``int`` for
        ``pageSize``, ``str`` for everything else).
    """
    params = {}

    for key in keys:
        while True:
            value = input('Enter a value for "{}": '.format(key))

            if value == '':
                # Empty answer: leave the key out entirely.
                break

            if key != 'pageSize':
                params[key] = value
                break

            try:
                page_size = int(value)
            except ValueError:
                page_size = 0  # falls through to the re-prompt below

            if page_size >= 1:
                params[key] = page_size
                break

            print('"pageSize" must be a positive whole number; please try again.')

    return params
if __name__ == '__main__':
    # Command-line entry point: validate the arguments, ask the user for the
    # query parameters of the chosen endpoint, then run the download.

    # Query parameters offered for each supported endpoint, in prompt order.
    endpoint_keys = {
        'everything': ['q',
                       'sources',
                       'domains',
                       'from',
                       'to',
                       'language',
                       'sortBy',
                       'pageSize'],
        'top-headlines': ['q',
                          'sources',
                          'category',
                          'language',
                          'country',
                          'pageSize'],
    }

    # sys.exit() is used throughout: the bare exit() helper is injected by
    # the site module and is not guaranteed to exist in every run mode.
    if len(sys.argv) != 3:
        print('\nUsage: news_api.py <endpoint> <output_filename>')
        print('e.g. news_api.py everything test.txt')
        sys.exit(1)

    endpoint = sys.argv[1]
    output_filename = sys.argv[2]

    # Refuse to overwrite an existing file.
    if os.path.exists(output_filename):
        print('\nThe file {} already exists. Please provide another filename.'.format(output_filename))
        sys.exit(1)

    if endpoint not in endpoint_keys:
        print('\nYou must specify the endpoint as either everything or top-headlines')
        print('e.g. news_api.py everything test.txt')
        sys.exit(1)

    keys = endpoint_keys[endpoint]

    print('\nConstructing query parameters for /{} endpoint...'.format(endpoint))
    print('Documentation (default/available values etc.): https://newsapi.org/docs/endpoints/{}'.format(endpoint))
    print('\nPress Enter to use the default value for each parameter.')

    params = create_params_from_input(keys)

    NewsApi(endpoint, output_filename, **params).execute()