Skip to content

introduce ImagePushResponse#51148

Merged
thaJeztah merged 1 commit intomoby:masterfrom
ndeloof:decode-JSONMessage-push
Oct 21, 2025
Merged

introduce ImagePushResponse#51148
thaJeztah merged 1 commit intomoby:masterfrom
ndeloof:decode-JSONMessage-push

Conversation

@ndeloof
Copy link
Contributor

@ndeloof ndeloof commented Oct 9, 2025

- What I did

follow-up for #50935
introduce ImagePushResponse with utility method JSONMessages to manage json messages stream

- How I did it

made ImagePullResponse a general purpose type as jsonmessage.Stream type to be used as both ImagePullResponse and ImagePushResponse (avoid code duplication)

- How to verify it

- Human readable description for the release notes

`ImagePush` now returns an object with `JSONMessages` method returning iterator over the message objects.

- A picture of a cute animal (not mandatory but encouraged)
image

@robmry
Copy link
Contributor

robmry commented Oct 9, 2025

Some build errors ...

client/client.go:109:19: cannot use &Client{} (value of type *Client) as APIClient value in variable declaration: *Client does not implement APIClient (wrong type for method ImagePush)
		have ImagePush(context.Context, string, ImagePushOptions) (ImagePushResponse, error)
		want ImagePush(context.Context, string, ImagePushOptions) (io.ReadCloser, error)

client/image_pull_test.go:102:26: cannot use resp (variable of struct type ImagePullResponse) as io.Reader value in argument to io.ReadAll: ImagePullResponse does not implement io.Reader (missing method Read)

client/image_pull_test.go:193:28: cannot use resp (variable of struct type ImagePullResponse) as io.Reader value in argument to io.ReadAll: ImagePullResponse does not implement io.Reader (missing method Read)

client/image_pull_test.go:202:14: undefined: newImagePullResponse

client/image_push_test.go:103:26: cannot use resp (variable of struct type ImagePushResponse) as io.Reader value in argument to io.ReadAll: ImagePushResponse does not implement io.Reader (missing method Read)

client/image_push_test.go:191:28: cannot use resp (variable of struct type ImagePushResponse) as io.Reader value in argument to io.ReadAll: ImagePushResponse does not implement io.Reader (missing method Read)

}
}
}
type ImagePullResponse jsonmessage.Stream
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps we could make both pull and push responses an interface like the below and keep the actual Stream internal?

