22# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved.
33#
44
5- from typing import Iterator , Tuple
5+ from sys import maxsize
6+ from typing import Iterator , Tuple , Optional
67
78from aistore .sdk .obj .content_iterator import ContentIterator
89from aistore .sdk .obj .obj_file .errors import ObjectFileReaderMaxResumeError
1112logger = get_logger (__name__ )
1213
1314
15+ def compute_loop_size (size : int ) -> int :
16+ """
17+ Compute the size of the loop for reading data and return it.
18+ If requested read size is -1, return `sys.maxsize` to loop until `StopIteration`.
19+
20+ Args:
21+ size (int): The requested size to be read.
22+
23+ Returns:
24+ int: The size for the loop.
25+
26+ """
27+ return maxsize if size == - 1 else size
28+
29+
30+ def get_iterator (
31+ content_iterator : ContentIterator , resume_position : int
32+ ) -> Optional [Iterator [bytes ]]:
33+ """
34+ Create a new iterator from the content iterator starting at the specified byte position.
35+ Returns None if the object is not cached.
36+
37+ Args:
38+ content_iterator (ContentIterator): The content iterator used to read the data.
39+ resume_position (int): The byte position from which to resume reading.
40+
41+ Returns:
42+ Optional[Iterator[bytes]]: A new iterator starting from the specified byte position.
43+ None if the object is not cached in the bucket.
44+ """
45+ # If remote object is not cached, start over
46+ if not content_iterator .client .head ().present :
47+ return None
48+ # Otherwise, resume from last known position
49+ else :
50+ return content_iterator .iter (offset = resume_position )
51+
52+
1453def increment_resume (resume_total : int , max_resume : int , err : Exception ) -> int :
1554 """
1655 Increment the number of resume attempts and raise an error if the maximum allowed is exceeded.
@@ -38,7 +77,7 @@ def handle_broken_stream(
3877 resume_total : int ,
3978 max_resume : int ,
4079 err : Exception ,
41- ) -> Tuple [Iterator [bytes ], int ]:
80+ ) -> Tuple [Optional [ Iterator [bytes ] ], int ]:
4281 """
4382 Handle the broken stream/iterator by incrementing the resume count, logging a warning,
4483 and returning a newly instanatiated iterator from the last known position.
@@ -51,7 +90,8 @@ def handle_broken_stream(
5190 err (Exception): The error that caused the resume attempt.
5291
5392 Returns:
54- Tuple[Iterator[bytes], int]: The new iterator and the updated resume total.
93+ Optional[Iterator[bytes]]: The new iterator. None if the object is not cached.
94+ int: The updated number of resume attempts.
5595
5696 Raises:
5797 ObjectFileReaderMaxResumeError: If the maximum number of resume attempts is exceeded.
@@ -66,6 +106,8 @@ def handle_broken_stream(
66106 exc_info = err ,
67107 )
68108
69- # Create a new iterator from the last read position
70- new_iter = content_iterator .iter (offset = resume_position )
109+ new_iter = get_iterator (
110+ content_iterator = content_iterator , resume_position = resume_position
111+ )
112+
71113 return new_iter , resume_total
0 commit comments