Conversation
da7ae15 to
835cfcd
Compare
835cfcd to
1c3171b
Compare
|
Some build errors ... |
client/image_pull.go
Outdated
| } | ||
| } | ||
| } | ||
| type ImagePullResponse jsonmessage.Stream |
There was a problem hiding this comment.
Perhaps we could make both pull and push responses an interface like the below and keep the actual Stream internal?
type ImagePullResponse interface {
io.ReadCloser
JSONMessages(ctx context.Context) iter.Seq2[JSONMessage, error]
}There was a problem hiding this comment.
Agree, code is cleaner and this allows to use return nil, err vs allocating an empty Response
1c3171b to
b53dacd
Compare
| // Read implements io.ReadCloser | ||
| func (r stream) Read(p []byte) (n int, err error) { | ||
| return r.rc.Read(p) | ||
| } | ||
|
|
||
| // Close implements io.ReadCloser | ||
| func (r stream) Close() error { | ||
| if r.close == nil { | ||
| return nil | ||
| } | ||
| var err error | ||
| r.close.Do(func() { | ||
| if r.rc != nil { | ||
| err = r.rc.Close() | ||
| } | ||
| }) | ||
| return err | ||
| } |
There was a problem hiding this comment.
FWIW; I made some small changes in the other implementation; if we keep them separate, might be good to align;
There was a problem hiding this comment.
Oh, never mind I see they're the same now, so just needs a rebase (sorry!)
There was a problem hiding this comment.
yes I've seen those and using the updated codebase
21eb60d to
4d4c0fe
Compare
| func NewJSONMessageStream(rc io.ReadCloser) stream { | ||
| if rc == nil { | ||
| panic("nil io.ReadCloser") | ||
| } | ||
| return stream{ | ||
| rc: rc, | ||
| close: sync.OnceValue(rc.Close), | ||
| } | ||
| } | ||
|
|
||
| type stream struct { |
There was a problem hiding this comment.
Considering if generics would be a good fit for this; we could have a constructor that returns a stream with a specific type as iterator.
This would allow us to have ImagePush, ImagePull, ImageBuild (and other streams) to define their own concrete type of records returned in the stream instead of the generic JSONMessage, which ... looks to be a bit of a "catch all" type that can handle responses from either (and other fields are just "unused").
Here's a quick writeup that ChatGPT did (
package internal
import (
"context"
"encoding/json"
"errors"
"io"
"iter"
"sync"
"github.com/moby/moby/client/pkg/jsonmessage"
)
// MessageStream is the generic interface for a streaming JSON decoder.
type MessageStream[T any] interface {
io.ReadCloser
// JSONMessages decodes the response stream as a sequence of T.
// If the stream ends or the context is canceled, the underlying io.Reader is closed.
JSONMessages(ctx context.Context) iter.Seq2[T, error]
}
// Decoder is pluggable per T (defaults to json.Decoder.Decode).
type Decoder[T any] func(*json.Decoder, *T) error
// NewMessageStream constructs a typed stream that yields values of T.
// If dec is nil, json.Decoder.Decode is used.
func NewMessageStream[T any](rc io.ReadCloser, dec Decoder[T]) MessageStream[T] {
if rc == nil {
panic("nil io.ReadCloser")
}
if dec == nil {
dec = func(d *json.Decoder, out *T) error { return d.Decode(out) }
}
return &stream[T]{
rc: rc,
close: sync.OnceValue(rc.Close),
dec: dec,
}
}
type stream[T any] struct {
rc io.ReadCloser
close func() error
dec Decoder[T]
}
// Read implements io.Reader.
func (r *stream[T]) Read(p []byte) (int, error) {
if r.rc == nil {
return 0, io.EOF
}
return r.rc.Read(p)
}
// Close implements io.Closer.
func (r *stream[T]) Close() error {
if r.close == nil {
return nil
}
return r.close()
}
// JSONMessages implements MessageStream[T].
func (r *stream[T]) JSONMessages(ctx context.Context) iter.Seq2[T, error] {
context.AfterFunc(ctx, func() { _ = r.Close() })
dec := json.NewDecoder(r)
return func(yield func(T, error) bool) {
defer r.Close()
for {
var msg T
err := r.dec(dec, &msg)
if errors.Is(err, io.EOF) {
break
}
// Prefer to surface context cancellation promptly.
if ctx.Err() != nil {
// If decoding succeeded before noticing ctx cancel, we still deliver msg with ctx error.
// If decoding failed for another reason, msg is zero-value and err is ignored in favor of ctx error.
if !yield(msg, ctx.Err()) {
return
}
return
}
if !yield(msg, err) {
return
}
}
}
}Then called as this
// Operation-specific generic response types.
type ImagePullResponse[T any] interface {
MessageStream[T]
}
type ImagePushResponse[T any] interface {
MessageStream[T]
}
// Concrete message shapes per operation.
// You can start with jsonmessage.JSONMessage and evolve independently later.
type PullMessage = jsonmessage.JSONMessage
type PushMessage = jsonmessage.JSONMessage
// Constructors that pick the concrete T at the call site:
func NewImagePullResponse(rc io.ReadCloser) ImagePullResponse[PullMessage] {
return NewMessageStream[PullMessage](rc, nil)
}
func NewImagePushResponse(rc io.ReadCloser) ImagePushResponse[PushMessage] {
return NewMessageStream[PushMessage](rc, nil)
}Although I think the ImagePullResponse ImagePushResponse may not necessarily need generics; we could make those concrete 🤔
type PushMessage jsonmessage.JSONMessage
type ImagePushResponse interface {
io.ReadCloser
JSONMessages(ctx context.Context) iter.Seq2[PushMessage, error]
}There was a problem hiding this comment.
Slightly simplified one without passing a custom decoder;
package internal
import (
"context"
"encoding/json"
"errors"
"io"
"iter"
"sync"
)
// Generic stream implementation.
type stream[T any] struct {
rc io.ReadCloser
close func() error
}
// NewMessageStream constructs a typed stream that yields values of T.
func NewMessageStream[T any](rc io.ReadCloser) *stream[T] {
if rc == nil {
panic("nil io.ReadCloser")
}
return &stream[T]{
rc: rc,
close: sync.OnceValue(rc.Close),
}
}
// Read implements io.Reader.
func (r *stream[T]) Read(p []byte) (int, error) {
if r.rc == nil {
return 0, io.EOF
}
return r.rc.Read(p)
}
// Close implements io.Closer.
func (r *stream[T]) Close() error {
if r.close == nil {
return nil
}
return r.close()
}
// JSONMessages decodes the response stream as a sequence of T.
// If the stream ends or the context is canceled, the underlying reader is closed.
func (r *stream[T]) JSONMessages(ctx context.Context) iter.Seq2[T, error] {
context.AfterFunc(ctx, func() { _ = r.Close() })
dec := json.NewDecoder(r)
return func(yield func(T, error) bool) {
defer r.Close()
for {
var msg T
err := dec.Decode(&msg)
if errors.Is(err, io.EOF) {
break
}
if ctx.Err() != nil {
_ = yield(msg, ctx.Err())
return
}
if !yield(msg, err) {
return
}
}
}
}There was a problem hiding this comment.
I also wanted to give generics a try, especially considering Aux raw message handling which is a pain for API consumer.
There was a problem hiding this comment.
So, this is roughly what I came to when looking at the whole "JSONMessage" (and related) bits;
- The JSONMessage package was created to handle streaming responses from the API (push, pull)
- Likely out of convenience, the package was re-used when other "streaming" endpoints were added, for example
/build, and/events - For those endpoints
JSONMessagewas not (or "no longer") the actual type used by the daemon! - But it was easy to just add some new fields to the
JSONMessagestruct, addomitempty, and make it work with various responses from the API (docker push, docker pull, events) - Other fields were added as well, and the
auxfield was added to be more flexible; this allowed for ANY type to be included in the response, as long as the code callingJSONMessageadded a custom function to unmarshal them.
But it all seems to be out of convenience, or "because that's how it was done, so that's probably the right way" and 🎉 it didn't require making changes in the API definitions... because there was no definition for the JSONMessage in the API. And fun fact is that the JSONMessage describes that it's used for events, but .. I think that's the only response NOT using it; #51156 (comment)
So, yes, effectively, the only thing these endpoints have in common is that they CAN be unmarshaled using JSONMessage, just because it has fields that overlap with the actual type (effectively, it almost could've been a "catch all" map[string]any).
The only advantage is that the JSONMessage utilities happen to have the logic (error handling, etc.), and it was good to share that logic, and some of them have in common that they have a progress-bar attached. But for others the JSONMessage is almost entirely useless, and just used as an "envelope" to shove an aux message in, where the aux message is the ACTUAL type returned.
There was a problem hiding this comment.
So instead, I think we should ideally look at defining concrete types where possible; i.e. don't use the generic "JSONMessage" (and "cherry-pick" the fields that are used), but have types for each endpoint. I don't think generics are a good choice for everything, but in this case, it may be; because there's some amount of logic involved to handle the responses, and it's "enough" to prefer having a shared implementation for those parts.
And I think with generics we can have the best of both worlds here;
- Share the common implementation for handling the streams (which this PR started doing, possibly there's more w.r.t. some of the presentation logic)
- But have concrete types for endpoints; with that, possibly the auxMessage isn't even needed, and can become a concrete type for endpoints that use them.
We can still have an auxMessage field defined for arbitrary data that's not (yet) part of the API contract, but more out of convenience.
4d4c0fe to
1923048
Compare
Signed-off-by: Nicolas De Loof <nicolas.deloof@gmail.com>
1923048 to
2d1429c
Compare
thaJeztah
left a comment
There was a problem hiding this comment.
LGTM
lets bring this one in, then look at the other PR to see if we can improve further.
- What I did
follow-up for #50935
introduce ImagePushResponse with utility method
JSONMessagesto manage json messages stream- How I did it
made
ImagePullResponsea general purpose type asjsonmessage.Streamtype to be used as bothImagePullResponseandImagePushResponse(avoid code duplication)- How to verify it
- Human readable description for the release notes
- A picture of a cute animal (not mandatory but encouraged)
