Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pkg/reconciler/subscription/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package subscription
import (
"context"

"knative.dev/pkg/client/injection/apiextensions/informers/apiextensions/v1/customresourcedefinition"
"knative.dev/pkg/configmap"
"knative.dev/pkg/controller"
"knative.dev/pkg/logging"
Expand Down Expand Up @@ -46,6 +47,7 @@ func NewController(

r := &Reconciler{
dynamicClientSet: dynamicclient.Get(ctx),
crdLister: customresourcedefinition.Get(ctx).Lister(),
subscriptionLister: subscriptionInformer.Lister(),
channelLister: channelInformer.Lister(),
}
Expand Down
4 changes: 3 additions & 1 deletion pkg/reconciler/subscription/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,12 @@ import (
. "knative.dev/pkg/reconciler/testing"

// Fake injection informers
_ "knative.dev/pkg/client/injection/apiextensions/informers/apiextensions/v1/customresourcedefinition/fake"
_ "knative.dev/pkg/client/injection/ducks/duck/v1/addressable/fake"

_ "knative.dev/eventing/pkg/client/injection/ducks/duck/v1/channelable/fake"
_ "knative.dev/eventing/pkg/client/injection/informers/messaging/v1/channel/fake"
_ "knative.dev/eventing/pkg/client/injection/informers/messaging/v1/subscription/fake"
_ "knative.dev/pkg/client/injection/ducks/duck/v1/addressable/fake"
)

func TestNew(t *testing.T) {
Expand Down
51 changes: 50 additions & 1 deletion pkg/reconciler/subscription/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,18 @@ package subscription
import (
"context"
"fmt"
"strings"

"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
apiextensionsv1lister "k8s.io/apiextensions-apiserver/pkg/client/listers/apiextensions/v1"
"k8s.io/apimachinery/pkg/api/equality"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/dynamic"

Expand Down Expand Up @@ -66,6 +71,9 @@ type Reconciler struct {
// DynamicClientSet allows us to configure pluggable Build objects
dynamicClientSet dynamic.Interface

// crdLister is used to resolve the ref version
crdLister apiextensionsv1lister.CustomResourceDefinitionLister

// listers index properties about resources
subscriptionLister listers.SubscriptionLister
channelLister listers.ChannelLister
Expand Down Expand Up @@ -321,11 +329,18 @@ func (r *Reconciler) trackAndFetchChannel(ctx context.Context, sub *v1.Subscript
func (r *Reconciler) getChannel(ctx context.Context, sub *v1.Subscription) (*eventingduckv1.Channelable, pkgreconciler.Event) {
logging.FromContext(ctx).Infow("Getting channel", zap.Any("channel", sub.Spec.Channel))

if err := r.resolveRefAPIVersion(&sub.Spec.Channel); err != nil {
logging.FromContext(ctx).Warnw("failed to resolve the ref", zap.Any("channel", sub.Spec.Channel), zap.Error(err))
return nil, err
}

logging.FromContext(ctx).Infow("Resolved channel ref", zap.Any("channel", sub.Spec.Channel))

// 1. Track the channel pointed by subscription.
// a. If channel is a Channel.messaging.knative.dev
obj, err := r.trackAndFetchChannel(ctx, sub, sub.Spec.Channel)
if err != nil {
logging.FromContext(ctx).Warnw("failed", zap.Any("channel", sub.Spec.Channel), zap.Error(err))
logging.FromContext(ctx).Warnw("failed to track and fetch channel", zap.Any("channel", sub.Spec.Channel), zap.Error(err))
return nil, err
}

Expand Down Expand Up @@ -512,3 +527,37 @@ func deliverySpec(sub *v1.Subscription, channel *eventingduckv1.Channelable) (de
}
return
}

// TODO(slinkydeveloper) this can be refactored in a separate object that takes care of "ref resolution"
func (r *Reconciler) resolveRefAPIVersion(objRef *corev1.ObjectReference) error {
if objRef.APIVersion == "" || strings.Contains(objRef.APIVersion, "/") {
// Either it's Core v1 or the version is specified manually
return nil
}

actualGvk := schema.GroupVersionKind{Group: objRef.APIVersion, Kind: objRef.Kind}
pluralGvk, _ := meta.UnsafeGuessKindToResource(actualGvk)
crd, err := r.crdLister.Get(pluralGvk.GroupResource().String())
if err != nil {
return err
}

actualGvk.Version, err = findCRDStorageVersion(crd)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure this will do what you want. What you are trying to do is find the version of the channel that implements the duck type the subscription controller assumes to use, but this is not the way to do it. The stored version means nothing to the duck type.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What you are trying to do is find the version of the channel that implements the duck type the subscription controller assumes to use, but this is not the way to do it

What is the proper way, if there is any?

The stored version means nothing to the duck type.

From my understanding, the duck type doesn't care neither of the stored version, nor the original version used by the user to create the resource.

To be more specific: here I'm not performing discovery of duck types, nor looking for the duck type version etc. The user gives me a concrete channel kind and api group and I select for her/him the version, that's it. For what is worth, we could even pick just the first or the last version, since which version to pick is not important.

I query the storage version because, at the end of the day, this is the ultimate source of truth for k8s regarding that particular resource and skips some conversions in the middle.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Proper way is to specify the API version that you want to use. The label on the resource says which duck type if conforms to. Storage version is totally irrelevant, just in this case you happen to know that this version conforms to a duck type.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Idea in this PR:

Kind: Subscription
...
Spec:
  Channel:
    API Version:  messaging.knative.dev
    Kind:         KafkaChannel
    Name:         testchannel

When the controller sees that there's no version in the "API version" field, it goes and checks the versions of this resource. It transiently sets the storage version of the resource on the reference. So, in memory, it becomes like this:

Kind: Subscription
...
Spec:
  Channel:
    API Version:  messaging.knative.dev/v1alpha1
    Kind:         KafkaChannel
    Name:         testchannel

And then, it fetches the channel as messaging.knative.dev/v1alpha1 KafkaChannel.

In KafkaChannel, we have v1alpha1 API supporting v1alpha1 subscribable duck and we have v1beta1 API supporting v1 subscribable duck.

When it fetches the channel as messaging.knative.dev/v1alpha1, doesn't it get v1alpha1 in the duck annotation? And then based on that the subscriber reconciler would operate as if this channel ref is a ref to a channel that's supporting v1alpha1 duck type.

This is why the storage version is used. If there's no version is specified, it just falls back to storage version to be able to operate with that resource.

What you are trying to do is find the version of the channel that implements the duck type the subscription controller assumes to use, but this is not the way to do it.

Nope, just find an arbitrary version (in this case, getting the storage version makes the reasonable arbitrary version to use) and then let the subscription controller take it from there. It can handle whatever duck version is specified in the resource fetched as storage version.

if err != nil {
return err
}

objRef.SetGroupVersionKind(actualGvk)

return nil
}

// This function runs under the assumption that there must be exactly one "storage" version
func findCRDStorageVersion(crd *apiextensionsv1.CustomResourceDefinition) (string, error) {
for _, version := range crd.Spec.Versions {
if version.Storage {
return version.Name, nil
}
}
return "", fmt.Errorf("this CRD %s doesn't have a storage version! Kubernetes, you're drunk, go home", crd.Name)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

/hold

}
92 changes: 88 additions & 4 deletions pkg/reconciler/subscription/subscription_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,6 @@ import (
"knative.dev/pkg/injection/clients/dynamicclient"
"knative.dev/pkg/network"

eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1"
eventingclient "knative.dev/eventing/pkg/client/injection/client"
"knative.dev/eventing/pkg/client/injection/ducks/duck/v1/channelable"

corev1 "k8s.io/api/core/v1"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -46,6 +42,12 @@ import (
logtesting "knative.dev/pkg/logging/testing"
"knative.dev/pkg/resolver"

eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1"
"knative.dev/eventing/pkg/apis/messaging"
eventingclient "knative.dev/eventing/pkg/client/injection/client"
"knative.dev/eventing/pkg/client/injection/ducks/duck/v1/channelable"
pkgtesting "knative.dev/eventing/pkg/reconciler/testing"

eventingduck "knative.dev/eventing/pkg/apis/duck/v1"
messagingv1 "knative.dev/eventing/pkg/apis/messaging/v1"
"knative.dev/eventing/pkg/client/injection/reconciler/messaging/v1/subscription"
Expand Down Expand Up @@ -117,6 +119,11 @@ var (
Kind: "InMemoryChannel",
}

imcGVK = metav1.GroupVersionKind{
Group: "messaging.knative.dev",
Kind: "InMemoryChannel",
}

channelV1GVK = metav1.GroupVersionKind{
Group: "messaging.knative.dev",
Version: "v1",
Expand Down Expand Up @@ -217,6 +224,82 @@ func TestAllCases(t *testing.T) {
MarkSubscriptionReady,
),
}},
}, {
Name: "subscription goes ready without api version",
Objects: []runtime.Object{
NewSubscription(subscriptionName, testNS,
WithSubscriptionUID(subscriptionUID),
WithSubscriptionChannel(imcGVK, channelName),
WithSubscriptionSubscriberRef(subscriberGVK, subscriberName, testNS),
WithSubscriptionReply(imcV1GVK, replyName, testNS),
WithInitSubscriptionConditions,
WithSubscriptionFinalizers(finalizerName),
MarkReferencesResolved,
MarkAddedToChannel,
WithSubscriptionPhysicalSubscriptionSubscriber(subscriberURI),
WithSubscriptionPhysicalSubscriptionReply(replyURI),
),
// Subscriber
NewUnstructured(subscriberGVK, subscriberName, testNS,
WithUnstructuredAddressable(subscriberDNS),
),
// Reply
NewInMemoryChannel(replyName, testNS,
WithInitInMemoryChannelConditions,
WithInMemoryChannelAddress(replyDNS),
),
// Channel
NewInMemoryChannel(channelName, testNS,
WithInitInMemoryChannelConditions,
WithInMemoryChannelReady(channelDNS),
WithInMemoryChannelSubscribers([]eventingduck.SubscriberSpec{{
UID: subscriptionUID,
Generation: 0,
SubscriberURI: subscriberURI,
ReplyURI: replyURI,
}, {
UID: "34c5aec8-deb6-11e8-9f32-f2801f1b9fd1",
Generation: 1,
SubscriberURI: apis.HTTP("call2"),
ReplyURI: apis.HTTP("sink2"),
}}),
WithInMemoryChannelStatusSubscribers([]eventingduck.SubscriberStatus{{
UID: subscriptionUID,
ObservedGeneration: 0,
Ready: "True",
}, {
UID: "34c5aec8-deb6-11e8-9f32-f2801f1b9fd1",
ObservedGeneration: 1,
Ready: "True",
}}),
),
// IMC CRD
pkgtesting.NewCustomResourceDefinition(messaging.InMemoryChannelsResource.String(),
pkgtesting.WithCustomResourceDefinitionVersions([]apiextensionsv1.CustomResourceDefinitionVersion{{
Name: "v1beta1",
Storage: false,
}, {
Name: "v1",
Storage: true,
}}),
),
},
Key: testNS + "/" + subscriptionName,
WantErr: false,
WantStatusUpdates: []clientgotesting.UpdateActionImpl{{
Object: NewSubscription(subscriptionName, testNS,
WithSubscriptionUID(subscriptionUID),
WithSubscriptionChannel(imcGVK, channelName),
WithSubscriptionSubscriberRef(subscriberGVK, subscriberName, testNS),
WithSubscriptionReply(imcV1GVK, replyName, testNS),
WithInitSubscriptionConditions,
WithSubscriptionFinalizers(finalizerName),
WithSubscriptionPhysicalSubscriptionSubscriber(subscriberURI),
WithSubscriptionPhysicalSubscriptionReply(replyURI),
// - Status Update -
MarkSubscriptionReady,
),
}},
}, {
Name: "channel does not exist",
Objects: []runtime.Object{
Expand Down Expand Up @@ -1374,6 +1457,7 @@ func TestAllCases(t *testing.T) {
ctx = addressable.WithDuck(ctx)
r := &Reconciler{
dynamicClientSet: dynamicclient.Get(ctx),
crdLister: listers.GetCustomResourceDefinitionLister(),
subscriptionLister: listers.GetSubscriptionLister(),
channelLister: listers.GetMessagingChannelLister(),
channelableTracker: duck.NewListableTracker(ctx, channelable.Get, func(types.NamespacedName) {}, 0),
Expand Down
3 changes: 3 additions & 0 deletions pkg/reconciler/testing/v1/unstructured.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ func WithUnstructuredAddressable(hostname string) UnstructuredOption {
}

func apiVersion(gvk metav1.GroupVersionKind) string {
if gvk.Version == "" {
return gvk.Group
}
groupVersion := gvk.Version
if gvk.Group != "" {
groupVersion = gvk.Group + "/" + gvk.Version
Expand Down