forked from python/cpython
-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy path__init__.py
More file actions
251 lines (193 loc) · 8 KB
/
__init__.py
File metadata and controls
251 lines (193 loc) · 8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
"""Python bindings to Zstandard (zstd) compression library, the API style is
similar to Python's bz2/lzma/zlib modules.
Command line interface of this module: python -m pyzstd --help
"""
# Public API of the package, grouped by where each name comes from.
__all__ = (
    # Defined in this file
    "compressionLevel_values",
    "get_frame_info",
    "CParameter",
    "DParameter",
    "Strategy",
    "finalize_dict",
    "train_dict",
    "zstd_support_multithread",

    # Re-exported from the _zstd extension module
    "ZstdCompressor",
    "ZstdDecompressor",
    "ZstdDict",
    "ZstdError",
    "compress",
    "decompress",
    "get_frame_size",
    "zstd_version",
    "zstd_version_info",

    # Re-exported from the .zstdfile submodule
    "open",
    "ZstdFile",
)
from collections import namedtuple
from enum import IntEnum
from functools import lru_cache
from _zstd import *
import _zstd
from .zstdfile import ZstdFile, open
# Module-level aliases for private names of the _zstd extension module.
_ZSTD_CStreamSizes = _zstd._ZSTD_CStreamSizes
_ZSTD_DStreamSizes = _zstd._ZSTD_DStreamSizes
_train_dict = _zstd._train_dict
_finalize_dict = _zstd._finalize_dict

# compressionLevel_values: the sequence exposed by _zstd is unpacked
# positionally into the fields (default, min, max).
_nt_values = namedtuple("values", "default min max")
compressionLevel_values = _nt_values(*_zstd._compressionLevel_values)

# Result type of get_frame_info().
_nt_frame_info = namedtuple("frame_info", "decompressed_size dictionary_id")
def get_frame_info(frame_buffer):
    """Return information read from the header of a zstd frame.

    Parameter
    frame_buffer: A bytes-like object. It should start at the beginning of
                  a frame, and needs to include at least the frame header
                  (6 to 18 bytes).

    The result is a two-item namedtuple: (decompressed_size, dictionary_id)

    decompressed_size is None when the decompressed size is unknown.

    dictionary_id is a 32-bit unsigned integer value. 0 means the dictionary
    ID was not recorded in the frame header; the frame may or may not need a
    dictionary to be decoded, and the ID of such a dictionary is not
    specified.

    More items may be appended to the namedtuple in the future.
    """
    # The native helper returns a plain sequence; wrap it positionally so
    # callers get named fields.
    return _nt_frame_info(*_zstd._get_frame_info(frame_buffer))
def _nbytes(dat):
    """Return the size in bytes of the bytes-like object *dat*."""
    if not isinstance(dat, (bytes, bytearray)):
        # Any other buffer exporter: measure it through a temporary
        # memoryview, which is released when the with-block exits.
        with memoryview(dat) as view:
            return view.nbytes
    # bytes / bytearray: one byte per item, so len() is the byte count.
    return len(dat)
def train_dict(samples, dict_size):
    """Train a zstd dictionary from samples and return it as a ZstdDict.

    Parameters
    samples:   An iterable of samples, a sample is a bytes-like object
               represents a file.
    dict_size: The dictionary's maximum size, in bytes.

    Raises TypeError if dict_size is not an int, and ValueError if the
    samples contain no data at all.
    """
    # Validate the argument type before consuming the iterable.
    if not isinstance(dict_size, int):
        raise TypeError('dict_size argument should be an int object.')

    # Walk the iterable once, remembering each sample and its byte size.
    sample_list = []
    sample_sizes = []
    for sample in samples:
        sample_list.append(sample)
        sample_sizes.append(_nbytes(sample))

    # The native trainer takes all samples concatenated in one flat buffer,
    # plus the list of individual sample sizes.
    flat_samples = b''.join(sample_list)
    if not flat_samples:
        raise ValueError("The samples are empty content, can't train dictionary.")

    return ZstdDict(_train_dict(flat_samples, sample_sizes, dict_size))
def finalize_dict(zstd_dict, samples, dict_size, level):
    """Finalize a zstd dictionary and return it as a ZstdDict object.

    Starting from custom content used as the basis of a dictionary, plus a
    set of samples, the dictionary is finalized by adding headers and
    statistics according to the zstd dictionary format.

    An effective dictionary content may be composed by hand and used as the
    basis dictionary, with some samples used to finalize it. The basis
    dictionary can be a "raw content" dictionary, see is_raw parameter in
    ZstdDict.__init__ method.

    Parameters
    zstd_dict: A ZstdDict object, basis dictionary.
    samples:   An iterable of samples, a sample is a bytes-like object
               represents a file.
    dict_size: The dictionary's maximum size, in bytes.
    level:     The compression level expected to use in production. The
               statistics for each compression level differ, so tuning the
               dictionary for the compression level can help quite a bit.

    Raises NotImplementedError when the underlying zstd library is older
    than v1.4.5, TypeError for arguments of the wrong type, and ValueError
    if the samples contain no data at all.
    """
    # Refuse to run on zstd versions older than v1.4.5.
    if zstd_version_info < (1, 4, 5):
        raise NotImplementedError(
            ("This function only available when the underlying zstd "
             "library's version is greater than or equal to v1.4.5, "
             "the current underlying zstd library's version is v%s.")
            % zstd_version
        )

    # Argument type checks, in signature order.
    if not isinstance(zstd_dict, ZstdDict):
        raise TypeError('zstd_dict argument should be a ZstdDict object.')
    if not isinstance(dict_size, int):
        raise TypeError('dict_size argument should be an int object.')
    if not isinstance(level, int):
        raise TypeError('level argument should be an int object.')

    # Walk the iterable once, remembering each sample and its byte size.
    sample_list = []
    sample_sizes = []
    for sample in samples:
        sample_list.append(sample)
        sample_sizes.append(_nbytes(sample))

    flat_samples = b''.join(sample_list)
    if not flat_samples:
        raise ValueError("The samples are empty content, can't finalize dictionary.")

    # Positional arguments of the native helper:
    #   existing dictionary content,
    #   samples concatenated in a single flat buffer,
    #   list of each sample's size,
    #   maximal size of the dictionary in bytes,
    #   compression level expected to be used in production.
    finalized_content = _finalize_dict(
        zstd_dict.dict_content,
        flat_samples,
        sample_sizes,
        dict_size,
        level,
    )
    return _zstd.ZstdDict(finalized_content)
