# $Id: job_resource_manager.pl 1648 2008-09-17 16:00:56Z neyron $ # # The job_resource_manager script is a perl script that oar server deploys on nodes # to manage cpusets, users, job keys, ... # # Usage: # This script is deployed from the server and executed as oar on the nodes # ARGV[0] can have two different values: # - "init" : then this script must create the right cpuset and assign # corresponding cpus # - "clean" : then this script must kill all processes in the cpuset and # clean the cpuset structure # TAKTUK_HOSTNAME environment variable must be defined and must be a name # that we will be able to find in the transfered hashtable ($Cpuset variable). use Fcntl ':flock'; #use Data::Dumper; sub exit_myself($$); sub print_log($$); my $Old_umask = sprintf("%lo",umask()); umask(oct("022")); my $Cpuset; my $Log_level; # Retrieve parameters from STDIN in the "Cpuset" structure which looks like: # $Cpuset = { # job_id => id of the corresponding job # name => "cpuset name", # cpuset_path => "relative path in the cpuset FS", # nodes => hostname => [array with the content of the database cpuset field] # ssh_keys => { # public => { # file_name => "~oar/.ssh/authorized_keys" # key => "public key content" # } # private => { # file_name => "directory where to store the private key" # key => "private key content" # } # } # oar_tmp_directory => "path to the temp directory" # user => "user name" # job_user => "job user" # job_uid => "job uid for the job_user if needed" # types => hashtable with job types as keys # log_level => debug level number # } my $tmp = ""; while (){ $tmp .= $_; } $Cpuset = eval($tmp); if (!defined($Cpuset->{log_level})){ exit_myself(2,"Bad SSH hashtable transfered"); } $Log_level = $Cpuset->{log_level}; my $Cpuset_path_job; my @Cpuset_cpus; # Get the data structure only for this node if (defined($Cpuset->{cpuset_path})){ $Cpuset_path_job = $Cpuset->{cpuset_path}.'/'.$Cpuset->{name}; @Cpuset_cpus = @{$Cpuset->{nodes}->{$ENV{TAKTUK_HOSTNAME}}}; } print_log(3,"$ARGV[0]"); if ($ARGV[0] eq "init"){ # Initialize cpuset for this node # First, create the tmp oar directory if (!(((-d $Cpuset->{oar_tmp_directory}) and (-O $Cpuset->{oar_tmp_directory})) or (mkdir($Cpuset->{oar_tmp_directory})))){ exit_myself(13,"Directory $Cpuset->{oar_tmp_directory} does not exist and cannot be created"); } if (defined($Cpuset->{job_uid})){ my $prevuser = getpwuid($Cpuset->{job_uid}); system("oardodo /usr/sbin/userdel -f $prevuser") if (defined($prevuser)); my @tmp = getpwnam($Cpuset->{user}); if ($#tmp < 0){ exit_myself(15,"Cannot get information from user '$Cpuset->{user}'"); } if (system("oardodo /usr/sbin/adduser --disabled-password --gecos 'OAR temporary user' --no-create-home --force-badname --quiet --home $tmp[7] --gid $tmp[3] --shell $tmp[8] --uid $Cpuset->{job_uid} $Cpuset->{job_user}")){ exit_myself(15,"Failed to create $Cpuset->{job_user} with uid $Cpuset->{job_uid} and home $tmp[7] and group $tmp[3] and shell $tmp[8]"); } } if (defined($Cpuset_path_job)){ if (open(LOCKFILE,"> $Cpuset->{oar_tmp_directory}/job_manager_lock_file")){ flock(LOCKFILE,LOCK_EX) or exit_myself(17,"flock failed: $!"); if (system('oardodo mount -t cpuset | grep " /dev/cpuset " > /dev/null 2>&1')){ if (system('oardodo mkdir -p /dev/cpuset && oardodo mount -t cpuset none /dev/cpuset')){ exit_myself(4,"Failed to mount cpuset pseudo filesystem"); } } if (!(-d '/dev/cpuset/'.$Cpuset->{cpuset_path})){ if (system( 'oardodo mkdir -p /dev/cpuset/'.$Cpuset->{cpuset_path}.' &&'. 'oardodo chown -R oar /dev/cpuset/'.$Cpuset->{cpuset_path}.' &&'. '/bin/echo 0 | cat > /dev/cpuset/'.$Cpuset->{cpuset_path}.'/notify_on_release && '. '/bin/echo 0 | cat > /dev/cpuset/'.$Cpuset->{cpuset_path}.'/cpu_exclusive && '. 'cat /dev/cpuset/mems > /dev/cpuset/'.$Cpuset->{cpuset_path}.'/mems &&'. 'cat /dev/cpuset/cpus > /dev/cpuset/'.$Cpuset->{cpuset_path}.'/cpus' )){ exit_myself(4,"Failed to create cpuset $Cpuset->{cpuset_path}"); } } flock(LOCKFILE,LOCK_UN) or exit_myself(17,"flock failed: $!"); close(LOCKFILE); }else{ exit_myself(16,"Failed to open or create $Cpuset->{oar_tmp_directory}/job_manager_lock_file"); } my $mem = ""; my $c = 0; while (($c <= $#Cpuset_cpus)){ my $core_id=$Cpuset_cpus[$c]; my $physical_id=`cat /sys/devices/system/cpu/cpu$core_id/topology/physical_package_id`; chomp($physical_id); if ($physical_id == 1){ if (($mem eq "") or ($mem eq "1")){ $mem = "1"; }else{ $mem = "0,1"; } }else{ if (($mem eq "") or ($mem eq "0")){ $mem = "0"; }else{ $mem = "0,1"; } } $c++; } #'for c in '."@Cpuset_cpus".';do cat /sys/devices/system/cpu/cpu$c/topology/physical_package_id > /dev/cpuset/'.$Cpuset_path_job.'/mems; done && '. # Be careful with the physical_package_id. Is it corresponding to the memory banc? if (system( 'oardodo mkdir -p /dev/cpuset/'.$Cpuset_path_job.' && '. 'oardodo chown -R oar /dev/cpuset/'.$Cpuset_path_job.' && '. '/bin/echo 0 | cat > /dev/cpuset/'.$Cpuset_path_job.'/notify_on_release && '. '/bin/echo 0 | cat > /dev/cpuset/'.$Cpuset_path_job.'/cpu_exclusive && '. '/bin/echo '.$mem.' | cat > /dev/cpuset/'.$Cpuset_path_job.'/mems && '. '/bin/echo '.join(",",@Cpuset_cpus).' | cat > /dev/cpuset/'.$Cpuset_path_job.'/cpus' )){ exit_myself(5,"Failed to create and feed the cpuset $Cpuset_path_job"); } } # Copy ssh key files if ($Cpuset->{ssh_keys}->{private}->{key} ne ""){ # private key if (open(PRIV, ">".$Cpuset->{ssh_keys}->{private}->{file_name})){ chmod(0600,$Cpuset->{ssh_keys}->{private}->{file_name}); if (!print(PRIV $Cpuset->{ssh_keys}->{private}->{key})){ unlink($Cpuset->{ssh_keys}->{private}->{file_name}); exit_myself(8,"Error writing $Cpuset->{ssh_keys}->{private}->{file_name}"); } close(PRIV); if (defined($Cpuset->{job_uid})){ system("ln -s $Cpuset->{ssh_keys}->{private}->{file_name} $Cpuset->{oar_tmp_directory}/$Cpuset->{job_user}.jobkey"); } }else{ exit_myself(7,"Error opening $Cpuset->{ssh_keys}->{private}->{file_name}"); } # public key if (open(PUB,"+<",$Cpuset->{ssh_keys}->{public}->{file_name})){ flock(PUB,LOCK_EX) or exit_myself(17,"flock failed: $!"); seek(PUB,0,0) or exit_myself(18,"seek failed: $!"); my $out = "\n".$Cpuset->{ssh_keys}->{public}->{key}."\n"; while (){ if ($_ =~ /environment=\"OAR_KEY=1\"/){ # We are reading a OAR key $_ =~ /(ssh-dss|ssh-rsa)\s+([^\s^\n]+)/; my $oar_key = $2; $Cpuset->{ssh_keys}->{public}->{key} =~ /(ssh-dss|ssh-rsa)\s+([^\s^\n]+)/; my $curr_key = $2; if ($curr_key eq $oar_key){ exit_myself(13,"ERROR: the user has specified the same ssh key than used by the user oar"); } $out .= $_; }elsif ($_ =~ /environment=\"OAR_CPUSET=([\w\/]+)\"/){ # Remove from authorized keys outdated keys (typically after a reboot) if (-d "/dev/cpuset/$1"){ $out .= $_; } }else{ $out .= $_; } } if (!(seek(PUB,0,0) and print(PUB $out) and truncate(PUB,tell(PUB)))){ exit_myself(9,"Error writing $Cpuset->{ssh_keys}->{public}->{file_name}"); } flock(PUB,LOCK_UN) or exit_myself(17,"flock failed: $!"); close(PUB); }else{ unlink($Cpuset->{ssh_keys}->{private}->{file_name}); exit_myself(10,"Error opening $Cpuset->{ssh_keys}->{public}->{file_name}"); } } }elsif ($ARGV[0] eq "clean"){ # delete ssh key files if ($Cpuset->{ssh_keys}->{private}->{key} ne ""){ # private key unlink($Cpuset->{ssh_keys}->{private}->{file_name}); if (defined($Cpuset->{job_uid})){ unlink("$Cpuset->{oar_tmp_directory}/$Cpuset->{job_user}.jobkey"); } # public key if (open(PUB,"+<", $Cpuset->{ssh_keys}->{public}->{file_name})){ flock(PUB,LOCK_EX) or exit_myself(17,"flock failed: $!"); seek(PUB,0,0) or exit_myself(18,"seek failed: $!"); #Change file on the fly my $out = ""; while (){ if (($_ ne "\n") and ($_ ne $Cpuset->{ssh_keys}->{public}->{key})){ $out .= $_; } } if (!(seek(PUB,0,0) and print(PUB $out) and truncate(PUB,tell(PUB)))){ exit_myself(12,"Error changing $Cpuset->{ssh_keys}->{public}->{file_name}"); } flock(PUB,LOCK_UN) or exit_myself(17,"flock failed: $!"); close(PUB); }else{ exit_myself(11,"Error opening $Cpuset->{ssh_keys}->{public}->{file_name}"); } } # Clean cpuset on this node if (defined($Cpuset_path_job)){ system('PROCESSES=$(cat /dev/cpuset/'.$Cpuset_path_job.'/tasks) while [ "$PROCESSES" != "" ] do oardodo kill -9 $PROCESSES PROCESSES=$(cat /dev/cpuset/'.$Cpuset_path_job.'/tasks) done' ); if (system('oardodo rmdir /dev/cpuset'.$Cpuset_path_job)){ # Uncomment this line if you want to use several network_address properties # which are the same physical computer (linux kernel) #exit(0); exit_myself(6,"Failed to delete the cpuset $Cpuset_path_job"); } } if (defined($Cpuset->{job_uid})){ my $ipcrm_args=""; if (open(IPCMSG,"< /proc/sysvipc/msg")) { ; while () { if (/\s+\d+\s+(\d+)(?:\s+\d+){5}\s+$Cpuset->{job_uid}(?:\s+\d+){6}$/) { $ipcrm_args .= " -q $1"; } } close (IPCMSG); }else{ exit_myself(14,"Cannot open /proc/sysvipc/msg: $!"); } if (open(IPCSHM,"< /proc/sysvipc/shm")) { ; while () { if (/\s+\d+\s+(\d+)(?:\s+\d+){5}\s+$Cpuset->{job_uid}(?:\s+\d+){6}$/) { $ipcrm_args .= " -m $1"; } } close (IPCSHM); }else{ exit_myself(14,"Cannot open /proc/sysvipc/shm: $!"); } if (open(IPCSEM,"< /proc/sysvipc/sem")) { ; while () { if (/\s+\d+\s+(\d+)(?:\s+\d+){2}\s+$Cpuset->{job_uid}(?:\s+\d+){5}$/) { $ipcrm_args .= " -s $1"; } } close (IPCSEM); }else{ exit_myself(14,"Cannot open /proc/sysvipc/sem: $!"); } if ($ipcrm_args) { print_log(3,"Purging SysV IPC: ipcrm $ipcrm_args"); if(system("oardodo ipcrm $ipcrm_args")){ exit_myself(14,"Failed to purge IPC: ipcrm $ipcrm_args"); } } print_log(3,"Purging /tmp..."); #system("oardodo find /tmp/ -user $Cpuset->{job_user} -exec rm -rfv {} \\;"); system("oardodo find /tmp/. -user $Cpuset->{job_user} -delete"); system("oardodo /usr/sbin/userdel -f $Cpuset->{job_user}"); } }else{ exit_myself(3,"Bad command line argument $ARGV[0]"); } exit(0); # Print error message and exit sub exit_myself($$){ my $exit_code = shift; my $str = shift; warn("[job_resource_manager][$Cpuset->{job_id}][ERROR] ".$str."\n"); exit($exit_code); } # Print log message depending on the LOG_LEVEL config value sub print_log($$){ my $l = shift; my $str = shift; if ($l <= $Log_level){ print("[job_resource_manager][$Cpuset->{job_id}][DEBUG] $str\n"); } }