summaryrefslogtreecommitdiff
path: root/xnt
diff options
context:
space:
mode:
authorkballou <kballou@devnulllabs.io>2014-07-18 00:12:20 -0600
committerkballou <kballou@devnulllabs.io>2014-07-23 00:42:08 -0600
commit6b6ee5b49232222edf47a75be914859e2e0ac3da (patch)
tree483aac434d44de6e5ef58cffa78855e042b286ab /xnt
parent1093509c044d2420e8f659ae304b45a5ed72ca32 (diff)
downloadxnt-6b6ee5b49232222edf47a75be914859e2e0ac3da.tar.gz
xnt-6b6ee5b49232222edf47a75be914859e2e0ac3da.tar.xz
Refactor: Add NOOP
Change all modules to return a tuple (of tuples) of functions that would be executed. The resulting tuples look similar to: ((references_to_real_function, dictionary_of_arguments), ...) The tuple can be applied by calling the function reference, passing the dictionary (expanded with `**` magic). Such a function is defined as `xnt.tasks.__apply__`. The following are affected modules: * `xnt.tasks` * `xnt.build.cc` * `xnt.build.make` * `xnt.build.tex` * `xnt.vcs.git` - Patch includes added tests * `xnt.vcs.hg` - Patch includes added tests * `xnt.vcs.cvs` - Patch includes added tests
Diffstat (limited to 'xnt')
-rw-r--r--xnt/__init__.py22
-rw-r--r--xnt/build/cc.py148
-rw-r--r--xnt/build/make.py119
-rw-r--r--xnt/build/tex.py127
-rw-r--r--xnt/tasks.py287
-rw-r--r--xnt/tests/compilercollectiontests.py178
-rw-r--r--xnt/tests/maketests.py193
-rw-r--r--xnt/tests/taskcompressiontests.py22
-rw-r--r--xnt/tests/taskcopytests.py44
-rw-r--r--xnt/tests/taskmisctests.py160
-rw-r--r--xnt/tests/taskmkdirtests.py29
-rw-r--r--xnt/tests/taskmovetests.py34
-rw-r--r--xnt/tests/taskremovetests.py30
-rw-r--r--xnt/tests/textests.py107
-rw-r--r--xnt/tests/vcscvstests.py57
-rw-r--r--xnt/tests/vcsgittests.py62
-rw-r--r--xnt/tests/vcshgtests.py62
-rw-r--r--xnt/vcs/cvs.py50
-rw-r--r--xnt/vcs/git.py42
-rw-r--r--xnt/vcs/hg.py42
20 files changed, 1077 insertions, 738 deletions
diff --git a/xnt/__init__.py b/xnt/__init__.py
index 507041f..1a700e2 100644
--- a/xnt/__init__.py
+++ b/xnt/__init__.py
@@ -43,17 +43,17 @@ __license__ = """
VERBOSE = False
-from xnt.tasks import cp
-from xnt.tasks import mv
-from xnt.tasks import mkdir
-from xnt.tasks import rm
-from xnt.tasks import create_zip
-from xnt.tasks import log
-from xnt.tasks import xntcall
-from xnt.tasks import call
-from xnt.tasks import setup
-from xnt.tasks import which
-from xnt.tasks import in_path
+from xnt.tasks import __copy__
+from xnt.tasks import __move__
+from xnt.tasks import __mkdir__
+from xnt.tasks import __remove__
+from xnt.tasks import __zip__
+from xnt.tasks import __log__
+from xnt.tasks import __xntcall__
+from xnt.tasks import __call__
+from xnt.tasks import __setup__
+from xnt.tasks import __which__
+from xnt.tasks import __in_path__
def target(*args, **kwargs):
"""Decorator function for marking a method in
diff --git a/xnt/build/cc.py b/xnt/build/cc.py
index d993653..4123777 100644
--- a/xnt/build/cc.py
+++ b/xnt/build/cc.py
@@ -22,105 +22,87 @@ Definition of commonly used compilers
import os
import logging
-from xnt.tasks import call, which
+from xnt.tasks import __apply__
+from xnt.tasks import __call__
+from xnt.tasks import __which__
LOGGER = logging.getLogger(__name__)
-def gcc(src, flags=None):
- """gcc compiler, non-named output file
+def __gcc__(src, output=None, flags=None):
+ """gcc compiler
- :param src: C source file to compile with default `gcc`
+ :param src: C source file to compile
+ :param output: Output filename
:param flags: List of flags to pass onto the compiler
"""
- assert which("gcc")
- return _gcc(src, flags)
-
-def gpp(src, flags=None):
+ def __compile__(**kwargs):
+ '''Perform GCC compilation'''
+ assert __apply__(__which__("gcc"))
+ cmd = ["gcc", kwargs['infile']]
+ if kwargs.get('flags', None):
+ for flag in kwargs['flags']:
+ cmd.append(flag)
+ if kwargs.get('outfile', None):
+ cmd.append('-o')
+ cmd.append(kwargs['outfile'])
+ return __apply__(__call__(cmd))
+ args = {'infile': src, 'outfile': output, 'flags': flags,}
+ return ((__compile__, args),)
+
+def __gpp__(src, output=None, flags=None):
"""g++ compiler, non-named output file
- :param src: C++ source file to compile with default `g++`
- :param flags: List of flags to pass onto the compiler
- """
- assert which("g++")
- return _gcc(src, flags, "g++")
-
-def gcc_o(src, output, flags=None):
- """gcc compiler, with output file
-
- :param src: C source file to compile with default `gcc`
- :param output: Name of resulting object or executable
+ :param src: C++ source file to compile
+ :param output: Output filename
:param flags: List of flags to pass onto the compiler
"""
- assert which("gcc")
- return _gcc_o(src, output, flags)
-
-def gpp_o(src, output, flags=None):
- """g++ compiler, with output file
-
- :param src: C++ source file to compile with default `g++`
- :param output: Name of resulting object or executable
- :param flags: List of flags to pass onto the compiler
- """
- assert which("g++")
- return _gcc_o(src, output, flags, "g++")
-
-def javac(src, flags=None):
+ def __compile__(**kwargs):
+ '''Perform G++ compilation'''
+ assert __apply__(__which__("g++"))
+ cmd = ["g++", kwargs['outfile'],]
+ if kwargs.get('flags', None):
+ for flag in kwargs['flags']:
+ cmd.append(flag)
+ if kwargs.get('outfile', None):
+ cmd.append('-o')
+ cmd.append(kwargs['outfile'])
+ return __apply__(__call__(cmd))
+ args = {'infile': src, 'outfile': output, 'flags': flags,}
+ return ((__compile__, args),)
+
+def __javac__(src, flags=None):
"""Javac: compile Java programs
:param src: Java source file to compile with default `javac`
:param flags: List of flags to pass onto the compiler
"""
- assert which("javac")
- LOGGER.info("Compiling %s", src)
- cmd = __generate_command(src, flags, "javac")
- return __compile(cmd)
-
-def nvcc(src, flags=None):
+ def __compile__(**kwargs):
+ '''Perform javac compilation'''
+ assert __apply__(__which__("javac"))
+ cmd = ['javac', kwargs['sourcefiles'],]
+ if kwargs.get('flags', None):
+ for flag in kwargs['flags']:
+ cmd.append(flag)
+ return __apply__(__call__(cmd))
+ return ((__compile__, {'sourcefiles': src, 'flags': flags,}),)
+
+def __nvcc__(src, output=None, flags=None):
"""NVCC: compile CUDA C/C++ programs
- :param src: CUDA source file to compile with default `nvcc`
+ :param src: CUDA source file to compile
+ :param output: Output filename
:param flags: List of flags to pass onto the compiler
"""
- assert which('nvcc')
- return _gcc(src, flags, compiler='nvcc')
-
-def nvcc_o(src, output, flags=None):
- """NVCC: compile with named output
-
- :param src: CUDA source file to compile with default `nvcc`
- :param output: Name of resulting object or executable
- :param flags: List of flags to pass onto the compiler
- """
- assert which('nvcc')
- return _gcc_o(src, output, flags, compiler='nvcc')
-
-def _gcc(src, flags=None, compiler="gcc"):
- """Compile using gcc"""
- LOGGER.info("Compiling %s", src)
- return __compile(__generate_command(src, flags, compiler))
-
-def _gcc_o(src, output, flags=None, compiler="gcc"):
- """Compile with gcc specifying output file name"""
- LOGGER.info("Compiling %s to %s", src, output)
- cmd = __generate_command(src, flags, compiler)
- cmd.append("-o")
- cmd.append(output)
- return __compile(cmd)
-
-def __generate_command(src, flags=None, compiler="gcc"):
- """Generate cmd list for call"""
- cmd = [compiler, src]
- if flags:
- for flag in flags:
- cmd.append(flag)
- return cmd
-
-def __compile(cmd):
- """Run Compile, using `xnt.tasks.call`"""
- return call(cmd)
-
-def __is_newer(file_a, file_b):
- """Compare mmtimes of files
- Return True if `file_a` is modified later than `file_b`
- """
- return os.path.getmtime(file_a) > os.path.getmtime(file_b)
+ def __compile__(**kwargs):
+ '''Perform NVCC compilation'''
+ assert __apply__(__call__('nvcc'))
+ cmd = ['nvcc', kwargs['infile'],]
+ if kwargs.get('flags', None):
+ for flag in kwargs['flags']:
+ cmd.append(flag)
+ if kwargs.get('outfile', None):
+ cmd.append('-o')
+ cmd.append(kwargs['outfile'])
+ return __apply__(__call__(cmd))
+ args = {'infile': src, 'outfile': output, 'flags': flags,}
+ return ((__compile__, args),)
diff --git a/xnt/build/make.py b/xnt/build/make.py
index 9319d05..da569a9 100644
--- a/xnt/build/make.py
+++ b/xnt/build/make.py
@@ -18,10 +18,11 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import os
-import subprocess
-from xnt.tasks import which
+from xnt.tasks import __apply__
+from xnt.tasks import __call__
+from xnt.tasks import __which__
-def ant(target, path="", flags=None, pkeys=None, pvalues=None):
+def __ant__(target, path=None, flags=None, pkeys=None, pvalues=None):
"""Wrapper around Apache Ant
Invoke Apache Ant in either the current working directory or the specified
@@ -37,15 +38,27 @@ def ant(target, path="", flags=None, pkeys=None, pvalues=None):
:param pkeys: List of keys to combine with pvalues to pass to Ant
:param pvalues: List of values to combine with pkeys to pass to Ant
"""
- assert which("ant")
- cmd = __add_params(["ant"],
- __build_param_list(pkeys, pvalues),
- lambda x: "-D%s" % x)
- cmd = __add_flags(cmd, flags)
- cmd.append(target)
- return __run_in(path, lambda: subprocess.call(cmd))
-
-def make(target, path="", flags=None, pkeys=None, pvalues=None):
+ def __execute__(**kwargs):
+ '''Perform execution of ant'''
+ assert __apply__(__which__("ant"))
+ cmd = ['ant',]
+ if 'flags' in kwargs:
+ for flag in kwargs['flags']:
+ cmd.append(flag)
+ if 'pkeys' in kwargs and 'pvalues' in kwargs:
+ for param in zip(kwargs['pkeys'], kwargs['pvalues']):
+ cmd.append('-D%s=%s' % param)
+ cmd.append(target)
+ # TODO: this will need to wait for an upstream refactor
+ return __run_in(path, lambda: __apply__(__call__(cmd)))
+ args = {'target': target,
+ 'flags': flags,
+ 'pkeys': pkeys,
+ 'pvalues': pvalues,
+ 'path': path,}
+ return ((__execute__, args),)
+
+def __make__(target, path=None, flags=None, pkeys=None, pvalues=None):
"""Wrapper around GNU Make
Invoke Gnu Make (*make*) in either the current working directory or the
@@ -61,13 +74,27 @@ def make(target, path="", flags=None, pkeys=None, pvalues=None):
:param pkeys: List of keys, zipped with pvalues, to pass to Make
:param pvalues: List of values, zipped with pkeys, to pass to Make
"""
- assert which("make")
- cmd = __add_params(["make"], __build_param_list(pkeys, pvalues))
- cmd = __add_flags(cmd, flags)
- cmd.append(target)
- return __run_in(path, lambda: subprocess.call(cmd))
-
-def nant(target, path="", flags=None, pkeys=None, pvalues=None):
+ def __execute__(**kwargs):
+ '''Perform invocation of make'''
+ assert __apply__(__which__("make"))
+ cmd = ['make',]
+ if 'flags' in kwargs:
+ for flag in kwargs['flags']:
+ cmd.append(flag)
+ if 'pkeys' in kwargs and 'pvalues' in kwargs:
+ for param in zip(kwargs['pkeys'], kwargs['pvalues']):
+ cmd.append('%s=%s' % param)
+ cmd.append(target)
+ # TODO: this will need to wait for an upstream refactor
+ return __run_in(path, lambda: __apply__(__call__(cmd)))
+ args = {'target': target,
+ 'flags': flags,
+ 'pkeys': pkeys,
+ 'pvalues': pvalues,
+ 'path': path,}
+ return ((__execute__, args),)
+
+def __nant__(target, path=None, flags=None, pkeys=None, pvalues=None):
"""Wrapper around .NET Ant
Invoke NAnt in either the current working directory or the specified
@@ -83,41 +110,25 @@ def nant(target, path="", flags=None, pkeys=None, pvalues=None):
:param pkeys: List of keys, zipped with pvalues, to pass to NAnt
:param pvalues: List of values, zipped with pkeys, to pass to NAnt
"""
- assert which("nant")
- cmd = __add_params(["nant"],
- __build_param_list(pkeys, pvalues),
- lambda x: "-D:%s" % x)
- cmd = __add_flags(cmd, flags)
- cmd.append(target)
- return __run_in(path, lambda: subprocess.call(cmd))
-
-def __add_flags(cmd, flags):
- """Add flags to command and return new list"""
- if not flags:
- return cmd
- command = list(cmd)
- for flag in flags:
- command.append(flag)
- return command
-
-def __build_param_list(keys, values):
- """Build a list of key-value for appending to the command list"""
- parameters = []
- if not keys or not values:
- return parameters
- params = zip(keys, values)
- for param in params:
- parameters.append("%s=%s" % param)
- return parameters
-
-def __add_params(cmd, params, param_map=lambda x: x):
- """Append parameters to cmd list using fn"""
- if not params:
- return cmd
- command = list(cmd)
- for param in params:
- command.append(param_map(param))
- return command
+ def __execute__(**kwargs):
+ '''Perform execution of ant'''
+ assert __apply__(__which__("nant"))
+ cmd = ['nant',]
+ if 'flags' in kwargs:
+ for flag in kwargs['flags']:
+ cmd.append(flag)
+ if 'pkeys' in kwargs and 'pvalues' in kwargs:
+ for param in zip(kwargs['pkeys'], kwargs['pvalues']):
+ cmd.append('-D%s=%s' % param)
+ cmd.append(target)
+ # TODO: this will need to wait for an upstream refactor
+ return __run_in(path, lambda: __apply__(__call__(cmd)))
+ args = {'target': target,
+ 'flags': flags,
+ 'pkeys': pkeys,
+ 'pvalues': pvalues,
+ 'path': path,}
+ return ((__execute__, args),)
def __run_in(path, function):
"""Execute function while in a different running directory"""
diff --git a/xnt/build/tex.py b/xnt/build/tex.py
index 5ff5e3e..7eb95a4 100644
--- a/xnt/build/tex.py
+++ b/xnt/build/tex.py
@@ -24,10 +24,10 @@ from xnt import VERBOSE
LOGGER = logging.getLogger(__name__)
-def pdflatex(document,
- path="./",
- bibtex=False,
- makeglossary=False):
+def __pdflatex__(document,
+ directory=None,
+ bibtex=False,
+ makeglossary=False):
"""Generate PDF LaTeX Document
Use `pdflatex` to build a LaTeX PDF document. Can optionally execute steps
@@ -38,64 +38,81 @@ def pdflatex(document,
:param bibtex: Flag to or not to add bibtex. Default: False
:param makeglossary: Flag to or not to add a glossary. Default: False
"""
- devnull = None if VERBOSE else open(os.devnull, 'w')
- documentbase = os.path.splitext(document)[0]
- cwd = os.getcwd()
- os.chdir(path)
- def pdf(draftmode=False):
- """Generate PDF"""
- cmd = ["pdflatex", document, "-halt-on-error",]
- if draftmode:
- cmd.append('-draftmode')
- return xnt.tasks.call(cmd, stdout=devnull)
+ def __execute__(**kwargs):
+ '''Perform pdflatex build'''
+ devnull = None if VERBOSE else open(os.devnull, 'w')
+ documentbase = os.path.splitext(document)[0]
+ cwd = os.getcwd()
+ os.chdir(kwargs['directory'])
+ def pdf(draftmode=False):
+ """Generate PDF"""
+ from xnt.tasks import __call__, __apply__
+ cmd = ["pdflatex", document, "-halt-on-error",]
+ if draftmode:
+ cmd.append('-draftmode')
+ return __apply__(__call__(cmd, stdout=devnull))
- def run_bibtex():
- """Generate BibTex References"""
- return xnt.tasks.call(["bibtex", documentbase + ".aux"],
- stdout=devnull)
+ def run_bibtex():
+ """Generate BibTex References"""
+ from xnt.tasks import __call__, __apply__
+ return __apply__(__call__(["bibtex", documentbase + ".aux"],
+ stdout=devnull))
- def makeglossaries():
- """Generate Glossary"""
- return xnt.tasks.call(["makeglossaries", documentbase], stdout=devnull)
+ def makeglossaries():
+ """Generate Glossary"""
+ from xnt.tasks import __call__, __apply__
+ return __apply__(__call__(["makeglossaries", documentbase],
+ stdout=devnull))
- error_codes = []
- error_codes.append(pdf(draftmode=True))
- if makeglossary:
- error_codes.append(makeglossaries())
- if bibtex:
- error_codes.append(run_bibtex())
+ error_codes = []
error_codes.append(pdf(draftmode=True))
- error_codes.append(pdf(draftmode=False))
- os.chdir(cwd)
- if devnull:
- devnull.close()
- return sum(error_codes)
+ if kwargs['makeglossary']:
+ error_codes.append(makeglossaries())
+ if kwargs['bibtex']:
+ error_codes.append(run_bibtex())
+ error_codes.append(pdf(draftmode=True))
+ error_codes.append(pdf(draftmode=False))
+ os.chdir(cwd)
+ if devnull:
+ devnull.close()
+ return sum(error_codes)
+ args = {'texdocument': document,
+ 'directory': directory if directory else os.getcwd(),
+ 'bibtex': bibtex,
+ 'makeglossary': makeglossary,}
+ return ((__execute__, args),)
-def clean(path="./", remove_pdf=False):
+
+def __clean__(directory=None, remove_pdf=False):
"""Clean up generated files of PDF compilation
:param path: Directory of output files, if different than current directory
:param remove_pdf: Flag to remove the PDF. Default: False
"""
- cwd = os.getcwd()
- os.chdir(path)
- xnt.tasks.rm("*.out",
- "*.log",
- "*.aux",
- "*.toc",
- "*.tol",
- "*.tof",
- "*.tot",
- "*.bbl",
- "*.blg",
- "*.nav",
- "*.snm",
- "*.mtc",
- "*.mtc0",
- "*.glo",
- "*.ist",
- "*.glg",
- "*.gls")
- if remove_pdf:
- xnt.tasks.rm("*.pdf")
- os.chdir(cwd)
+ def __execute__(**kwargs):
+ '''Perform clean operation'''
+ cwd = os.getcwd()
+ os.chdir(kwargs['directory'])
+ xnt.tasks.rm("*.out",
+ "*.log",
+ "*.aux",
+ "*.toc",
+ "*.tol",
+ "*.tof",
+ "*.tot",
+ "*.bbl",
+ "*.blg",
+ "*.nav",
+ "*.snm",
+ "*.mtc",
+ "*.mtc0",
+ "*.glo",
+ "*.ist",
+ "*.glg",
+ "*.gls")
+ if kwargs['remove_pdf']:
+ xnt.tasks.rm("*.pdf")
+ os.chdir(cwd)
+ args = {'directory': directory if directory else os.getcwd(),
+ 'remove_pdf': remove_pdf,}
+ return ((__execute__, args),)
diff --git a/xnt/tasks.py b/xnt/tasks.py
index b8a7ad8..2a761f4 100644
--- a/xnt/tasks.py
+++ b/xnt/tasks.py
@@ -42,36 +42,31 @@ def expandpath(path_pattern):
"""
return glob.iglob(path_pattern)
-def cp(src="", dst="", files=None):
- """Copy `src` to `dst` or copy `files` to `dst`
+def __copy__(srcdir=None, dstdir=None, files=None):
+ """Copy `srcdir` to `dstdir` or copy `files` to `dstdir`
Copy a file or folder to a different file/folder
- If no `src` file is specified, will attempt to copy `files` to `dst`
+ If no `srcdir` file is specified, will attempt to copy `files` to `dstdir`
*Notice*, elements of `files` will not be expanded before copying.
- :param src: source directory or file
- :param dst: destination file or folder (in the case of `files`)
+ :param srcdir: source directory or file
+ :param dstdir: destination file or folder (in the case of `files`)
:param files: list of files (strings) to copy to `src`
"""
- assert dst and src or len(files) > 0
- LOGGER.info("Copying %s to %s", src, dst)
- def copy(source, destination):
- """Copy file or folder to destination.
-
- Depending on the type of source, call the appropriate method
- """
- if os.path.isdir(source):
- shutil.copytree(source, destination)
- else:
- shutil.copy2(source, destination)
- if src:
- copy(src, dst)
- else:
- for file_to_copy in files:
- copy(file_to_copy, dst)
-
-def mv(src, dst):
+ def __execute__(**kwargs):
+ """Perform copy"""
+ assert 'dstdir' in kwargs
+ if 'srcdir' in kwargs:
+ shutil.copytree(kwargs['srcdir'], kwargs['dstdir'])
+ elif 'files' in kwargs:
+ for srcfile in kwargs['files']:
+ shutil.copy(srcfile, kwargs['dstdir'])
+ return (
+ (__execute__, {'srcdir': srcdir, 'dstdir': dstdir, 'files': files,}),
+ )
+
+def __move__(src, dst):
"""Move `src` to `dst`
Move (copy and remove) the source file or directory (*src*) to the
@@ -80,10 +75,14 @@ def mv(src, dst):
:param src: Source file or folder to move
:param dst: Destination file or folder
"""
- LOGGER.info("Moving %s to %s", src, dst)
- shutil.move(src, dst)
-
-def mkdir(directory, mode=0o777):
+ def __execute__(**kwargs):
+ '''Perform move'''
+ LOGGER.info("Moving %s to %s", kwargs['src'], kwargs['dst'])
+ shutil.move(kwargs['src'], kwargs['dst'])
+ args = {'src': src, 'dst': dst,}
+ return ((__execute__, args),)
+
+def __mkdir__(directory, mode=0o755):
"""Make a directory with mode
Create a directory specified by *dir* with default mode (where supported)
@@ -95,18 +94,26 @@ def mkdir(directory, mode=0o777):
:param directory: New directory to create
:param mode: Mode to create the directory (where supported). Default: `777`
"""
- if os.path.exists(directory):
- LOGGER.warning("Given directory (%s) already exists", directory)
- return
- LOGGER.info("Making directory %s with mode %o", directory, mode)
- try:
- os.mkdir(directory, mode)
- except IOError as io_error:
- log(io_error, logging.ERROR)
- except:
- raise
-
-def rm(*fileset):
+ def __execute__(**kwargs):
+ '''Perform directory creation'''
+ if os.path.exists(directory):
+ LOGGER.warning(
+ "Given directory (%s) already exists",
+ kwargs['directory'])
+ return
+ LOGGER.info(
+ "Making directory %s with mode %o",
+ kwargs['directory'],
+ kwargs['mode'])
+ try:
+ os.mkdir(kwargs['directory'], kwargs['mode'])
+ except IOError as io_error:
+ LOGGER.error(io_error)
+ except:
+ raise
+ return ((__execute__, {'directory': directory, 'mode': mode,}),)
+
+def __remove__(*fileset):
"""Remove a set of files
Attempt to remove all the directories given by the fileset. Before *rm*
@@ -116,23 +123,26 @@ def rm(*fileset):
:param fileset: List of files to remove
"""
-
- try:
- for glob_set in fileset:
- for file_to_delete in expandpath(glob_set):
- if not os.path.exists(file_to_delete):
- continue
- LOGGER.info("Removing %s", file_to_delete)
- if os.path.isdir(file_to_delete):
- shutil.rmtree(file_to_delete)
- else:
- os.remove(file_to_delete)
- except OSError as os_error:
- log(os_error, logging.ERROR)
- except:
- raise
-
-def create_zip(directory, zipfilename):
+ def __execute__(**kwargs):
+ '''Perform file/ folder removal'''
+ try:
+ for glob_set in kwargs['fileset']:
+ for file_to_delete in expandpath(glob_set):
+ if not os.path.exists(file_to_delete):
+ continue
+ LOGGER.info("Removing %s", file_to_delete)
+ if os.path.isdir(file_to_delete):
+ shutil.rmtree(file_to_delete)
+ else:
+ os.remove(file_to_delete)
+ except OSError as os_error:
+ LOGGER.error(os_error)
+ except:
+ raise
+ args = {'fileset': fileset,}
+ return ((__execute__, args),)
+
+def __zip__(directory, zipfilename):
"""Compress (Zip) folder
Zip the specified *directory* into the zip file named *zipfilename*
@@ -140,20 +150,23 @@ def create_zip(directory, zipfilename):
:param directory: Directory to zip
:param zipfilename: Name of resulting compression
"""
- LOGGER.info("Zipping %s as %s", directory, zipfilename)
- assert os.path.isdir(directory) and zipfilename
- with contextlib.closing(zipfile.ZipFile(
- zipfilename,
- "w",
- zipfile.ZIP_DEFLATED)) as zip_file:
- for paths in os.walk(directory):
- for file_name in paths[2]:
- absfn = os.path.join(paths[0], file_name)
- zip_file_name = absfn[len(directory) + len(os.sep):]
- zip_file.write(absfn, zip_file_name)
+ def __execute__(**kwargs):
+ '''Perform zip'''
+ assert os.path.isdir(kwargs['directory']) and kwargs['zipfile']
+ LOGGER.info("Zipping %s as %s", kwargs['directory'], kwargs['zipfile'])
+ with contextlib.closing(zipfile.ZipFile(
+ kwargs['zipfile'],
+ "w",
+ zipfile.ZIP_DEFLATED)) as zip_file:
+ for paths in os.walk(kwargs['directory']):
+ for file_name in paths[2]:
+ absfn = os.path.join(paths[0], file_name)
+ zip_file_name = absfn[len(directory) + len(os.sep):]
+ zip_file.write(absfn, zip_file_name)
+ return ((__execute__, {'directory': directory, 'zipfile': zipfilename,}),)
#Misc Tasks
-def echo(msg, tofile):
+def __echo__(msg, tofile):
"""Write a string to file
Write the given *msg* to a file named *tofile*
@@ -163,10 +176,13 @@ def echo(msg, tofile):
:param msg: Message to write to file
:param tofile: file to which the message is written
"""
- with open(tofile, "w") as file_to_write:
- file_to_write.write(msg)
+ def __execute__(**kwargs):
+ '''Perform echo to file'''
+ with open(kwargs['tofile'], "w") as file_to_write:
+ file_to_write.write(kwargs['msg'])
+ return ((__execute__, {'msg': msg, 'tofile': tofile,}),)
-def log(msg="", lvl=logging.INFO):
+def __log__(msg, lvl=logging.INFO):
"""Log *msg* using tasks global logger
Emit the message (*msg*) to the *xnt.tasks* logger using either the default
@@ -175,25 +191,35 @@ def log(msg="", lvl=logging.INFO):
:param msg: Message to log
:param lvl: Log Level of message. Default `INFO`
"""
- LOGGER.log(lvl, msg)
+ def __execute__(**kwargs):
+ '''Perform logging operation'''
+ LOGGER.log(kwargs['lvl'], kwargs['msg'])
+ return ((__execute__, {'msg': msg, 'lvl': lvl,}),)
-def xntcall(buildfile, targets=None, props=None):
+def __xntcall__(buildfile, targets=None, props=None):
"""Invoke xnt on another build file in a different directory
:param: path - to the build file (including build file)
:param: targets - list of targets to execute
:param: props - dictionary of properties to pass to the build module
"""
- from xnt.xenant import invoke_build, load_build
- build = load_build(buildfile)
- path = os.path.dirname(buildfile)
- cwd = os.getcwd()
- os.chdir(path)
- error_code = invoke_build(build, targets=targets, props=props)
- os.chdir(cwd)
- return error_code
-
-def call(command, stdout=None, stderr=None):
+ def __execute__(**kwargs):
+ '''Perform xntcall'''
+ from xnt.xenant import invoke_build, load_build
+ build = load_build(kwargs['buildfile'])
+ path = os.path.dirname(kwargs['buildfile'])
+ cwd = os.getcwd()
+ os.chdir(path)
+ error_code = invoke_build(
+ build,
+ targets=kwargs['targets'],
+ props=kwargs['props'])
+ os.chdir(cwd)
+ return error_code
+ args = {'buildfile': buildfile, 'targets': targets, 'props': props, }
+ return ((__execute__, args),)
+
+def __call__(command, stdout=None, stderr=None):
""" Execute the given command, redirecting stdout and stderr
to optionally given files
@@ -202,51 +228,82 @@ def call(command, stdout=None, stderr=None):
:param: stderr - file to redirect standard error to, if given
:return: the error code of the subbed out call, `$?`
"""
- return subprocess.call(args=command, stdout=stdout, stderr=stderr)
-
-def setup(commands, directory=""):
+ def __execute__(**kwargs):
+ '''Perform subprocess call'''
+ return subprocess.call(
+ args=kwargs['command'],
+ stdout=kwargs['stdout'], stderr=kwargs['stderr'])
+ args = {'command': command, 'stdout': stdout, 'stderr': stderr,}
+ return ((__execute__, args),)
+
+def __setup__(command=None, commands=None, directory=None):
"""Invoke the ``setup.py`` file in the current or specified directory
+ :param: command - a single command to run
:param: commands - list of commands and options to run/ append
:param: dir - (optional) directory to run from
:return: the error code of the execution, `$?`
"""
- cmd = [sys.executable, "setup.py",]
- for command in commands:
- cmd.append(command)
- cwd = os.getcwd()
- if directory:
- os.chdir(directory)
- error_code = call(cmd)
- os.chdir(cwd)
- return error_code
-
-def which(program):
+ def __execute__(**kwargs):
+ '''Perform python setup.py commands'''
+ cmd = [sys.executable, "setup.py",]
+ for command in kwargs['commands']:
+ cmd.append(command)
+ cwd = os.getcwd()
+ if kwargs['directory']:
+ os.chdir(kwargs['directory'])
+ call = __call__(cmd)[0]
+ error_code = call[0](**call[1])
+ os.chdir(cwd)
+ return error_code
+ if not commands:
+ commands = []
+ if command:
+ commands.append(command)
+ assert len(commands) > 0
+ args = {'commands': commands, 'directory': directory,}
+ return ((__execute__, args),)
+
+def __which__(program):
"""Similar to Linux/Unix `which`: return (first) path of executable
:param program: program name to search for in PATH
:return: Return the PATH of `program` or None
"""
- def is_exe(fpath):
- """Determine if argument exists and is executable"""
- return os.path.isfile(fpath) and os.access(fpath, os.X_OK)
-
- fpath = os.path.split(program)
- if fpath[0]:
- if is_exe(program):
- return program
- else:
- for path in os.environ["PATH"].split(os.pathsep):
- path = path.strip('"')
- exe_file = os.path.join(path, program)
- if is_exe(exe_file):
- return exe_file
- return None
-
-def in_path(program):
+ def __execute__(**kwargs):
+ '''Perform which lookup'''
+ def is_exe(fpath):
+ """Determine if argument exists and is executable"""
+ return os.path.isfile(fpath) and os.access(fpath, os.X_OK)
+
+ fpath = os.path.split(kwargs['program'])
+ if fpath[0]:
+ if is_exe(kwargs['program']):
+ return kwargs['program']
+ else:
+ for path in os.environ["PATH"].split(os.pathsep):
+ path = path.strip('"')
+ exe_file = os.path.join(path, kwargs['program'])
+ if is_exe(exe_file):
+ return exe_file
+ return None
+
+ return ((__execute__, {'program': program,}),)
+
+def __in_path__(program):
"""Return boolean result if program is in PATH environment variable
:param program: Program name to search for in PATH
:return: Return the PATH of `program` or None
"""
- return which(program)
+ def __execute__(**kwargs):
+ '''Perform which test'''
+ return __apply__(__which__(kwargs['program']))
+ return ((__execute__, {'program': program,}),)
+
+def __apply__(func_tuple):
+ '''Execute function tuple'''
+ for statement in func_tuple:
+ func = statement[0]
+ args = statement[1]
+ func(**args)
diff --git a/xnt/tests/compilercollectiontests.py b/xnt/tests/compilercollectiontests.py
index 20d894f..0a7139e 100644
--- a/xnt/tests/compilercollectiontests.py
+++ b/xnt/tests/compilercollectiontests.py
@@ -17,144 +17,142 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-import os
-import shutil
-import xnt
-import xnt.build.cc as cc
+from xnt.build.cc import __gcc__
+from xnt.build.cc import __gpp__
+from xnt.build.cc import __nvcc__
+from xnt.build.cc import __javac__
+from types import FunctionType
import unittest
#pylint: disable-msg=C0103
-@unittest.skipUnless(xnt.in_path("gcc"), "gcc is not in your path")
class GccTests(unittest.TestCase):
"""Test GCC"""
def setUp(self):
"""Test Case Setup"""
- os.mkdir("temp")
- with open("temp/hello.c", "w") as test_code:
- test_code.write("""
- #include <stdio.h>
- int main() {
- printf("Hello, World!\\n");
- return 0;
- }
- """)
+ pass
def tearDown(self):
"""Test Case Teardown"""
- shutil.rmtree("temp")
+ pass
def test_gcc(self):
"""Test Default GCC"""
- cwd = os.getcwd()
- os.chdir("temp")
- cc.gcc("hello.c")
- self.assertTrue(os.path.isfile("a.out"))
- os.chdir(cwd)
-
- def test_gcc_o(self):
+ result = __gcc__("hello.c")
+ self.assertIsNotNone(result)
+ self.assertIsInstance(result, tuple)
+ self.assertIsInstance(result[0], tuple)
+ self.assertEqual(len(result[0]), 2)
+ self.assertIsInstance(result[0][0], FunctionType)
+ self.assertIsInstance(result[0][1], dict)
+ self.assertTrue("infile" in result[0][1])
+ self.assertTrue('flags' in result[0][1])
+
+ def test_gcc_with_output(self):
"""Test GCC with output"""
- cc.gcc_o("temp/hello.c", "temp/hello")
- self.assertTrue(os.path.isfile("temp/hello"))
+ result = __gcc__("hello.c", output="hello")
+ self.assertIsNotNone(result)
+ self.assertIsInstance(result, tuple)
+ self.assertIsInstance(result[0], tuple)
+ self.assertEqual(len(result[0]), 2)
+ self.assertIsInstance(result[0][0], FunctionType)
+ self.assertIsInstance(result[0][1], dict)
+ self.assertTrue("infile" in result[0][1])
+ self.assertTrue("outfile" in result[0][1])
+ self.assertTrue('flags' in result[0][1])
-@unittest.skipUnless(xnt.in_path("g++"), "g++ is not in your path")
+#pylint: disable-msg=C0103
class GppTests(unittest.TestCase):
"""Test G++ (C++ GCC)"""
def setUp(self):
"""Test Case Setup"""
- os.mkdir("temp")
- with open("temp/hello.cpp", "w") as test_code:
- test_code.write("""
- #include <iostream>
- int main() {
- std::cout << "Hello, World!" << std::endl;
- return 0;
- }
- """)
+ pass
def tearDown(self):
"""Test Case Teardown"""
- shutil.rmtree("temp")
+ pass
def test_gpp(self):
"""Test Default G++"""
- cwd = os.getcwd()
- os.chdir("temp")
- cc.gpp("hello.cpp")
- self.assertTrue("a.out")
- os.chdir(cwd)
-
- def test_gpp_o(self):
+ result = __gpp__("hello.cpp")
+ self.assertIsNotNone(result)
+ self.assertIsInstance(result, tuple)
+ self.assertIsInstance(result[0], tuple)
+ self.assertEqual(len(result[0]), 2)
+ self.assertIsInstance(result[0][0], FunctionType)
+ self.assertIsInstance(result[0][1], dict)
+ self.assertTrue("infile" in result[0][1])
+ self.assertTrue('flags' in result[0][1])
+
+ def test_gpp_with_output(self):
"""Test G++ with output"""
- cc.gpp_o("temp/hello.cpp", "temp/hello")
- self.assertTrue(os.path.isfile("temp/hello"))
+ result = __gpp__("hello.cpp", output="hello")
+ self.assertIsNotNone(result)
+ self.assertIsInstance(result, tuple)
+ self.assertIsInstance(result[0], tuple)
+ self.assertEqual(len(result[0]), 2)
+ self.assertIsInstance(result[0][0], FunctionType)
+ self.assertIsInstance(result[0][1], dict)
+ self.assertTrue("infile" in result[0][1])
+ self.assertTrue("outfile" in result[0][1])
+ self.assertTrue('flags' in result[0][1])
-@unittest.skipUnless(xnt.in_path('nvcc'), 'nvcc is not in your path')
+#pylint: disable-msg=C0103
class NvccTests(unittest.TestCase):
"""Test NVCC"""
def setUp(self):
"""Test Case Setup"""
- os.mkdir("temp")
- with open("temp/hello.cu", "w") as test_code:
- test_code.write("""
- __global__ void kernel(float *x) {
- int idx = blockIdx.x;
- x[idx] = 42;
- }
- int main() {
- int size = sizeof(float) * 128;
- float *x = (float*)malloc(size);
- float *dev_x;
- cudaMalloc((void**)&dev_x, size);
- cudaMemcpy(dev_x, x, size, cudaMemcpyHostToDevice);
- kernel<<<128, 1>>>(dev_x);
- cudaMemcpy(x, dev_x, size, cudaMemcpyDeviceToHost);
- cudaFree(dev_x);
- delete(x);
- x = NULL;
- }""")
+ pass
+
def tearDown(self):
"""Test Case Teardown"""
- shutil.rmtree("temp")
+ pass
def test_nvcc(self):
"""Test Default NVCC"""
- cwd = os.getcwd()
- os.chdir("temp")
- cc.nvcc("hello.cu")
- self.assertTrue(os.path.isfile("a.out"))
- os.chdir(cwd)
-
- def test_nvcc_o(self):
+ result = __nvcc__("hello.cu")
+ self.assertIsNotNone(result)
+ self.assertIsInstance(result, tuple)
+ self.assertIsInstance(result[0], tuple)
+ self.assertEqual(len(result[0]), 2)
+ self.assertIsInstance(result[0][0], FunctionType)
+ self.assertIsInstance(result[0][1], dict)
+ self.assertTrue("infile" in result[0][1])
+ self.assertTrue('flags' in result[0][1])
+
+ def test_nvcc_with_output(self):
"""Test Named Output NVCC"""
- cc.nvcc_o("temp/hello.cu", "temp/hello")
- self.assertTrue(os.path.isfile("temp/hello"))
+ result = __nvcc__("hello.cu", output="hello")
+ self.assertIsNotNone(result)
+ self.assertIsInstance(result, tuple)
+ self.assertIsInstance(result[0], tuple)
+ self.assertEqual(len(result[0]), 2)
+ self.assertIsInstance(result[0][0], FunctionType)
+ self.assertIsInstance(result[0][1], dict)
+ self.assertTrue("infile" in result[0][1])
+ self.assertTrue("outfile" in result[0][1])
+ self.assertTrue('flags' in result[0][1])
-@unittest.skipUnless(xnt.in_path("javac"), "javac is not in your path")
class JavacTests(unittest.TestCase):
"""Test Javac"""
def setUp(self):
"""Test Case Setup"""
- os.mkdir("temp")
- with open("temp/hello.java", "w") as test_code:
- test_code.write("""
- class hello {
- public static void main(String[] args) {
- System.out.println("Hello, World!");
- }
- }
- """)
+ pass
def tearDown(self):
"""Test Case Teardown"""
- shutil.rmtree("temp")
+ pass
def test_javac(self):
"""Test Default Javac"""
- cwd = os.getcwd()
- os.chdir("temp")
- cc.javac("hello.java")
- self.assertTrue(os.path.isfile("hello.class"))
- os.chdir(cwd)
+ result = __javac__("HelloWorld.java")
+ self.assertIsNotNone(result)
+ self.assertIsInstance(result, tuple)
+ self.assertIsInstance(result[0], tuple)
+ self.assertEqual(len(result[0]), 2)
+ self.assertIsInstance(result[0][0], FunctionType)
+ self.assertIsInstance(result[0][1], dict)
+ self.assertTrue("sourcefiles" in result[0][1])
+ self.assertTrue('flags' in result[0][1])
if __name__ == "__main__":
unittest.main()
diff --git a/xnt/tests/maketests.py b/xnt/tests/maketests.py
index c022b12..a3d17d0 100644
--- a/xnt/tests/maketests.py
+++ b/xnt/tests/maketests.py
@@ -17,127 +17,162 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
+from xnt.build.make import __make__
+from xnt.build.make import __ant__
+from xnt.build.make import __nant__
+from types import FunctionType
import unittest
-import xnt
-import xnt.build.make
-import xnt.tests
-@unittest.skipUnless(xnt.in_path("ant") or xnt.in_path("ant.exe"),
- "Apache ant is not in your path")
class AntTests(unittest.TestCase):
"""Test Case for Ant Build"""
def setUp(self):
"""Test Setup"""
- xnt.tests.set_up()
- with open("temp/build.xml", "w") as build:
- build.write("<?xml version=\"1.0\" ?>\n")
- build.write("<project name=\"test\" default=\"test\">\n")
- build.write("<target name=\"test\">\n")
- build.write("<echo>${test_var}</echo>\n")
- build.write("</target>\n")
- build.write("</project>\n")
+ pass
def tearDown(self):
"""Test Teardown"""
- xnt.tests.tear_down()
+ pass
def test_default_build(self):
"""Test the default target of ant"""
- result = xnt.build.make.ant(target="test", path="temp")
- self.assertEqual(result, 0)
-
- def test_passing_flags(self):
- """Test ant with passing flags"""
- result = xnt.build.make.ant(target="test",
- path="temp",
- flags=["-verbose"])
- self.assertEqual(result, 0)
-
- def test_pass_var(self):
+ result = __ant__(target="test")
+ self.assertIsNotNone(result)
+ self.assertIsInstance(result, tuple)
+ self.assertIsInstance(result[0], tuple)
+ self.assertEqual(len(result[0]), 2)
+ self.assertIsInstance(result[0][0], FunctionType)
+ self.assertIsInstance(result[0][1], dict)
+ self.assertTrue('target' in result[0][1])
+ self.assertEqual('test', result[0][1]['target'])
+
+ def test_ant_when_given_path(self):
+ """Test ant when passing given a path"""
+ result = __ant__(target="test", path="some/other/path/build.xml")
+ self.assertIsNotNone(result)
+ self.assertTrue('path' in result[0][1])
+ self.assertEqual('some/other/path/build.xml', result[0][1]['path'])
+
+ def test_ant_when_given_flags(self):
+ """Test passing flags to ant"""
+ result = __ant__(target='test', flags=['-verbose'])
+ self.assertIsNotNone(result)
+ self.assertTrue('flags' in result[0][1])
+ self.assertEqual(1, len(result[0][1]['flags']))
+ self.assertEqual('-verbose', result[0][1]['flags'][0])
+
+ def test_ant_when_given_vars(self):
"""Test passing variables to ant"""
- result = xnt.build.make.ant(target="test", path="temp",
- pkeys=["test_var"],
- pvalues=["testing"])
- self.assertEqual(result, 0)
+ result = __ant__(target="test",
+ pkeys=["test_var"],
+ pvalues=["testing"])
+ self.assertIsNotNone(result)
+ self.assertTrue('pkeys' in result[0][1])
+ self.assertTrue('pvalues' in result[0][1])
+ self.assertEqual(1, len(result[0][1]['pkeys']))
+ self.assertEqual(1, len(result[0][1]['pvalues']))
+ self.assertEqual('test_var', result[0][1]['pkeys'][0])
+ self.assertEqual('testing', result[0][1]['pvalues'][0])
-@unittest.skipUnless(xnt.in_path("make"), "make is not in your path")
class MakeTests(unittest.TestCase):
"""GNU Make Tests"""
def setUp(self):
"""Test Setup"""
- xnt.tests.set_up()
- with open("temp/Makefile", "w") as makefile:
- makefile.write("build:\n")
- makefile.write("\techo 'testing'\n")
+ pass
def tearDown(self):
"""Test Teardown"""
- xnt.tests.tear_down()
+ pass
def test_default_make(self):
"""Test Default make"""
- result = xnt.build.make.make(target="build", path="temp")
- self.assertEqual(result, 0)
+ result = __make__(target="build")
+ self.assertIsNotNone(result)
+ self.assertIsInstance(result, tuple)
+ self.assertIsInstance(result[0], tuple)
+ self.assertEqual(len(result[0]), 2)
+ self.assertIsInstance(result[0][0], FunctionType)
+ self.assertIsInstance(result[0][1], dict)
+ self.assertTrue('target' in result[0][1])
+ self.assertEqual('build', result[0][1]['target'])
+
+ def test_make_with_path(self):
+ '''Test make given a path'''
+ result = __make__(target="build", path="some/other/path/Makefile")
+ self.assertIsNotNone(result)
+ self.assertTrue('path' in result[0][1])
+ self.assertEqual('some/other/path/Makefile', result[0][1]['path'])
def test_passing_vars(self):
"""Test Parameter Passing with Make"""
- result = xnt.build.make.make(target="build",
- path="temp",
- pkeys=["test_var"],
- pvalues=["testing"])
- self.assertEqual(result, 0)
+ result = __make__(target='build',
+ pkeys=["test_var"],
+ pvalues=["testing"])
+ self.assertIsNotNone(result)
+ self.assertTrue('pkeys' in result[0][1])
+ self.assertTrue('pvalues' in result[0][1])
+ self.assertEqual(1, len(result[0][1]['pkeys']))
+ self.assertEqual(1, len(result[0][1]['pvalues']))
+ self.assertEqual('test_var', result[0][1]['pkeys'][0])
+ self.assertEqual('testing', result[0][1]['pvalues'][0])
def test_passing_flags(self):
"""Test Flag Passing with Make"""
- result = xnt.build.make.make(target="build",
- path="temp",
- flags=["-B"])
- self.assertEqual(result, 0)
+ result = __make__(target='build', flags=['-B'])
+ self.assertIsNotNone(result)
+ self.assertTrue('flags' in result[0][1])
+ self.assertEqual(1, len(result[0][1]['flags']))
+ self.assertEqual('-B', result[0][1]['flags'][0])
-@unittest.skipUnless(xnt.in_path("nant") or xnt.in_path("nant.exe"),
- "nant is not in your path")
class NAntTests(unittest.TestCase):
""".NET Ant Tests"""
def setUp(self):
"""Test Setup"""
- xnt.tests.set_up()
- with open("temp/default.build", "w") as default_build:
- default_build.write("<?xml version=\"1.0\"?>\n")
- default_build.write("<project name=\"test\">\n")
- default_build.write("<target name=\"test\">\n")
- default_build.write("<if \n")
- default_build.write("test=\"${property::exists('test_var')}\">\n")
- default_build.write("<echo>${test_var}</echo>\n")
- default_build.write("</if>\n")
- default_build.write("</target>\n")
- default_build.write("</project>")
+ pass
def tearDown(self):
"""Test Teardown"""
- xnt.tests.tear_down()
+ pass
- def test_default_nant(self):
+ def test_nant_with_target(self):
"""Test Deault nant"""
- result = xnt.build.make.nant(target="test", path="temp")
- self.assertEqual(result, 0)
-
- def test_parameters_passing(self):
- """Test Parameter Passing with NAnt"""
- result = xnt.build.make.nant(target="test",
- path="temp",
- pkeys=["test_var"],
- pvalues=["testing"])
- self.assertEqual(result, 0)
-
- def test_flag_passing(self):
- """Test Flag Passing with NAnt"""
- result = xnt.build.make.nant(target="test",
- path="temp",
- flags=["-v"])
- self.assertEqual(result, 0)
-
+ result = __nant__(target='test')
+ self.assertIsNotNone(result)
+ self.assertIsInstance(result, tuple)
+ self.assertIsInstance(result[0], tuple)
+ self.assertEqual(2, len(result[0]))
+ self.assertIsInstance(result[0][0], FunctionType)
+ self.assertIsInstance(result[0][1], dict)
+ self.assertTrue('target' in result[0][1])
+ self.assertEqual('test', result[0][1]['target'])
+
+ def test_nant_with_path(self):
+ '''Test NAnt with path'''
+ result = __nant__(target='test', path='some/other/path/build.xml')
+ self.assertIsNotNone(result)
+ self.assertTrue('path' in result[0][1])
+ self.assertEqual('some/other/path/build.xml', result[0][1]['path'])
+
+ def test_nant_with_parameters(self):
+ '''Test NAnt with parameters'''
+ result = __nant__(target="test",
+ pkeys=["test_var"],
+ pvalues=["testing"])
+ self.assertIsNotNone(result)
+ self.assertTrue('pkeys' in result[0][1])
+ self.assertTrue('pvalues' in result[0][1])
+ self.assertEqual(1, len(result[0][1]['pkeys']))
+ self.assertEqual(1, len(result[0][1]['pvalues']))
+ self.assertEqual('test_var', result[0][1]['pkeys'][0])
+
+ def test_nant_with_flags(self):
+ '''Test NAnt with flags'''
+ result = __nant__(target="test", flags=["-v"])
+ self.assertIsNotNone(result)
+ self.assertTrue('flags' in result[0][1])
+ self.assertEqual(1, len(result[0][1]['flags']))
+ self.assertEqual('-v', result[0][1]['flags'][0])
if __name__ == '__main__':
unittest.main()
diff --git a/xnt/tests/taskcompressiontests.py b/xnt/tests/taskcompressiontests.py
index 3640287..0d36b49 100644
--- a/xnt/tests/taskcompressiontests.py
+++ b/xnt/tests/taskcompressiontests.py
@@ -17,9 +17,8 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-import os
-import xnt.tasks
-import xnt.tests
+from xnt.tasks import __zip__
+from types import FunctionType
import unittest
#pylint: disable-msg=C0103
@@ -27,16 +26,25 @@ class TaskCompressionTests(unittest.TestCase):
"""Test Cases for Compression"""
def setUp(self):
"""Test Case Setup"""
- xnt.tests.set_up()
+ pass
def tearDown(self):
"""Test Case Teardown"""
- xnt.tests.tear_down()
+ pass
def test_zip(self):
"""Test zip method"""
- xnt.tasks.create_zip("temp/testfolder1", "temp/myzip.zip")
- self.assertTrue(os.path.exists("temp/myzip.zip"))
+ result = __zip__(directory="testfolder", zipfilename="myzip.zip")
+ self.assertIsNotNone(result)
+ self.assertIsInstance(result, tuple)
+ self.assertIsInstance(result[0], tuple)
+ self.assertEqual(2, len(result[0]))
+ self.assertIsInstance(result[0][0], FunctionType)
+ self.assertIsInstance(result[0][1], dict)
+ self.assertTrue("directory" in result[0][1])
+ self.assertEqual("testfolder", result[0][1]['directory'])
+ self.assertTrue("zipfile" in result[0][1])
+ self.assertEqual("myzip.zip", result[0][1]['zipfile'])
if __name__ == "__main__":
unittest.main()
diff --git a/xnt/tests/taskcopytests.py b/xnt/tests/taskcopytests.py
index 8955f1e..abe0eea 100644
--- a/xnt/tests/taskcopytests.py
+++ b/xnt/tests/taskcopytests.py
@@ -17,9 +17,8 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-import os
-import xnt.tasks
-import xnt.tests
+from xnt.tasks import __copy__
+from types import FunctionType
import unittest
#pylint: disable-msg=C0103
@@ -27,35 +26,34 @@ class TaskCopyTests(unittest.TestCase):
"""Test Case for Copy Tasks Method"""
def setUp(self):
"""Test Setup"""
- xnt.tests.set_up()
+ pass
def tearDown(self):
"""Test Teardown"""
- xnt.tests.tear_down()
+ pass
def test_cp(self):
"""Test default use of cp"""
- xnt.tasks.cp("temp/testfolder1", "temp/testfolder2")
- self.assertTrue(os.path.exists("temp/testfolder2"))
- self.assertTrue(os.path.exists("temp/testfolder1"))
- xnt.tasks.cp("temp/testfile1", "temp/testfile5")
- self.assertTrue(os.path.exists("temp/testfile5"))
- self.assertTrue(os.path.exists("temp/testfile1"))
- with open("temp/testfile5", "r") as testfile:
- self.assertEqual("this is a test file", testfile.read())
+ result = __copy__(srcdir="test0", dstdir="test1")
+ self.assertIsNotNone(result)
+ self.assertIsInstance(result, tuple)
+ self.assertIsInstance(result[0], tuple)
+ self.assertEqual(2, len(result[0]))
+ self.assertIsInstance(result[0][0], FunctionType)
+ self.assertIsInstance(result[0][1], dict)
+ self.assertTrue('srcdir' in result[0][1])
+ self.assertEqual('test0', result[0][1]['srcdir'])
+ self.assertTrue('dstdir' in result[0][1])
+ self.assertEqual('test1', result[0][1]['dstdir'])
def test_cp_filelist(self):
"""Test filelist copy"""
- xnt.tasks.cp(dst="temp/testfolder2",
- files=["temp/testfile1",
- "temp/testfile2",
- "temp/testfile3"])
- self.assertTrue(os.path.exists("temp/testfile1"))
- self.assertTrue(os.path.exists("temp/testfile2"))
- self.assertTrue(os.path.exists("temp/testfile3"))
- with open("temp/testfile2", "r") as testfile:
- self.assertEqual("this is a test file", testfile.read())
-
+ result = __copy__(files=['file1', 'file2', 'file3'], dstdir='test1')
+ self.assertIsNotNone(result)
+ self.assertTrue('files' in result[0][1])
+ self.assertEqual(3, len(result[0][1]['files']))
+ self.assertTrue('dstdir' in result[0][1])
+ self.assertEqual('test1', result[0][1]['dstdir'])
if __name__ == "__main__":
unittest.main()
diff --git a/xnt/tests/taskmisctests.py b/xnt/tests/taskmisctests.py
index c20e24b..4864e41 100644
--- a/xnt/tests/taskmisctests.py
+++ b/xnt/tests/taskmisctests.py
@@ -17,9 +17,14 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-import os
-import xnt.tasks
-import xnt.tests
+from xnt.tasks import __echo__
+from xnt.tasks import __call__
+from xnt.tasks import __setup__
+from xnt.tasks import __xntcall__
+from xnt.tasks import __which__
+from xnt.tasks import __in_path__
+from xnt.tasks import __log__
+from types import FunctionType
import unittest
#pylint: disable-msg=C0103
@@ -27,60 +32,121 @@ class TaskMiscTests(unittest.TestCase):
"""Test Misc Tasks"""
def setUp(self):
"""Test Case Setup"""
- xnt.tests.set_up()
+ pass
def tearDown(self):
"""Test Case Teardown"""
- xnt.tests.tear_down()
+ pass
def test_echo(self):
"""Test Echo Task"""
- xnt.tasks.echo("this is my cool echo", "temp/mynewcoolfile")
- self.assertTrue(os.path.exists("temp/mynewcoolfile"))
- with open("temp/mynewcoolfile", "r") as temp_file:
- self.assertEqual("this is my cool echo", temp_file.read())
+ result = __echo__(msg="foobar", tofile="mytestfile")
+ self.assertIsNotNone(result)
+ self.assertIsInstance(result, tuple)
+ self.assertIsInstance(result[0], tuple)
+ self.assertEqual(2, len(result[0]))
+ self.assertIsInstance(result[0][0], FunctionType)
+ self.assertIsInstance(result[0][1], dict)
+ self.assertTrue('msg' in result[0][1])
+ self.assertEqual('foobar', result[0][1]['msg'])
+ self.assertTrue('tofile' in result[0][1])
+ self.assertEqual('mytestfile', result[0][1]['tofile'])
+
+ def test_log(self):
+ """Test log function"""
+ result = __log__(msg="testing the logging", lvl=40)
+ self.assertIsNotNone(result)
+ self.assertIsInstance(result, tuple)
+ self.assertIsInstance(result[0], tuple)
+ self.assertEqual(2, len(result[0]))
+ self.assertIsInstance(result[0][0], FunctionType)
+ self.assertIsInstance(result[0][1], dict)
+ self.assertTrue('msg' in result[0][1])
+ self.assertEqual('testing the logging', result[0][1]['msg'])
+ self.assertTrue('lvl' in result[0][1])
+ self.assertEqual(40, result[0][1]['lvl'])
def test_call(self):
"""Test Call, testing redirection"""
- out = open("temp/testout", "w")
- err = open("temp/testerr", "w")
- xnt.tasks.call(["python",
- os.path.abspath("temp/test.py"),
- "42", "hello"],
- out,
- err)
- out.close()
- err.close()
- self.assertTrue(os.path.exists("temp/testout"))
- self.assertTrue(os.path.exists("temp/testerr"))
- with open("temp/testout", "r") as std_out:
- self.assertEqual("42", std_out.read())
- with open("temp/testerr", "r") as std_err:
- self.assertEqual("hello", std_err.read())
-
- def test_which_finds_python(self):
- """Test which can find python"""
- if os.name == 'posix':
- path = xnt.tasks.which("python")
- else:
- path = xnt.tasks.which("python.exe")
- self.assertIsNotNone(path)
-
- def test_which_dne(self):
- """Test which cannot find not existent program"""
- path = xnt.tasks.which("arst")
- self.assertIsNone(path)
-
- def test_python_in_path(self):
+ result = __call__(['echo', 'blah'])
+ self.assertIsNotNone(result)
+ self.assertIsInstance(result, tuple)
+ self.assertIsInstance(result[0], tuple)
+ self.assertEqual(2, len(result[0]))
+ self.assertIsInstance(result[0][0], FunctionType)
+ self.assertIsInstance(result[0][1], dict)
+ self.assertTrue('command' in result[0][1])
+ self.assertEqual(2, len(result[0][1]['command']))
+ self.assertEqual('echo', result[0][1]['command'][0])
+ self.assertEqual('blah', result[0][1]['command'][1])
+ self.assertTrue('stdout' in result[0][1])
+ self.assertIsNone(result[0][1]['stdout'])
+ self.assertTrue('stderr' in result[0][1])
+ self.assertIsNone(result[0][1]['stderr'])
+
+ def test_setup_with_single_command(self):
+ '''Test setup function with a single command'''
+ result = __setup__(command='test')
+ self.assertIsNotNone(result)
+ self.assertIsInstance(result, tuple)
+ self.assertIsInstance(result[0], tuple)
+ self.assertEqual(2, len(result[0]))
+ self.assertIsInstance(result[0][0], FunctionType)
+ self.assertIsInstance(result[0][1], dict)
+ self.assertTrue('commands' in result[0][1])
+ self.assertEqual(1, len(result[0][1]['commands']))
+
+ def test_setup_with_commands(self):
+ '''Test setup function commands'''
+ result = __setup__(commands=['build', 'test'])
+ self.assertIsNotNone(result)
+ self.assertTrue('commands' in result[0][1])
+ self.assertEqual(2, len(result[0][1]['commands']))
+
+ def test_setup_with_directory(self):
+ '''Test setup function with directory'''
+ result = __setup__(command='test', directory='test/')
+ self.assertIsNotNone(result)
+ self.assertTrue('directory' in result[0][1])
+ self.assertEqual('test/', result[0][1]['directory'])
+
+ def test_xntcall(self):
+ """Test xntcall"""
+ result = __xntcall__(buildfile='test/build.py')
+ self.assertIsNotNone(result)
+ self.assertIsInstance(result, tuple)
+ self.assertIsInstance(result[0], tuple)
+ self.assertEqual(2, len(result[0]))
+ self.assertIsInstance(result[0][0], FunctionType)
+ self.assertIsInstance(result[0][1], dict)
+ self.assertTrue('buildfile' in result[0][1])
+ self.assertEqual('test/build.py', result[0][1]['buildfile'])
+ self.assertTrue('targets' in result[0][1])
+ self.assertTrue('props' in result[0][1])
+
+ def test_which(self):
+ """Test which"""
+ result = __which__('python')
+ self.assertIsNotNone(result)
+ self.assertIsInstance(result, tuple)
+ self.assertIsInstance(result[0], tuple)
+ self.assertEqual(2, len(result[0]))
+ self.assertIsInstance(result[0][0], FunctionType)
+ self.assertIsInstance(result[0][1], dict)
+ self.assertTrue('program' in result[0][1])
+ self.assertEqual('python', result[0][1]['program'])
+
+ def test_in_path(self):
"""Test in_path task"""
- if os.name == 'posix':
- self.assertTrue(xnt.tasks.in_path("python"))
- else:
- self.assertTrue(xnt.tasks.in_path("python.exe"))
-
- def test_arst_not_in_path(self):
- """Test not in_path"""
- self.assertFalse(xnt.tasks.in_path("arst"))
+ result = __in_path__('python')
+ self.assertIsNotNone(result)
+ self.assertIsInstance(result, tuple)
+ self.assertIsInstance(result[0], tuple)
+ self.assertEqual(2, len(result[0]))
+ self.assertIsInstance(result[0][0], FunctionType)
+ self.assertIsInstance(result[0][1], dict)
+ self.assertTrue('program' in result[0][1])
+ self.assertEqual('python', result[0][1]['program'])
if __name__ == "__main__":
unittest.main()
diff --git a/xnt/tests/taskmkdirtests.py b/xnt/tests/taskmkdirtests.py
index fdfdddc..3674611 100644
--- a/xnt/tests/taskmkdirtests.py
+++ b/xnt/tests/taskmkdirtests.py
@@ -17,29 +17,26 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-import os
-import xnt.tasks
-import xnt.tests
+from xnt.tasks import __mkdir__
+from types import FunctionType
import unittest
#pylint: disable-msg=C0103
class TaskMkdirTests(unittest.TestCase):
"""Test Cases for Mkdir"""
- def setUp(self):
- """Test Case Setup"""
- xnt.tests.set_up()
-
- def tearDown(self):
- """Test Case Teardown"""
- xnt.tests.tear_down()
-
def test_mkdir(self):
"""Test mkdir method"""
- xnt.tasks.mkdir("temp/mynewtestfolder")
- self.assertTrue(os.path.exists("temp/mynewtestfolder"))
- self.assertTrue(os.path.exists("temp/testfolder1"))
- xnt.tasks.mkdir("temp/testfolder1")
-
+ result = __mkdir__("my/new/directory")
+ self.assertIsNotNone(result)
+ self.assertIsInstance(result, tuple)
+ self.assertIsInstance(result[0], tuple)
+ self.assertEqual(2, len(result[0]))
+ self.assertIsInstance(result[0][0], FunctionType)
+ self.assertIsInstance(result[0][1], dict)
+ self.assertTrue('directory' in result[0][1])
+ self.assertEqual('my/new/directory', result[0][1]['directory'])
+ self.assertTrue('mode' in result[0][1])
+ self.assertEqual(0o755, result[0][1]['mode'])
if __name__ == "__main__":
unittest.main()
diff --git a/xnt/tests/taskmovetests.py b/xnt/tests/taskmovetests.py
index 3aed96c..7daaf9e 100644
--- a/xnt/tests/taskmovetests.py
+++ b/xnt/tests/taskmovetests.py
@@ -17,32 +17,26 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-import os
-import xnt.tasks
-import xnt.tests
+from xnt.tasks import __move__
+from types import FunctionType
import unittest
#pylint: disable-msg=C0103
class TaskMoveTests(unittest.TestCase):
"""Test cases for move"""
- def setUp(self):
- """Test Case Setup"""
- xnt.tests.set_up()
-
- def tearDown(self):
- """Test Case Teardown"""
- xnt.tests.tear_down()
-
- def test_mv(self):
+ def test_move(self):
"""Test Moving files and folders"""
- xnt.tasks.mv("temp/testfolder1", "temp/testfolder2")
- self.assertTrue(os.path.exists("temp/testfolder2"))
- self.assertFalse(os.path.exists("temp/testfolder1"))
- xnt.tasks.mv("temp/testfile1", "temp/testfile5")
- self.assertTrue(os.path.exists("temp/testfile5"))
- self.assertFalse(os.path.exists("temp/testfile1"))
- with open("temp/testfile5", "r") as testfile:
- self.assertEqual("this is a test file", testfile.read())
+ result = __move__('test0', 'test1')
+ self.assertIsNotNone(result)
+ self.assertIsInstance(result, tuple)
+ self.assertIsInstance(result[0], tuple)
+ self.assertEqual(2, len(result[0]))
+ self.assertIsInstance(result[0][0], FunctionType)
+ self.assertIsInstance(result[0][1], dict)
+ self.assertTrue('src' in result[0][1])
+ self.assertTrue('dst' in result[0][1])
+ self.assertEqual('test0', result[0][1]['src'])
+ self.assertEqual('test1', result[0][1]['dst'])
if __name__ == "__main__":
unittest.main()
diff --git a/xnt/tests/taskremovetests.py b/xnt/tests/taskremovetests.py
index cb0582b..6768852 100644
--- a/xnt/tests/taskremovetests.py
+++ b/xnt/tests/taskremovetests.py
@@ -17,28 +17,24 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-import os
-import xnt.tasks
-import xnt.tests
+from xnt.tasks import __remove__
+from types import FunctionType
import unittest
#pylint: disable-msg=C0103
class TaskRemoveTests(unittest.TestCase):
"""Test Case for Remove"""
- def setUp(self):
- """Test case Setup"""
- xnt.tests.set_up()
-
- def tearDown(self):
- """Test case Teardown"""
- xnt.tests.tear_down()
-
- def test_rm(self):
- """Remove cases"""
- xnt.tasks.rm("temp/testfolder1")
- self.assertFalse(os.path.exists("temp/testfolder1"))
- xnt.tasks.rm("temp/testfile1")
-
+ def test_remove(self):
+ """Test removing files and folders"""
+ result = __remove__('test0', 'test1', '*swp')
+ self.assertIsNotNone(result)
+ self.assertIsInstance(result, tuple)
+ self.assertIsInstance(result[0], tuple)
+ self.assertEqual(2, len(result[0]))
+ self.assertIsInstance(result[0][0], FunctionType)
+ self.assertIsInstance(result[0][1], dict)
+ self.assertTrue('fileset' in result[0][1])
+ self.assertEqual(3, len(result[0][1]['fileset']))
if __name__ == "__main__":
unittest.main()
diff --git a/xnt/tests/textests.py b/xnt/tests/textests.py
index d555915..6c14c96 100644
--- a/xnt/tests/textests.py
+++ b/xnt/tests/textests.py
@@ -17,95 +17,62 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-import os
+from xnt.build.tex import __pdflatex__
+from xnt.build.tex import __clean__
+from types import FunctionType
import unittest
-import xnt
-import xnt.build.tex
-import xnt.tests
-@unittest.skipUnless(xnt.in_path("pdflatex") or xnt.in_path("pdflatex.exe"),
- "pdflatex is not in your path")
class TexTests(unittest.TestCase):
"""Test Case for TeX Document Building"""
- def setUp(self):
- """Test Setup"""
- xnt.tests.set_up()
- with open("temp/test.tex", "w") as test_tex:
- test_tex.write('\\documentclass{article}\n')
- test_tex.write('\\usepackage{glossaries}\n')
- test_tex.write('\\author{python test}\n')
- test_tex.write('\\date{\\today}\n')
- test_tex.write('\\makeglossaries\n')
- test_tex.write('\\begin{document}\n')
- test_tex.write('\\nocite{*}\n')
- test_tex.write('\\newglossaryentry{test}\n')
- test_tex.write('{name={test},description={this}}\n')
- test_tex.write('This is a \\gls{test} \\LaTeX document.\n')
- test_tex.write('\\printglossaries\n')
- test_tex.write('\\begin{thebibliography}{1}\n')
- test_tex.write('\\bibitem{test_bib_item}\n')
- test_tex.write('Leslie Lamport,\n')
- test_tex.write('\\emph{\\LaTeX: A Document Preperation System}.\n')
- test_tex.write('Addison Wesley, Massachusetts,\n')
- test_tex.write('2nd Edition,\n')
- test_tex.write('1994.\n')
- test_tex.write('\\end{thebibliography}\n')
- test_tex.write('\\bibliography{1}\n')
- test_tex.write('\\end{document}\n')
-
- def tearDown(self):
- """Test Teardown"""
- xnt.tests.tear_down()
-
def test_pdflatex_build(self):
"""Test default pdflatex build"""
- xnt.build.tex.pdflatex("test.tex",
- path="temp")
- self.assertTrue(os.path.exists("temp/test.pdf"))
- self.assertTrue(os.path.exists("temp/test.aux"))
- self.assertTrue(os.path.exists("temp/test.log"))
+ result = __pdflatex__('test.tex', directory='tex')
+ self.assertIsNotNone(result)
+ self.assertIsInstance(result, tuple)
+ self.assertIsInstance(result[0], tuple)
+ self.assertEqual(2, len(result[0]))
+ self.assertIsInstance(result[0][0], FunctionType)
+ self.assertIsInstance(result[0][1], dict)
+ self.assertTrue('texdocument' in result[0][1])
+ self.assertEqual('test.tex', result[0][1]['texdocument'])
+ self.assertTrue('directory' in result[0][1])
+ self.assertTrue('bibtex' in result[0][1])
+ self.assertFalse(result[0][1]['bibtex'])
+ self.assertTrue('makeglossary' in result[0][1])
+ self.assertFalse(result[0][1]['makeglossary'])
def test_pdflatex_with_bibtex(self):
"""Test pdflatex with bibtex"""
- xnt.build.tex.pdflatex("test.tex",
- path="temp",
- bibtex=True)
- self.assertTrue(os.path.exists("temp/test.pdf"))
- self.assertTrue(os.path.exists("temp/test.bbl"))
- self.assertTrue(os.path.exists("temp/test.blg"))
+ result = __pdflatex__('test.tex', bibtex=True)
+ self.assertIsNotNone(result)
+ self.assertTrue(result[0][1]['bibtex'])
def test_pdflatex_with_glossary(self):
"""Test pdflatex with glossary output"""
- xnt.build.tex.pdflatex("test.tex",
- path="temp",
- makeglossary=True)
- self.assertTrue(os.path.exists("temp/test.pdf"))
- self.assertTrue(os.path.exists("temp/test.glo"))
- self.assertTrue(os.path.exists("temp/test.glg"))
- self.assertTrue(os.path.exists("temp/test.gls"))
+ result = __pdflatex__("test.tex", makeglossary=True)
+ self.assertIsNotNone(result)
+ self.assertTrue(result[0][1]['makeglossary'])
def test_tex_clean(self):
"""Test the default clean method removes generated files except pdf"""
- xnt.build.tex.pdflatex("test.tex",
- path="temp",
- bibtex=True,
- makeglossary=True)
- xnt.build.tex.clean(path="temp")
- self.assertTrue(os.path.exists("temp/test.pdf"))
- self.assertFalse(os.path.exists("temp/test.aux"))
- self.assertFalse(os.path.exists("temp/test.log"))
+ result = __clean__(directory='tex')
+ self.assertIsNotNone(result)
+ self.assertIsInstance(result, tuple)
+ self.assertIsInstance(result[0], tuple)
+ self.assertEqual(2, len(result[0]))
+ self.assertIsInstance(result[0][0], FunctionType)
+ self.assertIsInstance(result[0][1], dict)
+ self.assertTrue('directory' in result[0][1])
+ self.assertEqual('tex', result[0][1]['directory'])
+ self.assertTrue('remove_pdf' in result[0][1])
+ self.assertFalse(result[0][1]['remove_pdf'])
def test_tex_clean_include_pdf(self):
"""Test Clean; including PDF"""
- xnt.build.tex.pdflatex("test.tex",
- path="temp",
- bibtex=True,
- makeglossary=True)
- xnt.build.tex.clean(path="temp", remove_pdf=True)
- self.assertFalse(os.path.exists("temp/test.pdf"))
- self.assertFalse(os.path.exists("temp/test.aux"))
- self.assertFalse(os.path.exists("temp/test.log"))
+ result = __clean__(directory='tex', remove_pdf=True)
+ self.assertIsNotNone(result)
+ self.assertTrue(result[0][1]['remove_pdf'])
if __name__ == '__main__':
unittest.main()
diff --git a/xnt/tests/vcscvstests.py b/xnt/tests/vcscvstests.py
new file mode 100644
index 0000000..ad16d15
--- /dev/null
+++ b/xnt/tests/vcscvstests.py
@@ -0,0 +1,57 @@
+#!/usr/bin/env python
+"""Test `xnt.vcs.cvs`"""
+
+# Xnt -- A Wrapper Build Tool
+# Copyright (C) 2014 Kenny Ballou
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+from xnt.vcs.cvs import __cvsco__
+from xnt.vcs.cvs import __cvsupdate__
+from types import FunctionType
+import unittest
+
+class VcsCvsTests(unittest.TestCase):
+ '''VCS CVS Tests'''
+
+ def test_cvsco(self):
+ '''Test CVS checkout'''
+ result = __cvsco__('mytestmodule')
+ self.assertIsNotNone(result)
+ self.assertIsInstance(result, tuple)
+ self.assertIsInstance(result[0], tuple)
+ self.assertEqual(2, len(result[0]))
+ self.assertIsInstance(result[0][0], FunctionType)
+ self.assertIsInstance(result[0][1], dict)
+ self.assertTrue('module' in result[0][1])
+ self.assertEqual('mytestmodule', result[0][1]['module'])
+ self.assertTrue('rev' in result[0][1])
+ self.assertIsNone(result[0][1]['rev'])
+ self.assertTrue('dest' in result[0][1])
+ self.assertIsNone(result[0][1]['dest'])
+
+ def test_cvsupdate(self):
+ '''Test CVS Update'''
+ result = __cvsupdate__('./')
+ self.assertIsNotNone(result)
+ self.assertIsInstance(result, tuple)
+ self.assertIsInstance(result[0], tuple)
+ self.assertEqual(2, len(result[0]))
+ self.assertIsInstance(result[0][0], FunctionType)
+ self.assertIsInstance(result[0][1], dict)
+ self.assertTrue('path' in result[0][1])
+ self.assertEqual('./', result[0][1]['path'])
+
+if __name__ == "__main__":
+ unittest.main()
diff --git a/xnt/tests/vcsgittests.py b/xnt/tests/vcsgittests.py
new file mode 100644
index 0000000..5fce129
--- /dev/null
+++ b/xnt/tests/vcsgittests.py
@@ -0,0 +1,62 @@
+#!/usr/bin/env python
+"""Test `xnt.vcs.git`"""
+
+# Xnt -- A Wrapper Build Tool
+# Copyright (C) 2014 Kenny Ballou
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+from xnt.vcs.git import __gitclone__
+from xnt.vcs.git import __gitpull__
+from types import FunctionType
+import unittest
+
+class VcsGitTests(unittest.TestCase):
+ '''Test VCS GIT'''
+
+ def test_gitclone(self):
+ '''Test GIT Clone'''
+ url = 'git://github.com/devnulltao/Xnt.git'
+ result = __gitclone__(url)
+ self.assertIsNotNone(result)
+ self.assertIsInstance(result, tuple)
+ self.assertIsInstance(result[0], tuple)
+ self.assertEqual(2, len(result[0]))
+ self.assertIsInstance(result[0][0], FunctionType)
+ self.assertIsInstance(result[0][1], dict)
+ self.assertTrue('url' in result[0][1])
+ self.assertTrue('dest' in result[0][1])
+ self.assertTrue('branch' in result[0][1])
+ self.assertEqual(url, result[0][1]['url'])
+ self.assertIsNone(result[0][1]['dest'])
+ self.assertIsNone(result[0][1]['branch'])
+
+ def test_gitpull(self):
+ '''Test GIT Pull'''
+ result = __gitpull__('./', remote='upstream', branch='develop')
+ self.assertIsNotNone(result)
+ self.assertIsInstance(result, tuple)
+ self.assertIsInstance(result[0], tuple)
+ self.assertEqual(2, len(result[0]))
+ self.assertIsInstance(result[0][0], FunctionType)
+ self.assertIsInstance(result[0][1], dict)
+ self.assertTrue('path' in result[0][1])
+ self.assertEqual('./', result[0][1]['path'])
+ self.assertTrue('remote' in result[0][1])
+ self.assertEqual('upstream', result[0][1]['remote'])
+ self.assertTrue('branch' in result[0][1])
+ self.assertEqual('develop', result[0][1]['branch'])
+
+if __name__ == "__main__":
+ unittest.main()
diff --git a/xnt/tests/vcshgtests.py b/xnt/tests/vcshgtests.py
new file mode 100644
index 0000000..8008752
--- /dev/null
+++ b/xnt/tests/vcshgtests.py
@@ -0,0 +1,62 @@
+#!/usr/bin/env python
+"""Test `xnt.vcs.hg`"""
+
+# Xnt -- A Wrapper Build Tool
+# Copyright (C) 2014 Kenny Ballou
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+from xnt.vcs.hg import __hgclone__
+from xnt.vcs.hg import __hgfetch__
+from types import FunctionType
+import unittest
+
+class VcsHgTests(unittest.TestCase):
+ '''VCS Mercurial Tests'''
+
+ def test_hgclone(self):
+ '''Test hg clone'''
+ url = 'https://vim.googlecode.com/hg'
+ result = __hgclone__(url, dest='vim')
+ self.assertIsNotNone(result)
+ self.assertIsInstance(result, tuple)
+ self.assertIsInstance(result[0], tuple)
+ self.assertEqual(2, len(result[0]))
+ self.assertIsInstance(result[0][0], FunctionType)
+ self.assertIsInstance(result[0][1], dict)
+ self.assertTrue('url' in result[0][1])
+ self.assertEqual(url, result[0][1]['url'])
+ self.assertTrue('dest' in result[0][1])
+ self.assertEqual('vim', result[0][1]['dest'])
+ self.assertTrue('rev' in result[0][1])
+ self.assertIsNone(result[0][1]['rev'])
+ self.assertTrue('branch' in result[0][1])
+ self.assertIsNone(result[0][1]['branch'])
+
+ def test_hgfetch(self):
+ '''Test hg fetch'''
+ result = __hgfetch__('./', source='upstream')
+ self.assertIsNotNone(result)
+ self.assertIsInstance(result, tuple)
+ self.assertIsInstance(result[0], tuple)
+ self.assertEqual(2, len(result[0]))
+ self.assertIsInstance(result[0][0], FunctionType)
+ self.assertIsInstance(result[0][1], dict)
+ self.assertTrue('path' in result[0][1])
+ self.assertEqual('./', result[0][1]['path'])
+ self.assertTrue('source' in result[0][1])
+ self.assertTrue('upstream', result[0][1]['source'])
+
+if __name__ == "__main__":
+ unittest.main()
diff --git a/xnt/vcs/cvs.py b/xnt/vcs/cvs.py
index ccdabcd..8a0902b 100644
--- a/xnt/vcs/cvs.py
+++ b/xnt/vcs/cvs.py
@@ -18,35 +18,43 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import os
-import subprocess
-from xnt.tasks import which
+from xnt.tasks import __apply__
+from xnt.tasks import __call__
+from xnt.tasks import __which__
-def cvsco(module, rev="", dest=""):
+def __cvsco__(module, rev=None, dest=None):
"""Run CVS Checkout
:param module: CVS Module name to checkout
:param rev: Revision to checkout
:param dest: Destination directory or name of checked out module
"""
- assert which("cvs")
- cmd = ["cvs", "co", "-P"]
- if rev:
- cmd.append("-r")
- cmd.append(rev)
- if dest:
- cmd.append("-d")
- cmd.append(dest)
- cmd.append(module)
- subprocess.call(cmd)
-
-def cvsupdate(path):
+ def __execute__(**kwargs):
+ '''Perform CVS Checkout'''
+ assert __apply__(__which__("cvs"))
+ cmd = ["cvs", "co", "-P"]
+ if kwargs['rev']:
+ cmd.append("-r")
+ cmd.append(kwargs['rev'])
+ if kwargs['dest']:
+ cmd.append("-d")
+ cmd.append(kwargs['dest'])
+ cmd.append(kwargs['module'])
+ return __apply__(__call__(cmd))
+ args = {'module': module, 'rev': rev, 'dest': dest,}
+ return ((__execute__, args),)
+
+def __cvsupdate__(path):
"""Run CVS Update
:param path: Directory path to module to update
"""
- assert which("cvs")
- cwd = os.path.abspath(os.getcwd())
- os.chdir(path)
- cmd = ["cvs", "update"]
- subprocess.call(cmd)
- os.chdir(cwd)
+ def __execute__(**kwargs):
+ '''Perform CVS Update'''
+ assert __apply__(__which__("cvs"))
+ cwd = os.path.abspath(os.getcwd())
+ os.chdir(kwargs['path'])
+ cmd = ["cvs", "update"]
+ __apply__(__call__(cmd))
+ os.chdir(cwd)
+ return ((__execute__, {'path': path,}),)
diff --git a/xnt/vcs/git.py b/xnt/vcs/git.py
index 222ebda..34d9841 100644
--- a/xnt/vcs/git.py
+++ b/xnt/vcs/git.py
@@ -20,29 +20,43 @@
import os
import xnt.tasks
import xnt.vcs
+from xnt.tasks import __apply__, __call__, __which__
-def gitclone(url, dest=None, branch=None):
+def __gitclone__(url, dest=None, branch=None):
"""Clone a repository
:param url: URI of the repository to clone
:param dest: Destination directory or name of the cloned repository
:param branch: Branch to clone
"""
- assert xnt.tasks.which("git")
- command = ["git", "clone"]
- command = xnt.vcs.clone_options(command, url, branch, dest)
- xnt.tasks.call(command)
-
-def gitpull(path, source="origin", branch="master"):
+ def __execute__(**kwargs):
+ '''Perform git clone'''
+ assert __apply__(__which__("git"))
+ command = ["git", "clone"]
+ command = xnt.vcs.clone_options(
+ command, kwargs['url'], kwargs['branch'], kwargs['dest'])
+ return __apply__(__call__(command))
+ args = {'url': url, 'dest': dest, 'branch': branch,}
+ return ((__execute__, args),)
+
+def __gitpull__(path, remote=None, branch=None):
"""Pull/Update a cloned repository
:param path: Directory of the repository for which to pull and update
- :param source: Repository's upstream source
+ :param remote: Repository's upstream source
:param branch: Repository's upstream branch to pull from
"""
- assert xnt.tasks.which("git")
- cwd = os.getcwd()
- os.chdir(path)
- command = ["git", "pull", source, branch]
- xnt.tasks.call(command)
- os.chdir(cwd)
+ def __execute__(**kwargs):
+ '''Perform git pull'''
+ assert __apply__(__which__("git"))
+ cwd = os.getcwd()
+ os.chdir(kwargs['path'])
+ command = ["git", "pull", kwargs['remote'], kwargs['branch']]
+ __apply__(__call__(command))
+ os.chdir(cwd)
+ args = {
+ 'path': path,
+ 'remote': remote if remote else 'origin',
+ 'branch': branch if branch else 'master',
+ }
+ return ((__execute__, args),)
diff --git a/xnt/vcs/hg.py b/xnt/vcs/hg.py
index 6956d57..3633fe3 100644
--- a/xnt/vcs/hg.py
+++ b/xnt/vcs/hg.py
@@ -20,8 +20,9 @@
import os
import xnt.tasks
import xnt.vcs
+from xnt.tasks import __apply__, __call__, __which__
-def hgclone(url, dest=None, rev=None, branch=None):
+def __hgclone__(url, dest=None, rev=None, branch=None):
"""Clone a Mercurial Repository
:param url: URI of repository to clone
@@ -29,23 +30,32 @@ def hgclone(url, dest=None, rev=None, branch=None):
:param rev: Revision to clone
:param branch: Branch to clone
"""
- assert xnt.tasks.which("hg")
- command = ["hg", "clone"]
- if rev:
- command.append("--rev")
- command.append(rev)
- command = xnt.vcs.clone_options(command, url, branch, dest)
- xnt.tasks.call(command)
-
-def hgfetch(path, source='default'):
+ def __execute__(**kwargs):
+ '''Perform hg clone'''
+ assert __apply__(__which__("hg"))
+ command = ["hg", "clone"]
+ if kwargs['rev']:
+ command.append("--rev")
+ command.append(kwargs['rev'])
+ command = xnt.vcs.clone_options(
+ command, kwargs['url'], kwargs['branch'], kwargs['dest'])
+ return __apply__(__call__(command))
+ args = {'url': url, 'dest': dest, 'rev': rev, 'branch': branch,}
+ return ((__execute__, args),)
+
+def __hgfetch__(path, source=None):
"""Pull and Update an already cloned Mercurial Repository
:param path: Directory to the repository for which to pull changes
:param source: Repository's upstream source
"""
- assert xnt.tasks.which("hg")
- command = ["hg", "pull", "-u", source]
- cwd = os.getcwd()
- os.chdir(path)
- xnt.tasks.call(command)
- os.chdir(cwd)
+ def __execute__(**kwargs):
+ '''Perform hg pull'''
+ assert __apply__(__which__("hg"))
+ command = ["hg", "pull", "-u", kwargs['source']]
+ cwd = os.getcwd()
+ os.chdir(kwargs['path'])
+ __apply__(__call__(command))
+ os.chdir(cwd)
+ args = {'path': path, 'source': source if source else 'default',}
+ return ((__execute__, args),)