Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[OCI] Support OCI Object Storage #4501

Open
wants to merge 17 commits into
base: master
Choose a base branch
from
35 changes: 35 additions & 0 deletions examples/oci/dataset-mount.yaml
HysunHe marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
name: cpu-task1

resources:
cloud: oci
region: us-sanjose-1
cpus: 2
disk_size: 256
disk_tier: medium
use_spot: False

file_mounts:
# Mount an existing oci bucket
/datasets-storage:
source: oci://skybucket
mode: MOUNT # Either MOUNT or COPY. Optional.

# Working directory (optional) containing the project codebase.
# Its contents are synced to ~/sky_workdir/ on the cluster.
workdir: .

num_nodes: 1

# Typical use: pip install -r requirements.txt
# Invoked under the workdir (i.e., can use its files).
setup: |
echo "*** Running setup for the task. ***"

# Typical use: make use of resources, such as running training.
# Invoked under the workdir (i.e., can use its files).
run: |
echo "*** Running the task on OCI ***"
timestamp=$(date +%s)
ls -lthr /datasets-storage
echo "hi" >> /datasets-storage/foo.txt
ls -lthr /datasets-storage
36 changes: 36 additions & 0 deletions examples/oci/dataset-upload-and-mount.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
name: cpu-task1

resources:
cloud: oci
region: us-sanjose-1
cpus: 2
disk_size: 256
disk_tier: medium
use_spot: False

file_mounts:
/datasets-storage:
name: skybucket # Name of storage, optional when source is bucket URI
source: ['./examples/oci'] # Source path, can be local or bucket URL. Optional, do not specify to create an empty bucket.
store: oci # E.g 'oci', 's3', 'gcs'...; default: None. Optional.
persistent: True # Defaults to True; can be set to false. Optional.
mode: MOUNT # Either MOUNT or COPY. Optional.

# Working directory (optional) containing the project codebase.
# Its contents are synced to ~/sky_workdir/ on the cluster.
workdir: .

num_nodes: 1

# Typical use: pip install -r requirements.txt
# Invoked under the workdir (i.e., can use its files).
setup: |
echo "*** Running setup for the task. ***"

# Typical use: make use of resources, such as running training.
# Invoked under the workdir (i.e., can use its files).
run: |
echo "*** Running the task on OCI ***"
ls -lthr /datasets-storage
echo "hi" >> /datasets-storage/foo.txt
ls -lthr /datasets-storage
26 changes: 26 additions & 0 deletions examples/oci/oci-mounts.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
resources:
cloud: oci

file_mounts:
~/tmpfile: ~/tmpfile
~/a/b/c/tmpfile: ~/tmpfile
/tmp/workdir: ~/tmp-workdir

/mydir:
name: skybucket
source: ['~/tmp-workdir']
store: oci
mode: MOUNT

setup: |
echo "*** Setup ***"

run: |
echo "*** Run ***"

ls -lthr ~/tmpfile
ls -lthr ~/a/b/c
echo hi >> /tmp/workdir/new_file
ls -lthr /tmp/workdir

ls -lthr /mydir
33 changes: 32 additions & 1 deletion sky/adaptors/oci.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
"""Oracle OCI cloud adaptor"""

import functools
import logging
import os

from sky.adaptors import common
from sky.clouds.utils import oci_utils

# Suppress OCI circuit breaker logging before lazy import, because
# oci modules prints additional message during imports, i.e., the
Expand All @@ -30,10 +32,16 @@ def get_config_file() -> str:

def get_oci_config(region=None, profile='DEFAULT'):
conf_file_path = get_config_file()
if not profile or profile == 'DEFAULT':
config_profile = oci_utils.oci_config.get_profile()
else:
config_profile = profile

oci_config = oci.config.from_file(file_location=conf_file_path,
profile_name=profile)
profile_name=config_profile)
if region is not None:
oci_config['region'] = region

return oci_config


Expand All @@ -54,6 +62,29 @@ def get_identity_client(region=None, profile='DEFAULT'):
return oci.identity.IdentityClient(get_oci_config(region, profile))


def get_object_storage_client(region=None, profile='DEFAULT'):
return oci.object_storage.ObjectStorageClient(
get_oci_config(region, profile))


def service_exception():
"""OCI service exception."""
return oci.exceptions.ServiceError


def with_oci_env(f):

