Source code for open_archive.open_archive

# Copyright 2025 Iguazio
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import os
import zipfile
import tarfile

from mlrun.execution import MLClientCtx
from mlrun.datastore import DataItem
from mlrun.artifacts.base import DirArtifact

from urllib.parse import urlparse


[docs] def open_archive( context: MLClientCtx, archive_url: DataItem, subdir: str = "content/", key: str = "content", target_path: str = None, ): """Open a file/object archive into a target directory. Currently, supports zip and tar.gz. :param context: function execution context :param archive_url: url of archive file :param subdir: path within artifact store where extracted files are stored, default is "/content" :param key: key of archive contents in artifact store :param target_path: file system path to store extracted files """ # Resolves the archive locally archive_url = archive_url.local() v3io_subdir = None # When custom artifact path is defined if not target_path and context.artifact_path: parsed_subdir = urlparse(context.artifact_path) if parsed_subdir.scheme == 's3': subdir = os.path.join(context.artifact_path, subdir) elif parsed_subdir.scheme == 'v3io': v3io_subdir = os.path.join(context.artifact_path, subdir) # Using v3io_subdir for logging subdir = '/v3io' + parsed_subdir.path + '/' + subdir context.logger.info(f'Using v3io scheme, extracting to {subdir}') else: context.logger.info(f'Unrecognizable scheme, extracting to {subdir}') # When working on CE, target path might be on s3 if 's3' in (target_path or subdir): context.logger.info(f'Using s3 scheme, extracting to {target_path or subdir}') if archive_url.endswith("gz"): _extract_gz_file(archive_url=archive_url, subdir=subdir, target_path=target_path, in_s3=True) elif archive_url.endswith("zip"): _extract_zip_file(archive_url=archive_url, subdir=subdir, target_path=target_path, in_s3=True) else: raise ValueError(f"unsupported archive type in {archive_url}") else: if archive_url.endswith("gz"): _extract_gz_file(archive_url=archive_url, subdir=subdir, target_path=target_path) elif archive_url.endswith("zip"): _extract_zip_file(archive_url=archive_url, subdir=subdir, target_path=target_path) else: raise ValueError(f"unsupported archive type in {archive_url}") if v3io_subdir: subdir = v3io_subdir context.logger.info(f'Logging artifact to {(target_path or subdir)}') context.log_artifact(DirArtifact(key=key, target_path=(target_path or subdir)))
def _extract_gz_file(archive_url: str, target_path: str = None, subdir: str = "content/", in_s3: bool = False): if in_s3: client = _init_boto3_client() with tarfile.open(archive_url, mode="r|gz") as ref: for member in ref.getmembers(): data = ref.extractfile(member=member).read() client.put_object(Body=data, Bucket=urlparse(target_path or subdir).netloc, Key=f'{urlparse(target_path or subdir).path[1:]}{member.name}') else: os.makedirs(target_path or subdir, exist_ok=True) with tarfile.open(archive_url, mode="r:gz") as ref: for entry in ref: # Validate that there is no path traversal in the archive if os.path.isabs(entry.name) or ".." in entry.name: raise ValueError(f"Illegal tar archive entry: {entry.name}") ref.extract(entry, target_path or subdir) def _extract_zip_file(archive_url, target_path: str = None, subdir: str = "content/", in_s3: bool = False): if in_s3: client = _init_boto3_client() with zipfile.ZipFile(archive_url, "r") as ref: for filename in ref.namelist(): data = ref.read(filename) client.put_object(Body=data, Bucket=urlparse(target_path or subdir).netloc, Key=f'{urlparse(target_path or subdir).path[1:]}{filename}') else: with zipfile.ZipFile(archive_url, "r") as ref: # Validate that there is no path traversal in the archive for entry in ref.namelist(): if os.path.isabs(entry) or ".." in entry: raise ValueError(f"Illegal zip archive entry: {entry}") os.makedirs(target_path or subdir, exist_ok=True) ref.extractall(target_path or subdir) def _init_boto3_client(): import boto3 if os.environ.get('S3_ENDPOINT_URL'): client = boto3.client('s3', endpoint_url=os.environ.get('S3_ENDPOINT_URL')) else: client = boto3.client('s3') return client