33__all__ = ['StreamReader' , 'StreamWriter' , 'StreamReaderProtocol' ,
44 'open_connection' , 'start_server' ,
55 'IncompleteReadError' ,
6+ 'LimitOverrunError' ,
67 ]
78
89import socket
@@ -27,15 +28,28 @@ class IncompleteReadError(EOFError):
2728 Incomplete read error. Attributes:
2829
2930 - partial: read bytes string before the end of stream was reached
30- - expected: total number of expected bytes
31+ - expected: total number of expected bytes (or None if unknown)
3132 """
3233 def __init__ (self , partial , expected ):
33- EOFError .__init__ (self , "%s bytes read on a total of %s expected bytes"
34- % (len (partial ), expected ))
34+ super () .__init__ ("%d bytes read on a total of %r expected bytes"
35+ % (len (partial ), expected ))
3536 self .partial = partial
3637 self .expected = expected
3738
3839
40+ class LimitOverrunError (Exception ):
41+ """Reached buffer limit while looking for the separator.
42+
43+ Attributes:
44+ - message: error message
45+ - consumed: total number of bytes that should be consumed
46+ """
47+ def __init__ (self , message , consumed ):
48+ super ().__init__ (message )
49+ self .message = message
50+ self .consumed = consumed
51+
52+
3953@coroutine
4054def open_connection (host = None , port = None , * ,
4155 loop = None , limit = _DEFAULT_LIMIT , ** kwds ):
@@ -318,6 +332,10 @@ class StreamReader:
318332 def __init__ (self , limit = _DEFAULT_LIMIT , loop = None ):
319333 # The line length limit is a security feature;
320334 # it also doubles as half the buffer limit.
335+
336+ if limit <= 0 :
337+ raise ValueError ('Limit cannot be <= 0' )
338+
321339 self ._limit = limit
322340 if loop is None :
323341 self ._loop = events .get_event_loop ()
@@ -361,7 +379,7 @@ def set_exception(self, exc):
361379 waiter .set_exception (exc )
362380
363381 def _wakeup_waiter (self ):
364- """Wakeup read() or readline() function waiting for data or EOF."""
382+ """Wakeup read* () functions waiting for data or EOF."""
365383 waiter = self ._waiter
366384 if waiter is not None :
367385 self ._waiter = None
@@ -409,7 +427,10 @@ def feed_data(self, data):
409427
410428 @coroutine
411429 def _wait_for_data (self , func_name ):
412- """Wait until feed_data() or feed_eof() is called."""
430+ """Wait until feed_data() or feed_eof() is called.
431+
432+ If stream was paused, automatically resume it.
433+ """
413434 # StreamReader uses a future to link the protocol feed_data() method
414435 # to a read coroutine. Running two read coroutines at the same time
415436 # would have an unexpected behaviour. It would not possible to know
@@ -418,6 +439,13 @@ def _wait_for_data(self, func_name):
418439 raise RuntimeError ('%s() called while another coroutine is '
419440 'already waiting for incoming data' % func_name )
420441
442+ assert not self ._eof , '_wait_for_data after EOF'
443+
444+ # Waiting for data while paused will make deadlock, so prevent it.
445+ if self ._paused :
446+ self ._paused = False
447+ self ._transport .resume_reading ()
448+
421449 self ._waiter = futures .Future (loop = self ._loop )
422450 try :
423451 yield from self ._waiter
@@ -426,43 +454,150 @@ def _wait_for_data(self, func_name):
426454
427455 @coroutine
428456 def readline (self ):
457+ """Read chunk of data from the stream until newline (b'\n ') is found.
458+
459+ On success, return chunk that ends with newline. If only partial
460+ line can be read due to EOF, return incomplete line without
461+ terminating newline. When EOF was reached while no bytes read, empty
462+ bytes object is returned.
463+
464+ If limit is reached, ValueError will be raised. In that case, if
465+ newline was found, complete line including newline will be removed
466+ from internal buffer. Else, internal buffer will be cleared. Limit is
467+ compared against part of the line without newline.
468+
469+ If stream was paused, this function will automatically resume it if
470+ needed.
471+ """
472+ sep = b'\n '
473+ seplen = len (sep )
474+ try :
475+ line = yield from self .readuntil (sep )
476+ except IncompleteReadError as e :
477+ return e .partial
478+ except LimitOverrunError as e :
479+ if self ._buffer .startswith (sep , e .consumed ):
480+ del self ._buffer [:e .consumed + seplen ]
481+ else :
482+ self ._buffer .clear ()
483+ self ._maybe_resume_transport ()
484+ raise ValueError (e .args [0 ])
485+ return line
486+
487+ @coroutine
488+ def readuntil (self , separator = b'\n ' ):
489+ """Read chunk of data from the stream until `separator` is found.
490+
491+ On success, chunk and its separator will be removed from internal buffer
492+ (i.e. consumed). Returned chunk will include separator at the end.
493+
494+ Configured stream limit is used to check result. Limit means maximal
495+ length of chunk that can be returned, not counting the separator.
496+
497+ If EOF occurs and complete separator still not found,
498+ IncompleteReadError(<partial data>, None) will be raised and internal
499+ buffer becomes empty. This partial data may contain a partial separator.
500+
501+ If chunk cannot be read due to overlimit, LimitOverrunError will be raised
502+ and data will be left in internal buffer, so it can be read again, in
503+ some different way.
504+
505+ If stream was paused, this function will automatically resume it if
506+ needed.
507+ """
508+ seplen = len (separator )
509+ if seplen == 0 :
510+ raise ValueError ('Separator should be at least one-byte string' )
511+
429512 if self ._exception is not None :
430513 raise self ._exception
431514
432- line = bytearray ()
433- not_enough = True
434-
435- while not_enough :
436- while self ._buffer and not_enough :
437- ichar = self ._buffer .find (b'\n ' )
438- if ichar < 0 :
439- line .extend (self ._buffer )
440- self ._buffer .clear ()
441- else :
442- ichar += 1
443- line .extend (self ._buffer [:ichar ])
444- del self ._buffer [:ichar ]
445- not_enough = False
446-
447- if len (line ) > self ._limit :
448- self ._maybe_resume_transport ()
449- raise ValueError ('Line is too long' )
515+ # Consume whole buffer except last bytes, which length is
516+ # one less than seplen. Let's check corner cases with
517+ # separator='SEPARATOR':
518+ # * we have received almost complete separator (without last
519+ # byte). i.e buffer='some textSEPARATO'. In this case we
520+ # can safely consume len(separator) - 1 bytes.
521+ # * last byte of buffer is first byte of separator, i.e.
522+ # buffer='abcdefghijklmnopqrS'. We may safely consume
523+ # everything except that last byte, but this require to
524+ # analyze bytes of buffer that match partial separator.
525+ # This is slow and/or require FSM. For this case our
526+ # implementation is not optimal, since require rescanning
527+ # of data that is known to not belong to separator. In
528+ # real world, separator will not be so long to notice
529+ # performance problems. Even when reading MIME-encoded
530+ # messages :)
531+
532+ # `offset` is the number of bytes from the beginning of the buffer where
533+ # is no occurrence of `separator`.
534+ offset = 0
535+
536+ # Loop until we find `separator` in the buffer, exceed the buffer size,
537+ # or an EOF has happened.
538+ while True :
539+ buflen = len (self ._buffer )
540+
541+ # Check if we now have enough data in the buffer for `separator` to
542+ # fit.
543+ if buflen - offset >= seplen :
544+ isep = self ._buffer .find (separator , offset )
545+
546+ if isep != - 1 :
547+ # `separator` is in the buffer. `isep` will be used later to
548+ # retrieve the data.
549+ break
550+
551+ # see upper comment for explanation.
552+ offset = buflen + 1 - seplen
553+ if offset > self ._limit :
554+ raise LimitOverrunError ('Separator is not found, and chunk exceed the limit' , offset )
450555
556+ # Complete message (with full separator) may be present in buffer
557+ # even when EOF flag is set. This may happen when the last chunk
558+ # adds data which makes separator be found. That's why we check for
559+ # EOF *ater* inspecting the buffer.
451560 if self ._eof :
452- break
561+ chunk = bytes (self ._buffer )
562+ self ._buffer .clear ()
563+ raise IncompleteReadError (chunk , None )
564+
565+ # _wait_for_data() will resume reading if stream was paused.
566+ yield from self ._wait_for_data ('readuntil' )
453567
454- if not_enough :
455- yield from self . _wait_for_data ( 'readline' )
568+ if isep > self . _limit :
569+ raise LimitOverrunError ( 'Separator is found, but chunk is longer than limit' , isep )
456570
571+ chunk = self ._buffer [:isep + seplen ]
572+ del self ._buffer [:isep + seplen ]
457573 self ._maybe_resume_transport ()
458- return bytes (line )
574+ return bytes (chunk )
459575
460576 @coroutine
461577 def read (self , n = - 1 ):
578+ """Read up to `n` bytes from the stream.
579+
580+ If n is not provided, or set to -1, read until EOF and return all read
581+ bytes. If the EOF was received and the internal buffer is empty, return
582+ an empty bytes object.
583+
584+ If n is zero, return empty bytes object immediatelly.
585+
586+ If n is positive, this function try to read `n` bytes, and may return
587+ less or equal bytes than requested, but at least one byte. If EOF was
588+ received before any byte is read, this function returns empty byte
589+ object.
590+
591+ Returned value is not limited with limit, configured at stream creation.
592+
593+ If stream was paused, this function will automatically resume it if
594+ needed.
595+ """
596+
462597 if self ._exception is not None :
463598 raise self ._exception
464599
465- if not n :
600+ if n == 0 :
466601 return b''
467602
468603 if n < 0 :
@@ -477,29 +612,41 @@ def read(self, n=-1):
477612 break
478613 blocks .append (block )
479614 return b'' .join (blocks )
480- else :
481- if not self ._buffer and not self ._eof :
482- yield from self ._wait_for_data ('read' )
483615
484- if n < 0 or len (self ._buffer ) <= n :
485- data = bytes (self ._buffer )
486- self ._buffer .clear ()
487- else :
488- # n > 0 and len(self._buffer) > n
489- data = bytes (self ._buffer [:n ])
490- del self ._buffer [:n ]
616+ if not self ._buffer and not self ._eof :
617+ yield from self ._wait_for_data ('read' )
618+
619+ # This will work right even if buffer is less than n bytes
620+ data = bytes (self ._buffer [:n ])
621+ del self ._buffer [:n ]
491622
492623 self ._maybe_resume_transport ()
493624 return data
494625
495626 @coroutine
496627 def readexactly (self , n ):
628+ """Read exactly `n` bytes.
629+
630+ Raise an `IncompleteReadError` if EOF is reached before `n` bytes can be
631+ read. The `IncompleteReadError.partial` attribute of the exception will
632+ contain the partial read bytes.
633+
634+ if n is zero, return empty bytes object.
635+
636+ Returned value is not limited with limit, configured at stream creation.
637+
638+ If stream was paused, this function will automatically resume it if
639+ needed.
640+ """
497641 if n < 0 :
498642 raise ValueError ('readexactly size can not be less than zero' )
499643
500644 if self ._exception is not None :
501645 raise self ._exception
502646
647+ if n == 0 :
648+ return b''
649+
503650 # There used to be "optimized" code here. It created its own
504651 # Future and waited until self._buffer had at least the n
505652 # bytes, then called read(n). Unfortunately, this could pause
@@ -516,6 +663,8 @@ def readexactly(self, n):
516663 blocks .append (block )
517664 n -= len (block )
518665
666+ assert n == 0
667+
519668 return b'' .join (blocks )
520669
521670 if compat .PY35 :
0 commit comments