Skip to content

Commit

Permalink
[Storage] Show logs for storage mount (#4387)
Browse files Browse the repository at this point in the history
* commit for logging change

* logger for storage

* grammar

* fix format

* better comment

* resolve copilot review

* resolve PR comment

* remove unuse var

* Update sky/data/data_utils.py

Co-authored-by: Romil Bhardwaj <[email protected]>

* resolve PR comment

* update comment for get_run_timestamp

* rename backend_util.get_run_timestamp to sky_logging.get_run_timestamp

---------

Co-authored-by: Romil Bhardwaj <[email protected]>
  • Loading branch information
zpoint and romilbhardwaj authored Dec 24, 2024
1 parent 65cca37 commit 6b62570
Show file tree
Hide file tree
Showing 7 changed files with 105 additions and 57 deletions.
4 changes: 0 additions & 4 deletions sky/backends/backend_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -1019,10 +1019,6 @@ def _add_auth_to_cluster_config(cloud: clouds.Cloud, cluster_config_file: str):
common_utils.dump_yaml(cluster_config_file, config)


def get_run_timestamp() -> str:
return 'sky-' + datetime.now().strftime('%Y-%m-%d-%H-%M-%S-%f')


def get_timestamp_from_run_timestamp(run_timestamp: str) -> float:
return datetime.strptime(
run_timestamp.partition('-')[2], '%Y-%m-%d-%H-%M-%S-%f').timestamp()
Expand Down
2 changes: 1 addition & 1 deletion sky/backends/cloud_vm_ray_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -2599,7 +2599,7 @@ class CloudVmRayBackend(backends.Backend['CloudVmRayResourceHandle']):
ResourceHandle = CloudVmRayResourceHandle # pylint: disable=invalid-name

def __init__(self):
self.run_timestamp = backend_utils.get_run_timestamp()
self.run_timestamp = sky_logging.get_run_timestamp()
# NOTE: do not expanduser() here, as this '~/...' path is used for
# remote as well to be expanded on the remote side.
self.log_dir = os.path.join(constants.SKY_LOGS_DIRECTORY,
Expand Down
2 changes: 1 addition & 1 deletion sky/benchmark/benchmark_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -535,7 +535,7 @@ def launch_benchmark_clusters(benchmark: str, clusters: List[str],
for yaml_fd, cluster in zip(yaml_fds, clusters)]

# Save stdout/stderr from cluster launches.
run_timestamp = backend_utils.get_run_timestamp()
run_timestamp = sky_logging.get_run_timestamp()
log_dir = os.path.join(constants.SKY_LOGS_DIRECTORY, run_timestamp)
log_dir = os.path.expanduser(log_dir)
logger.info(
Expand Down
8 changes: 4 additions & 4 deletions sky/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -830,7 +830,7 @@ class _NaturalOrderGroup(click.Group):
Reference: https://github.com/pallets/click/issues/513
"""

def list_commands(self, ctx):
def list_commands(self, ctx): # pylint: disable=unused-argument
return self.commands.keys()

@usage_lib.entrypoint('sky.cli', fallback=True)
Expand Down Expand Up @@ -5286,7 +5286,7 @@ def _deploy_local_cluster(gpus: bool):
run_command = shlex.split(run_command)

# Setup logging paths
run_timestamp = backend_utils.get_run_timestamp()
run_timestamp = sky_logging.get_run_timestamp()
log_path = os.path.join(constants.SKY_LOGS_DIRECTORY, run_timestamp,
'local_up.log')
tail_cmd = 'tail -n100 -f ' + log_path
Expand Down Expand Up @@ -5400,7 +5400,7 @@ def _deploy_remote_cluster(ip_file: str, ssh_user: str, ssh_key_path: str,
deploy_command = shlex.split(deploy_command)

# Setup logging paths
run_timestamp = backend_utils.get_run_timestamp()
run_timestamp = sky_logging.get_run_timestamp()
log_path = os.path.join(constants.SKY_LOGS_DIRECTORY, run_timestamp,
'local_up.log')
tail_cmd = 'tail -n100 -f ' + log_path
Expand Down Expand Up @@ -5515,7 +5515,7 @@ def local_down():
run_command = shlex.split(down_script_path)

# Setup logging paths
run_timestamp = backend_utils.get_run_timestamp()
run_timestamp = sky_logging.get_run_timestamp()
log_path = os.path.join(constants.SKY_LOGS_DIRECTORY, run_timestamp,
'local_down.log')
tail_cmd = 'tail -n100 -f ' + log_path
Expand Down
57 changes: 25 additions & 32 deletions sky/data/data_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from sky.adaptors import cloudflare
from sky.adaptors import gcp
from sky.adaptors import ibm
from sky.skylet import log_lib
from sky.utils import common_utils
from sky.utils import ux_utils

Expand Down Expand Up @@ -430,6 +431,7 @@ def _group_files_by_dir(
def parallel_upload(source_path_list: List[str],
filesync_command_generator: Callable[[str, List[str]], str],
dirsync_command_generator: Callable[[str, str], str],
log_path: str,
bucket_name: str,
access_denied_message: str,
create_dirs: bool = False,
Expand All @@ -445,6 +447,7 @@ def parallel_upload(source_path_list: List[str],
for a list of files belonging to the same dir.
dirsync_command_generator: Callable that generates rsync command
for a directory.
log_path: Path to the log file.
access_denied_message: Message to intercept from the underlying
upload utility when permissions are insufficient. Used in
exception handling.
Expand Down Expand Up @@ -477,7 +480,7 @@ def parallel_upload(source_path_list: List[str],
p.starmap(
run_upload_cli,
zip(commands, [access_denied_message] * len(commands),
[bucket_name] * len(commands)))
[bucket_name] * len(commands), [log_path] * len(commands)))


def get_gsutil_command() -> Tuple[str, str]:
Expand Down Expand Up @@ -518,37 +521,27 @@ def get_gsutil_command() -> Tuple[str, str]:
return gsutil_alias, alias_gen


def run_upload_cli(command: str, access_denied_message: str, bucket_name: str):
# TODO(zhwu): Use log_lib.run_with_log() and redirect the output
# to a log file.
with subprocess.Popen(command,
stderr=subprocess.PIPE,
stdout=subprocess.DEVNULL,
shell=True) as process:
stderr = []
assert process.stderr is not None # for mypy
while True:
line = process.stderr.readline()
if not line:
break
str_line = line.decode('utf-8')
stderr.append(str_line)
if access_denied_message in str_line:
process.kill()
with ux_utils.print_exception_no_traceback():
raise PermissionError(
'Failed to upload files to '
'the remote bucket. The bucket does not have '
'write permissions. It is possible that '
'the bucket is public.')
returncode = process.wait()
if returncode != 0:
stderr_str = '\n'.join(stderr)
with ux_utils.print_exception_no_traceback():
logger.error(stderr_str)
raise exceptions.StorageUploadError(
f'Upload to bucket failed for store {bucket_name}. '
'Please check the logs.')
def run_upload_cli(command: str, access_denied_message: str, bucket_name: str,
log_path: str):
returncode, stdout, stderr = log_lib.run_with_log(command,
log_path,
shell=True,
require_outputs=True)
if access_denied_message in stderr:
with ux_utils.print_exception_no_traceback():
raise PermissionError('Failed to upload files to '
'the remote bucket. The bucket does not have '
'write permissions. It is possible that '
'the bucket is public.')
if returncode != 0:
with ux_utils.print_exception_no_traceback():
logger.error(stderr)
raise exceptions.StorageUploadError(
f'Upload to bucket failed for store {bucket_name}. '
f'Please check the logs: {log_path}')
if not stdout:
logger.debug('No file uploaded. This could be due to an error or '
'because all files already exist on the cloud.')


def get_cos_regions() -> List[str]:
Expand Down
71 changes: 57 additions & 14 deletions sky/data/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@
'Bucket {bucket_name!r} does not exist. '
'It may have been deleted externally.')

_STORAGE_LOG_FILE_NAME = 'storage_sync.log'


def get_cached_enabled_storage_clouds_or_refresh(
raise_if_no_cloud_access: bool = False) -> List[str]:
Expand Down Expand Up @@ -1331,17 +1333,24 @@ def get_dir_sync_command(src_dir_path, dest_dir_name):
else:
source_message = source_path_list[0]

log_path = sky_logging.generate_tmp_logging_file_path(
_STORAGE_LOG_FILE_NAME)
sync_path = f'{source_message} -> s3://{self.name}/'
with rich_utils.safe_status(
ux_utils.spinner_message(f'Syncing {source_message} -> '
f's3://{self.name}/')):
ux_utils.spinner_message(f'Syncing {sync_path}',
log_path=log_path)):
data_utils.parallel_upload(
source_path_list,
get_file_sync_command,
get_dir_sync_command,
log_path,
self.name,
self._ACCESS_DENIED_MESSAGE,
create_dirs=create_dirs,
max_concurrent_uploads=_MAX_CONCURRENT_UPLOADS)
logger.info(
ux_utils.finishing_message(f'Storage synced: {sync_path}',
log_path))

def _transfer_to_s3(self) -> None:
assert isinstance(self.source, str), self.source
Expand Down Expand Up @@ -1741,13 +1750,19 @@ def batch_gsutil_cp(self,
gsutil_alias, alias_gen = data_utils.get_gsutil_command()
sync_command = (f'{alias_gen}; echo "{copy_list}" | {gsutil_alias} '
f'cp -e -n -r -I gs://{self.name}')

log_path = sky_logging.generate_tmp_logging_file_path(
_STORAGE_LOG_FILE_NAME)
sync_path = f'{source_message} -> gs://{self.name}/'
with rich_utils.safe_status(
ux_utils.spinner_message(f'Syncing {source_message} -> '
f'gs://{self.name}/')):
ux_utils.spinner_message(f'Syncing {sync_path}',
log_path=log_path)):
data_utils.run_upload_cli(sync_command,
self._ACCESS_DENIED_MESSAGE,
bucket_name=self.name)
bucket_name=self.name,
log_path=log_path)
logger.info(
ux_utils.finishing_message(f'Storage synced: {sync_path}',
log_path))

def batch_gsutil_rsync(self,
source_path_list: List[Path],
Expand Down Expand Up @@ -1797,17 +1812,24 @@ def get_dir_sync_command(src_dir_path, dest_dir_name):
else:
source_message = source_path_list[0]

log_path = sky_logging.generate_tmp_logging_file_path(
_STORAGE_LOG_FILE_NAME)
sync_path = f'{source_message} -> gs://{self.name}/'
with rich_utils.safe_status(
ux_utils.spinner_message(f'Syncing {source_message} -> '
f'gs://{self.name}/')):
ux_utils.spinner_message(f'Syncing {sync_path}',
log_path=log_path)):
data_utils.parallel_upload(
source_path_list,
get_file_sync_command,
get_dir_sync_command,
log_path,
self.name,
self._ACCESS_DENIED_MESSAGE,
create_dirs=create_dirs,
max_concurrent_uploads=_MAX_CONCURRENT_UPLOADS)
logger.info(
ux_utils.finishing_message(f'Storage synced: {sync_path}',
log_path))

def _transfer_to_gcs(self) -> None:
if isinstance(self.source, str) and self.source.startswith('s3://'):
Expand Down Expand Up @@ -2535,17 +2557,24 @@ def get_dir_sync_command(src_dir_path, dest_dir_name) -> str:
container_endpoint = data_utils.AZURE_CONTAINER_URL.format(
storage_account_name=self.storage_account_name,
container_name=self.name)
log_path = sky_logging.generate_tmp_logging_file_path(
_STORAGE_LOG_FILE_NAME)
sync_path = f'{source_message} -> {container_endpoint}/'
with rich_utils.safe_status(
ux_utils.spinner_message(
f'Syncing {source_message} -> {container_endpoint}/')):
ux_utils.spinner_message(f'Syncing {sync_path}',
log_path=log_path)):
data_utils.parallel_upload(
source_path_list,
get_file_sync_command,
get_dir_sync_command,
log_path,
self.name,
self._ACCESS_DENIED_MESSAGE,
create_dirs=create_dirs,
max_concurrent_uploads=_MAX_CONCURRENT_UPLOADS)
logger.info(
ux_utils.finishing_message(f'Storage synced: {sync_path}',
log_path))

def _get_bucket(self) -> Tuple[str, bool]:
"""Obtains the AZ Container.
Expand Down Expand Up @@ -2938,17 +2967,24 @@ def get_dir_sync_command(src_dir_path, dest_dir_name):
else:
source_message = source_path_list[0]

log_path = sky_logging.generate_tmp_logging_file_path(
_STORAGE_LOG_FILE_NAME)
sync_path = f'{source_message} -> r2://{self.name}/'
with rich_utils.safe_status(
ux_utils.spinner_message(
f'Syncing {source_message} -> r2://{self.name}/')):
ux_utils.spinner_message(f'Syncing {sync_path}',
log_path=log_path)):
data_utils.parallel_upload(
source_path_list,
get_file_sync_command,
get_dir_sync_command,
log_path,
self.name,
self._ACCESS_DENIED_MESSAGE,
create_dirs=create_dirs,
max_concurrent_uploads=_MAX_CONCURRENT_UPLOADS)
logger.info(
ux_utils.finishing_message(f'Storage synced: {sync_path}',
log_path))

def _transfer_to_r2(self) -> None:
assert isinstance(self.source, str), self.source
Expand Down Expand Up @@ -3379,17 +3415,24 @@ def get_file_sync_command(base_dir_path, file_names) -> str:
else:
source_message = source_path_list[0]

log_path = sky_logging.generate_tmp_logging_file_path(
_STORAGE_LOG_FILE_NAME)
sync_path = f'{source_message} -> cos://{self.region}/{self.name}/'
with rich_utils.safe_status(
ux_utils.spinner_message(f'Syncing {source_message} -> '
f'cos://{self.region}/{self.name}/')):
ux_utils.spinner_message(f'Syncing {sync_path}',
log_path=log_path)):
data_utils.parallel_upload(
source_path_list,
get_file_sync_command,
get_dir_sync_command,
log_path,
self.name,
self._ACCESS_DENIED_MESSAGE,
create_dirs=create_dirs,
max_concurrent_uploads=_MAX_CONCURRENT_UPLOADS)
logger.info(
ux_utils.finishing_message(f'Storage synced: {sync_path}',
log_path))

def _get_bucket(self) -> Tuple[StorageHandle, bool]:
"""returns IBM COS bucket object if exists, otherwise creates it.
Expand Down
18 changes: 17 additions & 1 deletion sky/sky_logging.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
"""Logging utilities."""
import builtins
import contextlib
from datetime import datetime
import logging
import os
import sys
import threading

import colorama

from sky.skylet import constants
from sky.utils import env_options
from sky.utils import rich_utils

Expand Down Expand Up @@ -113,7 +116,7 @@ def reload_logger():
_setup_logger()


def init_logger(name: str):
def init_logger(name: str) -> logging.Logger:
return logging.getLogger(name)


Expand Down Expand Up @@ -161,3 +164,16 @@ def is_silent():
# threads.
_logging_config.is_silent = False
return _logging_config.is_silent


def get_run_timestamp() -> str:
return 'sky-' + datetime.now().strftime('%Y-%m-%d-%H-%M-%S-%f')


def generate_tmp_logging_file_path(file_name: str) -> str:
"""Generate an absolute path of a tmp file for logging."""
run_timestamp = get_run_timestamp()
log_dir = os.path.join(constants.SKY_LOGS_DIRECTORY, run_timestamp)
log_path = os.path.expanduser(os.path.join(log_dir, file_name))

return log_path

0 comments on commit 6b62570

Please sign in to comment.