@functools.wraps(f)
def wrapper(*args, **kwargs):
# pylint: disable=line-too-long
enter_env_cmds = [
'conda info --envs | grep "sky-oci-cli-env" || conda create -n sky-oci-cli-env python=3.10 -y',
'. $(conda info --base 2> /dev/null)/etc/profile.d/conda.sh > /dev/null 2>&1 || true',
'conda activate sky-oci-cli-env', 'pip install oci-cli',
'export OCI_CLI_SUPPRESS_FILE_PERMISSIONS_WARNING=True'
]
operation_cmd = [f(*args, **kwargs)]
leave_env_cmds = ['conda deactivate']
return ' && '.join(enter_env_cmds + operation_cmd + leave_env_cmds)

return wrapper
61 changes: 61 additions & 0 deletions sky/cloud_stores.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
* Better interface.
* Better implementation (e.g., fsspec, smart_open, using each cloud's SDK).
"""
import os
import shlex
import subprocess
import time
Expand All @@ -18,6 +19,7 @@
from sky.adaptors import azure
from sky.adaptors import cloudflare
from sky.adaptors import ibm
from sky.adaptors import oci
from sky.clouds import gcp
from sky.data import data_utils
from sky.data.data_utils import Rclone
Expand Down Expand Up @@ -470,6 +472,64 @@ def make_sync_file_command(self, source: str, destination: str) -> str:
return self.make_sync_dir_command(source, destination)


class OciCloudStorage(CloudStorage):
"""OCI Cloud Storage."""

def is_directory(self, url: str) -> bool:
"""Returns whether OCI 'url' is a directory.
In cloud object stores, a "directory" refers to a regular object whose
name is a prefix of other objects.
"""
bucket_name, path = data_utils.split_oci_path(url)

client = oci.get_object_storage_client()
namespace = client.get_namespace(
compartment_id=oci.get_oci_config()['tenancy']).data

objects = client.list_objects(namespace_name=namespace,
bucket_name=bucket_name,
prefix=path).data.objects

if len(objects) == 0:
# A directory with few or no items
return True

if len(objects) > 1:
# A directory with more than 1 items
return True

object_name = objects[0].name
if path.endswith(object_name):
# An object path
return False

# A directory with only 1 item
return True

@oci.with_oci_env
def make_sync_dir_command(self, source: str, destination: str) -> str:
"""Downloads using OCI CLI."""
bucket_name, path = data_utils.split_oci_path(source)

download_via_ocicli = (f'oci os object sync --no-follow-symlinks '
f'--bucket-name {bucket_name} '
f'--prefix "{path}" --dest-dir "{destination}"')

return download_via_ocicli

@oci.with_oci_env
def make_sync_file_command(self, source: str, destination: str) -> str:
"""Downloads a file using OCI CLI."""
bucket_name, path = data_utils.split_oci_path(source)
filename = os.path.basename(path)
destination = os.path.join(destination, filename)

download_via_ocicli = (f'oci os object get --bucket-name {bucket_name} '
f'--name "{path}" --file "{destination}"')

return download_via_ocicli


def get_storage_from_path(url: str) -> CloudStorage:
"""Returns a CloudStorage by identifying the scheme:// in a URL."""
result = urllib.parse.urlsplit(url)
Expand All @@ -485,6 +545,7 @@ def get_storage_from_path(url: str) -> CloudStorage:
's3': S3CloudStorage(),
'r2': R2CloudStorage(),
'cos': IBMCosCloudStorage(),
'oci': OciCloudStorage(),
# TODO: This is a hack, as Azure URL starts with https://, we should
# refactor the registry to be able to take regex, so that Azure blob can
# be identified with `https://(.*?)\.blob\.core\.windows\.net`
Expand Down
37 changes: 37 additions & 0 deletions sky/data/data_transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,3 +200,40 @@ def _add_bucket_iam_member(bucket_name: str, role: str, member: str) -> None:
bucket.set_iam_policy(policy)

logger.debug(f'Added {member} with role {role} to {bucket_name}.')


def s3_to_oci(s3_bucket_name: str, oci_bucket_name: str) -> None:
"""Creates a one-time transfer from Amazon S3 to OCI Object Storage.
Args:
s3_bucket_name: str; Name of the Amazon S3 Bucket
oci_bucket_name: str; Name of the OCI Bucket
"""
# TODO(HysunHe): Implement sync with other clouds (s3, gs)
raise NotImplementedError('Moving data directly from S3 to OCI bucket '
'is currently not supported. Please specify '
'a local source for the storage object.')


def gcs_to_oci(gs_bucket_name: str, oci_bucket_name: str) -> None:
"""Creates a one-time transfer from Google Cloud Storage to
OCI Object Storage.
Args:
gs_bucket_name: str; Name of the Google Cloud Storage Bucket
oci_bucket_name: str; Name of the OCI Bucket
"""
# TODO(HysunHe): Implement sync with other clouds (s3, gs)
raise NotImplementedError('Moving data directly from GCS to OCI bucket '
'is currently not supported. Please specify '
'a local source for the storage object.')


def r2_to_oci(r2_bucket_name: str, oci_bucket_name: str) -> None:
"""Creates a one-time transfer from Cloudflare R2 to OCI Bucket.
Args:
r2_bucket_name: str; Name of the Cloudflare R2 Bucket
oci_bucket_name: str; Name of the OCI Bucket
"""
raise NotImplementedError(
'Moving data directly from Cloudflare R2 to OCI '
'bucket is currently not supported. Please specify '
'a local source for the storage object.')
11 changes: 11 additions & 0 deletions sky/data/data_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -730,3 +730,14 @@ def _remove_bucket_profile_rclone(bucket_name: str,
lines_to_keep.append(line)

return lines_to_keep


def split_oci_path(oci_path: str) -> Tuple[str, str]:
"""Splits OCI Path into Bucket name and Relative Path to Bucket
Args:
oci_path: str; OCI Path, e.g. oci://imagenet/train/
"""
path_parts = oci_path.replace('oci://', '').split('/')
bucket = path_parts.pop(0)
key = '/'.join(path_parts)
return bucket, key
43 changes: 43 additions & 0 deletions sky/data/mounting_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
_BLOBFUSE_CACHE_ROOT_DIR = '~/.sky/blobfuse2_cache'
_BLOBFUSE_CACHE_DIR = ('~/.sky/blobfuse2_cache/'
'{storage_account_name}_{container_name}')
RCLONE_VERSION = 'v1.68.2'


def get_s3_mount_install_cmd() -> str:
Expand Down Expand Up @@ -158,6 +159,48 @@ def get_cos_mount_cmd(rclone_config_data: str, rclone_config_path: str,
return mount_cmd


def get_rclone_install_cmd() -> str:
""" RClone installation for both apt-get and rpm.
This would be common command.
"""
# pylint: disable=line-too-long
install_cmd = (
f'(which dpkg > /dev/null 2>&1 && (which rclone > /dev/null || (cd ~ > /dev/null'
f' && curl -O https://downloads.rclone.org/{RCLONE_VERSION}/rclone-{RCLONE_VERSION}-linux-amd64.deb'
f' && sudo dpkg -i rclone-{RCLONE_VERSION}-linux-amd64.deb'
f' && rm -f rclone-{RCLONE_VERSION}-linux-amd64.deb)))'
f' || (which rclone > /dev/null || (cd ~ > /dev/null'
f' && curl -O https://downloads.rclone.org/{RCLONE_VERSION}/rclone-{RCLONE_VERSION}-linux-amd64.rpm'
f' && sudo yum --nogpgcheck install rclone-{RCLONE_VERSION}-linux-amd64.rpm -y'
f' && rm -f rclone-{RCLONE_VERSION}-linux-amd64.rpm))')
return install_cmd


def get_oci_mount_cmd(mount_path: str, store_name: str, region: str,
namespace: str, compartment: str, config_file: str,
config_profile: str) -> str:
""" OCI specific RClone mount command for oci object storage. """
# pylint: disable=line-too-long
mount_cmd = (
f'sudo chown -R `whoami` {mount_path}'
f' && rclone config create oos_{store_name} oracleobjectstorage'
f' provider user_principal_auth namespace {namespace}'
f' compartment {compartment} region {region}'
f' oci-config-file {config_file}'
f' oci-config-profile {config_profile}'
f' && sed -i "s/oci-config-file/config_file/g;'
f' s/oci-config-profile/config_profile/g" ~/.config/rclone/rclone.conf'
f' && ([ ! -f /bin/fusermount3 ] && sudo ln -s /bin/fusermount /bin/fusermount3 || true)'
f' && (grep -q {mount_path} /proc/mounts || rclone mount oos_{store_name}:{store_name} {mount_path} --daemon --allow-non-empty)'
)
return mount_cmd


def get_rclone_version_check_cmd() -> str:
""" RClone version check. This would be common command. """
return f'rclone --version | grep -q {RCLONE_VERSION}'


def _get_mount_binary(mount_cmd: str) -> str:
"""Returns mounting binary in string given as the mount command.

Expand Down
Loading
Loading