Skip to content

Commit

Permalink
synapse etcd service watcher
Browse files Browse the repository at this point in the history
  • Loading branch information
bobtfish committed Feb 9, 2015
1 parent e012cd3 commit 15bfb29
Show file tree
Hide file tree
Showing 6 changed files with 194 additions and 47 deletions.
64 changes: 28 additions & 36 deletions Gemfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -4,70 +4,62 @@ PATH
synapse (0.11.1)
aws-sdk (~> 1.39)
docker-api (~> 1.7.2)
etcd (~> 0.2.4)
zk (~> 1.9.4)

GEM
remote: https://rubygems.org/
specs:
archive-tar-minitar (0.5.2)
aws-sdk (1.47.0)
aws-sdk (1.51.0)
json (~> 1.4)
nokogiri (>= 1.4.4)
coderay (1.0.9)
diff-lcs (1.2.4)
coderay (1.1.0)
diff-lcs (1.2.5)
docker-api (1.7.6)
archive-tar-minitar
excon (>= 0.28)
json
excon (0.38.0)
ffi (1.9.3-java)
etcd (0.2.4)
mixlib-log
excon (0.39.5)
json (1.8.1)
json (1.8.1-java)
little-plugger (1.1.3)
logging (1.8.2)
little-plugger (>= 1.1.3)
multi_json (>= 1.8.4)
method_source (0.8.2)
mini_portile (0.6.0)
mixlib-log (1.6.0)
multi_json (1.10.1)
nokogiri (1.6.2.1)
nokogiri (1.6.3.1)
mini_portile (= 0.6.0)
nokogiri (1.6.2.1-java)
pry (0.9.12.2)
coderay (~> 1.0.5)
method_source (~> 0.8)
pry (0.10.1)
coderay (~> 1.1.0)
method_source (~> 0.8.1)
slop (~> 3.4)
pry (0.9.12.2-java)
coderay (~> 1.0.5)
method_source (~> 0.8)
slop (~> 3.4)
spoon (~> 0.0)
pry-nav (0.2.3)
pry (~> 0.9.10)
rake (10.1.1)
rspec (2.14.1)
rspec-core (~> 2.14.0)
rspec-expectations (~> 2.14.0)
rspec-mocks (~> 2.14.0)
rspec-core (2.14.5)
rspec-expectations (2.14.2)
diff-lcs (>= 1.1.3, < 2.0)
rspec-mocks (2.14.3)
slop (3.4.6)
slyphon-log4j (1.2.15)
slyphon-zookeeper_jar (3.3.5-java)
spoon (0.0.4)
ffi
pry-nav (0.2.4)
pry (>= 0.9.10, < 0.11.0)
rake (10.3.2)
rspec (3.0.0)
rspec-core (~> 3.0.0)
rspec-expectations (~> 3.0.0)
rspec-mocks (~> 3.0.0)
rspec-core (3.0.4)
rspec-support (~> 3.0.0)
rspec-expectations (3.0.4)
diff-lcs (>= 1.2.0, < 2.0)
rspec-support (~> 3.0.0)
rspec-mocks (3.0.4)
rspec-support (~> 3.0.0)
rspec-support (3.0.4)
slop (3.6.0)
zk (1.9.4)
logging (~> 1.8.2)
zookeeper (~> 1.4.0)
zookeeper (1.4.8)
zookeeper (1.4.8-java)
slyphon-log4j (= 1.2.15)
slyphon-zookeeper_jar (= 3.3.5)

PLATFORMS
java
ruby

DEPENDENCIES
Expand Down
3 changes: 3 additions & 0 deletions lib/synapse/service_watcher.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
require "synapse/service_watcher/dns"
require "synapse/service_watcher/docker"
require "synapse/service_watcher/zookeeper_dns"
require "synapse/service_watcher/etcd"

module Synapse
class ServiceWatcher
Expand All @@ -15,6 +16,7 @@ class ServiceWatcher
'dns' => DnsWatcher,
'docker' => DockerWatcher,
'zookeeper_dns' => ZookeeperDnsWatcher,
'etcd' => EtcdWatcher
}

# the method which actually dispatches watcher creation requests
Expand All @@ -32,3 +34,4 @@ def self.create(name, opts, synapse)
end
end
end

154 changes: 154 additions & 0 deletions lib/synapse/service_watcher/etcd.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
require "synapse/service_watcher/base"

require 'etcd'

module Synapse
class EtcdWatcher < BaseWatcher
NUMBERS_RE = /^\d+$/

def start
etcd_hosts = @discovery['host']

log.info "synapse: starting etcd watcher #{@name} @ host: #{@discovery['host']}, path: #{@discovery['path']}"
@should_exit = false
@etcd = ::Etcd.client(:host => @discovery['host'], :port => @discovery['port'])

# call the callback to bootstrap the process
discover
@synapse.reconfigure!
@watcher = Thread.new do
watch
end
end

def stop
log.warn "synapse: etcd watcher exiting"

@should_exit = true
@etcd = nil

log.info "synapse: etcd watcher cleaned up successfully"
end

def ping?
@etcd.leader
end

private
def validate_discovery_opts
raise ArgumentError, "invalid discovery method #{@discovery['method']}" \
unless @discovery['method'] == 'etcd'
raise ArgumentError, "missing or invalid etcd host for service #{@name}" \
unless @discovery['host']
raise ArgumentError, "missing or invalid etcd port for service #{@name}" \
unless @discovery['port']
raise ArgumentError, "invalid etcd path for service #{@name}" \
unless @discovery['path']
end

# helper method that ensures that the discovery path exists
def create(path)
log.debug "synapse: creating etcd path: #{path}"
@etcd.create(path, dir: true)
end

def each_node(node)
begin
host, port, name = deserialize_service_instance(node.value)
rescue StandardError => e
log.error "synapse: invalid data in etcd node #{node.inspect} at #{@discovery['path']}: #{e} DATA #{node.value}"
nil
else
server_port = @server_port_override ? @server_port_override : port

# find the numberic id in the node name; used for leader elections if enabled
numeric_id = node.key.split('/').last
numeric_id = NUMBERS_RE =~ numeric_id ? numeric_id.to_i : nil

log.warn "synapse: discovered backend #{name} at #{host}:#{server_port} for service #{@name}"
{ 'name' => name, 'host' => host, 'port' => server_port, 'id' => numeric_id}
end
end

def each_dir(d)
new_backends = []
d.children.each do |node|
if node.directory?
new_backends << each_dir(@etcd.get(node.key))
else
backend = each_node(node)
if backend
new_backends << backend
end
end
end
new_backends.flatten
end

# find the current backends at the discovery path; sets @backends
def discover
log.info "synapse: discovering backends for service #{@name}"

d = nil
begin
d = @etcd.get(@discovery['path'])
rescue Etcd::KeyNotFound
create(@discovery['path'])
d = @etcd.get(@discovery['path'])
end

new_backends = []
if d.directory?
new_backends = each_dir(d)
else
log.warn "synapse: path #{@discovery['path']} is not a directory"
end

if new_backends.empty?
if @default_servers.empty?
log.warn "synapse: no backends and no default servers for service #{@name}; using previous backends: #{@backends.inspect}"
false
else
log.warn "synapse: no backends for service #{@name}; using default servers: #{@default_servers.inspect}"
@backends = @default_servers
true
end
else
if @backends != new_backends
log.info "synapse: discovered #{new_backends.length} backends (including new) for service #{@name}"
@backends = new_backends
true
else
log.info "synapse: discovered #{new_backends.length} backends for service #{@name}"
false
end
end
end

def watch
while !@should_exit
begin
@etcd.watch(@discovery['path'], :timeout => 60, :recursive => true)
rescue Timeout::Error
else
if discover
@synapse.reconfigure!
end
end
end
end

# decode the data at a zookeeper endpoint
def deserialize_service_instance(data)
log.debug "synapse: deserializing process data"
decoded = JSON.parse(data)

host = decoded['host'] || (raise ValueError, 'instance json data does not have host key')
port = decoded['port'] || (raise ValueError, 'instance json data does not have port key')
name = decoded['name'] || nil

return host, port, name
end
end
end

18 changes: 8 additions & 10 deletions spec/lib/synapse/service_watcher_ec2tags_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -135,39 +135,37 @@ def munge_haproxy_arg(name, new_value)
# done remotely; breaking into separate calls would result in
# unnecessary data being retrieved.

subject.ec2.should_receive(:instances).and_return(instance_collection)
expect(subject.ec2).to receive(:instances).and_return(instance_collection)

instance_collection.should_receive(:tagged).with('foo').and_return(instance_collection)
instance_collection.should_receive(:tagged_values).with('bar').and_return(instance_collection)
instance_collection.should_receive(:select).and_return(instance_collection)
expect(instance_collection).to receive(:tagged).with('foo').and_return(instance_collection)
expect(instance_collection).to receive(:tagged_values).with('bar').and_return(instance_collection)
expect(instance_collection).to receive(:select).and_return(instance_collection)

subject.send(:instances_with_tags, 'foo', 'bar')
end
end

context 'returned backend data structure' do
before do
subject.stub(:instances_with_tags).and_return([instance1, instance2])
expect(subject).to receive(:instances_with_tags).and_return([instance1, instance2])
end

let(:backends) { subject.send(:discover_instances) }

it 'returns an Array of backend name/host/port Hashes' do

expect { backends.all? {|b| %w[name host port].each {|k| b.has_key?(k) }} }.to be_true
expect(backends.all? {|b| %w[name host port].each {|k| b.has_key?(k) }}).to eql(true)
end

it 'sets the backend port to server_port_override for all backends' do
backends = subject.send(:discover_instances)
expect {
backends.all? { |b| b['port'] == basic_config['haproxy']['server_port_override'] }
}.to be_true
expect(backends.all? { |b| b['port'] == basic_config['haproxy']['server_port_override'] }).to eql(true)
end
end

context 'returned instance fields' do
before do
subject.stub(:instances_with_tags).and_return([instance1])
expect(subject).to receive(:instances_with_tags).and_return([instance1])
end

let(:backend) { subject.send(:discover_instances).pop }
Expand Down
1 change: 0 additions & 1 deletion spec/spec_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
require 'support/configuration'

RSpec.configure do |config|
config.treat_symbols_as_metadata_keys_with_true_values = true
config.run_all_when_everything_filtered = true
config.filter_run :focus
config.include Configuration
Expand Down
1 change: 1 addition & 0 deletions synapse.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ Gem::Specification.new do |gem|
gem.add_runtime_dependency "aws-sdk", "~> 1.39"
gem.add_runtime_dependency "docker-api", "~> 1.7.2"
gem.add_runtime_dependency "zk", "~> 1.9.4"
gem.add_runtime_dependency "etcd", "~> 0.2.4"

gem.add_development_dependency "rake"
gem.add_development_dependency "rspec"
Expand Down

0 comments on commit 15bfb29

Please sign in to comment.