class _UnsupportedCParameter:
    """Descriptor standing in for a compression parameter that is unavailable.

    Reading the attribute it is bound to raises NotImplementedError with a
    message naming the parameter and the zstd version.
    """

    def __set_name__(self, _, name):
        # Remember the attribute name this descriptor was assigned to, so
        # the error message can mention it.
        self.name = name

    def __get__(self, *_, **__):
        details = (self.name, zstd_version)
        raise NotImplementedError(
            "%s CParameter not available, zstd version is %s." % details
        )
class CParameter(IntEnum):
    """Compression parameters.

    Each member's value is the matching constant exposed by the _zstd
    extension module.
    """

    compressionLevel = _zstd._ZSTD_c_compressionLevel
    windowLog = _zstd._ZSTD_c_windowLog
    hashLog = _zstd._ZSTD_c_hashLog
    chainLog = _zstd._ZSTD_c_chainLog
    searchLog = _zstd._ZSTD_c_searchLog
    minMatch = _zstd._ZSTD_c_minMatch
    targetLength = _zstd._ZSTD_c_targetLength
    strategy = _zstd._ZSTD_c_strategy

    # Placeholder descriptor rather than a value: reading
    # CParameter.targetCBlockSize raises NotImplementedError.
    targetCBlockSize = _UnsupportedCParameter()

    # Long-distance-matching parameters
    enableLongDistanceMatching = _zstd._ZSTD_c_enableLongDistanceMatching
    ldmHashLog = _zstd._ZSTD_c_ldmHashLog
    ldmMinMatch = _zstd._ZSTD_c_ldmMinMatch
    ldmBucketSizeLog = _zstd._ZSTD_c_ldmBucketSizeLog
    ldmHashRateLog = _zstd._ZSTD_c_ldmHashRateLog

    # Frame parameters
    contentSizeFlag = _zstd._ZSTD_c_contentSizeFlag
    checksumFlag = _zstd._ZSTD_c_checksumFlag
    dictIDFlag = _zstd._ZSTD_c_dictIDFlag

    # Multi-threading parameters
    nbWorkers = _zstd._ZSTD_c_nbWorkers
    jobSize = _zstd._ZSTD_c_jobSize
    overlapLog = _zstd._ZSTD_c_overlapLog

    @lru_cache(maxsize=None)
    def bounds(self):
        """Return lower and upper bounds of a compression parameter, both inclusive."""
        # The first argument selects the parameter family: 1 means
        # compression parameter.
        is_compression = 1
        return _zstd._get_param_bounds(is_compression, self.value)
class DParameter(IntEnum):
    """Decompression parameters.

    Each member's value is the matching constant exposed by the _zstd
    extension module.
    """

    windowLogMax = _zstd._ZSTD_d_windowLogMax

    @lru_cache(maxsize=None)
    def bounds(self):
        """Return lower and upper bounds of a decompression parameter, both inclusive."""
        # The first argument selects the parameter family: 0 means
        # decompression parameter.
        is_compression = 0
        return _zstd._get_param_bounds(is_compression, self.value)
class Strategy(IntEnum):
    """Compression strategies, listed from fastest to strongest.

    Note: new strategies _might_ be added in the future. Only the order
    (from fast to strong) is guaranteed.
    """

    # Member values are the matching constants exposed by the _zstd
    # extension module.
    fast = _zstd._ZSTD_fast
    dfast = _zstd._ZSTD_dfast
    greedy = _zstd._ZSTD_greedy
    lazy = _zstd._ZSTD_lazy
    lazy2 = _zstd._ZSTD_lazy2
    btlazy2 = _zstd._ZSTD_btlazy2
    btopt = _zstd._ZSTD_btopt
    btultra = _zstd._ZSTD_btultra
    btultra2 = _zstd._ZSTD_btultra2
# Hand the CParameter/DParameter enum classes to the _zstd extension module,
# which uses them for validity checks.
_zstd._set_parameter_types(CParameter, DParameter)
# True unless the bounds reported for nbWorkers are exactly (0, 0).
# NOTE(review): presumably a zstd build without multi-threading reports
# (0, 0) for nbWorkers -- confirm against the zstd manual.
zstd_support_multithread = CParameter.nbWorkers.bounds() != (0, 0)