Package proton :: Module utils
[frames] | no frames]

Source Code for Module proton.utils

  1  # 
  2  # Licensed to the Apache Software Foundation (ASF) under one 
  3  # or more contributor license agreements.  See the NOTICE file 
  4  # distributed with this work for additional information 
  5  # regarding copyright ownership.  The ASF licenses this file 
  6  # to you under the Apache License, Version 2.0 (the 
  7  # "License"); you may not use this file except in compliance 
  8  # with the License.  You may obtain a copy of the License at 
  9  # 
 10  #   http://www.apache.org/licenses/LICENSE-2.0 
 11  # 
 12  # Unless required by applicable law or agreed to in writing, 
 13  # software distributed under the License is distributed on an 
 14  # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
 15  # KIND, either express or implied.  See the License for the 
 16  # specific language governing permissions and limitations 
 17  # under the License. 
 18  # 
 19  import collections, socket, time, threading 
 20   
 21  from proton import ConnectionException, Delivery, Endpoint, Handler, Link, LinkException, Message 
 22  from proton import ProtonException, Timeout, Url 
 23  from proton.reactor import Container 
 24  from proton.handlers import MessagingHandler, IncomingMessageHandler 
 55   
56 -class SendException(ProtonException):
57 """ 58 Exception used to indicate an exceptional state/condition on a send request 59 """
60 - def __init__(self, state):
61 self.state = state
62
63 -def _is_settled(delivery):
64 return delivery.settled or delivery.link.snd_settle_mode == Link.SND_SETTLED
65
66 -class BlockingSender(BlockingLink):
67 - def __init__(self, connection, sender):
68 super(BlockingSender, self).__init__(connection, sender) 69 if self.link.target and self.link.target.address and self.link.target.address != self.link.remote_target.address: 70 #this may be followed by a detach, which may contain an error condition, so wait a little... 71 self._waitForClose() 72 #...but close ourselves if peer does not 73 self.link.close() 74 raise LinkException("Failed to open sender %s, target does not match" % self.link.name)
75
76 - def send(self, msg, timeout=False, error_states=None):
77 delivery = self.link.send(msg) 78 self.connection.wait(lambda: _is_settled(delivery), msg="Sending on sender %s" % self.link.name, timeout=timeout) 79 if delivery.link.snd_settle_mode != Link.SND_SETTLED: 80 delivery.settle() 81 bad = error_states 82 if bad is None: 83 bad = [Delivery.REJECTED, Delivery.RELEASED] 84 if delivery.remote_state in bad: 85 raise SendException(delivery.remote_state) 86 return delivery
87
88 -class Fetcher(MessagingHandler):
89 - def __init__(self, connection, prefetch):
90 super(Fetcher, self).__init__(prefetch=prefetch, auto_accept=False) 91 self.connection = connection 92 self.incoming = collections.deque([]) 93 self.unsettled = collections.deque([])
94
95 - def on_message(self, event):
96 self.incoming.append((event.message, event.delivery)) 97 self.connection.container.yield_() # Wake up the wait() loop to handle the message.
98 103
104 - def on_connection_error(self, event):
105 raise ConnectionClosed(event.connection)
106 107 @property
108 - def has_message(self):
109 return len(self.incoming)
110
111 - def pop(self):
112 message, delivery = self.incoming.popleft() 113 if not delivery.settled: 114 self.unsettled.append(delivery) 115 return message
116
117 - def settle(self, state=None):
118 delivery = self.unsettled.popleft() 119 if state: 120 delivery.update(state) 121 delivery.settle()
122
123 124 -class BlockingReceiver(BlockingLink):
125 - def __init__(self, connection, receiver, fetcher, credit=1):
126 super(BlockingReceiver, self).__init__(connection, receiver) 127 if self.link.source and self.link.source.address and self.link.source.address != self.link.remote_source.address: 128 #this may be followed by a detach, which may contain an error condition, so wait a little... 129 self._waitForClose() 130 #...but close ourselves if peer does not 131 self.link.close() 132 raise LinkException("Failed to open receiver %s, source does not match" % self.link.name) 133 if credit: receiver.flow(credit) 134 self.fetcher = fetcher 135 self.container = connection.container
136
137 - def __del__(self):
138 self.fetcher = None 139 # The next line causes a core dump if the Proton-C reactor finalizes 140 # first. The self.container reference prevents reactor finalization 141 # until after it is set to None. 142 self.link.handler = None 143 self.container = None
144
145 - def receive(self, timeout=False):
146 if not self.fetcher: 147 raise Exception("Can't call receive on this receiver as a handler was provided") 148 if not self.link.credit: 149 self.link.flow(1) 150 self.connection.wait(lambda: self.fetcher.has_message, msg="Receiving on receiver %s" % self.link.name, timeout=timeout) 151 return self.fetcher.pop()
152
153 - def accept(self):
155
156 - def reject(self):
158
159 - def release(self, delivered=True):
160 if delivered: 161 self.settle(Delivery.MODIFIED) 162 else: 163 self.settle(Delivery.RELEASED)
164
165 - def settle(self, state=None):
166 if not self.fetcher: 167 raise Exception("Can't call accept/reject etc on this receiver as a handler was provided") 168 self.fetcher.settle(state)
169
170 171 -class LinkDetached(LinkException):
172 - def __init__(self, link):
173 self.link = link 174 if link.is_sender: 175 txt = "sender %s to %s closed" % (link.name, link.target.address) 176 else: 177 txt = "receiver %s from %s closed" % (link.name, link.source.address) 178 if link.remote_condition: 179 txt += " due to: %s" % link.remote_condition 180 self.condition = link.remote_condition.name 181 else: 182 txt += " by peer" 183 self.condition = None 184 super(LinkDetached, self).__init__(txt)
185
186 187 -class ConnectionClosed(ConnectionException):
188 - def __init__(self, connection):
189 self.connection = connection 190 txt = "Connection %s closed" % connection.hostname 191 if connection.remote_condition: 192 txt += " due to: %s" % connection.remote_condition 193 self.condition = connection.remote_condition.name 194 else: 195 txt += " by peer" 196 self.condition = None 197 super(ConnectionClosed, self).__init__(txt)
198
199 200 -class BlockingConnection(Handler):
201 """ 202 A synchronous style connection wrapper. 203 """
204 - def __init__(self, url, timeout=None, container=None, ssl_domain=None, heartbeat=None, **kwargs):
205 self.disconnected = False 206 self.timeout = timeout or 60 207 self.container = container or Container() 208 self.container.timeout = self.timeout 209 self.container.start() 210 self.url = Url(url).defaults() 211 self.conn = self.container.connect(url=self.url, handler=self, ssl_domain=ssl_domain, reconnect=False, heartbeat=heartbeat, **kwargs) 212 self.wait(lambda: not (self.conn.state & Endpoint.REMOTE_UNINIT), 213 msg="Opening connection")
214
215 - def create_sender(self, address, handler=None, name=None, options=None):
216 return BlockingSender(self, self.container.create_sender(self.conn, address, name=name, handler=handler, options=options))
217
218 - def create_receiver(self, address, credit=None, dynamic=False, handler=None, name=None, options=None):
219 prefetch = credit 220 if handler: 221 fetcher = None 222 if prefetch is None: 223 prefetch = 1 224 else: 225 fetcher = Fetcher(self, credit) 226 return BlockingReceiver( 227 self, self.container.create_receiver(self.conn, address, name=name, dynamic=dynamic, handler=handler or fetcher, options=options), fetcher, credit=prefetch)
228
229 - def close(self):
230 if not self.conn: 231 return 232 self.conn.close() 233 try: 234 self.wait(lambda: not (self.conn.state & Endpoint.REMOTE_ACTIVE), 235 msg="Closing connection") 236 finally: 237 self.conn.free() 238 # For cleanup, reactor needs to process PN_CONNECTION_FINAL 239 # and all events with embedded contexts must be drained. 240 self.run() # will not block any more 241 self.conn = None 242 self.container.global_handler = None # break circular ref: container to cadapter.on_error 243 self.container = None
244
245 - def _is_closed(self):
246 return self.conn.state & (Endpoint.LOCAL_CLOSED | Endpoint.REMOTE_CLOSED)
247
248 - def run(self):
249 """ Hand control over to the event loop (e.g. if waiting indefinitely for incoming messages) """ 250 while self.container.process(): pass 251 self.container.stop() 252 self.container.process()
253
254 - def wait(self, condition, timeout=False, msg=None):
255 """Call process until condition() is true""" 256 if timeout is False: 257 timeout = self.timeout 258 if timeout is None: 259 while not condition() and not self.disconnected: 260 self.container.process() 261 else: 262 container_timeout = self.container.timeout 263 self.container.timeout = timeout 264 try: 265 deadline = time.time() + timeout 266 while not condition() and not self.disconnected: 267 self.container.process() 268 if deadline < time.time(): 269 txt = "Connection %s timed out" % self.url 270 if msg: txt += ": " + msg 271 raise Timeout(txt) 272 finally: 273 self.container.timeout = container_timeout 274 if self.disconnected or self._is_closed(): 275 self.container.stop() 276 self.conn.handler = None # break cyclical reference 277 if self.disconnected and not self._is_closed(): 278 raise ConnectionException( 279 "Connection %s disconnected: %s" % (self.url, self.disconnected))
280 285
286 - def on_connection_remote_close(self, event):
287 if event.connection.state & Endpoint.LOCAL_ACTIVE: 288 event.connection.close() 289 raise ConnectionClosed(event.connection)
290
291 - def on_transport_tail_closed(self, event):
292 self.on_transport_closed(event)
293
294 - def on_transport_head_closed(self, event):
295 self.on_transport_closed(event)
296
297 - def on_transport_closed(self, event):
298 self.disconnected = event.transport.condition or "unknown"
299
300 -class AtomicCount(object):
301 - def __init__(self, start=0, step=1):
302 """Thread-safe atomic counter. Start at start, increment by step.""" 303 self.count, self.step = start, step 304 self.lock = threading.Lock()
305
306 - def next(self):
307 """Get the next value""" 308 self.lock.acquire() 309 self.count += self.step; 310 result = self.count 311 self.lock.release() 312 return result
313
314 -class SyncRequestResponse(IncomingMessageHandler):
315 """ 316 Implementation of the synchronous request-responce (aka RPC) pattern. 317 @ivar address: Address for all requests, may be None. 318 @ivar connection: Connection for requests and responses. 319 """ 320 321 correlation_id = AtomicCount() 322
323 - def __init__(self, connection, address=None):
324 """ 325 Send requests and receive responses. A single instance can send many requests 326 to the same or different addresses. 327 328 @param connection: A L{BlockingConnection} 329 @param address: Address for all requests. 330 If not specified, each request must have the address property set. 331 Sucessive messages may have different addresses. 332 """ 333 super(SyncRequestResponse, self).__init__() 334 self.connection = connection 335 self.address = address 336 self.sender = self.connection.create_sender(self.address) 337 # dynamic=true generates a unique address dynamically for this receiver. 338 # credit=1 because we want to receive 1 response message initially. 339 self.receiver = self.connection.create_receiver(None, dynamic=True, credit=1, handler=self) 340 self.response = None
341
342 - def call(self, request):
343 """ 344 Send a request message, wait for and return the response message. 345 346 @param request: A L{proton.Message}. If L{self.address} is not set the 347 L{self.address} must be set and will be used. 348 """ 349 if not self.address and not request.address: 350 raise ValueError("Request message has no address: %s" % request) 351 request.reply_to = self.reply_to 352 request.correlation_id = correlation_id = self.correlation_id.next() 353 self.sender.send(request) 354 def wakeup(): 355 return self.response and (self.response.correlation_id == correlation_id)
356 self.connection.wait(wakeup, msg="Waiting for response") 357 response = self.response 358 self.response = None # Ready for next response. 359 self.receiver.flow(1) # Set up credit for the next response. 360 return response
361 362 @property
363 - def reply_to(self):
364 """Return the dynamic address of our receiver.""" 365 return self.receiver.remote_source.address
366
367 - def on_message(self, event):
368 """Called when we receive a message for our receiver.""" 369 self.response = event.message 370 self.connection.container.yield_() # Wake up the wait() loop to handle the message.
371