-
Notifications
You must be signed in to change notification settings - Fork 622
Subscription Channel ref without api version #5131
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Closed
Closed
Changes from all commits
Commits
Show all changes
3 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -19,13 +19,18 @@ package subscription | |
| import ( | ||
| "context" | ||
| "fmt" | ||
| "strings" | ||
|
|
||
| "go.uber.org/zap" | ||
| corev1 "k8s.io/api/core/v1" | ||
| apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" | ||
| apiextensionsv1lister "k8s.io/apiextensions-apiserver/pkg/client/listers/apiextensions/v1" | ||
| "k8s.io/apimachinery/pkg/api/equality" | ||
| apierrors "k8s.io/apimachinery/pkg/api/errors" | ||
| "k8s.io/apimachinery/pkg/api/meta" | ||
| metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" | ||
| "k8s.io/apimachinery/pkg/runtime" | ||
| "k8s.io/apimachinery/pkg/runtime/schema" | ||
| "k8s.io/apimachinery/pkg/types" | ||
| "k8s.io/client-go/dynamic" | ||
|
|
||
|
|
@@ -66,6 +71,9 @@ type Reconciler struct { | |
| // DynamicClientSet allows us to configure pluggable Build objects | ||
| dynamicClientSet dynamic.Interface | ||
|
|
||
| // crdLister is used to resolve the ref version | ||
| crdLister apiextensionsv1lister.CustomResourceDefinitionLister | ||
|
|
||
| // listers index properties about resources | ||
| subscriptionLister listers.SubscriptionLister | ||
| channelLister listers.ChannelLister | ||
|
|
@@ -321,11 +329,18 @@ func (r *Reconciler) trackAndFetchChannel(ctx context.Context, sub *v1.Subscript | |
| func (r *Reconciler) getChannel(ctx context.Context, sub *v1.Subscription) (*eventingduckv1.Channelable, pkgreconciler.Event) { | ||
| logging.FromContext(ctx).Infow("Getting channel", zap.Any("channel", sub.Spec.Channel)) | ||
|
|
||
| if err := r.resolveRefAPIVersion(&sub.Spec.Channel); err != nil { | ||
| logging.FromContext(ctx).Warnw("failed to resolve the ref", zap.Any("channel", sub.Spec.Channel), zap.Error(err)) | ||
| return nil, err | ||
| } | ||
|
|
||
| logging.FromContext(ctx).Infow("Resolved channel ref", zap.Any("channel", sub.Spec.Channel)) | ||
|
|
||
| // 1. Track the channel pointed by subscription. | ||
| // a. If channel is a Channel.messaging.knative.dev | ||
| obj, err := r.trackAndFetchChannel(ctx, sub, sub.Spec.Channel) | ||
| if err != nil { | ||
| logging.FromContext(ctx).Warnw("failed", zap.Any("channel", sub.Spec.Channel), zap.Error(err)) | ||
| logging.FromContext(ctx).Warnw("failed to track and fetch channel", zap.Any("channel", sub.Spec.Channel), zap.Error(err)) | ||
| return nil, err | ||
| } | ||
|
|
||
|
|
@@ -512,3 +527,37 @@ func deliverySpec(sub *v1.Subscription, channel *eventingduckv1.Channelable) (de | |
| } | ||
| return | ||
| } | ||
|
|
||
| // TODO(slinkydeveloper) this can be refactored in a separate object that takes care of "ref resolution" | ||
| func (r *Reconciler) resolveRefAPIVersion(objRef *corev1.ObjectReference) error { | ||
| if objRef.APIVersion == "" || strings.Contains(objRef.APIVersion, "/") { | ||
| // Either it's Core v1 or the version is specified manually | ||
| return nil | ||
| } | ||
|
|
||
| actualGvk := schema.GroupVersionKind{Group: objRef.APIVersion, Kind: objRef.Kind} | ||
| pluralGvk, _ := meta.UnsafeGuessKindToResource(actualGvk) | ||
| crd, err := r.crdLister.Get(pluralGvk.GroupResource().String()) | ||
| if err != nil { | ||
| return err | ||
| } | ||
|
|
||
| actualGvk.Version, err = findCRDStorageVersion(crd) | ||
| if err != nil { | ||
| return err | ||
| } | ||
|
|
||
| objRef.SetGroupVersionKind(actualGvk) | ||
|
|
||
| return nil | ||
| } | ||
|
|
||
| // This function runs under the assumption that there must be exactly one "storage" version | ||
| func findCRDStorageVersion(crd *apiextensionsv1.CustomResourceDefinition) (string, error) { | ||
| for _, version := range crd.Spec.Versions { | ||
| if version.Storage { | ||
| return version.Name, nil | ||
| } | ||
| } | ||
| return "", fmt.Errorf("this CRD %s doesn't have a storage version! Kubernetes, you're drunk, go home", crd.Name) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. /hold |
||
| } | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am not sure this will do what you want. What you are trying to do is find the version of the channel that implements the duck type the subscription controller assumes to use, but this is not the way to do it. The stored version means nothing to the duck type.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is the proper way, if there is any?
From my understanding, the duck type doesn't care neither of the stored version, nor the original version used by the user to create the resource.
To be more specific: here I'm not performing discovery of duck types, nor looking for the duck type version etc. The user gives me a concrete channel kind and api group and I select for her/him the version, that's it. For what is worth, we could even pick just the first or the last version, since which version to pick is not important.
I query the storage version because, at the end of the day, this is the ultimate source of truth for k8s regarding that particular resource and skips some conversions in the middle.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Proper way is to specify the API version that you want to use. The label on the resource says which duck type if conforms to. Storage version is totally irrelevant, just in this case you happen to know that this version conforms to a duck type.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Idea in this PR:
When the controller sees that there's no version in the "API version" field, it goes and checks the versions of this resource. It transiently sets the storage version of the resource on the reference. So, in memory, it becomes like this:
And then, it fetches the channel as
messaging.knative.dev/v1alpha1KafkaChannel.In KafkaChannel, we have
v1alpha1API supportingv1alpha1subscribable duck and we havev1beta1API supportingv1subscribable duck.When it fetches the channel as
messaging.knative.dev/v1alpha1, doesn't it getv1alpha1in the duck annotation? And then based on that the subscriber reconciler would operate as if this channel ref is a ref to a channel that's supportingv1alpha1duck type.This is why the storage version is used. If there's no version is specified, it just falls back to storage version to be able to operate with that resource.
Nope, just find an arbitrary version (in this case, getting the storage version makes the reasonable arbitrary version to use) and then let the subscription controller take it from there. It can handle whatever duck version is specified in the resource fetched as storage version.