type ImagePullResponse interface {
    io.ReadCloser
   JSONMessages(ctx context.Context) iter.Seq2[JSONMessage, error]
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree, code is cleaner and this allows to use return nil, err vs allocating an empty Response

@ndeloof ndeloof force-pushed the decode-JSONMessage-push branch from 1c3171b to b53dacd Compare October 10, 2025 06:01
@thaJeztah thaJeztah added this to the 29.0.0 milestone Oct 10, 2025
@thaJeztah thaJeztah added impact/changelog area/go-sdk impact/go-sdk Noteworthy (compatibility changes) in the Go SDK status/2-code-review labels Oct 10, 2025
Comment on lines +29 to +46
// Read implements io.ReadCloser
func (r stream) Read(p []byte) (n int, err error) {
return r.rc.Read(p)
}

// Close implements io.ReadCloser
func (r stream) Close() error {
if r.close == nil {
return nil
}
var err error
r.close.Do(func() {
if r.rc != nil {
err = r.rc.Close()
}
})
return err
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FWIW; I made some small changes in the other implementation; if we keep them separate, might be good to align;

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, never mind I see they're the same now, so just needs a rebase (sorry!)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes I've seen those and using the updated codebase

@ndeloof ndeloof force-pushed the decode-JSONMessage-push branch 3 times, most recently from 21eb60d to 4d4c0fe Compare October 10, 2025 12:37
@thaJeztah thaJeztah added the kind/enhancement Enhancements are not bugs or new features but can improve usability or performance. label Oct 10, 2025
Comment on lines +14 to +24
func NewJSONMessageStream(rc io.ReadCloser) stream {
if rc == nil {
panic("nil io.ReadCloser")
}
return stream{
rc: rc,
close: sync.OnceValue(rc.Close),
}
}

type stream struct {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Considering if generics would be a good fit for this; we could have a constructor that returns a stream with a specific type as iterator.

This would allow us to have ImagePush, ImagePull, ImageBuild (and other streams) to define their own concrete type of records returned in the stream instead of the generic JSONMessage, which ... looks to be a bit of a "catch all" type that can handle responses from either (and other fields are just "unused").

Here's a quick writeup that ChatGPT did (⚠️ haven't tried it yet, but may give this approach a go Tomorrow, or later Today);

package internal

import (
	"context"
	"encoding/json"
	"errors"
	"io"
	"iter"
	"sync"

	"github.com/moby/moby/client/pkg/jsonmessage"
)

// MessageStream is the generic interface for a streaming JSON decoder.
type MessageStream[T any] interface {
	io.ReadCloser
	// JSONMessages decodes the response stream as a sequence of T.
	// If the stream ends or the context is canceled, the underlying io.Reader is closed.
	JSONMessages(ctx context.Context) iter.Seq2[T, error]
}

// Decoder is pluggable per T (defaults to json.Decoder.Decode).
type Decoder[T any] func(*json.Decoder, *T) error

// NewMessageStream constructs a typed stream that yields values of T.
// If dec is nil, json.Decoder.Decode is used.
func NewMessageStream[T any](rc io.ReadCloser, dec Decoder[T]) MessageStream[T] {
	if rc == nil {
		panic("nil io.ReadCloser")
	}
	if dec == nil {
		dec = func(d *json.Decoder, out *T) error { return d.Decode(out) }
	}
	return &stream[T]{
		rc:    rc,
		close: sync.OnceValue(rc.Close),
		dec:   dec,
	}
}

type stream[T any] struct {
	rc    io.ReadCloser
	close func() error
	dec   Decoder[T]
}

// Read implements io.Reader.
func (r *stream[T]) Read(p []byte) (int, error) {
	if r.rc == nil {
		return 0, io.EOF
	}
	return r.rc.Read(p)
}

// Close implements io.Closer.
func (r *stream[T]) Close() error {
	if r.close == nil {
		return nil
	}
	return r.close()
}

// JSONMessages implements MessageStream[T].
func (r *stream[T]) JSONMessages(ctx context.Context) iter.Seq2[T, error] {
	context.AfterFunc(ctx, func() { _ = r.Close() })
	dec := json.NewDecoder(r)

	return func(yield func(T, error) bool) {
		defer r.Close()
		for {
			var msg T
			err := r.dec(dec, &msg)
			if errors.Is(err, io.EOF) {
				break
			}
			// Prefer to surface context cancellation promptly.
			if ctx.Err() != nil {
				// If decoding succeeded before noticing ctx cancel, we still deliver msg with ctx error.
				// If decoding failed for another reason, msg is zero-value and err is ignored in favor of ctx error.
				if !yield(msg, ctx.Err()) {
					return
				}
				return
			}
			if !yield(msg, err) {
				return
			}
		}
	}
}

Then called as this

// Operation-specific generic response types.
type ImagePullResponse[T any] interface {
	MessageStream[T]
}
type ImagePushResponse[T any] interface {
	MessageStream[T]
}

// Concrete message shapes per operation.
// You can start with jsonmessage.JSONMessage and evolve independently later.
type PullMessage = jsonmessage.JSONMessage
type PushMessage = jsonmessage.JSONMessage

// Constructors that pick the concrete T at the call site:
func NewImagePullResponse(rc io.ReadCloser) ImagePullResponse[PullMessage] {
	return NewMessageStream[PullMessage](rc, nil)
}
func NewImagePushResponse(rc io.ReadCloser) ImagePushResponse[PushMessage] {
	return NewMessageStream[PushMessage](rc, nil)
}

Although I think the ImagePullResponse ImagePushResponse may not necessarily need generics; we could make those concrete 🤔

type PushMessage jsonmessage.JSONMessage 

type ImagePushResponse interface { 
    io.ReadCloser
    JSONMessages(ctx context.Context) iter.Seq2[PushMessage, error]
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Slightly simplified one without passing a custom decoder;

package internal

import (
	"context"
	"encoding/json"
	"errors"
	"io"
	"iter"
	"sync"
)

// Generic stream implementation.
type stream[T any] struct {
	rc    io.ReadCloser
	close func() error
}

// NewMessageStream constructs a typed stream that yields values of T.
func NewMessageStream[T any](rc io.ReadCloser) *stream[T] {
	if rc == nil {
		panic("nil io.ReadCloser")
	}
	return &stream[T]{
		rc:    rc,
		close: sync.OnceValue(rc.Close),
	}
}

// Read implements io.Reader.
func (r *stream[T]) Read(p []byte) (int, error) {
	if r.rc == nil {
		return 0, io.EOF
	}
	return r.rc.Read(p)
}

// Close implements io.Closer.
func (r *stream[T]) Close() error {
	if r.close == nil {
		return nil
	}
	return r.close()
}

// JSONMessages decodes the response stream as a sequence of T.
// If the stream ends or the context is canceled, the underlying reader is closed.
func (r *stream[T]) JSONMessages(ctx context.Context) iter.Seq2[T, error] {
	context.AfterFunc(ctx, func() { _ = r.Close() })
	dec := json.NewDecoder(r)

	return func(yield func(T, error) bool) {
		defer r.Close()
		for {
			var msg T
			err := dec.Decode(&msg)
			if errors.Is(err, io.EOF) {
				break
			}
			if ctx.Err() != nil {
				_ = yield(msg, ctx.Err())
				return
			}
			if !yield(msg, err) {
				return
			}
		}
	}
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Trying that in #51170

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also wanted to give generics a try, especially considering Aux raw message handling which is a pain for API consumer.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, this is roughly what I came to when looking at the whole "JSONMessage" (and related) bits;

  • The JSONMessage package was created to handle streaming responses from the API (push, pull)
  • Likely out of convenience, the package was re-used when other "streaming" endpoints were added, for example /build, and /events
  • For those endpoints JSONMessage was not (or "no longer") the actual type used by the daemon!
  • But it was easy to just add some new fields to the JSONMessage struct, add omitempty, and make it work with various responses from the API (docker push, docker pull, events)
  • Other fields were added as well, and the aux field was added to be more flexible; this allowed for ANY type to be included in the response, as long as the code calling JSONMessage added a custom function to unmarshal them.

But it all seems to be out of convenience, or "because that's how it was done, so that's probably the right way" and 🎉 it didn't require making changes in the API definitions... because there was no definition for the JSONMessage in the API. And fun fact is that the JSONMessage describes that it's used for events, but .. I think that's the only response NOT using it; #51156 (comment)

So, yes, effectively, the only thing these endpoints have in common is that they CAN be unmarshaled using JSONMessage, just because it has fields that overlap with the actual type (effectively, it almost could've been a "catch all" map[string]any).
The only advantage is that the JSONMessage utilities happen to have the logic (error handling, etc.), and it was good to share that logic, and some of them have in common that they have a progress-bar attached. But for others the JSONMessage is almost entirely useless, and just used as an "envelope" to shove an aux message in, where the aux message is the ACTUAL type returned.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So instead, I think we should ideally look at defining concrete types where possible; i.e. don't use the generic "JSONMessage" (and "cherry-pick" the fields that are used), but have types for each endpoint. I don't think generics are a good choice for everything, but in this case, it may be; because there's some amount of logic involved to handle the responses, and it's "enough" to prefer having a shared implementation for those parts.

And I think with generics we can have the best of both worlds here;

  • Share the common implementation for handling the streams (which this PR started doing, possibly there's more w.r.t. some of the presentation logic)
  • But have concrete types for endpoints; with that, possibly the auxMessage isn't even needed, and can become a concrete type for endpoints that use them.

We can still have an auxMessage field defined for arbitrary data that's not (yet) part of the API contract, but more out of convenience.

Signed-off-by: Nicolas De Loof <nicolas.deloof@gmail.com>
@ndeloof ndeloof force-pushed the decode-JSONMessage-push branch from 1923048 to 2d1429c Compare October 16, 2025 12:43
@thaJeztah thaJeztah added the release-blocker PRs we want to block a release on label Oct 20, 2025
Copy link
Member

@thaJeztah thaJeztah left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

lets bring this one in, then look at the other PR to see if we can improve further.

@thaJeztah thaJeztah merged commit 8c1a909 into moby:master Oct 21, 2025
317 of 320 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

area/dependencies area/go-sdk area/images Image Distribution impact/changelog impact/go-sdk Noteworthy (compatibility changes) in the Go SDK kind/enhancement Enhancements are not bugs or new features but can improve usability or performance. module/client release-blocker PRs we want to block a release on status/2-code-review

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants