File: //proc/thread-self/root/opt/puppetlabs/puppet/lib/ruby/vendor_ruby/mcollective/rpc/client.rb
module MCollective
module RPC
# The main component of the Simple RPC client system. It wraps MCollective::Client
# and layers a set of conventions and standard approaches on top of it.
class Client
# Per-client settings that callers may read and change directly.
attr_accessor :timeout, :verbose, :filter, :config, :progress, :ttl, :reply_to
# Read-only views of client state. limit_targets, limit_method, batch_size and
# batch_sleep_time have hand-written validating writers defined later in this class.
attr_reader :client, :stats, :ddl, :agent, :limit_targets, :limit_method, :output_format, :batch_size, :batch_sleep_time, :batch_mode
# Discovery state. discovery_options and discovery_method have hand-written
# writers later in this class that also clear the cached discovery.
attr_reader :discovery_options, :discovery_method, :default_discovery_method, :limit_seed
# Marshal dump of the option set parsed from the command line by the first
# client built without explicit :options. Later clients load a copy from it
# instead of parsing again. Shared by every instance of the class.
@@initial_options = nil
# Creates a stub for a remote agent, you can pass in an options array in the flags
# which will then be used else it will just create a default options array with
# filtering enabled based on the standard command line use.
#
# rpc = RPC::Client.new("rpctest", :configfile => "client.cfg", :options => options)
#
# You typically would not call this directly you'd use MCollective::RPC#rpcclient instead
# which is a wrapper around this that can be used as a Mixin
def initialize(agent, flags = {})
# Choose the option set: explicitly supplied :options win, then a copy of
# the options cached by an earlier client, and only as a last resort the
# command line is parsed and the parsed result cached for later clients.
if flags.include?(:options)
initial_options = flags[:options]
elsif @@initial_options
# Marshal.load builds a fresh copy from the cached dump
initial_options = Marshal.load(@@initial_options)
else
oparser = MCollective::Optionparser.new({ :verbose => false,
:progress_bar => true,
:mcollective_limit_targets => false,
:batch_size => nil,
:batch_sleep_time => 1 },
"filter")
# an optional block lets the caller add its own options before the
# standard SimpleRPC ones are added
initial_options = oparser.parse do |parser, opts|
if block_given?
yield(parser, opts)
end
Helpers.add_simplerpc_options(parser, opts)
end
@@initial_options = Marshal.dump(initial_options)
end
@initial_options = initial_options
@config = initial_options[:config]
@client = MCollective::Client.new(@initial_options)
@stats = Stats.new
@agent = agent
# 5 doubles as the "not set by the user" marker checked after the DDL is loaded
@timeout = initial_options[:timeout] || 5
@verbose = initial_options[:verbose]
@filter = initial_options[:filter] || Util.empty_filter
@discovered_agents = nil
@progress = initial_options[:progress_bar]
@limit_targets = initial_options[:mcollective_limit_targets]
@limit_method = Config.instance.rpclimitmethod
@limit_seed = initial_options[:limit_seed] || nil
@output_format = initial_options[:output_format] || :console
@force_direct_request = false
@reply_to = initial_options[:reply_to]
# Remember whether the discovery method came from the options or from the
# configured default; detect_and_set_stdin_discovery only overrides a default.
@discovery_method = initial_options[:discovery_method]
if !@discovery_method
@discovery_method = Config.instance.default_discovery_method
@default_discovery_method = true
else
@default_discovery_method = false
end
@discovery_options = initial_options[:discovery_options] || []
@force_display_mode = initial_options[:force_display_mode] || false
@batch_size = initial_options[:batch_size] || Config.instance.default_batch_size
@batch_sleep_time = Float(initial_options[:batch_sleep_time] || Config.instance.default_batch_sleep_time)
@batch_mode = determine_batch_mode(@batch_size)
# always restrict the filter to the agent this client is a stub for
agent_filter agent
@discovery_timeout = @initial_options.fetch(:disctimeout, nil) || Config.instance.discovery_timeout
@collective = @client.collective
@ttl = initial_options[:ttl] || Config.instance.ttl
@publish_timeout = initial_options[:publish_timeout] || Config.instance.publish_timeout
@threaded = initial_options[:threaded] || Config.instance.threaded
# if we can find a DDL for the service override
# the timeout of the client so we always magically
# wait appropriate amounts of time.
#
# We add the discovery timeout to the ddl supplied
# timeout as the discovery timeout tends to be tuned
# for local network conditions and fact source speed
# which would other wise not be accounted for and
# some results might get missed.
#
# We do this only if the timeout is the default 5
# seconds, so that users cli overrides will still
# get applied
#
# DDLs are required, failure to find a DDL is fatal
@ddl = DDL.new(agent)
@stats.ddl = @ddl
@timeout = @ddl.meta[:timeout] + discovery_timeout if @timeout == 5
# allows stderr and stdout to be overridden for testing
# but also for web apps that might not want a bunch of stuff
# generated to actual file handles
if initial_options[:stderr]
@stderr = initial_options[:stderr]
else
@stderr = STDERR
# only the process-wide default handles are switched to sync mode,
# caller supplied handles are used as given
@stderr.sync = true
end
if initial_options[:stdout]
@stdout = initial_options[:stdout]
else
@stdout = STDOUT
@stdout.sync = true
end
if initial_options[:stdin]
@stdin = initial_options[:stdin]
else
@stdin = STDIN
end
end
# Disconnects cleanly from the middleware
def disconnect
# Delegates to the MCollective::Client instance created in the constructor.
@client.disconnect
end
# Returns help for an agent if a DDL was found
def help(template)
# Delegates to the agent DDL loaded in the constructor; the template
# argument is passed through unchanged.
@ddl.help(template)
end
# Creates a suitable request hash for the SimpleRPC agent.
#
# You'd use this if you ever wanted to take care of sending
# requests on your own - perhaps via Client#sendreq if you
# didn't care for responses.
#
# In that case you can just do:
#
# msg = your_rpc.new_request("some_action", :foo => :bar)
# filter = your_rpc.filter
#
# your_rpc.client.sendreq(msg, msg[:agent], filter)
#
# This will send a SimpleRPC request to the action some_action
# with arguments :foo = :bar, it will return immediately and
# you will have no indication at all if the request was received or not
#
# Clearly the use of this technique should be limited and done only
# if your code requires such a thing
def new_request(action, data)
  # The caller id comes from the security plugin, and the same plugin has
  # to accept it before it is embedded in a request.
  callerid = PluginManager["security_plugin"].callerid
  unless PluginManager["security_plugin"].valid_callerid?(callerid)
    raise 'callerid received from security plugin is not valid'
  end

  # Build the request hash key by key, in the order agent, action,
  # caller, data.
  request = {}
  request[:agent] = @agent
  request[:action] = action
  request[:caller] = callerid
  request[:data] = data
  request
