#!/usr/bin/env python
# Copyright 2007 ETH Zuerich, CISD
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Build a plate mapping file from the CSV files found in a directory.

Every file of the given directory is read as CSV. The barcodes found in the
'Rack Id' and 'Sample Id' columns are extracted and written, one
child/parent pair per line, to a tab-separated output file.

The code is written so that it runs unchanged on Python 2.6+ and Python 3.
"""

import csv
# fnmatch -- Unix filename pattern matching
import fnmatch
import getopt
import os
import sys

# Names of the columns which contain the information we are looking for.
RACK_ID = "Rack Id"
SAMPLE_ID = "Sample Id"
columns = [RACK_ID, SAMPLE_ID]

# Command line arguments.
OUTPUT = "output"
DIR = "dir"
MAPPING = "mapping"

# Allowed values for MAPPING.
MAPPINGS = ["master", "dilution"]


class ParseException(Exception):
    """Exception raised for errors in the parsing.

    The human readable message is stored in the ``value`` attribute.
    """

    def __init__(self, value):
        # Initialize the base class too, so that ``args`` is populated.
        Exception.__init__(self, value)
        self.value = value

    def __str__(self):
        return repr(self.value)


def walk(root, recurse=0, pattern='*', return_folders=0):
    """Return the entries of directory ``root`` matching ``pattern``.

    @param root: directory to scan
    @param recurse: if true, sub-directories (but not symbolic links to
        directories) are scanned as well
    @param pattern: one or more fnmatch-style patterns separated by ';';
        an empty pattern is treated as '*'
    @param return_folders: if true, matching directories are returned too
    @return: a list of normalized paths; empty if ``root`` cannot be listed.
        Entries are visited in sorted name order and each matching entry is
        reported exactly once, even if it matches several patterns.
    """
    assert root is not None
    # must have at least root folder
    try:
        names = sorted(os.listdir(root))
    except OSError:
        return []
    patterns = (pattern or '*').split(';')
    result = []
    for name in names:
        fullname = os.path.normpath(os.path.join(root, name))
        is_dir = os.path.isdir(fullname)
        # grab if it matches any of our patterns and the entry type
        if any(fnmatch.fnmatch(name, pat) for pat in patterns):
            if os.path.isfile(fullname) or (return_folders and is_dir):
                result.append(fullname)
        # recursively scan other folders, appending results
        if recurse and is_dir and not os.path.islink(fullname):
            result.extend(walk(fullname, recurse, pattern, return_folders))
    return result


def parse_header(header):
    """Look up the index of each globally defined column in ``header``.

    @param header: list of column headers
    @return: a map of column indices keyed by their column name

    Exits the interpreter (``sys.exit`` with a message) if one of the
    columns cannot be found.
    """
    assert isinstance(header, list)
    indices = {}
    for column in columns:
        try:
            indices[column] = header.index(column)
        except ValueError:
            sys.exit("Given column '%s' could not be found in header '%s'" % (column, header))
    return indices


def get_barcode(value):
    """Extract the plate barcode from given value.

    The barcode is the part of the stripped value that precedes the first
    underscore (the whole stripped value if there is no underscore).
    """
    assert value is not None
    return value.strip().split('_')[0]


def _collect_barcodes(reader, indices):
    """Collect, for each column of ``indices``, the set of barcodes found in the rows of ``reader``."""
    found = dict((column, set()) for column in indices)
    # a row must be at least this long to hold every column we are interested in
    needed = max(indices.values()) + 1 if indices else 0
    # Cache first row column count: next rows will be checked against it, so a
    # footer that does not belong to the data and does not have the same
    # column count will be skipped.
    column_count = None
    for row in reader:
        if column_count is None:
            column_count = len(row)
        if len(row) != column_count or len(row) < needed:
            continue
        for column, index in indices.items():
            found[column].add(get_barcode(row[index]))
    return found


def parse_file(file):
    """Parse given CSV file and extract its single barcode per column.

    @param file: path of an existing file
    @return: a map keyed by column name whose value is the unique barcode
        found in that column, or None if no mapping can be extracted (empty
        file, no data row, or an empty barcode)
    @raise ParseException: if a column holds more than one distinct barcode
        or if the CSV parser fails
    """
    assert os.path.isfile(file)
    with open(file, "r") as csv_file:
        reader = csv.reader(csv_file)
        try:
            header = next(reader, None)
            if header is None:
                # empty file: nothing to extract
                return None
            indices = parse_header(header)
            assert isinstance(indices, dict)
            found = _collect_barcodes(reader, indices)
        except csv.Error as e:
            raise ParseException(
                "A CSV parser problem has occurred in file '%s' at line %d: %s"
                % (file, reader.line_num, e))
    # prepare final result and make some checks
    result = {}
    for column, values in found.items():
        if len(values) > 1:
            raise ParseException(
                "Values of column '%s' are not unique: '%s'" % (column, sorted(values)))
        if not values:
            # no data row at all
            return None
        item = next(iter(values)).strip()
        # if the only value we have found is an empty string, then there is
        # nothing we can do here
        if not item:
            return None
        result[column] = item
    return result


def output_mapping(output, mapping_type, mappings):
    """Write given mappings to given output file.

    The file is overwritten if it already exists. It starts with a comment
    line and a tab-separated header, followed by one 'child<TAB>parent' line
    per mapping.

    @param output: path of the file to write
    @param mapping_type: one of MAPPINGS; selects the header line
    @param mappings: map of parent barcode to the list of its child barcodes
    """
    assert output is not None and isinstance(mappings, dict) and mapping_type in MAPPINGS
    # resolved at call time because the header functions are defined further down
    header_factories = {"master": get_master_header, "dilution": get_dilution_header}
    header = header_factories[mapping_type]()
    # Will overwrite if called sequentially
    with open(output, 'w') as out:
        out.write("# Generated by 'tecan-parser.py' Python script\n")
        out.write("\t".join(header) + "\n")
        for parent, children in mappings.items():
            assert isinstance(children, list)
            for child in children:
                out.write(child + "\t" + parent + "\n")


def get_master_header():
    """Return the output file header for master plate - dilution plate mappings."""
    return ["Dilution Plate", "Master Plate"]


def get_dilution_header():
    """Return the output file header for dilution plate - cell plate mappings."""
    return ["Cell Plate", "Dilution Plate"]


def get_args():
    """Return the parsed command line arguments.

    @return: a map with the keys OUTPUT, DIR and MAPPING

    Prints the usage and exits with status 2 if the command line is invalid,
    or with status 0 if help has been requested.
    """
    # default value for output file, working directory and mapping
    values = {OUTPUT: 'mapping.txt', DIR: None, MAPPING: None}
    try:
        opts, args = getopt.getopt(sys.argv[1:], "ho:m:", ["help", OUTPUT + "=", MAPPING + "="])
    except getopt.GetoptError as e:
        print(str(e))
        usage()
        sys.exit(2)
    # analyze the options
    for k, v in opts:
        if k in ("-h", "--help"):
            usage()
            sys.exit(0)
        elif k in ("-o", "--" + OUTPUT):
            values[OUTPUT] = v
        elif k in ("-m", "--" + MAPPING):
            if v not in MAPPINGS:
                # an unknown mapping type is an error, not a successful run
                usage()
                sys.exit(2)
            values[MAPPING] = v
    # At least the working directory must be specified and the mapping type
    if len(args) < 1 or values[MAPPING] is None:
        usage()
        sys.exit(2)
    values[DIR] = args[0]
    return values


def check_dir(dir):
    """Check given directory. Should be called before processing it.

    @return: True if the path exists and is a directory; otherwise a message
        is printed and False is returned
    """
    if not os.path.exists(dir):
        print("Given directory path '%s' does not exist" % dir)
        return False
    if not os.path.isdir(dir):
        print("Given path '%s' is not a directory" % dir)
        return False
    return True


def usage():
    """Print the usage of this script."""
    print("""
