#!/usr/bin/env python
# Copyright 2007 ETH Zuerich, CISD
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

##
# Script which migrates locations in the data store. The supported data store structure is:
#
#   [<organization>]/Project_<project>/Experiment_<experiment>/
#       ObservableType_[IMAGE|IMAGE ANALYSIS]/Barcode_<barcode>/1/
#
# This structure will be migrated to:
#
#   Instance_<instance>/Group_<group>/Project_<project>/
#       Experiment_<experiment>/ObservableType_[HCS_IMAGE|HCS_IMAGE_ANALYSIS_DATA]/
#       Sample_<sample>/Dataset_<dataset code>/
#
# @author Christian Ribeaud
##
import os, sys, csv, string
from optparse import OptionParser

INSTANCE_PREFIX = 'Instance_'
GROUP_PREFIX = 'Group_'
PROJECT_PREFIX = 'Project_'
OBSERVABLE_TYPE_PREFIX = 'ObservableType_'
BARCODE_PREFIX = 'Barcode_'
SAMPLE_PREFIX = 'Sample_'
DATASET_PREFIX = 'Dataset_'
IMAGE_ANALYSIS = 'IMAGE ANALYSIS'
IMAGE = 'IMAGE'
# The observable type mapping. The replacement order is important here:
# IMAGE_ANALYSIS must be handled before IMAGE (see replaceObservableType).
OBSERVABLE_TYPE_MAP = {IMAGE_ANALYSIS: 'HCS_IMAGE_ANALYSIS_DATA', IMAGE: 'HCS_IMAGE'}

unmappedDataSetDirs = []
""" Data set directories that could not be found in the mapping file. """

class DataSet:
    """
    Small class which defines a data set, composed of a code and a location.
    Both are unique.
    """
    processed = False

    def __init__(self, code, originalLocation, location):
        self.code = code
        self.originalLocation = originalLocation
        self.location = location

    def __str__(self):
        return "[code=" + self.code + ",originalLocation=" + self.originalLocation + \
               ",location=" + self.location + "]"

def renameBarcodeDir(barcodeDir):
    """
    Renames 'Barcode_<barcode>' to 'Sample_<barcode>'.

    Returns the new sample directory name if it has been renamed, the unchanged
    directory otherwise.
    """
    assert os.path.isdir(barcodeDir) and os.path.basename(barcodeDir).startswith(BARCODE_PREFIX)
    basename = os.path.basename(barcodeDir)
    renamed = renameFile(barcodeDir, replaceBarcode(basename))
    if renamed is not None:
        return renamed
    return barcodeDir

def replaceBarcode(text):
    return text.replace(BARCODE_PREFIX, SAMPLE_PREFIX, 1)

def renameObservableTypeDir(observableTypeDir):
    """
    Renames 'IMAGE ANALYSIS' to 'HCS_IMAGE_ANALYSIS_DATA' and 'IMAGE' to 'HCS_IMAGE'.

    Returns the new observable type directory name if it has been renamed, the
    unchanged directory otherwise.
    """
    assert os.path.isdir(observableTypeDir) and \
           os.path.basename(observableTypeDir).startswith(OBSERVABLE_TYPE_PREFIX)
    basename = os.path.basename(observableTypeDir)
    renamed = renameFile(observableTypeDir, replaceObservableType(basename))
    if renamed is not None:
        return renamed
    return observableTypeDir

def renameFile(file, newName):
    """
    Renames given file to given new name and returns it if the new name does not
    already exist. Returns None otherwise.
    """
    newFile = os.path.join(os.path.dirname(file), newName)
    if not os.path.exists(newFile):
        os.rename(file, newFile)
        return newFile

def getFirstChildDirectory(path):
    """
    Returns None or the first sub-directory that is found in given path.

    If more than one directory is found, the user is asked which one to take.
    """
    dirs = []
    for file in os.listdir(path):
        if os.path.isdir(os.path.join(path, file)):
            dirs.append(file)
        else:
            warning("'%s' is not a directory." % os.path.abspath(os.path.join(path, file)))
    size = len(dirs)
    if size == 1:
        return dirs[0]
    elif size > 1:
        input = None
        while input not in dirs:
            input = raw_input(("%s directories have been found in '%s':\n" +
                               "  %s. Which one should I take? ")
                              % (size, os.path.abspath(path), dirs))
        return input

def renameDatasetDir(dataSets, dataSetDir, organization=None):
    """
    Renames given data set directory and updates given data sets map.

    Returns the new data set directory name.
    """
    assert os.path.isdir(dataSetDir)
    # In both cases, 3V and IMSB, the location starts with the project name in the mapping file.
    key = extractProjectPath(dataSetDir)
    # Append the child directory to the key as it is present in the location found in the
    # mapping file.
    firstChildDirectory = getFirstChildDirectory(dataSetDir)
    if firstChildDirectory is not None:
        key = os.path.join(key, firstChildDirectory)
    if dict(dataSets).has_key(key):
        dataSet = dict(dataSets).get(key)
        newDataSetDir = renameFile(dataSetDir, DATASET_PREFIX + dataSet.code)
        dataSet.processed = True
        dataSet.location = os.path.normpath(os.path.join(newDataSetDir, firstChildDirectory))
        return newDataSetDir
    unmappedDataSetDirs.append(dataSetDir)
    return dataSetDir

def replaceObservableType(text):
    """
    Replaces the old observable type found in given text with the corresponding new one.

    As we cannot rely on the order of keys in a dictionary and as we absolutely need to
    start with IMAGE_ANALYSIS, we implement this using a list.
    """
    for key in [IMAGE_ANALYSIS, IMAGE]:
        text = text.replace(OBSERVABLE_TYPE_PREFIX + key,
                            OBSERVABLE_TYPE_PREFIX + OBSERVABLE_TYPE_MAP[key], 1)
    return text

def readMappingFile(mappingFile):
    """ Reads and stores the code-location mapping file. """
    assert os.path.isfile(mappingFile)
    file = open(mappingFile, "r")
    reader = csv.reader(file, delimiter='\t')
    dataSets = {}
    try:
        try:
            for row in reader:
                cols = []
                for col in row:
                    cols.append(string.strip(col))
                if len(cols) < 2:
                    # Skip blank or malformed rows.
                    continue
                originalLocation = cols[1]
                location = replaceBarcode(replaceObservableType(originalLocation))
                dataSet = DataSet(cols[0], originalLocation, location)
                dataSets[location] = dataSet
        except csv.Error, e:
            error("A problem has occurred while parsing file '%s', line %d: %s"
                  % (mappingFile, reader.line_num, e))
    finally:
        file.close()
    return dataSets

def writeMappingFile(dataSets, groupDir, mappingFile):
    """ Writes out the mapping file with given code-location mappings. """
    writer = open(mappingFile, "w")
    groupDir = groupDir[string.find(groupDir, INSTANCE_PREFIX):]
    try:
        for dataSet in dict(dataSets).values():
            location = os.path.join(groupDir, extractProjectPath(dataSet.location))
            print >>writer, (dataSet.code + "\t" + location)
    finally:
        writer.close()
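# Example of a mapping-file row as expected by readMappingFile above (tab-separated,
# first column the data set code, second column the original location; the values
# shown here are hypothetical):
#
#   DS-042<TAB>Project_P1/Experiment_E1/ObservableType_IMAGE/Barcode_B1/1
#
# readMappingFile rewrites the location into its migrated form, i.e.
# 'Project_P1/Experiment_E1/ObservableType_HCS_IMAGE/Sample_B1/1', and keys the
# data set by that new location.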
def checkFile(file, type):
    """ Checks given file of given type ("file" or "dir"). """
    assert type in ("file", "dir")
    abspath = os.path.abspath(file)
    if not os.path.exists(file):
        error("'%s' does not exist." % abspath)
    if type == "dir":
        if not os.path.isdir(file):
            error("Given path '%s' is not a directory." % abspath)
    elif type == "file":
        if not os.path.isfile(file):
            error("Given path '%s' is not a file." % abspath)

def getProjectDirs(path=os.curdir):
    """ Returns a list of project directories that have been found in given path. """
    projectDirs = []
    for projectDirName in os.listdir(path):
        projectPath = os.path.join(path, projectDirName)
        if os.path.isdir(projectPath):
            if projectDirName.startswith(PROJECT_PREFIX):
                projectDirs.append(projectPath)
            else:
                warning("Directory '%s' is not a project directory and will not be processed."
                        % os.path.abspath(projectPath))
        else:
            warning("File '%s' is not a directory." % os.path.abspath(projectPath))
    return projectDirs

def createGroupDir(instanceName, groupName, dataStoreDir=os.curdir):
    """
    Creates the group directory (instance sub-directory) in given data store root
    directory, only if it does not already exist.
    """
    instanceDirName = INSTANCE_PREFIX + instanceName
    groupDirName = GROUP_PREFIX + groupName
    groupDir = os.path.join(os.path.join(dataStoreDir, instanceDirName), groupDirName)
    if not os.path.exists(groupDir):
        os.makedirs(groupDir)
    assert os.path.exists(groupDir)
    return groupDir

def processProjectDir(dataSets, projectDir, organization):
    """ Processes given project directory. """
    for dir in dirWalk(dict(dataSets), projectDir, organization, 5):
        pass

def dirWalk(dataSets, directory, organization, maxLevel=10):
    """
    Walks a directory tree, using a generator, renames certain directories and updates
    given data sets map on the way.
    """
    if maxLevel == 0:
        return
    dirName = os.path.basename(directory)
    # Rename observable type directory. This changes the directory being walked, so it must be reset.
    if dirName.startswith(OBSERVABLE_TYPE_PREFIX):
        directory = renameObservableTypeDir(directory)
    # Rename barcode directory. This changes the directory being walked, so it must be reset.
    if dirName.startswith(BARCODE_PREFIX):
        directory = renameBarcodeDir(directory)
    # Rename data set directory. This changes the directory being walked, so it must be reset.
    if isDatasetDirectory(directory):
        directory = renameDatasetDir(dataSets, directory, organization)
    for subDirName in os.listdir(directory):
        subDirectory = os.path.join(directory, subDirName)
        if not os.path.isdir(subDirectory):
            warning("Given sub-directory '%s' is not a directory." % os.path.abspath(subDirectory))
            continue
        for x in dirWalk(dataSets, subDirectory, organization, maxLevel - 1):
            yield x

def isDatasetDirectory(directory):
    """
    Whether given directory is a data set directory or not.

    As 'Barcode_<barcode>' has already been renamed to 'Sample_<barcode>' at this point,
    we look for the sample prefix in the parent directory.
    """
    parent = os.path.dirname(directory)
    basename = os.path.basename(directory)
    try:
        return os.path.basename(parent).startswith(SAMPLE_PREFIX) and \
               (basename.startswith(DATASET_PREFIX) or int(basename))
    except ValueError:
        return False

def error(message, code=2):
    """ Prints error message and exits with given code. """
    print >>sys.stderr, "ERROR:", message
    sys.exit(code)

def warning(message):
    """ Prints warning message. """
    print "WARNING:", message

def getProjectParentDir(organization, storeDir=os.curdir):
    """
    Returns the directory which contains the projects.

    As a side effect, checks whether this directory exists.
    """
    if organization is not None:
        storeDir = os.path.join(storeDir, organization)
    checkFile(storeDir, "dir")
    return storeDir

def extractProjectPath(path):
    """ Extracts the project path from given path by removing everything that precedes PROJECT_PREFIX. """
    return path[string.find(path, PROJECT_PREFIX):]

def normalizeDataSetDir(groupDir, dataSetDir):
    """
    Normalizes given data set directory (by extracting the project path and prepending
    the group directory).
    """
    return os.path.join(groupDir, extractProjectPath(dataSetDir))
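# Illustration of the two helpers above (hypothetical paths): given
# 'SomeOrganization/Project_P1/Experiment_E1', extractProjectPath returns
# 'Project_P1/Experiment_E1' (everything before the 'Project_' prefix is dropped),
# and normalizeDataSetDir('Instance_I/Group_CISD', ...) then yields
# 'Instance_I/Group_CISD/Project_P1/Experiment_E1'.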
""" header = True counter = 0 for dataSet in dataSets.values(): if not dataSet.processed: if header: warning(("Following locations found in the original mapping file have not been " + "found in data store directory '%s':") % os.path.abspath(storeDir)) header = False print " '%s'" % dataSet.originalLocation else: counter += 1 return counter def printUnmappedLocations(groupDir): """ Print out locations that could not be found in the mapping file. """ size = len(unmappedDataSetDirs) if size > 0: warning(("%d data set directories found in the file system have not " + "been found in the original mapping file:") % size) for dataSetDir in unmappedDataSetDirs: print " '%s'" % os.path.abspath(normalizeDataSetDir(groupDir, dataSetDir)) def main(): """ Main method. """ parser = OptionParser("usage: %prog [options] ") parser.add_option("-g", "--group", action="store", dest="group", default="CISD", help="group code (default: \"CISD\")", metavar="GROUP") parser.add_option("-m", "--mapping", action="store", dest="mapping", default="code_location_mapping.tsv", help="code-location mapping file path (default: \"./code_location_mapping.tsv\")", metavar="FILE") parser.add_option("-s", "--store", action="store", dest="store", default=".", help="data store directory (default: \".\")", metavar="PATH") parser.add_option("-o", "--organization", action="store", default=None, dest="organization", metavar="ORGANIZATION", help="organization sub-directory in which the projects are located") (options, args) = parser.parse_args() if len(args) == 1: instanceName = args[0] else: parser.print_help() sys.exit(1) mapping = os.path.abspath(options.mapping) checkFile(mapping, "file") store = os.path.abspath(options.store) checkFile(store, "dir") groupDir = createGroupDir(instanceName, options.group, store) dataSets = readMappingFile(mapping) print "Mapping file '%s' loaded: %d code-location mappings have been found." % (mapping, len(dataSets)) os.chdir(store) organization = options.organization projectDirs = getProjectDirs(getProjectParentDir(organization)) size = len(projectDirs) if size > 0: print "%d project directories have been found in '%s'." % (size, store) else: warning("No project directory has been found in '%s'." % store) sys.exit(1) # Process each project directory and, finally, move it into the group directory. for projectDir in projectDirs: newProjectDir = os.path.join(groupDir, os.path.basename(projectDir)) print "Processing and renaming project directory '%s'." % os.path.abspath(projectDir) processProjectDir(dataSets, projectDir, organization) os.rename(projectDir, newProjectDir) if organization is not None: # Remove organization folder, should be empty reaching this point os.rmdir(organization) counter = printUnprocessedMappings(dataSets, store) printUnmappedLocations(groupDir) if counter > 0: newMapping = os.path.join(os.path.dirname(mapping), 'new_' + os.path.basename(mapping)) writeMappingFile(dataSets, groupDir, newMapping) print ("%s data sets have been renamed. New code-location mappings " + "file '%s' has been written out.") % (counter, os.path.abspath(newMapping)) else: print "No data set has been renamed." if __name__ == '__main__': main()