From 596bd5ab8a9b2308f68d81ad1db57938d1e93f1d Mon Sep 17 00:00:00 2001 From: Georg Felbinger Date: Sun, 26 Apr 2015 01:20:08 +0200 Subject: [PATCH 1/4] Add watcher zookeeper_recursive: Looking for nodes in a given path, creates "zookeeper"-watcher for each node. The configuration for the created subwatcher is generated by the "zookeeper_recursive"-config, the string '#[service]' will be replaced by the name of the node. For a example-configuration see "synapse_zookeeper_recursive.yaml" --- .gitignore | 1 + config/synapse_zookeeper_recursive.yaml | 35 ++++ lib/synapse.rb | 15 ++ lib/synapse/service_watcher.rb | 2 + lib/synapse/service_watcher/zookeeper.rb | 12 +- .../service_watcher/zookeeper_recursive.rb | 129 ++++++++++++ ...ervice_watcher_zookeeper_recursive_spec.rb | 186 ++++++++++++++++++ spec/lib/synapse_spec.rb | 69 +++++++ spec/spec_helper.rb | 1 + spec/support/tree.rb | 53 +++++ 10 files changed, 500 insertions(+), 3 deletions(-) create mode 100644 config/synapse_zookeeper_recursive.yaml create mode 100644 lib/synapse/service_watcher/zookeeper_recursive.rb create mode 100644 spec/lib/synapse/service_watcher_zookeeper_recursive_spec.rb create mode 100644 spec/lib/synapse_spec.rb create mode 100644 spec/support/tree.rb diff --git a/.gitignore b/.gitignore index 842eb0fc..e1fe6b67 100644 --- a/.gitignore +++ b/.gitignore @@ -19,5 +19,6 @@ tmp .vagrant .*sw? vendor/ +/.idea synapse.jar diff --git a/config/synapse_zookeeper_recursive.yaml b/config/synapse_zookeeper_recursive.yaml new file mode 100644 index 00000000..ca081ce9 --- /dev/null +++ b/config/synapse_zookeeper_recursive.yaml @@ -0,0 +1,35 @@ +haproxy: + reload_command: "haproxy -p /tmp/haproxy.pid -f /etc/haproxy/haproxy.cfg -sf `cat /tmp/haproxy.pid`" + config_file_path: "/etc/haproxy/haproxy.cfg" + do_writes: true + do_reloads: true + global: + - "daemon" + - "user haproxy" + - "group haproxy" + - "maxconn 4096" + - "log 127.0.0.1 local2 notice" + - "stats socket /var/run/haproxy.pid" + defaults: + - "log global" + - "mode http" + - "balance roundrobin" + - "timeout connect 5000ms" + - "timeout client 50000ms" + - "timeout server 50000ms" + shared_frontend: + - "bind 127.0.0.1:80" +services: + zookeeper_recursive: + discovery: + method: "zookeeper_recursive" + path: "/path/to/synapse" + hosts: + - "localhost:2181" + empty_backend_pool: "true" + haproxy: + server_options: "check inter 2s rise 3 fall 2" + shared_frontend: + - "acl is#[service] path_beg #[servicePath]" + - "use_backend #[service] if is#[service]" + diff --git a/lib/synapse.rb b/lib/synapse.rb index 0b18fd00..24a80dc6 100644 --- a/lib/synapse.rb +++ b/lib/synapse.rb @@ -71,6 +71,21 @@ def reconfigure! @config_updated = true end + def append_service_watcher(service_name, service_config) + watcher = ServiceWatcher.create(service_name, service_config, self) + @service_watchers << watcher + watcher.start + end + + def remove_watcher_by_name(service_name) + @service_watchers.each do |watcher| + if watcher.name == service_name + watcher.stop + @service_watchers.delete(watcher) + end + end + end + private def create_service_watchers(services={}) service_watchers =[] diff --git a/lib/synapse/service_watcher.rb b/lib/synapse/service_watcher.rb index ee05e6c2..4b23eb1c 100644 --- a/lib/synapse/service_watcher.rb +++ b/lib/synapse/service_watcher.rb @@ -4,6 +4,7 @@ require "synapse/service_watcher/dns" require "synapse/service_watcher/docker" require "synapse/service_watcher/zookeeper_dns" +require "synapse/service_watcher/zookeeper_recursive" module Synapse class ServiceWatcher @@ -15,6 +16,7 @@ class ServiceWatcher 'dns' => DnsWatcher, 'docker' => DockerWatcher, 'zookeeper_dns' => ZookeeperDnsWatcher, + 'zookeeper_recursive' => ZookeeperRecursiveWatcher } # the method which actually dispatches watcher creation requests diff --git a/lib/synapse/service_watcher/zookeeper.rb b/lib/synapse/service_watcher/zookeeper.rb index 4a88e077..043b861c 100644 --- a/lib/synapse/service_watcher/zookeeper.rb +++ b/lib/synapse/service_watcher/zookeeper.rb @@ -65,13 +65,19 @@ def discover numeric_id = NUMBERS_RE =~ numeric_id ? numeric_id.to_i : nil log.debug "synapse: discovered backend #{name} at #{host}:#{server_port} for service #{@name}" - new_backends << { 'name' => name, 'host' => host, 'port' => server_port, 'id' => numeric_id} - end + new_backends << {'name' => name, 'host' => host, 'port' => server_port, 'id' => numeric_id} + end unless node[1].ephemeralOwner == 0 end if new_backends.empty? if @default_servers.empty? - log.warn "synapse: no backends and no default servers for service #{@name}; using previous backends: #{@backends.inspect}" + log.info @discovery['empty_backend_pool'] + if @discovery['empty_backend_pool'].nil? or @discovery['empty_backend_pool']=="false" + log.warn "synapse: no backends and no default servers for service #{@name}; using previous backends: #{@backends.inspect}" + else + log.warn "synapse: no backends and no default servers for service #{@name}; purging backends" + @backends=[] + end else log.warn "synapse: no backends for service #{@name}; using default servers: #{@default_servers.inspect}" @backends = @default_servers diff --git a/lib/synapse/service_watcher/zookeeper_recursive.rb b/lib/synapse/service_watcher/zookeeper_recursive.rb new file mode 100644 index 00000000..687c54aa --- /dev/null +++ b/lib/synapse/service_watcher/zookeeper_recursive.rb @@ -0,0 +1,129 @@ +require "synapse/service_watcher/base" +require 'zk' +require 'thread' + +module Synapse + class ZookeeperRecursiveWatcher < BaseWatcher + + # Overriden methods start, stop, validate_discovery_opts, ping + def start + boot unless @already_started + end + + def boot + @already_started = true + log.info "#{@name}: Starting @ hosts: #{@discovery['hosts']}, path: #{@discovery['path']}, id #{self.object_id}" + setup_zk_connection + setup_haproxy_configuration + start_watching_services + end + + def stop + log.info "#{@name}: Stopping using default stop handler" + @subwatcher.each { |watcher| cleanup_service_watcher(watcher) } + @should_exit = true + end + + def validate_discovery_opts + raise ArgumentError, "invalid discovery method #{@discovery['method']}" \ + unless @discovery['method'] == 'zookeeper_recursive' + raise ArgumentError, "missing or invalid zookeeper host for service #{@name}" \ + unless @discovery['hosts'] + raise ArgumentError, "invalid zookeeper path for service #{@name}" \ + unless @discovery['path'] + end + + def ping? + @zk && @zk.connected? + end + + # Methods for Initializing + def setup_zk_connection + @zk_hosts = @discovery['hosts'].shuffle.join(',') + @zk = ZK.new(@zk_hosts) + end + + def setup_haproxy_configuration + @haproxy_template = @haproxy.dup + #Purge the own haproxy-conf to a minimum, in order to be no haproxy-instance + @haproxy = {"listen" => @haproxy['listen']} + end + + def start_watching_services + @subwatcher = [] + create_if_not_exists(@discovery['path']) + watch_services(@discovery['path']) + end + + def create_if_not_exists(path) + log.debug "#{@name}: Creating ZK path: #{path}" + current = "" + path.split('/').drop(1).each { |node| + current += "/#{node}" + @zk.create(current) unless @zk.exists?(current) + } + end + + # Methods for running + def watch_services(path) + log.info("Watching path #{path}") + # Register each time a event is fired, since we're getting only one event per register + @zk.register(path, [:deleted, :child]) do |event| + if event.node_deleted? + cleanup_service_watcher(path) + else + watch_services(path) + end + end + + children = @zk.children(path,:watch => true).map{|child| "#{path}/#{child}"} + + persistent_children = children.select { |child| @zk.get("#{child}")[1].ephemeralOwner == 0 } + persistent_children.each{ |child| watch_services(child) } + + create_service_watcher(path) unless @subwatcher.include? path + end + + def create_service_watcher(service_path) + service_name = service_path.gsub(/[\/\.]/,"_") + service_config = { + "discovery" => { + "method"=>"zookeeper", + "path"=>"#{service_path}", + "hosts"=>@discovery['hosts'], + "empty_backend_pool"=>@discovery['empty_backend_pool'] + }, + "haproxy" => build_haproxy_section(service_name, service_path, @haproxy_template) + } + log.info "#{@name}: Creating new Service-Watcher for #{service_name}@ hosts: #{@zk_hosts}" + log.debug service_config + @subwatcher << service_path + @synapse.append_service_watcher(service_name, service_config) + end + + def build_haproxy_section(service_name, service_path, template) + new_haproxy = {} + template.each {|key, section| new_haproxy[key] = parse_section(section, service_name, service_path)} + return new_haproxy + end + + def parse_section(section, service_name, service_path) + service_url = service_path.sub(@discovery['path'],"") + if section.is_a?(String) + new_section = section.gsub(/#\[servicePath\]/,"#{service_url}").gsub(/#\[service\]/,"#{service_name}") + else + unless section.nil? || section == 0 + new_section = section.map{ |subsection| parse_section(subsection, service_name, service_path) } + end + end + new_section + end + + def cleanup_service_watcher(service_path) + service_name = service_path.gsub(/\//,"_") + log.info("#{@name}: Removing Watcher: #{service_name}") + @synapse.remove_watcher_by_name(service_name) + @subwatcher.delete(service_path) + end + end +end \ No newline at end of file diff --git a/spec/lib/synapse/service_watcher_zookeeper_recursive_spec.rb b/spec/lib/synapse/service_watcher_zookeeper_recursive_spec.rb new file mode 100644 index 00000000..e29443af --- /dev/null +++ b/spec/lib/synapse/service_watcher_zookeeper_recursive_spec.rb @@ -0,0 +1,186 @@ +require 'spec_helper' + +class Synapse::ZookeeperRecursiveWatcher + attr_reader :should_exit, :default_servers + + def get_zk + @zk + end + + def get_synapse + @synapse + end + + def set_subwatcher(subwatcher) + @subwatcher = subwatcher + end +end + +describe Synapse::ZookeeperRecursiveWatcher do + let(:mocksynapse) { double() } + subject { Synapse::ZookeeperRecursiveWatcher.new(args, mocksynapse) } + let(:testargs) { + {'name' => 'foo', + 'discovery' => { + 'method' => 'zookeeper_recursive', + 'hosts' => ["localhost:2181"], + 'path' => '/foo/synapse' + }, + 'haproxy' => { + 'option_with_param' => 'has #[service] param' + } + } + } + + context "can construct normally" do + let(:args) { testargs } + it('can at least construct') { expect { subject }.not_to raise_error } + end + + def remove_discovery_arg(name) + args = testargs.clone + discovery = testargs['discovery'].clone + discovery.delete name + args['discovery'] = discovery + args + end + + context "without path argument" do + let(:args) { remove_discovery_arg "path" } + it('gots bang') { expect { subject }.to raise_error(ArgumentError, "invalid zookeeper path for service #{args['name']}") } + end + + {"path" => "invalid zookeeper path for service foo", + "hosts" => "missing or invalid zookeeper host for service foo", + "method" => "invalid discovery method "}.each do |to_remove, message| + context "without path argument" do + let(:args) { remove_discovery_arg to_remove } + it('gots bang') { expect { subject }.to raise_error(ArgumentError, message) } + end + end + + context "when watcher gets started" do + let(:args) { testargs } + before(:each) do + ZK = ZKMock + end + it('sets up the zk-client and registers for the path') { + expect(subject.get_synapse()).to receive(:append_service_watcher). + with("_foo_synapse", + {"discovery" => { "method" => "zookeeper", "path" => "/foo/synapse", "hosts" => ["localhost:2181"], "empty_backend_pool" => nil}, + "haproxy" => { "option_with_param" => "has _foo_synapse param", "server_options" => "", "server_port_override" => nil, "backend" => [], "frontend" => [], "listen" => [] }}) + subject.start() + expect(subject.get_zk().start_successful()).to be true + } + context("when a registered event is fired") do + before(:each) do + ZK = ZKMock + expect(subject.get_synapse()).to receive(:append_service_watcher) + subject.start() + end + it('adds a new zookeeper service_watcher on child-events and discovers new services in the new directory') { + expect(subject.get_synapse()).to receive(:append_service_watcher). + with("_foo_synapse_service1", + {"discovery" => {"method" => "zookeeper", "path" => "/foo/synapse/service1", "hosts" => ["localhost:2181"], "empty_backend_pool" => nil}, + "haproxy" => {"option_with_param" => "has _foo_synapse_service1 param", "server_options" => "", "server_port_override" => nil, "backend" => [], "frontend" => [], "listen" => []}}) + subject.get_zk().set_children("/foo/synapse", ["service1"]) + subject.get_zk().fire_event("/foo/synapse", false) + + expect(subject.get_synapse()).to receive(:append_service_watcher). + with("_foo_synapse_service1_subservice", + {"discovery" => {"method" => "zookeeper", "path" => "/foo/synapse/service1/subservice", "hosts" => ["localhost:2181"], "empty_backend_pool" => nil}, + "haproxy" => {"option_with_param" => "has _foo_synapse_service1_subservice param", "server_options" => "", "server_port_override" => nil, "backend" => [], "frontend" => [], "listen" => []}}) + subject.get_zk().set_children("/foo/synapse/service1", ["subservice"]) + subject.get_zk().fire_event("/foo/synapse/service1", false) + } + it('removes a service_watcher on delete-events') { + expect(subject.get_synapse()).to receive(:remove_watcher_by_name).with("_foo_synapse") + subject.get_zk().fire_event("/foo/synapse", true) + } + end + end + + context("when watcher gets stopped") do + let(:args) { testargs } + before(:each) do + subject.set_subwatcher(["service1"]) + end + it("cleans up every subwatcher") { + expect(subject.get_synapse()).to receive(:remove_watcher_by_name).with("service1") + subject.stop() + } + end + + class ZKMock + def initialize(zk_connect_string) + @initialized = true + @root = Tree.new(to_zk_node("root"), []) + @registered_paths = {} + end + + def exists?(path) + @root.find(to_zk_path(path)).nil? + end + + def create(path, *args) + @root.add_path(to_zk_path(path)) + end + + def register(path, opts={}, &block) + blocks = @registered_paths[path] || [] + blocks.push(block) + @registered_paths[path] = blocks + end + + def children(path, opts={}) + node = @root.find(to_zk_path(path)) + if node.nil? then + return Array.new + else + return node.get_children.map { |child| extract_name(child.get_value) } + end + end + + def get(path) + node = @root.find(to_zk_path(path)) + unless node.nil? + node.get_value + else + raise Exception "Node does not exist!" + end + end + + # Helper + def set_children(path, children) + @root.add_path(to_zk_path(path)) + node = @root.find(to_zk_path(path)) + children.each { |child| node.add_child(Tree.new(to_zk_node(child), [])) } + end + + def fire_event(path, is_delete_event) + blocks = @registered_paths.delete(path) + blocks.each { |block| block.call(Event.new(is_delete_event)) } + end + + def start_successful + return @initialized && @registered_paths.size > 0 + end + + def to_zk_path(path) + path.split('/').drop(1).map { |node| to_zk_node(node) } + end + + def to_zk_node(name) + data = "" + node_stat = ZKStat.new(0, name) + return [data, node_stat] + end + + def extract_name(zk_node) + return zk_node[1]['name'] + end + end + + ZKStat = Struct.new(:ephemeralOwner, :name) + Event = Struct.new(:node_deleted?) +end diff --git a/spec/lib/synapse_spec.rb b/spec/lib/synapse_spec.rb new file mode 100644 index 00000000..f5bfcbb1 --- /dev/null +++ b/spec/lib/synapse_spec.rb @@ -0,0 +1,69 @@ +require 'spec_helper' + +class Synapse::Synapse + def initialize() end + + def set_service_watchers(watchers) + @service_watchers = watchers + end + + def get_service_watchers + @service_watchers + end + + class ServiceWatcher + def self.create(service_name, service_config, synapse) + return WatcherMock.new(service_name, false) + end + end +end + +class WatcherMock + def initialize(name, started) + @name=name + @started=started + end + def start + @started = true + end + def stop + @started = false + end + def started? + @started + end + def name + @name + end +end + +describe Synapse::Synapse do + subject { Synapse::Synapse.new() } + + context("when a watcher gets appended at runtime") do + before(:each) { + subject.set_service_watchers([]) + } + it("creates the watcher, appends it to the list and starts it") { + service_name = "serviceName" + service_config = { "foo" => "bar" } + subject.append_service_watcher(service_name, service_config) + expect(subject.get_service_watchers.length).to be(1) + expect(subject.get_service_watchers[0].started?).to be true + } + end + + context("when a watcher gets removed at runtime") do + before(:each) { + watcher1 = WatcherMock.new("watcher1", true) + watcher2 = WatcherMock.new("watcher2", true) + subject.set_service_watchers([watcher1, watcher2]) + } + it("stops the watcher and removes it from the list") { + service_name = "watcher1" + subject.remove_watcher_by_name(service_name) + expect(subject.get_service_watchers.length).to be(1) + expect(subject.get_service_watchers[0].name).to eq("watcher2") + } + end +end \ No newline at end of file diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index 3d187cad..1d92d656 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -7,6 +7,7 @@ require "#{File.dirname(__FILE__)}/../lib/synapse" require 'pry' require 'support/configuration' +require 'support/tree' RSpec.configure do |config| config.run_all_when_everything_filtered = true diff --git a/spec/support/tree.rb b/spec/support/tree.rb new file mode 100644 index 00000000..c365871f --- /dev/null +++ b/spec/support/tree.rb @@ -0,0 +1,53 @@ +class Tree + include Comparable + def initialize(value, children) + @value = value + @children = children + end + def get_value + @value + end + def <=>(anOther) + @value <=> anOther.get_value + end + def get_children() + @children + end + def getChild(value) + selected = @children.select{ |child| + child.get_value == value + } + return selected[0] unless selected.nil? + end + def add_child(child) + existing = getChild(child.get_value) + if existing.nil? && child.is_a?(Tree) + @children = @children.push(child) + existing = child + end + existing + end + def add_path(path) + if path.is_a?(Array) && path.size > 0 + child = add_child(Tree.new(path[0], [])) + child.add_path(path.drop(1)) + end + end + def find(path) + if path.is_a?(Array) + if path.size == 0 + return self + else + child = getChild(path[0]) + return child.find(path.drop(1)) unless child.nil? + end + end + return nil + end + def to_s + "<#{@value}>: #{@children}" + end + def inspect + to_s + end +end \ No newline at end of file From 3e2f6dcaaa33460c89209ffafedd1bfbacc6646a Mon Sep 17 00:00:00 2001 From: gfelbing Date: Mon, 27 Apr 2015 22:42:52 +0200 Subject: [PATCH 2/4] Integrated code-style comments from carsten-luckmann --- lib/synapse/service_watcher/zookeeper.rb | 2 +- .../service_watcher/zookeeper_recursive.rb | 64 +++++++------- ...ervice_watcher_zookeeper_recursive_spec.rb | 88 ++++++++++--------- spec/lib/synapse_spec.rb | 4 +- spec/support/tree.rb | 4 +- 5 files changed, 85 insertions(+), 77 deletions(-) diff --git a/lib/synapse/service_watcher/zookeeper.rb b/lib/synapse/service_watcher/zookeeper.rb index 043b861c..0803e358 100644 --- a/lib/synapse/service_watcher/zookeeper.rb +++ b/lib/synapse/service_watcher/zookeeper.rb @@ -72,7 +72,7 @@ def discover if new_backends.empty? if @default_servers.empty? log.info @discovery['empty_backend_pool'] - if @discovery['empty_backend_pool'].nil? or @discovery['empty_backend_pool']=="false" + if @discovery['empty_backend_pool'].nil? or @discovery['empty_backend_pool'] == "false" log.warn "synapse: no backends and no default servers for service #{@name}; using previous backends: #{@backends.inspect}" else log.warn "synapse: no backends and no default servers for service #{@name}; purging backends" diff --git a/lib/synapse/service_watcher/zookeeper_recursive.rb b/lib/synapse/service_watcher/zookeeper_recursive.rb index 687c54aa..418e68fa 100644 --- a/lib/synapse/service_watcher/zookeeper_recursive.rb +++ b/lib/synapse/service_watcher/zookeeper_recursive.rb @@ -1,18 +1,18 @@ require "synapse/service_watcher/base" -require 'zk' -require 'thread' +require "zk" +require "thread" module Synapse class ZookeeperRecursiveWatcher < BaseWatcher - + # Overriden methods start, stop, validate_discovery_opts, ping def start boot unless @already_started end - + def boot @already_started = true - log.info "#{@name}: Starting @ hosts: #{@discovery['hosts']}, path: #{@discovery['path']}, id #{self.object_id}" + log.info "#{@name}: Starting @ hosts: #{@discovery["hosts"]}, path: #{@discovery["path"]}, id #{self.object_id}" setup_zk_connection setup_haproxy_configuration start_watching_services @@ -25,12 +25,12 @@ def stop end def validate_discovery_opts - raise ArgumentError, "invalid discovery method #{@discovery['method']}" \ - unless @discovery['method'] == 'zookeeper_recursive' + raise ArgumentError, "invalid discovery method #{@discovery["method"]}" \ + unless @discovery["method"] == "zookeeper_recursive" raise ArgumentError, "missing or invalid zookeeper host for service #{@name}" \ - unless @discovery['hosts'] + unless @discovery["hosts"] raise ArgumentError, "invalid zookeeper path for service #{@name}" \ - unless @discovery['path'] + unless @discovery["path"] end def ping? @@ -39,26 +39,26 @@ def ping? # Methods for Initializing def setup_zk_connection - @zk_hosts = @discovery['hosts'].shuffle.join(',') + @zk_hosts = @discovery["hosts"].shuffle.join(",") @zk = ZK.new(@zk_hosts) end def setup_haproxy_configuration @haproxy_template = @haproxy.dup #Purge the own haproxy-conf to a minimum, in order to be no haproxy-instance - @haproxy = {"listen" => @haproxy['listen']} + @haproxy = {"listen" => @haproxy["listen"]} end def start_watching_services @subwatcher = [] - create_if_not_exists(@discovery['path']) - watch_services(@discovery['path']) + create_if_not_exists(@discovery["path"]) + watch_services(@discovery["path"]) end def create_if_not_exists(path) log.debug "#{@name}: Creating ZK path: #{path}" current = "" - path.split('/').drop(1).each { |node| + path.split("/").drop(1).each { |node| current += "/#{node}" @zk.create(current) unless @zk.exists?(current) } @@ -67,7 +67,7 @@ def create_if_not_exists(path) # Methods for running def watch_services(path) log.info("Watching path #{path}") - # Register each time a event is fired, since we're getting only one event per register + # Register each time a event is fired, since we"re getting only one event per register @zk.register(path, [:deleted, :child]) do |event| if event.node_deleted? cleanup_service_watcher(path) @@ -76,54 +76,54 @@ def watch_services(path) end end - children = @zk.children(path,:watch => true).map{|child| "#{path}/#{child}"} + children = @zk.children(path, :watch => true).map { |child| "#{path}/#{child}" } persistent_children = children.select { |child| @zk.get("#{child}")[1].ephemeralOwner == 0 } - persistent_children.each{ |child| watch_services(child) } + persistent_children.each { |child| watch_services(child) } create_service_watcher(path) unless @subwatcher.include? path end def create_service_watcher(service_path) - service_name = service_path.gsub(/[\/\.]/,"_") + service_name = service_path.gsub(/[\/\.]/, "_") service_config = { - "discovery" => { - "method"=>"zookeeper", - "path"=>"#{service_path}", - "hosts"=>@discovery['hosts'], - "empty_backend_pool"=>@discovery['empty_backend_pool'] - }, - "haproxy" => build_haproxy_section(service_name, service_path, @haproxy_template) + "discovery" => { + "method" => "zookeeper", + "path" => "#{service_path}", + "hosts" => @discovery["hosts"], + "empty_backend_pool" => @discovery["empty_backend_pool"] + }, + "haproxy" => build_haproxy_section(service_name, service_path, @haproxy_template) } log.info "#{@name}: Creating new Service-Watcher for #{service_name}@ hosts: #{@zk_hosts}" log.debug service_config @subwatcher << service_path @synapse.append_service_watcher(service_name, service_config) end - + def build_haproxy_section(service_name, service_path, template) new_haproxy = {} - template.each {|key, section| new_haproxy[key] = parse_section(section, service_name, service_path)} + template.each { |key, section| new_haproxy[key] = parse_section(section, service_name, service_path) } return new_haproxy end def parse_section(section, service_name, service_path) - service_url = service_path.sub(@discovery['path'],"") + service_url = service_path.sub(@discovery["path"], "") if section.is_a?(String) - new_section = section.gsub(/#\[servicePath\]/,"#{service_url}").gsub(/#\[service\]/,"#{service_name}") + new_section = section.gsub(/#\[servicePath\]/, "#{service_url}").gsub(/#\[service\]/, "#{service_name}") else unless section.nil? || section == 0 - new_section = section.map{ |subsection| parse_section(subsection, service_name, service_path) } + new_section = section.map { |subsection| parse_section(subsection, service_name, service_path) } end end new_section end def cleanup_service_watcher(service_path) - service_name = service_path.gsub(/\//,"_") + service_name = service_path.gsub(/\//, "_") log.info("#{@name}: Removing Watcher: #{service_name}") @synapse.remove_watcher_by_name(service_name) @subwatcher.delete(service_path) end end -end \ No newline at end of file +end diff --git a/spec/lib/synapse/service_watcher_zookeeper_recursive_spec.rb b/spec/lib/synapse/service_watcher_zookeeper_recursive_spec.rb index e29443af..f0a7cc70 100644 --- a/spec/lib/synapse/service_watcher_zookeeper_recursive_spec.rb +++ b/spec/lib/synapse/service_watcher_zookeeper_recursive_spec.rb @@ -1,4 +1,4 @@ -require 'spec_helper' +require "spec_helper" class Synapse::ZookeeperRecursiveWatcher attr_reader :should_exit, :default_servers @@ -17,37 +17,37 @@ def set_subwatcher(subwatcher) end describe Synapse::ZookeeperRecursiveWatcher do - let(:mocksynapse) { double() } + let(:mocksynapse) { double } subject { Synapse::ZookeeperRecursiveWatcher.new(args, mocksynapse) } let(:testargs) { - {'name' => 'foo', - 'discovery' => { - 'method' => 'zookeeper_recursive', - 'hosts' => ["localhost:2181"], - 'path' => '/foo/synapse' + {"name" => "foo", + "discovery" => { + "method" => "zookeeper_recursive", + "hosts" => ["localhost:2181"], + "path" => "/foo/synapse" }, - 'haproxy' => { - 'option_with_param' => 'has #[service] param' + "haproxy" => { + "option_with_param" => "has #[service] param" } } } context "can construct normally" do let(:args) { testargs } - it('can at least construct') { expect { subject }.not_to raise_error } + it("can at least construct") { expect { subject }.not_to raise_error } end def remove_discovery_arg(name) args = testargs.clone - discovery = testargs['discovery'].clone + discovery = testargs["discovery"].clone discovery.delete name - args['discovery'] = discovery + args["discovery"] = discovery args end context "without path argument" do let(:args) { remove_discovery_arg "path" } - it('gots bang') { expect { subject }.to raise_error(ArgumentError, "invalid zookeeper path for service #{args['name']}") } + it("gots bang") { expect { subject }.to raise_error(ArgumentError, "invalid zookeeper path for service #{args["name"]}") } end {"path" => "invalid zookeeper path for service foo", @@ -55,7 +55,7 @@ def remove_discovery_arg(name) "method" => "invalid discovery method "}.each do |to_remove, message| context "without path argument" do let(:args) { remove_discovery_arg to_remove } - it('gots bang') { expect { subject }.to raise_error(ArgumentError, message) } + it("gots bang") { expect { subject }.to raise_error(ArgumentError, message) } end end @@ -64,38 +64,38 @@ def remove_discovery_arg(name) before(:each) do ZK = ZKMock end - it('sets up the zk-client and registers for the path') { - expect(subject.get_synapse()).to receive(:append_service_watcher). - with("_foo_synapse", - {"discovery" => { "method" => "zookeeper", "path" => "/foo/synapse", "hosts" => ["localhost:2181"], "empty_backend_pool" => nil}, - "haproxy" => { "option_with_param" => "has _foo_synapse param", "server_options" => "", "server_port_override" => nil, "backend" => [], "frontend" => [], "listen" => [] }}) - subject.start() - expect(subject.get_zk().start_successful()).to be true + it("sets up the zk-client and registers for the path") { + expect(subject.get_synapse).to receive(:append_service_watcher). + with("_foo_synapse", + {"discovery" => {"method" => "zookeeper", "path" => "/foo/synapse", "hosts" => ["localhost:2181"], "empty_backend_pool" => nil}, + "haproxy" => {"option_with_param" => "has _foo_synapse param", "server_options" => "", "server_port_override" => nil, "backend" => [], "frontend" => [], "listen" => []}}) + subject.start + expect(subject.get_zk.start_successful).to be true } context("when a registered event is fired") do before(:each) do ZK = ZKMock - expect(subject.get_synapse()).to receive(:append_service_watcher) - subject.start() + expect(subject.get_synapse).to receive(:append_service_watcher) + subject.start end - it('adds a new zookeeper service_watcher on child-events and discovers new services in the new directory') { - expect(subject.get_synapse()).to receive(:append_service_watcher). + it("adds a new zookeeper service_watcher on child-events and discovers new services in the new directory") { + expect(subject.get_synapse).to receive(:append_service_watcher). with("_foo_synapse_service1", {"discovery" => {"method" => "zookeeper", "path" => "/foo/synapse/service1", "hosts" => ["localhost:2181"], "empty_backend_pool" => nil}, "haproxy" => {"option_with_param" => "has _foo_synapse_service1 param", "server_options" => "", "server_port_override" => nil, "backend" => [], "frontend" => [], "listen" => []}}) - subject.get_zk().set_children("/foo/synapse", ["service1"]) - subject.get_zk().fire_event("/foo/synapse", false) + subject.get_zk.set_children("/foo/synapse", ["service1"]) + subject.get_zk.fire_event("/foo/synapse", false) - expect(subject.get_synapse()).to receive(:append_service_watcher). + expect(subject.get_synapse).to receive(:append_service_watcher). with("_foo_synapse_service1_subservice", {"discovery" => {"method" => "zookeeper", "path" => "/foo/synapse/service1/subservice", "hosts" => ["localhost:2181"], "empty_backend_pool" => nil}, "haproxy" => {"option_with_param" => "has _foo_synapse_service1_subservice param", "server_options" => "", "server_port_override" => nil, "backend" => [], "frontend" => [], "listen" => []}}) - subject.get_zk().set_children("/foo/synapse/service1", ["subservice"]) - subject.get_zk().fire_event("/foo/synapse/service1", false) + subject.get_zk.set_children("/foo/synapse/service1", ["subservice"]) + subject.get_zk.fire_event("/foo/synapse/service1", false) } - it('removes a service_watcher on delete-events') { - expect(subject.get_synapse()).to receive(:remove_watcher_by_name).with("_foo_synapse") - subject.get_zk().fire_event("/foo/synapse", true) + it("removes a service_watcher on delete-events") { + expect(subject.get_synapse).to receive(:remove_watcher_by_name).with("_foo_synapse") + subject.get_zk.fire_event("/foo/synapse", true) } end end @@ -106,12 +106,22 @@ def remove_discovery_arg(name) subject.set_subwatcher(["service1"]) end it("cleans up every subwatcher") { - expect(subject.get_synapse()).to receive(:remove_watcher_by_name).with("service1") - subject.stop() + expect(subject.get_synapse).to receive(:remove_watcher_by_name).with("service1") + subject.stop } end class ZKMock + ZKMock::ZKStat = Struct.new(:ephemeralOwner, :name) + class ZKMock::ZKEvent + def initialize(is_delete_event) + @is_delete_event = is_delete_event + end + def node_deleted? + return @is_delete_event + end + end + def initialize(zk_connect_string) @initialized = true @root = Tree.new(to_zk_node("root"), []) @@ -159,7 +169,7 @@ def set_children(path, children) def fire_event(path, is_delete_event) blocks = @registered_paths.delete(path) - blocks.each { |block| block.call(Event.new(is_delete_event)) } + blocks.each { |block| block.call(ZKEvent.new(is_delete_event)) } end def start_successful @@ -167,7 +177,7 @@ def start_successful end def to_zk_path(path) - path.split('/').drop(1).map { |node| to_zk_node(node) } + path.split("/").drop(1).map { |node| to_zk_node(node) } end def to_zk_node(name) @@ -177,10 +187,8 @@ def to_zk_node(name) end def extract_name(zk_node) - return zk_node[1]['name'] + return zk_node[1]["name"] end end - ZKStat = Struct.new(:ephemeralOwner, :name) - Event = Struct.new(:node_deleted?) end diff --git a/spec/lib/synapse_spec.rb b/spec/lib/synapse_spec.rb index f5bfcbb1..3b7871ea 100644 --- a/spec/lib/synapse_spec.rb +++ b/spec/lib/synapse_spec.rb @@ -49,7 +49,7 @@ def name service_config = { "foo" => "bar" } subject.append_service_watcher(service_name, service_config) expect(subject.get_service_watchers.length).to be(1) - expect(subject.get_service_watchers[0].started?).to be true + expect(subject.get_service_watchers.first.started?).to be true } end @@ -63,7 +63,7 @@ def name service_name = "watcher1" subject.remove_watcher_by_name(service_name) expect(subject.get_service_watchers.length).to be(1) - expect(subject.get_service_watchers[0].name).to eq("watcher2") + expect(subject.get_service_watchers.first.name).to eq("watcher2") } end end \ No newline at end of file diff --git a/spec/support/tree.rb b/spec/support/tree.rb index c365871f..27bb68d6 100644 --- a/spec/support/tree.rb +++ b/spec/support/tree.rb @@ -28,14 +28,14 @@ def add_child(child) existing end def add_path(path) - if path.is_a?(Array) && path.size > 0 + if path.is_a?(Array) && !path.empty? child = add_child(Tree.new(path[0], [])) child.add_path(path.drop(1)) end end def find(path) if path.is_a?(Array) - if path.size == 0 + if path.empty? return self else child = getChild(path[0]) From 2d037da56d6a4365153e4f51eb601a9a278e3781 Mon Sep 17 00:00:00 2001 From: gfelbing Date: Tue, 5 May 2015 08:54:25 +0200 Subject: [PATCH 3/4] Service watcher will only be created for nodes without persistent children, due to random behaviour of haproxy with non-disjoint acl criteria --- lib/synapse/service_watcher/zookeeper_recursive.rb | 8 +++++++- .../synapse/service_watcher_zookeeper_recursive_spec.rb | 2 ++ 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/lib/synapse/service_watcher/zookeeper_recursive.rb b/lib/synapse/service_watcher/zookeeper_recursive.rb index 418e68fa..c5c9f4f8 100644 --- a/lib/synapse/service_watcher/zookeeper_recursive.rb +++ b/lib/synapse/service_watcher/zookeeper_recursive.rb @@ -81,7 +81,12 @@ def watch_services(path) persistent_children = children.select { |child| @zk.get("#{child}")[1].ephemeralOwner == 0 } persistent_children.each { |child| watch_services(child) } - create_service_watcher(path) unless @subwatcher.include? path + if (!@subwatcher.include?(path) && persistent_children.empty?) then + create_service_watcher(path) + end + if (@subwatcher.include?(path) && !persistent_children.empty?) then + cleanup_service_watcher(path) + end end def create_service_watcher(service_path) @@ -109,6 +114,7 @@ def build_haproxy_section(service_name, service_path, template) def parse_section(section, service_name, service_path) service_url = service_path.sub(@discovery["path"], "") + service_url = "/" if service_url.empty? if section.is_a?(String) new_section = section.gsub(/#\[servicePath\]/, "#{service_url}").gsub(/#\[service\]/, "#{service_name}") else diff --git a/spec/lib/synapse/service_watcher_zookeeper_recursive_spec.rb b/spec/lib/synapse/service_watcher_zookeeper_recursive_spec.rb index f0a7cc70..4c979273 100644 --- a/spec/lib/synapse/service_watcher_zookeeper_recursive_spec.rb +++ b/spec/lib/synapse/service_watcher_zookeeper_recursive_spec.rb @@ -84,12 +84,14 @@ def remove_discovery_arg(name) {"discovery" => {"method" => "zookeeper", "path" => "/foo/synapse/service1", "hosts" => ["localhost:2181"], "empty_backend_pool" => nil}, "haproxy" => {"option_with_param" => "has _foo_synapse_service1 param", "server_options" => "", "server_port_override" => nil, "backend" => [], "frontend" => [], "listen" => []}}) subject.get_zk.set_children("/foo/synapse", ["service1"]) + expect(subject.get_synapse).to receive(:remove_watcher_by_name).with("_foo_synapse") subject.get_zk.fire_event("/foo/synapse", false) expect(subject.get_synapse).to receive(:append_service_watcher). with("_foo_synapse_service1_subservice", {"discovery" => {"method" => "zookeeper", "path" => "/foo/synapse/service1/subservice", "hosts" => ["localhost:2181"], "empty_backend_pool" => nil}, "haproxy" => {"option_with_param" => "has _foo_synapse_service1_subservice param", "server_options" => "", "server_port_override" => nil, "backend" => [], "frontend" => [], "listen" => []}}) + expect(subject.get_synapse).to receive(:remove_watcher_by_name).with("_foo_synapse_service1") subject.get_zk.set_children("/foo/synapse/service1", ["subservice"]) subject.get_zk.fire_event("/foo/synapse/service1", false) } From 0e19ab07a81a417a1fb51a88a03aea7a686a88b1 Mon Sep 17 00:00:00 2001 From: gfelbing Date: Tue, 26 May 2015 15:23:46 +0200 Subject: [PATCH 4/4] Excluded base path from service watcher --- lib/synapse/service_watcher/zookeeper_recursive.rb | 13 ++++++++----- .../service_watcher_zookeeper_recursive_spec.rb | 13 +++++-------- 2 files changed, 13 insertions(+), 13 deletions(-) diff --git a/lib/synapse/service_watcher/zookeeper_recursive.rb b/lib/synapse/service_watcher/zookeeper_recursive.rb index c5c9f4f8..c5cfd43d 100644 --- a/lib/synapse/service_watcher/zookeeper_recursive.rb +++ b/lib/synapse/service_watcher/zookeeper_recursive.rb @@ -81,11 +81,14 @@ def watch_services(path) persistent_children = children.select { |child| @zk.get("#{child}")[1].ephemeralOwner == 0 } persistent_children.each { |child| watch_services(child) } - if (!@subwatcher.include?(path) && persistent_children.empty?) then - create_service_watcher(path) - end - if (@subwatcher.include?(path) && !persistent_children.empty?) then - cleanup_service_watcher(path) + + unless (path == @discovery["path"]) + if (!@subwatcher.include?(path) && persistent_children.empty?) then + create_service_watcher(path) + end + if (@subwatcher.include?(path) && !persistent_children.empty?) then + cleanup_service_watcher(path) + end end end diff --git a/spec/lib/synapse/service_watcher_zookeeper_recursive_spec.rb b/spec/lib/synapse/service_watcher_zookeeper_recursive_spec.rb index 4c979273..7863f315 100644 --- a/spec/lib/synapse/service_watcher_zookeeper_recursive_spec.rb +++ b/spec/lib/synapse/service_watcher_zookeeper_recursive_spec.rb @@ -65,17 +65,12 @@ def remove_discovery_arg(name) ZK = ZKMock end it("sets up the zk-client and registers for the path") { - expect(subject.get_synapse).to receive(:append_service_watcher). - with("_foo_synapse", - {"discovery" => {"method" => "zookeeper", "path" => "/foo/synapse", "hosts" => ["localhost:2181"], "empty_backend_pool" => nil}, - "haproxy" => {"option_with_param" => "has _foo_synapse param", "server_options" => "", "server_port_override" => nil, "backend" => [], "frontend" => [], "listen" => []}}) subject.start expect(subject.get_zk.start_successful).to be true } context("when a registered event is fired") do before(:each) do ZK = ZKMock - expect(subject.get_synapse).to receive(:append_service_watcher) subject.start end it("adds a new zookeeper service_watcher on child-events and discovers new services in the new directory") { @@ -84,7 +79,6 @@ def remove_discovery_arg(name) {"discovery" => {"method" => "zookeeper", "path" => "/foo/synapse/service1", "hosts" => ["localhost:2181"], "empty_backend_pool" => nil}, "haproxy" => {"option_with_param" => "has _foo_synapse_service1 param", "server_options" => "", "server_port_override" => nil, "backend" => [], "frontend" => [], "listen" => []}}) subject.get_zk.set_children("/foo/synapse", ["service1"]) - expect(subject.get_synapse).to receive(:remove_watcher_by_name).with("_foo_synapse") subject.get_zk.fire_event("/foo/synapse", false) expect(subject.get_synapse).to receive(:append_service_watcher). @@ -96,8 +90,11 @@ def remove_discovery_arg(name) subject.get_zk.fire_event("/foo/synapse/service1", false) } it("removes a service_watcher on delete-events") { - expect(subject.get_synapse).to receive(:remove_watcher_by_name).with("_foo_synapse") - subject.get_zk.fire_event("/foo/synapse", true) + expect(subject.get_synapse).to receive(:append_service_watcher) + expect(subject.get_synapse).to receive(:remove_watcher_by_name).with("_foo_synapse_service1") + subject.get_zk.set_children("/foo/synapse", ["service1"]) + subject.get_zk.fire_event("/foo/synapse", false) + subject.get_zk.fire_event("/foo/synapse/service1", true) } end end