|
11 | 11 | import os |
12 | 12 | import socket |
13 | 13 |
|
| 14 | +import yaml |
14 | 15 |
|
15 | 16 | AWS_CLOUDWATCH_CFG_PATH = '/opt/aws/amazon-cloudwatch-agent/etc/amazon-cloudwatch-agent.json' |
16 | 17 |
|
@@ -39,6 +40,10 @@ def parse_args(): |
39 | 40 | required=True, |
40 | 41 | choices=['slurm', 'awsbatch', 'plugin'], |
41 | 42 | help='Scheduler') |
| 43 | + parser.add_argument('--cluster-config-path', |
| 44 | + required=False, |
| 45 | + help='Cluster configuration path', |
| 46 | + ) |
42 | 47 | return parser.parse_args() |
43 | 48 |
|
44 | 49 |
|
@@ -132,6 +137,44 @@ def select_logs(configs, args): |
132 | 137 | return selected_configs |
133 | 138 |
|
134 | 139 |
|
def get_node_roles(scheudler_plugin_node_roles):
    """Translate a scheduler plugin node type into the list of node roles it covers.

    "ALL" maps to both roles, "HEAD" to the head node only and "COMPUTE" to the
    compute fleet only. Any other value (including None) yields None.
    """
    # NOTE: the parameter name keeps its original spelling so keyword callers keep working.
    roles_by_node_type = {
        "ALL": ["ComputeFleet", "HeadNode"],
        "HEAD": ["HeadNode"],
        "COMPUTE": ["ComputeFleet"],
    }
    if scheudler_plugin_node_roles in roles_by_node_type:
        return roles_by_node_type[scheudler_plugin_node_roles]
    return None
| 143 | + |
| 144 | + |
def load_config(cluster_config_path):
    """Read the YAML file at cluster_config_path and return the parsed document.

    Parsing goes through PyYAML's safe loader, so only plain YAML types are built.
    """
    with open(cluster_config_path) as config_file:
        # yaml.safe_load(stream) is shorthand for yaml.load(stream, Loader=yaml.SafeLoader).
        parsed_config = yaml.safe_load(config_file)
    return parsed_config
| 148 | + |
| 149 | + |
def add_scheduler_plugin_log(config_data, cluster_config_path):
    """Extend config_data with the log files declared by a scheduler plugin.

    The cluster configuration is loaded from cluster_config_path. Nothing is
    added unless it lists log files under the scheduler definition's monitoring
    section and its scheduler is "plugin". For every listed file one entry is
    appended to config_data["log_configs"] and its timestamp format is stored in
    config_data["timestamp_formats"] under the file's log stream name.

    Returns the same config_data object, modified in place.
    """
    files_path = "Scheduling.SchedulerSettings.SchedulerDefinition.Monitoring.Logs.Files"
    cluster_config = load_config(cluster_config_path)

    plugin_log_files = get_dict_value(cluster_config, files_path)
    if not plugin_log_files:
        return config_data
    if get_dict_value(cluster_config, "Scheduling.Scheduler") != "plugin":
        return config_data

    for plugin_log in plugin_log_files:
        # The log stream name doubles as the key into the timestamp format table.
        stream_name = plugin_log.get("LogStreamName")
        config_data["log_configs"].append(
            {
                "timestamp_format_key": stream_name,
                "file_path": plugin_log.get("FilePath"),
                "log_stream_name": stream_name,
                "schedulers": ["plugin"],
                "platforms": ["centos", "ubuntu", "amazon"],
                "node_roles": get_node_roles(plugin_log.get("NodeType")),
                "feature_conditions": [],
            }
        )
        config_data["timestamp_formats"][stream_name] = plugin_log.get("TimestampFormat")

    return config_data
| 176 | + |
| 177 | + |
135 | 178 | def add_timestamps(configs, timestamps_dict): |
136 | 179 | """For each config, set its timestamp_format field based on its timestamp_format_key field.""" |
137 | 180 | for config in configs: |
@@ -159,10 +202,21 @@ def create_config(log_configs): |
159 | 202 | } |
160 | 203 |
|
161 | 204 |
|
def get_dict_value(value, attributes, default=None):
    """Walk a nested dictionary along a dot-separated path and return the value found.

    :param value: root dictionary (or any object exposing ``get``) to search
    :param attributes: dot-separated key path, e.g. "Scheduling.Scheduler"
    :param default: returned when the path cannot be resolved
    :return: the value at the end of the path, or ``default`` when a key is
        missing, a value along the path is None, or a value along the path
        (including the root) does not support key lookup
    """
    for key in attributes.split("."):
        try:
            value = value.get(key)
        except AttributeError:
            # The current level is not dict-like (None root, scalar, list, ...):
            # the path does not exist, so fall back to the default instead of crashing.
            return default
        if value is None:
            return default
    return value
| 212 | + |
| 213 | + |
162 | 214 | def main(): |
163 | 215 | """Create cloudwatch agent config file.""" |
164 | 216 | args = parse_args() |
165 | 217 | config_data = read_data(args.config) |
| 218 | + if args.cluster_config_path: |
| 219 | + config_data = add_scheduler_plugin_log(config_data, args.cluster_config_path) |
166 | 220 | log_configs = select_logs(config_data['log_configs'], args) |
167 | 221 | log_configs = add_timestamps(log_configs, config_data['timestamp_formats']) |
168 | 222 | log_configs = add_log_group_name_params(args.log_group, log_configs) |
|
0 commit comments