diff options
author | kballou <kballou@devnulllabs.io> | 2014-07-18 00:12:20 -0600 |
---|---|---|
committer | kballou <kballou@devnulllabs.io> | 2014-07-23 00:42:08 -0600 |
commit | 6b6ee5b49232222edf47a75be914859e2e0ac3da (patch) | |
tree | 483aac434d44de6e5ef58cffa78855e042b286ab /xnt | |
parent | 1093509c044d2420e8f659ae304b45a5ed72ca32 (diff) | |
download | xnt-6b6ee5b49232222edf47a75be914859e2e0ac3da.tar.gz xnt-6b6ee5b49232222edf47a75be914859e2e0ac3da.tar.xz |
Refactor: Add NOOP
Change all modules to return a tuple (of tuples) of functions that would be
executed. The resulting tuples look similar to:
((references_to_real_function, dictionary_of_arguments), ...)
The tuple can be applied by calling the function reference, passing the
dictionary (expanded with `**` magic). Such a function is defined as
`xnt.tasks.__apply__`.
The following are affected modules:
* `xnt.tasks`
* `xnt.build.cc`
* `xnt.build.make`
* `xnt.build.tex`
* `xnt.vcs.git`
- Patch includes added tests
* `xnt.vcs.hg`
- Patch includes added tests
* `xnt.vcs.cvs`
- Patch includes added tests
Diffstat (limited to 'xnt')
-rw-r--r-- | xnt/__init__.py | 22 | ||||
-rw-r--r-- | xnt/build/cc.py | 148 | ||||
-rw-r--r-- | xnt/build/make.py | 119 | ||||
-rw-r--r-- | xnt/build/tex.py | 127 | ||||
-rw-r--r-- | xnt/tasks.py | 287 | ||||
-rw-r--r-- | xnt/tests/compilercollectiontests.py | 178 | ||||
-rw-r--r-- | xnt/tests/maketests.py | 193 | ||||
-rw-r--r-- | xnt/tests/taskcompressiontests.py | 22 | ||||
-rw-r--r-- | xnt/tests/taskcopytests.py | 44 | ||||
-rw-r--r-- | xnt/tests/taskmisctests.py | 160 | ||||
-rw-r--r-- | xnt/tests/taskmkdirtests.py | 29 | ||||
-rw-r--r-- | xnt/tests/taskmovetests.py | 34 | ||||
-rw-r--r-- | xnt/tests/taskremovetests.py | 30 | ||||
-rw-r--r-- | xnt/tests/textests.py | 107 | ||||
-rw-r--r-- | xnt/tests/vcscvstests.py | 57 | ||||
-rw-r--r-- | xnt/tests/vcsgittests.py | 62 | ||||
-rw-r--r-- | xnt/tests/vcshgtests.py | 62 | ||||
-rw-r--r-- | xnt/vcs/cvs.py | 50 | ||||
-rw-r--r-- | xnt/vcs/git.py | 42 | ||||
-rw-r--r-- | xnt/vcs/hg.py | 42 |
20 files changed, 1077 insertions, 738 deletions
diff --git a/xnt/__init__.py b/xnt/__init__.py index 507041f..1a700e2 100644 --- a/xnt/__init__.py +++ b/xnt/__init__.py @@ -43,17 +43,17 @@ __license__ = """ VERBOSE = False -from xnt.tasks import cp -from xnt.tasks import mv -from xnt.tasks import mkdir -from xnt.tasks import rm -from xnt.tasks import create_zip -from xnt.tasks import log -from xnt.tasks import xntcall -from xnt.tasks import call -from xnt.tasks import setup -from xnt.tasks import which -from xnt.tasks import in_path +from xnt.tasks import __copy__ +from xnt.tasks import __move__ +from xnt.tasks import __mkdir__ +from xnt.tasks import __remove__ +from xnt.tasks import __zip__ +from xnt.tasks import __log__ +from xnt.tasks import __xntcall__ +from xnt.tasks import __call__ +from xnt.tasks import __setup__ +from xnt.tasks import __which__ +from xnt.tasks import __in_path__ def target(*args, **kwargs): """Decorator function for marking a method in diff --git a/xnt/build/cc.py b/xnt/build/cc.py index d993653..4123777 100644 --- a/xnt/build/cc.py +++ b/xnt/build/cc.py @@ -22,105 +22,87 @@ Definition of commonly used compilers import os import logging -from xnt.tasks import call, which +from xnt.tasks import __apply__ +from xnt.tasks import __call__ +from xnt.tasks import __which__ LOGGER = logging.getLogger(__name__) -def gcc(src, flags=None): - """gcc compiler, non-named output file +def __gcc__(src, output=None, flags=None): + """gcc compiler - :param src: C source file to compile with default `gcc` + :param src: C source file to compile + :param output: Output filename :param flags: List of flags to pass onto the compiler """ - assert which("gcc") - return _gcc(src, flags) - -def gpp(src, flags=None): + def __compile__(**kwargs): + '''Perform GCC compilation''' + assert __apply__(__which__("gcc")) + cmd = ["gcc", kwargs['infile']] + if kwargs.get('flags', None): + for flag in kwargs['flags']: + cmd.append(flag) + if kwargs.get('outfile', None): + cmd.append('-o') + cmd.append(kwargs['outfile']) + return __apply__(__call__(cmd)) + args = {'infile': src, 'outfile': output, 'flags': flags,} + return ((__compile__, args),) + +def __gpp__(src, output=None, flags=None): """g++ compiler, non-named output file - :param src: C++ source file to compile with default `g++` - :param flags: List of flags to pass onto the compiler - """ - assert which("g++") - return _gcc(src, flags, "g++") - -def gcc_o(src, output, flags=None): - """gcc compiler, with output file - - :param src: C source file to compile with default `gcc` - :param output: Name of resulting object or executable + :param src: C++ source file to compile + :param output: Output filename :param flags: List of flags to pass onto the compiler """ - assert which("gcc") - return _gcc_o(src, output, flags) - -def gpp_o(src, output, flags=None): - """g++ compiler, with output file - - :param src: C++ source file to compile with default `g++` - :param output: Name of resulting object or executable - :param flags: List of flags to pass onto the compiler - """ - assert which("g++") - return _gcc_o(src, output, flags, "g++") - -def javac(src, flags=None): + def __compile__(**kwargs): + '''Perform G++ compilation''' + assert __apply__(__which__("g++")) + cmd = ["g++", kwargs['outfile'],] + if kwargs.get('flags', None): + for flag in kwargs['flags']: + cmd.append(flag) + if kwargs.get('outfile', None): + cmd.append('-o') + cmd.append(kwargs['outfile']) + return __apply__(__call__(cmd)) + args = {'infile': src, 'outfile': output, 'flags': flags,} + return ((__compile__, args),) + +def __javac__(src, flags=None): """Javac: compile Java programs :param src: Java source file to compile with default `javac` :param flags: List of flags to pass onto the compiler """ - assert which("javac") - LOGGER.info("Compiling %s", src) - cmd = __generate_command(src, flags, "javac") - return __compile(cmd) - -def nvcc(src, flags=None): + def __compile__(**kwargs): + '''Perform javac compilation''' + assert __apply__(__which__("javac")) + cmd = ['javac', kwargs['sourcefiles'],] + if kwargs.get('flags', None): + for flag in kwargs['flags']: + cmd.append(flag) + return __apply__(__call__(cmd)) + return ((__compile__, {'sourcefiles': src, 'flags': flags,}),) + +def __nvcc__(src, output=None, flags=None): """NVCC: compile CUDA C/C++ programs - :param src: CUDA source file to compile with default `nvcc` + :param src: CUDA source file to compile + :param output: Output filename :param flags: List of flags to pass onto the compiler """ - assert which('nvcc') - return _gcc(src, flags, compiler='nvcc') - -def nvcc_o(src, output, flags=None): - """NVCC: compile with named output - - :param src: CUDA source file to compile with default `nvcc` - :param output: Name of resulting object or executable - :param flags: List of flags to pass onto the compiler - """ - assert which('nvcc') - return _gcc_o(src, output, flags, compiler='nvcc') - -def _gcc(src, flags=None, compiler="gcc"): - """Compile using gcc""" - LOGGER.info("Compiling %s", src) - return __compile(__generate_command(src, flags, compiler)) - -def _gcc_o(src, output, flags=None, compiler="gcc"): - """Compile with gcc specifying output file name""" - LOGGER.info("Compiling %s to %s", src, output) - cmd = __generate_command(src, flags, compiler) - cmd.append("-o") - cmd.append(output) - return __compile(cmd) - -def __generate_command(src, flags=None, compiler="gcc"): - """Generate cmd list for call""" - cmd = [compiler, src] - if flags: - for flag in flags: - cmd.append(flag) - return cmd - -def __compile(cmd): - """Run Compile, using `xnt.tasks.call`""" - return call(cmd) - -def __is_newer(file_a, file_b): - """Compare mmtimes of files - Return True if `file_a` is modified later than `file_b` - """ - return os.path.getmtime(file_a) > os.path.getmtime(file_b) + def __compile__(**kwargs): + '''Perform NVCC compilation''' + assert __apply__(__call__('nvcc')) + cmd = ['nvcc', kwargs['infile'],] + if kwargs.get('flags', None): + for flag in kwargs['flags']: + cmd.append(flag) + if kwargs.get('outfile', None): + cmd.append('-o') + cmd.append(kwargs['outfile']) + return __apply__(__call__(cmd)) + args = {'infile': src, 'outfile': output, 'flags': flags,} + return ((__compile__, args),) diff --git a/xnt/build/make.py b/xnt/build/make.py index 9319d05..da569a9 100644 --- a/xnt/build/make.py +++ b/xnt/build/make.py @@ -18,10 +18,11 @@ # along with this program. If not, see <http://www.gnu.org/licenses/>. import os -import subprocess -from xnt.tasks import which +from xnt.tasks import __apply__ +from xnt.tasks import __call__ +from xnt.tasks import __which__ -def ant(target, path="", flags=None, pkeys=None, pvalues=None): +def __ant__(target, path=None, flags=None, pkeys=None, pvalues=None): """Wrapper around Apache Ant Invoke Apache Ant in either the current working directory or the specified @@ -37,15 +38,27 @@ def ant(target, path="", flags=None, pkeys=None, pvalues=None): :param pkeys: List of keys to combine with pvalues to pass to Ant :param pvalues: List of values to combine with pkeys to pass to Ant """ - assert which("ant") - cmd = __add_params(["ant"], - __build_param_list(pkeys, pvalues), - lambda x: "-D%s" % x) - cmd = __add_flags(cmd, flags) - cmd.append(target) - return __run_in(path, lambda: subprocess.call(cmd)) - -def make(target, path="", flags=None, pkeys=None, pvalues=None): + def __execute__(**kwargs): + '''Perform execution of ant''' + assert __apply__(__which__("ant")) + cmd = ['ant',] + if 'flags' in kwargs: + for flag in kwargs['flags']: + cmd.append(flag) + if 'pkeys' in kwargs and 'pvalues' in kwargs: + for param in zip(kwargs['pkeys'], kwargs['pvalues']): + cmd.append('-D%s=%s' % param) + cmd.append(target) + # TODO: this will need to wait for an upstream refactor + return __run_in(path, lambda: __apply__(__call__(cmd))) + args = {'target': target, + 'flags': flags, + 'pkeys': pkeys, + 'pvalues': pvalues, + 'path': path,} + return ((__execute__, args),) + +def __make__(target, path=None, flags=None, pkeys=None, pvalues=None): """Wrapper around GNU Make Invoke Gnu Make (*make*) in either the current working directory or the @@ -61,13 +74,27 @@ def make(target, path="", flags=None, pkeys=None, pvalues=None): :param pkeys: List of keys, zipped with pvalues, to pass to Make :param pvalues: List of values, zipped with pkeys, to pass to Make """ - assert which("make") - cmd = __add_params(["make"], __build_param_list(pkeys, pvalues)) - cmd = __add_flags(cmd, flags) - cmd.append(target) - return __run_in(path, lambda: subprocess.call(cmd)) - -def nant(target, path="", flags=None, pkeys=None, pvalues=None): + def __execute__(**kwargs): + '''Perform invocation of make''' + assert __apply__(__which__("make")) + cmd = ['make',] + if 'flags' in kwargs: + for flag in kwargs['flags']: + cmd.append(flag) + if 'pkeys' in kwargs and 'pvalues' in kwargs: + for param in zip(kwargs['pkeys'], kwargs['pvalues']): + cmd.append('%s=%s' % param) + cmd.append(target) + # TODO: this will need to wait for an upstream refactor + return __run_in(path, lambda: __apply__(__call__(cmd))) + args = {'target': target, + 'flags': flags, + 'pkeys': pkeys, + 'pvalues': pvalues, + 'path': path,} + return ((__execute__, args),) + +def __nant__(target, path=None, flags=None, pkeys=None, pvalues=None): """Wrapper around .NET Ant Invoke NAnt in either the current working directory or the specified @@ -83,41 +110,25 @@ def nant(target, path="", flags=None, pkeys=None, pvalues=None): :param pkeys: List of keys, zipped with pvalues, to pass to NAnt :param pvalues: List of values, zipped with pkeys, to pass to NAnt """ - assert which("nant") - cmd = __add_params(["nant"], - __build_param_list(pkeys, pvalues), - lambda x: "-D:%s" % x) - cmd = __add_flags(cmd, flags) - cmd.append(target) - return __run_in(path, lambda: subprocess.call(cmd)) - -def __add_flags(cmd, flags): - """Add flags to command and return new list""" - if not flags: - return cmd - command = list(cmd) - for flag in flags: - command.append(flag) - return command - -def __build_param_list(keys, values): - """Build a list of key-value for appending to the command list""" - parameters = [] - if not keys or not values: - return parameters - params = zip(keys, values) - for param in params: - parameters.append("%s=%s" % param) - return parameters - -def __add_params(cmd, params, param_map=lambda x: x): - """Append parameters to cmd list using fn""" - if not params: - return cmd - command = list(cmd) - for param in params: - command.append(param_map(param)) - return command + def __execute__(**kwargs): + '''Perform execution of ant''' + assert __apply__(__which__("nant")) + cmd = ['nant',] + if 'flags' in kwargs: + for flag in kwargs['flags']: + cmd.append(flag) + if 'pkeys' in kwargs and 'pvalues' in kwargs: + for param in zip(kwargs['pkeys'], kwargs['pvalues']): + cmd.append('-D%s=%s' % param) + cmd.append(target) + # TODO: this will need to wait for an upstream refactor + return __run_in(path, lambda: __apply__(__call__(cmd))) + args = {'target': target, + 'flags': flags, + 'pkeys': pkeys, + 'pvalues': pvalues, + 'path': path,} + return ((__execute__, args),) def __run_in(path, function): """Execute function while in a different running directory""" diff --git a/xnt/build/tex.py b/xnt/build/tex.py index 5ff5e3e..7eb95a4 100644 --- a/xnt/build/tex.py +++ b/xnt/build/tex.py @@ -24,10 +24,10 @@ from xnt import VERBOSE LOGGER = logging.getLogger(__name__) -def pdflatex(document, - path="./", - bibtex=False, - makeglossary=False): +def __pdflatex__(document, + directory=None, + bibtex=False, + makeglossary=False): """Generate PDF LaTeX Document Use `pdflatex` to build a LaTeX PDF document. Can optionally execute steps @@ -38,64 +38,81 @@ def pdflatex(document, :param bibtex: Flag to or not to add bibtex. Default: False :param makeglossary: Flag to or not to add a glossary. Default: False """ - devnull = None if VERBOSE else open(os.devnull, 'w') - documentbase = os.path.splitext(document)[0] - cwd = os.getcwd() - os.chdir(path) - def pdf(draftmode=False): - """Generate PDF""" - cmd = ["pdflatex", document, "-halt-on-error",] - if draftmode: - cmd.append('-draftmode') - return xnt.tasks.call(cmd, stdout=devnull) + def __execute__(**kwargs): + '''Perform pdflatex build''' + devnull = None if VERBOSE else open(os.devnull, 'w') + documentbase = os.path.splitext(document)[0] + cwd = os.getcwd() + os.chdir(kwargs['directory']) + def pdf(draftmode=False): + """Generate PDF""" + from xnt.tasks import __call__, __apply__ + cmd = ["pdflatex", document, "-halt-on-error",] + if draftmode: + cmd.append('-draftmode') + return __apply__(__call__(cmd, stdout=devnull)) - def run_bibtex(): - """Generate BibTex References""" - return xnt.tasks.call(["bibtex", documentbase + ".aux"], - stdout=devnull) + def run_bibtex(): + """Generate BibTex References""" + from xnt.tasks import __call__, __apply__ + return __apply__(__call__(["bibtex", documentbase + ".aux"], + stdout=devnull)) - def makeglossaries(): - """Generate Glossary""" - return xnt.tasks.call(["makeglossaries", documentbase], stdout=devnull) + def makeglossaries(): + """Generate Glossary""" + from xnt.tasks import __call__, __apply__ + return __apply__(__call__(["makeglossaries", documentbase], + stdout=devnull)) - error_codes = [] - error_codes.append(pdf(draftmode=True)) - if makeglossary: - error_codes.append(makeglossaries()) - if bibtex: - error_codes.append(run_bibtex()) + error_codes = [] error_codes.append(pdf(draftmode=True)) - error_codes.append(pdf(draftmode=False)) - os.chdir(cwd) - if devnull: - devnull.close() - return sum(error_codes) + if kwargs['makeglossary']: + error_codes.append(makeglossaries()) + if kwargs['bibtex']: + error_codes.append(run_bibtex()) + error_codes.append(pdf(draftmode=True)) + error_codes.append(pdf(draftmode=False)) + os.chdir(cwd) + if devnull: + devnull.close() + return sum(error_codes) + args = {'texdocument': document, + 'directory': directory if directory else os.getcwd(), + 'bibtex': bibtex, + 'makeglossary': makeglossary,} + return ((__execute__, args),) -def clean(path="./", remove_pdf=False): + +def __clean__(directory=None, remove_pdf=False): """Clean up generated files of PDF compilation :param path: Directory of output files, if different than current directory :param remove_pdf: Flag to remove the PDF. Default: False """ - cwd = os.getcwd() - os.chdir(path) - xnt.tasks.rm("*.out", - "*.log", - "*.aux", - "*.toc", - "*.tol", - "*.tof", - "*.tot", - "*.bbl", - "*.blg", - "*.nav", - "*.snm", - "*.mtc", - "*.mtc0", - "*.glo", - "*.ist", - "*.glg", - "*.gls") - if remove_pdf: - xnt.tasks.rm("*.pdf") - os.chdir(cwd) + def __execute__(**kwargs): + '''Perform clean operation''' + cwd = os.getcwd() + os.chdir(kwargs['directory']) + xnt.tasks.rm("*.out", + "*.log", + "*.aux", + "*.toc", + "*.tol", + "*.tof", + "*.tot", + "*.bbl", + "*.blg", + "*.nav", + "*.snm", + "*.mtc", + "*.mtc0", + "*.glo", + "*.ist", + "*.glg", + "*.gls") + if kwargs['remove_pdf']: + xnt.tasks.rm("*.pdf") + os.chdir(cwd) + args = {'directory': directory if directory else os.getcwd(), + 'remove_pdf': remove_pdf,} + return ((__execute__, args),) diff --git a/xnt/tasks.py b/xnt/tasks.py index b8a7ad8..2a761f4 100644 --- a/xnt/tasks.py +++ b/xnt/tasks.py @@ -42,36 +42,31 @@ def expandpath(path_pattern): """ return glob.iglob(path_pattern) -def cp(src="", dst="", files=None): - """Copy `src` to `dst` or copy `files` to `dst` +def __copy__(srcdir=None, dstdir=None, files=None): + """Copy `srcdir` to `dstdir` or copy `files` to `dstdir` Copy a file or folder to a different file/folder - If no `src` file is specified, will attempt to copy `files` to `dst` + If no `srcdir` file is specified, will attempt to copy `files` to `dstdir` *Notice*, elements of `files` will not be expanded before copying. - :param src: source directory or file - :param dst: destination file or folder (in the case of `files`) + :param srcdir: source directory or file + :param dstdir: destination file or folder (in the case of `files`) :param files: list of files (strings) to copy to `src` """ - assert dst and src or len(files) > 0 - LOGGER.info("Copying %s to %s", src, dst) - def copy(source, destination): - """Copy file or folder to destination. - - Depending on the type of source, call the appropriate method - """ - if os.path.isdir(source): - shutil.copytree(source, destination) - else: - shutil.copy2(source, destination) - if src: - copy(src, dst) - else: - for file_to_copy in files: - copy(file_to_copy, dst) - -def mv(src, dst): + def __execute__(**kwargs): + """Perform copy""" + assert 'dstdir' in kwargs + if 'srcdir' in kwargs: + shutil.copytree(kwargs['srcdir'], kwargs['dstdir']) + elif 'files' in kwargs: + for srcfile in kwargs['files']: + shutil.copy(srcfile, kwargs['dstdir']) + return ( + (__execute__, {'srcdir': srcdir, 'dstdir': dstdir, 'files': files,}), + ) + +def __move__(src, dst): """Move `src` to `dst` Move (copy and remove) the source file or directory (*src*) to the @@ -80,10 +75,14 @@ def mv(src, dst): :param src: Source file or folder to move :param dst: Destination file or folder """ - LOGGER.info("Moving %s to %s", src, dst) - shutil.move(src, dst) - -def mkdir(directory, mode=0o777): + def __execute__(**kwargs): + '''Perform move''' + LOGGER.info("Moving %s to %s", kwargs['src'], kwargs['dst']) + shutil.move(kwargs['src'], kwargs['dst']) + args = {'src': src, 'dst': dst,} + return ((__execute__, args),) + +def __mkdir__(directory, mode=0o755): """Make a directory with mode Create a directory specified by *dir* with default mode (where supported) @@ -95,18 +94,26 @@ def mkdir(directory, mode=0o777): :param directory: New directory to create :param mode: Mode to create the directory (where supported). Default: `777` """ - if os.path.exists(directory): - LOGGER.warning("Given directory (%s) already exists", directory) - return - LOGGER.info("Making directory %s with mode %o", directory, mode) - try: - os.mkdir(directory, mode) - except IOError as io_error: - log(io_error, logging.ERROR) - except: - raise - -def rm(*fileset): + def __execute__(**kwargs): + '''Perform directory creation''' + if os.path.exists(directory): + LOGGER.warning( + "Given directory (%s) already exists", + kwargs['directory']) + return + LOGGER.info( + "Making directory %s with mode %o", + kwargs['directory'], + kwargs['mode']) + try: + os.mkdir(kwargs['directory'], kwargs['mode']) + except IOError as io_error: + LOGGER.error(io_error) + except: + raise + return ((__execute__, {'directory': directory, 'mode': mode,}),) + +def __remove__(*fileset): """Remove a set of files Attempt to remove all the directories given by the fileset. Before *rm* @@ -116,23 +123,26 @@ def rm(*fileset): :param fileset: List of files to remove """ - - try: - for glob_set in fileset: - for file_to_delete in expandpath(glob_set): - if not os.path.exists(file_to_delete): - continue - LOGGER.info("Removing %s", file_to_delete) - if os.path.isdir(file_to_delete): - shutil.rmtree(file_to_delete) - else: - os.remove(file_to_delete) - except OSError as os_error: - log(os_error, logging.ERROR) - except: - raise - -def create_zip(directory, zipfilename): + def __execute__(**kwargs): + '''Perform file/ folder removal''' + try: + for glob_set in kwargs['fileset']: + for file_to_delete in expandpath(glob_set): + if not os.path.exists(file_to_delete): + continue + LOGGER.info("Removing %s", file_to_delete) + if os.path.isdir(file_to_delete): + shutil.rmtree(file_to_delete) + else: + os.remove(file_to_delete) + except OSError as os_error: + LOGGER.error(os_error) + except: + raise + args = {'fileset': fileset,} + return ((__execute__, args),) + +def __zip__(directory, zipfilename): """Compress (Zip) folder Zip the specified *directory* into the zip file named *zipfilename* @@ -140,20 +150,23 @@ def create_zip(directory, zipfilename): :param directory: Directory to zip :param zipfilename: Name of resulting compression """ - LOGGER.info("Zipping %s as %s", directory, zipfilename) - assert os.path.isdir(directory) and zipfilename - with contextlib.closing(zipfile.ZipFile( - zipfilename, - "w", - zipfile.ZIP_DEFLATED)) as zip_file: - for paths in os.walk(directory): - for file_name in paths[2]: - absfn = os.path.join(paths[0], file_name) - zip_file_name = absfn[len(directory) + len(os.sep):] - zip_file.write(absfn, zip_file_name) + def __execute__(**kwargs): + '''Perform zip''' + assert os.path.isdir(kwargs['directory']) and kwargs['zipfile'] + LOGGER.info("Zipping %s as %s", kwargs['directory'], kwargs['zipfile']) + with contextlib.closing(zipfile.ZipFile( + kwargs['zipfile'], + "w", + zipfile.ZIP_DEFLATED)) as zip_file: + for paths in os.walk(kwargs['directory']): + for file_name in paths[2]: + absfn = os.path.join(paths[0], file_name) + zip_file_name = absfn[len(directory) + len(os.sep):] + zip_file.write(absfn, zip_file_name) + return ((__execute__, {'directory': directory, 'zipfile': zipfilename,}),) #Misc Tasks -def echo(msg, tofile): +def __echo__(msg, tofile): """Write a string to file Write the given *msg* to a file named *tofile* @@ -163,10 +176,13 @@ def echo(msg, tofile): :param msg: Message to write to file :param tofile: file to which the message is written """ - with open(tofile, "w") as file_to_write: - file_to_write.write(msg) + def __execute__(**kwargs): + '''Perform echo to file''' + with open(kwargs['tofile'], "w") as file_to_write: + file_to_write.write(kwargs['msg']) + return ((__execute__, {'msg': msg, 'tofile': tofile,}),) -def log(msg="", lvl=logging.INFO): +def __log__(msg, lvl=logging.INFO): """Log *msg* using tasks global logger Emit the message (*msg*) to the *xnt.tasks* logger using either the default @@ -175,25 +191,35 @@ def log(msg="", lvl=logging.INFO): :param msg: Message to log :param lvl: Log Level of message. Default `INFO` """ - LOGGER.log(lvl, msg) + def __execute__(**kwargs): + '''Perform logging operation''' + LOGGER.log(kwargs['lvl'], kwargs['msg']) + return ((__execute__, {'msg': msg, 'lvl': lvl,}),) -def xntcall(buildfile, targets=None, props=None): +def __xntcall__(buildfile, targets=None, props=None): """Invoke xnt on another build file in a different directory :param: path - to the build file (including build file) :param: targets - list of targets to execute :param: props - dictionary of properties to pass to the build module """ - from xnt.xenant import invoke_build, load_build - build = load_build(buildfile) - path = os.path.dirname(buildfile) - cwd = os.getcwd() - os.chdir(path) - error_code = invoke_build(build, targets=targets, props=props) - os.chdir(cwd) - return error_code - -def call(command, stdout=None, stderr=None): + def __execute__(**kwargs): + '''Perform xntcall''' + from xnt.xenant import invoke_build, load_build + build = load_build(kwargs['buildfile']) + path = os.path.dirname(kwargs['buildfile']) + cwd = os.getcwd() + os.chdir(path) + error_code = invoke_build( + build, + targets=kwargs['targets'], + props=kwargs['props']) + os.chdir(cwd) + return error_code + args = {'buildfile': buildfile, 'targets': targets, 'props': props, } + return ((__execute__, args),) + +def __call__(command, stdout=None, stderr=None): """ Execute the given command, redirecting stdout and stderr to optionally given files @@ -202,51 +228,82 @@ def call(command, stdout=None, stderr=None): :param: stderr - file to redirect standard error to, if given :return: the error code of the subbed out call, `$?` """ - return subprocess.call(args=command, stdout=stdout, stderr=stderr) - -def setup(commands, directory=""): + def __execute__(**kwargs): + '''Perform subprocess call''' + return subprocess.call( + args=kwargs['command'], + stdout=kwargs['stdout'], stderr=kwargs['stderr']) + args = {'command': command, 'stdout': stdout, 'stderr': stderr,} + return ((__execute__, args),) + +def __setup__(command=None, commands=None, directory=None): """Invoke the ``setup.py`` file in the current or specified directory + :param: command - a single command to run :param: commands - list of commands and options to run/ append :param: dir - (optional) directory to run from :return: the error code of the execution, `$?` """ - cmd = [sys.executable, "setup.py",] - for command in commands: - cmd.append(command) - cwd = os.getcwd() - if directory: - os.chdir(directory) - error_code = call(cmd) - os.chdir(cwd) - return error_code - -def which(program): + def __execute__(**kwargs): + '''Perform python setup.py commands''' + cmd = [sys.executable, "setup.py",] + for command in kwargs['commands']: + cmd.append(command) + cwd = os.getcwd() + if kwargs['directory']: + os.chdir(kwargs['directory']) + call = __call__(cmd)[0] + error_code = call[0](**call[1]) + os.chdir(cwd) + return error_code + if not commands: + commands = [] + if command: + commands.append(command) + assert len(commands) > 0 + args = {'commands': commands, 'directory': directory,} + return ((__execute__, args),) + +def __which__(program): """Similar to Linux/Unix `which`: return (first) path of executable :param program: program name to search for in PATH :return: Return the PATH of `program` or None """ - def is_exe(fpath): - """Determine if argument exists and is executable""" - return os.path.isfile(fpath) and os.access(fpath, os.X_OK) - - fpath = os.path.split(program) - if fpath[0]: - if is_exe(program): - return program - else: - for path in os.environ["PATH"].split(os.pathsep): - path = path.strip('"') - exe_file = os.path.join(path, program) - if is_exe(exe_file): - return exe_file - return None - -def in_path(program): + def __execute__(**kwargs): + '''Perform which lookup''' + def is_exe(fpath): + """Determine if argument exists and is executable""" + return os.path.isfile(fpath) and os.access(fpath, os.X_OK) + + fpath = os.path.split(kwargs['program']) + if fpath[0]: + if is_exe(kwargs['program']): + return kwargs['program'] + else: + for path in os.environ["PATH"].split(os.pathsep): + path = path.strip('"') + exe_file = os.path.join(path, kwargs['program']) + if is_exe(exe_file): + return exe_file + return None + + return ((__execute__, {'program': program,}),) + +def __in_path__(program): """Return boolean result if program is in PATH environment variable :param program: Program name to search for in PATH :return: Return the PATH of `program` or None """ - return which(program) + def __execute__(**kwargs): + '''Perform which test''' + return __apply__(__which__(kwargs['program'])) + return ((__execute__, {'program': program,}),) + +def __apply__(func_tuple): + '''Execute function tuple''' + for statement in func_tuple: + func = statement[0] + args = statement[1] + func(**args) diff --git a/xnt/tests/compilercollectiontests.py b/xnt/tests/compilercollectiontests.py index 20d894f..0a7139e 100644 --- a/xnt/tests/compilercollectiontests.py +++ b/xnt/tests/compilercollectiontests.py @@ -17,144 +17,142 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. -import os -import shutil -import xnt -import xnt.build.cc as cc +from xnt.build.cc import __gcc__ +from xnt.build.cc import __gpp__ +from xnt.build.cc import __nvcc__ +from xnt.build.cc import __javac__ +from types import FunctionType import unittest #pylint: disable-msg=C0103 -@unittest.skipUnless(xnt.in_path("gcc"), "gcc is not in your path") class GccTests(unittest.TestCase): """Test GCC""" def setUp(self): """Test Case Setup""" - os.mkdir("temp") - with open("temp/hello.c", "w") as test_code: - test_code.write(""" - #include <stdio.h> - int main() { - printf("Hello, World!\\n"); - return 0; - } - """) + pass def tearDown(self): """Test Case Teardown""" - shutil.rmtree("temp") + pass def test_gcc(self): """Test Default GCC""" - cwd = os.getcwd() - os.chdir("temp") - cc.gcc("hello.c") - self.assertTrue(os.path.isfile("a.out")) - os.chdir(cwd) - - def test_gcc_o(self): + result = __gcc__("hello.c") + self.assertIsNotNone(result) + self.assertIsInstance(result, tuple) + self.assertIsInstance(result[0], tuple) + self.assertEqual(len(result[0]), 2) + self.assertIsInstance(result[0][0], FunctionType) + self.assertIsInstance(result[0][1], dict) + self.assertTrue("infile" in result[0][1]) + self.assertTrue('flags' in result[0][1]) + + def test_gcc_with_output(self): """Test GCC with output""" - cc.gcc_o("temp/hello.c", "temp/hello") - self.assertTrue(os.path.isfile("temp/hello")) + result = __gcc__("hello.c", output="hello") + self.assertIsNotNone(result) + self.assertIsInstance(result, tuple) + self.assertIsInstance(result[0], tuple) + self.assertEqual(len(result[0]), 2) + self.assertIsInstance(result[0][0], FunctionType) + self.assertIsInstance(result[0][1], dict) + self.assertTrue("infile" in result[0][1]) + self.assertTrue("outfile" in result[0][1]) + self.assertTrue('flags' in result[0][1]) -@unittest.skipUnless(xnt.in_path("g++"), "g++ is not in your path") +#pylint: disable-msg=C0103 class GppTests(unittest.TestCase): """Test G++ (C++ GCC)""" def setUp(self): """Test Case Setup""" - os.mkdir("temp") - with open("temp/hello.cpp", "w") as test_code: - test_code.write(""" - #include <iostream> - int main() { - std::cout << "Hello, World!" << std::endl; - return 0; - } - """) + pass def tearDown(self): """Test Case Teardown""" - shutil.rmtree("temp") + pass def test_gpp(self): """Test Default G++""" - cwd = os.getcwd() - os.chdir("temp") - cc.gpp("hello.cpp") - self.assertTrue("a.out") - os.chdir(cwd) - - def test_gpp_o(self): + result = __gpp__("hello.cpp") + self.assertIsNotNone(result) + self.assertIsInstance(result, tuple) + self.assertIsInstance(result[0], tuple) + self.assertEqual(len(result[0]), 2) + self.assertIsInstance(result[0][0], FunctionType) + self.assertIsInstance(result[0][1], dict) + self.assertTrue("infile" in result[0][1]) + self.assertTrue('flags' in result[0][1]) + + def test_gpp_with_output(self): """Test G++ with output""" - cc.gpp_o("temp/hello.cpp", "temp/hello") - self.assertTrue(os.path.isfile("temp/hello")) + result = __gpp__("hello.cpp", output="hello") + self.assertIsNotNone(result) + self.assertIsInstance(result, tuple) + self.assertIsInstance(result[0], tuple) + self.assertEqual(len(result[0]), 2) + self.assertIsInstance(result[0][0], FunctionType) + self.assertIsInstance(result[0][1], dict) + self.assertTrue("infile" in result[0][1]) + self.assertTrue("outfile" in result[0][1]) + self.assertTrue('flags' in result[0][1]) -@unittest.skipUnless(xnt.in_path('nvcc'), 'nvcc is not in your path') +#pylint: disable-msg=C0103 class NvccTests(unittest.TestCase): """Test NVCC""" def setUp(self): """Test Case Setup""" - os.mkdir("temp") - with open("temp/hello.cu", "w") as test_code: - test_code.write(""" - __global__ void kernel(float *x) { - int idx = blockIdx.x; - x[idx] = 42; - } - int main() { - int size = sizeof(float) * 128; - float *x = (float*)malloc(size); - float *dev_x; - cudaMalloc((void**)&dev_x, size); - cudaMemcpy(dev_x, x, size, cudaMemcpyHostToDevice); - kernel<<<128, 1>>>(dev_x); - cudaMemcpy(x, dev_x, size, cudaMemcpyDeviceToHost); - cudaFree(dev_x); - delete(x); - x = NULL; - }""") + pass + def tearDown(self): """Test Case Teardown""" - shutil.rmtree("temp") + pass def test_nvcc(self): """Test Default NVCC""" - cwd = os.getcwd() - os.chdir("temp") - cc.nvcc("hello.cu") - self.assertTrue(os.path.isfile("a.out")) - os.chdir(cwd) - - def test_nvcc_o(self): + result = __nvcc__("hello.cu") + self.assertIsNotNone(result) + self.assertIsInstance(result, tuple) + self.assertIsInstance(result[0], tuple) + self.assertEqual(len(result[0]), 2) + self.assertIsInstance(result[0][0], FunctionType) + self.assertIsInstance(result[0][1], dict) + self.assertTrue("infile" in result[0][1]) + self.assertTrue('flags' in result[0][1]) + + def test_nvcc_with_output(self): """Test Named Output NVCC""" - cc.nvcc_o("temp/hello.cu", "temp/hello") - self.assertTrue(os.path.isfile("temp/hello")) + result = __nvcc__("hello.cu", output="hello") + self.assertIsNotNone(result) + self.assertIsInstance(result, tuple) + self.assertIsInstance(result[0], tuple) + self.assertEqual(len(result[0]), 2) + self.assertIsInstance(result[0][0], FunctionType) + self.assertIsInstance(result[0][1], dict) + self.assertTrue("infile" in result[0][1]) + self.assertTrue("outfile" in result[0][1]) + self.assertTrue('flags' in result[0][1]) -@unittest.skipUnless(xnt.in_path("javac"), "javac is not in your path") class JavacTests(unittest.TestCase): """Test Javac""" def setUp(self): """Test Case Setup""" - os.mkdir("temp") - with open("temp/hello.java", "w") as test_code: - test_code.write(""" - class hello { - public static void main(String[] args) { - System.out.println("Hello, World!"); - } - } - """) + pass def tearDown(self): """Test Case Teardown""" - shutil.rmtree("temp") + pass def test_javac(self): """Test Default Javac""" - cwd = os.getcwd() - os.chdir("temp") - cc.javac("hello.java") - self.assertTrue(os.path.isfile("hello.class")) - os.chdir(cwd) + result = __javac__("HelloWorld.java") + self.assertIsNotNone(result) + self.assertIsInstance(result, tuple) + self.assertIsInstance(result[0], tuple) + self.assertEqual(len(result[0]), 2) + self.assertIsInstance(result[0][0], FunctionType) + self.assertIsInstance(result[0][1], dict) + self.assertTrue("sourcefiles" in result[0][1]) + self.assertTrue('flags' in result[0][1]) if __name__ == "__main__": unittest.main() diff --git a/xnt/tests/maketests.py b/xnt/tests/maketests.py index c022b12..a3d17d0 100644 --- a/xnt/tests/maketests.py +++ b/xnt/tests/maketests.py @@ -17,127 +17,162 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. +from xnt.build.make import __make__ +from xnt.build.make import __ant__ +from xnt.build.make import __nant__ +from types import FunctionType import unittest -import xnt -import xnt.build.make -import xnt.tests -@unittest.skipUnless(xnt.in_path("ant") or xnt.in_path("ant.exe"), - "Apache ant is not in your path") class AntTests(unittest.TestCase): """Test Case for Ant Build""" def setUp(self): """Test Setup""" - xnt.tests.set_up() - with open("temp/build.xml", "w") as build: - build.write("<?xml version=\"1.0\" ?>\n") - build.write("<project name=\"test\" default=\"test\">\n") - build.write("<target name=\"test\">\n") - build.write("<echo>${test_var}</echo>\n") - build.write("</target>\n") - build.write("</project>\n") + pass def tearDown(self): """Test Teardown""" - xnt.tests.tear_down() + pass def test_default_build(self): """Test the default target of ant""" - result = xnt.build.make.ant(target="test", path="temp") - self.assertEqual(result, 0) - - def test_passing_flags(self): - """Test ant with passing flags""" - result = xnt.build.make.ant(target="test", - path="temp", - flags=["-verbose"]) - self.assertEqual(result, 0) - - def test_pass_var(self): + result = __ant__(target="test") + self.assertIsNotNone(result) + self.assertIsInstance(result, tuple) + self.assertIsInstance(result[0], tuple) + self.assertEqual(len(result[0]), 2) + self.assertIsInstance(result[0][0], FunctionType) + self.assertIsInstance(result[0][1], dict) + self.assertTrue('target' in result[0][1]) + self.assertEqual('test', result[0][1]['target']) + + def test_ant_when_given_path(self): + """Test ant when passing given a path""" + result = __ant__(target="test", path="some/other/path/build.xml") + self.assertIsNotNone(result) + self.assertTrue('path' in result[0][1]) + self.assertEqual('some/other/path/build.xml', result[0][1]['path']) + + def test_ant_when_given_flags(self): + """Test passing flags to ant""" + result = __ant__(target='test', flags=['-verbose']) + self.assertIsNotNone(result) + self.assertTrue('flags' in result[0][1]) + self.assertEqual(1, len(result[0][1]['flags'])) + self.assertEqual('-verbose', result[0][1]['flags'][0]) + + def test_ant_when_given_vars(self): """Test passing variables to ant""" - result = xnt.build.make.ant(target="test", path="temp", - pkeys=["test_var"], - pvalues=["testing"]) - self.assertEqual(result, 0) + result = __ant__(target="test", + pkeys=["test_var"], + pvalues=["testing"]) + self.assertIsNotNone(result) + self.assertTrue('pkeys' in result[0][1]) + self.assertTrue('pvalues' in result[0][1]) + self.assertEqual(1, len(result[0][1]['pkeys'])) + self.assertEqual(1, len(result[0][1]['pvalues'])) + self.assertEqual('test_var', result[0][1]['pkeys'][0]) + self.assertEqual('testing', result[0][1]['pvalues'][0]) -@unittest.skipUnless(xnt.in_path("make"), "make is not in your path") class MakeTests(unittest.TestCase): """GNU Make Tests""" def setUp(self): """Test Setup""" - xnt.tests.set_up() - with open("temp/Makefile", "w") as makefile: - makefile.write("build:\n") - makefile.write("\techo 'testing'\n") + pass def tearDown(self): """Test Teardown""" - xnt.tests.tear_down() + pass def test_default_make(self): """Test Default make""" - result = xnt.build.make.make(target="build", path="temp") - self.assertEqual(result, 0) + result = __make__(target="build") + self.assertIsNotNone(result) + self.assertIsInstance(result, tuple) + self.assertIsInstance(result[0], tuple) + self.assertEqual(len(result[0]), 2) + self.assertIsInstance(result[0][0], FunctionType) + self.assertIsInstance(result[0][1], dict) + self.assertTrue('target' in result[0][1]) + self.assertEqual('build', result[0][1]['target']) + + def test_make_with_path(self): + '''Test make given a path''' + result = __make__(target="build", path="some/other/path/Makefile") + self.assertIsNotNone(result) + self.assertTrue('path' in result[0][1]) + self.assertEqual('some/other/path/Makefile', result[0][1]['path']) def test_passing_vars(self): """Test Parameter Passing with Make""" - result = xnt.build.make.make(target="build", - path="temp", - pkeys=["test_var"], - pvalues=["testing"]) - self.assertEqual(result, 0) + result = __make__(target='build', + pkeys=["test_var"], + pvalues=["testing"]) + self.assertIsNotNone(result) + self.assertTrue('pkeys' in result[0][1]) + self.assertTrue('pvalues' in result[0][1]) + self.assertEqual(1, len(result[0][1]['pkeys'])) + self.assertEqual(1, len(result[0][1]['pvalues'])) + self.assertEqual('test_var', result[0][1]['pkeys'][0]) + self.assertEqual('testing', result[0][1]['pvalues'][0]) def test_passing_flags(self): """Test Flag Passing with Make""" - result = xnt.build.make.make(target="build", - path="temp", - flags=["-B"]) - self.assertEqual(result, 0) + result = __make__(target='build', flags=['-B']) + self.assertIsNotNone(result) + self.assertTrue('flags' in result[0][1]) + self.assertEqual(1, len(result[0][1]['flags'])) + self.assertEqual('-B', result[0][1]['flags'][0]) -@unittest.skipUnless(xnt.in_path("nant") or xnt.in_path("nant.exe"), - "nant is not in your path") class NAntTests(unittest.TestCase): """.NET Ant Tests""" def setUp(self): """Test Setup""" - xnt.tests.set_up() - with open("temp/default.build", "w") as default_build: - default_build.write("<?xml version=\"1.0\"?>\n") - default_build.write("<project name=\"test\">\n") - default_build.write("<target name=\"test\">\n") - default_build.write("<if \n") - default_build.write("test=\"${property::exists('test_var')}\">\n") - default_build.write("<echo>${test_var}</echo>\n") - default_build.write("</if>\n") - default_build.write("</target>\n") - default_build.write("</project>") + pass def tearDown(self): """Test Teardown""" - xnt.tests.tear_down() + pass - def test_default_nant(self): + def test_nant_with_target(self): """Test Deault nant""" - result = xnt.build.make.nant(target="test", path="temp") - self.assertEqual(result, 0) - - def test_parameters_passing(self): - """Test Parameter Passing with NAnt""" - result = xnt.build.make.nant(target="test", - path="temp", - pkeys=["test_var"], - pvalues=["testing"]) - self.assertEqual(result, 0) - - def test_flag_passing(self): - """Test Flag Passing with NAnt""" - result = xnt.build.make.nant(target="test", - path="temp", - flags=["-v"]) - self.assertEqual(result, 0) - + result = __nant__(target='test') + self.assertIsNotNone(result) + self.assertIsInstance(result, tuple) + self.assertIsInstance(result[0], tuple) + self.assertEqual(2, len(result[0])) + self.assertIsInstance(result[0][0], FunctionType) + self.assertIsInstance(result[0][1], dict) + self.assertTrue('target' in result[0][1]) + self.assertEqual('test', result[0][1]['target']) + + def test_nant_with_path(self): + '''Test NAnt with path''' + result = __nant__(target='test', path='some/other/path/build.xml') + self.assertIsNotNone(result) + self.assertTrue('path' in result[0][1]) + self.assertEqual('some/other/path/build.xml', result[0][1]['path']) + + def test_nant_with_parameters(self): + '''Test NAnt with parameters''' + result = __nant__(target="test", + pkeys=["test_var"], + pvalues=["testing"]) + self.assertIsNotNone(result) + self.assertTrue('pkeys' in result[0][1]) + self.assertTrue('pvalues' in result[0][1]) + self.assertEqual(1, len(result[0][1]['pkeys'])) + self.assertEqual(1, len(result[0][1]['pvalues'])) + self.assertEqual('test_var', result[0][1]['pkeys'][0]) + + def test_nant_with_flags(self): + '''Test NAnt with flags''' + result = __nant__(target="test", flags=["-v"]) + self.assertIsNotNone(result) + self.assertTrue('flags' in result[0][1]) + self.assertEqual(1, len(result[0][1]['flags'])) + self.assertEqual('-v', result[0][1]['flags'][0]) if __name__ == '__main__': unittest.main() diff --git a/xnt/tests/taskcompressiontests.py b/xnt/tests/taskcompressiontests.py index 3640287..0d36b49 100644 --- a/xnt/tests/taskcompressiontests.py +++ b/xnt/tests/taskcompressiontests.py @@ -17,9 +17,8 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. -import os -import xnt.tasks -import xnt.tests +from xnt.tasks import __zip__ +from types import FunctionType import unittest #pylint: disable-msg=C0103 @@ -27,16 +26,25 @@ class TaskCompressionTests(unittest.TestCase): """Test Cases for Compression""" def setUp(self): """Test Case Setup""" - xnt.tests.set_up() + pass def tearDown(self): """Test Case Teardown""" - xnt.tests.tear_down() + pass def test_zip(self): """Test zip method""" - xnt.tasks.create_zip("temp/testfolder1", "temp/myzip.zip") - self.assertTrue(os.path.exists("temp/myzip.zip")) + result = __zip__(directory="testfolder", zipfilename="myzip.zip") + self.assertIsNotNone(result) + self.assertIsInstance(result, tuple) + self.assertIsInstance(result[0], tuple) + self.assertEqual(2, len(result[0])) + self.assertIsInstance(result[0][0], FunctionType) + self.assertIsInstance(result[0][1], dict) + self.assertTrue("directory" in result[0][1]) + self.assertEqual("testfolder", result[0][1]['directory']) + self.assertTrue("zipfile" in result[0][1]) + self.assertEqual("myzip.zip", result[0][1]['zipfile']) if __name__ == "__main__": unittest.main() diff --git a/xnt/tests/taskcopytests.py b/xnt/tests/taskcopytests.py index 8955f1e..abe0eea 100644 --- a/xnt/tests/taskcopytests.py +++ b/xnt/tests/taskcopytests.py @@ -17,9 +17,8 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. -import os -import xnt.tasks -import xnt.tests +from xnt.tasks import __copy__ +from types import FunctionType import unittest #pylint: disable-msg=C0103 @@ -27,35 +26,34 @@ class TaskCopyTests(unittest.TestCase): """Test Case for Copy Tasks Method""" def setUp(self): """Test Setup""" - xnt.tests.set_up() + pass def tearDown(self): """Test Teardown""" - xnt.tests.tear_down() + pass def test_cp(self): """Test default use of cp""" - xnt.tasks.cp("temp/testfolder1", "temp/testfolder2") - self.assertTrue(os.path.exists("temp/testfolder2")) - self.assertTrue(os.path.exists("temp/testfolder1")) - xnt.tasks.cp("temp/testfile1", "temp/testfile5") - self.assertTrue(os.path.exists("temp/testfile5")) - self.assertTrue(os.path.exists("temp/testfile1")) - with open("temp/testfile5", "r") as testfile: - self.assertEqual("this is a test file", testfile.read()) + result = __copy__(srcdir="test0", dstdir="test1") + self.assertIsNotNone(result) + self.assertIsInstance(result, tuple) + self.assertIsInstance(result[0], tuple) + self.assertEqual(2, len(result[0])) + self.assertIsInstance(result[0][0], FunctionType) + self.assertIsInstance(result[0][1], dict) + self.assertTrue('srcdir' in result[0][1]) + self.assertEqual('test0', result[0][1]['srcdir']) + self.assertTrue('dstdir' in result[0][1]) + self.assertEqual('test1', result[0][1]['dstdir']) def test_cp_filelist(self): """Test filelist copy""" - xnt.tasks.cp(dst="temp/testfolder2", - files=["temp/testfile1", - "temp/testfile2", - "temp/testfile3"]) - self.assertTrue(os.path.exists("temp/testfile1")) - self.assertTrue(os.path.exists("temp/testfile2")) - self.assertTrue(os.path.exists("temp/testfile3")) - with open("temp/testfile2", "r") as testfile: - self.assertEqual("this is a test file", testfile.read()) - + result = __copy__(files=['file1', 'file2', 'file3'], dstdir='test1') + self.assertIsNotNone(result) + self.assertTrue('files' in result[0][1]) + self.assertEqual(3, len(result[0][1]['files'])) + self.assertTrue('dstdir' in result[0][1]) + self.assertEqual('test1', result[0][1]['dstdir']) if __name__ == "__main__": unittest.main() diff --git a/xnt/tests/taskmisctests.py b/xnt/tests/taskmisctests.py index c20e24b..4864e41 100644 --- a/xnt/tests/taskmisctests.py +++ b/xnt/tests/taskmisctests.py @@ -17,9 +17,14 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. -import os -import xnt.tasks -import xnt.tests +from xnt.tasks import __echo__ +from xnt.tasks import __call__ +from xnt.tasks import __setup__ +from xnt.tasks import __xntcall__ +from xnt.tasks import __which__ +from xnt.tasks import __in_path__ +from xnt.tasks import __log__ +from types import FunctionType import unittest #pylint: disable-msg=C0103 @@ -27,60 +32,121 @@ class TaskMiscTests(unittest.TestCase): """Test Misc Tasks""" def setUp(self): """Test Case Setup""" - xnt.tests.set_up() + pass def tearDown(self): """Test Case Teardown""" - xnt.tests.tear_down() + pass def test_echo(self): """Test Echo Task""" - xnt.tasks.echo("this is my cool echo", "temp/mynewcoolfile") - self.assertTrue(os.path.exists("temp/mynewcoolfile")) - with open("temp/mynewcoolfile", "r") as temp_file: - self.assertEqual("this is my cool echo", temp_file.read()) + result = __echo__(msg="foobar", tofile="mytestfile") + self.assertIsNotNone(result) + self.assertIsInstance(result, tuple) + self.assertIsInstance(result[0], tuple) + self.assertEqual(2, len(result[0])) + self.assertIsInstance(result[0][0], FunctionType) + self.assertIsInstance(result[0][1], dict) + self.assertTrue('msg' in result[0][1]) + self.assertEqual('foobar', result[0][1]['msg']) + self.assertTrue('tofile' in result[0][1]) + self.assertEqual('mytestfile', result[0][1]['tofile']) + + def test_log(self): + """Test log function""" + result = __log__(msg="testing the logging", lvl=40) + self.assertIsNotNone(result) + self.assertIsInstance(result, tuple) + self.assertIsInstance(result[0], tuple) + self.assertEqual(2, len(result[0])) + self.assertIsInstance(result[0][0], FunctionType) + self.assertIsInstance(result[0][1], dict) + self.assertTrue('msg' in result[0][1]) + self.assertEqual('testing the logging', result[0][1]['msg']) + self.assertTrue('lvl' in result[0][1]) + self.assertEqual(40, result[0][1]['lvl']) def test_call(self): """Test Call, testing redirection""" - out = open("temp/testout", "w") - err = open("temp/testerr", "w") - xnt.tasks.call(["python", - os.path.abspath("temp/test.py"), - "42", "hello"], - out, - err) - out.close() - err.close() - self.assertTrue(os.path.exists("temp/testout")) - self.assertTrue(os.path.exists("temp/testerr")) - with open("temp/testout", "r") as std_out: - self.assertEqual("42", std_out.read()) - with open("temp/testerr", "r") as std_err: - self.assertEqual("hello", std_err.read()) - - def test_which_finds_python(self): - """Test which can find python""" - if os.name == 'posix': - path = xnt.tasks.which("python") - else: - path = xnt.tasks.which("python.exe") - self.assertIsNotNone(path) - - def test_which_dne(self): - """Test which cannot find not existent program""" - path = xnt.tasks.which("arst") - self.assertIsNone(path) - - def test_python_in_path(self): + result = __call__(['echo', 'blah']) + self.assertIsNotNone(result) + self.assertIsInstance(result, tuple) + self.assertIsInstance(result[0], tuple) + self.assertEqual(2, len(result[0])) + self.assertIsInstance(result[0][0], FunctionType) + self.assertIsInstance(result[0][1], dict) + self.assertTrue('command' in result[0][1]) + self.assertEqual(2, len(result[0][1]['command'])) + self.assertEqual('echo', result[0][1]['command'][0]) + self.assertEqual('blah', result[0][1]['command'][1]) + self.assertTrue('stdout' in result[0][1]) + self.assertIsNone(result[0][1]['stdout']) + self.assertTrue('stderr' in result[0][1]) + self.assertIsNone(result[0][1]['stderr']) + + def test_setup_with_single_command(self): + '''Test setup function with a single command''' + result = __setup__(command='test') + self.assertIsNotNone(result) + self.assertIsInstance(result, tuple) + self.assertIsInstance(result[0], tuple) + self.assertEqual(2, len(result[0])) + self.assertIsInstance(result[0][0], FunctionType) + self.assertIsInstance(result[0][1], dict) + self.assertTrue('commands' in result[0][1]) + self.assertEqual(1, len(result[0][1]['commands'])) + + def test_setup_with_commands(self): + '''Test setup function commands''' + result = __setup__(commands=['build', 'test']) + self.assertIsNotNone(result) + self.assertTrue('commands' in result[0][1]) + self.assertEqual(2, len(result[0][1]['commands'])) + + def test_setup_with_directory(self): + '''Test setup function with directory''' + result = __setup__(command='test', directory='test/') + self.assertIsNotNone(result) + self.assertTrue('directory' in result[0][1]) + self.assertEqual('test/', result[0][1]['directory']) + + def test_xntcall(self): + """Test xntcall""" + result = __xntcall__(buildfile='test/build.py') + self.assertIsNotNone(result) + self.assertIsInstance(result, tuple) + self.assertIsInstance(result[0], tuple) + self.assertEqual(2, len(result[0])) + self.assertIsInstance(result[0][0], FunctionType) + self.assertIsInstance(result[0][1], dict) + self.assertTrue('buildfile' in result[0][1]) + self.assertEqual('test/build.py', result[0][1]['buildfile']) + self.assertTrue('targets' in result[0][1]) + self.assertTrue('props' in result[0][1]) + + def test_which(self): + """Test which""" + result = __which__('python') + self.assertIsNotNone(result) + self.assertIsInstance(result, tuple) + self.assertIsInstance(result[0], tuple) + self.assertEqual(2, len(result[0])) + self.assertIsInstance(result[0][0], FunctionType) + self.assertIsInstance(result[0][1], dict) + self.assertTrue('program' in result[0][1]) + self.assertEqual('python', result[0][1]['program']) + + def test_in_path(self): """Test in_path task""" - if os.name == 'posix': - self.assertTrue(xnt.tasks.in_path("python")) - else: - self.assertTrue(xnt.tasks.in_path("python.exe")) - - def test_arst_not_in_path(self): - """Test not in_path""" - self.assertFalse(xnt.tasks.in_path("arst")) + result = __in_path__('python') + self.assertIsNotNone(result) + self.assertIsInstance(result, tuple) + self.assertIsInstance(result[0], tuple) + self.assertEqual(2, len(result[0])) + self.assertIsInstance(result[0][0], FunctionType) + self.assertIsInstance(result[0][1], dict) + self.assertTrue('program' in result[0][1]) + self.assertEqual('python', result[0][1]['program']) if __name__ == "__main__": unittest.main() diff --git a/xnt/tests/taskmkdirtests.py b/xnt/tests/taskmkdirtests.py index fdfdddc..3674611 100644 --- a/xnt/tests/taskmkdirtests.py +++ b/xnt/tests/taskmkdirtests.py @@ -17,29 +17,26 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. -import os -import xnt.tasks -import xnt.tests +from xnt.tasks import __mkdir__ +from types import FunctionType import unittest #pylint: disable-msg=C0103 class TaskMkdirTests(unittest.TestCase): """Test Cases for Mkdir""" - def setUp(self): - """Test Case Setup""" - xnt.tests.set_up() - - def tearDown(self): - """Test Case Teardown""" - xnt.tests.tear_down() - def test_mkdir(self): """Test mkdir method""" - xnt.tasks.mkdir("temp/mynewtestfolder") - self.assertTrue(os.path.exists("temp/mynewtestfolder")) - self.assertTrue(os.path.exists("temp/testfolder1")) - xnt.tasks.mkdir("temp/testfolder1") - + result = __mkdir__("my/new/directory") + self.assertIsNotNone(result) + self.assertIsInstance(result, tuple) + self.assertIsInstance(result[0], tuple) + self.assertEqual(2, len(result[0])) + self.assertIsInstance(result[0][0], FunctionType) + self.assertIsInstance(result[0][1], dict) + self.assertTrue('directory' in result[0][1]) + self.assertEqual('my/new/directory', result[0][1]['directory']) + self.assertTrue('mode' in result[0][1]) + self.assertEqual(0o755, result[0][1]['mode']) if __name__ == "__main__": unittest.main() diff --git a/xnt/tests/taskmovetests.py b/xnt/tests/taskmovetests.py index 3aed96c..7daaf9e 100644 --- a/xnt/tests/taskmovetests.py +++ b/xnt/tests/taskmovetests.py @@ -17,32 +17,26 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. -import os -import xnt.tasks -import xnt.tests +from xnt.tasks import __move__ +from types import FunctionType import unittest #pylint: disable-msg=C0103 class TaskMoveTests(unittest.TestCase): """Test cases for move""" - def setUp(self): - """Test Case Setup""" - xnt.tests.set_up() - - def tearDown(self): - """Test Case Teardown""" - xnt.tests.tear_down() - - def test_mv(self): + def test_move(self): """Test Moving files and folders""" - xnt.tasks.mv("temp/testfolder1", "temp/testfolder2") - self.assertTrue(os.path.exists("temp/testfolder2")) - self.assertFalse(os.path.exists("temp/testfolder1")) - xnt.tasks.mv("temp/testfile1", "temp/testfile5") - self.assertTrue(os.path.exists("temp/testfile5")) - self.assertFalse(os.path.exists("temp/testfile1")) - with open("temp/testfile5", "r") as testfile: - self.assertEqual("this is a test file", testfile.read()) + result = __move__('test0', 'test1') + self.assertIsNotNone(result) + self.assertIsInstance(result, tuple) + self.assertIsInstance(result[0], tuple) + self.assertEqual(2, len(result[0])) + self.assertIsInstance(result[0][0], FunctionType) + self.assertIsInstance(result[0][1], dict) + self.assertTrue('src' in result[0][1]) + self.assertTrue('dst' in result[0][1]) + self.assertEqual('test0', result[0][1]['src']) + self.assertEqual('test1', result[0][1]['dst']) if __name__ == "__main__": unittest.main() diff --git a/xnt/tests/taskremovetests.py b/xnt/tests/taskremovetests.py index cb0582b..6768852 100644 --- a/xnt/tests/taskremovetests.py +++ b/xnt/tests/taskremovetests.py @@ -17,28 +17,24 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. -import os -import xnt.tasks -import xnt.tests +from xnt.tasks import __remove__ +from types import FunctionType import unittest #pylint: disable-msg=C0103 class TaskRemoveTests(unittest.TestCase): """Test Case for Remove""" - def setUp(self): - """Test case Setup""" - xnt.tests.set_up() - - def tearDown(self): - """Test case Teardown""" - xnt.tests.tear_down() - - def test_rm(self): - """Remove cases""" - xnt.tasks.rm("temp/testfolder1") - self.assertFalse(os.path.exists("temp/testfolder1")) - xnt.tasks.rm("temp/testfile1") - + def test_remove(self): + """Test removing files and folders""" + result = __remove__('test0', 'test1', '*swp') + self.assertIsNotNone(result) + self.assertIsInstance(result, tuple) + self.assertIsInstance(result[0], tuple) + self.assertEqual(2, len(result[0])) + self.assertIsInstance(result[0][0], FunctionType) + self.assertIsInstance(result[0][1], dict) + self.assertTrue('fileset' in result[0][1]) + self.assertEqual(3, len(result[0][1]['fileset'])) if __name__ == "__main__": unittest.main() diff --git a/xnt/tests/textests.py b/xnt/tests/textests.py index d555915..6c14c96 100644 --- a/xnt/tests/textests.py +++ b/xnt/tests/textests.py @@ -17,95 +17,62 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. -import os +from xnt.build.tex import __pdflatex__ +from xnt.build.tex import __clean__ +from types import FunctionType import unittest -import xnt -import xnt.build.tex -import xnt.tests -@unittest.skipUnless(xnt.in_path("pdflatex") or xnt.in_path("pdflatex.exe"), - "pdflatex is not in your path") class TexTests(unittest.TestCase): """Test Case for TeX Document Building""" - def setUp(self): - """Test Setup""" - xnt.tests.set_up() - with open("temp/test.tex", "w") as test_tex: - test_tex.write('\\documentclass{article}\n') - test_tex.write('\\usepackage{glossaries}\n') - test_tex.write('\\author{python test}\n') - test_tex.write('\\date{\\today}\n') - test_tex.write('\\makeglossaries\n') - test_tex.write('\\begin{document}\n') - test_tex.write('\\nocite{*}\n') - test_tex.write('\\newglossaryentry{test}\n') - test_tex.write('{name={test},description={this}}\n') - test_tex.write('This is a \\gls{test} \\LaTeX document.\n') - test_tex.write('\\printglossaries\n') - test_tex.write('\\begin{thebibliography}{1}\n') - test_tex.write('\\bibitem{test_bib_item}\n') - test_tex.write('Leslie Lamport,\n') - test_tex.write('\\emph{\\LaTeX: A Document Preperation System}.\n') - test_tex.write('Addison Wesley, Massachusetts,\n') - test_tex.write('2nd Edition,\n') - test_tex.write('1994.\n') - test_tex.write('\\end{thebibliography}\n') - test_tex.write('\\bibliography{1}\n') - test_tex.write('\\end{document}\n') - - def tearDown(self): - """Test Teardown""" - xnt.tests.tear_down() - def test_pdflatex_build(self): """Test default pdflatex build""" - xnt.build.tex.pdflatex("test.tex", - path="temp") - self.assertTrue(os.path.exists("temp/test.pdf")) - self.assertTrue(os.path.exists("temp/test.aux")) - self.assertTrue(os.path.exists("temp/test.log")) + result = __pdflatex__('test.tex', directory='tex') + self.assertIsNotNone(result) + self.assertIsInstance(result, tuple) + self.assertIsInstance(result[0], tuple) + self.assertEqual(2, len(result[0])) + self.assertIsInstance(result[0][0], FunctionType) + self.assertIsInstance(result[0][1], dict) + self.assertTrue('texdocument' in result[0][1]) + self.assertEqual('test.tex', result[0][1]['texdocument']) + self.assertTrue('directory' in result[0][1]) + self.assertTrue('bibtex' in result[0][1]) + self.assertFalse(result[0][1]['bibtex']) + self.assertTrue('makeglossary' in result[0][1]) + self.assertFalse(result[0][1]['makeglossary']) def test_pdflatex_with_bibtex(self): """Test pdflatex with bibtex""" - xnt.build.tex.pdflatex("test.tex", - path="temp", - bibtex=True) - self.assertTrue(os.path.exists("temp/test.pdf")) - self.assertTrue(os.path.exists("temp/test.bbl")) - self.assertTrue(os.path.exists("temp/test.blg")) + result = __pdflatex__('test.tex', bibtex=True) + self.assertIsNotNone(result) + self.assertTrue(result[0][1]['bibtex']) def test_pdflatex_with_glossary(self): """Test pdflatex with glossary output""" - xnt.build.tex.pdflatex("test.tex", - path="temp", - makeglossary=True) - self.assertTrue(os.path.exists("temp/test.pdf")) - self.assertTrue(os.path.exists("temp/test.glo")) - self.assertTrue(os.path.exists("temp/test.glg")) - self.assertTrue(os.path.exists("temp/test.gls")) + result = __pdflatex__("test.tex", makeglossary=True) + self.assertIsNotNone(result) + self.assertTrue(result[0][1]['makeglossary']) def test_tex_clean(self): """Test the default clean method removes generated files except pdf""" - xnt.build.tex.pdflatex("test.tex", - path="temp", - bibtex=True, - makeglossary=True) - xnt.build.tex.clean(path="temp") - self.assertTrue(os.path.exists("temp/test.pdf")) - self.assertFalse(os.path.exists("temp/test.aux")) - self.assertFalse(os.path.exists("temp/test.log")) + result = __clean__(directory='tex') + self.assertIsNotNone(result) + self.assertIsInstance(result, tuple) + self.assertIsInstance(result[0], tuple) + self.assertEqual(2, len(result[0])) + self.assertIsInstance(result[0][0], FunctionType) + self.assertIsInstance(result[0][1], dict) + self.assertTrue('directory' in result[0][1]) + self.assertEqual('tex', result[0][1]['directory']) + self.assertTrue('remove_pdf' in result[0][1]) + self.assertFalse(result[0][1]['remove_pdf']) def test_tex_clean_include_pdf(self): """Test Clean; including PDF""" - xnt.build.tex.pdflatex("test.tex", - path="temp", - bibtex=True, - makeglossary=True) - xnt.build.tex.clean(path="temp", remove_pdf=True) - self.assertFalse(os.path.exists("temp/test.pdf")) - self.assertFalse(os.path.exists("temp/test.aux")) - self.assertFalse(os.path.exists("temp/test.log")) + result = __clean__(directory='tex', remove_pdf=True) + self.assertIsNotNone(result) + self.assertTrue(result[0][1]['remove_pdf']) if __name__ == '__main__': unittest.main() diff --git a/xnt/tests/vcscvstests.py b/xnt/tests/vcscvstests.py new file mode 100644 index 0000000..ad16d15 --- /dev/null +++ b/xnt/tests/vcscvstests.py @@ -0,0 +1,57 @@ +#!/usr/bin/env python +"""Test `xnt.vcs.cvs`""" + +# Xnt -- A Wrapper Build Tool +# Copyright (C) 2014 Kenny Ballou + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. + +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +from xnt.vcs.cvs import __cvsco__ +from xnt.vcs.cvs import __cvsupdate__ +from types import FunctionType +import unittest + +class VcsCvsTests(unittest.TestCase): + '''VCS CVS Tests''' + + def test_cvsco(self): + '''Test CVS checkout''' + result = __cvsco__('mytestmodule') + self.assertIsNotNone(result) + self.assertIsInstance(result, tuple) + self.assertIsInstance(result[0], tuple) + self.assertEqual(2, len(result[0])) + self.assertIsInstance(result[0][0], FunctionType) + self.assertIsInstance(result[0][1], dict) + self.assertTrue('module' in result[0][1]) + self.assertEqual('mytestmodule', result[0][1]['module']) + self.assertTrue('rev' in result[0][1]) + self.assertIsNone(result[0][1]['rev']) + self.assertTrue('dest' in result[0][1]) + self.assertIsNone(result[0][1]['dest']) + + def test_cvsupdate(self): + '''Test CVS Update''' + result = __cvsupdate__('./') + self.assertIsNotNone(result) + self.assertIsInstance(result, tuple) + self.assertIsInstance(result[0], tuple) + self.assertEqual(2, len(result[0])) + self.assertIsInstance(result[0][0], FunctionType) + self.assertIsInstance(result[0][1], dict) + self.assertTrue('path' in result[0][1]) + self.assertEqual('./', result[0][1]['path']) + +if __name__ == "__main__": + unittest.main() diff --git a/xnt/tests/vcsgittests.py b/xnt/tests/vcsgittests.py new file mode 100644 index 0000000..5fce129 --- /dev/null +++ b/xnt/tests/vcsgittests.py @@ -0,0 +1,62 @@ +#!/usr/bin/env python +"""Test `xnt.vcs.git`""" + +# Xnt -- A Wrapper Build Tool +# Copyright (C) 2014 Kenny Ballou + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. + +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +from xnt.vcs.git import __gitclone__ +from xnt.vcs.git import __gitpull__ +from types import FunctionType +import unittest + +class VcsGitTests(unittest.TestCase): + '''Test VCS GIT''' + + def test_gitclone(self): + '''Test GIT Clone''' + url = 'git://github.com/devnulltao/Xnt.git' + result = __gitclone__(url) + self.assertIsNotNone(result) + self.assertIsInstance(result, tuple) + self.assertIsInstance(result[0], tuple) + self.assertEqual(2, len(result[0])) + self.assertIsInstance(result[0][0], FunctionType) + self.assertIsInstance(result[0][1], dict) + self.assertTrue('url' in result[0][1]) + self.assertTrue('dest' in result[0][1]) + self.assertTrue('branch' in result[0][1]) + self.assertEqual(url, result[0][1]['url']) + self.assertIsNone(result[0][1]['dest']) + self.assertIsNone(result[0][1]['branch']) + + def test_gitpull(self): + '''Test GIT Pull''' + result = __gitpull__('./', remote='upstream', branch='develop') + self.assertIsNotNone(result) + self.assertIsInstance(result, tuple) + self.assertIsInstance(result[0], tuple) + self.assertEqual(2, len(result[0])) + self.assertIsInstance(result[0][0], FunctionType) + self.assertIsInstance(result[0][1], dict) + self.assertTrue('path' in result[0][1]) + self.assertEqual('./', result[0][1]['path']) + self.assertTrue('remote' in result[0][1]) + self.assertEqual('upstream', result[0][1]['remote']) + self.assertTrue('branch' in result[0][1]) + self.assertEqual('develop', result[0][1]['branch']) + +if __name__ == "__main__": + unittest.main() diff --git a/xnt/tests/vcshgtests.py b/xnt/tests/vcshgtests.py new file mode 100644 index 0000000..8008752 --- /dev/null +++ b/xnt/tests/vcshgtests.py @@ -0,0 +1,62 @@ +#!/usr/bin/env python +"""Test `xnt.vcs.hg`""" + +# Xnt -- A Wrapper Build Tool +# Copyright (C) 2014 Kenny Ballou + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. + +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +from xnt.vcs.hg import __hgclone__ +from xnt.vcs.hg import __hgfetch__ +from types import FunctionType +import unittest + +class VcsHgTests(unittest.TestCase): + '''VCS Mercurial Tests''' + + def test_hgclone(self): + '''Test hg clone''' + url = 'https://vim.googlecode.com/hg' + result = __hgclone__(url, dest='vim') + self.assertIsNotNone(result) + self.assertIsInstance(result, tuple) + self.assertIsInstance(result[0], tuple) + self.assertEqual(2, len(result[0])) + self.assertIsInstance(result[0][0], FunctionType) + self.assertIsInstance(result[0][1], dict) + self.assertTrue('url' in result[0][1]) + self.assertEqual(url, result[0][1]['url']) + self.assertTrue('dest' in result[0][1]) + self.assertEqual('vim', result[0][1]['dest']) + self.assertTrue('rev' in result[0][1]) + self.assertIsNone(result[0][1]['rev']) + self.assertTrue('branch' in result[0][1]) + self.assertIsNone(result[0][1]['branch']) + + def test_hgfetch(self): + '''Test hg fetch''' + result = __hgfetch__('./', source='upstream') + self.assertIsNotNone(result) + self.assertIsInstance(result, tuple) + self.assertIsInstance(result[0], tuple) + self.assertEqual(2, len(result[0])) + self.assertIsInstance(result[0][0], FunctionType) + self.assertIsInstance(result[0][1], dict) + self.assertTrue('path' in result[0][1]) + self.assertEqual('./', result[0][1]['path']) + self.assertTrue('source' in result[0][1]) + self.assertTrue('upstream', result[0][1]['source']) + +if __name__ == "__main__": + unittest.main() diff --git a/xnt/vcs/cvs.py b/xnt/vcs/cvs.py index ccdabcd..8a0902b 100644 --- a/xnt/vcs/cvs.py +++ b/xnt/vcs/cvs.py @@ -18,35 +18,43 @@ # along with this program. If not, see <http://www.gnu.org/licenses/>. import os -import subprocess -from xnt.tasks import which +from xnt.tasks import __apply__ +from xnt.tasks import __call__ +from xnt.tasks import __which__ -def cvsco(module, rev="", dest=""): +def __cvsco__(module, rev=None, dest=None): """Run CVS Checkout :param module: CVS Module name to checkout :param rev: Revision to checkout :param dest: Destination directory or name of checked out module """ - assert which("cvs") - cmd = ["cvs", "co", "-P"] - if rev: - cmd.append("-r") - cmd.append(rev) - if dest: - cmd.append("-d") - cmd.append(dest) - cmd.append(module) - subprocess.call(cmd) - -def cvsupdate(path): + def __execute__(**kwargs): + '''Perform CVS Checkout''' + assert __apply__(__which__("cvs")) + cmd = ["cvs", "co", "-P"] + if kwargs['rev']: + cmd.append("-r") + cmd.append(kwargs['rev']) + if kwargs['dest']: + cmd.append("-d") + cmd.append(kwargs['dest']) + cmd.append(kwargs['module']) + return __apply__(__call__(cmd)) + args = {'module': module, 'rev': rev, 'dest': dest,} + return ((__execute__, args),) + +def __cvsupdate__(path): """Run CVS Update :param path: Directory path to module to update """ - assert which("cvs") - cwd = os.path.abspath(os.getcwd()) - os.chdir(path) - cmd = ["cvs", "update"] - subprocess.call(cmd) - os.chdir(cwd) + def __execute__(**kwargs): + '''Perform CVS Update''' + assert __apply__(__which__("cvs")) + cwd = os.path.abspath(os.getcwd()) + os.chdir(kwargs['path']) + cmd = ["cvs", "update"] + __apply__(__call__(cmd)) + os.chdir(cwd) + return ((__execute__, {'path': path,}),) diff --git a/xnt/vcs/git.py b/xnt/vcs/git.py index 222ebda..34d9841 100644 --- a/xnt/vcs/git.py +++ b/xnt/vcs/git.py @@ -20,29 +20,43 @@ import os import xnt.tasks import xnt.vcs +from xnt.tasks import __apply__, __call__, __which__ -def gitclone(url, dest=None, branch=None): +def __gitclone__(url, dest=None, branch=None): """Clone a repository :param url: URI of the repository to clone :param dest: Destination directory or name of the cloned repository :param branch: Branch to clone """ - assert xnt.tasks.which("git") - command = ["git", "clone"] - command = xnt.vcs.clone_options(command, url, branch, dest) - xnt.tasks.call(command) - -def gitpull(path, source="origin", branch="master"): + def __execute__(**kwargs): + '''Perform git clone''' + assert __apply__(__which__("git")) + command = ["git", "clone"] + command = xnt.vcs.clone_options( + command, kwargs['url'], kwargs['branch'], kwargs['dest']) + return __apply__(__call__(command)) + args = {'url': url, 'dest': dest, 'branch': branch,} + return ((__execute__, args),) + +def __gitpull__(path, remote=None, branch=None): """Pull/Update a cloned repository :param path: Directory of the repository for which to pull and update - :param source: Repository's upstream source + :param remote: Repository's upstream source :param branch: Repository's upstream branch to pull from """ - assert xnt.tasks.which("git") - cwd = os.getcwd() - os.chdir(path) - command = ["git", "pull", source, branch] - xnt.tasks.call(command) - os.chdir(cwd) + def __execute__(**kwargs): + '''Perform git pull''' + assert __apply__(__which__("git")) + cwd = os.getcwd() + os.chdir(kwargs['path']) + command = ["git", "pull", kwargs['remote'], kwargs['branch']] + __apply__(__call__(command)) + os.chdir(cwd) + args = { + 'path': path, + 'remote': remote if remote else 'origin', + 'branch': branch if branch else 'master', + } + return ((__execute__, args),) diff --git a/xnt/vcs/hg.py b/xnt/vcs/hg.py index 6956d57..3633fe3 100644 --- a/xnt/vcs/hg.py +++ b/xnt/vcs/hg.py @@ -20,8 +20,9 @@ import os import xnt.tasks import xnt.vcs +from xnt.tasks import __apply__, __call__, __which__ -def hgclone(url, dest=None, rev=None, branch=None): +def __hgclone__(url, dest=None, rev=None, branch=None): """Clone a Mercurial Repository :param url: URI of repository to clone @@ -29,23 +30,32 @@ def hgclone(url, dest=None, rev=None, branch=None): :param rev: Revision to clone :param branch: Branch to clone """ - assert xnt.tasks.which("hg") - command = ["hg", "clone"] - if rev: - command.append("--rev") - command.append(rev) - command = xnt.vcs.clone_options(command, url, branch, dest) - xnt.tasks.call(command) - -def hgfetch(path, source='default'): + def __execute__(**kwargs): + '''Perform hg clone''' + assert __apply__(__which__("hg")) + command = ["hg", "clone"] + if kwargs['rev']: + command.append("--rev") + command.append(kwargs['rev']) + command = xnt.vcs.clone_options( + command, kwargs['url'], kwargs['branch'], kwargs['dest']) + return __apply__(__call__(command)) + args = {'url': url, 'dest': dest, 'rev': rev, 'branch': branch,} + return ((__execute__, args),) + +def __hgfetch__(path, source=None): """Pull and Update an already cloned Mercurial Repository :param path: Directory to the repository for which to pull changes :param source: Repository's upstream source """ - assert xnt.tasks.which("hg") - command = ["hg", "pull", "-u", source] - cwd = os.getcwd() - os.chdir(path) - xnt.tasks.call(command) - os.chdir(cwd) + def __execute__(**kwargs): + '''Perform hg pull''' + assert __apply__(__which__("hg")) + command = ["hg", "pull", "-u", kwargs['source']] + cwd = os.getcwd() + os.chdir(kwargs['path']) + __apply__(__call__(command)) + os.chdir(cwd) + args = {'path': path, 'source': source if source else 'default',} + return ((__execute__, args),) |