RR::Replicators::TwoWayReplicator

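This replicator implements two-way replication. Options:

  • :left_change_handling / :right_change_handling: how to handle a change made in only the left or only the right database. Either :ignore, :replicate (default) or a Proc object, which is called with the ReplicationHelper and the ReplicationDifference.

  • :replication_conflict_handling: how to handle a record that was changed in both databases. Either :ignore (default), :left_wins, :right_wins, :later_wins, :earlier_wins or a Proc object, which is called with the ReplicationHelper and the ReplicationDifference.

  • :logged_replication_events: which events to log. A single value or an array of :ignored_changes, :all_changes, :ignored_conflicts (default) and :all_conflicts.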

Example of using a Proc object for custom behaviour:

lambda do |rep_helper, diff|
  # if specified as replication_conflict_handling option, logs all
  # conflicts to a text file
  File.open('/var/log/rubyrep_conflict_log', 'a') do |f|
    f.puts <<-end_str
      #{Time.now}: conflict
      * in table #{diff.changes[:left].table}
      * for record '#{diff.changes[:left].key}'
      * change type in left db: '#{diff.changes[:left].type}'
      * change type in right db: '#{diff.changes[:right].type}'
    end_str
  end
end

Constants

CONFLICT_STATE_MATRIX

Specifies how to clear conflicts. The outer hash keys describe the type of the winning change. The inner hash keys describe the type of the losing change. The inner hash values describe the action to take on the losing side.
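
As a rough illustration of the nested-hash shape described above, the sketch below defines a hypothetical EXAMPLE_CONFLICT_MATRIX. Its values are an assumption, reasoned from what each combination implies for the losing side; they are not copied from the library's constant, whose contents are not shown on this page.

```ruby
# Hypothetical matrix for illustration only.
# Outer key: change type on the winning side.
# Inner key: change type on the losing side.
# Value: action to apply to the losing side (nil means nothing to do).
EXAMPLE_CONFLICT_MATRIX = {
  :insert => {:insert => :update, :update => :update, :delete => :insert},
  :update => {:insert => :update, :update => :update, :delete => :insert},
  :delete => {:insert => :delete, :update => :delete, :delete => nil}
}

# Performs the same two-step lookup that clear_conflict uses:
# first by the winning change type, then by the losing change type.
def example_target_action(winning_type, losing_type)
  EXAMPLE_CONFLICT_MATRIX[winning_type][losing_type]
end

# The winner updated a record that the loser deleted, so the record
# has to be inserted again on the losing side.
puts example_target_action(:update, :delete).inspect

# The winner deleted a record that the loser inserted, so the record
# has to be deleted on the losing side.
puts example_target_action(:delete, :insert).inspect
```

A nil entry would mean that both sides already agree (for example, both deleted the record), which matches clear_conflict having no branch for that case.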

MAX_REPLICATION_ATTEMPTS

The maximum number of times a replication will be attempted (in case it fails because the record in question was removed from the source or inserted into the target database after the ReplicationDifference was loaded).
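
The bounded retry can be pictured with the minimal sketch below. All names (EXAMPLE_MAX_ATTEMPTS, ExampleTransientError, example_replicate) are hypothetical, and the value 2 is an assumption made for the example; the actual value of the constant is not shown on this page.

```ruby
# Hypothetical limit; the real constant's value is not documented here.
EXAMPLE_MAX_ATTEMPTS = 2

# Stand-in for a failure caused by a record vanishing or appearing.
class ExampleTransientError < StandardError; end

# Runs the block and, on a transient failure, calls itself again with one
# fewer attempt. Once no attempts remain, raises with the description of
# the previous failure (or a generic message if there is none).
def example_replicate(remaining = EXAMPLE_MAX_ATTEMPTS, previous_failure = nil, &attempt)
  raise(previous_failure || 'max replication attempts exceeded') if remaining == 0
  begin
    attempt.call(remaining)
  rescue ExampleTransientError => e
    example_replicate(remaining - 1, "attempt failed with #{e.message}", &attempt)
  end
end

# Fails on the first attempt, succeeds on the second.
attempts = []
result = example_replicate do |remaining|
  attempts << remaining
  raise ExampleTransientError, 'source record vanished' if attempts.size < 2
  :replicated
end
puts result.inspect
```

Passing the failure description along with the decremented counter is what lets the final error explain why the last attempt failed.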

OTHER_SIDE

Shortcut to calculate the “other” database.
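
Judging from how the methods below index it with :left or :right, the constant behaves like a two-entry lookup table. A minimal sketch under that assumption, using the hypothetical name EXAMPLE_OTHER_SIDE:

```ruby
# Assumed shape: each side maps to its opposite.
EXAMPLE_OTHER_SIDE = {:left => :right, :right => :left}

source_db = :left
target_db = EXAMPLE_OTHER_SIDE[source_db]
puts target_db.inspect # prints :right
```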

Attributes

rep_helper[RW]

The current ReplicationHelper object

Public Class Methods

default_options()

Provides the default options for the replicator. Optional. Returns a hash of option key => value pairs.

# File lib/rubyrep/replicators/two_way_replicator.rb, line 65
def self.default_options
  {
    :left_change_handling => :replicate,
    :right_change_handling => :replicate,
    :replication_conflict_handling => :ignore,
    :logged_replication_events => [:ignored_conflicts],
  }
end

new(rep_helper)

Initializes the TwoWayReplicator. Raises an ArgumentError if any of the replication options is invalid.

Parameters:

  • rep_helper: the current ReplicationHelper object

# File lib/rubyrep/replicators/two_way_replicator.rb, line 132
def initialize(rep_helper)
  self.rep_helper = rep_helper

  validate_change_handling_options
  validate_conflict_handling_options
  validate_logging_options
end

Public Instance Methods

attempt_change(action, source_db, target_db, diff, remaining_attempts)

Helper for execution of insert / update / delete attempts. Wraps those attempts into savepoints and handles exceptions.

Note: Savepoints have to be used for PostgreSQL (as a failed SQL statement will otherwise invalidate the complete transaction.)

  • action: short description of the change (e.g. "update" or "delete")

  • source_db: either :left or :right - source database of replication

  • target_db: either :left or :right - target database of replication

  • diff: the current ReplicationDifference instance

  • remaining_attempts: the number of remaining replication attempts for this difference

# File lib/rubyrep/replicators/two_way_replicator.rb, line 272
def attempt_change(action, source_db, target_db, diff, remaining_attempts)
  begin
    rep_helper.session.send(target_db).execute "savepoint rr_#{action}_#{remaining_attempts}"
    yield
    unless rep_helper.new_transaction?
      rep_helper.session.send(target_db).execute "release savepoint rr_#{action}_#{remaining_attempts}"
    end
  rescue Exception => e
    rep_helper.session.send(target_db).execute "rollback to savepoint rr_#{action}_#{remaining_attempts}"
    diff.amend
    replicate_difference diff, remaining_attempts - 1,
      "#{action} failed with #{e.message}"
  end
end
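
The savepoint pattern used above can be tried in isolation with the minimal sketch below. FakeConnection and with_savepoint are hypothetical names; the fake connection only records the SQL strings it receives. Unlike attempt_change, this sketch rescues StandardError rather than Exception, always releases the savepoint on success, and returns the error message instead of retrying.

```ruby
# Stand-in for a database connection; it only records the SQL it is given.
class FakeConnection
  attr_reader :statements

  def initialize
    @statements = []
  end

  def execute(sql)
    @statements << sql
  end
end

# Runs the block inside a named savepoint.
# Returns nil on success, or the error message after rolling back.
def with_savepoint(connection, name)
  connection.execute "savepoint #{name}"
  yield
  connection.execute "release savepoint #{name}"
  nil
rescue StandardError => e
  connection.execute "rollback to savepoint #{name}"
  e.message
end

# Successful change: the savepoint is created and then released.
ok = FakeConnection.new
with_savepoint(ok, 'rr_insert_2') { ok.execute 'insert into t values (1)' }
puts ok.statements.inspect

# Failed change: only the work since the savepoint is rolled back.
failed = FakeConnection.new
failure = with_savepoint(failed, 'rr_update_1') { raise 'duplicate key' }
puts failed.statements.inspect
puts failure
```

Including the action and the remaining attempt count in the savepoint name, as attempt_change does, keeps the names of nested retries distinct.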

attempt_delete(source_db, diff, remaining_attempts, target_key)

Attempts to delete the source record from the target database.

E.g. if source_db is :left, then the record is deleted in database :right.

  • source_db: either :left or :right - source database of replication

  • diff: the current ReplicationDifference instance

  • remaining_attempts: the number of remaining replication attempts for this difference

  • target_key: a column_name => value hash identifying the target record

# File lib/rubyrep/replicators/two_way_replicator.rb, line 294
def attempt_delete(source_db, diff, remaining_attempts, target_key)
  change = diff.changes[source_db]
  target_db = OTHER_SIDE[source_db]
  target_table = rep_helper.corresponding_table(source_db, change.table)

  attempt_change('delete', source_db, target_db, diff, remaining_attempts) do
    number_updated = rep_helper.delete_record target_db, target_table, target_key
    if number_updated == 0
      diff.amend
      replicate_difference diff, remaining_attempts - 1, "target record for delete vanished"
    else
      log_replication_outcome source_db, diff
    end
  end
end

attempt_insert(source_db, diff, remaining_attempts, source_key)

Attempts to read the specified record from the source database and insert it into the target database. Retries if insert fails due to missing source or suddenly existing target record.

  • source_db: either :left or :right - source database of replication

  • diff: the current ReplicationDifference instance

  • remaining_attempts: the number of remaining replication attempts for this difference

  • source_key: a column_name => value hash identifying the source record

# File lib/rubyrep/replicators/two_way_replicator.rb, line 211
def attempt_insert(source_db, diff, remaining_attempts, source_key)
  source_change = diff.changes[source_db]
  source_table = source_change.table
  target_db = OTHER_SIDE[source_db]
  target_table = rep_helper.corresponding_table(source_db, source_table)

  values = rep_helper.load_record source_db, source_table, source_key
  if values == nil
    diff.amend
    replicate_difference diff, remaining_attempts - 1, "source record for insert vanished"
  else
    attempt_change('insert', source_db, target_db, diff, remaining_attempts) do
      rep_helper.insert_record target_db, target_table, values
      log_replication_outcome source_db, diff
    end
  end
end

attempt_update(source_db, diff, remaining_attempts, source_key, target_key)

Attempts to read the specified record from the source database and update the specified record in the target database. Retries if the update fails due to a missing source or target record.

  • source_db: either :left or :right - source database of replication

  • diff: the current ReplicationDifference instance

  • remaining_attempts: the number of remaining replication attempts for this difference

  • source_key: a column_name => value hash identifying the source record

  • target_key: a column_name => value hash identifying the target record

# File lib/rubyrep/replicators/two_way_replicator.rb, line 237
def attempt_update(source_db, diff, remaining_attempts, source_key, target_key)
  source_change = diff.changes[source_db]
  source_table = source_change.table
  target_db = OTHER_SIDE[source_db]
  target_table = rep_helper.corresponding_table(source_db, source_table)

  values = rep_helper.load_record source_db, source_table, source_key
  if values == nil
    diff.amend
    replicate_difference diff, remaining_attempts - 1, "source record for update vanished"
  else
    attempt_change('update', source_db, target_db, diff, remaining_attempts) do
      number_updated = rep_helper.update_record target_db, target_table, values, target_key
      if number_updated == 0
        diff.amend
        replicate_difference diff, remaining_attempts - 1, "target record for update vanished"
      else
        log_replication_outcome source_db, diff
      end
    end
  end
end

clear_conflict(source_db, diff, remaining_attempts)

Helper function that clears a conflict by taking the change from the specified winning database and updating the other database accordingly.

  • source_db: the winning database (either :left or :right)

  • diff: the ReplicationDifference instance

  • remaining_attempts: the number of remaining replication attempts for this difference

# File lib/rubyrep/replicators/two_way_replicator.rb, line 161
def clear_conflict(source_db, diff, remaining_attempts)
  source_change = diff.changes[source_db]
  target_db = OTHER_SIDE[source_db]
  target_change = diff.changes[target_db]

  target_action = CONFLICT_STATE_MATRIX[source_change.type][target_change.type]
  source_key = source_change.type == :update ? source_change.new_key : source_change.key
  target_key = target_change.type == :update ? target_change.new_key : target_change.key
  case target_action
  when :insert
    attempt_insert source_db, diff, remaining_attempts, source_key
  when :update
    attempt_update source_db, diff, remaining_attempts, source_key, target_key
  when :delete
    attempt_delete source_db, diff, remaining_attempts, target_key
  end
end
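
The key selection in clear_conflict can be isolated as shown in the minimal sketch below. ExampleKeyedChange and example_effective_key are hypothetical names. The reading that new_key is the key a record carries after an update is an assumption drawn from the code above, not something this page states.

```ruby
# Minimal stand-in for a change: its type, the key before the change,
# and (for updates) the key after the change.
ExampleKeyedChange = Struct.new(:type, :key, :new_key)

# Picks the key to use for locating the record: new_key for updates,
# key for inserts and deletes.
def example_effective_key(change)
  change.type == :update ? change.new_key : change.key
end

renamed = ExampleKeyedChange.new(:update, {'id' => 1}, {'id' => 2})
removed = ExampleKeyedChange.new(:delete, {'id' => 1}, nil)

puts example_effective_key(renamed).inspect
puts example_effective_key(removed).inspect
```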

log_replication_outcome(winner, diff)

Logs replication of the specified difference as per the configured :logged_replication_events option.

  • winner: Either the winner database (:left or :right) or :ignore

  • diff: the ReplicationDifference instance

# File lib/rubyrep/replicators/two_way_replicator.rb, line 183
def log_replication_outcome(winner, diff)
  options = rep_helper.options_for_table(diff.changes[:left].table)
  option_values = [options[:logged_replication_events]].flatten # make sure I have an array
  if diff.type == :conflict
    return unless option_values.include?(:all_conflicts) or option_values.include?(:ignored_conflicts)
    return if winner != :ignore and not option_values.include?(:all_conflicts)
    outcome = {:left => 'left_won', :right => 'right_won', :ignore => 'ignored'}[winner]
  else
    return unless option_values.include?(:all_changes) or option_values.include?(:ignored_changes)
    return if winner != :ignore and not option_values.include?(:all_changes)
    outcome = winner == :ignore ? 'ignored' : 'replicated'
  end
  rep_helper.log_replication_outcome diff, outcome
end
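
The logging rules above reduce to a small decision function. The following is a minimal sketch with the hypothetical name example_logged_outcome; it returns the outcome string that would be logged (or nil if nothing is logged) instead of calling rep_helper.log_replication_outcome.

```ruby
# diff_type: :conflict, :left or :right
# winner:    :left, :right or :ignore
# events:    a single symbol or an array of symbols, as accepted by
#            the :logged_replication_events option
def example_logged_outcome(diff_type, winner, events)
  events = [events].flatten # accept a single value or an array
  if diff_type == :conflict
    all, ignored = :all_conflicts, :ignored_conflicts
    outcome = {:left => 'left_won', :right => 'right_won', :ignore => 'ignored'}[winner]
  else
    all, ignored = :all_changes, :ignored_changes
    outcome = winner == :ignore ? 'ignored' : 'replicated'
  end
  # Nothing is logged unless one of the two relevant events is enabled.
  return nil unless events.include?(all) || events.include?(ignored)
  # Non-ignored outcomes are only logged when the "all" event is enabled.
  return nil if winner != :ignore && !events.include?(all)
  outcome
end

puts example_logged_outcome(:conflict, :ignore, :ignored_conflicts).inspect
puts example_logged_outcome(:conflict, :left, :ignored_conflicts).inspect
puts example_logged_outcome(:left, :left, [:all_changes]).inspect
```

With the default setting of [:ignored_conflicts], only conflicts that were ignored produce a log entry.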

replicate_difference(diff, remaining_attempts = MAX_REPLICATION_ATTEMPTS, previous_failure_description = nil)

Called to replicate the specified difference.

  • diff: the ReplicationDifference instance

  • remaining_attempts: how many more times a replication will be attempted

  • previous_failure_description: why the previous replication attempt failed

# File lib/rubyrep/replicators/two_way_replicator.rb, line 314
def replicate_difference(diff, remaining_attempts = MAX_REPLICATION_ATTEMPTS, previous_failure_description = nil)
  raise Exception, previous_failure_description || "max replication attempts exceeded" if remaining_attempts == 0
  options = rep_helper.options_for_table(diff.changes[:left].table)
  if diff.type == :left or diff.type == :right
    key = diff.type == :left ? :left_change_handling : :right_change_handling
    option = options[key]

    if option == :ignore
      log_replication_outcome :ignore, diff
    elsif option == :replicate
      source_db = diff.type

      change = diff.changes[source_db]

      case change.type
      when :insert
        attempt_insert source_db, diff, remaining_attempts, change.key
      when :update
        attempt_update source_db, diff, remaining_attempts, change.new_key, change.key
      when :delete
        attempt_delete source_db, diff, remaining_attempts, change.key
      end
    else # option must be a Proc
      option.call rep_helper, diff
    end
  elsif diff.type == :conflict
    option = options[:replication_conflict_handling]
    if option == :ignore
      log_replication_outcome :ignore, diff
    elsif option == :left_wins
      clear_conflict :left, diff, remaining_attempts
    elsif option == :right_wins
      clear_conflict :right, diff, remaining_attempts
    elsif option == :later_wins
      winner_db = diff.changes[:left].last_changed_at >= diff.changes[:right].last_changed_at ? :left : :right
      clear_conflict winner_db, diff, remaining_attempts
    elsif option == :earlier_wins
      winner_db = diff.changes[:left].last_changed_at <= diff.changes[:right].last_changed_at ? :left : :right
      clear_conflict winner_db, diff, remaining_attempts
    else # option must be a Proc
      option.call rep_helper, diff
    end
  end
end
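
How :later_wins and :earlier_wins pick a winner can be checked with the minimal sketch below. ExampleChange and example_winner are hypothetical names; only the two timestamp comparisons are taken from the code above.

```ruby
# Minimal stand-in for one side's change; only the timestamp matters here.
ExampleChange = Struct.new(:last_changed_at)

# Picks the winning side for :later_wins or :earlier_wins.
# Both comparisons are inclusive, so a tie goes to :left in either mode.
def example_winner(option, left, right)
  case option
  when :later_wins
    left.last_changed_at >= right.last_changed_at ? :left : :right
  when :earlier_wins
    left.last_changed_at <= right.last_changed_at ? :left : :right
  end
end

older = ExampleChange.new(Time.at(1_000))
newer = ExampleChange.new(Time.at(2_000))

puts example_winner(:later_wins, older, newer).inspect   # prints :right
puts example_winner(:earlier_wins, older, newer).inspect # prints :left
puts example_winner(:later_wins, older, older).inspect   # prints :left
```

The result depends on the two last_changed_at values being comparable, so both strategies presumably rely on the clocks of the two database servers being reasonably in sync.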

validate_change_handling_options()

Verifies that the :left_change_handling / :right_change_handling options are valid. Raises an ArgumentError if an option is invalid.

# File lib/rubyrep/replicators/two_way_replicator.rb, line 91
def validate_change_handling_options
  [:left_change_handling, :right_change_handling].each do |key|
    rep_helper.session.configuration.each_matching_option(key) do |table_spec, value|
      unless value.respond_to? :call
        verify_option table_spec, [:ignore, :replicate], key, value
      end
    end
  end
end

validate_conflict_handling_options()

Verifies if the given :replication_conflict_handling options are valid. Raises an ArgumentError if an option is invalid.

# File lib/rubyrep/replicators/two_way_replicator.rb, line 103
def validate_conflict_handling_options
  rep_helper.session.configuration.each_matching_option(:replication_conflict_handling) do |table_spec, value|
    unless value.respond_to? :call
      verify_option table_spec,
        [:ignore, :left_wins, :right_wins, :later_wins, :earlier_wins],
        :replication_conflict_handling, value
    end
  end
end

validate_logging_options()

Verifies that the given :logged_replication_events option values are valid. Raises an ArgumentError if a value is invalid.

# File lib/rubyrep/replicators/two_way_replicator.rb, line 115
def validate_logging_options
  rep_helper.session.configuration.each_matching_option(:logged_replication_events) do |table_spec, values|
    values = [values].flatten # ensure that I have an array
    values.each do |value|
      verify_option table_spec,
        [:ignored_changes, :all_changes, :ignored_conflicts, :all_conflicts],
        :logged_replication_events, value
    end
  end
end

verify_option(table_spec, valid_option_values, option_key, option_value)

Checks if an option is configured correctly. Raises an ArgumentError if not.

  • table_spec: the table specification to which the option belongs. May be nil.

  • valid_option_values: array of valid option values

  • option_key: the key of the option that is to be checked

  • option_value: the value of the option that is to be checked

# File lib/rubyrep/replicators/two_way_replicator.rb, line 79
def verify_option(table_spec, valid_option_values, option_key, option_value)
  unless valid_option_values.include? option_value
    message = ""
    message << "#{table_spec.inspect}: " if table_spec
    message << "#{option_value.inspect} not a valid #{option_key.inspect} option"
    raise ArgumentError.new(message)
  end
end
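
To see what the resulting error looks like, the check can be run outside the class. The sketch below uses a standalone copy under the hypothetical name example_verify_option, plus a hypothetical wrapper example_validate that mirrors how the validators above skip callable values such as Procs.

```ruby
# Standalone copy of the check, renamed to make clear that it is not
# the library method itself.
def example_verify_option(table_spec, valid_option_values, option_key, option_value)
  return if valid_option_values.include? option_value
  message = String.new
  message << "#{table_spec.inspect}: " if table_spec
  message << "#{option_value.inspect} not a valid #{option_key.inspect} option"
  raise ArgumentError, message
end

# Callables (Procs, lambdas) are accepted without a value check;
# everything else must be one of the valid values.
def example_validate(table_spec, valid_values, key, value)
  example_verify_option(table_spec, valid_values, key, value) unless value.respond_to?(:call)
  true
end

# A lambda passes validation.
example_validate(nil, [:ignore, :replicate], :left_change_handling, lambda { |rep_helper, diff| })

# An unknown symbol is rejected, and the message names the table spec.
begin
  example_validate('users', [:ignore, :replicate], :left_change_handling, :bogus)
rescue ArgumentError => e
  puts e.message # prints "users": :bogus not a valid :left_change_handling option
end
```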
