""" Author: Weisen Pan Date: 2023-10-24 """ import re import argparse import subprocess import pandas as pd from pathlib import Path RESOURCE_KEYS = ['MilliCpu', 'Memory', 'Gpu', 'MilliGpu'] STATUS_KEYS = [ "q1_lack_both", 'q2_lack_gpu', 'q3_satisfied', 'q4_lack_cpu', 'xl_satisfied', 'xr_lack_cpu', 'no_access', "frag_gpu_milli" ] def camel_case_to_snake_case(name): s1 = re.sub('(.)([A-Z][a-z]+)', r'\1_\2', name) return re.sub('([a-z0-9])([A-Z])', r'\1_\2', s1).lower() EVENT_TAGS = [ "InitSchedule", "PostEviction", "PostDeschedule", "ScheduleInflation", "DescheduleInflation" ] SNAKE_CASE_TAGS = [camel_case_to_snake_case(tag) for tag in EVENT_TAGS] RESOURCE_COLUMNS = [camel_case_to_snake_case(key) for key in RESOURCE_KEYS] RESOURCE_COLUMNS.extend([camel_case_to_snake_case(key + "Amount") for key in RESOURCE_KEYS]) RESOURCE_COLUMNS.extend(STATUS_KEYS) def restructure_dataframe_columns(df, tag_list=SNAKE_CASE_TAGS): meta_cols, data_cols = [], [] for col in df.columns: if any(col.endswith("_" + tag) for tag in tag_list): data_cols.append(col) else: meta_cols.append(col) output_rows = [] for _, row in df.iterrows(): meta_data = {col: row[col] for col in meta_cols} for tag in tag_list: data_dict = meta_data.copy() data_dict['tag'] = tag for col in data_cols: if col.endswith("_" + tag): key = col.rstrip("_" + tag) data_dict[key] = row.get(col) if 'tag' in data_dict: output_rows.append(pd.DataFrame.from_dict(data_dict, orient='index').T) return pd.concat(output_rows) def fill_missing_values(df): columns_to_fill = [ 'milli_cpu', 'memory', 'gpu', 'milli_gpu', 'milli_cpu_amount', 'memory_amount', 'gpu_amount', 'milli_gpu_amount' ] + STATUS_KEYS for col in columns_to_fill: df.loc[df.isnull().any(axis=1), col + "_schedule_inflation"] = df.loc[df.isnull().any(axis=1), col + "_init_schedule"] return df def extract_metadata_from_log(log_name: str, log_directory: Path = None): if log_name.startswith("log-"): log_name = log_name[4:] metadata = {} parts = log_name.split('-') if len(parts) > 2: parts[1] = "-".join(parts[1:]) if log_directory: exp_dir = Path(log_directory) config_client, config_server = parts[0].rstrip('.yaml'), parts[1].rstrip('.yaml') client_config_file = exp_dir / f"{config_client}.yaml" server_config_file = exp_dir / f"{config_server}.yaml" if client_config_file.is_file() and server_config_file.is_file(): for item in config_client.split('_'): # Add conditions and extraction logic for each prefix # ... (same logic as original) # Logic for parsing the server config metadata["policy"] = "" for item in config_server.split('_'): # Add conditions and extraction logic for each prefix # ... (same logic as original) return metadata else: print("ERROR: log_directory is NONE") return metadata # Removed the unused imports: `argparse` and `subprocess` from typing import Dict import pandas as pd from pathlib import Path def extract_information_from_line(line: str, dict_collectors: Dict): # ... (fill with the logic for extracting information from a line, based on your logic) def process_log_file(file: Path) -> Dict: # ... (fill with the logic for processing a single log file, using helper functions as needed) def export_to_csv(data: Dict, output_file: Path): # ... 
def log_to_csv(log_path: Path, outfile: Path):
    """Parse every *.log file under log_path and write the aggregated results
    to outfile, plus per-metric companion files (_frag, _allo, _cdol)."""
    # Create paths for the different outputs
    out_frag_path = outfile.parent / (outfile.stem + '_frag.csv')
    out_allo_path = outfile.parent / (outfile.stem + '_allo.csv')
    out_cdol_path = outfile.parent / (outfile.stem + '_cdol.csv')

    # Initialize data collectors
    out_row_list = []
    out_frag_col_dict = {}
    out_allo_col_dict = {}
    out_cdol_col_dict = {}

    # Iterate through the log files
    for file in log_path.glob("*.log"):
        try:
            out_data = process_log_file(file)
            out_row_list.append(out_data['out_row'])
            out_frag_col_dict.update(out_data['out_frag'])
            out_allo_col_dict.update(out_data['out_allo'])
            out_cdol_col_dict.update(out_data['out_cdol'])
        except Exception as e:
            print(f"[Error] Failed at {file} with error: {e}")

    # Save to CSV (guard against an empty run, where pd.concat would raise)
    if out_row_list:
        outdf = pd.concat(out_row_list)
        outdf.to_csv(outfile, index=False)
    if out_frag_col_dict:
        export_to_csv(out_frag_col_dict, out_frag_path)
    if out_allo_col_dict:
        export_to_csv(out_allo_col_dict, out_allo_path)
    if out_cdol_col_dict:
        export_to_csv(out_cdol_col_dict, out_cdol_path)


def failed_pods_in_detail(log_path, outfile=None):
    """Summarize the 'Failed Pods in detail' blocks of each log: tally how many
    pods failed per resource signature and write the counts to a file."""
    outfilepath = outfile if outfile else log_path / "analysis_fail.out"
    print(f"Failed pods: {outfilepath}")
    INFOMSG = "level=info msg="
    with open(outfilepath, 'w') as out:
        for file in log_path.glob("*.log"):
            with open(file, 'r') as f:
                try:
                    out.write(f"\n===\n{file.name}\n")
                    fail_line_counter = 0
                    rsrc_dict = {}
                    for line in f:
                        if "Failed Pods in detail" in line:
                            fail_line_counter = 1
                        elif fail_line_counter > 0 and INFOMSG in line:
                            # Inside a failed-pods block: tally the resource
                            # signature between the angle brackets. (The guard
                            # on fail_line_counter keeps unrelated info-level
                            # lines elsewhere in the log from being counted.)
                            fail_line_counter += 1
                            line = line.split(INFOMSG)[1].strip('"')
                            rsrc = line.split("<")[1].split(">")[0]
                            rsrc_dict[rsrc] = rsrc_dict.get(rsrc, 0) + 1
                        elif fail_line_counter > 0:
                            # Block ended: dump the tallies, most frequent first.
                            fail_line_counter = 0
                            sorted_rsrc = sorted(rsrc_dict.items(), key=lambda item: -item[1])
                            num_failed_pods = sum(v for _, v in sorted_rsrc)
                            for k, v in sorted_rsrc:
                                out.write(f"{v:2}; <{k}>\n")
                            out.write(f"Failed No.: {num_failed_pods}\n")
                            rsrc_dict = {}
                except Exception as e:
                    print(f"[Error] Failed at {file} with error: {e}")


def grep_log_cluster_analysis(log_path, outfile=None):
    """Append each log's 'Cluster Analysis' block (match plus the 16 following
    lines) to a single output file via the external grep binary."""
    outfilepath = log_path / "analysis_grep.out" if not outfile else outfile
    print(f"Log grep: {outfilepath}")
    if outfilepath.is_file():
        outfilepath.unlink()
    for file in log_path.glob("*.log"):
        with open(outfilepath, 'ab') as out:
            out.write(f"\n===\n# {file.name}:\n".encode())
            subprocess.call(["grep", "-e", "Cluster Analysis", "-A", "16", str(file)], stdout=out)
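
# A portable alternative to grep_log_cluster_analysis, sketched for
# environments without the Unix grep binary (an addition, not part of the
# original script). It mirrors `grep -e "Cluster Analysis" -A 16`: every
# matching line plus the 16 lines that follow it.
def grep_cluster_analysis_pure_python(log_path: Path, outfilepath: Path):
    with open(outfilepath, 'w') as out:
        for file in log_path.glob("*.log"):
            out.write(f"\n===\n# {file.name}:\n")
            lines = file.read_text().splitlines(keepends=True)
            for i, line in enumerate(lines):
                if "Cluster Analysis" in line:
                    out.writelines(lines[i:i + 17])
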
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Process log files")
    # nargs='?' lets the positional argument actually fall back to its default.
    parser.add_argument("logfile", type=str, nargs='?', help="input log directory", default='logs/')
    parser.add_argument("-o", "--outfile", type=str, help="output file name", default=None)
    parser.add_argument("-g", "--grep", action='store_true', help="output grepped results")
    parser.add_argument("-f", "--failed", action='store_true', help="output failed pods")
    parser.add_argument("-s", "--skipped", action='store_true', help="skip log_to_csv")
    args = parser.parse_args()

    ANALYSIS_FILE_PREFIX = "analysis"
    FAIL_FILE = f"{ANALYSIS_FILE_PREFIX}_fail.out"
    GREP_FILE = f"{ANALYSIS_FILE_PREFIX}_grep.out"

    script_path = Path(__file__).parent
    log_path = script_path.parent / args.logfile

    if args.failed:
        failed_pods_in_detail(log_path, log_path / FAIL_FILE)
    if args.grep:
        grep_log_cluster_analysis(log_path, log_path / GREP_FILE)
    if not args.skipped:
        outfile = log_path / "analysis.csv" if not args.outfile else Path(args.outfile)
        print(f"In: {log_path}\nOut: {outfile}")
        log_to_csv(log_path, outfile)
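
# Example invocations (hypothetical script name and paths; the log directory
# is resolved relative to the script's parent folder):
#   python analyze_logs.py logs/              # parse logs into analysis.csv
#   python analyze_logs.py logs/ -f -g -s     # only the fail/grep summaries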