class Mongo::MongoReplicaSetClient

Instantiates and manages connections to a MongoDB replica set.

Constants

REPL_SET_OPTS

Attributes

manager[R]
refresh_interval[R]
refresh_mode[R]
refresh_version[R]
replica_set_name[R]
seeds[R]

Public Class Methods

new(*args)

Create a connection to a MongoDB replica set.

If no seeds are given, the seed list and connection options are read from ENV["MONGODB_URI"].

Once connected to a replica set, you can find out which nodes are primary, secondary, and arbiters with the corresponding accessors: MongoReplicaSetClient#primary, MongoReplicaSetClient#secondaries, and MongoReplicaSetClient#arbiters. This is useful if your application needs to connect manually to nodes other than the primary.

@overload initialize(seeds=ENV, opts={})

@param [Array<String>, Array<Array(String, Integer)>] seeds

@option opts [String, Integer, Symbol] :w (1) Set the default number of nodes that must
  acknowledge a write.
@option opts [Integer] :wtimeout (nil) Set replica set acknowledgement timeout.
@option opts [Boolean] :j (false) If true, block until write operations have been committed
  to the journal. Cannot be used in combination with 'fsync'. Prior to MongoDB 2.6 this option was
  ignored if the server was running without journaling. Starting with MongoDB 2.6, write operations will
  fail with an exception if this option is used when the server is running without journaling.
@option opts [Boolean] :fsync (false) If true, and the server is running without journaling, blocks until
  the server has synced all data files to disk. If the server is running with journaling, this acts the same as
  the 'j' option, blocking until write operations have been committed to the journal.
  Cannot be used in combination with 'j'.

  Notes about write concern options:
    Write concern options are propagated to objects instantiated from this MongoReplicaSetClient.
    These defaults can be overridden upon instantiation of any object by explicitly setting an options hash
    on initialization.
@option opts [:primary, :primary_preferred, :secondary, :secondary_preferred, :nearest] :read (:primary)
  A "read preference" determines the candidate replica set members to which a query or command can be sent.
  [:primary]
    * Read from primary only.
    * Cannot be combined with tags.
  [:primary_preferred]
    * Read from primary if available, otherwise read from a secondary.
  [:secondary]
    * Read from secondary if available.
  [:secondary_preferred]
    * Read from a secondary if available, otherwise read from the primary.
  [:nearest]
    * Read from any member.
@option opts [Array<Hash{ String, Symbol => Tag Value }>] :tag_sets ([])
  Read from replica-set members with these tags.
@option opts [Integer] :secondary_acceptable_latency_ms (15) The maximum latency, in milliseconds, beyond that
  of the nearest available member for a member to still be considered "near".
@option opts [Logger] :logger (nil) Logger instance to receive driver operation log.
@option opts [Integer] :pool_size (1) The maximum number of socket connections allowed per
  connection pool. Note: this setting is relevant only for multi-threaded applications.
@option opts [Float] :pool_timeout (5.0) When all of the connections in a pool are checked out,
  this is the number of seconds to wait for a connection to be released before raising an exception.
  Note: this setting is relevant only for multi-threaded applications.
@option opts [Float] :op_timeout (DEFAULT_OP_TIMEOUT) The number of seconds to wait for a read operation to time out.
  DEFAULT_OP_TIMEOUT is 20 seconds. A value of nil may be specified explicitly.
@option opts [Float] :connect_timeout (30) The number of seconds to wait before timing out a
  connection attempt.
@option opts [Boolean] :ssl (false) If true, create the connection to the server using SSL.
@option opts [String] :ssl_cert (nil) The certificate file used to identify the local connection against MongoDB.
@option opts [String] :ssl_key (nil) The private keyfile used to identify the local connection against MongoDB.
  Note that even if the key is stored in the same file as the certificate, both need to be explicitly specified.
@option opts [String] :ssl_key_pass_phrase (nil) A passphrase for the private key.
@option opts [Boolean] :ssl_verify (nil) Specifies whether or not peer certificate validation should occur.
@option opts [String] :ssl_ca_cert (nil) The ca_certs file contains a set of concatenated "certification authority"
  certificates, which are used to validate certificates passed from the other end of the connection.
  Required for :ssl_verify.
@option opts [:sync, false] :refresh_mode (false) Set this to :sync to refresh the driver's view of
  the replica set every :refresh_interval seconds. Replica set connection failures
  will always trigger a complete refresh. This option is useful when you want to add new nodes
  or remove replica set nodes not currently in use by the driver.
@option opts [Integer] :refresh_interval (90) If :refresh_mode is enabled, this is the number of seconds
  between calls to check the replica set's state.
@note The number of seed nodes does not have to equal the number of replica set members.
  The purpose of seed nodes is to permit the driver to find at least one replica set member even if a member is down.
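
The :read modes and the :secondary_acceptable_latency_ms option described above can be illustrated without the driver. The following is a minimal sketch, not the driver's selection code: `candidates_for`, `near_members`, and the `:role` / `:ping_ms` fields are hypothetical names chosen for illustration, tag sets are ignored, and the latency rule assumes "near" means "within the acceptable latency of the fastest candidate".

```ruby
# Hypothetical helper: pick candidate members for a read mode.
# Each member is a Hash with a :role of :primary or :secondary.
def candidates_for(mode, members)
  primaries   = members.select { |m| m[:role] == :primary }
  secondaries = members.select { |m| m[:role] == :secondary }

  case mode
  when :primary             then primaries
  when :primary_preferred   then primaries.empty? ? secondaries : primaries
  when :secondary           then secondaries
  when :secondary_preferred then secondaries.empty? ? primaries : secondaries
  when :nearest             then primaries + secondaries
  else raise ArgumentError, "Unknown read mode: #{mode.inspect}"
  end
end

# Hypothetical helper: keep only members whose ping time is within
# acceptable_ms of the fastest member in the list.
def near_members(members, acceptable_ms = 15)
  return [] if members.empty?
  best = members.map { |m| m[:ping_ms] }.min
  members.select { |m| m[:ping_ms] - best <= acceptable_ms }
end

# Made-up replica set state for the demonstration.
members = [
  { :host => 'a:27017', :role => :primary,   :ping_ms => 30 },
  { :host => 'b:27017', :role => :secondary, :ping_ms => 5 },
  { :host => 'c:27017', :role => :secondary, :ping_ms => 25 }
]

eligible = near_members(candidates_for(:secondary, members))
puts eligible.map { |m| m[:host] }.inspect
```

With this sample data, member c is a secondary but falls outside the 15 ms window relative to member b, so only b remains eligible.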

@example Connect to a replica set and provide two seed nodes.

MongoReplicaSetClient.new(['localhost:30000', 'localhost:30001'])

@example Connect to a replica set providing two seed nodes and ensuring a connection to the replica set named 'prod':

MongoReplicaSetClient.new(['localhost:30000', 'localhost:30001'], :name => 'prod')

@example Connect to a replica set providing two seed nodes and allowing reads from a secondary node:

MongoReplicaSetClient.new(['localhost:30000', 'localhost:30001'], :read => :secondary)

@see api.mongodb.org/ruby/current/file.REPLICA_SETS.html Replica sets in Ruby

@raise [MongoArgumentError] This is raised for usage errors.

@raise [ConnectionFailure] This is raised for the various connection failures.

# File lib/mongo/mongo_replica_set_client.rb, line 124
def initialize(*args)
  opts = args.last.is_a?(Hash) ? args.pop : {}
  nodes = args.shift || []

  raise MongoArgumentError, "Too many arguments" unless args.empty?

  # This is temporary until support for the old format is dropped
  @seeds = nodes.collect do |node|
    if node.is_a?(Array)
      warn "Initiating a MongoReplicaSetClient with seeds passed as individual [host, port] array arguments is deprecated."
      warn "Please specify hosts as an array of 'host:port' strings; the old format will be removed in v2.0"
      node
    elsif node.is_a?(String)
      Support.normalize_seeds(node)
    else
      raise MongoArgumentError, "Bad seed format!"
    end
  end

  if @seeds.empty? && ENV.has_key?('MONGODB_URI')
    parser = URIParser.new ENV['MONGODB_URI']
    if parser.direct?
      raise MongoArgumentError,
        "ENV['MONGODB_URI'] implies a direct connection."
    end
    opts = parser.connection_options.merge! opts
    @seeds = parser.nodes
  end

  if @seeds.length.zero?
    raise MongoArgumentError, "A MongoReplicaSetClient requires at least one seed node."
  end

  @seeds.freeze

  # Refresh
  @last_refresh = Time.now
  @refresh_version = 0

  # No connection manager by default.
  @manager = nil

  # Lock for request ids.
  @id_lock = Mutex.new

  @connected = false

  @connect_mutex = Mutex.new

  @mongos = false

  check_opts(opts)
  setup(opts.dup)
end
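
The argument handling at the top of initialize can be approximated outside the driver. The following is a minimal sketch under stated assumptions: `parse_seed` and `split_client_args` are hypothetical helpers, `parse_seed` is not the driver's `Support.normalize_seeds` (whose exact behavior is not shown on this page), and plain `ArgumentError` stands in for `MongoArgumentError`.

```ruby
# Hypothetical helper (not the driver's Support.normalize_seeds):
# turn "host:port" into [host, port]. IPv6 literals are not handled,
# and a missing port is treated as an error in this sketch.
def parse_seed(seed)
  host, port = seed.to_s.split(':', 2)
  if host.nil? || host.empty? || port.nil?
    raise ArgumentError, "Bad seed format: #{seed.inspect}"
  end
  [host, Integer(port)]
end

# Hypothetical helper mirroring the first lines of initialize: a trailing
# Hash is taken as options and the first remaining argument as the seed list.
def split_client_args(*args)
  opts  = args.last.is_a?(Hash) ? args.pop : {}
  seeds = args.shift || []
  raise ArgumentError, "Too many arguments" unless args.empty?
  [seeds.map { |seed| parse_seed(seed) }, opts]
end

seeds, opts = split_client_args(['localhost:30000', 'localhost:30001'], :name => 'prod')
p seeds # prints [["localhost", 30000], ["localhost", 30001]]
p opts
```

Because the options hash is detected by type rather than position, a call with no seed array and only a Hash still parses; the seed list is then empty and, in the real constructor, ENV['MONGODB_URI'] becomes the fallback.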

Public Instance Methods

arbiters()
# File lib/mongo/mongo_replica_set_client.rb, line 423
def arbiters
  local_manager.arbiters.nil? ? [] : local_manager.arbiters
end
authenticate_pools()

@deprecated This method is no longer in use and never needs to be called directly. Support will be removed after v2.0.

Authentication of sockets is handled upon checkout and checkin.

# File lib/mongo/legacy.rb, line 111
def authenticate_pools
end
checkin(socket)

Check in a socket used for reading.

# File lib/mongo/mongo_replica_set_client.rb, line 383
def checkin(socket)
  if socket && socket.pool
    socket.checkin
  end
  sync_refresh
end
checkout() { || ... }

Generic socket checkout. Takes a block that returns a socket from a pool.

# File lib/mongo/mongo_replica_set_client.rb, line 348
def checkout
  ensure_manager

  connected? ? sync_refresh : connect

  begin
    socket = yield
  rescue => ex
    checkin(socket) if socket
    raise ex
  end

  if socket
    return socket
  else
    @connected = false
    raise ConnectionFailure.new("Could not checkout a socket.")
  end
end
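
Code that checks a socket out is expected to check it back in, even when the work in between fails. The following is a minimal sketch of that pairing using a stand-in pool. `FakePool` and `with_socket` are hypothetical names that do not exist in the driver, and `RuntimeError` stands in for `ConnectionFailure`.

```ruby
# Stand-in for a connection pool: sockets are just strings here.
class FakePool
  attr_reader :available

  def initialize(size)
    @available = Array.new(size) { |i| "socket-#{i}" }
  end

  # Returns a socket, or nil when none is available.
  def checkout
    @available.pop
  end

  def checkin(socket)
    @available.push(socket)
  end
end

# Hypothetical wrapper: check out, yield, and always check back in.
def with_socket(pool)
  socket = pool.checkout
  raise "Could not checkout a socket." unless socket
  begin
    yield socket
  ensure
    pool.checkin(socket)
  end
end

pool = FakePool.new(1)
result = with_socket(pool) { |socket| socket.upcase }
puts result
puts pool.available.size
```

The `ensure` clause is the design point: the socket returns to the pool whether the block succeeds or raises.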
checkout_reader(read_pref={})
# File lib/mongo/mongo_replica_set_client.rb, line 368
def checkout_reader(read_pref={})
  checkout do
    pool = read_pool(read_pref)
    get_socket_from_pool(pool)
  end
end
checkout_writer()

Checkout a socket for writing (i.e., a primary node).

# File lib/mongo/mongo_replica_set_client.rb, line 376
def checkout_writer
  checkout do
    get_socket_from_pool(primary_pool)
  end
end
close(opts={})

Close the connection to the database.

# File lib/mongo/mongo_replica_set_client.rb, line 313
def close(opts={})
  if opts[:soft]
    @manager.close(:soft => true) if @manager
  else
    @manager.close if @manager
  end

  # Clear the reference to this object.
  thread_local[:managers].delete(self)
  unpin_pool

  @connected = false
end
connect(force = !connected?)

Initiate a connection to the replica set.

# File lib/mongo/mongo_replica_set_client.rb, line 189
def connect(force = !connected?)
  return unless force
  log(:info, "Connecting...")

  # Prevent recursive connection attempts from the same thread.
  # This is done rather than using a Monitor to prevent potentially recursing
  # infinitely while attempting to connect and continually failing. Instead, fail fast.
  raise ConnectionFailure, "Failed to get node data." if thread_local[:locks][:connecting] == true

  current_version = @refresh_version
  @connect_mutex.synchronize do
    # don't try to connect if another thread has done so while we were waiting for the lock
    return unless current_version == @refresh_version
    begin
      thread_local[:locks][:connecting] = true
      if @manager
        ensure_manager
        @manager.refresh!(@seeds)
      else
        @manager = PoolManager.new(self, @seeds)
        ensure_manager
        @manager.connect
      end
    ensure
      thread_local[:locks][:connecting] = false
    end
    @refresh_version += 1

    if @manager.pools.empty?
      close
      raise ConnectionFailure, "Failed to connect to any node."
    end
    check_wire_version_in_range
    @connected = true
  end
end
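
The connect method reads @refresh_version before taking the mutex and compares it again once inside, so a thread that waited while another thread reconnected does not repeat the work. The following is a minimal sketch of that version check in isolation. `VersionedRefresher` is a hypothetical class, and the `observed` parameter exists only so the stale case can be shown deterministically.

```ruby
class VersionedRefresher
  attr_reader :version, :runs

  def initialize
    @mutex   = Mutex.new
    @version = 0
    @runs    = 0
  end

  # observed is the version the caller saw before waiting for the lock.
  # Returns true if this call did the work, false if it was skipped.
  def refresh(observed = @version)
    @mutex.synchronize do
      # Another caller already refreshed while we were waiting: skip.
      return false unless observed == @version
      @runs    += 1
      @version += 1
    end
    true
  end
end

refresher = VersionedRefresher.new
puts refresher.refresh      # does the work
puts refresher.refresh(0)   # stale observation, skipped
puts refresher.runs
```

A plain `return` inside `synchronize` is safe here because `Mutex#synchronize` releases the lock when the block exits for any reason.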
connected?()
# File lib/mongo/mongo_replica_set_client.rb, line 273
def connected?
  @connected && !@manager.pools.empty?
end
connecting?()

@deprecated

# File lib/mongo/mongo_replica_set_client.rb, line 278
def connecting?
  warn "MongoReplicaSetClient#connecting? is deprecated and will be removed in v2.0."
  false
end
ensure_manager()
# File lib/mongo/mongo_replica_set_client.rb, line 390
def ensure_manager
  thread_local[:managers][self] = @manager
end
get_socket_from_pool(pool)
# File lib/mongo/mongo_replica_set_client.rb, line 411
def get_socket_from_pool(pool)
  begin
    pool.checkout if pool
  rescue ConnectionFailure
    nil
  end
end
hard_refresh!()

Force a hard refresh of this connection's view of the replica set.

@return [Boolean] +true+ if a hard refresh occurred; +false+ if the refresh lock could not be acquired.

# File lib/mongo/mongo_replica_set_client.rb, line 267
def hard_refresh!
  log(:info, "Initiating hard refresh...")
  connect(true)
  return true
end
host()

The replica set primary's host name.

@return [String]

# File lib/mongo/mongo_replica_set_client.rb, line 286
def host
  @manager.primary_pool.host
end
hosts()
# File lib/mongo/mongo_replica_set_client.rb, line 436
def hosts
  local_manager ? local_manager.hosts : []
end
inspect()
# File lib/mongo/mongo_replica_set_client.rb, line 183
def inspect
  "<Mongo::MongoReplicaSetClient:0x#{self.object_id.to_s(16)} @seeds=#{@seeds.inspect} " +
    "@connected=#{@connected}>"
end
local_manager()
# File lib/mongo/mongo_replica_set_client.rb, line 419
def local_manager
  thread_local[:managers][self]
end
logout_pools(database)

@deprecated This method is no longer in use and never needs to be called directly. Support will be removed after v2.0.

Authentication of sockets is handled upon checkout and checkin.

# File lib/mongo/legacy.rb, line 117
def logout_pools(database)
end
max_bson_size()
# File lib/mongo/mongo_replica_set_client.rb, line 460
def max_bson_size
  return local_manager.max_bson_size if local_manager
  DEFAULT_MAX_BSON_SIZE
end
max_message_size()
# File lib/mongo/mongo_replica_set_client.rb, line 465
def max_message_size
  return local_manager.max_message_size if local_manager
  max_bson_size * MESSAGE_SIZE_FACTOR
end
max_wire_version()
# File lib/mongo/mongo_replica_set_client.rb, line 470
def max_wire_version
  return local_manager.max_wire_version if local_manager
  0
end
max_write_batch_size()
# File lib/mongo/mongo_replica_set_client.rb, line 484
def max_write_batch_size
  local_manager && local_manager.primary_pool && local_manager.primary_pool.node.max_write_batch_size ||
    DEFAULT_MAX_WRITE_BATCH_SIZE
end
min_wire_version()
# File lib/mongo/mongo_replica_set_client.rb, line 475
def min_wire_version
  return local_manager.min_wire_version if local_manager
  0
end
nodes()
# File lib/mongo/mongo_replica_set_client.rb, line 297
def nodes
  warn "MongoReplicaSetClient#nodes is DEPRECATED and will be removed in v2.0. " +
    "Please use MongoReplicaSetClient#seeds instead."
  @seeds
end
pin_pool(pool, read_preference)
# File lib/mongo/mongo_replica_set_client.rb, line 398
def pin_pool(pool, read_preference)
  if @manager
    thread_local[:pinned_pools][@manager.object_id] = {
      :pool => pool,
      :read_preference => read_preference
    }
  end
end
pinned_pool()
# File lib/mongo/mongo_replica_set_client.rb, line 394
def pinned_pool
  thread_local[:pinned_pools][@manager.object_id] if @manager
end
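
pin_pool and pinned_pool store the pinned pool in per-thread state keyed by the manager's object id, so one thread's pin is invisible to another. The following is a minimal sketch of that idea. It assumes the driver's `thread_local` helper behaves like a per-thread Hash, which this page does not show, and the `*_sketch` method names and the `:pinned_pools_sketch` key are hypothetical.

```ruby
# Per-thread registry; each thread lazily gets its own Hash.
def pinned_pools_sketch
  Thread.current[:pinned_pools_sketch] ||= {}
end

def pin_sketch(key, pool, read_preference)
  pinned_pools_sketch[key] = { :pool => pool, :read_preference => read_preference }
end

def pinned_sketch(key)
  pinned_pools_sketch[key]
end

def unpin_sketch(key)
  pinned_pools_sketch.delete(key)
end

pin_sketch('manager-1', 'pool-a', :secondary)
puts pinned_sketch('manager-1')[:pool]

# A different thread has its own registry and sees no pin.
puts Thread.new { pinned_sketch('manager-1').inspect }.value

unpin_sketch('manager-1')
puts pinned_sketch('manager-1').inspect
```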
pools()
# File lib/mongo/mongo_replica_set_client.rb, line 452
def pools
  local_manager ? local_manager.pools : []
end
port()

The replica set primary's port.

@return [Integer]

# File lib/mongo/mongo_replica_set_client.rb, line 293
def port
  @manager.primary_pool.port
end
primary()
# File lib/mongo/mongo_replica_set_client.rb, line 427
def primary
  local_manager ? local_manager.primary : nil
end
primary?()
Alias for: read_primary?
primary_pool()
# File lib/mongo/mongo_replica_set_client.rb, line 440
def primary_pool
  local_manager ? local_manager.primary_pool : nil
end
primary_wire_version_feature?(feature)
# File lib/mongo/mongo_replica_set_client.rb, line 480
def primary_wire_version_feature?(feature)
  local_manager && local_manager.primary_pool && local_manager.primary_pool.node.wire_version_feature?(feature)
end
read_primary?()

Determine whether we're reading from a primary node. If false, this connection connects to a secondary node and @read_secondaries is true.

@return [Boolean]

# File lib/mongo/mongo_replica_set_client.rb, line 307
def read_primary?
  read_pool == primary_pool
end
Also aliased as: primary?
reconnect()

Reconnect the replica set client.

@return [Boolean] true unless the refresh lock can't be acquired.

@since 1.12.4

# File lib/mongo/mongo_replica_set_client.rb, line 231
def reconnect
  close
  refresh
end
refresh(opts={})

Determine whether a replica set refresh is required. If so, run a hard refresh. You can force a hard refresh by running #hard_refresh!

@return [Boolean] true unless a hard refresh is run and the refresh lock can't be acquired.

# File lib/mongo/mongo_replica_set_client.rb, line 243
def refresh(opts={})
  if !connected?
    log(:info, "Trying to check replica set health but not " +
      "connected...")
    return hard_refresh!
  end

  log(:debug, "Checking replica set connection health...")
  ensure_manager
  @manager.check_connection_health

  if @manager.refresh_required?
    return hard_refresh!
  end

  return true
end
reset_connection()

If a ConnectionFailure is raised, this method will be called to close the connection and reset connection values.

@deprecated Use MongoReplicaSetClient#close instead.

# File lib/mongo/mongo_replica_set_client.rb, line 330
def reset_connection
  close
  warn "MongoReplicaSetClient#reset_connection is now deprecated and will be removed in v2.0. " +
    "Use MongoReplicaSetClient#close instead."
end
secondaries()

Note: might want to freeze these after connecting.

# File lib/mongo/mongo_replica_set_client.rb, line 432
def secondaries
  local_manager ? local_manager.secondaries : []
end
secondary_pool()
# File lib/mongo/mongo_replica_set_client.rb, line 444
def secondary_pool
  local_manager ? local_manager.secondary_pool : nil
end
secondary_pools()
# File lib/mongo/mongo_replica_set_client.rb, line 448
def secondary_pools
  local_manager ? local_manager.secondary_pools : []
end
slave_ok?()

Returns true if it's okay to read from a secondary node.

This method exists primarily so that Cursor objects will generate query messages with a slaveOkay value of true.

@return [Boolean] true unless the read preference is :primary

# File lib/mongo/mongo_replica_set_client.rb, line 342
def slave_ok?
  @read != :primary
end
tag_map()
# File lib/mongo/mongo_replica_set_client.rb, line 456
def tag_map
  local_manager ? local_manager.tag_map : {}
end
unpin_pool()
# File lib/mongo/mongo_replica_set_client.rb, line 407
def unpin_pool
  thread_local[:pinned_pools].delete @manager.object_id if @manager
end
valid_opts()
Calls superclass method Mongo::MongoClient#valid_opts
# File lib/mongo/mongo_replica_set_client.rb, line 179
def valid_opts
  super + REPL_SET_OPTS - CLIENT_ONLY_OPTS
end
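
valid_opts builds its list with plain Array arithmetic: the parent's options plus the replica-set options, minus the options that only make sense for a single-node client. The following is a minimal sketch of that expression. The option lists are illustrative and are not the contents of the driver's constants, and `valid_opts_sketch` and `unknown_opts` are hypothetical helpers.

```ruby
# Array#+ concatenates and Array#- removes every matching element.
# The two operators share precedence and associate left to right,
# so this evaluates as (base + extra) - excluded.
def valid_opts_sketch(base, extra, excluded)
  base + extra - excluded
end

# Hypothetical check: report option keys that are not in the valid list.
def unknown_opts(opts, valid)
  opts.keys - valid
end

# Illustrative lists only.
base     = [:w, :ssl, :pool_size, :slave_ok]
extra    = [:refresh_mode, :refresh_interval, :name]
excluded = [:slave_ok]

valid = valid_opts_sketch(base, extra, excluded)
p valid
p unknown_opts({ :w => 1, :slave_ok => true }, valid)
```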

Private Instance Methods

setup(opts)

Parse option hash

Calls superclass method Mongo::MongoClient#setup
# File lib/mongo/mongo_replica_set_client.rb, line 492
def setup(opts)
  # Refresh
  @refresh_mode = opts.delete(:refresh_mode) || false
  @refresh_interval = opts.delete(:refresh_interval) || 90

  if @refresh_mode && @refresh_interval < 60
    @refresh_interval = 60 unless ENV['TEST_MODE'] == 'TRUE'
  end

  if @refresh_mode == :async
    warn ":async refresh mode has been deprecated. Refresh " +
      "mode will be disabled."
  elsif ![:sync, false].include?(@refresh_mode)
    raise MongoArgumentError,
      "Refresh mode must be either :sync or false."
  end

  if opts[:read_secondary]
    warn ":read_secondary option has been deprecated and will " +
      "be removed in driver v2.0. Use the :read option instead."
    @read_secondary = opts.delete(:read_secondary) || false
  end

  # Replica set name
  if opts[:rs_name]
    warn ":rs_name option has been deprecated and will be removed in v2.0. " +
      "Please use :name instead."
    @replica_set_name = opts.delete(:rs_name)
  else
    @replica_set_name = opts.delete(:name)
  end

  super opts
end
sync_refresh()
# File lib/mongo/mongo_replica_set_client.rb, line 527
def sync_refresh
  if @refresh_mode == :sync &&
    ((Time.now - @last_refresh) > @refresh_interval)
    @last_refresh = Time.now
    refresh
  end
end
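
sync_refresh is a time-based throttle: it is called on every checkout and checkin but only triggers a refresh once more than @refresh_interval seconds have passed since the last one. The following is a minimal sketch of that throttle. `RefreshThrottle` is a hypothetical class, and the current time is passed in as a number of seconds so the behavior can be shown without sleeping.

```ruby
class RefreshThrottle
  attr_reader :refresh_count

  # interval and now are both in seconds.
  def initialize(interval, now)
    @interval      = interval
    @last_refresh  = now
    @refresh_count = 0
  end

  # Returns true when a refresh was triggered by this call.
  def tick(now)
    if (now - @last_refresh) > @interval
      @last_refresh   = now
      @refresh_count += 1
      true
    else
      false
    end
  end
end

throttle = RefreshThrottle.new(90, 0)
puts throttle.tick(90)   # exactly at the interval: not yet
puts throttle.tick(91)   # past the interval: refresh
puts throttle.tick(100)  # the timer restarted at 91: not yet
```

The comparison is strictly greater-than, matching the method above, so a call that lands exactly on the interval boundary does not refresh.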