class Celluloid::IO::Stream

Base class of all streams in Celluloid::IO

Attributes

sync[RW]

The “sync mode” of the stream

See IO#sync for full details.

Public Class Methods

new(socket) click to toggle source
Calls superclass method Celluloid::IO::Socket.new
# File lib/celluloid/io/stream.rb, line 19
def initialize(socket)
  super
  @eof  = false
  @sync = true
  @read_buffer = ''.force_encoding(Encoding::ASCII_8BIT)
  @write_buffer = ''.force_encoding(Encoding::ASCII_8BIT)

  @read_latch  = Latch.new
  @write_latch = Latch.new
end

Public Instance Methods

<<(s) click to toggle source

Writes s to the stream. s will be converted to a String using String#to_s.

# File lib/celluloid/io/stream.rb, line 256
def << (s)
  do_write(s)
  self
end
close() click to toggle source

Closes the stream and flushes any unwritten data.

Calls superclass method
# File lib/celluloid/io/stream.rb, line 311
def close
  flush rescue nil
  super
end
each(eol=$/) { |line| ... } click to toggle source

Executes the block for every line in the stream where lines are separated by eol.

See also gets

# File lib/celluloid/io/stream.rb, line 181
def each(eol=$/)
  while line = self.gets(eol)
    yield line
  end
end
Also aliased as: each_line
each_byte() { |byte| ... } click to toggle source

Calls the given block once for each byte in the stream.

# File lib/celluloid/io/stream.rb, line 216
def each_byte # :yields: byte
  while c = getc
    yield(c.ord)
  end
end
each_line(eol=$/)
Alias for: each
eof()
Alias for: eof?
eof?() click to toggle source

Returns true if the stream is at file which means there is no more data to be read.

# File lib/celluloid/io/stream.rb, line 241
def eof?
  fill_rbuff if !@eof && @read_buffer.empty?
  @eof && @read_buffer.empty?
end
Also aliased as: eof
flush() click to toggle source

Flushes buffered data to the stream.

# File lib/celluloid/io/stream.rb, line 301
def flush
  osync = @sync
  @sync = true
  do_write ""
  return self
ensure
  @sync = osync
end
getc() click to toggle source

Reads one character from the stream. Returns nil if called at end of file.

# File lib/celluloid/io/stream.rb, line 211
def getc
  read(1)
end
gets(eol=$/, limit=nil) click to toggle source

Reads the next line from the stream. Lines are separated by eol. If limit is provided the result will not be longer than the given number of bytes.

eol may be a String or Regexp.

Unlike IO#gets the line read will not be assigned to +$_+.

Unlike IO#gets the separator must be provided if a limit is provided.

# File lib/celluloid/io/stream.rb, line 155
def gets(eol=$/, limit=nil)
  idx = @read_buffer.index(eol)

  until @eof
    break if idx
    fill_rbuff
    idx = @read_buffer.index(eol)
  end

  if eol.is_a?(Regexp)
    size = idx ? idx+$&.size : nil
  else
    size = idx ? idx+eol.size : nil
  end

  if limit and limit >= 0
    size = [size, limit].min
  end

  consume_rbuff(size)
end
print(*args) click to toggle source

Writes args to the stream.

See IO#print for full details.

printf(s, *args) click to toggle source

Formats and writes to the stream converting parameters under control of the format string.

See Kernel#sprintf for format string details.

# File lib/celluloid/io/stream.rb, line 295
def printf(s, *args)
  do_write(s % args)
  nil
end
puts(*args) click to toggle source

Writes args to the stream along with a record separator.

See IO#puts for full details.

# File lib/celluloid/io/stream.rb, line 264
def puts(*args)
  s = ""
  if args.empty?
    s << "\n"
  end

  args.each do |arg|
    s << arg.to_s
    if $/ && /\n\z/ !~ s
      s << "\n"
    end
  end

  do_write(s)
  nil
end
read(size=nil, buf=nil) click to toggle source

Reads size bytes from the stream. If buf is provided it must reference a string which will receive the data.

See IO#read for full details.

# File lib/celluloid/io/stream.rb, line 87
def read(size=nil, buf=nil)
  if size == 0
    if buf
      buf.clear
      return buf
    else
      return ""
    end
  end

  until @eof
    break if size && size <= @read_buffer.size
    fill_rbuff
    break unless size
  end

  ret = consume_rbuff(size) || ""

  if buf
    buf.replace(ret)
    ret = buf
  end

  (size && ret.empty?) ? nil : ret
end
readchar() click to toggle source

Reads a one-character string from the stream. Raises an EOFError at end of file.

# File lib/celluloid/io/stream.rb, line 224
def readchar
  raise EOFError if eof?
  getc
end
readline(eol=$/) click to toggle source

