Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

### New Features

* [#1036](https://github.com/toptal/chewy/issues/1036): Add `Chewy.close_client` and `Chewy::ElasticClient#close` to explicitly close connections to Elasticsearch, avoiding file descriptor leaks in long-lived multi-threaded processes (e.g. Sidekiq). ([@AlfonsoUceda][])

### Bug Fixes

### Changes
Expand Down Expand Up @@ -888,6 +890,7 @@
[@AgeevAndrew]: https://github.com/AgeevAndrew
[@aglushkov]: https://github.com/aglushkov
[@AlexVPopov]: https://github.com/AlexVPopov
[@AlfonsoUceda]: https://github.com/AlfonsoUceda
[@AndreySavelyev]: https://github.com/AndreySavelyev
[@afg419]: https://github.com/afg419
[@arion]: https://github.com/arion
Expand Down
17 changes: 17 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,23 @@ development:
ca_file: './tmp/http_ca.crt'
```

### Closing connections

`Chewy.client` is memoized per thread, so every thread that touches Chewy gets
its own client with its own connections to Elasticsearch. Neither
`elasticsearch-ruby` nor `elastic-transport` expose a public way to close those
connections, so they are only released when Ruby's garbage collector reclaims
the client. In long-lived, multi-threaded processes that frequently spawn and
discard threads (e.g. Sidekiq, which replaces a thread on job failure), this can
leak file descriptors.

Use `Chewy.close_client` to close the current thread's connections and drop its
client. The next `Chewy.client` call rebuilds a fresh one:

```ruby
Chewy.close_client
```

### Index

Create `app/chewy/users_index.rb` with User Index:
Expand Down
15 changes: 15 additions & 0 deletions lib/chewy.rb
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,21 @@ def client
Chewy.current[:chewy_client] ||= Chewy::ElasticClient.new
end

# Closes the current thread's client connections to Elasticsearch and
# drops the thread-local client, so the next `Chewy.client` call builds a
# fresh one.
#
# Useful in long-lived multi-threaded processes (e.g. Sidekiq) where the
# per-thread client would otherwise keep its connections open until the
# dead thread is garbage collected, leaking file descriptors.
def close_client
client = Chewy.current[:chewy_client]
return unless client

client.close
Chewy.current[:chewy_client] = nil
end

# Sends wait_for_status request to ElasticSearch with status
# defined in configuration.
#
Expand Down
15 changes: 15 additions & 0 deletions lib/chewy/elastic_client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,21 @@ def initialize(elastic_client = self.class.build_es_client)
@elastic_client = elastic_client
end

# Closes the underlying connections to Elasticsearch.
#
# Neither elasticsearch-ruby nor elastic-transport expose a public method
# to close connections, so they are only released when Ruby's garbage
# collector reclaims the client instance. This reaches down to the Faraday
# connection of every transport connection and closes it explicitly, which
# is useful to avoid file descriptor leaks in long-lived processes that
# build a client per thread (e.g. Sidekiq workers).
def close
@elastic_client.transport.connections.each do |connection|
faraday = connection.connection
faraday.close if faraday.respond_to?(:close)
end
end

private

def method_missing(name, *args, **kwargs, &block)
Expand Down
20 changes: 20 additions & 0 deletions spec/chewy/elastic_client_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,24 @@
Chewy.client.search({index: ['products'], body: {size: 0}}).to_a
end
end

describe '#close' do
let(:faraday_connection) { double(:faraday_connection) }
let(:connection) { double(:connection, connection: faraday_connection) }
let(:transport) { double(:transport, connections: [connection]) }
let(:elastic_client) { double(:elastic_client, transport: transport) }
let(:client) { described_class.new(elastic_client) }

it 'closes every underlying Faraday connection' do
allow(faraday_connection).to receive(:respond_to?).with(:close).and_return(true)
expect(faraday_connection).to receive(:close)
client.close
end

it 'skips connections that do not support close' do
allow(faraday_connection).to receive(:respond_to?).with(:close).and_return(false)
expect(faraday_connection).not_to receive(:close)
expect { client.close }.not_to raise_error
end
end
end
27 changes: 27 additions & 0 deletions spec/chewy_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,33 @@
after { Chewy.current[:chewy_client] = initial_client }
end

describe '.close_client' do
let!(:initial_client) { Chewy.current[:chewy_client] }

after { Chewy.current[:chewy_client] = initial_client }

context 'when a client exists for the current thread' do
let(:client) { instance_double(Chewy::ElasticClient) }

before { Chewy.current[:chewy_client] = client }

specify 'closes the client and clears the thread-local' do
expect(client).to receive(:close)
Chewy.close_client
expect(Chewy.current[:chewy_client]).to be_nil
end
end

context 'when no client exists for the current thread' do
before { Chewy.current[:chewy_client] = nil }

specify 'does nothing' do
expect { Chewy.close_client }.not_to raise_error
expect(Chewy.current[:chewy_client]).to be_nil
end
end
end

describe '.create_indices' do
before do
stub_index(:cities)
Expand Down