The `prefix` parameter of oss2.ObjectIteratorV2 has no effect when using a custom domain #409

Open
Sun-ZhenXing opened this issue Oct 26, 2024 · 1 comment

Comments

@Sun-ZhenXing

As the title says: when the bucket is accessed through a custom domain, ObjectIteratorV2 does not work correctly.
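The report does not include a reproduction. One way to check the symptom is sketched below: list keys with `ObjectIteratorV2` and verify that every returned key really starts with the requested prefix. The helper names `list_keys` and `prefix_is_respected`, and the `iterator_factory` parameter (an injection point so the check can be run against a stub), are assumptions made for illustration and are not part of oss2.

```python
from typing import Callable, Iterable, List, Optional


def list_keys(
    bucket,
    prefix: str,
    iterator_factory: Optional[Callable[..., Iterable]] = None,
) -> List[str]:
    """Collect the keys that an ObjectIteratorV2 listing returns for `prefix`."""
    if iterator_factory is None:
        # Imported lazily so the helper can be exercised with a stub iterator.
        import oss2

        iterator_factory = oss2.ObjectIteratorV2
    return [obj.key for obj in iterator_factory(bucket, prefix=prefix)]


def prefix_is_respected(
    bucket,
    prefix: str,
    iterator_factory: Optional[Callable[..., Iterable]] = None,
) -> bool:
    """Return True if every listed key starts with `prefix`.

    An empty listing also returns True, so compare against a prefix
    that is known to contain objects.
    """
    keys = list_keys(bucket, prefix, iterator_factory)
    return all(key.startswith(prefix) for key in keys)
```

Running `prefix_is_respected` once against a bucket created with `is_cname=True` and once against a bucket that uses the regular endpoint should show whether the two listings differ for the same prefix.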

@Sun-ZhenXing
Author

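Here is a workaround: keep the CDN-accelerated OSS bucket (custom domain) and, alongside it, use a second bucket instance that is not accelerated.

Actual production code: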

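from typing import Callable

import oss2

# RemoteFileManager and OSSConfigDict are project-specific types not shown here.


class OSSManager(RemoteFileManager):
    """Alibaba Cloud OSS file manager."""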

    def __init__(
        self,
        bucket: oss2.Bucket,
        cname_bucket: oss2.Bucket | None = None,
    ) -> None:
        self._bucket = bucket
        # The V2 API currently has many bugs; use cname_bucket as a stopgap
        self._cname_bucket = cname_bucket or bucket

    @staticmethod
    def from_config(d: OSSConfigDict) -> "OSSManager":
        """Create an OSSManager from the configuration."""
        access_key_id = d["access_key_id"]
        access_key_secret = d["access_key_secret"]
        endpoint = d["endpoint"]
        bucket_name = d["bucket_name"]
        cname = d.get("cname")

        auth = oss2.Auth(access_key_id, access_key_secret)
        bucket = oss2.Bucket(
            auth=auth,
            endpoint=endpoint,
            bucket_name=bucket_name,
        )
        cname_bucket = oss2.Bucket(
            auth=auth,
            endpoint=cname or endpoint,
            bucket_name=bucket_name,
            # bool() keeps this consistent with `cname or endpoint` above when cname is ""
            is_cname=bool(cname),
        )
        return OSSManager(bucket, cname_bucket)

    def upload_file(
        self,
        filename: str,
        remote_path: str,
        callback: Callable[[int, int], None],
    ) -> bool:
        """Upload a file to OSS."""
        with open(filename, "rb") as f:
            result = self._bucket.put_object(remote_path, f, progress_callback=callback)
        return result.status in range(200, 300)

    def download_file(
        self,
        remote_path: str,
        filename: str,
        callback: Callable[[int, int], None] | None = None,
    ) -> bool:
        """Download a file."""
        result = self._cname_bucket.get_object_to_file(
            remote_path,
            filename,
            progress_callback=callback,
        )
        return result.status in range(200, 300)
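`OSSConfigDict` is referenced above but not shown. A possible shape, inferred only from the keys that `from_config` reads, is sketched here; the `_RequiredOSSConfig` base class and the `resolve_cname_endpoint` helper are hypothetical names introduced for illustration. The helper mirrors the endpoint selection used for `cname_bucket` and treats an empty `cname` as absent.

```python
from typing import Tuple, TypedDict


class _RequiredOSSConfig(TypedDict):
    """Keys that from_config always reads."""

    access_key_id: str
    access_key_secret: str
    endpoint: str
    bucket_name: str


class OSSConfigDict(_RequiredOSSConfig, total=False):
    """Adds the optional custom (CDN-accelerated) domain."""

    cname: str


def resolve_cname_endpoint(d: OSSConfigDict) -> Tuple[str, bool]:
    """Return (endpoint, is_cname) for the bucket that uses the custom domain.

    Falls back to the regular endpoint when no custom domain is configured.
    """
    cname = d.get("cname")
    return (cname or d["endpoint"], bool(cname))
```

With this shape, a config without `cname` yields two equivalent bucket instances, so the manager behaves the same whether or not a custom domain is configured.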
