diff --git a/README.md b/README.md index bf53ab4..aef9f2b 100644 --- a/README.md +++ b/README.md @@ -27,7 +27,7 @@ Networking usually means pushing lots of bytes around and in Ruby it's easy to m The [examples](https://github.com/iconara/ione/tree/master/examples) directory has some examples of what you can do with Ione, for example: * [redis_client](https://github.com/iconara/ione/tree/master/examples/redis_client) is a more or less full featured Redis client that uses most of Ione's features. -* [http_client](https://github.com/iconara/ione/tree/master/examples/http_client) is a simplistic HTTP client that uses Ione and [http_parser.rb](http://rubygems.org/gems/http_parser.rb) to make HTTP GET request. It also shows how to make TLS connections. +* [http_client](https://github.com/iconara/ione/tree/master/examples/http_client) is a simplistic HTTP client that uses Ione and [http_parser.rb](http://rubygems.org/gems/http_parser.rb) to make HTTP GET request. It also shows how to make TLS connections, and how to provide a thread pool for callbacks. * [cql-rb](https://github.com/iconara/cql-rb) is a high performance Cassandra driver and where Ione was originally developed. * [cassandra-driver](https://github.com/datastax/ruby-driver) is the successor to cql-rb. * [ione-rpc](https://github.com/iconara/ione-rpc) is a RPC framework built on Ione. It makes it reasonably easy to build networked applications without having to reinvent the wheel. diff --git a/examples/http_client/README.md b/examples/http_client/README.md index 2fe8399..89560b1 100644 --- a/examples/http_client/README.md +++ b/examples/http_client/README.md @@ -1,3 +1,5 @@ # Ione HTTP client example A simplistic HTTP client that uses Ione and [http_parser.rb](http://rubygems.org/gems/http_parser.rb) to make HTTP GET request. It also supports HTTPS. + +This example also uses a thread pool to avoid blocking the reactor when the HTTP response is parsed. It's purpose is to move the protocol processing off of the reactor thread, not to parallelize it, and that means that a simple single-threaded implementation is sufficient. diff --git a/examples/http_client/lib/ione/http_client.rb b/examples/http_client/lib/ione/http_client.rb index 75b21b3..42abf93 100644 --- a/examples/http_client/lib/ione/http_client.rb +++ b/examples/http_client/lib/ione/http_client.rb @@ -3,12 +3,14 @@ require 'ione' require 'http_parser' require 'uri' +require 'thread' module Ione class HttpClient def initialize(cert_store=nil) - @reactor = Io::IoReactor.new + @thread_pool = SingleThreadPool.new + @reactor = Io::IoReactor.new(thread_pool: @thread_pool) if cert_store @cert_store = cert_store else @@ -18,11 +20,11 @@ def initialize(cert_store=nil) end def start - @reactor.start.map(self) + @thread_pool.start.then { @reactor.start }.map(self) end def stop - @reactor.stop.map(self) + @reactor.stop.then { @thread_pool.stop }.map(self) end def get(url, headers={}) @@ -97,4 +99,41 @@ def initialize(status, headers, body) @body = body end end + + class SingleThreadPool + StoppedError = Class.new(StandardError) + + def initialize + @queue = Queue.new + @stopped_promise = Promise.new + end + + def submit(&task) + if @stopped + Future.failed(StoppedError.new('Thread pool stopped')) + else + promise = Promise.new + @queue << [task, promise] + promise.future + end + end + + def start + @thread = Thread.start do + until (job = @queue.pop) == :die + task, promise = job + promise.try(&task) + end + @stopped_promise.fulfill + end + Future.resolved + end + + def stop + @stopped = true + @queue.clear + @queue << :die + @stopped_promise.future + end + end end diff --git a/lib/ione.rb b/lib/ione.rb index 98d0b58..b067152 100644 --- a/lib/ione.rb +++ b/lib/ione.rb @@ -5,4 +5,5 @@ module Ione require 'ione/future' require 'ione/byte_buffer' +require 'ione/thread_pool' require 'ione/io' diff --git a/lib/ione/io/acceptor.rb b/lib/ione/io/acceptor.rb index ff96109..1ec6641 100644 --- a/lib/ione/io/acceptor.rb +++ b/lib/ione/io/acceptor.rb @@ -16,12 +16,13 @@ class Acceptor attr_reader :backlog # @private - def initialize(host, port, backlog, unblocker, reactor, socket_impl=nil) + def initialize(host, port, backlog, unblocker, thread_pool, reactor, socket_impl=nil) @host = host @port = port @backlog = backlog @unblocker = unblocker @reactor = reactor + @thread_pool = thread_pool @socket_impl = socket_impl || ServerSocket @accept_listeners = [] @lock = Mutex.new @@ -30,6 +31,15 @@ def initialize(host, port, backlog, unblocker, reactor, socket_impl=nil) # Register a listener to be notified when client connections are accepted # + # It is very important that you don't do any heavy lifting in the callback + # since it by default is called from the IO reactor thread, and as long as + # the callback is working the reactor can't handle any IO and no other + # callbacks can be called. However, if you have provided a thread pool to + # your reactor then each call to the callback will be submitted to that + # pool and you're free to do as much work as you want. + # + # Errors raised by the callback will be ignored. + # # @yieldparam [Ione::Io::ServerConnection] the connection to the client def on_accept(&listener) @lock.synchronize do @@ -107,7 +117,7 @@ def writable? # @private def read client_socket, host, port = accept - connection = ServerConnection.new(client_socket, host, port, @unblocker) + connection = ServerConnection.new(client_socket, host, port, @unblocker, @thread_pool) @reactor.accept(connection) notify_accept_listeners(connection) end @@ -135,7 +145,11 @@ def accept def notify_accept_listeners(connection) listeners = @lock.synchronize { @accept_listeners } - listeners.each { |l| l.call(connection) rescue nil } + listeners.each do |listener| + @thread_pool.submit do + listener.call(connection) + end + end end end end diff --git a/lib/ione/io/base_connection.rb b/lib/ione/io/base_connection.rb index da65f2e..cdc29d6 100644 --- a/lib/ione/io/base_connection.rb +++ b/lib/ione/io/base_connection.rb @@ -12,10 +12,11 @@ class BaseConnection attr_reader :host, :port # @private - def initialize(host, port, unblocker) + def initialize(host, port, unblocker, thread_pool) @host = host @port = port @unblocker = unblocker + @thread_pool = thread_pool @state = CONNECTING_STATE @writable = false @lock = Mutex.new @@ -111,9 +112,11 @@ def writable? # yourself in your protocol handler. # # It is very important that you don't do any heavy lifting in the callback - # since it is called from the IO reactor thread, and as long as the - # callback is working the reactor can't handle any IO and no other - # callbacks can be called. + # since it by default is called from the IO reactor thread, and as long as + # the callback is working the reactor can't handle any IO and no other + # callbacks can be called. However, if you have provided a thread pool to + # your reactor then each call to the callback will be submitted to that + # pool and you're free to do as much work as you want. # # Errors raised by the callback will be ignored. # @@ -185,7 +188,11 @@ def flush # @private def read new_data = @io.read_nonblock(65536) - @data_listener.call(new_data) if @data_listener + if @data_listener + @thread_pool.submit do + @data_listener.call(new_data) + end + end rescue => e close(e) end diff --git a/lib/ione/io/connection.rb b/lib/ione/io/connection.rb index 28b4cb1..49818b5 100644 --- a/lib/ione/io/connection.rb +++ b/lib/ione/io/connection.rb @@ -9,8 +9,8 @@ class Connection < BaseConnection attr_reader :connection_timeout # @private - def initialize(host, port, connection_timeout, unblocker, clock, socket_impl=Socket) - super(host, port, unblocker) + def initialize(host, port, connection_timeout, unblocker, thread_pool, clock, socket_impl=Socket) + super(host, port, unblocker, thread_pool) @connection_timeout = connection_timeout @clock = clock @socket_impl = socket_impl diff --git a/lib/ione/io/io_reactor.rb b/lib/ione/io/io_reactor.rb index 34c7358..476b406 100644 --- a/lib/ione/io/io_reactor.rb +++ b/lib/ione/io/io_reactor.rb @@ -20,6 +20,21 @@ module Io # run in the reactor thread, and every cycle you use there is a cycle which # can't be used to handle IO. # + # You can provide the reactor with a thread pool to use when callbacks. This + # way you don't have to worry about the protocol parsing blocking the + # reactor. A thread pool is any object that responds to `#submit`, takes a + # block and returns a future which resolves to the value of calling the + # block. It's completely up to the implementation when and how the block is + # called. The default implementation simply calls the block immediately. + # + # The callbacks that are called in the thread pool are `Connection#on_data`, + # `ServerConnection#on_data` ({BaseConnection#on_data}) and + # {Acceptor#on_accept}. + # + # If you provide a thread pool with more than one thread multiple chunks + # from the same connection can be processed in parallel, and it's up to you + # to serialize the processing per connection. + # # The IO reactor is completely protocol agnostic, and it's up to you to # create objects that can interpret the bytes received from remote hosts, # and to send the correct commands back. The way this works is that when you @@ -89,9 +104,12 @@ class IoReactor # Initializes a new IO reactor. # - # @param options [Hash] only used to inject behaviour during tests + # @param options [Hash] + # @option options [#submit] :thread_pool (nil) a thread pool which will + # be used for (some) callbacks def initialize(options={}) - @options = options + @options = options.dup + @thread_pool = options.delete(:thread_pool) || NULL_THREAD_POOL @clock = options[:clock] || Time @state = PENDING_STATE @error_listeners = [] @@ -238,14 +256,14 @@ def connect(host, port, options=nil, &block) timeout = options[:timeout] || 5 ssl = options[:ssl] end - connection = Connection.new(host, port, timeout, @unblocker, @clock) + connection = Connection.new(host, port, timeout, @unblocker, @thread_pool, @clock) f = connection.connect @io_loop.add_socket(connection) @unblocker.unblock if running? if ssl f = f.flat_map do ssl_context = ssl == true ? nil : ssl - upgraded_connection = SslConnection.new(host, port, connection.to_io, @unblocker, ssl_context) + upgraded_connection = SslConnection.new(host, port, connection.to_io, @unblocker, @thread_pool, ssl_context) ff = upgraded_connection.connect @io_loop.remove_socket(connection) @io_loop.add_socket(upgraded_connection) @@ -320,9 +338,9 @@ def bind(host, port, options=nil, &block) ssl_context = options[:ssl] end if ssl_context - server = SslAcceptor.new(host, port, backlog, @unblocker, self, ssl_context) + server = SslAcceptor.new(host, port, backlog, @unblocker, @thread_pool, self, ssl_context) else - server = Acceptor.new(host, port, backlog, @unblocker, self) + server = Acceptor.new(host, port, backlog, @unblocker, @thread_pool, self) end f = server.bind @io_loop.add_socket(server) diff --git a/lib/ione/io/server_connection.rb b/lib/ione/io/server_connection.rb index aab8e51..35161d4 100644 --- a/lib/ione/io/server_connection.rb +++ b/lib/ione/io/server_connection.rb @@ -5,8 +5,8 @@ module Io # @since v1.1.0 class ServerConnection < BaseConnection # @private - def initialize(socket, host, port, unblocker) - super(host, port, unblocker) + def initialize(socket, host, port, unblocker, thread_pool) + super(host, port, unblocker, thread_pool) @io = socket @state = CONNECTED_STATE end diff --git a/lib/ione/io/ssl_acceptor.rb b/lib/ione/io/ssl_acceptor.rb index 6fac235..b7b3e51 100644 --- a/lib/ione/io/ssl_acceptor.rb +++ b/lib/ione/io/ssl_acceptor.rb @@ -5,15 +5,15 @@ module Ione module Io # @private class SslAcceptor < Acceptor - def initialize(host, port, backlog, unblocker, reactor, ssl_context, socket_impl=nil, ssl_socket_impl=nil) - super(host, port, backlog, unblocker, reactor, socket_impl) + def initialize(host, port, backlog, unblocker, thread_pool, reactor, ssl_context, socket_impl=nil, ssl_socket_impl=nil) + super(host, port, backlog, unblocker, thread_pool, reactor, socket_impl) @ssl_context = ssl_context @ssl_socket_impl = ssl_socket_impl end def read client_socket, host, port = accept - connection = SslServerConnection.new(client_socket, host, port, @unblocker, @ssl_context, method(:notify_accept_listeners), @ssl_socket_impl) + connection = SslServerConnection.new(client_socket, host, port, @unblocker, @thread_pool, @ssl_context, method(:notify_accept_listeners), @ssl_socket_impl) @reactor.accept(connection) end end diff --git a/lib/ione/io/ssl_connection.rb b/lib/ione/io/ssl_connection.rb index 5b89009..ef91175 100644 --- a/lib/ione/io/ssl_connection.rb +++ b/lib/ione/io/ssl_connection.rb @@ -7,8 +7,8 @@ module Ione module Io # @private class SslConnection < BaseConnection - def initialize(host, port, io, unblocker, ssl_context=nil, socket_impl=OpenSSL::SSL::SSLSocket) - super(host, port, unblocker) + def initialize(host, port, io, unblocker, thread_pool, ssl_context=nil, socket_impl=OpenSSL::SSL::SSLSocket) + super(host, port, unblocker, thread_pool) @socket_impl = socket_impl @ssl_context = ssl_context @raw_io = io diff --git a/lib/ione/io/ssl_server_connection.rb b/lib/ione/io/ssl_server_connection.rb index 72a6b0d..d7574c2 100644 --- a/lib/ione/io/ssl_server_connection.rb +++ b/lib/ione/io/ssl_server_connection.rb @@ -7,8 +7,8 @@ class SslServerConnection < ServerConnection ACCEPTING_STATE = 0 ESTABLISHED_STATE = 1 - def initialize(socket, host, port, unblocker, ssl_context, accept_callback, ssl_socket_impl=nil) - super(socket, host, port, unblocker) + def initialize(socket, host, port, unblocker, thread_pool, ssl_context, accept_callback, ssl_socket_impl=nil) + super(socket, host, port, unblocker, thread_pool) @ssl_context = ssl_context @accept_callback = accept_callback @ssl_socket_impl = ssl_socket_impl || OpenSSL::SSL::SSLSocket diff --git a/lib/ione/thread_pool.rb b/lib/ione/thread_pool.rb new file mode 100644 index 0000000..ee5dfa0 --- /dev/null +++ b/lib/ione/thread_pool.rb @@ -0,0 +1,18 @@ +# encoding: utf-8 + +module Ione + # A null implementation of a thread pool whose {#submit} calls the given block + # immediately and returns a future resolved with its value. + # + # @private + class NullThreadPool + # @return [Ione::Future] a future that resolves to the value of the given block + def submit(&task) + Future.resolved(task.call) + rescue => e + Future.failed(e) + end + end + + NULL_THREAD_POOL = NullThreadPool.new +end diff --git a/spec/ione/io/acceptor_spec.rb b/spec/ione/io/acceptor_spec.rb index 43a53b9..3effcf5 100644 --- a/spec/ione/io/acceptor_spec.rb +++ b/spec/ione/io/acceptor_spec.rb @@ -7,7 +7,7 @@ module Ione module Io describe Acceptor do let :acceptor do - described_class.new('example.com', 4321, backlog, unblocker, reactor, socket_impl) + described_class.new('example.com', 4321, backlog, unblocker, thread_pool, reactor, socket_impl) end let :backlog do @@ -18,6 +18,10 @@ module Io double(:unblocker) end + let :thread_pool do + FakeThreadPool.new(true) + end + let :reactor do double(:reactor) end @@ -202,6 +206,21 @@ module Io received_connection2.port.should == 3333 end + it 'calls the accept listeners in the provided thread pool' do + thread_pool.auto_run = false + called1 = false + called2 = false + acceptor.on_accept { |c| called1 = true } + acceptor.on_accept { |c| called2 = true } + acceptor.bind + acceptor.read + called1.should be_false + called2.should be_false + thread_pool.run_all + called1.should be_true + called2.should be_true + end + it 'ignores exceptions raised by the connection callback' do called = false acceptor.on_accept { |c| raise 'bork!' } diff --git a/spec/ione/io/connection_common.rb b/spec/ione/io/connection_common.rb index 45fd2df..9458965 100644 --- a/spec/ione/io/connection_common.rb +++ b/spec/ione/io/connection_common.rb @@ -254,6 +254,17 @@ data.should == 'foo bar' end + it 'calls the data listener in the provided thread pool' do + thread_pool.auto_run = false + socket.should_receive(:read_nonblock).with(instance_of(Fixnum)).and_return('foo bar') + data = nil + handler.on_data { |d| data = d } + handler.read + data.should be_nil + thread_pool.run_all + data.should eq('foo bar') + end + context 'when #read_nonblock raises an error' do before do socket.stub(:close) diff --git a/spec/ione/io/connection_spec.rb b/spec/ione/io/connection_spec.rb index 70e07d5..f3247d6 100644 --- a/spec/ione/io/connection_spec.rb +++ b/spec/ione/io/connection_spec.rb @@ -8,7 +8,7 @@ module Ione module Io describe Connection do let :handler do - described_class.new('example.com', 55555, 5, unblocker, clock, socket_impl) + described_class.new('example.com', 55555, 5, unblocker, thread_pool, clock, socket_impl) end let :unblocker do @@ -27,6 +27,10 @@ module Io double(:socket) end + let :thread_pool do + FakeThreadPool.new + end + before do socket_impl.stub(:getaddrinfo) .with('example.com', 55555, nil, Socket::SOCK_STREAM) @@ -44,6 +48,10 @@ module Io socket.stub(:close) end + before do + thread_pool.auto_run = true + end + it_behaves_like 'a connection' do before do handler.connect diff --git a/spec/ione/io/server_connection_spec.rb b/spec/ione/io/server_connection_spec.rb index 92fe93c..d37782e 100644 --- a/spec/ione/io/server_connection_spec.rb +++ b/spec/ione/io/server_connection_spec.rb @@ -8,7 +8,7 @@ module Ione module Io describe ServerConnection do let :handler do - described_class.new(socket, 'example.com', 4321, unblocker) + described_class.new(socket, 'example.com', 4321, unblocker, thread_pool) end let :socket do @@ -19,6 +19,14 @@ module Io double(:unblocker, unblock: nil) end + let :thread_pool do + FakeThreadPool.new + end + + before do + thread_pool.auto_run = true + end + it_behaves_like 'a connection' describe '#to_io' do diff --git a/spec/ione/io/ssl_acceptor_spec.rb b/spec/ione/io/ssl_acceptor_spec.rb index 3772c38..de423ff 100644 --- a/spec/ione/io/ssl_acceptor_spec.rb +++ b/spec/ione/io/ssl_acceptor_spec.rb @@ -7,13 +7,17 @@ module Ione module Io describe SslAcceptor do let :acceptor do - described_class.new('example.com', 4321, backlog = 3, unblocker, reactor, ssl_context, socket_impl, ssl_socket_impl) + described_class.new('example.com', 4321, backlog = 3, unblocker, thread_pool, reactor, ssl_context, socket_impl, ssl_socket_impl) end let :unblocker do double(:unblocker) end + let :thread_pool do + FakeThreadPool.new(true) + end + let :reactor do double(:reactor) end diff --git a/spec/ione/io/ssl_connection_spec.rb b/spec/ione/io/ssl_connection_spec.rb index 7616c12..c4a8a9b 100644 --- a/spec/ione/io/ssl_connection_spec.rb +++ b/spec/ione/io/ssl_connection_spec.rb @@ -8,7 +8,7 @@ module Ione module Io describe SslConnection do let :handler do - described_class.new('example.com', 55555, raw_socket, unblocker, ssl_context, socket_impl) + described_class.new('example.com', 55555, raw_socket, unblocker, thread_pool, ssl_context, socket_impl) end let :socket_impl do @@ -27,6 +27,10 @@ module Io double(:unblocker, unblock: nil) end + let :thread_pool do + FakeThreadPool.new(true) + end + let :ssl_context do double(:ssl_context) end @@ -58,7 +62,7 @@ module Io it 'does not pass the context parameter when the SSL context is nil' do socket_impl.stub(:new).and_return(ssl_socket) - h = described_class.new('example.com', 55555, raw_socket, unblocker, nil, socket_impl) + h = described_class.new('example.com', 55555, raw_socket, unblocker, thread_pool, nil, socket_impl) h.connect socket_impl.should have_received(:new).with(raw_socket) end diff --git a/spec/ione/io/ssl_server_connection_spec.rb b/spec/ione/io/ssl_server_connection_spec.rb index 4ad9d10..da8c904 100644 --- a/spec/ione/io/ssl_server_connection_spec.rb +++ b/spec/ione/io/ssl_server_connection_spec.rb @@ -8,7 +8,7 @@ module Ione module Io describe SslServerConnection do let :handler do - described_class.new(socket, 'example.com', 4444, unblocker, ssl_context, accept_callback, ssl_socket_impl) + described_class.new(socket, 'example.com', 4444, unblocker, thread_pool, ssl_context, accept_callback, ssl_socket_impl) end let :socket do @@ -19,6 +19,10 @@ module Io double(:unblocker) end + let :thread_pool do + FakeThreadPool.new(true) + end + let :ssl_context do double(:ssl_context) end diff --git a/spec/ione/thread_pool_spec.rb b/spec/ione/thread_pool_spec.rb new file mode 100644 index 0000000..5a9ca2d --- /dev/null +++ b/spec/ione/thread_pool_spec.rb @@ -0,0 +1,32 @@ +# encoding: utf-8 + +require 'spec_helper' + + +module Ione + describe NullThreadPool do + let :thread_pool do + described_class.new + end + + describe '#submit' do + it 'calls the block immediately' do + called = false + thread_pool.submit { called = true } + called.should be_true + end + + it 'returns a resolved future with the result of the block' do + f = thread_pool.submit { 2 * 4 } + f.value.should eq(8) + end + + context 'when the task raises an error' do + it 'returns a failed future' do + f = thread_pool.submit { raise 'bork' } + expect { f.value }.to raise_error('bork') + end + end + end + end +end diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index 52ce18b..909638e 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -5,6 +5,7 @@ require 'bundler/setup' require 'support/fake_server' +require 'support/fake_thread_pool' require 'support/await_helper' require 'support/server_helper' diff --git a/spec/support/fake_thread_pool.rb b/spec/support/fake_thread_pool.rb new file mode 100644 index 0000000..0b09325 --- /dev/null +++ b/spec/support/fake_thread_pool.rb @@ -0,0 +1,31 @@ +# encoding: utf-8 + +class FakeThreadPool + attr_accessor :auto_run + + def initialize(auto_run=false) + @auto_run = auto_run + @tasks = [] + end + + def run_all + until @tasks.empty? + task, promise = @tasks.shift + promise.try { task.call } + end + end + + def submit(&task) + if @auto_run + begin + Ione::Future.resolved(task.call) + rescue => e + Ione::Future.failed(e) + end + else + promise = Ione::Promise.new + @tasks << [task, promise] + promise.future + end + end +end