2828cfnconfig_file = '/opt/parallelcluster/cfnconfig'
2929
3030
31- def load_scheduler_module (scheduler ):
31+ def _load_scheduler_module (scheduler ):
32+ """
33+ Load scheduler module, containing scheduler specific functions.
34+
35+ :param scheduler: scheduler name, it must correspond to the <scheduler>.py file in the current folder.
36+ :return: the scheduler module
37+ """
3238 scheduler = 'jobwatcher.plugins.' + scheduler
3339 _scheduler = __import__ (scheduler )
3440 _scheduler = sys .modules [scheduler ]
@@ -38,7 +44,15 @@ def load_scheduler_module(scheduler):
3844 return _scheduler
3945
4046
41- def get_asg_name (stack_name , region , proxy_config ):
47+ def _get_asg_name (stack_name , region , proxy_config ):
48+ """
49+ Get autoscaling group name.
50+
51+ :param stack_name: stack name to search for
52+ :param region: AWS region
53+ :param proxy_config: Proxy configuration
54+ :return: the ASG name
55+ """
4256 asg_conn = boto3 .client ('autoscaling' , region_name = region , config = proxy_config )
4357 asg_name = ""
4458 no_asg = True
@@ -48,14 +62,19 @@ def get_asg_name(stack_name, region, proxy_config):
4862 r = asg_conn .describe_tags (Filters = [{'Name' : 'value' , 'Values' : [stack_name ]}])
4963 asg_name = r .get ('Tags' )[0 ].get ('ResourceId' )
5064 no_asg = False
51- except IndexError as e :
65+ except IndexError :
5266 log .error ("No asg found for cluster %s" % stack_name )
5367 time .sleep (30 )
5468
5569 return asg_name
5670
5771
58- def read_cfnconfig ():
72+ def _read_cfnconfig ():
73+ """
74+ Read configuration file.
75+
76+ :return: a dictionary containing the configuration parameters
77+ """
5978 cfnconfig_params = {}
6079 with open (cfnconfig_file ) as f :
6180 for kvp in f :
@@ -64,61 +83,94 @@ def read_cfnconfig():
6483 return cfnconfig_params
6584
6685
67- def get_vcpus_from_pricing_file (instance_type ):
86+ def _get_vcpus_from_pricing_file (instance_type ):
87+ """
88+ Read pricing file and get number of vcpus for the given instance type.
89+
90+ :param instance_type: the instance type to search for.
91+ :return: the number of vcpus or -1 if the instance type cannot be found
92+ """
6893 with open (pricing_file ) as f :
6994 instances = json .load (f )
7095 try :
7196 vcpus = int (instances [instance_type ]["vcpus" ])
7297 log .info ("Instance %s has %s vcpus." % (instance_type , vcpus ))
73- return vcpus
74- except KeyError as e :
75- log .error ("Instance %s not found in file %s." % (instance_type , pricing_file ))
76- exit (1 )
98+ except KeyError :
99+ log .error ("Unable to get vcpus from file %s. Instance type %s not found." % (pricing_file , instance_type ))
100+ vcpus = - 1
77101
102+ return vcpus
78103
def _get_instance_properties(instance_type):
    """
    Compute the scheduler slots of an instance type from the cfn_scheduler_slots configuration parameter.

    The parameter may be "cores" (half of the vcpus, rounded up), "vcpus", or a string of digits
    giving the number of slots directly. A missing, invalid or zero parameter falls back to "vcpus".
    A final non-positive result is reported and returned as 0.

    :param instance_type: instance type to search for
    :return: a dictionary containing the instance properties. E.g. {'slots': <slots>}
    """
    try:
        slots_setting = _read_cfnconfig()["cfn_scheduler_slots"]
    except KeyError:
        log.error(
            "Required config parameter 'cfn_scheduler_slots' not found in file %s. Assuming 'vcpus'" % cfnconfig_file
        )
        slots_setting = "vcpus"

    vcpus = _get_vcpus_from_pricing_file(instance_type)

    if slots_setting == "cores":
        log.info("Instance %s will use number of cores as slots based on configuration." % instance_type)
        # Integer ceiling of vcpus / 2.
        slots = (vcpus + 1) // 2
    elif slots_setting == "vcpus":
        log.info("Instance %s will use number of vcpus as slots based on configuration." % instance_type)
        slots = vcpus
    elif slots_setting.isdigit():
        slots = int(slots_setting)
        log.info("Instance %s will use %s slots based on configuration." % (instance_type, slots))
        if not slots > 0:
            log.error(
                "cfn_scheduler_slots config parameter '%s' must be greater than 0. "
                "Assuming 'vcpus'" % slots_setting
            )
            slots = vcpus
    else:
        log.error("cfn_scheduler_slots config parameter '%s' is invalid. Assuming 'vcpus'" % slots_setting)
        slots = vcpus

    # vcpus itself may be the -1 "not found" sentinel, so the result is clamped to 0.
    if not slots > 0:
        log.critical("slots value is invalid. Setting it to 0.")

    return {'slots': max(slots, 0)}
