Package core :: Module event_bus
[hide private]
[frames] | no frames]

Source Code for Module core.event_bus

  1  # Copyright 2011 the original author or authors. 
  2  # 
  3  # Licensed under the Apache License, Version 2.0 (the "License"); 
  4  # you may not use this file except in compliance with the License. 
  5  # You may obtain a copy of the License at 
  6  # 
  7  #      http://www.apache.org/licenses/LICENSE-2.0 
  8  # 
  9  # Unless required by applicable law or agreed to in writing, software 
 10  # distributed under the License is distributed on an "AS IS" BASIS, 
 11  # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
 12  # See the License for the specific language governing permissions and 
 13  # limitations under the License. 
 14   
 15  import org.vertx.java.deploy.impl.VertxLocator 
 16  import org.vertx.java.core.buffer 
 17  import org.vertx.java.core 
 18  import org.vertx.java.core.json 
 19  import java.lang 
 20   
 21  from core.javautils import map_to_java, map_from_java 
 22  from core.buffer import Buffer 
 23   
 24  __author__ = "Scott Horn" 
 25  __email__ = "scott@hornmicro.com" 
 26  __credits__ = "Based entirely on work by Tim Fox http://tfox.org" 
27 28 -class EventBus(object):
29 """This class represents a distributed lightweight event bus which can encompass multiple vert.x instances. 30 It is very useful for otherwise isolated vert.x application instances to communicate with each other. 31 32 Messages sent over the event bus are JSON objects represented as Ruby Hash instances. 33 34 The event bus implements a distributed publish / subscribe network. 35 36 Messages are sent to an address. 37 38 There can be multiple handlers registered against that address. 39 Any handlers with a matching name will receive the message irrespective of what vert.x application instance and 40 what vert.x instance they are located in. 41 42 All messages sent over the bus are transient. On event of failure of all or part of the event bus messages 43 may be lost. Applications should be coded to cope with lost messages, e.g. by resending them, and making application 44 services idempotent. 45 46 The order of messages received by any specific handler from a specific sender will match the order of messages 47 sent from that sender. 48 49 When sending a message, a reply handler can be provided. If so, it will be called when the reply from the receiver 50 has been received. 51 52 When receiving a message in a handler the received object is an instance of EventBus::Message - this contains 53 the actual Hash of the message plus a reply method which can be used to reply to it. 54 """ 55 handler_dict = {} 56 57 @staticmethod
58 - def java_eventbus():
59 return org.vertx.java.deploy.impl.VertxLocator.vertx.eventBus()
60 61 @staticmethod
62 - def send(address, message, reply_handler=None):
63 """Send a message on the event bus 64 65 Keyword arguments: 66 @param address: the address to publish to 67 @param message: The message to send 68 @param reply_handler: An optional reply handler. 69 It will be called when the reply from a receiver is received. 70 """ 71 EventBus.send_or_pub(True, address, message, reply_handler)
72 73 @staticmethod
74 - def publish(address, message):
75 """Publish a message on the event bus 76 77 Keyword arguments: 78 @param address: the address to publish to 79 @param message: The message to publish 80 """ 81 EventBus.send_or_pub(False, address, message)
82 83 @staticmethod
84 - def send_or_pub(send, address, message, reply_handler=None):
85 if not address: 86 raise RuntimeError("An address must be specified") 87 if message is None: 88 raise RuntimeError("A message must be specified") 89 message = EventBus.convert_msg(message) 90 if send: 91 if reply_handler != None: 92 EventBus.java_eventbus().send(address, message, InternalHandler(reply_handler)) 93 else: 94 EventBus.java_eventbus().send(address, message) 95 else: 96 EventBus.java_eventbus().publish(address, message)
97 98 99 @staticmethod
100 - def register_handler(address, local_only=False, handler=None):
101 """ Register a handler. 102 103 Keyword arguments: 104 @param address: the address to register for. Any messages sent to that address will be 105 received by the handler. A single handler can be registered against many addresses. 106 @param local_only: if True then handler won't be propagated across cluster 107 @param handler: The handler 108 109 @return: id of the handler which can be used in EventBus.unregister_handler 110 """ 111 if handler is None: 112 raise RuntimeError("handler is required") 113 internal = InternalHandler(handler) 114 if local_only: 115 EventBus.java_eventbus().registerLocalHandler(address, internal) 116 else: 117 EventBus.java_eventbus().registerHandler(address, internal) 118 id = java.util.UUID.randomUUID().toString() 119 EventBus.handler_dict[id] = address, internal 120 return id
121 122 @staticmethod
123 - def register_simple_handler(local_only=False, handler=None):
124 """ 125 Registers a handler against a uniquely generated address, the address is returned as the id 126 received by the handler. A single handler can be registered against many addresses. 127 128 Keyword arguments: 129 @param local_only: If Rrue then handler won't be propagated across cluster 130 @param handler: The handler 131 132 @return: id of the handler which can be used in EventBus.unregister_handler 133 """ 134 if handler is None: 135 raise RuntimeError("Handler is required") 136 internal = InternalHandler(handler) 137 id = java.util.UUID.randomUUID().toString() 138 if local_only: 139 EventBus.java_eventbus().registerLocalHandler(id, internal) 140 else: 141 EventBus.java_eventbus().registerHandler(id, internal) 142 EventBus.handler_dict[id] = id, internal 143 return id
144 145 @staticmethod
146 - def unregister_handler(handler_id):
147 """Unregisters a handler 148 149 Keyword arguments: 150 @param handler_id: the id of the handler to unregister. Returned from EventBus.register_handler 151 """ 152 [address, handler] = EventBus.handler_dict.pop(handler_id) 153 154 EventBus.java_eventbus().unregisterHandler(address, handler)
155 156 @staticmethod
157 - def convert_msg(message):
158 if isinstance(message, dict): 159 message = org.vertx.java.core.json.JsonObject(map_to_java(message)) 160 elif isinstance(message, Buffer): 161 message = message._to_java_buffer() 162 elif isinstance(message, long): 163 message = java.lang.Long(message) 164 elif isinstance(message, float): 165 message = java.lang.Double(message) 166 elif isinstance(message, int): 167 message = java.lang.Integer(message) 168 else: 169 message = map_to_java(message) 170 return message
171
172 -class InternalHandler(org.vertx.java.core.Handler):
173 - def __init__(self, handler):
174 self.handler = handler
175
176 - def handle(self, message):
177 self.handler(Message(message))
178
179 -class Message(object):
180 """Represents a message received from the event bus"""
181 - def __init__(self, message):
182 self.java_obj = message 183 if isinstance(message.body, org.vertx.java.core.json.JsonObject): 184 self.body = map_from_java(message.body.toMap()) 185 elif isinstance(message.body, org.vertx.java.core.buffer.Buffer): 186 self.body = Buffer(message.body) 187 else: 188 self.body = map_from_java(message.body)
189
190 - def reply(self, reply, handler=None):
191 """Reply to this message. If the message was sent specifying a receipt handler, that handler will be 192 called when it has received a reply. If the message wasn't sent specifying a receipt handler 193 this method does nothing. 194 Replying to a message this way is equivalent to sending a message to an address which is the same as the message id 195 of the original message. 196 197 Keyword arguments: 198 @param reply: message to send as reply 199 @param handler: the reply handler 200 """ 201 reply = EventBus.convert_msg(reply) 202 if handler is None: 203 self.java_obj.reply(reply) 204 else: 205 self.java_obj.reply(reply, InternalHandler(handler))
206