-
Notifications
You must be signed in to change notification settings - Fork 4
/
util_file.py
180 lines (142 loc) · 4.29 KB
/
util_file.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
import os
import re
import logging
import datetime
import paramiko
import config
# Module-level logger. Its dotted name makes it a child of the 'olmo' logger,
# which is the default name configured by init_logger() in this file.
logger = logging.getLogger('olmo.util_file')
def change_dir(filepath, new_dir):
    '''
    Relocate a file path into another directory, keeping its file name.

    Parameters
    ----------
    filepath : str
        Path whose last component (the file name) is kept.
    new_dir : str
        Directory the returned path should point into.

    Returns
    -------
    str
        'new_dir' joined with the file name taken from 'filepath'.
    '''
    return os.path.join(new_dir, os.path.basename(filepath))
def add_timestring(filepath, timestring):
    '''
    Appends a time_stamp to a filename (before the extension).

    The extension is whatever follows the last '.' in the file name itself.
    Dots in directory names are ignored, so '/data/v1.2/file' becomes
    '/data/v1.2/file_<timestring>' rather than being split inside 'v1.2'.

    Parameters
    ----------
    filepath : str
        File name, optionally preceded by a directory path.
    timestring : str
        Text inserted (after an underscore) at the end of the file name,
        before the extension if there is one.

    Returns
    -------
    str
        'filepath' with '_<timestring>' added to its file name.
    '''
    # Split off the directory part by length so that it is returned
    # character-for-character as given (no separator normalisation).
    filename = os.path.basename(filepath)
    directory = filepath[:len(filepath) - len(filename)]

    if '.' in filename:
        name, ext = filename.rsplit('.', 1)
        return f"{directory}{name}_{timestring}.{ext}"
    return f"{directory}{filename}_{timestring}"
def remove_timestring(filepath):
    '''
    Removes a timestring from the end of a filename.

    This function is the assumed pair of the 'util_file.add_timestring()'.
    Only the file name is inspected: dots and underscores in directory
    names are never taken for the extension or timestring separator.

    Parameters
    ----------
    filepath : str
        File name of the form '<base>_<digits>[.<ext>]', optionally
        preceded by a directory path.

    Returns
    -------
    str
        'filepath' with the trailing '_<digits>' removed from its file name.

    Raises
    ------
    ValueError
        If the file name (without extension) contains no underscore.
    AssertionError
        If the text after the last underscore contains a character
        other than 0-9.
    '''
    # Split off the directory part by length so that it is returned
    # character-for-character as given (no separator normalisation).
    filename = os.path.basename(filepath)
    directory = filepath[:len(filepath) - len(filename)]

    if '.' in filename:
        stem, ext = filename.rsplit('.', 1)
        suffix = '.' + ext
    else:
        stem, suffix = filename, ''

    base_name, underscore, timestring = stem.rpartition('_')
    if not underscore:
        raise ValueError(
            f"File name '{filename}' has no '_<timestring>' suffix to remove.")

    # Raised explicitly (not via 'assert') so the check still runs under
    # 'python -O'; AssertionError is kept so existing handlers still match.
    if re.search("[^0-9]", timestring):
        raise AssertionError(
            f"Completion flag date, {timestring}, conatains non ints.")

    return directory + base_name + suffix
def ls_remote(user, machine, directory, port=22):
    '''
    Perform 'ls' over ssh onto linux machine.

    Parameters
    ----------
    user : str
        User name used for the ssh connection.
    machine : str
        IP address of the machine you will connect to.
    directory : str
        Inserted verbatim after 'ls ' in the remote command.
    port : int, default 22

    Returns
    -------
    (str, str)
        Decoded stdout and stderr of the remote command, in that order.
        Bytes that cannot be decoded are dropped.
    '''
    ssh = paramiko.SSHClient()
    # NOTE(review): AutoAddPolicy accepts unknown host keys without
    # verification - confirm this is acceptable for the target network.
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    try:
        ssh.connect(machine, username=user, port=port)
        # NOTE(review): 'directory' is not shell-quoted, so spaces or shell
        # metacharacters in it are interpreted by the remote shell.
        _, stdout, stderr = ssh.exec_command(f"ls {directory}")
        # Both streams must be read before the connection is closed.
        out_text = stdout.read().decode(errors='ignore')
        err_text = stderr.read().decode(errors='ignore')
        return out_text, err_text
    finally:
        # Always release the connection, also when connect/exec fails.
        ssh.close()
