class Mongo::Cursor
A cursor over query results. Returned objects are hashes.
Attributes
Public Class Methods
Create a new cursor.
Note: cursors are created when executing queries using [Collection#find] and other similar methods. Application developers shouldn't have to create cursors manually.
@return [Cursor]
# File lib/mongo/cursor.rb, line 37 def initialize(collection, opts={}) opts = opts.dup @cursor_id = opts.delete(:cursor_id) @db = collection.db @collection = collection @ns = opts.delete(:ns) @connection = @db.connection @logger = @connection.logger # Query selector @selector = opts.delete(:selector) || {} # Query pre-serialized bson to append @bson = @selector.delete(:bson) # Special operators that form part of $query @order = opts.delete(:order) @explain = opts.delete(:explain) @hint = opts.delete(:hint) @snapshot = opts.delete(:snapshot) @max_scan = opts.delete(:max_scan) @return_key = opts.delete(:return_key) @show_disk_loc = opts.delete(:show_disk_loc) @comment = opts.delete(:comment) @compile_regex = opts.key?(:compile_regex) ? opts.delete(:compile_regex) : true # Wire-protocol settings @fields = convert_fields_for_query(opts.delete(:fields)) @skip = opts.delete(:skip) || 0 @limit = opts.delete(:limit) || 0 @tailable = opts.delete(:tailable) @timeout = opts.key?(:timeout) ? opts.delete(:timeout) : true @options = 0 # Use this socket for the query @socket = opts.delete(:socket) @pool = opts.delete(:pool) @closed = false @query_run = false @transformer = opts.delete(:transformer) @read = opts.delete(:read) || @collection.read Mongo::ReadPreference::validate(@read) @tag_sets = opts.delete(:tag_sets) || @collection.tag_sets @acceptable_latency = opts.delete(:acceptable_latency) || @collection.acceptable_latency batch_size(opts.delete(:batch_size) || 0) @cache = opts.delete(:first_batch) || [] @returned = 0 if(!@timeout) add_option(OP_QUERY_NO_CURSOR_TIMEOUT) end if(@read != :primary) add_option(OP_QUERY_SLAVE_OK) end if(@tailable) add_option(OP_QUERY_TAILABLE) end # If a cursor_id is provided, this is a cursor for a command if @cursor_id @command_cursor = true @query_run = true end if @collection.name =~ /^\$cmd/ || @collection.name =~ /^system/ @command = true else @command = false end @opts = opts end
Public Instance Methods
Add an option to the query options bitfield.
@param opt a valid query option
@raise InvalidOperation if this method is run after the cursor has bee
iterated for the first time.
@return [Integer] the current value of the options bitfield for this cursor.
@see www.mongodb.org/display/DOCS/Mongo+Wire+Protocol#MongoWireProtocol-Mongo::Constants::OPQUERY
# File lib/mongo/cursor.rb, line 437 def add_option(opt) check_modifiable if exhaust?(opt) if @limit != 0 raise MongoArgumentError, "Exhaust is incompatible with limit." elsif @connection.mongos? raise MongoArgumentError, "Exhaust is incompatible with mongos." end end @options |= opt @options end
Guess whether the cursor is alive on the server.
Note that this method only checks whether we have a cursor id. The cursor may still have timed out on the server. This will be indicated in the next call to #next.
@return [Boolean]
# File lib/mongo/cursor.rb, line 122 def alive? @cursor_id && @cursor_id != 0 end
Set the batch size for server responses.
Note that the batch size will take effect only on queries where the number to be returned is greater than 100.
This can not override MongoDB's limit on the amount of data it will return to the client. Depending on server version this can be 4-16mb.
@param [Integer] size either 0 or some integer greater than 1. If 0,
the server will determine the batch size.
@return [Cursor]
# File lib/mongo/cursor.rb, line 317 def batch_size(size=nil) return @batch_size unless size check_modifiable if size < 0 || size == 1 raise ArgumentError, "Invalid value for batch_size #{size}; must be 0 or > 1." else @batch_size = @limit != 0 && size > @limit ? @limit : size end self end
Close the cursor.
Note: if a cursor is read until exhausted (read until Mongo::Constants::OP_QUERY or Mongo::Constants::OP_GETMORE returns zero for the cursor id), there is no need to close it manually.
Note also: Mongo::Collection#find takes an optional block argument which can be used to ensure that your cursors get closed.
@return [True]
# File lib/mongo/cursor.rb, line 392 def close if @cursor_id && @cursor_id != 0 message = BSON::ByteBuffer.new([0, 0, 0, 0]) message.put_int(1) message.put_long(@cursor_id) log(:debug, "Cursor#close #{@cursor_id}") @connection.send_message( Mongo::Constants::OP_KILL_CURSORS, message, :pool => @pool ) end @cursor_id = 0 @closed = true end
Is this cursor closed?
@return [Boolean]
# File lib/mongo/cursor.rb, line 411 def closed? @closed end
Get the size of the result set for this query.
@param [Boolean] skip_and_limit whether or not to take skip or limit into account.
@return [Integer] the number of objects in the result set for this query.
@raise [OperationFailure] on a database error.
# File lib/mongo/cursor.rb, line 200 def count(skip_and_limit = false) check_command_cursor command = BSON::OrderedHash["count", @collection.name, "query", @selector] if skip_and_limit command.merge!(BSON::OrderedHash["limit", @limit]) if @limit != 0 command.merge!(BSON::OrderedHash["skip", @skip]) if @skip != 0 end if @hint hint = @hint.is_a?(String) ? @hint : generate_index_name(@hint) end command.merge!(BSON::OrderedHash["fields", @fields]) command.merge!(BSON::OrderedHash["hint", hint]) if hint response = @db.command(command, :read => @read, :comment => @comment) return response['n'].to_i if Mongo::Support.ok?(response) return 0 if response['errmsg'] == "ns missing" raise OperationFailure.new("Count failed: #{response['errmsg']}", response['code'], response) end
Iterate over each document in this cursor, yielding it to the given block, if provided. An Enumerator is returned if no block is given.
Iterating over an entire cursor will close it.
@yield passes each document to a block for processing.
@example if 'comments' represents a collection of comments:
comments.find.each do |doc| puts doc['user'] end
# File lib/mongo/cursor.rb, line 340 def each if block_given? || !defined?(Enumerator) while doc = self.next yield doc end else Enumerator.new do |yielder| while doc = self.next yielder.yield doc end end end end
Get the explain plan for this cursor.
@return [Hash] a document containing the explain plan for this cursor.
# File lib/mongo/cursor.rb, line 372 def explain check_command_cursor c = Cursor.new(@collection, query_options_hash.merge(:limit => -@limit.abs, :explain => true)) explanation = c.next_document c.close explanation end
# File lib/mongo/cursor.rb, line 126 def full_collection_name @ns || "#{@collection.db.name}.#{@collection.name}" end
Determine whether this cursor has any remaining results.
@return [Boolean]
# File lib/mongo/cursor.rb, line 189 def has_next? num_remaining > 0 end
Clean output for inspect.
# File lib/mongo/cursor.rb, line 489 def inspect "<Mongo::Cursor:0x#{object_id.to_s(16)} namespace='#{full_collection_name}' " + "@selector=#{@selector.inspect} @cursor_id=#{@cursor_id}>" end
Limit the number of results to be returned by this cursor.
This method overrides any limit specified in the Mongo::Collection#find method, and only the last limit applied has an effect.
@return [Integer] the current number_to_return if no parameter is given.
@raise [InvalidOperation] if this cursor has already been used.
# File lib/mongo/cursor.rb, line 251 def limit(number_to_return=nil) return @limit unless number_to_return check_modifiable if (number_to_return != 0) && exhaust? raise MongoArgumentError, "Limit is incompatible with exhaust option." end @limit = number_to_return self end
Instruct the server to abort queries after they exceed the specified wall-clock execution time.
A query that completes in under its time limit will “roll over” remaining time to the first getmore op (which will then “roll over” its remaining time to the second getmore op and so on, until the time limit is hit).
Cursors returned by successful time-limited queries will still obey the default cursor idle timeout (unless the “no cursor idle timeout” flag has been set).
@note This will only have an effect in MongoDB 2.5+
@param #max_time_ms [Fixnum] max execution time (in milliseconds)
@return [Fixnum, Cursor] either the current #max_time_ms or cursor
# File lib/mongo/cursor.rb, line 297 def max_time_ms(max_time_ms=nil) return @max_time_ms unless max_time_ms check_modifiable @max_time_ms = max_time_ms self end
Get the next document specified the cursor options.
@return [Hash, Nil] the next document or Nil if no documents remain.
# File lib/mongo/cursor.rb, line 133 def next if @cache.length == 0 if @query_run && exhaust? close return nil else refresh end end doc = @cache.shift if doc && (err = doc['errmsg'] || doc['$err']) # assignment code = doc['code'] || doc['assertionCode'] # If the server has stopped being the master (e.g., it's one of a # pair but it has died or something like that) then we close that # connection. The next request will re-open on master server. if err.include?("not master") @connection.close raise ConnectionFailure.new(err, code, doc) end # Handle server side operation execution timeout if code == 50 raise ExecutionTimeout.new(err, code, doc) end raise OperationFailure.new(err, code, doc) elsif doc && (write_concern_error = doc['writeConcernError']) # assignment raise WriteConcernError.new(write_concern_error['errmsg'], write_concern_error['code'], doc) end if @transformer.nil? doc else @transformer.call(doc) if doc end end
Get the query options for this Cursor.
@return [Hash]
# File lib/mongo/cursor.rb, line 472 def query_options_hash BSON::OrderedHash[ :selector => @selector, :fields => @fields, :skip => @skip, :limit => @limit, :order => @order, :hint => @hint, :snapshot => @snapshot, :timeout => @timeout, :max_scan => @max_scan, :return_key => @return_key, :show_disk_loc => @show_disk_loc, :comment => @comment ] end
Returns an integer indicating which query options have been selected.
@return [Integer]
@see www.mongodb.org/display/DOCS/Mongo+Wire+Protocol#MongoWireProtocol-Mongo::Constants::OPQUERY The MongoDB wire protocol.
# File lib/mongo/cursor.rb, line 421 def query_opts warn "The method Cursor#query_opts has been deprecated " + "and will removed in v2.0. Use Cursor#options instead." @options end
Remove an option from the query options bitfield.
@param opt a valid query option
@raise InvalidOperation if this method is run after the cursor has bee
iterated for the first time.
@return [Integer] the current value of the options bitfield for this cursor.
@see www.mongodb.org/display/DOCS/Mongo+Wire+Protocol#MongoWireProtocol-Mongo::Constants::OPQUERY
# File lib/mongo/cursor.rb, line 462 def remove_option(opt) check_modifiable @options &= ~opt @options end
Reset this cursor on the server. Cursor options, such as the query string and the values for skip and limit, are preserved.
# File lib/mongo/cursor.rb, line 175 def rewind! check_command_cursor close @cache.clear @cursor_id = nil @closed = false @query_run = false @n_received = nil true end
Skips the first number_to_skip
results of this cursor. Returns
the current number_to_skip if no parameter is given.
This method overrides any skip specified in the Mongo::Collection#find method, and only the last skip applied has an effect.
@return [Integer]
@raise [InvalidOperation] if this cursor has already been used.
# File lib/mongo/cursor.rb, line 272 def skip(number_to_skip=nil) return @skip unless number_to_skip check_modifiable @skip = number_to_skip self end
Sort this cursor's results.
This method overrides any sort order specified in the Mongo::Collection#find method, and only the last sort applied has an effect.
@param [Symbol, Array, Hash, OrderedHash] order either 1) a key to sort by 2)
an array of [key, direction] pairs to sort by or 3) a hash of field => direction pairs to sort by. Direction should be specified as Mongo::ASCENDING (or :ascending / :asc) or Mongo::DESCENDING (or :descending / :desc)
@raise [InvalidOperation] if this cursor has already been used.
@raise [InvalidSortValueError] if the specified order is invalid.
# File lib/mongo/cursor.rb, line 236 def sort(order, direction=nil) check_modifiable order = [[order, direction]] unless direction.nil? @order = order self end
Receive all the documents from this cursor as an array of hashes.
Notes:
If you've already started iterating over the cursor, the array returned by this method contains only the remaining documents. See #rewind! if you need to reset the cursor.
Use of this method is discouraged - in most cases, it's much more efficient to retrieve documents as you need them by iterating over the cursor.
@return [Array] an array of documents.
# File lib/mongo/cursor.rb, line 365 def to_a super end
Private Instance Methods
# File lib/mongo/cursor.rb, line 719 def check_command_cursor if @command_cursor raise InvalidOperation, "Cannot call #{caller.first} on command cursors" end end
# File lib/mongo/cursor.rb, line 713 def check_modifiable if @query_run || @closed raise InvalidOperation, "Cannot modify the query once it has been run or closed." end end
# File lib/mongo/cursor.rb, line 635 def checkin_socket(sock) @connection.checkin(sock) end
# File lib/mongo/cursor.rb, line 618 def checkout_socket_from_connection begin if @pool socket = @pool.checkout elsif @command && !Mongo::ReadPreference::secondary_ok?(@selector) socket = @connection.checkout_reader({:mode => :primary}) else socket = @connection.checkout_reader(read_preference) end rescue SystemStackError, NoMemoryError, SystemCallError => ex @connection.close raise ex end @pool = socket.pool socket end
# File lib/mongo/cursor.rb, line 700 def close_cursor_if_query_complete if @limit > 0 && @returned >= @limit close end end
# File lib/mongo/cursor.rb, line 725 def compile_regex? @compile_regex end
# File lib/mongo/cursor.rb, line 639 def construct_query_message message = BSON::ByteBuffer.new("", @connection.max_bson_size + MongoClient::COMMAND_HEADROOM) message.put_int(@options) BSON::BSON_RUBY.serialize_cstr(message, full_collection_name) message.put_int(@skip) @batch_size > 1 ? message.put_int(@batch_size) : message.put_int(@limit) if query_contains_special_fields? && @bson # costs two serialize calls query_message = BSON::BSON_CODER.serialize(@selector, false, false, @connection.max_bson_size + MongoClient::APPEND_HEADROOM) query_message.grow(@bson) query_spec = construct_query_spec query_spec.delete('$query') query_message.grow(BSON::BSON_CODER.serialize(query_spec, false, false, @connection.max_bson_size)) else # costs only one serialize call spec = query_contains_special_fields? ? construct_query_spec : @selector spec.merge!(@opts) query_message = BSON::BSON_CODER.serialize(spec, false, false, @connection.max_bson_size + MongoClient::APPEND_HEADROOM) query_message.grow(@bson) if @bson end message.put_binary(query_message.to_s) message.put_binary(BSON::BSON_CODER.serialize(@fields, false, false, @connection.max_bson_size).to_s) if @fields message end
# File lib/mongo/cursor.rb, line 671 def construct_query_spec return @selector if @selector.has_key?('$query') spec = BSON::OrderedHash.new spec['$query'] = @selector spec['$orderby'] = Mongo::Support.format_order_clause(@order) if @order spec['$hint'] = @hint if @hint && @hint.length > 0 spec['$explain'] = true if @explain spec['$snapshot'] = true if @snapshot spec['$maxScan'] = @max_scan if @max_scan spec['$returnKey'] = true if @return_key spec['$showDiskLoc'] = true if @show_disk_loc spec['$comment'] = @comment if @comment spec['$maxTimeMS'] = @max_time_ms if @max_time_ms if needs_read_pref? read_pref = Mongo::ReadPreference::mongos(@read, @tag_sets) spec['$readPreference'] = read_pref if read_pref end spec end
Convert the :fields
parameter from a single field name or an
array of fields names to a hash, with the field names for keys and
'1' for each value.
# File lib/mongo/cursor.rb, line 499 def convert_fields_for_query(fields) case fields when String, Symbol {fields => 1} when Array return nil if fields.length.zero? fields.inject({}) do |hash, field| field.is_a?(Hash) ? hash.merge!(field) : hash[field] = 1 hash end when Hash return fields end end
Check whether the exhaust option is set
@return [true, false] The state of the exhaust flag.
# File lib/mongo/cursor.rb, line 709 def exhaust?(opts = options) !(opts & OP_QUERY_EXHAUST).zero? end
# File lib/mongo/cursor.rb, line 729 def generate_index_name(spec) indexes = [] spec.each_pair do |field, type| indexes.push("#{field}_#{type}") end indexes.join("_") end
# File lib/mongo/cursor.rb, line 662 def instrument_payload log = { :database => @db.name, :collection => @collection.name, :selector => selector } log[:fields] = @fields if @fields log[:skip] = @skip if @skip && (@skip != 0) log[:limit] = @limit if @limit && (@limit != 0) log[:order] = @order if @order log end
# File lib/mongo/cursor.rb, line 691 def needs_read_pref? @connection.mongos? && @read != :primary end
Return the number of documents remaining for this cursor.
# File lib/mongo/cursor.rb, line 515 def num_remaining if @cache.length == 0 if @query_run && exhaust? close return 0 else refresh end end @cache.length end
# File lib/mongo/cursor.rb, line 737 def pin_pool?(response) ( response && (response['cursor'] || response['cursors']) ) || ( !@socket && !@command ) end
# File lib/mongo/cursor.rb, line 695 def query_contains_special_fields? @order || @explain || @hint || @snapshot || @show_disk_loc || @max_scan || @return_key || @comment || @max_time_ms || needs_read_pref? end
Refresh the documents in @cache. This means either sending the initial query or sending a GET_MORE operation.
# File lib/mongo/cursor.rb, line 530 def refresh if !@query_run send_initial_query elsif !@cursor_id.zero? send_get_more end end
# File lib/mongo/cursor.rb, line 582 def send_get_more message = BSON::ByteBuffer.new([0, 0, 0, 0]) # DB name. BSON::BSON_RUBY.serialize_cstr(message, full_collection_name) # Number of results to return. if @limit > 0 limit = @limit - @returned if @batch_size > 0 limit = limit < @batch_size ? limit : @batch_size end message.put_int(limit) else message.put_int(@batch_size) end # Cursor id. message.put_long(@cursor_id) log(:debug, "cursor.refresh() for cursor #{@cursor_id}") if @logger socket = @pool.checkout begin results, @n_received, @cursor_id = @connection.receive_message( Mongo::Constants::OP_GET_MORE, message, nil, socket, @command, nil, exhaust?, compile_regex?) ensure socket.checkin end @returned += @n_received @cache += results close_cursor_if_query_complete end
Sends initial query – which is always a read unless it is a command
Upon ConnectionFailure, tries query 3 times if socket was not provided and the query is either not a command or is a secondary_ok command.
Pins pools upon successful read and unpins pool upon ConnectionFailure
# File lib/mongo/cursor.rb, line 545 def send_initial_query tries = 0 instrument(:find, instrument_payload) do begin message = construct_query_message socket = @socket || checkout_socket_from_connection results, @n_received, @cursor_id = @connection.receive_message( Mongo::Constants::OP_QUERY, message, nil, socket, @command, nil, exhaust?, compile_regex?) rescue ConnectionFailure => ex socket.close if socket @pool = nil @connection.unpin_pool @connection.refresh if tries < 3 && !@socket && (!@command || Mongo::ReadPreference::secondary_ok?(@selector)) tries += 1 retry else raise ex end rescue OperationFailure, OperationTimeout => ex raise ex ensure socket.checkin unless @socket || socket.nil? end if pin_pool?(results.first) @connection.pin_pool(socket.pool, read_preference) end @returned += @n_received @cache += results @query_run = true close_cursor_if_query_complete end end