105151
def _fetch_pricing_file(pcluster_dir, region, proxy_config):
    """
    Download the instance mapping file from S3 into the local configuration folder.

    The object 'instances/instances.json' is downloaded from the '<region>-aws-parallelcluster'
    bucket and saved as '<pcluster_dir>/instances.json'. The folder is created if it does not exist.
    Failures are logged as critical and re-raised.

    :param pcluster_dir: Parallelcluster configuration folder
    :param region: AWS Region
    :param proxy_config: Proxy Configuration
    """
    s3 = boto3.resource('s3', region_name=region, config=proxy_config)

    if not os.path.exists(pcluster_dir):
        try:
            os.makedirs(pcluster_dir)
        except OSError as ex:
            log.critical('Could not create directory %s. Failed with exception: %s' % (pcluster_dir, ex))
            raise

    bucket_name = '%s-aws-parallelcluster' % region
    destination = '%s/instances.json' % pcluster_dir
    try:
        s3.Bucket(bucket_name).download_file('instances/instances.json', destination)
    except ClientError as e:
        log.critical(
            "Could not save instance mapping file %s/instances.json from S3 bucket %s. "
            "Failed with exception: %s" % (pcluster_dir, bucket_name, e)
        )
        raise
123175
124176
@@ -139,7 +191,7 @@ def main():
139191 scheduler = config .get ('jobwatcher' , 'scheduler' )
140192 stack_name = config .get ('jobwatcher' , 'stack_name' )
141193 instance_type = config .get ('jobwatcher' , 'compute_instance_type' )
142- cfncluster_dir = config .get ('jobwatcher' , 'cfncluster_dir' )
194+ pcluster_dir = config .get ('jobwatcher' , 'cfncluster_dir' )
143195 _proxy = config .get ('jobwatcher' , 'proxy' )
144196 proxy_config = Config ()
145197
@@ -150,53 +202,70 @@ def main():
150202 try :
151203 asg_name = config .get ('jobwatcher' , 'asg_name' )
152204 except ConfigParser .NoOptionError :
153- asg_name = get_asg_name (stack_name , region , proxy_config )
205+ asg_name = _get_asg_name (stack_name , region , proxy_config )
154206 config .set ('jobwatcher' , 'asg_name' , asg_name )
155207 log .info ("Saving asg_name %s in the config file %s" % (asg_name , _configfilename ))
156208 with open (_configfilename , 'w' ) as configfile :
157209 config .write (configfile )
158210
159211 # fetch the pricing file on startup
160- fetch_pricing_file ( proxy_config , cfncluster_dir , region )
212+ _fetch_pricing_file ( pcluster_dir , region , proxy_config )
161213
162214 # load scheduler
163- s = load_scheduler_module (scheduler )
215+ s = _load_scheduler_module (scheduler )
164216
165217 while True :
166218 # get the number of vcpu's per compute instance
167- instance_properties = get_instance_properties (instance_type )
168-
169- # Get number of nodes requested
170- pending = s .get_required_nodes (instance_properties )
171-
172- # Get number of nodes currently
173- running = s .get_busy_nodes (instance_properties )
174-
175- log .info ("%s jobs pending; %s jobs running" % (pending , running ))
219+ instance_properties = _get_instance_properties (instance_type )
220+ if instance_properties .get ('slots' ) <= 0 :
221+ log .critical ("Error detecting number of slots per instance. The cluster will not scale up." )
176222
177- if pending > 0 :
178- # connect to asg
179- asg_conn = boto3 . client ( 'autoscaling' , region_name = region , config = proxy_config )
223+ else :
224+ # Get number of nodes requested
225+ pending = s . get_required_nodes ( instance_properties )
180226
181- # get current limits
182- asg = asg_conn . describe_auto_scaling_groups ( AutoScalingGroupNames = [ asg_name ]). get ( 'AutoScalingGroups' )[ 0 ]
227+ if pending < 0 :
228+ log . critical ( "Error detecting number of required nodes. The cluster will not scale up." )
183229
184- min = asg .get ('MinSize' )
185- current_desired = asg .get ('DesiredCapacity' )
186- max = asg .get ('MaxSize' )
187- log .info ("min/desired/max %d/%d/%d" % (min , current_desired , max ))
188- log .info ("Nodes requested %d, Nodes running %d" % (pending , running ))
230+ elif pending == 0 :
231+ log .debug ("There are no pending jobs. Noop." )
189232
190- # check to make sure it's in limits
191- desired = running + pending
192- if desired > max :
193- log .info ("%d requested nodes is greater than max %d. Requesting max %d." % (desired , max , max ))
194- asg_conn .update_auto_scaling_group (AutoScalingGroupName = asg_name , DesiredCapacity = max )
195- elif desired <= current_desired :
196- log .info ("%d nodes desired %d nodes in asg. Noop" % (desired , current_desired ))
197233 else :
198- log .info ("Setting desired to %d nodes, requesting %d more nodes from asg." % (desired , desired - current_desired ))
199- asg_conn .update_auto_scaling_group (AutoScalingGroupName = asg_name , DesiredCapacity = desired )
234+ # Get current number of nodes
235+ running = s .get_busy_nodes (instance_properties )
236+ log .info ("%s jobs pending; %s jobs running" % (pending , running ))
237+
238+ # connect to asg
239+ asg_client = boto3 .client ('autoscaling' , region_name = region , config = proxy_config )
240+
241+ # get current limits
242+ asg = asg_client .describe_auto_scaling_groups (AutoScalingGroupNames = [asg_name ]).get ('AutoScalingGroups' )[0 ]
243+
244+ min_size = asg .get ('MinSize' )
245+ current_desired = asg .get ('DesiredCapacity' )
246+ max_size = asg .get ('MaxSize' )
247+ log .info ("min/desired/max %d/%d/%d" % (min_size , current_desired , max_size ))
248+ log .info ("%d nodes requested, %d nodes running" % (pending , running ))
249+
250+ # Check to make sure requested number of instances is within ASG limits
251+ required = running + pending
252+ if required <= current_desired :
253+ log .info ("%d nodes required, %d nodes in asg. Noop" % (required , current_desired ))
254+ else :
255+ if required > max_size :
256+ log .info (
257+ "The number of required nodes %d is greater than max %d. Requesting max %d."
258+ % (required , max_size , max_size )
259+ )
260+ else :
261+ log .info (
262+ "Setting desired to %d nodes, requesting %d more nodes from asg."
263+ % (required , required - current_desired )
264+ )
265+ requested = min (required , max_size )
266+
267+ # update ASG
268+ asg_client .update_auto_scaling_group (AutoScalingGroupName = asg_name , DesiredCapacity = requested )
200269
201270 time .sleep (60 )
202271
0 commit comments