#!/usr/bin/env ruby

require "#{File.dirname(__FILE__)}/../../cluster-home/lib/ruby/tungsten"

class TungstenMultipleTrepctl
  include TungstenScript
  
  # Script entry point: load the replication services that match the
  # --roles/--services filters, then dispatch on the selected command.
  #
  # "run" executes the trepctl arguments held in @trepctl_command_parts
  # against every matching service and stores the outcome on each record
  # under "commandResult" and "commandMessage". "list" reports the loaded
  # status unchanged. Both finish by sorting and printing the records.
  # The other recognised command names raise because they are not
  # implemented. Any other command value does nothing and returns nil.
  def main
    not_implemented = {
      "backups" => "The backups command has not been implemented",
      "db" => "The db command has not been implemented",
      "masterof" => "The masterof command has not been implemented",
      "sql" => "The sql command has not been implemented"
    }

    # Collect records from all trepctl locations, dropping replication
    # services that don't match --roles or --services when those are given.
    # This happens before the command is inspected.
    records = load_services({
      "role" => opt(:limit_roles),
      "serviceName" => opt(:limit_services)
    })

    selected = command()
    raise not_implemented[selected] if not_implemented.has_key?(selected)
    return nil unless ["run", "list"].include?(selected)

    if selected == "run"
      # A timeout of 0 is passed so the remote command is not cut short
      records = collect_trepctl_output_from_services(records, @trepctl_command_parts.join(" "), 0)

      # Translate whatever came back into a result code and a message
      records.each do |rec|
        outcome = rec[:command_output]
        case outcome
        when nil
          rec["commandResult"] = 1
          rec["commandMessage"] = "No response was received for this service"
        when CommandError
          rec["commandResult"] = outcome.rc
          rec["commandMessage"] = outcome.result
        when StandardError
          rec["commandResult"] = 1
          rec["commandMessage"] = outcome.message
        else
          rec["commandResult"] = 0
          rec["commandMessage"] = outcome.to_s()
        end
      end
    end

    # Order the records by the --sort-by fields, then print the requested
    # fields in the requested format
    records = sort_records(records, opt(:sort_by))
    output_records(records, opt(:fields), opt(:format))
  end
  
  # Run a trepctl command in parallel against each replication service and
  # attach the outcome to the matching record.
  #
  # One child process is forked per record. Each child runs
  # `trepctl -service <serviceName> [-port <rmi_port>] <command>` through
  # TU.ssh_result and writes either the command output or a marshalled
  # exception into a Tempfile that the parent reads once every child has
  # exited.
  #
  # records - Array of service Hashes; each needs :loc_id (a key of
  #           @trepctl_locations) and 'serviceName'.
  # command - String of trepctl arguments to append to the command line.
  # timeout - Seconds to wait for each command (default 5). Ruby's
  #           Timeout::timeout treats 0 as "no limit".
  #
  # Returns the same records Array. Each record gains :command_output,
  # which holds the output String, or the exception (a StandardError
  # subclass) that the child reported.
  def collect_trepctl_output_from_services(records, command, timeout = 5)
    pids = []
    results = []
    
    # Start the command for every record before waiting on any of them
    records.each_index{
      |idx|
      
      results[idx] = Tempfile.new('ssh')
      pids << fork {
        rec = records[idx]
        loc = @trepctl_locations[rec[:loc_id]]
        
        options = []
        if loc.has_key?(:rmi_port)
          options << "-port #{loc[:rmi_port]}"
        end
        
        begin
          Timeout::timeout(timeout) do
            results[idx].write(TU.ssh_result("#{loc[:directory]}/tungsten/tungsten-replicator/bin/trepctl -service #{rec['serviceName']} #{options.join(" ")} #{command}", loc[:host], TI.user()))
          end
        rescue Timeout::Error
          # The result files are indexed by record position in this method.
          # The previous code used an undefined loc_id here, so the child
          # died with a NameError and the timeout message was lost.
          results[idx].write(Marshal.dump(MessageError.new("The replicator at #{loc[:host]}:#{loc[:directory]} is taking too long to respond")))
        rescue RemoteCommandError => rce
          results[idx].write(Marshal.dump(rce))
        rescue CommandError => ce
          results[idx].write(Marshal.dump(ce))
        end
      }
    }
    # Wait for each child process to finish
    pids.each{|pid| Process.waitpid(pid) }
    
    results.each_index{
      |idx|
      file = results[idx]
      begin
        file.rewind()
        results[idx] = file.read()
      ensure
        # Release the descriptor and remove the file right away rather than
        # leaving that to garbage collection
        file.close!()
      end
      
      # A child that failed wrote a marshalled exception instead of plain
      # output. Anything that does not unmarshal is kept as plain text.
      # NOTE(review): Marshal.load is applied to text produced by the remote
      # command; that is only safe while the remote hosts are trusted.
      begin
        exception = Marshal.load(results[idx])
        if exception.is_a?(StandardError)
          results[idx] = exception
        end
      rescue
      end
      
      records[idx][:command_output] = results[idx]
    }
    
    records
  end
  
  # Run a trepctl command in parallel on every entry of @trepctl_locations.
  #
  # One child process is forked per location. Each child runs
  # `trepctl [-port <rmi_port>] <command>` through TU.ssh_result and writes
  # either the command output or a marshalled exception into a Tempfile
  # that the parent reads once every child has exited.
  #
  # command - String of trepctl arguments to append to the command line.
  # timeout - Seconds to wait for each command (default 5).
  #
  # Returns a Hash of location id => output String, or the exception (a
  # StandardError subclass) that the child reported for that location.
  def collect_trepctl_output_from_locations(command, timeout = 5)
    pids = []
    results = {}
    
    # Start the command at every location before waiting on any of them
    @trepctl_locations.each{
      |loc_id,loc|
      
      results[loc_id] = Tempfile.new('ssh')
      pids << fork {
        options = []
        if loc.has_key?(:rmi_port)
          options << "-port #{loc[:rmi_port]}"
        end
        
        begin
          Timeout::timeout(timeout) do
            results[loc_id].write(TU.ssh_result("#{loc[:directory]}/tungsten/tungsten-replicator/bin/trepctl #{options.join(" ")} #{command}", loc[:host], TI.user()))
          end
        rescue Timeout::Error
          results[loc_id].write(Marshal.dump(MessageError.new("The replicator at #{loc[:host]}:#{loc[:directory]} is taking too long to respond")))
        rescue RemoteCommandError => rce
          results[loc_id].write(Marshal.dump(rce))
        rescue CommandError => ce
          results[loc_id].write(Marshal.dump(ce))
        end
      }
    }
    # Wait for each child process to finish
    pids.each{|pid| Process.waitpid(pid) }
    
    results.each{
      |loc_id, file|
      begin
        file.rewind()
        # Overwriting the value of an existing key is permitted while
        # iterating; no keys are added here
        results[loc_id] = file.read()
      ensure
        # Release the descriptor and remove the file right away rather than
        # leaving that to garbage collection
        file.close!()
      end
      
      # A child that failed wrote a marshalled exception instead of plain
      # output. Anything that does not unmarshal is kept as plain text.
      # NOTE(review): Marshal.load is applied to text produced by the remote
      # command; that is only safe while the remote hosts are trusted.
      begin
        exception = Marshal.load(results[loc_id])
        if exception.is_a?(StandardError)
          results[loc_id] = exception
        end
      rescue
      end
    }
    
    results
  end
  
  # Ask every known trepctl location for its replication services and
  # return the ones that satisfy +filters+.
  #
  # filters - Hash of status field name => accepted value(s); it is passed
  #           through prepare_filters before use.
  #
  # Returns an Array of service status Hashes, each extended with :loc_id,
  # "host" and "directory" entries identifying where it was found.
  # Locations whose query produced an error are skipped after a debug
  # message. Raises RuntimeError when a location's response is not a JSON
  # array.
  def load_services(filters = {})
    filters = prepare_filters(filters)
    services = []

    collect_trepctl_output_from_locations("services -full -json").each do |loc_id, output|
      location = @trepctl_locations[loc_id]
      host = location[:host]
      dir = location[:directory]

      # Failed locations are reported at debug level and ignored
      if output.is_a?(MessageError)
        TU.debug(output.message)
        next
      end
      if output.is_a?(StandardError)
        TU.debug("Unable to load information from #{host}:#{dir}")
        next
      end

      # The response must be a JSON array of service status hashes
      parsed = begin
        JSON.parse(output)
      rescue JSON::ParserError
        raise "Unable to parse the response from #{host}:#{dir}"
      end
      raise "Unable to parse the response from #{host}:#{dir}" unless parsed.instance_of?(Array)

      # Keep only the services accepted by the filters, tagged with the
      # location they came from
      parsed.each do |svc|
        next unless matches_filters?(svc, filters) == true

        services << svc.merge({
          :loc_id => loc_id,
          "host" => host,
          "directory" => dir
        })
      end
    end

    services
  end
  
  # Normalise filter values for evaluation: every String value is replaced
  # by the Array obtained from splitting it on commas. Other values (nil,
  # Arrays, ...) are left untouched.
  #
  # The Hash is modified in place and the same object is returned.
  def prepare_filters(filters = {})
    filters.keys.each do |name|
      raw = filters[name]
      filters[name] = raw.split(",") if raw.is_a?(String)
    end

    filters
  end
  
  # Decide whether +record+ satisfies every filter.
  #
  # record  - Hash of field name => value.
  # filters - Hash of field name => constraint. A nil constraint is
  #           ignored, an Array constraint must include the record's
  #           value, and any other constraint must equal it.
  #
  # Returns true when all constraints hold (or there are none), false
  # otherwise.
  def matches_filters?(record, filters = {})
    filters.each do |field, accepted|
      next if accepted == nil

      actual = record[field]
      satisfied = accepted.is_a?(Array) ? accepted.include?(actual) : (accepted == actual)
      return false unless satisfied
    end

    true
  end
  
  # Print +records+ to standard output in the requested format.
  #
  # records - Array of record Hashes.
  # fields  - Array of field names to show, or a comma-separated String.
  #           For the "json" format the field "*" selects every
  #           String-keyed entry of each record.
  # format  - One of "info" (aligned table), "tab" (tab-delimited lines),
  #           "list" (rows joined by ";", values by ","), "name" (values
  #           joined by spaces), "yaml" or "json". Any other value prints
  #           nothing.
  #
  # Headers are printed for "info" and "tab" unless the :hide_headers
  # option is true; the other formats never print a header row.
  #
  # Raises RuntimeError when no fields were requested (nil or empty).
  def output_records(records, fields, format)
    if fields.is_a?(String)
      fields = fields.split(",")
    end
    
    # A nil field list is reported the same way as an empty one instead of
    # failing with NoMethodError
    if fields.nil? || fields.size() == 0
      raise "Unable to print output because no fields have been requested"
    end
    
    case format
    when "info"
      output_table(build_table_rows(records, fields, opt(:hide_headers)), fields)
    when "tab"
      # Display the fields from each record as tab-delimited lines
      build_table_rows(records, fields, opt(:hide_headers)).each{|row|
        puts row.join("\t")
      }
    when "list"
      # Everything goes on one line; no trailing newline is written
      is_first = true
      build_table_rows(records, fields, true).each{|row|
        if is_first == false
          print ";"
        end
        print row.join(",")
        is_first = false
      }
    when "name"
      build_table_rows(records, fields, true).each{|row|
        puts row.join(" ")
      }
    when "yaml"
      # One YAML document per record, separated by "---" and closed by "..."
      is_first = true
      build_table_rows(records, fields, true).each{|row|
        if is_first == false
          print "---\n"
        end
        
        row.each_index{|idx|
          value = row[idx]
          if value.is_a?(Float)
            printf("%s: %.3f\n", fields[idx], value)
          elsif value.is_a?(Integer)
            printf("%s: %s\n", fields[idx], value)
          elsif column_format(fields[idx]) == "block"
            # Literal block scalar: every content line must carry the
            # indentation, not only the first, or multi-line command output
            # yields invalid YAML. Blank lines may stay unindented.
            printf("%s: |\n  %s\n", fields[idx], value.to_s().gsub(/\n(?=[^\n])/, "\n  "))
          else
            printf("%s: %s\n", fields[idx], value)
          end
        }
        
        is_first = false
      }
      puts "...\n"
    when "json"
      # Display the fields from each record as a JSON array of hashes
      puts JSON.pretty_generate(records.map{|rec|
        if fields.include?("*")
          # Symbol keys carry internal bookkeeping and are not displayed
          rec.dup().delete_if{
            |key,value|
            (key.is_a?(String) != true)
          }
        else
          to_display = {}
          fields.each{|field| to_display[field] = get_record_value(rec, field)}
          to_display
        end
      })
    end
  end
  
  # Turn +records+ into an Array of rows, one Array of values per record
  # with the values in +fields+ order, each obtained via get_record_value.
  #
  # A copy of +fields+ is placed first as a header row unless
  # +hide_headers+ is exactly true.
  def build_table_rows(records, fields, hide_headers)
    header_rows = (hide_headers == true) ? [] : [fields.dup()]

    data_rows = records.map { |rec|
      fields.map { |field| get_record_value(rec, field) }
    }

    header_rows + data_rows
  end
  
  # Print +table_rows+ as an aligned table, one "| a | b |" line per row.
  #
  # table_rows - Array of rows; each row is an Array of cell values.
  # fields     - Unused; kept so existing callers keep working.
  #
  # Floats are shown with three decimals. Floats and Integers are
  # right-aligned; every other value is left-aligned.
  #
  # Column widths are measured on the text that is actually printed. The
  # previous code measured Float#to_s while printing with "%.3f", so a
  # value such as 12.25 (5 characters) was printed as "12.250" (6) and
  # pushed its column out of alignment.
  def output_table(table_rows, fields)
    # Render every cell once, exactly as it will be displayed
    rendered_rows = table_rows.map{|row|
      row.map{|cell|
        cell.is_a?(Float) ? sprintf("%.3f", cell) : cell.to_s()
      }
    }
    
    # Determine the maximum length for each column
    fields_max_length = []
    rendered_rows.each{|row|
      row.each_index{|idx|
        length = row[idx].length()
        
        if fields_max_length[idx] == nil || fields_max_length[idx] < length
          fields_max_length[idx] = length
        end
      }
    }
    
    # Print a line for each row
    table_rows.each_with_index{|row, row_idx|
      row.each_index{|idx|
        text = rendered_rows[row_idx][idx]
        if row[idx].is_a?(Float) || row[idx].is_a?(Integer)
          print "| " + text.rjust(fields_max_length[idx]) + " "
        else
          print "| " + text.ljust(fields_max_length[idx]) + " "
        end
      }
      print "|\n"
    }
  end
  
  # Return +records+ ordered by the fields named in +sort_keys+.
  #
  # records   - Array of record Hashes. Each one is modified: :aliases is
  #             set to a lookup Hash and :sort to the list of sort values.
  # sort_keys - Array of field names, or a comma-separated String.
  #
  # The :aliases lookup maps the lower-cased form of every String key, and
  # every registered field alias whose source key is present, to that
  # key's value. Sort values are compared as Strings.
  #
  # Returns a new, sorted Array containing the same record objects.
  def sort_records(records, sort_keys)
    sort_keys = sort_keys.split(",") if sort_keys.is_a?(String)

    records.each do |rec|
      # Build the lookup that lets fields be addressed case-insensitively
      # or by alias
      lookup = {}
      rec.keys().each do |name|
        lookup[name.downcase()] = rec[name] if name.is_a?(String)
      end
      get_field_aliases().each do |dest, src|
        lookup[dest] = rec[src] if rec.has_key?(src)
      end
      rec[:aliases] = lookup

      # The lookup must be in place before values are resolved
      rec[:sort] = sort_keys.map { |name| get_record_value(rec, name).to_s() }
    end

    records.sort do |left, right|
      left[:sort] = [] unless left[:sort].is_a?(Array)
      right[:sort] = [] unless right[:sort].is_a?(Array)
      left[:sort] <=> right[:sort]
    end
  end
  
  # Look up the value of +key+ in +rec+ for display or sorting.
  #
  # rec - Record Hash. When it has no value for +key+, the Hash stored
  #       under rec[:aliases] (if any) is consulted instead.
  # key - Field name.
  #
  # Returns the value. A String that consists solely of a decimal number
  # is converted: to a Float when the column format registered for +key+
  # is "float", otherwise to an Integer via to_i (a fractional part is
  # truncated). Any other value is returned unchanged; nil when nothing
  # is found.
  def get_record_value(rec, key)
    value = rec[key]
    if value.nil?
      # Records that never went through sort_records have no alias table
      aliases = rec[:aliases]
      value = aliases[key] if aliases.is_a?(Hash)
    end
    
    # Only Strings can hold textual numbers. Applying =~ to other objects
    # (for example an Integer result code) raises NoMethodError on Ruby
    # 3.2 and later.
    return value unless value.is_a?(String)
    
    # Make sure numeric values are returned as numbers. The pattern is
    # anchored to the whole string (a single trailing newline is allowed)
    # so that multi-line text that merely contains a numeric line is left
    # as text.
    return value unless value =~ /\A-?\d+(\.\d+)?\Z/
    
    if column_format(key) == "float"
      value.to_f()
    else
      value.to_i()
    end
  end
  
  # Declare the script's description, command line options, field
  # aliases, column formats and commands. Registration order is the same
  # as before: options first, then aliases, formats and commands.
  def configure
    super()
    description("Run the trepctl command across many file locations and hosts.<br>
<br>
The script will automatically find all instances of Tungsten Replicator running on the current box. It will use those locations to discover additional servers that it will attempt to contact. You may override this behavior with the --paths and --hosts options.<br>
<br>
Examples:<br>
$> multi_trepctl<br>
$> multi_trepctl list --fields=host,service,latency,channels<br>
$> multi_trepctl offline<br>
$> multi_trepctl online<br>
$> multi_trepctl -- online -skip-seqno 5<br>")

    # Options that take a value and have neither a default nor a handler
    [
      [:hosts, ["--hosts String", "--host String"], "The hosts to check replicators on"],
      [:paths, ["--paths String", "--path String"], "The Tungsten directories to check"],
      [:limit_roles, ["--roles String", "--role String"], "The replication service roles to limit on"],
      [:limit_services, ["--services String", "--service String"], "The replication service names to limit on"],
      [:fields, "--fields String", "The status fields to display in the output"]
    ].each do |name, switches, help|
      add_option(name, {
        :on => switches,
        :help => help
      })
    end

    add_option(:sort_by, {
      :on => "--sort-by String",
      :default => "host,role,serviceName",
      :help => "Sort output by these fields"
    })

    # --by-service overrides the :sort_by option; the handler returns nil
    add_option(:by_service, {
      :on => "--by-service",
      :default => false,
      :help => "Sort output by replication service"
    }) do
      opt(:sort_by, "serviceName,role,host")
      nil
    end

    add_option(:hide_headers, {
      :on => "--hide-headers",
      :default => false,
      :help => "Suppress headers in output",
      :aliases => ["--skip-headers", "--no-headers"]
    }) do
      opt(:hide_headers, true)
      nil
    end

    add_option(:format, {
      :on => "--output String",
      :help => "Display format for record output (info|tab|list|name|json|yaml)"
    })

    # Short names accepted in place of the full status field names
    [
      ["serviceName", "service"],
      ["appliedLastSeqno", "seqno"],
      ["appliedLatency", "latency"],
      ["commandResult", "rc"],
      ["commandMessage", "output"]
    ].each do |src, dest|
      add_field_alias(src, dest)
    end

    # Display formats, keyed by lower-cased field name
    [
      ["latency", "float"],
      ["appliedlatency", "float"],
      ["relativelatency", "float"],
      ["timeinstateseconds", "float"],
      ["uptimeseconds", "float"],
      ["commandmessage", "block"],
      ["output", "block"]
    ].each do |name, kind|
      column_format(name, kind)
    end

    # Commands that are planned but not implemented stay disabled
    #add_command(:backups, {
    #  :help => "List available backups for each replication service"
    #})
    #add_command(:db, {
    #  :help => "Display the status of trep_commit_seqno for each replication service"
    #})
    #add_command(:masterof, {
    #  :help => "Display the master for each replication service"
    #})
    #add_command(:sql, {
    #  :help => "Run the SQL command against the database server for each replication service"
    #})
    add_command(:run, {
      :help => "Run the trepctl command against the database server for each replication service"
    })
    add_command(:list, {
      :help => "Display information about each replication service"
    })
  end
  
  # Validate the command line and work out everything main() needs:
  #
  # 1. The Tungsten directories to inspect: --paths entries of the form
  #    "dir" or "dir:rmi_port", else the directories of replicators found
  #    running on this host, else TI.root().
  # 2. The hosts to inspect: --hosts, else the replication members that
  #    `tpm query` reports for each directory, else TI.hostname().
  # 3. @trepctl_locations: every host/directory pair where the trepctl
  #    binary exists, keyed by an identifier built from host and directory.
  # 4. Defaults for the command ("run" when extra arguments were given,
  #    otherwise "list"), for --fields and for --output.
  #
  # Problems are reported through TU.error. Raises RuntimeError for the
  # command names that are not implemented.
  def validate
    if TI && TI.use_tpm?() != true
      TU.error("Unable to run the multi_trepctl script in a directory installed with tungsten-installer")
      return
    end
    
    super()
    
    # Each --paths entry is either "directory" or "directory:rmi_port"
    check_directories = []
    opt(:paths).to_s().split(",").each{
      |dir|
      dir_parts = dir.split(":")
      if dir_parts.size() == 1
        check_directories << {
          :directory => dir_parts[0]
        }
      elsif dir_parts.size() == 2
        check_directories << {
          :directory => dir_parts[0],
          :rmi_port => dir_parts[1]
        }
      else
        TU.error("Each directory to check must be valid path and include the RMI port.")
      end
    }
    
    if check_directories.size() == 0
      # No paths given: look for replicators running on this host and take
      # the part of their command line that precedes "/releases"
      running_replicators = TU.cmd_result("ps -eo args | grep \"Tungsten Replicator\"").split("\n")
      running_replicators.each{
        |ps|
        # Digits are allowed so that install directories such as
        # /opt/continuent2 or /opt/replicator-3306 are detected as well
        match = ps.match(/^([a-zA-Z0-9\-_\.\/]*)\/releases/)
        unless match == nil
          check_directories << {
            :directory => match[1]
          }
        end
      }
    else
      # "self" stands for the directory this script was installed in
      check_directories.collect!{|dir|
        if dir[:directory] == "self"
          dir[:directory] = TI.root()
        end
        
        dir
      }
    end
    
    if check_directories.size() == 0
      check_directories = [{
        :directory => TI.root()
      }]
    end
    
    check_hosts = opt(:hosts).to_s().split(",")
    
    if check_hosts.size() == 0
      # No hosts given: ask tpm in each directory for the replication
      # members of every data service it knows about
      check_directories.each{
        |dir|
        ds_list = TU.cmd_result("#{dir[:directory]}/tungsten/tools/tpm query dataservices | awk -F \":\" '{print $1}' | tr -d \" \"")
        ds_list.split("\n").each{
          |ds|
          begin
            value_key = "dataservices.#{ds}.dataservice_replication_members"
            host_list_raw = TU.cmd_result("#{dir[:directory]}/tungsten/tools/tpm query values #{value_key}")
            host_list = JSON.parse(host_list_raw)
            check_hosts = check_hosts + host_list[value_key].split(",")
          rescue CommandError
            # Best effort: a data service that cannot be queried adds no hosts
          end
        }
        check_hosts.uniq!()
      }
    else
      # "self" stands for the local hostname
      check_hosts.collect!{|host|
        if host == "self"
          TI.hostname()
        else
          host
        end
      }
    end
    
    if check_hosts.size() == 0
      check_hosts = [TI.hostname()]
    end
    
    # Identify the combinations of check_hosts and check_directories that
    # actually contain a replicator. One child process is forked per
    # combination; a child writes the marshalled location into its
    # Tempfile only when the trepctl binary was found.
    pids = []
    results = []
    idx = 0
    check_hosts.each{
      |host|
      check_directories.each{
        |dir|

        results[idx] = Tempfile.new('tpm')
        pids[idx] = fork {
          begin
            # Only whether the command succeeds matters; a failed test is
            # presumably what surfaces as the errors rescued below
            TU.ssh_result("test -f #{dir[:directory]}/tungsten/tungsten-replicator/bin/trepctl", host, TI.user())
            results[idx].write(Marshal.dump(dir.merge({:host => host})))
          rescue RemoteCommandError
            TU.debug("Unable to find a replicator at #{host}:#{dir[:directory]}")
          rescue CommandError
            TU.debug("Unable to find a replicator at #{host}:#{dir[:directory]}")
          end
        }
        idx = idx+1
      }
    }
    pids.each{|pid| Process.waitpid(pid) }

    @trepctl_locations = {}
    results.each{
      |file|
      file.rewind()

      begin
        result = Marshal.load(file.read())
        loc_id = TU.to_identifier("#{result[:host]}_#{result[:directory]}")
        @trepctl_locations[loc_id] = result
      rescue TypeError => te
        # Nothing usable was written: no replicator at this location
      rescue ArgumentError => ae
        # An empty file cannot be unmarshalled: no replicator at this location
      ensure
        # Release the descriptor and remove the file right away rather than
        # leaving that to garbage collection
        file.close!()
      end
    }
    
    # Without an explicit command, extra arguments mean "run them",
    # otherwise list the services
    if command() == nil
      if (TU.remaining_arguments() + TU.extra_arguments()).size() > 0
        @command = "run"
      else
        @command = "list"
      end
    end
    
    # Default fields depend on the command
    if opt(:fields) == nil
      if command() == "backups"
      elsif command() == "db"
      elsif command() == "masterof"
      elsif command() == "sql"
      elsif command() == "run"
        opt(:fields, "host,serviceName,output")
      else
        opt(:fields, "host,serviceName,role,state,appliedLastSeqno,appliedLatency")
      end
    end
    
    # Field names are handled in lower case from here on
    opt(:fields, opt(:fields).downcase())
    opt(:sort_by, opt(:sort_by).downcase())
    
    if command() == "backups"
      raise "The backups command has not been implemented"
    elsif command() == "db"
      raise "The db command has not been implemented"
    elsif command() == "masterof"
      raise "The masterof command has not been implemented"
    elsif command() == "sql"
      raise "The sql command has not been implemented"
    elsif command() == "run"
      @trepctl_command_parts = TU.remaining_arguments() + TU.extra_arguments()
      
      # The command output is always part of what "run" displays
      fields = opt(:fields).split(",")
      unless fields.include?("commandmessage") || fields.include?("output")
        fields << "output"
        opt(:fields, fields.join(","))
      end
    elsif command() == "list"
    end
    
    # Default output format depends on the command
    if opt(:format) == nil
      if command() == "backups"
      elsif command() == "db"
      elsif command() == "masterof"
      elsif command() == "sql"
      elsif command() == "run"
        opt(:format, "yaml")
      else
        opt(:format, "info")
      end
    end
    
    unless ["info","tab","list","name","json", "yaml"].include?(opt(:format))
      TU.error("The specified value for --output is invalid. Valid values are info, tab, list, name, yaml and json.")
    end
  end
  
  # Register +dest+ as an alternative name for the record field +src+.
  # The mapping is stored as dest => src in @field_aliases, which is
  # created on first use. Returns +src+.
  def add_field_alias(src, dest)
    registry = (@field_aliases ||= {})
    registry[dest] = src
  end
  
  # Return the alias => field name Hash built by add_field_alias, or an
  # empty Hash when no alias has been registered.
  def get_field_aliases()
    return @field_aliases if @field_aliases

    {}
  end
  
  # Read or register the display format of a column.
  #
  # src    - Column (field) name.
  # format - When not nil, stored as the format for +src+.
  #
  # Returns the format currently stored for +src+, or nil when there is
  # none.
  def column_format(src, format = nil)
    formats = (@column_formats ||= {})
    formats[src] = format unless format.nil?
    formats[src]
  end
  
  # Always returns false: a command is optional for this script, and
  # validate() selects "run" or "list" when none was given.
  # NOTE(review): presumably a hook consulted by TungstenScript — confirm.
  def require_command?
    false
  end
  
  # Instantiate the script and invoke run() while the class body is being
  # evaluated, so that executing this file runs the tool.
  # NOTE(review): run() is not defined in this class; presumably it is
  # provided by TungstenScript — confirm.
  self.new().run()
end