-
Notifications
You must be signed in to change notification settings - Fork 23
Expand file tree
/
Copy pathhow_to_configure_service_connection_reconnection_retries.py
More file actions
167 lines (144 loc) · 6.91 KB
/
how_to_configure_service_connection_reconnection_retries.py
File metadata and controls
167 lines (144 loc) · 6.91 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
"""sampler module for reconnection strategy"""
import time
from solace.messaging.config.retry_strategy import RetryStrategy
from solace.messaging.config.solace_properties import transport_layer_properties
from solace.messaging.errors.pubsubplus_client_error import PubSubPlusClientError
from solace.messaging.messaging_service import MessagingService, ReconnectionListener, ReconnectionAttemptListener
from solace.messaging.resources.topic import Topic
from howtos.sampler_boot import SolaceConstants, SamplerBoot, SamplerUtil
constants = SolaceConstants
boot = SamplerBoot()
class HowToConnectWithDifferentStrategy:
"""
This is a sampler for reconnection strategy
"""
@staticmethod
def connect_never_retry():
"""
creates a new instance of message service, that is used to configure
direct message instances from config
Returns: new connection for Direct messaging
Raises:
PubSubPlusClientError
"""
try:
messaging_service = MessagingService.builder().from_properties(boot.broker_properties()) \
.with_reconnection_retry_strategy(RetryStrategy.never_retry()).build()
messaging_service.connect()
return messaging_service
except PubSubPlusClientError as exception:
raise exception
finally:
messaging_service.disconnect()
@staticmethod
def connect_retry_interval(retry_interval):
"""
creates a new instance of message service, that is used to configure
direct message instances from config
Returns: new connection for Direct messaging
Raises:
PubSubPlusClientError
"""
try:
messaging_service = MessagingService.builder().from_properties(boot.broker_properties()) \
.with_reconnection_retry_strategy(RetryStrategy.forever_retry(retry_interval)).build()
messaging_service.connect()
return messaging_service
except PubSubPlusClientError as exception:
raise exception
finally:
messaging_service.disconnect()
@staticmethod
def connect_parametrized_retry(retries, retry_interval):
"""
creates a new instance of message service, that is used to configure
direct message instances from config
Returns: new connection for Direct messaging
Raises:
PubSubPlusClientError
"""
try:
message_service = MessagingService.builder() \
.from_properties(boot.broker_properties()) \
.with_reconnection_retry_strategy(RetryStrategy.parametrized_retry(retries, retry_interval)) \
.build(SamplerUtil.get_new_application_id())
message_service.connect()
return message_service
except PubSubPlusClientError as exception:
print(f'Exception: {exception}')
raise exception
finally:
message_service.disconnect()
@staticmethod
def connect_using_properties(retries: int, retry_interval: int):
"""
creates a new instance of message service, that is used to configure
direct message instances from config
Returns: new connection for Direct messaging
Raises:
PubSubPlusClientError
"""
service_config = dict()
messaging_service = None
try:
service_config[transport_layer_properties.RECONNECTION_ATTEMPTS] = retries
service_config[transport_layer_properties.RECONNECTION_ATTEMPTS_WAIT_INTERVAL] = retry_interval
messaging_service = MessagingService.builder().from_properties(boot.broker_properties()).build()
messaging_service.connect()
return messaging_service
except PubSubPlusClientError as exception:
raise exception
finally:
if messaging_service:
messaging_service.disconnect()
@staticmethod
def add_listener_when_reconnection_happens(retries: int, retry_interval: int) -> 'MessagingService':
"""method adds a reconnection listener when an reconnection happens using the reconnection strategy
Args:
retries (int): the number of retries count
retry_interval (int): the retry interval value
Returns:
the listener events
"""
events = list()
class SampleServiceReconnectionHandler(ReconnectionAttemptListener, ReconnectionListener):
def __init__(self):
self.events = list()
def on_reconnecting(event):
self.events.append(event)
print('Got reconnection attempt event, current reconnection events {self.events}')
def on_reconnected(self, event):
self.events.append(event)
print('Got reconnected event, current reconnection events {self.events}')
messaging_service = MessagingService.builder().from_properties(boot.broker_properties()) \
.with_reconnection_retry_strategy(RetryStrategy.parametrized_retry(retries, retry_interval)) \
.build()
event_handler = SampleServiceReconnectionHandler()
try:
messaging_service.add_reconnection_listener(event_handler)
messaging_service.connect()
# wait for reconnection here
# for now ommitting this code as it requires external connection administration
finally:
messaging_service.disconnect()
return event_handler.events # MessagingService got list
@staticmethod
def run():
"""
:return: Success or Failed according to connection established
"""
print("Running connect_never_retry...")
print("\tSuccess" if HowToConnectWithDifferentStrategy().connect_never_retry() is not None else "Failed")
print("Running connect_retry_interval...")
print("\tSuccess" if HowToConnectWithDifferentStrategy().connect_retry_interval(3) is not None else "Failed")
print("Running connect_parametrized_retry...")
print("\tSuccess" if HowToConnectWithDifferentStrategy().connect_parametrized_retry(3, 40) is not None else "Failed")
print("Running connect_using_properties...")
print("\tSuccess" if HowToConnectWithDifferentStrategy().connect_using_properties(5, 30000) is not None else "Failed")
print("Running add_listener_when_reconnection_happens...")
# note the check for reconnection events allows for 0 events as the trigger for
# reconnect requries a manual connection administrator to force reconnection
print("\tSuccess" if len(HowToConnectWithDifferentStrategy()
.add_listener_when_reconnection_happens(3, 3000)) >= 0 else "Failed")
if __name__ == '__main__':
HowToConnectWithDifferentStrategy().run()