Class | MCollective::Message |
In: |
lib/mcollective/message.rb
|
Parent: | Object |
container for a message, its headers, agent, collective and other meta data
VALIDTYPES | = | [:message, :request, :direct_request, :reply] |
agent | [RW] | |
collective | [RW] | |
discovered_hosts | [RW] | |
expected_msgid | [R] | |
filter | [RW] | |
headers | [RW] | |
message | [R] | |
msgtime | [R] | |
options | [RW] | |
payload | [R] | |
request | [R] | |
requestid | [RW] | |
ttl | [RW] | |
type | [R] | |
validated | [R] |
payload - the message body without headers etc, just the text message - the original message received from the middleware options[:base64] - if the body base64 encoded? options[:agent] - the agent the message is for/from options[:collective] - the collective its for/from options[:headers] - the message headers options[:type] - an indicator about the type of message, :message, :request, :direct_request or :reply options[:request] - if this is a reply this should old the message we are replying to options[:filter] - for requests, the filter to encode into the message options[:options] - the normal client options hash options[:ttl] - the maximum amount of seconds this message can be valid for options[:expected_msgid] - in the case of replies this is the msgid it is expecting in the replies
# File lib/mcollective/message.rb, line 22 22: def initialize(payload, message, options = {}) 23: options = {:base64 => false, 24: :agent => nil, 25: :headers => {}, 26: :type => :message, 27: :request => nil, 28: :filter => Util.empty_filter, 29: :options => {}, 30: :ttl => 60, 31: :expected_msgid => nil, 32: :collective => nil}.merge(options) 33: 34: @payload = payload 35: @message = message 36: @requestid = nil 37: @discovered_hosts = nil 38: 39: @type = options[:type] 40: @headers = options[:headers] 41: @base64 = options[:base64] 42: @filter = options[:filter] 43: @expected_msgid = options[:expected_msgid] 44: @options = options[:options] 45: 46: @ttl = @options[:ttl] || Config.instance.ttl 47: @msgtime = 0 48: 49: @validated = false 50: 51: if options[:request] 52: @request = options[:request] 53: @agent = request.agent 54: @collective = request.collective 55: @type = :reply 56: else 57: @agent = options[:agent] 58: @collective = options[:collective] 59: end 60: 61: base64_decode! 62: end
# File lib/mcollective/message.rb, line 97 97: def base64_decode! 98: return unless @base64 99: 100: @body = SSL.base64_decode(@body) 101: @base64 = false 102: end
# File lib/mcollective/message.rb, line 104 104: def base64_encode! 105: return if @base64 106: 107: @body = SSL.base64_encode(@body) 108: @base64 = true 109: end
# File lib/mcollective/message.rb, line 187 187: def create_reqid 188: Digest::MD5.hexdigest("#{Config.instance.identity}-#{Time.now.to_f}-#{agent}-#{collective}") 189: end
# File lib/mcollective/message.rb, line 131 131: def decode! 132: raise "Cannot decode message type #{type}" unless [:request, :reply].include?(type) 133: 134: @payload = PluginManager["security_plugin"].decodemsg(self) 135: 136: if type == :request 137: raise 'callerid in request is not valid, surpressing reply to potentially forged request' unless PluginManager["security_plugin"].valid_callerid?(payload[:callerid]) 138: end 139: 140: [:collective, :agent, :filter, :requestid, :ttl, :msgtime].each do |prop| 141: instance_variable_set("@#{prop}", payload[prop]) if payload.include?(prop) 142: end 143: end
# File lib/mcollective/message.rb, line 115 115: def encode! 116: case type 117: when :reply 118: raise "Cannot encode a reply message if no request has been associated with it" unless request 119: raise 'callerid in original request is not valid, surpressing reply to potentially forged request' unless PluginManager["security_plugin"].valid_callerid?(request.payload[:callerid]) 120: 121: @requestid = request.payload[:requestid] 122: @payload = PluginManager["security_plugin"].encodereply(agent, payload, requestid, request.payload[:callerid]) 123: when :request, :direct_request 124: @requestid = create_reqid 125: @payload = PluginManager["security_plugin"].encoderequest(Config.instance.identity, payload, requestid, filter, agent, collective, ttl) 126: else 127: raise "Cannot encode #{type} messages" 128: end 129: end
in the case of reply messages we are expecting replies to a previously created message. This stores a hint to that previously sent message id and can be used by other classes like the security plugins as a means of optimizing their behavior like by ignoring messages not directed at us.
# File lib/mcollective/message.rb, line 92 92: def expected_msgid=(msgid) 93: raise "Can only store the expected msgid for reply messages" unless @type == :reply 94: @expected_msgid = msgid 95: end
publish a reply message by creating a target name and sending it
# File lib/mcollective/message.rb, line 169 169: def publish 170: Timeout.timeout(2) do 171: # If we've been specificaly told about hosts that were discovered 172: # use that information to do P2P calls if appropriate else just 173: # send it as is. 174: if @discovered_hosts && Config.instance.direct_addressing 175: if @discovered_hosts.size <= Config.instance.direct_addressing_threshold 176: @type = :direct_request 177: Log.debug("Handling #{requestid} as a direct request") 178: end 179: 180: PluginManager["connector_plugin"].publish(self) 181: else 182: PluginManager["connector_plugin"].publish(self) 183: end 184: end 185: end
Sets the message type to one of the known types. In the case of :direct_request the list of hosts to communicate with should have been set with discovered_hosts else an exception will be raised. This is for extra security, we never accidentally want to send a direct request without a list of hosts or something weird like that as it might result in a filterless broadcast being sent.
Additionally you simply cannot set :direct_request if direct_addressing was not enabled this is to force a workflow that doesnt not yield in a mistake when someone might assume direct_addressing is enabled when its not.
# File lib/mcollective/message.rb, line 73 73: def type=(type) 74: if type == :direct_request 75: raise "Direct requests is not enabled using the direct_addressing config option" unless Config.instance.direct_addressing 76: 77: unless @discovered_hosts && !@discovered_hosts.empty? 78: raise "Can only set type to :direct_request if discovered_hosts have been set" 79: end 80: end 81: 82: raise "Unknown message type #{type}" unless VALIDTYPES.include?(type) 83: 84: @type = type 85: end
Perform validation against the message by checking filters and ttl
# File lib/mcollective/message.rb, line 146 146: def validate 147: raise "Can only validate request messages" unless type == :request 148: 149: msg_age = Time.now.utc.to_i - msgtime 150: 151: if msg_age > ttl 152: cid = "" 153: cid += payload[:callerid] + "@" if payload.include?(:callerid) 154: cid += payload[:senderid] 155: 156: if msg_age > ttl 157: PluginManager["global_stats"].ttlexpired 158: 159: raise(MsgTTLExpired, "Message #{requestid} from #{cid} created at #{msgtime} is #{msg_age} seconds old, TTL is #{ttl}") 160: end 161: end 162: 163: raise(NotTargettedAtUs, "Received message is not targetted to us") unless PluginManager["security_plugin"].validate_filter?(payload[:filter]) 164: 165: @validated = true 166: end