-
Notifications
You must be signed in to change notification settings - Fork 23
Expand file tree
/
Copy pathhow_to_add_and_remove_subscription.py
More file actions
151 lines (122 loc) · 5.98 KB
/
how_to_add_and_remove_subscription.py
File metadata and controls
151 lines (122 loc) · 5.98 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
"""sampler module to show how to add and remove the subscriptions"""
import pickle
import time
from concurrent.futures.thread import ThreadPoolExecutor
from typing import TypeVar, Generic
from solace.messaging.messaging_service import MessagingService
from solace.messaging.receiver.message_receiver import MessageHandler
from solace.messaging.resources.topic_subscription import TopicSubscription
from solace.messaging.utils.converter import BytesToObject
from howtos.pubsub.how_to_direct_publish_message import HowToDirectPublishMessage
from howtos.sampler_boot import SamplerBoot, SolaceConstants
# Type variable used by the generic sample types in this module
# (MyData and the return annotation of ByteToObjectConverter.convert).
X = TypeVar('X')
# Alias of the SolaceConstants class itself (not an instance); run() reads
# TOPIC_ENDPOINT_DEFAULT from it.
constants = SolaceConstants
# Sampler bootstrap object, created at import time; run() obtains the broker
# properties from it via broker_properties().
boot = SamplerBoot()
# Number of seconds passed to time.sleep() to keep each receiver alive before
# it is terminated.
MAX_SLEEP = 10
class MessageHandlerImpl1(MessageHandler):
    """Callback handler that prints the details of each received message."""

    def on_message(self, message: 'InboundMessage'):
        """Print the topic, both payload views and the correlation id of ``message``."""
        destination = message.get_destination_name()
        raw_payload = message.get_payload_as_bytes()
        text_payload = message.get_payload_as_string()
        correlation = message.get_correlation_id()
        # Assemble the whole report first so it is written with one print call.
        report = (
            "\n"
            f"CALLBACK: Message Received on Topic: {destination}.\n"
            f"Message Bytes: {raw_payload} \n"
            f"Message String: {text_payload} \n"
            f"Correlation id: {correlation}"
        )
        print(report)
class MessageHandlerImpl2(MessageHandler):
    """Callback handler that prints the details of each received message."""

    def on_message(self, message: 'InboundMessage'):
        """Print the topic, both payload views and the correlation id of ``message``."""
        source_topic = message.get_destination_name()
        body_bytes = message.get_payload_as_bytes()
        body_text = message.get_payload_as_string()
        corr_id = message.get_correlation_id()
        # Same report layout as the other sample handler in this module.
        summary = (
            "\n"
            f"CALLBACK: Message Received on Topic: {source_topic}.\n"
            f"Message Bytes: {body_bytes} \n"
            f"Message String: {body_text} \n"
            f"Correlation id: {corr_id}"
        )
        print(summary)
class MyData(Generic[X]):
    """Sample business object that carries a single ``name`` value."""

    # Class-level fallback value; ``__init__`` always sets the attribute
    # on the instance, which takes precedence over this one.
    name = 'some string'

    def __init__(self, name):
        """Store ``name`` on the new instance."""
        self.name = name

    def get_name(self):
        """Return the name stored on this instance."""
        return self.name
class ByteToObjectConverter(BytesToObject):
    """Sample converter that turns a received byte array back into an object."""

    def convert(self, src: bytearray) -> X:
        """Unpickle ``src`` and return the resulting business object."""
        # NOTE(review): pickle.loads can execute arbitrary code while
        # unpickling; confirm that payloads only ever come from trusted
        # publishers before reusing this converter outside the sample.
        return pickle.loads(src)
class HowToAddAndRemoveSubscriptionSampler:
    """Sampler showing how to add and remove topic subscriptions on a direct receiver."""

    @staticmethod
    def direct_message_consume_adding_subscriptions(messaging_service: MessagingService, consumer_subscription: str,
                                                    listener_topics: list):
        """Consume direct messages while adding subscriptions to a started receiver.

        Args:
            messaging_service: connected messaging service
            consumer_subscription: topic the receiver is built with
            listener_topics: topics added to the receiver once it has started
        """
        topics = [TopicSubscription.of(consumer_subscription)]
        # Build the receiver before entering ``try``: if the builder fails there
        # is nothing to terminate, and the ``finally`` clause must not hit an
        # unbound name that would mask the real error.
        direct_receive_service = messaging_service.create_direct_message_receiver_builder() \
            .with_subscriptions(topics).build()
        try:
            direct_receive_service.start()
            direct_receive_service.receive_async(MessageHandlerImpl1())
            for topic in listener_topics:
                direct_receive_service.add_subscription(TopicSubscription.of(topic))
            print(f"Subscribed to: {consumer_subscription}")
            # Keep the receiver alive so the async handler can print messages.
            time.sleep(MAX_SLEEP)
        finally:
            direct_receive_service.terminate()

    @staticmethod
    def direct_message_consume_removing_subscriptions(messaging_service: MessagingService, consumer_subscription: str,
                                                      listener_topics: list):
        """Consume direct messages while removing subscriptions from a started receiver.

        Args:
            messaging_service: connected messaging service
            consumer_subscription: topic the receiver is built with
            listener_topics: topics removed from the receiver once it has started
        """
        topics = [TopicSubscription.of(consumer_subscription)]
        # Build the receiver before entering ``try``: if the builder fails there
        # is nothing to terminate, and the ``finally`` clause must not hit an
        # unbound name that would mask the real error.
        direct_receive_service = messaging_service.create_direct_message_receiver_builder() \
            .with_subscriptions(topics).build()
        try:
            direct_receive_service.start()
            direct_receive_service.receive_async(MessageHandlerImpl2())
            for topic in listener_topics:
                direct_receive_service.remove_subscription(TopicSubscription.of(topic))
            print(f"Subscribed to: {consumer_subscription}")
            # Keep the receiver alive so the async handler can print messages.
            time.sleep(MAX_SLEEP)
        finally:
            direct_receive_service.terminate()

    @staticmethod
    def run():
        """Connect to the broker and run the add and the remove sample in turn."""
        service = MessagingService.builder().from_properties(boot.broker_properties()).build()
        service.connect()
        try:
            consumer_subscription = constants.TOPIC_ENDPOINT_DEFAULT
            listener_topics = ['try-me', 'try-me1', 'try-me2', 'try-me3']

            HowToAddAndRemoveSubscriptionSampler.direct_message_consume_adding_subscriptions(
                service, consumer_subscription, listener_topics)
            HowToAddAndRemoveSubscriptionSampler.direct_message_consume_removing_subscriptions(
                service, consumer_subscription, listener_topics)
        finally:
            # Release the broker connection even when one of the samples fails.
            service.disconnect()

    @staticmethod
    def publish_and_subscribe():
        """method for running the publisher and subscriber"""
        # Leaving the ``with`` block waits for both submitted tasks to finish.
        with ThreadPoolExecutor(max_workers=2) as e:
            e.submit(HowToAddAndRemoveSubscriptionSampler.run)
            e.submit(HowToDirectPublishMessage.run)
if __name__ == '__main__':
    # Start the subscriber and publisher samples only when run as a script.
    HowToAddAndRemoveSubscriptionSampler.publish_and_subscribe()