From d5d061a5922dcdbcf346f3ebb3002b9db06a95f2 Mon Sep 17 00:00:00 2001 From: John Billings Date: Fri, 26 Sep 2014 17:32:22 -0700 Subject: [PATCH 01/14] Add 'use_previous_backends' option. * This option defaults to true: if there are no default servers and a watcher reports no backends, then use the previous backends that we already know about. * Factor out code that updates '@backends' into the watcher base class. --- README.md | 8 +- lib/synapse/service_watcher/base.rb | 37 ++++++++- lib/synapse/service_watcher/dns.rb | 16 +--- lib/synapse/service_watcher/docker.rb | 26 +------ lib/synapse/service_watcher/ec2tag.rb | 27 +------ lib/synapse/service_watcher/zookeeper.rb | 16 +--- spec/lib/synapse/service_watcher_base_spec.rb | 75 ++++++++++++++++--- .../synapse/service_watcher_docker_spec.rb | 34 +-------- .../synapse/service_watcher_ec2tags_spec.rb | 35 --------- 9 files changed, 113 insertions(+), 161 deletions(-) diff --git a/README.md b/README.md index 8de17e5b..0a5e8770 100644 --- a/README.md +++ b/README.md @@ -198,6 +198,10 @@ If you do not list any default servers, no proxy will be created. The `default_servers` will also be used in addition to discovered servers if the `keep_default_servers` option is set. +If you do not list any `default_servers`, and all backends for a service +disappear then the previous known backends will be used. Disable this behavior +by unsetting `use_previous_backends`. + #### The `haproxy` Section #### This section is its own hash, which should contain the following keys: @@ -335,5 +339,5 @@ end 3. Implement the `start` and `validate_discovery_opts` methods 4. Implement whatever additional methods your discovery requires -When your watcher detects a list of new backends, they should be written to `@backends`. -You should then call `@synapse.configure` to force synapse to update the HAProxy config. +When your watcher detects a list of new backends, you should call `set_backends` to +store the new backends and update the HAProxy config. diff --git a/lib/synapse/service_watcher/base.rb b/lib/synapse/service_watcher/base.rb index 3d4b0d48..1f8bfbc5 100644 --- a/lib/synapse/service_watcher/base.rb +++ b/lib/synapse/service_watcher/base.rb @@ -1,3 +1,4 @@ +require 'set' require 'synapse/log' module Synapse @@ -42,6 +43,10 @@ def initialize(opts={}, synapse) @keep_default_servers = opts['keep_default_servers'] || false + # If there are no default servers and a watcher reports no backends, then + # use the previous backends that we already know about. + @use_previous_backends = opts.fetch('use_previous_backends', true) + # set a flag used to tell the watchers to exit # this is not used in every watcher @should_exit = false @@ -95,13 +100,41 @@ def validate_discovery_opts end def set_backends(new_backends) - if @keep_default_servers - @backends = @default_servers + new_backends + new_backends = (new_backends + (@keep_default_servers ? @default_servers : [])).uniq + + if new_backends.to_set == @backends.to_set + return false + end + + if new_backends.empty? + if @default_servers.empty? 
+ if @use_previous_backends + # Discard this update + log.warn "synapse: no default servers for service #{@name};" \ + " using previous backends: #{@backends.inspect}" + return false + else + log.warn "synapse: no default servers for service #{@name} and" \ + " 'use_previous_backends' is disabled; dropping all backends" + @backends.clear + end + else + log.warn "synapse: no backends for service #{@name};" \ + " using default servers: #{@default_servers.inspect}" + @backends = @default_servers + end else + log.info "synapse: discovered #{new_backends.length} backends for service #{@name}" @backends = new_backends end + + reconfigure! + + return true end + # Subclasses should not invoke this directly; it's only exposed so that it + # can be overridden in subclasses. def reconfigure! @synapse.reconfigure! end diff --git a/lib/synapse/service_watcher/dns.rb b/lib/synapse/service_watcher/dns.rb index d59c6971..32b15515 100644 --- a/lib/synapse/service_watcher/dns.rb +++ b/lib/synapse/service_watcher/dns.rb @@ -89,21 +89,7 @@ def configure_backends(servers) end end - if new_backends.empty? - if @default_servers.empty? - log.warn "synapse: no backends and no default servers for service #{@name};" \ - " using previous backends: #{@backends.inspect}" - else - log.warn "synapse: no backends for service #{@name};" \ - " using default servers: #{@default_servers.inspect}" - @backends = @default_servers - end - else - log.info "synapse: discovered #{new_backends.length} backends for service #{@name}" - set_backends(new_backends) - end - - reconfigure! + set_backends(new_backends) end end end diff --git a/lib/synapse/service_watcher/docker.rb b/lib/synapse/service_watcher/docker.rb index 75c95434..20b6125c 100644 --- a/lib/synapse/service_watcher/docker.rb +++ b/lib/synapse/service_watcher/docker.rb @@ -23,16 +23,10 @@ def validate_discovery_opts end def watch - last_containers = [] until @should_exit begin start = Time.now - current_containers = containers - unless last_containers == current_containers - last_containers = current_containers - configure_backends(last_containers) - end - + set_backends(containers) sleep_until_next_check(start) rescue Exception => e log.warn "synapse: error in watcher thread: #{e.inspect}" @@ -98,23 +92,5 @@ def containers log.warn "synapse: error while polling for containers: #{e.inspect}" [] end - - def configure_backends(new_backends) - if new_backends.empty? - if @default_servers.empty? - log.warn "synapse: no backends and no default servers for service #{@name};" \ - " using previous backends: #{@backends.inspect}" - else - log.warn "synapse: no backends for service #{@name};" \ - " using default servers: #{@default_servers.inspect}" - @backends = @default_servers - end - else - log.info "synapse: discovered #{new_backends.length} backends for service #{@name}" - set_backends(new_backends) - end - reconfigure! - end - end end diff --git a/lib/synapse/service_watcher/ec2tag.rb b/lib/synapse/service_watcher/ec2tag.rb index a3319c26..85c9e6f9 100644 --- a/lib/synapse/service_watcher/ec2tag.rb +++ b/lib/synapse/service_watcher/ec2tag.rb @@ -54,20 +54,12 @@ def validate_discovery_opts end def watch - last_backends = [] until @should_exit begin start = Time.now - current_backends = discover_instances - - if last_backends != current_backends + if set_backends(discover_instances) log.info "synapse: ec2tag watcher backends have changed." 
- last_backends = current_backends - configure_backends(current_backends) - else - log.info "synapse: ec2tag watcher backends are unchanged." end - sleep_until_next_check(start) rescue Exception => e log.warn "synapse: error in ec2tag watcher thread: #{e.inspect}" @@ -111,23 +103,6 @@ def instances_with_tags(tag_name, tag_value) .tagged_values(tag_value) .select { |i| i.status == :running } end - - def configure_backends(new_backends) - if new_backends.empty? - if @default_servers.empty? - log.warn "synapse: no backends and no default servers for service #{@name};" \ - " using previous backends: #{@backends.inspect}" - else - log.warn "synapse: no backends for service #{@name};" \ - " using default servers: #{@default_servers.inspect}" - @backends = @default_servers - end - else - log.info "synapse: discovered #{new_backends.length} backends for service #{@name}" - @backends = new_backends - end - @synapse.reconfigure! - end end end diff --git a/lib/synapse/service_watcher/zookeeper.rb b/lib/synapse/service_watcher/zookeeper.rb index 4a88e077..80c05f68 100644 --- a/lib/synapse/service_watcher/zookeeper.rb +++ b/lib/synapse/service_watcher/zookeeper.rb @@ -45,7 +45,7 @@ def create(path) @zk.create(path, ignore: :node_exists) end - # find the current backends at the discovery path; sets @backends + # find the current backends at the discovery path def discover log.info "synapse: discovering backends for service #{@name}" @@ -69,17 +69,7 @@ def discover end end - if new_backends.empty? - if @default_servers.empty? - log.warn "synapse: no backends and no default servers for service #{@name}; using previous backends: #{@backends.inspect}" - else - log.warn "synapse: no backends for service #{@name}; using default servers: #{@default_servers.inspect}" - @backends = @default_servers - end - else - log.info "synapse: discovered #{new_backends.length} backends for service #{@name}" - set_backends(new_backends) - end + set_backends(new_backends) end # sets up zookeeper callbacks if the data at the discovery path changes @@ -103,8 +93,6 @@ def watcher_callback watch # Rediscover discover - # send a message to calling class to reconfigure - reconfigure! 
end end diff --git a/spec/lib/synapse/service_watcher_base_spec.rb b/spec/lib/synapse/service_watcher_base_spec.rb index af3e8172..c1042a17 100644 --- a/spec/lib/synapse/service_watcher_base_spec.rb +++ b/spec/lib/synapse/service_watcher_base_spec.rb @@ -6,7 +6,7 @@ class Synapse::BaseWatcher describe Synapse::BaseWatcher do let(:mocksynapse) { double() } - subject { Synapse::BaseWatcher.new(args, mocksynapse) } + subject { Synapse::BaseWatcher.new(args, mocksynapse) } let(:testargs) { { 'name' => 'foo', 'discovery' => { 'method' => 'base' }, 'haproxy' => {} }} def remove_arg(name) @@ -37,18 +37,75 @@ def remove_arg(name) end end - context "with default_servers" do - default_servers = ['server1', 'server2'] + context 'set_backends test' do + default_servers = ['default_server_1', 'default_server_2'] let(:args) { testargs.merge({'default_servers' => default_servers}) } - it('sets default backends to default_servers') { expect(subject.backends).to equal(default_servers) } - context "with keep_default_servers set" do - let(:args) { testargs.merge({'default_servers' => default_servers, 'keep_default_servers' => true}) } - let(:new_backends) { ['discovered1', 'discovered2'] } + it 'sets backends' do + expect(subject).to receive(:'reconfigure!').exactly(:once) + backends = ['server1', 'server2'] + expect(subject.send(:set_backends, backends)).to equal(true) + expect(subject.backends).to eq(backends) + end + + it 'removes duplicate backends' do + expect(subject).to receive(:'reconfigure!').exactly(:once) + backends = ['server1', 'server2'] + duplicate_backends = backends + backends + expect(subject.send(:set_backends, duplicate_backends)).to equal(true) + expect(subject.backends).to eq(backends) + end + + it 'sets backends to default_servers if no backends discovered' do + expect(subject).to receive(:'reconfigure!').exactly(:once) + expect(subject.send(:set_backends, [])).to equal(true) + expect(subject.backends).to eq(default_servers) + end + + context 'with no default_servers' do + let(:args) { remove_arg 'default_servers' } + it 'uses previous backends if no default_servers set' do + expect(subject).to receive(:'reconfigure!').exactly(:once) + backends = ['server1', 'server2'] + expect(subject.send(:set_backends, backends)).to equal(true) + expect(subject.send(:set_backends, [])).to equal(false) + expect(subject.backends).to eq(backends) + end + end + + context 'with no default_servers set and use_previous_backends disabled' do + let(:args) { + remove_arg 'default_servers' + testargs.merge({'use_previous_backends' => false}) + } + it 'removes all backends if no default_servers set and use_previous_backends disabled' do + expect(subject).to receive(:'reconfigure!').exactly(:twice) + backends = ['server1', 'server2'] + expect(subject.send(:set_backends, backends)).to equal(true) + expect(subject.backends).to eq(backends) + expect(subject.send(:set_backends, [])).to equal(true) + expect(subject.backends).to eq([]) + end + end + + it 'calls reconfigure only once for duplicate backends' do + expect(subject).to receive(:'reconfigure!').exactly(:once) + backends = ['server1', 'server2'] + expect(subject.send(:set_backends, backends)).to equal(true) + expect(subject.backends).to eq(backends) + expect(subject.send(:set_backends, backends)).to equal(false) + expect(subject.backends).to eq(backends) + end + context 'with keep_default_servers set' do + let(:args) { + testargs.merge({'default_servers' => default_servers, 'keep_default_servers' => true}) + } it('keeps default_servers when setting backends') 
do - subject.send(:set_backends, new_backends) - expect(subject.backends).to eq(default_servers + new_backends) + backends = ['server1', 'server2'] + expect(subject).to receive(:'reconfigure!').exactly(:once) + expect(subject.send(:set_backends, backends)).to equal(true) + expect(subject.backends).to eq(backends + default_servers) end end end diff --git a/spec/lib/synapse/service_watcher_docker_spec.rb b/spec/lib/synapse/service_watcher_docker_spec.rb index e5411222..d1a5c5a3 100644 --- a/spec/lib/synapse/service_watcher_docker_spec.rb +++ b/spec/lib/synapse/service_watcher_docker_spec.rb @@ -46,12 +46,7 @@ def add_arg(name, value) end it('has a happy first run path, configuring backends') do expect(subject).to receive(:containers).and_return(['container1']) - expect(subject).to receive(:configure_backends).with(['container1']) - subject.send(:watch) - end - it('does not call configure_backends if there is no change') do - expect(subject).to receive(:containers).and_return([]) - expect(subject).to_not receive(:configure_backends) + expect(subject).to receive(:set_backends).with(['container1']) subject.send(:watch) end end @@ -65,33 +60,6 @@ def add_arg(name, value) end end - context "configure_backends tests" do - before(:each) do - expect(subject.synapse).to receive(:'reconfigure!').at_least(:once) - end - it 'runs' do - expect { subject.send(:configure_backends, []) }.not_to raise_error - end - it 'sets backends right' do - subject.send(:configure_backends, ['foo']) - expect(subject.backends).to eq(['foo']) - end - it 'resets to default backends if no container found' do - subject.default_servers = ['fallback1'] - subject.send(:configure_backends, ['foo']) - expect(subject.backends).to eq(['foo']) - subject.send(:configure_backends, []) - expect(subject.backends).to eq(['fallback1']) - end - it 'does not reset to default backends if there are no default backends' do - subject.default_servers = [] - subject.send(:configure_backends, ['foo']) - expect(subject.backends).to eq(['foo']) - subject.send(:configure_backends, []) - expect(subject.backends).to eq(['foo']) - end - end - context "rewrite_container_ports tests" do it 'doesnt break if Ports => nil' do subject.send(:rewrite_container_ports, nil) diff --git a/spec/lib/synapse/service_watcher_ec2tags_spec.rb b/spec/lib/synapse/service_watcher_ec2tags_spec.rb index 40ff8df3..8feec983 100644 --- a/spec/lib/synapse/service_watcher_ec2tags_spec.rb +++ b/spec/lib/synapse/service_watcher_ec2tags_spec.rb @@ -181,40 +181,5 @@ def munge_haproxy_arg(name, new_value) end end end - - context "configure_backends tests" do - let(:backend1) { { 'name' => 'foo', 'host' => 'foo.backend.tld', 'port' => '123' } } - let(:backend2) { { 'name' => 'bar', 'host' => 'bar.backend.tld', 'port' => '456' } } - let(:fallback) { { 'name' => 'fall', 'host' => 'fall.backend.tld', 'port' => '789' } } - - before(:each) do - expect(subject.synapse).to receive(:'reconfigure!').at_least(:once) - end - - it 'runs' do - expect { subject.send(:configure_backends, []) }.not_to raise_error - end - - it 'sets backends correctly' do - subject.send(:configure_backends, [ backend1, backend2 ]) - expect(subject.backends).to eq([ backend1, backend2 ]) - end - - it 'resets to default backends if no instances are found' do - subject.default_servers = [ fallback ] - subject.send(:configure_backends, [ backend1 ]) - expect(subject.backends).to eq([ backend1 ]) - subject.send(:configure_backends, []) - expect(subject.backends).to eq([ fallback ]) - end - - it 'does not reset to default 
backends if there are no default backends' do - subject.default_servers = [] - subject.send(:configure_backends, [ backend1 ]) - expect(subject.backends).to eq([ backend1 ]) - subject.send(:configure_backends, []) - expect(subject.backends).to eq([ backend1 ]) - end - end end From 35f1f8f1f703eab39b2945ac53d459c98baa2f58 Mon Sep 17 00:00:00 2001 From: Joseph Lynch Date: Fri, 27 Feb 2015 14:28:43 -0800 Subject: [PATCH 02/14] Allow registrations to be manifested on the file system If the configuration specifies a file_output key, synapse will manifest and update registrations on the filesystem in an atomic way. This is useful for applications that do not wish to communicate with service backends via haproxy --- lib/synapse.rb | 19 +++++++++++--- lib/synapse/file_output.rb | 53 ++++++++++++++++++++++++++++++++++++++ lib/synapse/haproxy.rb | 3 ++- 3 files changed, 70 insertions(+), 5 deletions(-) create mode 100644 lib/synapse/file_output.rb diff --git a/lib/synapse.rb b/lib/synapse.rb index 0b18fd00..12c20aaf 100644 --- a/lib/synapse.rb +++ b/lib/synapse.rb @@ -1,6 +1,7 @@ require "synapse/version" require "synapse/service_watcher/base" require "synapse/haproxy" +require "synapse/file_output" require "synapse/service_watcher" require "synapse/log" @@ -17,9 +18,17 @@ def initialize(opts={}) raise "specify a list of services to connect in the config" unless opts.has_key?('services') @service_watchers = create_service_watchers(opts['services']) - # create the haproxy object + # create objects that need to be notified of service changes + @config_generators = [] + # create the haproxy config generator, this is mandatory raise "haproxy config section is missing" unless opts.has_key?('haproxy') - @haproxy = Haproxy.new(opts['haproxy']) + @config_generators << Haproxy.new(opts['haproxy']) + + # possibly create a file manifestation for services that do not + # want to communicate via haproxy, e.g. 
cassandra + if opts.has_key?('file_output') + @config_generators << FileOutput.new(opts['file_output']) + end # configuration is initially enabled to configure on first loop @config_updated = true @@ -47,8 +56,10 @@ def run if @config_updated @config_updated = false - log.info "synapse: regenerating haproxy config" - @haproxy.update_config(@service_watchers) + @config_generators.each do |config_generator| + log.info "synapse: regenerating #{config_generator.name} config" + config_generator.update_config(@service_watchers) + end else sleep 1 end diff --git a/lib/synapse/file_output.rb b/lib/synapse/file_output.rb new file mode 100644 index 00000000..a10884e3 --- /dev/null +++ b/lib/synapse/file_output.rb @@ -0,0 +1,53 @@ +require 'synapse/log' +require 'fileutils' +require 'tempfile' + +module Synapse + class FileOutput + include Logging + attr_reader :opts, :name + + def initialize(opts) + super() + + unless opts.has_key?("output_directory") + raise ArgumentError, "flat file generation requires an output_directory key" + end + + begin + FileUtils.mkdir_p(opts['output_directory']) + rescue SystemCallError => err + raise ArgumentError, "provided output directory #{opts['output_directory']} is not present or creatable" + end + + @opts = opts + @name = 'file_output' + end + + def update_config(watchers) + watchers.each do |watcher| + write_backends_to_file(watcher.name, watcher.backends) + end + end + + def write_backends_to_file(service_name, new_backends) + data_path = File.join(@opts['output_directory'], "#{service_name}.json") + begin + old_backends = JSON.load(File.read(data_path)) + rescue Errno::ENOENT + old_backends = nil + end + + if old_backends == new_backends + return false + else + # Atomically write new sevice configuration file + temp_path = File.join(@opts['output_directory'], + ".#{service_name}.json.tmp") + File.open(temp_path, 'w', 0644) {|f| f.write(new_backends.to_json)} + FileUtils.mv(temp_path, data_path) + return true + end + end + end +end diff --git a/lib/synapse/haproxy.rb b/lib/synapse/haproxy.rb index 2093bb99..6873e233 100644 --- a/lib/synapse/haproxy.rb +++ b/lib/synapse/haproxy.rb @@ -4,7 +4,7 @@ module Synapse class Haproxy include Logging - attr_reader :opts + attr_reader :opts, :name # these come from the documentation for haproxy 1.5 # http://haproxy.1wt.eu/download/1.5/doc/configuration.txt @@ -523,6 +523,7 @@ def initialize(opts) end @opts = opts + @name = 'haproxy' # how to restart haproxy @restart_interval = 2 From 1fdd57e2d31c8e19eb3fe22f6ef737c3e8b22ce0 Mon Sep 17 00:00:00 2001 From: Joseph Lynch Date: Mon, 16 Mar 2015 13:52:15 -0700 Subject: [PATCH 03/14] Explicitly deduplicate registrations --- lib/synapse/service_watcher/base.rb | 5 ++++- spec/lib/synapse/service_watcher_base_spec.rb | 15 ++++++++------- 2 files changed, 12 insertions(+), 8 deletions(-) diff --git a/lib/synapse/service_watcher/base.rb b/lib/synapse/service_watcher/base.rb index 1f8bfbc5..81c2f9a6 100644 --- a/lib/synapse/service_watcher/base.rb +++ b/lib/synapse/service_watcher/base.rb @@ -100,7 +100,10 @@ def validate_discovery_opts end def set_backends(new_backends) - new_backends = (new_backends + (@keep_default_servers ? @default_servers : [])).uniq + # Aggregate and deduplicate all potential backend service instances. + new_backends = (new_backends + (@keep_default_servers ? 
@default_servers : [])).uniq {|b| + [b['host'], b['port'], b.fetch('name', '')] + } if new_backends.to_set == @backends.to_set return false diff --git a/spec/lib/synapse/service_watcher_base_spec.rb b/spec/lib/synapse/service_watcher_base_spec.rb index c1042a17..f2e4e6eb 100644 --- a/spec/lib/synapse/service_watcher_base_spec.rb +++ b/spec/lib/synapse/service_watcher_base_spec.rb @@ -38,19 +38,24 @@ def remove_arg(name) end context 'set_backends test' do - default_servers = ['default_server_1', 'default_server_2'] + default_servers = [ + {'name' => 'default_server1', 'host' => 'default_server1', 'port' => 123}, + {'name' => 'default_server2', 'host' => 'default_server2', 'port' => 123} + ] + backends = [ + {'name' => 'server1', 'host' => 'server1', 'port' => 123}, + {'name' => 'server2', 'host' => 'server2', 'port' => 123} + ] let(:args) { testargs.merge({'default_servers' => default_servers}) } it 'sets backends' do expect(subject).to receive(:'reconfigure!').exactly(:once) - backends = ['server1', 'server2'] expect(subject.send(:set_backends, backends)).to equal(true) expect(subject.backends).to eq(backends) end it 'removes duplicate backends' do expect(subject).to receive(:'reconfigure!').exactly(:once) - backends = ['server1', 'server2'] duplicate_backends = backends + backends expect(subject.send(:set_backends, duplicate_backends)).to equal(true) expect(subject.backends).to eq(backends) @@ -66,7 +71,6 @@ def remove_arg(name) let(:args) { remove_arg 'default_servers' } it 'uses previous backends if no default_servers set' do expect(subject).to receive(:'reconfigure!').exactly(:once) - backends = ['server1', 'server2'] expect(subject.send(:set_backends, backends)).to equal(true) expect(subject.send(:set_backends, [])).to equal(false) expect(subject.backends).to eq(backends) @@ -80,7 +84,6 @@ def remove_arg(name) } it 'removes all backends if no default_servers set and use_previous_backends disabled' do expect(subject).to receive(:'reconfigure!').exactly(:twice) - backends = ['server1', 'server2'] expect(subject.send(:set_backends, backends)).to equal(true) expect(subject.backends).to eq(backends) expect(subject.send(:set_backends, [])).to equal(true) @@ -90,7 +93,6 @@ def remove_arg(name) it 'calls reconfigure only once for duplicate backends' do expect(subject).to receive(:'reconfigure!').exactly(:once) - backends = ['server1', 'server2'] expect(subject.send(:set_backends, backends)).to equal(true) expect(subject.backends).to eq(backends) expect(subject.send(:set_backends, backends)).to equal(false) @@ -102,7 +104,6 @@ def remove_arg(name) testargs.merge({'default_servers' => default_servers, 'keep_default_servers' => true}) } it('keeps default_servers when setting backends') do - backends = ['server1', 'server2'] expect(subject).to receive(:'reconfigure!').exactly(:once) expect(subject.send(:set_backends, backends)).to equal(true) expect(subject.backends).to eq(backends + default_servers) From 6eb58508137929b289b6c4776598971d2daa9b39 Mon Sep 17 00:00:00 2001 From: Joseph Lynch Date: Mon, 16 Mar 2015 14:45:27 -0700 Subject: [PATCH 04/14] Allow the option allredisp option to haproxy. Note that at this time only the Yelp fork of haproxy has support for redispatching on every retry. 
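As a rough sketch of how the newly whitelisted directive could be used (the surrounding keys follow the haproxy section described in the README; the values are only examples), a deployment running the Yelp fork might list it alongside its other defaults:

    "haproxy": {
      "defaults": [
        "log global",
        "retries 3",
        "option allredisp"
      ]
    }

Since only the Yelp fork understands the directive, stock haproxy 1.5 would likely reject a config generated this way.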
--- lib/synapse/haproxy.rb | 3 +++ 1 file changed, 3 insertions(+) diff --git a/lib/synapse/haproxy.rb b/lib/synapse/haproxy.rb index 6873e233..3f5c490d 100644 --- a/lib/synapse/haproxy.rb +++ b/lib/synapse/haproxy.rb @@ -43,6 +43,7 @@ class Haproxy "option abortonclose", "option accept-invalid-http-response", "option allbackups", + "option allredisp", "option checkcache", "option forceclose", "option forwardfor", @@ -173,6 +174,7 @@ class Haproxy "option accept-invalid-http-request", "option accept-invalid-http-response", "option allbackups", + "option allredisp", "option checkcache", "option clitcpka", "option contstats", @@ -386,6 +388,7 @@ class Haproxy "option accept-invalid-http-request", "option accept-invalid-http-response", "option allbackups", + "option allredisp", "option checkcache", "option clitcpka", "option contstats", From 136d1fe1ef5347c82a9f4d7afe42097b24c3295e Mon Sep 17 00:00:00 2001 From: John Billings Date: Wed, 20 May 2015 11:08:10 -0700 Subject: [PATCH 05/14] Add rate limiter. --- lib/synapse.rb | 10 ++- lib/synapse/haproxy.rb | 8 -- lib/synapse/rate_limiter.rb | 74 +++++++++++++++++ spec/lib/synapse/rate_limiter_spec.rb | 109 ++++++++++++++++++++++++++ 4 files changed, 190 insertions(+), 11 deletions(-) create mode 100644 lib/synapse/rate_limiter.rb create mode 100644 spec/lib/synapse/rate_limiter_spec.rb diff --git a/lib/synapse.rb b/lib/synapse.rb index 12c20aaf..1b80b4ed 100644 --- a/lib/synapse.rb +++ b/lib/synapse.rb @@ -2,6 +2,7 @@ require "synapse/service_watcher/base" require "synapse/haproxy" require "synapse/file_output" +require "synapse/rate_limiter" require "synapse/service_watcher" require "synapse/log" @@ -33,6 +34,8 @@ def initialize(opts={}) # configuration is initially enabled to configure on first loop @config_updated = true + @rate_limiter = RateLimiter.new(opts['rate_limiter_path']) + # Any exceptions in the watcher threads should wake the main thread so # that we can fail fast. Thread.abort_on_exception = true @@ -54,16 +57,17 @@ def run raise "synapse: service watcher #{w.name} failed ping!" unless w.ping? end - if @config_updated + if @config_updated && @rate_limiter.proceed? @config_updated = false @config_generators.each do |config_generator| log.info "synapse: regenerating #{config_generator.name} config" config_generator.update_config(@service_watchers) end - else - sleep 1 end + @rate_limiter.tick + sleep 1 + loops += 1 log.debug "synapse: still running at #{Time.now}" if (loops % 60) == 0 end diff --git a/lib/synapse/haproxy.rb b/lib/synapse/haproxy.rb index 3f5c490d..e11b2b3a 100644 --- a/lib/synapse/haproxy.rb +++ b/lib/synapse/haproxy.rb @@ -529,9 +529,7 @@ def initialize(opts) @name = 'haproxy' # how to restart haproxy - @restart_interval = 2 @restart_required = true - @last_restart = Time.new(0) # a place to store the parsed haproxy config from each watcher @watcher_configs = {} @@ -775,16 +773,10 @@ def write_config(new_config) # restarts haproxy def restart - # sleep if we restarted too recently - delay = (@last_restart - Time.now) + @restart_interval - sleep(delay) if delay > 0 - - # do the actual restart res = `#{opts['reload_command']}`.chomp raise "failed to reload haproxy via #{opts['reload_command']}: #{res}" unless $?.success? 
log.info "synapse: restarted haproxy" - @last_restart = Time.now() @restart_required = false end diff --git a/lib/synapse/rate_limiter.rb b/lib/synapse/rate_limiter.rb new file mode 100644 index 00000000..893f6508 --- /dev/null +++ b/lib/synapse/rate_limiter.rb @@ -0,0 +1,74 @@ +require 'json' + +module Synapse + class RateLimiter + include Logging + + def initialize(path) + @path = path + + # Our virtual clock + @time = 0 + + # Average at most one restart per minute, with the ability to burst to + # two restarts per minute + @tokens = 0 + @max_tokens = 2 + @token_period = 60 + + # Restart at most once every two seconds + @last_restart_time = 0 + @min_restart_period = 2 + + # Try reading state back from disk + unless @path.nil? + begin + data = JSON.parse(File.read(@path)) + @time = data["time"] + @last_restart_time = data["last_restart_time"] + @tokens = data["tokens"] + rescue => e + log.warn "Got error reading rate limiter state: #{e}" + end + end + end + + # Should be invoked once per second + def tick + @time += 1 + + # Add a token every @period ticks + if @time % @token_period == 0 + @tokens = [@tokens + 1, @max_tokens].min + end + + # Save state out to disk + data = { + "time" => @time, + "last_restart_time" => @last_restart_time, + "tokens" => @tokens, + } + unless @path.nil? + begin + File.open(@path, "w") do |f| + f.write(data.to_json) + end + rescue + end + end + end + + def proceed? + # Is there an available token? + if @tokens > 0 + # Has it been sufficiently long since our last restart? + if @time - @min_restart_period >= @last_restart_time + @tokens -= 1 + @last_restart_time = @time + return true + end + end + return false + end + end +end diff --git a/spec/lib/synapse/rate_limiter_spec.rb b/spec/lib/synapse/rate_limiter_spec.rb new file mode 100644 index 00000000..3c777aa8 --- /dev/null +++ b/spec/lib/synapse/rate_limiter_spec.rb @@ -0,0 +1,109 @@ +require 'spec_helper' + +class RateLimiter + attr_accessor :time, :tokens, :last_restart_time +end + +describe Synapse::RateLimiter do + let(:subject) { Synapse::RateLimiter.new('test') } + let(:file_like_object) { spy('file_like_object') } + + before { + allow(File).to receive(:open).with('test', 'w').and_yield(file_like_object) + allow(File).to receive(:read).and_raise(Errno::ENOENT.new) + } + + it "saves state" do + subject.time = 666 + subject.last_restart_time = 17 + subject.tokens = 42 + + subject.tick + + expect(file_like_object).to have_received(:write).with( + '{"time":667,"last_restart_time":17,"tokens":42}') + end + + it "initializes" do + expect(subject.time).to eql(0) + expect(subject.last_restart_time).to eql(0) + expect(subject.tokens).to eql(0) + end + + it "handles the nil path" do + subject = Synapse::RateLimiter.new(nil) + subject.proceed? 
+ end + + it "loads state when available" do + allow(File).to receive(:read).and_return( + '{"time":667,"last_restart_time":17,"tokens":42}') + expect(subject.time).to eql(667) + expect(subject.last_restart_time).to eql(17) + expect(subject.tokens).to eql(42) + end + + it "yields a single token after the token period" do + 59.times do + subject.tick + end + expect(subject.time).to eql(59) + expect(subject.tokens).to eql(0) + + subject.tick + expect(subject.time).to eql(60) + expect(subject.tokens).to eql(1) + end + + it "respects the maximum token value" do + 666.times do + subject.tick + end + expect(subject.time).to eql(666) + expect(subject.tokens).to eql(2) + end + + it "does not allow client to proceed if there is no token" do + subject.time = 666 + subject.last_restart_time = 0 + subject.tokens = 0 + + expect(subject.proceed?).to eql(false) + end + + it "allows client to proceed when there is a token" do + subject.time = 666 + subject.last_restart_time = 0 + subject.tokens = 1 + + expect(subject.proceed?).to eql(true) + + expect(subject.time).to eql(666) + expect(subject.last_restart_time).to eql(666) + expect(subject.tokens).to eql(0) + end + + it "does not allow client to proceed too soon after last time" do + subject.time = 666 + subject.last_restart_time = 665 + subject.tokens = 1 + + expect(subject.proceed?).to eql(false) + + expect(subject.time).to eql(666) + expect(subject.last_restart_time).to eql(665) + expect(subject.tokens).to eql(1) + end + + it "allows client to proceed once enough time has passed" do + subject.time = 666 + subject.last_restart_time = 664 + subject.tokens = 1 + + expect(subject.proceed?).to eql(true) + + expect(subject.time).to eql(666) + expect(subject.last_restart_time).to eql(666) + expect(subject.tokens).to eql(0) + end +end From e6d03c243431fc1a4ab0f40e90bd1dc6907178ff Mon Sep 17 00:00:00 2001 From: John Billings Date: Wed, 27 May 2015 11:15:52 -0700 Subject: [PATCH 06/14] Revert "Add rate limiter." This reverts commit 136d1fe1ef5347c82a9f4d7afe42097b24c3295e. --- lib/synapse.rb | 10 +-- lib/synapse/haproxy.rb | 8 ++ lib/synapse/rate_limiter.rb | 74 ----------------- spec/lib/synapse/rate_limiter_spec.rb | 109 -------------------------- 4 files changed, 11 insertions(+), 190 deletions(-) delete mode 100644 lib/synapse/rate_limiter.rb delete mode 100644 spec/lib/synapse/rate_limiter_spec.rb diff --git a/lib/synapse.rb b/lib/synapse.rb index 1b80b4ed..12c20aaf 100644 --- a/lib/synapse.rb +++ b/lib/synapse.rb @@ -2,7 +2,6 @@ require "synapse/service_watcher/base" require "synapse/haproxy" require "synapse/file_output" -require "synapse/rate_limiter" require "synapse/service_watcher" require "synapse/log" @@ -34,8 +33,6 @@ def initialize(opts={}) # configuration is initially enabled to configure on first loop @config_updated = true - @rate_limiter = RateLimiter.new(opts['rate_limiter_path']) - # Any exceptions in the watcher threads should wake the main thread so # that we can fail fast. Thread.abort_on_exception = true @@ -57,17 +54,16 @@ def run raise "synapse: service watcher #{w.name} failed ping!" unless w.ping? end - if @config_updated && @rate_limiter.proceed? 
+ if @config_updated @config_updated = false @config_generators.each do |config_generator| log.info "synapse: regenerating #{config_generator.name} config" config_generator.update_config(@service_watchers) end + else + sleep 1 end - @rate_limiter.tick - sleep 1 - loops += 1 log.debug "synapse: still running at #{Time.now}" if (loops % 60) == 0 end diff --git a/lib/synapse/haproxy.rb b/lib/synapse/haproxy.rb index e11b2b3a..3f5c490d 100644 --- a/lib/synapse/haproxy.rb +++ b/lib/synapse/haproxy.rb @@ -529,7 +529,9 @@ def initialize(opts) @name = 'haproxy' # how to restart haproxy + @restart_interval = 2 @restart_required = true + @last_restart = Time.new(0) # a place to store the parsed haproxy config from each watcher @watcher_configs = {} @@ -773,10 +775,16 @@ def write_config(new_config) # restarts haproxy def restart + # sleep if we restarted too recently + delay = (@last_restart - Time.now) + @restart_interval + sleep(delay) if delay > 0 + + # do the actual restart res = `#{opts['reload_command']}`.chomp raise "failed to reload haproxy via #{opts['reload_command']}: #{res}" unless $?.success? log.info "synapse: restarted haproxy" + @last_restart = Time.now() @restart_required = false end diff --git a/lib/synapse/rate_limiter.rb b/lib/synapse/rate_limiter.rb deleted file mode 100644 index 893f6508..00000000 --- a/lib/synapse/rate_limiter.rb +++ /dev/null @@ -1,74 +0,0 @@ -require 'json' - -module Synapse - class RateLimiter - include Logging - - def initialize(path) - @path = path - - # Our virtual clock - @time = 0 - - # Average at most one restart per minute, with the ability to burst to - # two restarts per minute - @tokens = 0 - @max_tokens = 2 - @token_period = 60 - - # Restart at most once every two seconds - @last_restart_time = 0 - @min_restart_period = 2 - - # Try reading state back from disk - unless @path.nil? - begin - data = JSON.parse(File.read(@path)) - @time = data["time"] - @last_restart_time = data["last_restart_time"] - @tokens = data["tokens"] - rescue => e - log.warn "Got error reading rate limiter state: #{e}" - end - end - end - - # Should be invoked once per second - def tick - @time += 1 - - # Add a token every @period ticks - if @time % @token_period == 0 - @tokens = [@tokens + 1, @max_tokens].min - end - - # Save state out to disk - data = { - "time" => @time, - "last_restart_time" => @last_restart_time, - "tokens" => @tokens, - } - unless @path.nil? - begin - File.open(@path, "w") do |f| - f.write(data.to_json) - end - rescue - end - end - end - - def proceed? - # Is there an available token? - if @tokens > 0 - # Has it been sufficiently long since our last restart? 
- if @time - @min_restart_period >= @last_restart_time - @tokens -= 1 - @last_restart_time = @time - return true - end - end - return false - end - end -end diff --git a/spec/lib/synapse/rate_limiter_spec.rb b/spec/lib/synapse/rate_limiter_spec.rb deleted file mode 100644 index 3c777aa8..00000000 --- a/spec/lib/synapse/rate_limiter_spec.rb +++ /dev/null @@ -1,109 +0,0 @@ -require 'spec_helper' - -class RateLimiter - attr_accessor :time, :tokens, :last_restart_time -end - -describe Synapse::RateLimiter do - let(:subject) { Synapse::RateLimiter.new('test') } - let(:file_like_object) { spy('file_like_object') } - - before { - allow(File).to receive(:open).with('test', 'w').and_yield(file_like_object) - allow(File).to receive(:read).and_raise(Errno::ENOENT.new) - } - - it "saves state" do - subject.time = 666 - subject.last_restart_time = 17 - subject.tokens = 42 - - subject.tick - - expect(file_like_object).to have_received(:write).with( - '{"time":667,"last_restart_time":17,"tokens":42}') - end - - it "initializes" do - expect(subject.time).to eql(0) - expect(subject.last_restart_time).to eql(0) - expect(subject.tokens).to eql(0) - end - - it "handles the nil path" do - subject = Synapse::RateLimiter.new(nil) - subject.proceed? - end - - it "loads state when available" do - allow(File).to receive(:read).and_return( - '{"time":667,"last_restart_time":17,"tokens":42}') - expect(subject.time).to eql(667) - expect(subject.last_restart_time).to eql(17) - expect(subject.tokens).to eql(42) - end - - it "yields a single token after the token period" do - 59.times do - subject.tick - end - expect(subject.time).to eql(59) - expect(subject.tokens).to eql(0) - - subject.tick - expect(subject.time).to eql(60) - expect(subject.tokens).to eql(1) - end - - it "respects the maximum token value" do - 666.times do - subject.tick - end - expect(subject.time).to eql(666) - expect(subject.tokens).to eql(2) - end - - it "does not allow client to proceed if there is no token" do - subject.time = 666 - subject.last_restart_time = 0 - subject.tokens = 0 - - expect(subject.proceed?).to eql(false) - end - - it "allows client to proceed when there is a token" do - subject.time = 666 - subject.last_restart_time = 0 - subject.tokens = 1 - - expect(subject.proceed?).to eql(true) - - expect(subject.time).to eql(666) - expect(subject.last_restart_time).to eql(666) - expect(subject.tokens).to eql(0) - end - - it "does not allow client to proceed too soon after last time" do - subject.time = 666 - subject.last_restart_time = 665 - subject.tokens = 1 - - expect(subject.proceed?).to eql(false) - - expect(subject.time).to eql(666) - expect(subject.last_restart_time).to eql(665) - expect(subject.tokens).to eql(1) - end - - it "allows client to proceed once enough time has passed" do - subject.time = 666 - subject.last_restart_time = 664 - subject.tokens = 1 - - expect(subject.proceed?).to eql(true) - - expect(subject.time).to eql(666) - expect(subject.last_restart_time).to eql(666) - expect(subject.tokens).to eql(0) - end -end From a3e84d229d8c196ccc7206755cabed8861b546a2 Mon Sep 17 00:00:00 2001 From: John Billings Date: Wed, 27 May 2015 11:21:51 -0700 Subject: [PATCH 07/14] Increase HAProxy restart interval. 
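The key names come straight from the diff below; the values and the reload_command are only illustrative. A haproxy section that spaces reloads out might look roughly like:

    "haproxy": {
      "reload_command": "service haproxy reload",
      "restart_interval": 10,
      "restart_jitter": 0.1
    }

Here restart_interval is the minimum number of seconds between restarts, and restart_jitter adds a random extra delay on top of that interval, which helps keep a fleet of synapse instances from restarting haproxy in lockstep.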
--- lib/synapse/haproxy.rb | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/lib/synapse/haproxy.rb b/lib/synapse/haproxy.rb index 3f5c490d..11b78fdb 100644 --- a/lib/synapse/haproxy.rb +++ b/lib/synapse/haproxy.rb @@ -529,7 +529,8 @@ def initialize(opts) @name = 'haproxy' # how to restart haproxy - @restart_interval = 2 + @restart_interval = @opts.fetch('restart_interval', 2).to_i + @restart_jitter = @opts.fetch('restart_jitter', 0).to_f @restart_required = true @last_restart = Time.new(0) @@ -660,7 +661,7 @@ def generate_frontend_stanza(watcher, config) def generate_backend_stanza(watcher, config) if watcher.backends.empty? - log.warn "synapse: no backends found for watcher #{watcher.name}" + log.debug "synapse: no backends found for watcher #{watcher.name}" end stanza = [ @@ -775,8 +776,9 @@ def write_config(new_config) # restarts haproxy def restart - # sleep if we restarted too recently + # sleep with jitter if we restarted too recently delay = (@last_restart - Time.now) + @restart_interval + delay += rand(@restart_jitter * @restart_interval + 1) sleep(delay) if delay > 0 # do the actual restart From 5928c2b5f984411d8791c8dbaa4edb6bd2825ec4 Mon Sep 17 00:00:00 2001 From: John Billings Date: Thu, 28 May 2015 00:00:39 -0700 Subject: [PATCH 08/14] ZooKeeper connection pooling. --- lib/synapse/service_watcher/zookeeper.rb | 48 ++++++++++++++++++++---- 1 file changed, 41 insertions(+), 7 deletions(-) diff --git a/lib/synapse/service_watcher/zookeeper.rb b/lib/synapse/service_watcher/zookeeper.rb index 80c05f68..568bd090 100644 --- a/lib/synapse/service_watcher/zookeeper.rb +++ b/lib/synapse/service_watcher/zookeeper.rb @@ -1,13 +1,18 @@ require "synapse/service_watcher/base" +require 'thread' require 'zk' module Synapse class ZookeeperWatcher < BaseWatcher NUMBERS_RE = /^\d+$/ + @@zk_pool = {} + @@zk_pool_count = {} + @@zk_pool_lock = Mutex.new + def start - @zk_hosts = @discovery['hosts'].shuffle.join(',') + @zk_hosts = @discovery['hosts'].sort.join(',') @watcher = nil @zk = nil @@ -99,18 +104,47 @@ def watcher_callback def zk_cleanup log.info "synapse: zookeeper watcher cleaning up" - @watcher.unsubscribe unless @watcher.nil? - @watcher = nil - - @zk.close! unless @zk.nil? - @zk = nil + begin + @watcher.unsubscribe unless @watcher.nil? + @watcher = nil + ensure + @@zk_pool_lock.synchronize { + @@zk_pool_count[@zk_hosts] -= 1 + # Last thread to use the connection closes it + if @@zk_pool_count[@zk_hosts] == 0 + log.info "synapse: closing zk connection to #{@zk_hosts}" + begin + @zk.close! unless @zk.nil? + @zk = nil + ensure + @@zk_pool.delete(@zk_hosts) + end + end + } + end log.info "synapse: zookeeper watcher cleaned up successfully" end def zk_connect log.info "synapse: zookeeper watcher connecting to ZK at #{@zk_hosts}" - @zk = ZK.new(@zk_hosts) + + # Ensure that all Zookeeper watcher re-use a single zookeeper + # connection to any given set of zk hosts. 
+ @@zk_pool_lock.synchronize { + unless @@zk_pool.has_key?(@zk_hosts) + log.info "synapse: creating pooled connection to #{@zk_hosts}" + @@zk_pool[@zk_hosts] = ZK.new(@zk_hosts, :timeout => 5) + @@zk_pool_count[@zk_hosts] = 1 + log.info "synapse: successfully created zk connection to #{@zk_hosts}" + else + @@zk_pool_count[@zk_hosts] += 1 + log.info "synapse: re-using existing zookeeper connection to #{@zk_hosts}" + end + } + + @zk = @@zk_pool[@zk_hosts] + log.info "synapse: retrieved zk connection to #{@zk_hosts}" # handle session expiry -- by cleaning up zk, this will make `ping?` # fail and so synapse will exit From 4eab7e0c7cddb4506ab03e7b0a478d51691e445f Mon Sep 17 00:00:00 2001 From: Joseph Lynch Date: Wed, 27 May 2015 22:00:29 -0700 Subject: [PATCH 09/14] Rate limit restarts but not stats socket updates We can allow socket updates to go through any time that the config was updated, but restarts must be rate limited for stability --- lib/synapse.rb | 9 ++++--- lib/synapse/file_output.rb | 3 +++ lib/synapse/haproxy.rb | 50 +++++++++++++++++++++++--------------- 3 files changed, 40 insertions(+), 22 deletions(-) diff --git a/lib/synapse.rb b/lib/synapse.rb index 12c20aaf..39dae625 100644 --- a/lib/synapse.rb +++ b/lib/synapse.rb @@ -57,11 +57,14 @@ def run if @config_updated @config_updated = false @config_generators.each do |config_generator| - log.info "synapse: regenerating #{config_generator.name} config" + log.info "synapse: configuring #{config_generator.name}" config_generator.update_config(@service_watchers) end - else - sleep 1 + end + + sleep 1 + @config_generators.each do |config_generator| + config_generator.tick(@service_watchers) end loops += 1 diff --git a/lib/synapse/file_output.rb b/lib/synapse/file_output.rb index a10884e3..335661d7 100644 --- a/lib/synapse/file_output.rb +++ b/lib/synapse/file_output.rb @@ -24,6 +24,9 @@ def initialize(opts) @name = 'file_output' end + def tick(watchers) + end + def update_config(watchers) watchers.each do |watcher| write_backends_to_file(watcher.name, watcher.backends) diff --git a/lib/synapse/haproxy.rb b/lib/synapse/haproxy.rb index 11b78fdb..126a057d 100644 --- a/lib/synapse/haproxy.rb +++ b/lib/synapse/haproxy.rb @@ -532,16 +532,26 @@ def initialize(opts) @restart_interval = @opts.fetch('restart_interval', 2).to_i @restart_jitter = @opts.fetch('restart_jitter', 0).to_f @restart_required = true - @last_restart = Time.new(0) + + # virtual clock bookkeeping for controlling how often haproxy restarts + @time = 0 + @next_restart = @time # a place to store the parsed haproxy config from each watcher @watcher_configs = {} end + def tick(watchers) + @time += 1 + # We potentially have to restart if the restart was rate limited + # in the original call to update_config + restart if @opts['do_reloads'] && @restart_required + end + def update_config(watchers) # if we support updating backends, try that whenever possible if @opts['do_socket'] - update_backends(watchers) unless @restart_required + update_backends(watchers) else @restart_required = true end @@ -667,7 +677,7 @@ def generate_backend_stanza(watcher, config) stanza = [ "\nbackend #{watcher.name}", config.map {|c| "\t#{c}"}, - watcher.backends.shuffle.map {|backend| + watcher.backends.map {|backend| backend_name = construct_name(backend) b = "\tserver #{backend_name} #{backend['host']}:#{backend['port']}" b = "#{b} cookie #{backend_name}" unless config.include?('mode tcp') @@ -709,20 +719,19 @@ def update_backends(watchers) next if watcher.backends.empty? 
unless cur_backends.include? watcher.name - log.debug "synapse: restart required because we added new section #{watcher.name}" + log.info "synapse: restart required because we added new section #{watcher.name}" @restart_required = true - return + next end watcher.backends.each do |backend| backend_name = construct_name(backend) - unless cur_backends[watcher.name].include? backend_name - log.debug "synapse: restart required because we have a new backend #{watcher.name}/#{backend_name}" + if cur_backends[watcher.name].include? backend_name + enabled_backends[watcher.name] << backend_name + else + log.info "synapse: restart required because we have a new backend #{watcher.name}/#{backend_name}" @restart_required = true - return end - - enabled_backends[watcher.name] << backend_name end end @@ -743,12 +752,10 @@ def update_backends(watchers) rescue StandardError => e log.warn "synapse: unknown error writing to socket" @restart_required = true - return else unless output == "\n" log.warn "synapse: socket command #{command} failed: #{output}" @restart_required = true - return end end end @@ -774,19 +781,24 @@ def write_config(new_config) end end - # restarts haproxy + # restarts haproxy if the time is right def restart - # sleep with jitter if we restarted too recently - delay = (@last_restart - Time.now) + @restart_interval - delay += rand(@restart_jitter * @restart_interval + 1) - sleep(delay) if delay > 0 + if @time < @next_restart + log.info "synapse: at time #{@time} waiting until #{@next_restart} to restart" + return + end + + @next_restart = @time + @restart_interval + @next_restart += rand(@restart_jitter * @restart_interval + 1) # do the actual restart res = `#{opts['reload_command']}`.chomp - raise "failed to reload haproxy via #{opts['reload_command']}: #{res}" unless $?.success? + unless $?.success? + log.error "failed to reload haproxy via #{opts['reload_command']}: #{res}" + return + end log.info "synapse: restarted haproxy" - @last_restart = Time.now() @restart_required = false end From 030ccfef1c0d63c037407ec3d5d7382d4bfdbe81 Mon Sep 17 00:00:00 2001 From: John Billings Date: Fri, 29 May 2015 13:09:27 -0700 Subject: [PATCH 10/14] Add state file. --- lib/synapse/haproxy.rb | 76 ++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 74 insertions(+), 2 deletions(-) diff --git a/lib/synapse/haproxy.rb b/lib/synapse/haproxy.rb index 126a057d..d9edb51b 100644 --- a/lib/synapse/haproxy.rb +++ b/lib/synapse/haproxy.rb @@ -1,3 +1,5 @@ +require 'fileutils' +require 'json' require 'synapse/log' require 'socket' @@ -539,10 +541,27 @@ def initialize(opts) # a place to store the parsed haproxy config from each watcher @watcher_configs = {} + + @state_file_path = @opts['state_file_path'] + @state_file_ttl = @opts.fetch('state_file_ttl', 60 * 60 * 24).to_i + @seen = {} + + unless @state_file_path.nil? + begin + @seen = JSON.load(File.read(@state_file_path)) + rescue StandardError => e + # It's ok if the state file doesn't exist + end + end end def tick(watchers) + if @time % 60 == 0 && !@state_file_path.nil? + update_state_file(watchers) + end + @time += 1 + # We potentially have to restart if the restart was rate limited # in the original call to update_config restart if @opts['do_reloads'] && @restart_required @@ -670,6 +689,21 @@ def generate_frontend_stanza(watcher, config) end def generate_backend_stanza(watcher, config) + backends = {} + + # The ordering here is important. First we add all the backends in the + # disabled state... 
+ @seen.fetch(watcher.name, []).each do |backend_name, backend| + backends[backend_name] = backend.merge('enabled' => false) + end + + # ... and then we overwite any backends that the watchers know about, + # setting the enabled state. + watcher.backends.each do |backend| + backend_name = construct_name(backend) + backends[backend_name] = backend.merge('enabled' => true) + end + if watcher.backends.empty? log.debug "synapse: no backends found for watcher #{watcher.name}" end @@ -677,11 +711,11 @@ def generate_backend_stanza(watcher, config) stanza = [ "\nbackend #{watcher.name}", config.map {|c| "\t#{c}"}, - watcher.backends.map {|backend| - backend_name = construct_name(backend) + backends.map {|backend_name, backend| b = "\tserver #{backend_name} #{backend['host']}:#{backend['port']}" b = "#{b} cookie #{backend_name}" unless config.include?('mode tcp') b = "#{b} #{watcher.haproxy['server_options']}" + b = "#{b} disabled" unless backend['enabled'] b } ] end @@ -811,5 +845,43 @@ def construct_name(backend) return name end + + def update_state_file(watchers) + log.info "synapse: writing state file" + + timestamp = Time.now.to_i + + # Remove stale backends + @seen.each do |watcher_name, backends| + backends.each do |backend_name, backend| + ts = backend.fetch('timestamp', 0) + delta = (timestamp - ts).abs + if delta > @state_file_ttl + log.info "synapse: expiring #{backend_name} with age #{delta}" + backends.delete(backend_name) + end + end + end + + # Remove any services which no longer have any backends + @seen = @seen.reject{|watcher_name, backends| backends.keys.length == 0} + + # Add backends from watchers + watchers.each do |watcher| + unless @seen.key?(watcher.name) + @seen[watcher.name] = {} + end + + watcher.backends.each do |backend| + backend_name = construct_name(backend) + @seen[watcher.name][backend_name] = backend.merge('timestamp' => timestamp) + end + end + + # Atomically write new state file + tmp_state_file_path = @state_file_path + ".tmp" + File.write(tmp_state_file_path, JSON.pretty_generate(@seen)) + FileUtils.mv(tmp_state_file_path, @state_file_path) + end end end From 36f2a948883bde2ae1bb1bd39895a0fcffa11835 Mon Sep 17 00:00:00 2001 From: John Billings Date: Tue, 16 Jun 2015 17:01:57 -0700 Subject: [PATCH 11/14] Fix bug in caching logic. --- lib/synapse/haproxy.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/synapse/haproxy.rb b/lib/synapse/haproxy.rb index d9edb51b..601b152e 100644 --- a/lib/synapse/haproxy.rb +++ b/lib/synapse/haproxy.rb @@ -772,7 +772,7 @@ def update_backends(watchers) # actually enable the enabled backends, and disable the disabled ones cur_backends.each do |section, backends| backends.each do |backend| - if enabled_backends[section].include? backend + if enabled_backends.fetch(section, []).include? 
backend command = "enable server #{section}/#{backend}\n" else command = "disable server #{section}/#{backend}\n" From 86d7f9d5c41a0fe34fc74c20bae901b4bd310f84 Mon Sep 17 00:00:00 2001 From: Tomas Doran Date: Mon, 29 Jun 2015 15:20:06 +0100 Subject: [PATCH 12/14] Add support for the weight key added in nerve --- lib/synapse/service_watcher/zookeeper.rb | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/lib/synapse/service_watcher/zookeeper.rb b/lib/synapse/service_watcher/zookeeper.rb index 568bd090..056ec7a3 100644 --- a/lib/synapse/service_watcher/zookeeper.rb +++ b/lib/synapse/service_watcher/zookeeper.rb @@ -59,7 +59,7 @@ def discover node = @zk.get("#{@discovery['path']}/#{id}") begin - host, port, name = deserialize_service_instance(node.first) + host, port, name, weight = deserialize_service_instance(node.first) rescue StandardError => e log.error "synapse: invalid data in ZK node #{id} at #{@discovery['path']}: #{e}" else @@ -70,7 +70,7 @@ def discover numeric_id = NUMBERS_RE =~ numeric_id ? numeric_id.to_i : nil log.debug "synapse: discovered backend #{name} at #{host}:#{server_port} for service #{@name}" - new_backends << { 'name' => name, 'host' => host, 'port' => server_port, 'id' => numeric_id} + new_backends << { 'name' => name, 'host' => host, 'port' => server_port, 'id' => numeric_id, 'weight' => weight } end end @@ -168,8 +168,9 @@ def deserialize_service_instance(data) host = decoded['host'] || (raise ValueError, 'instance json data does not have host key') port = decoded['port'] || (raise ValueError, 'instance json data does not have port key') name = decoded['name'] || nil + weight = decoded['weight'] || nil - return host, port, name + return host, port, name, weight end end end From a6a48e347bf8b6c70478c532195ab733f24e3d59 Mon Sep 17 00:00:00 2001 From: Joseph Lynch Date: Sun, 12 Jul 2015 17:31:34 -0700 Subject: [PATCH 13/14] Try out :per_callback threads and get more debug information We are experiencing some very slow updates in production and I think it may be due to the connection pooling, try :per_callback threading to see if that helps. 
Also fixes default values for do_writes, do_reloads, do_socket --- Gemfile.lock | 6 +++--- README.md | 1 + lib/synapse/haproxy.rb | 4 ++++ lib/synapse/service_watcher/zookeeper.rb | 4 +++- 4 files changed, 11 insertions(+), 4 deletions(-) diff --git a/Gemfile.lock b/Gemfile.lock index 4b7377f5..b9f1d77d 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -62,11 +62,11 @@ GEM slyphon-zookeeper_jar (3.3.5-java) spoon (0.0.4) ffi - zk (1.9.4) + zk (1.9.5) logging (~> 1.8.2) zookeeper (~> 1.4.0) - zookeeper (1.4.8) - zookeeper (1.4.8-java) + zookeeper (1.4.10) + zookeeper (1.4.10-java) slyphon-log4j (= 1.2.15) slyphon-zookeeper_jar (= 3.3.5) diff --git a/README.md b/README.md index 0a5e8770..2dfccb67 100644 --- a/README.md +++ b/README.md @@ -222,6 +222,7 @@ The `haproxy` section of the config file has the following options: * `config_file_path`: where Synapse will write the HAProxy config file * `do_writes`: whether or not the config file will be written (default to `true`) * `do_reloads`: whether or not Synapse will reload HAProxy (default to `true`) +* `do_socket`: whether or not Synapse will use the HAProxy socket commands to prevent reloads (default to `true`) * `global`: options listed here will be written into the `global` section of the HAProxy config * `defaults`: options listed here will be written into the `defaults` section of the HAProxy config * `extra_sections`: additional, manually-configured `frontend`, `backend`, or `listen` stanzas diff --git a/lib/synapse/haproxy.rb b/lib/synapse/haproxy.rb index 601b152e..5344a576 100644 --- a/lib/synapse/haproxy.rb +++ b/lib/synapse/haproxy.rb @@ -530,6 +530,10 @@ def initialize(opts) @opts = opts @name = 'haproxy' + @opts['do_writes'] = true unless @opts.key?('do_writes') + @opts['do_socket'] = true unless @opts.key?('do_socket') + @opts['do_reloads'] = true unless @opts.key?('do_reloads') + # how to restart haproxy @restart_interval = @opts.fetch('restart_interval', 2).to_i @restart_jitter = @opts.fetch('restart_jitter', 0).to_f diff --git a/lib/synapse/service_watcher/zookeeper.rb b/lib/synapse/service_watcher/zookeeper.rb index 056ec7a3..1ae6f6b9 100644 --- a/lib/synapse/service_watcher/zookeeper.rb +++ b/lib/synapse/service_watcher/zookeeper.rb @@ -80,6 +80,7 @@ def discover # sets up zookeeper callbacks if the data at the discovery path changes def watch return if @zk.nil? + log.debug "synapse: setting watch at #{@discovery['path']}" @watcher.unsubscribe unless @watcher.nil? @watcher = @zk.register(@discovery['path'], &watcher_callback) @@ -89,6 +90,7 @@ def watch log.error "synapse: zookeeper watcher path #{@discovery['path']} does not exist!" 
raise RuntimeError.new('could not set a ZK watch on a node that should exist') end + log.debug "synapse: set watch at #{@discovery['path']}" end # handles the event that a watched path has changed in zookeeper @@ -134,7 +136,7 @@ def zk_connect @@zk_pool_lock.synchronize { unless @@zk_pool.has_key?(@zk_hosts) log.info "synapse: creating pooled connection to #{@zk_hosts}" - @@zk_pool[@zk_hosts] = ZK.new(@zk_hosts, :timeout => 5) + @@zk_pool[@zk_hosts] = ZK.new(@zk_hosts, :timeout => 5, :thread => :per_callback) @@zk_pool_count[@zk_hosts] = 1 log.info "synapse: successfully created zk connection to #{@zk_hosts}" else From 420440fbd4d16c6dfc0920062791e790374bdcea Mon Sep 17 00:00:00 2001 From: Joseph Lynch Date: Wed, 15 Jul 2015 13:13:54 -0700 Subject: [PATCH 14/14] Turns out it's important to handle session disconnects correctly This fixes a bug where we could have a session expiry but not fail pings because the pooling code would not actually tear down the connection --- lib/synapse/service_watcher/zookeeper.rb | 20 +++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/lib/synapse/service_watcher/zookeeper.rb b/lib/synapse/service_watcher/zookeeper.rb index 1ae6f6b9..715a5f88 100644 --- a/lib/synapse/service_watcher/zookeeper.rb +++ b/lib/synapse/service_watcher/zookeeper.rb @@ -111,17 +111,19 @@ def zk_cleanup @watcher = nil ensure @@zk_pool_lock.synchronize { - @@zk_pool_count[@zk_hosts] -= 1 - # Last thread to use the connection closes it - if @@zk_pool_count[@zk_hosts] == 0 - log.info "synapse: closing zk connection to #{@zk_hosts}" - begin - @zk.close! unless @zk.nil? - @zk = nil - ensure - @@zk_pool.delete(@zk_hosts) + if @@zk_pool.has_key?(@zk_hosts) + @@zk_pool_count[@zk_hosts] -= 1 + # Last thread to use the connection closes it + if @@zk_pool_count[@zk_hosts] == 0 + log.info "synapse: closing zk connection to #{@zk_hosts}" + begin + @zk.close! unless @zk.nil? + ensure + @@zk_pool.delete(@zk_hosts) + end end end + @zk = nil } end