@@ -1136,6 +1136,7 @@ class Http2Session extends EventEmitter {
11361136 streams : new Map ( ) ,
11371137 pendingStreams : new Set ( ) ,
11381138 pendingAck : 0 ,
1139+ shutdownWritableCalled : false ,
11391140 writeQueueSize : 0 ,
11401141 originSet : undefined
11411142 } ;
@@ -1702,6 +1703,26 @@ function afterShutdown(status) {
17021703 stream [ kMaybeDestroy ] ( ) ;
17031704}
17041705
1706+ function shutdownWritable ( callback ) {
1707+ const handle = this [ kHandle ] ;
1708+ if ( ! handle ) return callback ( ) ;
1709+ const state = this [ kState ] ;
1710+ if ( state . shutdownWritableCalled ) {
1711+ // Backport v14.x: Session required for debugging stream object
1712+ // debugStreamObj(this, 'shutdownWritable() already called');
1713+ return callback ( ) ;
1714+ }
1715+ state . shutdownWritableCalled = true ;
1716+
1717+ const req = new ShutdownWrap ( ) ;
1718+ req . oncomplete = afterShutdown ;
1719+ req . callback = callback ;
1720+ req . handle = handle ;
1721+ const err = handle . shutdown ( req ) ;
1722+ if ( err === 1 ) // synchronous finish
1723+ return afterShutdown . call ( req , 0 ) ;
1724+ }
1725+
17051726function finishSendTrailers ( stream , headersList ) {
17061727 // The stream might be destroyed and in that case
17071728 // there is nothing to do.
@@ -1962,10 +1983,48 @@ class Http2Stream extends Duplex {
19621983
19631984 let req ;
19641985
1986+ let waitingForWriteCallback = true ;
1987+ let waitingForEndCheck = true ;
1988+ let writeCallbackErr ;
1989+ let endCheckCallbackErr ;
1990+ const done = ( ) => {
1991+ if ( waitingForEndCheck || waitingForWriteCallback ) return ;
1992+ const err = writeCallbackErr || endCheckCallbackErr ;
1993+ // writeGeneric does not destroy on error and
1994+ // we cannot enable autoDestroy,
1995+ // so make sure to destroy on error.
1996+ if ( err ) {
1997+ this . destroy ( err ) ;
1998+ }
1999+ cb ( err ) ;
2000+ } ;
2001+ const writeCallback = ( err ) => {
2002+ waitingForWriteCallback = false ;
2003+ writeCallbackErr = err ;
2004+ done ( ) ;
2005+ } ;
2006+ const endCheckCallback = ( err ) => {
2007+ waitingForEndCheck = false ;
2008+ endCheckCallbackErr = err ;
2009+ done ( ) ;
2010+ } ;
2011+ // Shutdown write stream right after last chunk is sent
2012+ // so final DATA frame can include END_STREAM flag
2013+ process . nextTick ( ( ) => {
2014+ if ( writeCallbackErr ||
2015+ ! this . _writableState . ending ||
2016+ this . _writableState . buffered . length ||
2017+ ( this [ kState ] . flags & STREAM_FLAGS_HAS_TRAILERS ) )
2018+ return endCheckCallback ( ) ;
2019+ // Backport v14.x: Session required for debugging stream object
2020+ // debugStreamObj(this, 'shutting down writable on last write');
2021+ shutdownWritable . call ( this , endCheckCallback ) ;
2022+ } ) ;
2023+
19652024 if ( writev )
1966- req = writevGeneric ( this , data , cb ) ;
2025+ req = writevGeneric ( this , data , writeCallback ) ;
19672026 else
1968- req = writeGeneric ( this , data , encoding , cb ) ;
2027+ req = writeGeneric ( this , data , encoding , writeCallback ) ;
19692028
19702029 trackWriteState ( this , req . bytes ) ;
19712030 }
@@ -1979,21 +2038,13 @@ class Http2Stream extends Duplex {
19792038 }
19802039
19812040 _final ( cb ) {
1982- const handle = this [ kHandle ] ;
19832041 if ( this . pending ) {
19842042 this . once ( 'ready' , ( ) => this . _final ( cb ) ) ;
1985- } else if ( handle !== undefined ) {
1986- debugStreamObj ( this , '_final shutting down' ) ;
1987- const req = new ShutdownWrap ( ) ;
1988- req . oncomplete = afterShutdown ;
1989- req . callback = cb ;
1990- req . handle = handle ;
1991- const err = handle . shutdown ( req ) ;
1992- if ( err === 1 ) // synchronous finish
1993- return afterShutdown . call ( req , 0 ) ;
1994- } else {
1995- cb ( ) ;
2043+ return ;
19962044 }
2045+ // Backport v14.x: Session required for debugging stream object
2046+ // debugStreamObj(this, 'shutting down writable on _final');
2047+ shutdownWritable . call ( this , cb ) ;
19972048 }
19982049
19992050 _read ( nread ) {
@@ -2098,11 +2149,20 @@ class Http2Stream extends Duplex {
20982149 debugStream ( this [ kID ] || 'pending' , session [ kType ] , 'destroying stream' ) ;
20992150
21002151 const state = this [ kState ] ;
2101- const sessionCode = session [ kState ] . goawayCode ||
2102- session [ kState ] . destroyCode ;
2103- const code = err != null ?
2104- sessionCode || NGHTTP2_INTERNAL_ERROR :
2105- state . rstCode || sessionCode ;
2152+ const sessionState = session [ kState ] ;
2153+ const sessionCode = sessionState . goawayCode || sessionState . destroyCode ;
2154+
2155+ // If a stream has already closed successfully, there is no error
2156+ // to report from this stream, even if the session has errored.
2157+ // This can happen if the stream was already in process of destroying
2158+ // after a successful close, but the session had a error between
2159+ // this stream's close and destroy operations.
2160+ // Previously, this always overrode a successful close operation code
2161+ // NGHTTP2_NO_ERROR (0) with sessionCode because the use of the || operator.
2162+ const code = ( err != null ?
2163+ ( sessionCode || NGHTTP2_INTERNAL_ERROR ) :
2164+ ( this . closed ? this . rstCode : sessionCode )
2165+ ) ;
21062166 const hasHandle = handle !== undefined ;
21072167
21082168 if ( ! this . closed )
@@ -2111,13 +2171,13 @@ class Http2Stream extends Duplex {
21112171
21122172 if ( hasHandle ) {
21132173 handle . destroy ( ) ;
2114- session [ kState ] . streams . delete ( id ) ;
2174+ sessionState . streams . delete ( id ) ;
21152175 } else {
2116- session [ kState ] . pendingStreams . delete ( this ) ;
2176+ sessionState . pendingStreams . delete ( this ) ;
21172177 }
21182178
21192179 // Adjust the write queue size for accounting
2120- session [ kState ] . writeQueueSize -= state . writeQueueSize ;
2180+ sessionState . writeQueueSize -= state . writeQueueSize ;
21212181 state . writeQueueSize = 0 ;
21222182
21232183 // RST code 8 not emitted as an error as its used by clients to signify
0 commit comments