def find_remote(user, machine, directory, search, port=22):
    '''
    Perform "find {directory} -name '{search}'" over ssh onto linux machine.

    Note this returns the full file path, not relative to 'directory'.

    Parameters
    ----------
    user : str
        User name used for the ssh connection.
    machine : str
        IP address of the machine you will connect to.
    directory : str
        Inserted verbatim after 'find ' in the remote command.
    search : str
        Pattern passed, inside single quotes, to the '-name' option.
    port : int, default 22

    Returns
    -------
    (str, str)
        Decoded stdout and stderr of the remote command, in that order.
        Bytes that cannot be decoded are dropped.
    '''
    ssh = paramiko.SSHClient()
    # NOTE(review): AutoAddPolicy accepts unknown host keys without
    # verification - confirm this is acceptable for the target network.
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    try:
        ssh.connect(machine, username=user, port=port)
        # NOTE(review): 'directory' is not shell-quoted and a single quote
        # inside 'search' would end the quoting early on the remote shell.
        _, stdout, stderr = ssh.exec_command(
            f"find {directory} -name '{search}'")
        # Both streams must be read before the connection is closed.
        out_text = stdout.read().decode(errors='ignore')
        err_text = stderr.read().decode(errors='ignore')
        return out_text, err_text
    finally:
        # Always release the connection, also when connect/exec fails.
        ssh.close()
def get_user_pwd(file):
    '''
    Get user and pwd from a "credentials file".

    File is expected to be formatted like this:
    USER=good_username
    PWD=12345password

    Only the first two lines are read; anything after them is ignored.

    Parameters
    ----------
    file : str
        Full path to credential file.

    Returns
    -------
    str, str
        The text after 'USER=' and the text after 'PWD=', in that order.

    Raises
    ------
    AssertionError
        If the first line does not start with 'USER=' or the second line
        does not start with 'PWD='.
    '''
    user_prefix = 'USER='
    pwd_prefix = 'PWD='

    with open(file, 'r') as f:
        user_line = f.readline().rstrip('\n')
        pwd_line = f.readline().rstrip('\n')

    # Raised explicitly (not via 'assert') so the format check still runs
    # under 'python -O'; AssertionError is kept so existing handlers match.
    for line, prefix in ((user_line, user_prefix), (pwd_line, pwd_prefix)):
        if not line.startswith(prefix):
            raise AssertionError(
                f"Credentials file {file} not correct format.")

    return user_line[len(user_prefix):], pwd_line[len(pwd_prefix):]
def init_logger(logfile, name='olmo'):
    '''
    Define the logger object for logging.

    The log is appended to a file inside 'config.output_dir' whose name is
    'logfile' directly followed by the current date as YYYYMMDD. Calling
    this again for the same logger and file does not attach a second
    handler, so log lines are not duplicated.

    Parameters
    ----------
    logfile : str
        File name prefix of the output log file (the date is appended).
    name : str
        Name of the logger, used by the logging library.

    Returns
    -------
    logging.Logger
        The logger called 'name', set to level INFO.
    '''
    log = logging.getLogger(name)
    log.setLevel(logging.INFO)

    date_suffix = datetime.datetime.now().strftime('%Y%m%d')
    log_path = os.path.join(config.output_dir, logfile + date_suffix)

    # FileHandler stores its target as an absolute path in 'baseFilename';
    # compare against that to detect a handler from an earlier call.
    target = os.path.abspath(log_path)
    already_attached = any(
        isinstance(handler, logging.FileHandler)
        and handler.baseFilename == target
        for handler in log.handlers
    )
    if not already_attached:
        fh = logging.FileHandler(log_path, 'a+')
        fh.setFormatter(logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s - %(message)s'))
        log.addHandler(fh)

    return log