end
# For the provided arguments and action the input arguments get
# modified by supplying any defaults provided in the DDL for arguments
# that were not supplied in the request
#
# We then pass the modified arguments to the DDL for validation
def validate_request(action, args)
  if @ddl
    # Let the DDL fill in defaults for inputs that were not supplied
    # (args is modified in place), then validate the completed set.
    @ddl.set_default_input_arguments(action, args)
    return @ddl.validate_rpc_request(action, args)
  end

  raise "No DDL found for agent %s cannot validate inputs" % @agent
end
# Magic handler to invoke remote methods
#
# Once the stub is created using the constructor or the RPC#rpcclient helper you can
# call remote actions easily:
#
# ret = rpc.echo(:msg => "hello world")
#
# This will call the 'echo' action of the 'rpctest' agent and return the result as an array,
# the array will be a simplified result set from the usual full MCollective::Client#req with
# additional error codes and error text:
#
# {
# :sender => "remote.box.com",
# :statuscode => 0,
# :statusmsg => "OK",
# :data => "hello world"
# }
#
# If :statuscode is 0 then everything went fine, if it's 1 then you supplied the correct arguments etc
# but the request could not be completed, you'll find a human parsable reason in :statusmsg then.
#
# Codes 2 to 5 maps directly to UnknownRPCAction, MissingRPCData, InvalidRPCData and UnknownRPCError
# see below for a description of those, in each case :statusmsg would be the reason for failure.
#
# To get access to the full result of the MCollective::Client#req calls you can pass in a block:
#
# rpc.echo(:msg => "hello world") do |resp|
# pp resp
# end
#
# In this case resp will be the result from MCollective::Client#req. Instead of returning simple
# text and codes as above you'll also need to handle the following exceptions:
#
# UnknownRPCAction - There is no matching action on the agent
# MissingRPCData - You did not supply all the needed parameters for the action
# InvalidRPCData - The data you did supply did not pass validation
# UnknownRPCError - Some other error prevented the agent from running
#
# During calls a progress indicator will be shown of how many results we've received against
# how many nodes were discovered, you can disable this by setting progress to false:
#
# rpc.progress = false
#
# This supports a 2nd mode where it will send the SimpleRPC request and never handle the
# responses. It's a bit like UDP, it sends the request with the filter attached and you
# only get back the requestid, you have no indication about results.
#
# You can invoke this using:
#
# puts rpc.echo(:process_results => false)
#
# This will output just the request id.
#
# Batched processing is supported:
#
# printrpc rpc.ping(:batch_size => 5)
#
# This will do everything exactly as normal but communicate to only 5
# agents at a time
def method_missing(method_name, *args, &block)
  # Only the first positional argument is used: it is the argument hash
  # for the action. Default to an empty hash when nothing was given.
  args = args[0]
  args = {} if args.nil?

  action = method_name.to_s

  @stats.reset

  validate_request(action, args)

  # A per-call :batch_size / :batch_sleep_time overrides the client-wide
  # setting. Both keys are removed from args here.
  batch_size = args.delete(:batch_size) || @batch_size
  batch_sleep_time = args.delete(:batch_sleep_time) || @batch_sleep_time

  # Batch mode is derived purely from the effective batch size, so a
  # per-call batch_size of 0 disables batching for this call only even
  # when batching is enabled globally.
  batch_mode = determine_batch_mode(batch_size)

  # Handle limited target requests by doing discovery and picking
  # nodes, then do a custom request with an identity filter that
  # only matches the picked nodes.
  if @limit_targets
    target_nodes = pick_nodes_from_discovered(@limit_targets)
    Log.debug("Picked #{target_nodes.join(',')} as limited target(s)")

    # Escape every identity so characters such as '.' are matched
    # literally instead of acting as regular expression operators.
    identity_pattern = target_nodes.map { |node| Regexp.escape(node.to_s) }.join('|')
    custom_request(action, args, target_nodes, {"identity" => /^(#{identity_pattern})$/}, &block)
  elsif batch_mode
    call_agent_batched(action, args, options, batch_size, batch_sleep_time, &block)
  else
    call_agent(action, args, options, :auto, &block)
  end
end
# Constructs custom requests with custom filters and discovery data
# the idea is that this would be used in web applications where you
# might be using a cached copy of data provided by a registration agent
# to figure out on your own what nodes will be responding and what your
# filter would be.
#
# This will help you essentially short circuit the traditional cycle of:
#
# mc discover / call / wait for discovered nodes
#
# by doing discovery however you like, constructing a filter and a list of
# nodes you expect responses from.
#
# Other than that it will work exactly like a normal call, blocks will behave
# the same way, stats will be handled the same way, etc.
#
# If you just wanted to contact one machine for example with a client that
# already has other filter options setup you can do:
#
# puppet.custom_request("runonce", {}, ["your.box.com"], {"identity" => "your.box.com"})
#
# This will do runonce action on just 'your.box.com', no discovery will be
# done and after receiving just one response it will stop waiting for responses
#
# If direct_addressing is enabled in the config file you can provide an empty
# hash as a filter, this will force that request to be a directly addressed
# request which technically does not need filters. If you try to use this
# mode with direct addressing disabled an exception will be raised
def custom_request(action, args, expected_agents, filter = {}, &block)
  # Sends action to the nodes in expected_agents without running discovery.
  #
  # filter may use string or symbol keys for the known filter types
  # (identity, fact, agent, cf_class, compound); any other key is ignored.
  # Returns whatever call_agent or fire_and_forget_request returns.
  # Raises when an empty filter is used while direct addressing is disabled.
  validate_request(action, args)

  if filter == {} && !Config.instance.direct_addressing
    raise "Attempted to do a filterless custom_request without direct_addressing enabled, preventing unexpected call to all nodes"
  end

  @stats.reset

  custom_filter = Util.empty_filter
  custom_options = options.clone

  # Merge the supplied filter into the standard empty one, copying only
  # the known filter types so nothing invalid is merged in. Both "identity"
  # and :identity style keys are honoured so a symbol keyed filter is not
  # silently dropped, which would leave the request unfiltered.
  ["identity", "fact", "agent", "cf_class", "compound"].each do |ftype|
    supplied = [ftype, ftype.to_sym].select { |key| filter.include?(key) }.map { |key| filter[key] }
    next if supplied.empty?

    custom_filter[ftype] = [supplied, custom_filter[ftype]].flatten
  end

  # ensure that all filters at least restrict the call to the agent we're a proxy for
  custom_filter["agent"] << @agent unless custom_filter["agent"].include?(@agent)
  custom_options[:filter] = custom_filter

  # Fake out the stats discovery would have put there
  @stats.discovered_agents([expected_agents].flatten)

  # Handle fire and forget requests
  #
  # If a specific reply-to was set then from the client perspective this should
  # be a fire and forget request too since no response will ever reach us - it
  # will go to the reply-to destination
  if args[:process_results] == false || @reply_to
    return fire_and_forget_request(action, args, custom_filter)
  end

  # Now do a call pretty much exactly like in method_missing except with our own
  # options and discovery magic
  return call_agent(action, args, custom_options, [expected_agents].flatten) unless block

  if block.arity == 2
    # Hand two argument blocks over untouched so they receive both the raw
    # response and the parsed result, as they do for a normal call.
    call_agent(action, args, custom_options, [expected_agents].flatten, &block)
  else
    # Every other block keeps being invoked with just the raw response.
    call_agent(action, args, custom_options, [expected_agents].flatten) do |r|
      block.call(r)
    end
  end
end
def discovery_timeout
  # An explicitly configured timeout wins; otherwise fall back to the
  # timeout declared in the DDL of the client's discoverer.
  @discovery_timeout || @client.discoverer.ddl.meta[:timeout]
end
def discovery_timeout=(timeout)
  @discovery_timeout = Float(timeout)

  # The overall timeout is normally derived from the agent DDL plus the
  # discovery timeout unless a timeout was given to the constructor.
  # Setting a discovery timeout here overrides even that supplied value,
  # so the overall timeout is recalculated from the agent DDL timeout
  # and the new discovery timeout.
  agent_timeout = @ddl.meta[:timeout]
  @timeout = agent_timeout + discovery_timeout
end
# Sets the discovery method. If we change the method there are a
# number of steps to take:
#
# - set the new method
# - if discovery options were provided, re-set those to initially
# provided ones else clear them as they might now apply to a
# different provider
# - update the client options so it knows there is a new discovery
# method in force
# - reset discovery data forcing a discover on the next request
#
# The remaining item is the discovery timeout, we leave that as is
# since that is the user supplied timeout either via initial options
# or via specifically setting it on the client.
def discovery_method=(method)
  @discovery_method = method
  # once a method is set explicitly it is no longer "the default"
  @default_discovery_method = false

  # Options given at construction time are restored; otherwise the
  # current ones are emptied since they may belong to another provider.
  initial_discovery_options = @initial_options[:discovery_options]
  if initial_discovery_options
    @discovery_options = initial_discovery_options
  else
    @discovery_options.clear
  end

  # tell the underlying client about the change and force rediscovery
  @client.options = options
  reset
end
def discovery_options=(options)
# Accepts a single option or a (possibly nested) list and always stores a flat array.
@discovery_options = [options].flatten
# Drop any cached discovery so the next discover call runs with the new options.
reset
end
# Sets the class filter
def class_filter(klass)
  # Union keeps the list free of duplicates; a nil class is dropped again.
  classes = @filter["cf_class"] | [klass]
  classes.compact!
  @filter["cf_class"] = classes

  # the filter changed, so cached discovery results are stale
  reset
end
# Sets the fact filter
def fact_filter(fact, value=nil, operator="=")
  # nothing to do without a fact
  return if fact.nil? || fact == false

  # With no separate value the fact argument is parsed as given;
  # otherwise the expression is assembled from fact, operator and value.
  expression = value.nil? ? fact : "#{fact}#{operator}#{value}"
  parsed = Util.parse_fact_string(expression)

  # a false parse result is not added to the filter
  @filter["fact"] = @filter["fact"] | [parsed] unless parsed == false
  @filter["fact"].compact!

  reset
end
# Sets the agent filter
def agent_filter(agent)
  # union assignment avoids duplicate agents, nil entries are removed
  @filter["agent"] |= [agent]
  @filter["agent"].compact!

  # cached discovery no longer matches the filter
  reset
end
# Sets the identity filter
def identity_filter(identity)
  # merge without duplicates, then strip nil entries from the stored list
  @filter["identity"] = (@filter["identity"] | [identity]).tap { |identities| identities.compact! }

  # force a fresh discovery on the next request
  reset
end
# Set a compound filter
def compound_filter(filter)
  # Matcher turns the filter expression into a call stack; that call stack
  # is what is stored in the filter.
  callstack = Matcher.create_compound_callstack(filter)
  @filter["compound"] = @filter["compound"] | [callstack]

  reset
end
# Resets various internal parts of the class, most importantly it clears
# out the cached discovery
def reset
# Dropping the cached node list makes the next discover call run discovery again.
@discovered_agents = nil
end
# Reset the filter to an empty one
def reset_filter
# Start again from an empty filter ...
@filter = Util.empty_filter
# ... but keep requests restricted to the agent this client is a stub for.
# agent_filter also clears the cached discovery.
agent_filter @agent
end
# Detects data on STDIN and sets the STDIN discovery method
#
# IF the discovery method hasn't been explicitly overridden
# and we're not being run interactively,
# and someone has piped us some data
#
# Then we assume it's a discovery list - this can be either:
# - list of hosts in plaintext
# - JSON that came from another rpc or printrpc
#
# Then we override discovery to try to grok the data on STDIN
def detect_and_set_stdin_discovery
  # Leave things alone when a discovery method was chosen explicitly ...
  return unless self.default_discovery_method
  # ... when we are attached to a terminal ...
  return if @stdin.tty?
  # ... or when nothing was piped in. The checks run in this order so
  # eof? is only consulted for non-terminal input.
  return if @stdin.eof?

  self.discovery_method = 'stdin'
  self.discovery_options = 'auto'
end
# Does discovery based on the filters set, if a discovery was
# previously done return that else do a new discovery.
#
# Alternatively if identity filters are given and none of them are
# regular expressions then just use the provided data as discovered
# data, avoiding discovery
#
# Discovery can be forced if direct_addressing is enabled by passing
# in an array of nodes with :nodes or JSON data like those produced
# by mcollective RPC JSON output using :json
#
# Will show a message indicating its doing discovery if running
# verbose or if the :verbose flag is passed in.
#
# Use reset to force a new discovery
def discover(flags={})
# reject unknown flags up front
flags.keys.each do |key|
raise "Unknown option #{key} passed to discover" unless [:verbose, :hosts, :nodes, :json].include?(key)
end
# a :verbose flag overrides the client setting for this call; progress
# text is only ever printed for console output
flags.include?(:verbose) ? verbose = flags[:verbose] : verbose = @verbose
verbose = false unless @output_format == :console
# flags[:nodes] and flags[:hosts] are the same thing, we should never have
# allowed :hosts as that was inconsistent with the established terminology
# (note this rewrites the flags hash passed in by the caller)
flags[:nodes] = flags.delete(:hosts) if flags.include?(:hosts)
# explicitly supplied discovery data always replaces cached discovery
reset if flags[:nodes] || flags[:json]
unless @discovered_agents
# if either hosts or JSON is supplied try to figure out discovery data from there
# if direct_addressing is not enabled this is a critical error as the user might
# not have supplied filters so raise an exception
if flags[:nodes] || flags[:json]
raise "Can only supply discovery data if direct_addressing is enabled" unless Config.instance.direct_addressing
hosts = []
if flags[:nodes]
hosts = Helpers.extract_hosts_from_array(flags[:nodes])
elsif flags[:json]
hosts = Helpers.extract_hosts_from_json(flags[:json])
end
raise "Could not find any hosts in discovery data provided" if hosts.empty?
@discovered_agents = hosts
@force_direct_request = true
else
# may fill in @discovered_agents straight from the identity filter
identity_filter_discovery_optimization
end
end
# All else fails we do it the hard way using a traditional broadcast
unless @discovered_agents
@stats.time_discovery :start
@client.options = options
# if compound filters are used the only real option is to use the mc
# discovery plugin since its the only capable of using data queries etc
# and we do not want to degrade that experience just to allow compounds
# on other discovery plugins the UX would be too bad raising complex sets
# of errors etc.
@client.discoverer.force_discovery_method_by_filter(options[:filter])
if verbose
actual_timeout = @client.discoverer.discovery_timeout(discovery_timeout, options[:filter])
if actual_timeout > 0
@stderr.print("Discovering hosts using the %s method for %d second(s) .... " % [@client.discoverer.discovery_method, actual_timeout])
else
@stderr.print("Discovering hosts using the %s method .... " % [@client.discoverer.discovery_method])
end
end
# if the requested limit is a pure number and not a percent
# and if we're configured to use the first found hosts as the
# limit method then pass in the limit thus minimizing the amount
# of work we do in the discover phase and speeding it up significantly
filter = @filter.merge({'collective' => @collective})
if @limit_method == :first and @limit_targets.is_a?(Integer)
@discovered_agents = @client.discover(filter, discovery_timeout, @limit_targets)
else
@discovered_agents = @client.discover(filter, discovery_timeout)
end
# completes the "Discovering hosts ..." line printed above with the node count
@stderr.puts(@discovered_agents.size) if verbose
@force_direct_request = @client.discoverer.force_direct_mode?
@stats.time_discovery :end
end
# record the node list in the stats and the RPC module on every call,
# including calls answered from the cache
@stats.discovered_agents(@discovered_agents)
RPC.discovered(@discovered_agents)
@discovered_agents
end
# Provides a normal options hash like you would get from
# Optionparser
def options
  # Snapshot of the client's current settings, built key by key in the
  # order: discovery/timeout settings, output settings, discovery method,
  # then configuration and publishing settings.
  opts = {}
  opts[:disctimeout] = discovery_timeout
  opts[:timeout] = @timeout
  opts[:verbose] = @verbose
  opts[:filter] = @filter
  opts[:collective] = @collective
  opts[:output_format] = @output_format
  opts[:ttl] = @ttl
  opts[:discovery_method] = @discovery_method
  opts[:discovery_options] = @discovery_options
  opts[:force_display_mode] = @force_display_mode
  opts[:config] = @config
  opts[:publish_timeout] = @publish_timeout
  opts[:threaded] = @threaded
  opts
end
# Sets the collective we are communicating with
def collective=(c)
  # only collectives listed in the configuration are accepted
  known_collectives = Config.instance.collectives
  if !known_collectives.include?(c)
    raise "Unknown collective #{c}"
  end

  @collective = c
  # push the new collective to the underlying client and rediscover
  @client.options = options
  reset
end
# Sets and sanity checks the limit_targets variable
# used to restrict how many nodes we'll target
# Limit targets can be reset by passing nil or false
def limit_targets=(limit)
  # Stores the node limit used by limited target requests.
  #
  # limit may be nil/false (clears the limit), a numeric value, a string
  # of digits ("10") or a percentage string ("10%"). Plain numbers are
  # stored as Integer, percentages are stored as the given string.
  # Raises RuntimeError for strings that are neither.
  unless limit
    @limit_targets = nil
    return
  end

  unless limit.is_a?(String)
    @limit_targets = Integer(limit)
    return
  end

  # \A and \Z anchor the whole string (allowing one trailing newline) so a
  # multi-line value cannot pass validation on its first line alone.
  unless limit =~ /\A\d+%*\Z/
    raise "Invalid limit specified: #{limit} valid limits are /^\\d+%*$/"
  end

  # Percentages are kept as strings. Plain numbers are always read as
  # base 10 so that a leading zero is not interpreted as octal.
  @limit_targets = limit.include?("%") ? limit : Integer(limit, 10)
end
# Sets and sanity check the limit_method variable
# used to determine how to limit targets if limit_targets is set
def limit_method=(method)
  # accept strings as well as symbols
  chosen = method.is_a?(Symbol) ? method : method.to_sym

  unless [:random, :first].include?(chosen)
    raise "Unknown limit method #{chosen} must be :random or :first"
  end

  @limit_method = chosen
end
# Sets the batch size, if the size is set to 0 that will disable batch mode
def batch_size=(limit)
  # batching is refused unless direct addressing is enabled
  raise "Can only set batch size if direct addressing is supported" unless Config.instance.direct_addressing

  # validation raises before any state is touched
  validate_batch_size(limit)

  @batch_size = limit
  # whether batching is on follows from the size that was just stored
  @batch_mode = determine_batch_mode(@batch_size)
end
def batch_sleep_time=(time)
  # the sleep time between batches can only be set when direct
  # addressing is enabled
  if !Config.instance.direct_addressing
    raise "Can only set batch sleep time if direct addressing is supported"
  end

  # always stored as a Float; Float() raises on non-numeric input
  @batch_sleep_time = Float(time)
end
# Pick a number of nodes from the discovered nodes
#
# The count should be a string that can be either
# just a number or a percentage like 10%
#
# It will select nodes from the discovered list based
# on the rpclimitmethod configuration option which can
# be either :first or anything else
#
# - :first would be a simple way to do a distance based
# selection
# - anything else will just pick one at random
# - if random selection is used and a limit seed is set, the random
# number generator is seeded with it first and reset to a fresh seed afterwards
def pick_nodes_from_discovered(count)
  # Returns an array of node identities selected from the discovered nodes.
  #
  # count may be an Integer, a numeric string or a percentage string like
  # "10%". A percentage that works out to zero nodes still selects one.
  nodes = discover

  # Only strings can carry a percentage. Guarding on the type keeps Integer
  # counts working on Ruby versions where Object#=~ no longer exists.
  if count.is_a?(String) && count =~ /%$/
    pct = Integer((nodes.size * (count.to_f / 100)))
    count = pct == 0 ? 1 : pct
  else
    count = Integer(count)
  end

  # asking for at least as many nodes as were discovered returns them all
  return nodes if nodes.size <= count

  return nodes[0, count] if @limit_method == :first

  # We delete from the candidate list because we want to be sure there is
  # no chance that the same node will be randomly picked twice. That is
  # done on a clone so the discovered list itself is left untouched.
  haystack = nodes.clone

  if @limit_seed
    # a sorted list plus a fixed seed makes the selection repeatable
    haystack.sort!
    srand(@limit_seed)
  end

  result = []
  count.times do
    rnd = rand(haystack.size)
    result << haystack.delete_at(rnd)
  end

  # Reset random number generator to fresh seed
  # As our seed from options is most likely short
  srand if @limit_seed

  [result].flatten
end
def load_aggregate_functions(action, ddl)
  # Returns an Aggregate for the action when its DDL interface has an
  # :aggregate entry, nil otherwise. Failures are logged and turn into nil
  # so a broken aggregate definition only disables summaries.
  begin
    if ddl && ddl.action_interface(action).keys.include?(:aggregate)
      Aggregate.new(ddl.action_interface(action))
    end
  rescue => e
    Log.error("Failed to load aggregate functions, calculating summaries disabled: %s: %s (%s)" % [e.backtrace.first, e.to_s, e.class])
    nil
  end
end
def aggregate_reply(reply, aggregate)
  # Feeds one reply into the aggregate functions.
  #
  # Returns the aggregate so the caller can keep using it, or nil when no
  # aggregate was given or when the functions failed. A nil return
  # disables summaries for the rest of the request.
  return nil unless aggregate

  aggregate.call_functions(reply)
  aggregate
rescue StandardError => e
  # Only ordinary errors are downgraded to a log entry. Interrupts, exit
  # requests and other non StandardError exceptions must still propagate.
  Log.error("Failed to calculate aggregate summaries for reply from %s, calculating summaries disabled: %s: %s (%s)" % [reply[:senderid], e.backtrace.first, e.to_s, e.class])
  nil
end
def rpc_result_from_reply(agent, action, reply)
  # Replies may use string or symbol keys. A string key wins whenever it
  # is present (even with a nil value), otherwise the symbol key is read.
  pick = lambda do |hash, key|
    hash.include?(key) ? hash[key] : hash[key.to_sym]
  end

  sender = pick.call(reply, "senderid")
  body = pick.call(reply, "body")

  summary = {
    :sender => sender,
    :statuscode => pick.call(body, "statuscode"),
    :statusmsg => pick.call(body, "statusmsg"),
    :data => pick.call(body, "data")
  }

  Result.new(agent, action, summary)
end
# for requests that do not care for results just
# return the request id and don't do any of the
# response processing.
#
# We send the :process_results flag along with the request to the
# nodes so they can make decisions based on that.
#
# Should only be called via method_missing
def fire_and_forget_request(action, args, filter=nil)
  validate_request(action, args)

  # may turn a plain identity filter into discovery data
  identity_filter_discovery_optimization

  req = new_request(action.to_s, args)

  # without an explicit filter the client's current filter is used
  filter ||= options[:filter]

  message_settings = {
    :agent => @agent,
    :type => :request,
    :collective => @collective,
    :filter => filter,
    :options => options
  }
  message = Message.new(req, nil, message_settings)
  message.reply_to = @reply_to if @reply_to

  # Direct requests need the node list on the message. The hosts are set
  # before the type is switched, in that order.
  direct = @force_direct_request || @client.discoverer.force_direct_mode?
  if direct
    message.discovered_hosts = discover.clone
    message.type = :direct_request
  end

  # the return value of sendreq is handed back to the caller
  client.sendreq(message, nil)
end
# if an identity filter is supplied and it is all strings no regex we can use that
# as discovery data, technically the identity filter is then redundant if we are
# in direct addressing mode and we could empty it out but this use case should
# only really be for a few -I's on the CLI
#
# For safety we leave the filter in place for now, that way we can support this
# enhancement also in broadcast mode.
#
# This is only needed for the 'mc' discovery method, other methods might change
# the concept of identity to mean something else so we should pass the full
# identity filter to them
def identity_filter_discovery_optimization
  identities = options[:filter]["identity"]

  # only applies to a non-empty identity filter with the "mc" method
  return unless identities.size > 0 && @discovery_method == "mc"

  # any entry written as /regex/ rules the shortcut out
  return if identities.any? { |i| i.match("^\/") }

  # plain identities double as the discovery result
  @discovered_agents = identities.clone
  @force_direct_request = true if Config.instance.direct_addressing
end
# Calls an agent in a way very similar to call_agent but it supports batching
# the queries to the network.
#
# The result sets, stats, block handling etc is all exactly like you would expect
# from normal call_agent.
#
# This is used by method_missing and works only with direct addressing mode
def call_agent_batched(action, args, opts, batch_size, sleep_time, &block)
  # Sends the request to the discovered nodes in groups of batch_size,
  # sleeping sleep_time seconds between groups.
  #
  # batch_size may be an Integer, a string of digits or a percentage string
  # like "10%" (a share of the discovered nodes, rounded up).
  # Returns the stats object when a block is given, else the array of results.
  # Raises when direct addressing is disabled or result processing is bypassed.
  raise "Batched requests requires direct addressing" unless Config.instance.direct_addressing
  raise "Cannot bypass result processing for batched requests" if args[:process_results] == false
  validate_batch_size(batch_size)

  sleep_time = Float(sleep_time)

  Log.debug("Calling #{agent}##{action} in batches of #{batch_size} with sleep time of #{sleep_time}")

  @force_direct_request = true

  discovered = discover
  results = []
  respcount = 0

  if discovered.size > 0
    req = new_request(action.to_s, args)

    aggregate = load_aggregate_functions(action, @ddl)

    if @progress && !block_given?
      twirl = Progress.new
      @stdout.puts
      @stdout.print twirl.twirl(respcount, discovered.size)
    end

    # Only strings can be percentages. Checking the type first keeps Integer
    # batch sizes working on Ruby versions where Object#=~ no longer exists.
    if batch_size.is_a?(String) && batch_size =~ /^(\d+)%$/
      # determine batch_size as a percentage of the discovered array's size
      batch_size = (discovered.size / 100.0 * Integer($1, 10)).ceil
    elsif batch_size.is_a?(String)
      # always base 10 so a leading zero is not read as octal
      batch_size = Integer(batch_size, 10)
    else
      batch_size = Integer(batch_size)
    end

    @stats.requestid = nil
    processed_nodes = 0

    discovered.in_groups_of(batch_size) do |hosts|
      message = Message.new(req, nil, {:agent => @agent,
                                       :type => :direct_request,
                                       :collective => @collective,
                                       :filter => opts[:filter],
                                       :options => opts})

      # first time round we let the Message object create a request id
      # we then re-use it for future requests to keep auditing sane etc
      @stats.requestid = message.create_reqid unless @stats.requestid
      message.requestid = @stats.requestid

      # compact drops any nil entries from the group before it is used
      message.discovered_hosts = hosts.clone.compact

      @client.req(message) do |resp|
        respcount += 1

        if block_given?
          aggregate = process_results_with_block(action, resp, block, aggregate)
        else
          @stdout.print twirl.twirl(respcount, discovered.size) if @progress

          result, aggregate = process_results_without_block(resp, action, aggregate)

          results << result
        end
      end

      if @initial_options[:sort]
        results.sort!
      end

      # the client stats describe one batch only, fold them into the
      # totals for the whole request
      @stats.noresponsefrom.concat @client.stats[:noresponsefrom]
      @stats.unexpectedresponsefrom.concat @client.stats[:unexpectedresponsefrom]
      @stats.responses += @client.stats[:responses]
      @stats.blocktime += @client.stats[:blocktime] + sleep_time
      @stats.totaltime += @client.stats[:totaltime]
      @stats.discoverytime += @client.stats[:discoverytime]

      # no point sleeping after the final batch
      processed_nodes += hosts.length
      if (discovered.length > processed_nodes)
        sleep sleep_time
      end
    end

    @stats.aggregate_summary = aggregate.summarize if aggregate
    @stats.aggregate_failures = aggregate.failed if aggregate
  else
    @stderr.print("\nNo request sent, we did not discover any nodes.")
  end

  @stats.finish_request

  RPC.stats(@stats)

  @stdout.print("\n") if @progress

  if block_given?
    return stats
  else
    return [results].flatten
  end
end
# Handles traditional calls to the remote agents with full stats
# blocks, non blocks and everything else supported.
#
# Other methods of calling the nodes can reuse this code by
# for example specifying custom options and discovery data
def call_agent(action, args, opts, disc=:auto, &block)
# Handle fire and forget requests and make sure
# the :process_results value is set appropriately
#
# specific reply-to requests should be treated like
# fire and forget since the client will never get
# the responses
if args[:process_results] == false || @reply_to
return fire_and_forget_request(action, args)
else
# note: this writes into the args hash supplied by the caller
args[:process_results] = true
end
# Do discovery when no specific discovery array is given
#
# If an array is given set the force_direct_request hint that
# will tell the message object to be a direct request one
if disc == :auto
discovered = discover
else
@force_direct_request = true if Config.instance.direct_addressing
discovered = disc
end
req = new_request(action.to_s, args)
message = Message.new(req, nil, {:agent => @agent, :type => :request, :collective => @collective, :filter => opts[:filter], :options => opts})
# the message gets its own copy of the node list
message.discovered_hosts = discovered.clone
results = []
respcount = 0
if discovered.size > 0
message.type = :direct_request if @force_direct_request
# the progress indicator is only shown when results are collected for
# the caller, never when the caller handles responses in a block
if @progress && !block_given?
twirl = Progress.new
@stdout.puts
@stdout.print twirl.twirl(respcount, discovered.size)
end
aggregate = load_aggregate_functions(action, @ddl)
@client.req(message) do |resp|
respcount += 1
if block_given?
aggregate = process_results_with_block(action, resp, block, aggregate)
else
@stdout.print twirl.twirl(respcount, discovered.size) if @progress
result, aggregate = process_results_without_block(resp, action, aggregate)
results << result
end
end
if @initial_options[:sort]
results.sort!
end
@stats.aggregate_summary = aggregate.summarize if aggregate
@stats.aggregate_failures = aggregate.failed if aggregate
@stats.client_stats = @client.stats
else
@stderr.print("\nNo request sent, we did not discover any nodes.")
end
@stats.finish_request
RPC.stats(@stats)
@stdout.print("\n\n") if @progress
# with a block the caller already saw every response, so the stats
# object is returned; otherwise the collected results are
if block_given?
return stats
else
return [results].flatten
end
end
# Handles result sets that has no block associated, sets fails and ok
# in the stats object and return a hash of the response to send to the
# caller
def process_results_without_block(resp, action, aggregate)
  @stats.node_responded(resp[:senderid])

  result = rpc_result_from_reply(@agent, action, resp)

  # a nil aggregate stays nil, nothing to feed the reply into
  aggregate = aggregate_reply(result, aggregate) if aggregate

  # status code 0 is the only one counted as ok; 1 and every other
  # code count as a failure
  if result[:statuscode] == 0
    @stats.ok
  else
    @stats.fail
  end

  [result, aggregate]
end
# process client requests by calling a block on each result
# in this mode we do not do anything fancy with the result
# objects and we raise exceptions if there are problems with
# the data
def process_results_with_block(action, resp, block, aggregate)
  @stats.node_responded(resp[:senderid])

  result = rpc_result_from_reply(@agent, action, resp)
  aggregate = aggregate_reply(result, aggregate) if aggregate

  # only status code 0 counts as ok
  if result[:statuscode] == 0
    @stats.ok
  else
    @stats.fail
  end

  # time spent in the caller's block is tracked separately
  @stats.time_block_execution :start

  # One argument blocks get the raw response, two argument blocks get the
  # raw response and the parsed result. Blocks of any other arity are not
  # invoked.
  wanted = block.arity
  if wanted == 1
    block.call(resp)
  elsif wanted == 2
    block.call(resp, result)
  end

  @stats.time_block_execution :end

  aggregate
end
private
def determine_batch_mode(batch_size)
  # Batching is switched off only by a size of zero, given either as a
  # number or as the string "0". Everything else turns it on.
  disabled = (batch_size == 0 || batch_size == "0")
  !disabled
end
# Validate the batch_size based on the following criteria
# batch_size is percentage string and it's more than 0 percent
# batch_size is a string of digits
# batch_size is of type Integer
def validate_batch_size(batch_size)
  # Returns nil when batch_size is acceptable, raises RuntimeError otherwise.
  #
  # Accepted values are any Integer, a string of digits, or a percentage
  # string greater than zero percent.
  return if batch_size.is_a?(Integer)

  if batch_size.is_a?(String)
    # \A and \Z anchor the whole string (allowing one trailing newline) so
    # a multi-line value cannot pass on the strength of one of its lines.
    return if batch_size =~ /\A\d+\Z/

    # to_i reads the digits as base 10, so values with a leading zero such
    # as "08%" are checked instead of raising from Integer().
    return if batch_size =~ /\A(\d+)%\Z/ && $1.to_i != 0
  end

  raise("batch_size must be an integer or match a percentage string (e.g. '24%'")
end
end
end
end