diff --git a/CHANGELOG.md b/CHANGELOG.md index 65159a69..6347293e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,8 @@ ### New Features +* [#1036](https://github.com/toptal/chewy/issues/1036): Add `Chewy.close_client` and `Chewy::ElasticClient#close` to explicitly close connections to Elasticsearch, avoiding file descriptor leaks in long-lived multi-threaded processes (e.g. Sidekiq). ([@AlfonsoUceda][]) + ### Bug Fixes ### Changes @@ -888,6 +890,7 @@ [@AgeevAndrew]: https://github.com/AgeevAndrew [@aglushkov]: https://github.com/aglushkov [@AlexVPopov]: https://github.com/AlexVPopov +[@AlfonsoUceda]: https://github.com/AlfonsoUceda [@AndreySavelyev]: https://github.com/AndreySavelyev [@afg419]: https://github.com/afg419 [@arion]: https://github.com/arion diff --git a/README.md b/README.md index ae89d1eb..c3f4e938 100644 --- a/README.md +++ b/README.md @@ -133,6 +133,23 @@ development: ca_file: './tmp/http_ca.crt' ``` +### Closing connections + +`Chewy.client` is memoized per thread, so every thread that touches Chewy gets +its own client with its own connections to Elasticsearch. Neither +`elasticsearch-ruby` nor `elastic-transport` expose a public way to close those +connections, so they are only released when Ruby's garbage collector reclaims +the client. In long-lived, multi-threaded processes that frequently spawn and +discard threads (e.g. Sidekiq, which replaces a thread on job failure), this can +leak file descriptors. + +Use `Chewy.close_client` to close the current thread's connections and drop its +client. The next `Chewy.client` call rebuilds a fresh one: + +```ruby +Chewy.close_client +``` + ### Index Create `app/chewy/users_index.rb` with User Index: diff --git a/lib/chewy.rb b/lib/chewy.rb index d573b654..1fad35b9 100644 --- a/lib/chewy.rb +++ b/lib/chewy.rb @@ -102,6 +102,21 @@ def client Chewy.current[:chewy_client] ||= Chewy::ElasticClient.new end + # Closes the current thread's client connections to Elasticsearch and + # drops the thread-local client, so the next `Chewy.client` call builds a + # fresh one. + # + # Useful in long-lived multi-threaded processes (e.g. Sidekiq) where the + # per-thread client would otherwise keep its connections open until the + # dead thread is garbage collected, leaking file descriptors. + def close_client + client = Chewy.current[:chewy_client] + return unless client + + client.close + Chewy.current[:chewy_client] = nil + end + # Sends wait_for_status request to ElasticSearch with status # defined in configuration. # diff --git a/lib/chewy/elastic_client.rb b/lib/chewy/elastic_client.rb index 41a985cc..15ec76fe 100644 --- a/lib/chewy/elastic_client.rb +++ b/lib/chewy/elastic_client.rb @@ -12,6 +12,21 @@ def initialize(elastic_client = self.class.build_es_client) @elastic_client = elastic_client end + # Closes the underlying connections to Elasticsearch. + # + # Neither elasticsearch-ruby nor elastic-transport expose a public method + # to close connections, so they are only released when Ruby's garbage + # collector reclaims the client instance. This reaches down to the Faraday + # connection of every transport connection and closes it explicitly, which + # is useful to avoid file descriptor leaks in long-lived processes that + # build a client per thread (e.g. Sidekiq workers). + def close + @elastic_client.transport.connections.each do |connection| + faraday = connection.connection + faraday.close if faraday.respond_to?(:close) + end + end + private def method_missing(name, *args, **kwargs, &block) diff --git a/spec/chewy/elastic_client_spec.rb b/spec/chewy/elastic_client_spec.rb index 58cdc0cc..0c79814a 100644 --- a/spec/chewy/elastic_client_spec.rb +++ b/spec/chewy/elastic_client_spec.rb @@ -23,4 +23,24 @@ Chewy.client.search({index: ['products'], body: {size: 0}}).to_a end end + + describe '#close' do + let(:faraday_connection) { double(:faraday_connection) } + let(:connection) { double(:connection, connection: faraday_connection) } + let(:transport) { double(:transport, connections: [connection]) } + let(:elastic_client) { double(:elastic_client, transport: transport) } + let(:client) { described_class.new(elastic_client) } + + it 'closes every underlying Faraday connection' do + allow(faraday_connection).to receive(:respond_to?).with(:close).and_return(true) + expect(faraday_connection).to receive(:close) + client.close + end + + it 'skips connections that do not support close' do + allow(faraday_connection).to receive(:respond_to?).with(:close).and_return(false) + expect(faraday_connection).not_to receive(:close) + expect { client.close }.not_to raise_error + end + end end diff --git a/spec/chewy_spec.rb b/spec/chewy_spec.rb index 17e0511c..4a430e1a 100644 --- a/spec/chewy_spec.rb +++ b/spec/chewy_spec.rb @@ -58,6 +58,33 @@ after { Chewy.current[:chewy_client] = initial_client } end + describe '.close_client' do + let!(:initial_client) { Chewy.current[:chewy_client] } + + after { Chewy.current[:chewy_client] = initial_client } + + context 'when a client exists for the current thread' do + let(:client) { instance_double(Chewy::ElasticClient) } + + before { Chewy.current[:chewy_client] = client } + + specify 'closes the client and clears the thread-local' do + expect(client).to receive(:close) + Chewy.close_client + expect(Chewy.current[:chewy_client]).to be_nil + end + end + + context 'when no client exists for the current thread' do + before { Chewy.current[:chewy_client] = nil } + + specify 'does nothing' do + expect { Chewy.close_client }.not_to raise_error + expect(Chewy.current[:chewy_client]).to be_nil + end + end + end + describe '.create_indices' do before do stub_index(:cities)