class Celluloid::IO::Stream
Base class of all streams in Celluloid::IO
Attributes
The “sync mode” of the stream
See IO#sync for full details.
Public Class Methods
# File lib/celluloid/io/stream.rb, line 19 def initialize(socket) super @eof = false @sync = true @read_buffer = ''.force_encoding(Encoding::ASCII_8BIT) @write_buffer = ''.force_encoding(Encoding::ASCII_8BIT) @read_latch = Latch.new @write_latch = Latch.new end
Public Instance Methods
Writes s
to the stream. s
will be converted to a
String using String#to_s.
# File lib/celluloid/io/stream.rb, line 256 def << (s) do_write(s) self end
Closes the stream and flushes any unwritten data.
# File lib/celluloid/io/stream.rb, line 311 def close flush rescue nil super end
Executes the block for every line in the stream where lines are separated
by eol
.
See also gets
# File lib/celluloid/io/stream.rb, line 181 def each(eol=$/) while line = self.gets(eol) yield line end end
Calls the given block once for each byte in the stream.
# File lib/celluloid/io/stream.rb, line 216 def each_byte # :yields: byte while c = getc yield(c.ord) end end
Returns true if the stream is at file which means there is no more data to be read.
# File lib/celluloid/io/stream.rb, line 241 def eof? fill_rbuff if !@eof && @read_buffer.empty? @eof && @read_buffer.empty? end
Flushes buffered data to the stream.
# File lib/celluloid/io/stream.rb, line 301 def flush osync = @sync @sync = true do_write "" return self ensure @sync = osync end
Reads one character from the stream. Returns nil if called at end of file.
# File lib/celluloid/io/stream.rb, line 211 def getc read(1) end
Reads the next line from the stream. Lines are separated by
eol
. If limit
is provided the result will not be
longer than the given number of bytes.
eol
may be a String or Regexp.
Unlike IO#gets the line read will not be assigned to +$_+.
Unlike IO#gets the separator must be provided if a limit is provided.
# File lib/celluloid/io/stream.rb, line 155 def gets(eol=$/, limit=nil) idx = @read_buffer.index(eol) until @eof break if idx fill_rbuff idx = @read_buffer.index(eol) end if eol.is_a?(Regexp) size = idx ? idx+$&.size : nil else size = idx ? idx+eol.size : nil end if limit and limit >= 0 size = [size, limit].min end consume_rbuff(size) end
Writes args
to the stream.
See IO#print for full details.
# File lib/celluloid/io/stream.rb, line 284 def print(*args) s = "" args.each { |arg| s << arg.to_s } do_write(s) nil end
Formats and writes to the stream converting parameters under control of the format string.
See Kernel#sprintf for format string details.
# File lib/celluloid/io/stream.rb, line 295 def printf(s, *args) do_write(s % args) nil end
Writes args
to the stream along with a record separator.
See IO#puts for full details.
# File lib/celluloid/io/stream.rb, line 264 def puts(*args) s = "" if args.empty? s << "\n" end args.each do |arg| s << arg.to_s if $/ && /\n\z/ !~ s s << "\n" end end do_write(s) nil end
Reads size
bytes from the stream. If buf
is
provided it must reference a string which will receive the data.
See IO#read for full details.
# File lib/celluloid/io/stream.rb, line 87 def read(size=nil, buf=nil) if size == 0 if buf buf.clear return buf else return "" end end until @eof break if size && size <= @read_buffer.size fill_rbuff break unless size end ret = consume_rbuff(size) || "" if buf buf.replace(ret) ret = buf end (size && ret.empty?) ? nil : ret end
Reads a one-character string from the stream. Raises an EOFError at end of file.
# File lib/celluloid/io/stream.rb, line 224 def readchar raise EOFError if eof? getc end
Reads a line from the stream which is separated by eol
.
Raises EOFError if at end of file.
# File lib/celluloid/io/stream.rb, line 204 def readline(eol=$/) raise EOFError if eof? gets(eol) end
Reads lines from the stream which are separated by eol
.
See also gets
# File lib/celluloid/io/stream.rb, line 191 def readlines(eol=$/) ary = [] while line = self.gets(eol) ary << line end ary end
Reads at most maxlen
bytes from the stream. If
buf
is provided it must reference a string which will receive
the data.
See IO#readpartial for full details.
# File lib/celluloid/io/stream.rb, line 117 def readpartial(maxlen, buf=nil) if maxlen == 0 if buf buf.clear return buf else return "" end end if @read_buffer.empty? begin return sysread(maxlen, buf) rescue Errno::EAGAIN retry end end ret = consume_rbuff(maxlen) if buf buf.replace(ret) ret = buf end raise EOFError if ret.empty? ret end
System read via the nonblocking subsystem
# File lib/celluloid/io/stream.rb, line 37 def sysread(length = nil, buffer = nil) buffer ||= ''.force_encoding(Encoding::ASCII_8BIT) @read_latch.synchronize do begin read_nonblock(length, buffer) rescue ::IO::WaitReadable wait_readable retry end end buffer end
System write via the nonblocking subsystem
# File lib/celluloid/io/stream.rb, line 53 def syswrite(string) length = string.length total_written = 0 remaining = string @write_latch.synchronize do while total_written < length begin written = write_nonblock(remaining) rescue ::IO::WaitWritable wait_writable retry rescue EOFError return total_written rescue Errno::EAGAIN wait_writable retry end total_written += written # FIXME: mutating the original buffer here. Seems bad. remaining.slice!(0, written) if written < remaining.length end end total_written end
Pushes character c
back onto the stream such that a subsequent
buffered character read will return it.
Unlike IO#getc multiple bytes may be pushed back onto the stream.
Has no effect on unbuffered reads (such as sysread).
# File lib/celluloid/io/stream.rb, line 235 def ungetc(c) @read_buffer[0,0] = c.chr end
Wait until the current object is readable
# File lib/celluloid/io/stream.rb, line 31 def wait_readable; Celluloid::IO.wait_readable(self); end
Wait until the current object is writable
# File lib/celluloid/io/stream.rb, line 34 def wait_writable; Celluloid::IO.wait_writable(self); end
Writes s
to the stream. If the argument is not a string it
will be converted using String#to_s. Returns the number of bytes written.
# File lib/celluloid/io/stream.rb, line 249 def write(s) do_write(s) s.bytesize end
Private Instance Methods
Consumes size
bytes from the buffer
# File lib/celluloid/io/stream.rb, line 332 def consume_rbuff(size=nil) if @read_buffer.empty? nil else size = @read_buffer.size unless size ret = @read_buffer[0, size] @read_buffer[0, size] = "" ret end end
Writes s
to the buffer. When the buffer is full or sync is true the buffer is flushed
to the underlying stream.
# File lib/celluloid/io/stream.rb, line 345 def do_write(s) @write_buffer << s @write_buffer.force_encoding(Encoding::BINARY) if @sync or @write_buffer.size > BLOCK_SIZE or idx = @write_buffer.rindex($/) remain = idx ? idx + $/.size : @write_buffer.length nwritten = 0 while remain > 0 str = @write_buffer[nwritten,remain] begin nwrote = syswrite(str) rescue Errno::EAGAIN retry end remain -= nwrote nwritten += nwrote end @write_buffer[0,nwritten] = "" end end
Fills the buffer from the underlying stream
# File lib/celluloid/io/stream.rb, line 321 def fill_rbuff begin @read_buffer << sysread(BLOCK_SIZE) rescue Errno::EAGAIN retry rescue EOFError @eof = true end end