# Copyright 2025 Iguazio
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import os
import zipfile
import tarfile
from mlrun.execution import MLClientCtx
from mlrun.datastore import DataItem
from mlrun.artifacts.base import DirArtifact
from urllib.parse import urlparse
[docs]
def open_archive(
    context: MLClientCtx,
    archive_url: DataItem,
    subdir: str = "content/",
    key: str = "content",
    target_path: str = None,
):
    """Open a file/object archive into a target directory. Currently, supports zip and tar.gz.

    The archive is extracted to ``target_path`` when given, otherwise to ``subdir``
    (resolved against ``context.artifact_path`` for ``s3`` and ``v3io`` artifact paths).
    The extracted directory is then logged as a ``DirArtifact`` under ``key``.

    :param context:      function execution context
    :param archive_url:  url of archive file
    :param subdir:       path within artifact store where extracted files are stored, default is "content/"
    :param key:          key of archive contents in artifact store
    :param target_path:  file system path to store extracted files

    :raises ValueError:  if the archive name ends with neither "gz" nor "zip"
    """
    # Resolves the archive locally
    archive_url = archive_url.local()
    v3io_subdir = None

    # When custom artifact path is defined
    if not target_path and context.artifact_path:
        parsed_subdir = urlparse(context.artifact_path)
        if parsed_subdir.scheme == 's3':
            subdir = os.path.join(context.artifact_path, subdir)
        elif parsed_subdir.scheme == 'v3io':
            # Keep the v3io:// form for logging the artifact; extract through the /v3io mount
            v3io_subdir = os.path.join(context.artifact_path, subdir)
            subdir = '/v3io' + parsed_subdir.path + '/' + subdir
            context.logger.info(f'Using v3io scheme, extracting to {subdir}')
        else:
            context.logger.info(f'Unrecognizable scheme, extracting to {subdir}')

    # When working on CE, target path might be on s3.
    # Decide by URL scheme rather than by substring: a local or v3io path that merely
    # contains "s3" (e.g. "/v3io/projects/s3-demo/") must not be sent to the S3 uploader.
    destination = target_path or subdir
    in_s3 = urlparse(destination).scheme == 's3'
    if in_s3:
        context.logger.info(f'Using s3 scheme, extracting to {destination}')

    if archive_url.endswith("gz"):
        extract = _extract_gz_file
    elif archive_url.endswith("zip"):
        extract = _extract_zip_file
    else:
        raise ValueError(f"unsupported archive type in {archive_url}")
    extract(archive_url=archive_url, subdir=subdir, target_path=target_path, in_s3=in_s3)

    # For v3io the artifact is logged with its v3io:// url, not the local mount path
    if v3io_subdir:
        subdir = v3io_subdir

    context.logger.info(f'Logging artifact to {(target_path or subdir)}')
    context.log_artifact(DirArtifact(key=key, target_path=(target_path or subdir)))
def _extract_gz_file(archive_url: str, target_path: str = None, subdir: str = "content/", in_s3: bool = False):
    """Extract a gzip-compressed tar archive to a local directory, or upload its files to S3.

    :param archive_url:  local path of the tar.gz archive
    :param target_path:  destination; a local directory, or an ``s3://bucket/prefix`` url when
                         ``in_s3`` is set. Takes precedence over ``subdir``
    :param subdir:       destination used when ``target_path`` is not set
    :param in_s3:        when True, upload every regular file of the archive to S3 instead of
                         extracting to the local file system

    :raises ValueError:  (local extraction) if an entry has an absolute path or a ".." path
                         component; nothing is extracted in that case
    """
    destination = target_path or subdir

    if in_s3:
        client = _init_boto3_client()
        parsed = urlparse(destination)
        # Object keys are "<prefix>/<member name>"; add the separator when the url lacks it
        prefix = parsed.path[1:]
        if prefix and not prefix.endswith("/"):
            prefix += "/"
        # Stream mode ("r|gz") cannot seek backwards, so every member has to be read as soon
        # as it is reached; listing all members first and reading them afterwards fails.
        with tarfile.open(archive_url, mode="r|gz") as ref:
            for member in ref:
                # Directories, links and devices carry no data (extractfile() returns None)
                if not member.isfile():
                    continue
                data = ref.extractfile(member).read()
                client.put_object(Body=data, Bucket=parsed.netloc, Key=f"{prefix}{member.name}")
        return

    with tarfile.open(archive_url, mode="r:gz") as ref:
        members = ref.getmembers()
        # Validate that there is no path traversal in the archive. All entries are checked
        # before anything is written, and ".." is matched as a whole path component so that
        # names such as "v1..v2.txt" are not rejected.
        for entry in members:
            parts = entry.name.replace("\\", "/").split("/")
            if os.path.isabs(entry.name) or ".." in parts:
                raise ValueError(f"Illegal tar archive entry: {entry.name}")
        os.makedirs(destination, exist_ok=True)
        # Use the stdlib "data" extraction filter when this Python provides it; it also
        # refuses links that point outside of the destination directory.
        extract_kwargs = {"filter": "data"} if hasattr(tarfile, "data_filter") else {}
        ref.extractall(destination, members=members, **extract_kwargs)
def _extract_zip_file(archive_url, target_path: str = None, subdir: str = "content/", in_s3: bool = False):
    """Extract a zip archive to a local directory, or upload its entries to S3.

    :param archive_url:  local path of the zip archive
    :param target_path:  destination; a local directory, or an ``s3://bucket/prefix`` url when
                         ``in_s3`` is set. Takes precedence over ``subdir``
    :param subdir:       destination used when ``target_path`` is not set
    :param in_s3:        when True, upload every entry of the archive to S3 instead of
                         extracting to the local file system

    :raises ValueError:  (local extraction) if an entry has an absolute path or a ".." path
                         component; nothing is extracted in that case
    """
    destination = target_path or subdir

    if in_s3:
        client = _init_boto3_client()
        # Parse the destination once instead of twice per entry
        parsed = urlparse(destination)
        # Object keys are "<prefix>/<entry name>"; add the separator when the url lacks it
        prefix = parsed.path[1:]
        if prefix and not prefix.endswith("/"):
            prefix += "/"
        with zipfile.ZipFile(archive_url, "r") as ref:
            for filename in ref.namelist():
                data = ref.read(filename)
                client.put_object(Body=data, Bucket=parsed.netloc, Key=f"{prefix}{filename}")
        return

    with zipfile.ZipFile(archive_url, "r") as ref:
        # Validate that there is no path traversal in the archive. ".." is matched as a whole
        # path component so that names such as "v1..v2.txt" are not rejected.
        for entry in ref.namelist():
            parts = entry.replace("\\", "/").split("/")
            if os.path.isabs(entry) or ".." in parts:
                raise ValueError(f"Illegal zip archive entry: {entry}")
        os.makedirs(destination, exist_ok=True)
        ref.extractall(destination)
def _init_boto3_client():
    """Create a boto3 S3 client.

    When the ``S3_ENDPOINT_URL`` environment variable is set to a non-empty value, it is
    passed to boto3 as the client's ``endpoint_url``; otherwise the client is created
    with boto3's defaults.

    :return: the boto3 S3 client
    """
    # Imported lazily so boto3 is only required when an S3 target is actually used
    import boto3

    endpoint_url = os.environ.get('S3_ENDPOINT_URL')
    if not endpoint_url:
        return boto3.client('s3')
    return boto3.client('s3', endpoint_url=endpoint_url)