Usage: tecan-parser.py [-h/--help] -m <mapping>/--mapping=<mapping> [-o <output>/--output=<output>] <dir>

where
  --help              instructs this script to display this message and exit;
  --output=<output>   gives the file into which this script should place its
                      output (default: 'mapping.txt');
  --mapping=<mapping> mapping type we are going to make: 'master' for a
                      MASTER plate - dilution plate mapping or 'dilution' for
                      a DILUTION plate - cell plate mapping;
  <dir>               is the directory where the files that should be parsed
                      could be found.
""")


def main():
    """Main method: parse every file of the given directory and write the mappings."""
    args = get_args()
    assert isinstance(args, dict)
    if not check_dir(args[DIR]):
        sys.exit(2)
    files = walk(args[DIR], 0, '*', 0)
    print("%u files have been found in '%s'" % (len(files), args[DIR]))
    # key=parent barcode (sample id), value=list of child barcodes (rack ids)
    mappings = {}
    for path in files:
        print("Parsing file '%s'" % path)
        mapping = parse_file(path)
        if mapping is not None:
            assert isinstance(mapping, dict)
            print("File '%s' provides following mapping '%s'" % (path, mapping))
            mappings.setdefault(mapping[SAMPLE_ID], []).append(mapping[RACK_ID])
        else:
            print("No mapping could be detected for file '%s'" % path)
    output_mapping(args[OUTPUT], args[MAPPING], mappings)
    print("Extracted mappings could be found in '%s' and are ready to load in the LIMS system" % args[OUTPUT])


if __name__ == '__main__':
    try:
        main()
    except ParseException as e:
        print(e.value)
        sys.exit(2)