forked from kbairak/transifex-api-python
-
Notifications
You must be signed in to change notification settings - Fork 1
/
__init__.py
177 lines (129 loc) · 5.22 KB
/
__init__.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
import time
import jsonapi
from jsonapi.exceptions import JsonApiException
class TransifexApi(jsonapi.JsonApi):
HOST = "https://rest.api.transifex.com"
@TransifexApi.register
class Organization(jsonapi.Resource):
TYPE = "organizations"
@TransifexApi.register
class Team(jsonapi.Resource):
TYPE = "teams"
@TransifexApi.register
class Project(jsonapi.Resource):
TYPE = "projects"
@TransifexApi.register
class Language(jsonapi.Resource):
TYPE = "languages"
@TransifexApi.register
class Resource(jsonapi.Resource):
TYPE = "resources"
def purge(self):
count = 0
# Instead of filter, if Resource had a plural relationship to
# ResourceString, we could do `self.fetch('resource_strings')`
for page in list(ResourceString.filter(resource=self).all_pages()):
count += len(page)
ResourceString.bulk_delete(page)
return count
@TransifexApi.register
class ResourceString(jsonapi.Resource):
TYPE = "resource_strings"
@TransifexApi.register
class ResourceTranslation(jsonapi.Resource):
TYPE = "resource_translations"
EDITABLE = ["strings", 'reviewed', "proofread"]
@TransifexApi.register
class ResourceStringsAsyncUpload(jsonapi.Resource):
TYPE = "resource_strings_async_uploads"
@classmethod
def upload(cls, resource, content, interval=5):
""" Upload source content with multipart/form-data.
:param resource: A (transifex) Resource instance or ID
:param content: A string or file-like object
:param interval: How often (in seconds) to poll for the completion
of the upload job
"""
if isinstance(resource, Resource):
resource = resource.id
upload = cls.create_with_form(data={'resource': resource},
files={'content': content})
while True:
if hasattr(upload, 'errors') and len(upload.errors) > 0:
errors = [{
'code': e['code'],
'detail': e['detail'],
'title': e['detail'],
'status': '409'} for e in upload.errors]
raise JsonApiException(409, errors)
if upload.redirect:
return upload.follow()
if (hasattr(upload, 'attributes')
and upload.attributes.get("details")):
return upload.attributes.get("details")
time.sleep(interval)
upload.reload()
@TransifexApi.register
class ResourceTranslationsAsyncUpload(Resource):
TYPE = "resource_translations_async_uploads"
@classmethod
def upload(cls, resource, content, language, interval=5,
file_type='default'):
""" Upload translation content with multipart/form-data.
:param resource: A (transifex) Resource instance or ID
:param content: A string or file-like object
:param language: A (transifex) Language instance or ID
:param interval: How often (in seconds) to poll for the completion
of the upload job
:param file_type: The content file type
"""
if isinstance(resource, Resource):
resource = resource.id
upload = cls.create_with_form(data={'resource': resource,
'language': language,
'file_type': file_type},
files={'content': content})
while True:
if hasattr(upload, 'errors') and len(upload.errors) > 0:
errors = [{
'code': e['code'],
'detail': e['detail'],
'title': e['detail'],
'status': '409'} for e in upload.errors]
raise JsonApiException(409, errors)
if upload.redirect:
return upload.follow()
if (hasattr(upload, 'attributes')
and upload.attributes.get("details")):
return upload.attributes.get("details")
time.sleep(interval)
upload.reload()
@TransifexApi.register
class User(jsonapi.Resource):
TYPE = "users"
@TransifexApi.register
class TeamMembership(jsonapi.Resource):
TYPE = "team_memberships"
@TransifexApi.register
class ResourceLanguageStats(jsonapi.Resource):
TYPE = "resource_language_stats"
@TransifexApi.register
class ResourceTranslationsAsyncDownload(jsonapi.Resource):
TYPE = "resource_translations_async_downloads"
@classmethod
def download(cls, interval=5, *args, **kwargs):
download = cls.create(*args, **kwargs)
while True:
if hasattr(download, 'errors') and len(download.errors) > 0:
errors = [{'code': e['code'],
'detail': e['detail'],
'title': e['detail'],
'status': '409'}
for e in download.errors]
raise JsonApiException(409, errors)
if download.redirect:
return download.redirect
time.sleep(interval)
download.reload()
# This is our global object
transifex_api = TransifexApi()