#v20200120

# Extract files from an EGP project, since they cannot otherwise
# be opened without Enterprise Guide

# Provide a list of directories and this program will find all EGP
# files within them. For each EGP, a subdir is created that holds all
# contained files, and the program attempts to rename them according
# to the node in the process flow.

import os
import shutil
import xml.etree.ElementTree as ET
import zipfile

# function libraries copied to this program's folder
import admin_functions

admin_functions.startup(print_to_log=False)

#---------------------------------------------------------------------
# DEFINE PARAMETERS
#---------------------------------------------------------------------

# Limit program to run a small number of reports, for testing
loop_limit = 999999
#loop_limit = 1

# backslash is an escape character, so need special \\ syntax
dirs_to_convert = [
    'C:\\Users\\user.name\\Downloads\\EGP converter',
]

# remove any existing directories created by prior runs of this program
purge_existing_dirs = False

#---------------------------------------------------------------------
# HELPERS
#---------------------------------------------------------------------

# Certain punctuation is not allowed in file names; this table maps
# each such character to an underscore in a single pass.
# backslash is an escape character, so need special \\ syntax
_INVALID_FILENAME_CHARS = str.maketrans(dict.fromkeys('|<>"?*:/\\', '_'))


def _find_egp_files(top_dir):
    """Return the paths of all files under top_dir with an .egp extension.

    The extension check is case-insensitive and every subdirectory of
    top_dir is searched.
    """
    egp_paths = []
    for directory, _subdir_list, file_list in os.walk(top_dir):
        for file_name in file_list:
            if os.path.splitext(file_name)[1].lower() == '.egp':
                egp_paths.append(os.path.join(directory, file_name))
    return egp_paths


def _read_item_xref(project_xml_path):
    """Return a dict mapping item ID -> item Label read from project.xml.

    Only 'Element' nodes located three levels below the XML root are
    considered; each is expected to hold a 'Label' and an 'ID' child.

    Raises ET.ParseError if the file is not well-formed XML and OSError
    if it cannot be read.
    """
    root = ET.parse(project_xml_path).getroot()

    item_xref = {}
    for element in root.iterfind('*/*/Element'):
        # reset for every element so values never leak from the prior one
        item_id = None
        item_name = None
        for child in element:
            if child.tag == 'Label':
                item_name = child.text
            elif child.tag == 'ID':
                item_id = child.text

        # without an ID there is no subdirectory to look in; an empty ID
        # would make the later walk cover the whole extraction folder
        if not item_id:
            continue

        # a missing or empty label cannot be used as a file name, so
        # fall back to the ID
        item_xref[item_id] = item_name or item_id

    return item_xref


#---------------------------------------------------------------------
print('\n')
#---------------------------------------------------------------------

EGPs_to_convert = []

for dir_to_convert in dirs_to_convert:

    # start timing this iteration and print to log
    admin_functions.lap_task(f'checking {dir_to_convert}')

    # save every EGP found anywhere below this directory
    EGPs_to_convert.extend(_find_egp_files(dir_to_convert))

    # done with this dir, move onto the next
    admin_functions.lap_time()

print(f'# EGPs found: {len(EGPs_to_convert)}')

#---------------------------------------------------------------------
print('\n\n')
#---------------------------------------------------------------------

error_list = []

# Number of EGPs the loop actually started on. Tracked separately from
# the loop index so the summary is correct (zero) when no EGP is found.
egps_processed = 0

for loop_num, EGP_path in enumerate(EGPs_to_convert):

    # end the loop if we hit the loop limit
    if loop_num == loop_limit:
        break
    egps_processed += 1

    # start timing this iteration and print to log
    admin_functions.lap_task(EGP_path)

    # EGP files extracted to folder with name = filename + ' files'
    unzip_dir = f'{EGP_path} files'

    # remove existing directory if desired
    if purge_existing_dirs and os.path.exists(unzip_dir):
        shutil.rmtree(unzip_dir)

    # EGP is a zip file in disguise; extract files to unzip folder
    # Files have generic names and are in a cryptic folder structure
    try:
        with zipfile.ZipFile(EGP_path, 'r') as zip_file:
            zip_file.extractall(unzip_dir)
    except (zipfile.BadZipFile, OSError) as except_obj:
        # a corrupt or unreadable EGP must not stop the remaining ones
        error_list.append(f'''
Error extracting EGP
    EGP: {EGP_path}
    ERROR: {except_obj}''')
        admin_functions.lap_time()
        continue

    # project.xml has the information needed to understand the folder
    # structure: a cross-reference of each item's ID and Label
    try:
        item_xref = _read_item_xref(f'{unzip_dir}/project.xml')
    except (ET.ParseError, OSError) as except_obj:
        # save error info
        error_list.append(f'''
Error parsing XML file
    EGP: {EGP_path}
    ERROR: {except_obj}''')
        # stop this iteration if we do not have an ID/name mapping
        admin_functions.lap_time()
        continue

    # For each ID, traverse subdir of same name and look for files
    # Files have generic names (e.g. code.sas, result.log)
    # Keep the file extension but replace the name with the ID's name
    # Also move renamed file to the top directory for the EGP

    # EGP files allow multiple nodes with the same name, but this is
    # not allowed for files in a folder. To avoid conflicts, track the
    # list of new filenames used (lowercased, so the check is
    # case-insensitive).
    renamed_files = []
    # pre-bind so the error report below is always well-defined
    item_id = None
    item_name = None
    try:
        for item_id, item_name in item_xref.items():

            # replace characters that are not allowed in file names
            item_name = item_name.translate(_INVALID_FILENAME_CHARS)

            # traverse each subdirectory within the ID's folder
            for directory, _subdir_list, file_list in os.walk(
                    f'{unzip_dir}/{item_id}'):

                # a subdir may have multiple files; loop through each
                for file_name in file_list:

                    # find the file extension
                    file_ext = os.path.splitext(file_name)[1]

                    # if filename has been used already, add underscore
                    # suffix until it is unique
                    while f'{item_name}{file_ext}'.lower() in renamed_files:
                        item_name = f'{item_name}_'

                    # rename and move
                    os.rename(
                        f'{directory}/{file_name}',
                        f'{unzip_dir}/{item_name}{file_ext}')

                    # save the filename that was ultimately used
                    renamed_files.append(f'{item_name}{file_ext}'.lower())

    except OSError as except_obj:
        # save error info
        error_list.append(f'''
Error renaming item
    EGP: {EGP_path}
    item_id: {item_id}
    item_name: {item_name}
    renamed_files: {renamed_files}
    ERROR: {except_obj}''')

    # Remove empty dirs in EGP subdir. Some may have been
    # empty from the start (these represent deleted nodes)
    # and some are empty due to moving the contained files.
    # The topdown=False option looks through child directories
    # before parents, so subdirs are cleared before their parents
    for directory, _subdir_list, _file_list in os.walk(unzip_dir, topdown=False):

        # do not try to remove the top parent directory
        if directory == unzip_dir:
            continue

        try:
            os.rmdir(directory)
        except OSError:
            # Deliberately ignored: the directory still holds files that
            # did not have a match in the ID/name cross-reference, so it
            # is left in place.
            pass

    # done with this EGP, move onto the next
    admin_functions.lap_time()

#---------------------------------------------------------------------
print('\n\n')
#---------------------------------------------------------------------

print(f'''
EGPs converted: {f"{egps_processed:,}":>6}
''')

if error_list:
    print(f'\n\n<{len(error_list)} ERRORS ENCOUNTERED>')
    for error in error_list:
        print(error)

admin_functions.shutdown()