# Copyright 2019 AT&T Intellectual Property.  All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from copy import deepcopy
import logging
import pprint
import re

from openpyxl import load_workbook
from openpyxl import Workbook
import yaml

from spyglass_plugin_xls import exceptions

LOG = logging.getLogger(__name__)


class ExcelParser(object):
    """Parse data from excel into a dict"""

    def __init__(
            self, file_name: str, excel_specs: str, spec: str = 'xl_spec'):
        """Initializes an ExcelParser to extract data from the Excel workbook

        :param file_name: path to the Excel workbook
        :param excel_specs: path to the Excel workbook spec (YAML)
        :param spec: key under ``specs`` in the spec file selecting which
            spec to load
        """
        self.file_name = file_name
        with open(excel_specs, "r") as f:
            self.excel_specs = yaml.safe_load(f)

        # Copy of the input workbook holding cell values only; every sheet
        # name without a "<file>:" prefix is resolved against it.
        self.wb_combined = self.load_excel_data(file_name)
        self.spec = spec
        self.loaded_spec = self.excel_specs['specs'][self.spec]
        self.validate_sheet_names_with_spec()
        self.loaded_data = self.extract_data_using_spec()

    @staticmethod
    def sanitize(string):
        """Remove spaces and convert string to lower case

        Values that are not strings (for example ``None`` read from an
        empty cell, or a number) are returned unchanged instead of raising
        ``AttributeError``.
        """
        if not isinstance(string, str):
            return string
        return string.replace(" ", "").lower()

    def compare(self, string1, string2):
        """Check whether sanitized string1 is found in sanitized string2

        ``string1`` is passed to ``re.search`` as the pattern, so regular
        expression metacharacters in it are interpreted as such.

        :returns: True on a match; False when there is no match or when
            either value is not a string (e.g. an empty cell)
        """
        pattern = self.sanitize(string1)
        candidate = self.sanitize(string2)
        if not isinstance(pattern, str) or not isinstance(candidate, str):
            return False
        return bool(re.search(pattern, candidate))

    def validate_sheet(self, spec, sheet):
        """Check if the sheet is correct or not

        Reads the cell at (``header_row``, ``ipmi_address_col``) of the
        given sheet and compares it with ``ipmi_address_header``; all three
        settings are looked up under ``specs[spec]`` in the spec file.

        :param spec: name of the spec to read the settings from
        :param sheet: name of the sheet in the combined workbook
        :returns: True if the header cell matches the expected header
        """
        ws = self.wb_combined[sheet]
        spec_settings = self.excel_specs["specs"][spec]
        header_value = ws.cell(
            row=spec_settings["header_row"],
            column=spec_settings["ipmi_address_col"]).value
        return self.compare(
            spec_settings["ipmi_address_header"], header_value)

    def _get_workbook(self, sheet_name, data=None):
        """Return the worksheet to read from

        A ``sheet_name`` entry in ``data`` takes precedence over the
        ``sheet_name`` argument. A name of the form ``<file>:<sheet>`` is
        read from that file; any other name is read from the combined
        workbook.
        """
        sheet_name_to_use = sheet_name
        if data and 'sheet_name' in data:
            sheet_name_to_use = data['sheet_name']
        workbook_object, extracted_sheetname = self.get_xl_obj_and_sheetname(
            sheet_name_to_use)
        if workbook_object is not None:
            return workbook_object[extracted_sheetname]
        return self.wb_combined[sheet_name_to_use]

    @staticmethod
    def _check_sanitize_settings(data):
        """Read the sanitize settings of a spec entry

        :returns: tuple ``(sanitize_default, no_sanitize_keys)``;
            ``sanitize_default`` is False only when ``data['sanitize']`` is
            present and falsy, ``no_sanitize_keys`` is
            ``data['no_sanitize']`` or an empty list
        """
        sanitize_default = bool(data.get('sanitize', True))
        no_sanitize_keys = data.get('no_sanitize', [])
        return sanitize_default, no_sanitize_keys

    def extract_data_points(self, data, sheet_name=None):
        """Extract individual cells described by a 'point' spec entry

        :param data: spec entry; ``data['data']`` maps each key to a
            ``[row, column]`` coordinate
        :param sheet_name: sheet to use when ``data`` names none
        :returns: dict mapping each key to its (optionally sanitized) value
        """
        extracted_data = {}
        ws = self._get_workbook(sheet_name, data)
        sanitize_default, no_sanitize_keys = self._check_sanitize_settings(
            data)
        for key, coordinate in data['data'].items():
            value = ws.cell(row=coordinate[0], column=coordinate[1]).value
            if sanitize_default and key not in no_sanitize_keys:
                value = self.sanitize(value)
            extracted_data[key] = value
        return extracted_data

    def extract_data_series(self, data, sheet_name=None):
        """Extract a run of rows or columns described by a 'series' entry

        ``data['iter']`` gives the inclusive ``start``/``end`` range and the
        ``index-type`` ('row' or 'col') that is iterated; ``data['data']``
        maps each key to the fixed column (for 'row') or row (for 'col').

        Keys whose cell is empty (falsy) are left out of the record, and
        records with no keys at all are skipped.

        :returns: list of dicts, one per non-empty row/column
        :raises exceptions.InvalidSpec: if ``index-type`` is neither 'row'
            nor 'col'
        """
        extracted_data = []
        ws = self._get_workbook(sheet_name, data)
        sanitize_default, no_sanitize_keys = self._check_sanitize_settings(
            data)

        # The index type does not change while iterating: validate it once.
        index_type = data['iter']['index-type']
        if index_type not in ('row', 'col'):
            raise exceptions.InvalidSpec()

        for index in range(data['iter']['start'], data['iter']['end'] + 1):
            data_dict = {}
            for key, fixed in data['data'].items():
                if index_type == 'row':
                    value = ws.cell(row=index, column=fixed).value
                else:
                    value = ws.cell(row=fixed, column=index).value
                if not value:
                    continue
                if sanitize_default and key not in no_sanitize_keys:
                    value = self.sanitize(value)
                data_dict[key] = value
            if data_dict:
                extracted_data.append(data_dict)
        return extracted_data

    def extract_data_using_spec(self, spec_dict=None, sheet_name=None):
        """Extract data for every entry of a spec

        Entries of type 'series' and 'point' are read from the workbook;
        entries of type 'container' are processed recursively using their
        ``data`` as the nested spec. Entries of any other type are ignored.

        :param spec_dict: spec to process; defaults to the loaded spec
        :param sheet_name: sheet used by entries that do not name one
        :returns: dict mapping each entry name to its extracted data
        """
        # Only fall back when no spec was given: an empty nested container
        # must stay empty, otherwise the top-level spec would be processed
        # again from inside itself and recurse without end.
        if spec_dict is None:
            spec_dict = self.loaded_spec
        extracted_data = {}
        for name, data in spec_dict.items():
            data_type = self.sanitize(data['type'])
            if data_type == 'series':
                extracted_data[name] = self.extract_data_series(
                    data, sheet_name)
            elif data_type == 'point':
                extracted_data[name] = self.extract_data_points(
                    data, sheet_name)
            elif data_type == 'container':
                sheet = data.get('sheet_name')
                extracted_data[name] = self.extract_data_using_spec(
                    data['data'], sheet or sheet_name)
        return extracted_data

    def get_ipmi_data(self):
        """Read IPMI data from the sheet

        For every entry of the extracted 'ipmi' series, any ``/suffix`` is
        stripped from ``ipmi_address`` and an empty ``ipmi_gateway`` is
        filled with the gateway of the closest preceding host that had one
        (``None`` if there was none).

        :returns: list ``[ipmi_data, hosts]`` where ``ipmi_data`` maps a
            hostname to its remaining fields and ``hosts`` lists the
            hostnames in sheet order
        :raises exceptions.MissingData: if a host has no ``ipmi_address``
            or no ``host_profile``
        """
        ipmi_data = {}
        hosts = []
        previous_server_gateway = None
        for entry in self.loaded_data['ipmi']:
            hostname = entry['hostname']
            hosts.append(hostname)
            host_data = deepcopy(entry)
            host_data.pop('hostname')
            ipmi_data[hostname] = host_data

            # Series extraction omits keys of empty cells, so optional
            # fields are read with .get() rather than indexed.
            ipmi_address = host_data.get('ipmi_address')
            if not ipmi_address:
                raise exceptions.MissingData(
                    missing_data='ipmi_address',
                    section='host %s' % hostname)
            if "/" in ipmi_address:
                host_data['ipmi_address'] = ipmi_address.split("/")[0]

            if host_data.get('ipmi_gateway'):
                previous_server_gateway = host_data['ipmi_gateway']
            else:
                host_data['ipmi_gateway'] = previous_server_gateway

            if not host_data.get('host_profile'):
                raise exceptions.MissingData(
                    missing_data='host_profile',
                    section='host %s' % hostname)
        LOG.debug(
            "ipmi data extracted from excel:\n%s", pprint.pformat(ipmi_data))
        LOG.debug(
            "host data extracted from excel:\n%s", pprint.pformat(hosts))
        return [ipmi_data, hosts]

    def get_private_vlan_data(self):
        """Get private vlan data from private IP sheet

        :returns: dict mapping each vlan (non-word characters removed,
            lower-cased) to its network type; entries without a network
            type are skipped
        """
        vlan_data = {}
        for entry in self.loaded_data['private_vlan']:
            # The key is absent when the cell was empty.
            net_type = entry.get('net_type')
            if net_type:
                vlan = re.sub(r'\W+', '', entry['vlan']).lower()
                vlan_data[vlan] = net_type
        LOG.debug(
            "vlan data extracted from excel:\n%s", pprint.pformat(vlan_data))
        return vlan_data

    def get_private_network_data(self):
        """Read network data from the private ip sheet

        :returns: dict mapping each network type to a dict with its
            ``vlan``, the list of its ``subnet`` entries and ``is_common``
            set to True; the vlan recorded is the one of the first entry
            seen for that network type
        """
        vlan_data = self.get_private_vlan_data()
        network_data = {}
        for entry in self.loaded_data['private_net']:
            vlan = re.sub(r'\W+', '', entry['vlan']).lower()
            network = entry['ip']
            net_type = vlan_data[vlan]
            if net_type not in network_data:
                network_data[net_type] = {
                    "vlan": vlan,
                    "subnet": [],
                    "is_common": True,
                }
            network_data[net_type]["subnet"].append(network)
        return network_data

    def get_public_network_data(self):
        """Read public network data from public ip data

        :returns: dict with the ``oam`` subnets (always a list) and vlan,
            the ``ingress`` value as extracted, and the sanitized ``oob``
            subnets
        """
        public_data = self.loaded_data['public']
        oam_net = public_data['oam']['ip']
        if isinstance(oam_net, str):
            oam_net = [oam_net]
        network_data = {
            "oam": {
                'subnet': oam_net,
                'vlan': re.sub(r'\W+', '', public_data['oam']['vlan']),
            },
            "ingress": public_data['ingress']['ip'],
            "oob": {
                "subnet": [],
            },
        }
        for entry in public_data['oob']:
            # The key is absent when the cell was empty.
            oob_net = entry.get('ip')
            if oob_net:
                network_data["oob"]["subnet"].append(self.sanitize(oob_net))
        LOG.debug(
            "public network data extracted from excel:\n%s",
            pprint.pformat(network_data),
        )
        return network_data

    def get_site_info(self):
        """Read location, dns, ntp and ldap data

        The dns and ntp cells are split on spaces, commas and newlines;
        empty fragments are dropped.

        :returns: dict with ``location``, ``dns``, ``ntp``, ``domain`` and
            ``ldap`` entries
        :raises exceptions.MissingData: if the dns or ntp value is None
        """
        site_data = self.loaded_data['site_info']
        dns_servers = site_data['dns']
        ntp_servers = site_data['ntp']
        if dns_servers is None:
            raise exceptions.MissingData(
                missing_data='dns servers', section='site_info')
        if ntp_servers is None:
            raise exceptions.MissingData(
                missing_data='ntp servers', section='site_info')
        separators = r"[ ,\n]"
        dns_servers = [s for s in re.split(separators, dns_servers) if s]
        ntp_servers = [s for s in re.split(separators, ntp_servers) if s]
        site_info = {
            "location": self.get_location_data(),
            "dns": dns_servers,
            "ntp": ntp_servers,
            "domain": site_data['domain'],
            "ldap": {
                "subdomain": site_data['subdomain'],
                "common_name": site_data['global_group'],
                "url": site_data['ldap'],
            },
        }
        LOG.debug(
            "Site Info extracted from excel:\n%s",
            pprint.pformat(site_info),
        )
        return site_info

    def get_location_data(self):
        """Read location data from the site and zone sheet"""
        location = self.loaded_data['location']
        return {
            "corridor": location['corridor'],
            "name": location['sitename'],
            "state": location['state'],
            "country": location['country'],
            "physical_location": location['clli'],
        }

    def validate_sheet_names_with_spec(self):
        """Check that sheet names in the spec file exist in the excel file

        Only the ``sheet_name`` of each top-level spec entry is checked.
        Names of the form ``<file>:<sheet>`` are checked against that file,
        all others against the combined workbook.

        :raises exceptions.ExcelSheetNotFound: if a sheet does not exist
        """
        sheet_name_list = [
            data['sheet_name'] for data in self.loaded_spec.values()
        ]
        for sheet_name in sheet_name_list:
            workbook_object, extracted_sheetname = (
                self.get_xl_obj_and_sheetname(sheet_name))
            if workbook_object is not None:
                wb = workbook_object
                sheet_name = extracted_sheetname
            else:
                wb = self.wb_combined
            if sheet_name not in wb.sheetnames:
                raise exceptions.ExcelSheetNotFound(sheet_name=sheet_name)
        LOG.info("Sheet names in excel spec validated")

    def get_data(self):
        """Create a dict with combined data

        :returns: dict with ``ipmi_data``, ``network_data`` (``private``
            and ``public``) and ``site_info``
        """
        ipmi_data = self.get_ipmi_data()
        network_data = self.get_private_network_data()
        public_network_data = self.get_public_network_data()
        site_info_data = self.get_site_info()
        data = {
            "ipmi_data": ipmi_data,
            "network_data": {
                "private": network_data,
                "public": public_network_data,
            },
            "site_info": site_info_data,
        }
        LOG.debug(
            "Data extracted from excel:\n%s",
            pprint.pformat(data),
        )
        return data

    @staticmethod
    def load_excel_data(filename):
        """Copy the cell values of an excel file into a new workbook

        The file is loaded with ``data_only=True``, so the copy receives
        whatever values openpyxl reports for the cells in that mode.

        :param filename: path to the Excel workbook
        :returns: a new ``Workbook`` with one sheet per sheet of the input
        """
        design_spec = Workbook()
        # A new Workbook starts with a default sheet. Drop it so the copy
        # has exactly the input's sheets, and so an input sheet with the
        # same name is not silently created under a different title.
        design_spec.remove(design_spec.active)
        loaded_workbook = load_workbook(filename, data_only=True)
        for name in loaded_workbook.sheetnames:
            design_spec_worksheet = design_spec.create_sheet(name)
            for row in loaded_workbook[name]:
                for cell in row:
                    design_spec_worksheet[cell.coordinate].value = cell.value
        return design_spec

    @staticmethod
    def get_xl_obj_and_sheetname(sheetname):
        """Split a '<file>:<sheet>' reference and load the file

        The logic confirms if the sheetname is specified for example as:
        'MTN57a_AEC_Network_Design_v1.6.xlsx:Public IPs'

        The reference is split at its last colon, so a file path that
        itself contains a colon is kept intact.

        :returns: list ``[workbook, sheet]`` when the part before the last
            colon ends in an ``.xls*`` extension, else
            ``[None, sheetname]``
        """
        source_xl_file, separator, extracted_sheetname = (
            sheetname.rpartition(":"))
        if separator and re.search(
                r"\.xls[a-z]?$", source_xl_file, re.IGNORECASE):
            wb = load_workbook(source_xl_file, data_only=True)
            return [wb, extracted_sheetname]
        return [None, sheetname]