HEX
Server: Apache
System: Linux vpshost0650.publiccloud.com.br 4.4.79-grsec-1.lc.x86_64 #1 SMP Wed Aug 2 14:18:21 -03 2017 x86_64
User: bandeirantesbomb3 (10068)
PHP: 8.0.7
Disabled: apache_child_terminate,dl,escapeshellarg,escapeshellcmd,exec,link,mail,openlog,passthru,pcntl_alarm,pcntl_exec,pcntl_fork,pcntl_get_last_error,pcntl_getpriority,pcntl_setpriority,pcntl_signal,pcntl_signal_dispatch,pcntl_sigprocmask,pcntl_sigtimedwait,pcntl_sigwaitinfo,pcntl_strerror,pcntl_wait,pcntl_waitpid,pcntl_wexitstatus,pcntl_wifexited,pcntl_wifsignaled,pcntl_wifstopped,pcntl_wstopsig,pcntl_wtermsig,php_check_syntax,php_strip_whitespace,popen,proc_close,proc_open,shell_exec,symlink,system
Upload Files
File: //proc/thread-self/root/opt/puppetlabs/puppet/lib/ruby/vendor_ruby/mcollective/rpc/client.rb
module MCollective
  module RPC
    # The main component of the Simple RPC client system, this wraps around MCollective::Client
    # and just brings in a lot of convention and standard approached.
    class Client
      attr_accessor :timeout, :verbose, :filter, :config, :progress, :ttl, :reply_to
      attr_reader :client, :stats, :ddl, :agent, :limit_targets, :limit_method, :output_format, :batch_size, :batch_sleep_time, :batch_mode
      attr_reader :discovery_options, :discovery_method, :default_discovery_method, :limit_seed

      @@initial_options = nil

      # Creates a stub for a remote agent, you can pass in an options array in the flags
      # which will then be used else it will just create a default options array with
      # filtering enabled based on the standard command line use.
      #
      #   rpc = RPC::Client.new("rpctest", :configfile => "client.cfg", :options => options)
      #
      # You typically would not call this directly you'd use MCollective::RPC#rpcclient instead
      # which is a wrapper around this that can be used as a Mixin
      def initialize(agent, flags = {})
        if flags.include?(:options)
          initial_options = flags[:options]

        elsif @@initial_options
          initial_options = Marshal.load(@@initial_options)

        else
          oparser = MCollective::Optionparser.new({ :verbose => false,
                                                    :progress_bar => true,
                                                    :mcollective_limit_targets => false,
                                                    :batch_size => nil,
                                                    :batch_sleep_time => 1 },
                                                  "filter")

          initial_options = oparser.parse do |parser, opts|
            if block_given?
              yield(parser, opts)
            end

            Helpers.add_simplerpc_options(parser, opts)
          end

          @@initial_options = Marshal.dump(initial_options)
        end

        @initial_options = initial_options

        @config = initial_options[:config]
        @client = MCollective::Client.new(@initial_options)

        @stats = Stats.new
        @agent = agent
        @timeout = initial_options[:timeout] || 5
        @verbose = initial_options[:verbose]
        @filter = initial_options[:filter] || Util.empty_filter
        @discovered_agents = nil
        @progress = initial_options[:progress_bar]
        @limit_targets = initial_options[:mcollective_limit_targets]
        @limit_method = Config.instance.rpclimitmethod
        @limit_seed = initial_options[:limit_seed] || nil
        @output_format = initial_options[:output_format] || :console
        @force_direct_request = false
        @reply_to = initial_options[:reply_to]
        @discovery_method = initial_options[:discovery_method]
        if !@discovery_method
          @discovery_method = Config.instance.default_discovery_method
          @default_discovery_method = true
        else
          @default_discovery_method = false
        end
        @discovery_options = initial_options[:discovery_options] || []
        @force_display_mode = initial_options[:force_display_mode] || false

        @batch_size = initial_options[:batch_size] || Config.instance.default_batch_size
        @batch_sleep_time = Float(initial_options[:batch_sleep_time] || Config.instance.default_batch_sleep_time)
        @batch_mode = determine_batch_mode(@batch_size)

        agent_filter agent

        @discovery_timeout = @initial_options.fetch(:disctimeout, nil) || Config.instance.discovery_timeout

        @collective = @client.collective
        @ttl = initial_options[:ttl] || Config.instance.ttl
        @publish_timeout = initial_options[:publish_timeout] || Config.instance.publish_timeout
        @threaded = initial_options[:threaded] || Config.instance.threaded

        # if we can find a DDL for the service override
        # the timeout of the client so we always magically
        # wait appropriate amounts of time.
        #
        # We add the discovery timeout to the ddl supplied
        # timeout as the discovery timeout tends to be tuned
        # for local network conditions and fact source speed
        # which would other wise not be accounted for and
        # some results might get missed.
        #
        # We do this only if the timeout is the default 5
        # seconds, so that users cli overrides will still
        # get applied
        #
        # DDLs are required, failure to find a DDL is fatal
        @ddl = DDL.new(agent)
        @stats.ddl = @ddl
        @timeout = @ddl.meta[:timeout] + discovery_timeout if @timeout == 5

        # allows stderr and stdout to be overridden for testing
        # but also for web apps that might not want a bunch of stuff
        # generated to actual file handles
        if initial_options[:stderr]
          @stderr = initial_options[:stderr]
        else
          @stderr = STDERR
          @stderr.sync = true
        end

        if initial_options[:stdout]
          @stdout = initial_options[:stdout]
        else
          @stdout = STDOUT
          @stdout.sync = true
        end

        if initial_options[:stdin]
          @stdin = initial_options[:stdin]
        else
          @stdin = STDIN
        end
      end

      # Disconnects cleanly from the middleware
      def disconnect
        @client.disconnect
      end

      # Returns help for an agent if a DDL was found
      def help(template)
        @ddl.help(template)
      end

      # Creates a suitable request hash for the SimpleRPC agent.
      #
      # You'd use this if you ever wanted to take care of sending
      # requests on your own - perhaps via Client#sendreq if you
      # didn't care for responses.
      #
      # In that case you can just do:
      #
      #   msg = your_rpc.new_request("some_action", :foo => :bar)
      #   filter = your_rpc.filter
      #
      #   your_rpc.client.sendreq(msg, msg[:agent], filter)
      #
      # This will send a SimpleRPC request to the action some_action
      # with arguments :foo = :bar, it will return immediately and
      # you will have no indication at all if the request was receieved or not
      #
      # Clearly the use of this technique should be limited and done only
      # if your code requires such a thing
      def new_request(action, data)
        callerid = PluginManager["security_plugin"].callerid

        raise 'callerid received from security plugin is not valid' unless PluginManager["security_plugin"].valid_callerid?(callerid)

        {:agent  => @agent,
         :action => action,
         :caller => callerid,
         :data   => data}
      end

      # For the provided arguments and action the input arguments get
      # modified by supplying any defaults provided in the DDL for arguments
      # that were not supplied in the request
      #
      # We then pass the modified arguments to the DDL for validation
      def validate_request(action, args)
        raise "No DDL found for agent %s cannot validate inputs" % @agent unless @ddl

        @ddl.set_default_input_arguments(action, args)
        @ddl.validate_rpc_request(action, args)
      end

      # Magic handler to invoke remote methods
      #
      # Once the stub is created using the constructor or the RPC#rpcclient helper you can
      # call remote actions easily:
      #
      #   ret = rpc.echo(:msg => "hello world")
      #
      # This will call the 'echo' action of the 'rpctest' agent and return the result as an array,
      # the array will be a simplified result set from the usual full MCollective::Client#req with
      # additional error codes and error text:
      #
      # {
      #   :sender => "remote.box.com",
      #   :statuscode => 0,
      #   :statusmsg => "OK",
      #   :data => "hello world"
      # }
      #
      # If :statuscode is 0 then everything went find, if it's 1 then you supplied the correct arguments etc
      # but the request could not be completed, you'll find a human parsable reason in :statusmsg then.
      #
      # Codes 2 to 5 maps directly to UnknownRPCAction, MissingRPCData, InvalidRPCData and UnknownRPCError
      # see below for a description of those, in each case :statusmsg would be the reason for failure.
      #
      # To get access to the full result of the MCollective::Client#req calls you can pass in a block:
      #
      #   rpc.echo(:msg => "hello world") do |resp|
      #      pp resp
      #   end
      #
      # In this case resp will the result from MCollective::Client#req.  Instead of returning simple
      # text and codes as above you'll also need to handle the following exceptions:
      #
      # UnknownRPCAction - There is no matching action on the agent
      # MissingRPCData - You did not supply all the needed parameters for the action
      # InvalidRPCData - The data you did supply did not pass validation
      # UnknownRPCError - Some other error prevented the agent from running
      #
      # During calls a progress indicator will be shown of how many results we've received against
      # how many nodes were discovered, you can disable this by setting progress to false:
      #
      #   rpc.progress = false
      #
      # This supports a 2nd mode where it will send the SimpleRPC request and never handle the
      # responses.  It's a bit like UDP, it sends the request with the filter attached and you
      # only get back the requestid, you have no indication about results.
      #
      # You can invoke this using:
      #
      #   puts rpc.echo(:process_results => false)
      #
      # This will output just the request id.
      #
      # Batched processing is supported:
      #
      #   printrpc rpc.ping(:batch_size => 5)
      #
      # This will do everything exactly as normal but communicate to only 5
      # agents at a time
      def method_missing(method_name, *args, &block)
        # set args to an empty hash if nothings given
        args = args[0]
        args = {} if args.nil?

        action = method_name.to_s

        @stats.reset

        validate_request(action, args)

        # TODO(ploubser): The logic here seems poor. It implies that it is valid to
        # pass arguments where batch_mode is set to false and batch_mode > 0.
        # If this is the case we completely ignore the supplied value of batch_mode
        # and do our own thing.

        # if a global batch size is set just use that else set it
        # in the case that it was passed as an argument
        batch_mode = args.include?(:batch_size) || @batch_mode
        batch_size = args.delete(:batch_size) || @batch_size
        batch_sleep_time = args.delete(:batch_sleep_time) || @batch_sleep_time

        # if we were given a batch_size argument thats 0 and batch_mode was
        # determined to be on via global options etc this will allow a batch_size
        # of 0 to disable or batch_mode for this call only
        batch_mode = determine_batch_mode(batch_size)

        # Handle single target requests by doing discovery and picking
        # a random node.  Then do a custom request specifying a filter
        # that will only match the one node.
        if @limit_targets
          target_nodes = pick_nodes_from_discovered(@limit_targets)
          Log.debug("Picked #{target_nodes.join(',')} as limited target(s)")

          custom_request(action, args, target_nodes, {"identity" => /^(#{target_nodes.join('|')})$/}, &block)
        elsif batch_mode
          call_agent_batched(action, args, options, batch_size, batch_sleep_time, &block)
        else
          call_agent(action, args, options, :auto, &block)
        end
      end

      # Constructs custom requests with custom filters and discovery data
      # the idea is that this would be used in web applications where you
      # might be using a cached copy of data provided by a registration agent
      # to figure out on your own what nodes will be responding and what your
      # filter would be.
      #
      # This will help you essentially short circuit the traditional cycle of:
      #
      # mc discover / call / wait for discovered nodes
      #
      # by doing discovery however you like, contructing a filter and a list of
      # nodes you expect responses from.
      #
      # Other than that it will work exactly like a normal call, blocks will behave
      # the same way, stats will be handled the same way etcetc
      #
      # If you just wanted to contact one machine for example with a client that
      # already has other filter options setup you can do:
      #
      # puppet.custom_request("runonce", {}, ["your.box.com"], {:identity => "your.box.com"})
      #
      # This will do runonce action on just 'your.box.com', no discovery will be
      # done and after receiving just one response it will stop waiting for responses
      #
      # If direct_addressing is enabled in the config file you can provide an empty
      # hash as a filter, this will force that request to be a directly addressed
      # request which technically does not need filters.  If you try to use this
      # mode with direct addressing disabled an exception will be raise
      def custom_request(action, args, expected_agents, filter = {}, &block)
        validate_request(action, args)

        if filter == {} && !Config.instance.direct_addressing
          raise "Attempted to do a filterless custom_request without direct_addressing enabled, preventing unexpected call to all nodes"
        end

        @stats.reset

        custom_filter = Util.empty_filter
        custom_options = options.clone

        # merge the supplied filter with the standard empty one
        # we could just use the merge method but I want to be sure
        # we dont merge in stuff that isnt actually valid
        ["identity", "fact", "agent", "cf_class", "compound"].each do |ftype|
          if filter.include?(ftype)
            custom_filter[ftype] = [filter[ftype], custom_filter[ftype]].flatten
          end
        end

        # ensure that all filters at least restrict the call to the agent we're a proxy for
        custom_filter["agent"] << @agent unless custom_filter["agent"].include?(@agent)
        custom_options[:filter] = custom_filter

        # Fake out the stats discovery would have put there
        @stats.discovered_agents([expected_agents].flatten)

        # Handle fire and forget requests
        #
        # If a specific reply-to was set then from the client perspective this should
        # be a fire and forget request too since no response will ever reach us - it
        # will go to the reply-to destination
        if args[:process_results] == false || @reply_to
          return fire_and_forget_request(action, args, custom_filter)
        end

        # Now do a call pretty much exactly like in method_missing except with our own
        # options and discovery magic
        if block_given?
          call_agent(action, args, custom_options, [expected_agents].flatten) do |r|
            block.call(r)
          end
        else
          call_agent(action, args, custom_options, [expected_agents].flatten)
        end
      end

      def discovery_timeout
        return @discovery_timeout if @discovery_timeout
        return @client.discoverer.ddl.meta[:timeout]
      end

      def discovery_timeout=(timeout)
        @discovery_timeout = Float(timeout)

        # we calculate the overall timeout from the DDL of the agent and
        # the supplied discovery timeout unless someone specifically
        # specifies a timeout to the constructor
        #
        # But if we also then specifically set a discovery_timeout on the
        # agent that has to override the supplied timeout so we then
        # calculate a correct timeout based on DDL timeout and the
        # supplied discovery timeout
        @timeout = @ddl.meta[:timeout] + discovery_timeout
      end

      # Sets the discovery method.  If we change the method there are a
      # number of steps to take:
      #
      #  - set the new method
      #  - if discovery options were provided, re-set those to initially
      #    provided ones else clear them as they might now apply to a
      #    different provider
      #  - update the client options so it knows there is a new discovery
      #    method in force
      #  - reset discovery data forcing a discover on the next request
      #
      # The remaining item is the discovery timeout, we leave that as is
      # since that is the user supplied timeout either via initial options
      # or via specifically setting it on the client.
      def discovery_method=(method)
        @default_discovery_method = false
        @discovery_method = method

        if @initial_options[:discovery_options]
          @discovery_options = @initial_options[:discovery_options]
        else
          @discovery_options.clear
        end

        @client.options = options

        reset
      end

      def discovery_options=(options)
        @discovery_options = [options].flatten
        reset
      end

      # Sets the class filter
      def class_filter(klass)
        @filter["cf_class"] = @filter["cf_class"] | [klass]
        @filter["cf_class"].compact!
        reset
      end

      # Sets the fact filter
      def fact_filter(fact, value=nil, operator="=")
        return if fact.nil?
        return if fact == false

        if value.nil?
          parsed = Util.parse_fact_string(fact)
          @filter["fact"] = @filter["fact"] | [parsed] unless parsed == false
        else
          parsed = Util.parse_fact_string("#{fact}#{operator}#{value}")
          @filter["fact"] = @filter["fact"] | [parsed] unless parsed == false
        end

        @filter["fact"].compact!
        reset
      end

      # Sets the agent filter
      def agent_filter(agent)
        @filter["agent"] = @filter["agent"] | [agent]
        @filter["agent"].compact!
        reset
      end

      # Sets the identity filter
      def identity_filter(identity)
        @filter["identity"] = @filter["identity"] | [identity]
        @filter["identity"].compact!
        reset
      end

      # Set a compound filter
      def compound_filter(filter)
        @filter["compound"] = @filter["compound"] |  [Matcher.create_compound_callstack(filter)]
        reset
      end

      # Resets various internal parts of the class, most importantly it clears
      # out the cached discovery
      def reset
        @discovered_agents = nil
      end

      # Reet the filter to an empty one
      def reset_filter
        @filter = Util.empty_filter
        agent_filter @agent
      end

      # Detects data on STDIN and sets the STDIN discovery method
      #
      # IF the discovery method hasn't been explicitly overridden
      #  and we're not being run interactively,
      #  and someone has piped us some data
      #
      # Then we assume it's a discovery list - this can be either:
      #  - list of hosts in plaintext
      #  - JSON that came from another rpc or printrpc
      #
      # Then we override discovery to try to grok the data on STDIN
      def detect_and_set_stdin_discovery
        if self.default_discovery_method && !@stdin.tty? && !@stdin.eof?
          self.discovery_method = 'stdin'
          self.discovery_options = 'auto'
        end
      end

      # Does discovery based on the filters set, if a discovery was
      # previously done return that else do a new discovery.
      #
      # Alternatively if identity filters are given and none of them are
      # regular expressions then just use the provided data as discovered
      # data, avoiding discovery
      #
      # Discovery can be forced if direct_addressing is enabled by passing
      # in an array of nodes with :nodes or JSON data like those produced
      # by mcollective RPC JSON output using :json
      #
      # Will show a message indicating its doing discovery if running
      # verbose or if the :verbose flag is passed in.
      #
      # Use reset to force a new discovery
      def discover(flags={})
        flags.keys.each do |key|
          raise "Unknown option #{key} passed to discover" unless [:verbose, :hosts, :nodes, :json].include?(key)
        end

        flags.include?(:verbose) ? verbose = flags[:verbose] : verbose = @verbose

        verbose = false unless @output_format == :console

        # flags[:nodes] and flags[:hosts] are the same thing, we should never have
        # allowed :hosts as that was inconsistent with the established terminology
        flags[:nodes] = flags.delete(:hosts) if flags.include?(:hosts)

        reset if flags[:nodes] || flags[:json]

        unless @discovered_agents
          # if either hosts or JSON is supplied try to figure out discovery data from there
          # if direct_addressing is not enabled this is a critical error as the user might
          # not have supplied filters so raise an exception
          if flags[:nodes] || flags[:json]
            raise "Can only supply discovery data if direct_addressing is enabled" unless Config.instance.direct_addressing

            hosts = []

            if flags[:nodes]
              hosts = Helpers.extract_hosts_from_array(flags[:nodes])
            elsif flags[:json]
              hosts = Helpers.extract_hosts_from_json(flags[:json])
            end

            raise "Could not find any hosts in discovery data provided" if hosts.empty?

            @discovered_agents = hosts
            @force_direct_request = true

          else
            identity_filter_discovery_optimization
          end
        end

        # All else fails we do it the hard way using a traditional broadcast
        unless @discovered_agents
          @stats.time_discovery :start

          @client.options = options

          # if compound filters are used the only real option is to use the mc
          # discovery plugin since its the only capable of using data queries etc
          # and we do not want to degrade that experience just to allow compounds
          # on other discovery plugins the UX would be too bad raising complex sets
          # of errors etc.
          @client.discoverer.force_discovery_method_by_filter(options[:filter])

          if verbose
            actual_timeout = @client.discoverer.discovery_timeout(discovery_timeout, options[:filter])

            if actual_timeout > 0
              @stderr.print("Discovering hosts using the %s method for %d second(s) .... " % [@client.discoverer.discovery_method, actual_timeout])
            else
              @stderr.print("Discovering hosts using the %s method .... " % [@client.discoverer.discovery_method])
            end
          end

          # if the requested limit is a pure number and not a percent
          # and if we're configured to use the first found hosts as the
          # limit method then pass in the limit thus minimizing the amount
          # of work we do in the discover phase and speeding it up significantly
          filter = @filter.merge({'collective' => @collective})
          if @limit_method == :first and @limit_targets.is_a?(Integer)
            @discovered_agents = @client.discover(filter, discovery_timeout, @limit_targets)
          else
            @discovered_agents = @client.discover(filter, discovery_timeout)
          end

          @stderr.puts(@discovered_agents.size) if verbose

          @force_direct_request = @client.discoverer.force_direct_mode?

          @stats.time_discovery :end
        end

        @stats.discovered_agents(@discovered_agents)
        RPC.discovered(@discovered_agents)

        @discovered_agents
      end

      # Provides a normal options hash like you would get from
      # Optionparser
      def options
        {:disctimeout => discovery_timeout,
         :timeout => @timeout,
         :verbose => @verbose,
         :filter => @filter,
         :collective => @collective,
         :output_format => @output_format,
         :ttl => @ttl,
         :discovery_method => @discovery_method,
         :discovery_options => @discovery_options,
         :force_display_mode => @force_display_mode,
         :config => @config,
         :publish_timeout => @publish_timeout,
         :threaded => @threaded}
      end

      # Sets the collective we are communicating with
      def collective=(c)
        raise "Unknown collective #{c}" unless Config.instance.collectives.include?(c)

        @collective = c
        @client.options = options
        reset
      end

      # Sets and sanity checks the limit_targets variable
      # used to restrict how many nodes we'll target
      # Limit targets can be reset by passing nil or false
      def limit_targets=(limit)
        if !limit
          @limit_targets = nil
          return
        end

        if limit.is_a?(String)
          raise "Invalid limit specified: #{limit} valid limits are /^\d+%*$/" unless limit =~ /^\d+%*$/

          begin
            @limit_targets = Integer(limit)
          rescue
            @limit_targets = limit
          end
        else
          @limit_targets = Integer(limit)
        end
      end

      # Sets and sanity check the limit_method variable
      # used to determine how to limit targets if limit_targets is set
      def limit_method=(method)
        method = method.to_sym unless method.is_a?(Symbol)

        raise "Unknown limit method #{method} must be :random or :first" unless [:random, :first].include?(method)

        @limit_method = method
      end

      # Sets the batch size, if the size is set to 0 that will disable batch mode
      def batch_size=(limit)
        unless Config.instance.direct_addressing
          raise "Can only set batch size if direct addressing is supported"
        end

        validate_batch_size(limit)

        @batch_size = limit
        @batch_mode = determine_batch_mode(@batch_size)
      end

      def batch_sleep_time=(time)
        raise "Can only set batch sleep time if direct addressing is supported" unless Config.instance.direct_addressing

        @batch_sleep_time = Float(time)
      end

      # Pick a number of nodes from the discovered nodes
      #
      # The count should be a string that can be either
      # just a number or a percentage like 10%
      #
      # It will select nodes from the discovered list based
      # on the rpclimitmethod configuration option which can
      # be either :first or anything else
      #
      #   - :first would be a simple way to do a distance based
      #     selection
      #   - anything else will just pick one at random
      #   - if random chosen, and batch-seed set, then set srand
      #     for the generator, and reset afterwards
      def pick_nodes_from_discovered(count)
        if count =~ /%$/
          pct = Integer((discover.size * (count.to_f / 100)))
          pct == 0 ? count = 1 : count = pct
        else
          count = Integer(count)
        end

        return discover if discover.size <= count

        result = []

        if @limit_method == :first
          return discover[0, count]
        else
          # we delete from the discovered list because we want
          # to be sure there is no chance that the same node will
          # be randomly picked twice.  So we have to clone the
          # discovered list else this method will only ever work
          # once per discovery cycle and not actually return the
          # right nodes.
          haystack = discover.clone

          if @limit_seed
            haystack.sort!
            srand(@limit_seed)
          end

          count.times do
            rnd = rand(haystack.size)
            result << haystack.delete_at(rnd)
          end

          # Reset random number generator to fresh seed
          # As our seed from options is most likely short
          srand if @limit_seed
        end

        [result].flatten
      end

      def load_aggregate_functions(action, ddl)
        return nil unless ddl
        return nil unless ddl.action_interface(action).keys.include?(:aggregate)

        return Aggregate.new(ddl.action_interface(action))

      rescue => e
        Log.error("Failed to load aggregate functions, calculating summaries disabled: %s: %s (%s)" % [e.backtrace.first, e.to_s, e.class])
        return nil
      end

      def aggregate_reply(reply, aggregate)
        return nil unless aggregate

        aggregate.call_functions(reply)
        return aggregate
      rescue Exception => e
        Log.error("Failed to calculate aggregate summaries for reply from %s, calculating summaries disabled: %s: %s (%s)" % [reply[:senderid], e.backtrace.first, e.to_s, e.class])
        return nil
      end

      def rpc_result_from_reply(agent, action, reply)
        senderid = reply.include?("senderid") ? reply["senderid"] : reply[:senderid]
        body = reply.include?("body") ? reply["body"] : reply[:body]
        s_code = body.include?("statuscode") ? body["statuscode"] : body[:statuscode]
        s_msg = body.include?("statusmsg") ? body["statusmsg"] : body[:statusmsg]
        data = body.include?("data") ? body["data"] : body[:data]

        Result.new(agent, action, {:sender => senderid, :statuscode => s_code, :statusmsg => s_msg, :data => data})
      end

      # for requests that do not care for results just
      # return the request id and don't do any of the
      # response processing.
      #
      # We send the :process_results flag with to the
      # nodes so they can make decisions based on that.
      #
      # Should only be called via method_missing
      def fire_and_forget_request(action, args, filter=nil)
        validate_request(action, args)

        identity_filter_discovery_optimization

        req = new_request(action.to_s, args)

        filter = options[:filter] unless filter

        message = Message.new(req, nil, {:agent => @agent, :type => :request, :collective => @collective, :filter => filter, :options => options})
        message.reply_to = @reply_to if @reply_to

        if @force_direct_request || @client.discoverer.force_direct_mode?
          message.discovered_hosts = discover.clone
          message.type = :direct_request
        end

        client.sendreq(message, nil)
      end

      # if an identity filter is supplied and it is all strings no regex we can use that
      # as discovery data, technically the identity filter is then redundant if we are
      # in direct addressing mode and we could empty it out but this use case should
      # only really be for a few -I's on the CLI
      #
      # For safety we leave the filter in place for now, that way we can support this
      # enhancement also in broadcast mode.
      #
      # This is only needed for the 'mc' discovery method, other methods might change
      # the concept of identity to mean something else so we should pass the full
      # identity filter to them
      def identity_filter_discovery_optimization
        if options[:filter]["identity"].size > 0 && @discovery_method == "mc"
          regex_filters = options[:filter]["identity"].select{|i| i.match("^\/")}.size

          if regex_filters == 0
            @discovered_agents = options[:filter]["identity"].clone
            @force_direct_request = true if Config.instance.direct_addressing
          end
        end
      end

      # Calls an agent in a way very similar to call_agent but it supports batching
      # the queries to the network.
      #
      # The result sets, stats, block handling etc is all exactly like you would expect
      # from normal call_agent.
      #
      # This is used by method_missing and works only with direct addressing mode
      def call_agent_batched(action, args, opts, batch_size, sleep_time, &block)
        raise "Batched requests requires direct addressing" unless Config.instance.direct_addressing
        raise "Cannot bypass result processing for batched requests" if args[:process_results] == false
        validate_batch_size(batch_size)

        sleep_time = Float(sleep_time)

        Log.debug("Calling #{agent}##{action} in batches of #{batch_size} with sleep time of #{sleep_time}")

        @force_direct_request = true

        discovered = discover
        results = []
        respcount = 0

        if discovered.size > 0
          req = new_request(action.to_s, args)

          aggregate = load_aggregate_functions(action, @ddl)

          if @progress && !block_given?
            twirl = Progress.new
            @stdout.puts
            @stdout.print twirl.twirl(respcount, discovered.size)
          end

          if (batch_size =~ /^(\d+)%$/)
            # determine batch_size as a percentage of the discovered array's size
            batch_size = (discovered.size / 100.0 * Integer($1)).ceil
          else
            batch_size = Integer(batch_size)
          end

          @stats.requestid = nil
          processed_nodes = 0

          discovered.in_groups_of(batch_size) do |hosts|
            message = Message.new(req, nil, {:agent => @agent,
                                             :type => :direct_request,
                                             :collective => @collective,
                                             :filter => opts[:filter],
                                             :options => opts})

            # first time round we let the Message object create a request id
            # we then re-use it for future requests to keep auditing sane etc
            @stats.requestid = message.create_reqid unless @stats.requestid
            message.requestid = @stats.requestid

            message.discovered_hosts = hosts.clone.compact

            @client.req(message) do |resp|
              respcount += 1

              if block_given?
                aggregate = process_results_with_block(action, resp, block, aggregate)
              else
                @stdout.print twirl.twirl(respcount, discovered.size) if @progress

                result, aggregate = process_results_without_block(resp, action, aggregate)

                results << result
              end
            end

            if @initial_options[:sort]
              results.sort!
            end

            @stats.noresponsefrom.concat @client.stats[:noresponsefrom]
            @stats.unexpectedresponsefrom.concat @client.stats[:unexpectedresponsefrom]
            @stats.responses += @client.stats[:responses]
            @stats.blocktime += @client.stats[:blocktime] + sleep_time
            @stats.totaltime += @client.stats[:totaltime]
            @stats.discoverytime += @client.stats[:discoverytime]

            processed_nodes += hosts.length
            if (discovered.length > processed_nodes)
              sleep sleep_time
            end
          end

          @stats.aggregate_summary = aggregate.summarize if aggregate
          @stats.aggregate_failures = aggregate.failed if aggregate
        else
          @stderr.print("\nNo request sent, we did not discover any nodes.")
        end

        @stats.finish_request

        RPC.stats(@stats)

        @stdout.print("\n") if @progress

        if block_given?
          return stats
        else
          return [results].flatten
        end
      end

      # Handles traditional calls to the remote agents with full stats
      # blocks, non blocks and everything else supported.
      #
      # Other methods of calling the nodes can reuse this code by
      # for example specifying custom options and discovery data
      def call_agent(action, args, opts, disc=:auto, &block)
        # Handle fire and forget requests and make sure
        # the :process_results value is set appropriately
        #
        # specific reply-to requests should be treated like
        # fire and forget since the client will never get
        # the responses
        if args[:process_results] == false || @reply_to
          return fire_and_forget_request(action, args)
        else
          args[:process_results] = true
        end

        # Do discovery when no specific discovery array is given
        #
        # If an array is given set the force_direct_request hint that
        # will tell the message object to be a direct request one
        if disc == :auto
          discovered = discover
        else
          @force_direct_request = true if Config.instance.direct_addressing
          discovered = disc
        end

        req = new_request(action.to_s, args)

        message = Message.new(req, nil, {:agent => @agent, :type => :request, :collective => @collective, :filter => opts[:filter], :options => opts})
        message.discovered_hosts = discovered.clone

        results = []
        respcount = 0

        if discovered.size > 0
          message.type = :direct_request if @force_direct_request

          if @progress && !block_given?
            twirl = Progress.new
            @stdout.puts
            @stdout.print twirl.twirl(respcount, discovered.size)
          end

          aggregate = load_aggregate_functions(action, @ddl)

          @client.req(message) do |resp|
            respcount += 1

            if block_given?
              aggregate = process_results_with_block(action, resp, block, aggregate)
            else
              @stdout.print twirl.twirl(respcount, discovered.size) if @progress

              result, aggregate = process_results_without_block(resp, action, aggregate)

              results << result
            end
          end

          if @initial_options[:sort]
            results.sort!
          end

          @stats.aggregate_summary = aggregate.summarize if aggregate
          @stats.aggregate_failures = aggregate.failed if aggregate
          @stats.client_stats = @client.stats
        else
          @stderr.print("\nNo request sent, we did not discover any nodes.")
        end

        @stats.finish_request

        RPC.stats(@stats)

        @stdout.print("\n\n") if @progress

        if block_given?
          return stats
        else
          return [results].flatten
        end
      end

      # Handles result sets that has no block associated, sets fails and ok
      # in the stats object and return a hash of the response to send to the
      # caller
      def process_results_without_block(resp, action, aggregate)
        @stats.node_responded(resp[:senderid])

        result = rpc_result_from_reply(@agent, action, resp)
        aggregate = aggregate_reply(result, aggregate) if aggregate

        if result[:statuscode] == 0 || result[:statuscode] == 1
          @stats.ok if result[:statuscode] == 0
          @stats.fail if result[:statuscode] == 1
        else
          @stats.fail
        end

        [result, aggregate]
      end

      # process client requests by calling a block on each result
      # in this mode we do not do anything fancy with the result
      # objects and we raise exceptions if there are problems with
      # the data
      def process_results_with_block(action, resp, block, aggregate)
        @stats.node_responded(resp[:senderid])

        result = rpc_result_from_reply(@agent, action, resp)
        aggregate = aggregate_reply(result, aggregate) if aggregate

        @stats.ok if result[:statuscode] == 0
        @stats.fail if result[:statuscode] != 0
        @stats.time_block_execution :start

        case block.arity
          when 1
            block.call(resp)
          when 2
            block.call(resp, result)
        end

        @stats.time_block_execution :end

        return aggregate
      end

      private

      def determine_batch_mode(batch_size)
        if (batch_size != 0 && batch_size != "0")
          return true
        end

        return false
      end

      # Validate the bach_size based on the following criteria
      # batch_size is percentage string and it's more than 0 percent
      # batch_size is a string of digits
      # batch_size is of type Integer
      def validate_batch_size(batch_size)
        if (batch_size.is_a?(Integer))
          return
        elsif (batch_size.is_a?(String))
          if ((batch_size =~ /^(\d+)%$/ && Integer($1) != 0) || batch_size =~ /^(\d+)$/)
            return
          end
        end

        raise("batch_size must be an integer or match a percentage string (e.g. '24%'")
      end
    end
  end
end