Reads a line from the stream which is separated by eol.

Raises EOFError if at end of file.

# File lib/celluloid/io/stream.rb, line 204
def readline(eol=$/)
  raise EOFError if eof?
  gets(eol)
end
readlines(eol=$/) click to toggle source

Reads lines from the stream which are separated by eol.

See also gets

# File lib/celluloid/io/stream.rb, line 191
def readlines(eol=$/)
  ary = []

  while line = self.gets(eol)
    ary << line
  end

  ary
end
readpartial(maxlen, buf=nil) click to toggle source

Reads at most maxlen bytes from the stream. If buf is provided it must reference a string which will receive the data.

See IO#readpartial for full details.

# File lib/celluloid/io/stream.rb, line 117
def readpartial(maxlen, buf=nil)
  if maxlen == 0
    if buf
      buf.clear
      return buf
    else
      return ""
    end
  end

  if @read_buffer.empty?
    begin
      return sysread(maxlen, buf)
    rescue Errno::EAGAIN
      retry
    end
  end

  ret = consume_rbuff(maxlen)

  if buf
    buf.replace(ret)
    ret = buf
  end

  raise EOFError if ret.empty?
  ret
end
sysread(length = nil, buffer = nil) click to toggle source

System read via the nonblocking subsystem

# File lib/celluloid/io/stream.rb, line 37
def sysread(length = nil, buffer = nil)
  buffer ||= ''.force_encoding(Encoding::ASCII_8BIT)

  @read_latch.synchronize do
    begin
      read_nonblock(length, buffer)
    rescue ::IO::WaitReadable
      wait_readable
      retry
    end
  end

  buffer
end
syswrite(string) click to toggle source

System write via the nonblocking subsystem

# File lib/celluloid/io/stream.rb, line 53
def syswrite(string)
  length = string.length
  total_written = 0

  remaining = string

  @write_latch.synchronize do
    while total_written < length
      begin
        written = write_nonblock(remaining)
      rescue ::IO::WaitWritable
        wait_writable
        retry
      rescue EOFError
        return total_written
      rescue Errno::EAGAIN
        wait_writable
        retry
      end

      total_written += written

      # FIXME: mutating the original buffer here. Seems bad.
      remaining.slice!(0, written) if written < remaining.length
    end
  end

  total_written
end
ungetc(c) click to toggle source

Pushes character c back onto the stream such that a subsequent buffered character read will return it.

Unlike IO#getc multiple bytes may be pushed back onto the stream.

Has no effect on unbuffered reads (such as sysread).

# File lib/celluloid/io/stream.rb, line 235
def ungetc(c)
  @read_buffer[0,0] = c.chr
end
wait_readable() click to toggle source

Wait until the current object is readable

# File lib/celluloid/io/stream.rb, line 31
def wait_readable; Celluloid::IO.wait_readable(self); end
wait_writable() click to toggle source

Wait until the current object is writable

# File lib/celluloid/io/stream.rb, line 34
def wait_writable; Celluloid::IO.wait_writable(self); end
write(s) click to toggle source

Writes s to the stream. If the argument is not a string it will be converted using String#to_s. Returns the number of bytes written.

# File lib/celluloid/io/stream.rb, line 249
def write(s)
  do_write(s)
  s.bytesize
end

Private Instance Methods

consume_rbuff(size=nil) click to toggle source

Consumes size bytes from the buffer

# File lib/celluloid/io/stream.rb, line 332
def consume_rbuff(size=nil)
  if @read_buffer.empty?
    nil
  else
    size = @read_buffer.size unless size
    ret = @read_buffer[0, size]
    @read_buffer[0, size] = ""
    ret
  end
end
do_write(s) click to toggle source

Writes s to the buffer. When the buffer is full or sync is true the buffer is flushed to the underlying stream.

# File lib/celluloid/io/stream.rb, line 345
def do_write(s)
  @write_buffer << s
  @write_buffer.force_encoding(Encoding::BINARY)

  if @sync or @write_buffer.size > BLOCK_SIZE or idx = @write_buffer.rindex($/)
    remain = idx ? idx + $/.size : @write_buffer.length
    nwritten = 0

    while remain > 0
      str = @write_buffer[nwritten,remain]
      begin
        nwrote = syswrite(str)
      rescue Errno::EAGAIN
        retry
      end
      remain -= nwrote
      nwritten += nwrote
    end

    @write_buffer[0,nwritten] = ""
  end
end
fill_rbuff() click to toggle source

Fills the buffer from the underlying stream

# File lib/celluloid/io/stream.rb, line 321
def fill_rbuff
  begin
    @read_buffer << sysread(BLOCK_SIZE)
  rescue Errno::EAGAIN
    retry
  rescue EOFError
    @eof = true
  end
end