summaryrefslogtreecommitdiff
path: root/xnt/tasks/core_tasks.py
blob: 9a5e1e8eb9b4b8557cb461e6930c6c56eae3ed43 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
#!/usr/bin/env python
"""Common Tasks Module

Defines a set of operations that are common enough but also are tedious to
define
"""

#   Xnt -- A Wrapper Build Tool
#   Copyright (C) 2013  Kenny Ballou

#   This program is free software: you can redistribute it and/or modify
#   it under the terms of the GNU General Public License as published by
#   the Free Software Foundation, either version 3 of the License, or
#   (at your option) any later version.

#   This program is distributed in the hope that it will be useful,
#   but WITHOUT ANY WARRANTY; without even the implied warranty of
#   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#   GNU General Public License for more details.

#   You should have received a copy of the GNU General Public License
#   along with this program.  If not, see <http://www.gnu.org/licenses/>.

import os
import sys
import subprocess
import shutil
import zipfile
import contextlib
import glob
import logging
from xnt.status_codes import ERROR, SUCCESS, UNKNOWN_ERROR

LOGGER = logging.getLogger(__name__)

#File associated tasks
def __expandpath__(path_pattern):
    """Build a task that glob-expands *path_pattern*

    Nothing is expanded when this function is called; the expansion happens
    when the returned task's callable is invoked with its keyword arguments.

    :param path_pattern: glob pattern to expand
    :rtype: tuple
    :return: One-element tuple holding a ``(callable, kwargs)`` pair; the
             callable returns an iterator (``glob.iglob``) over the matching
             paths
    """
    def __execute__(**kwargs):
        '''Perform glob expansion'''
        pattern = kwargs['path_pattern']
        return glob.iglob(pattern)
    task_args = {'path_pattern': path_pattern}
    return ((__execute__, task_args),)

def __copy__(src=None, dstdir=None, files=None):
    """Copy `src` to `dstdir` or copy `files` to `dstdir`

    Copy a file or folder to a different file/folder.
    If no `src` is specified, will attempt to copy `files` to `dstdir`.
    If neither `src` nor `files` is given, the task does nothing.

    *Notice*, elements of `files` will not be expanded before copying.

    :param src: source directory or file
    :param dstdir: destination file or folder (in the case of `files`)
    :param files: list of files (strings) to copy to `dstdir`
    :return: One-element tuple holding a ``(callable, kwargs)`` pair that
             performs the copy when called
    """
    def __execute__(**kwargs):
        """Perform copy"""
        # The task arguments always carry all three keys, so key presence
        # says nothing; the *values* decide which branch to take.
        destination = kwargs.get('dstdir')
        assert destination is not None, "dstdir is required"
        source = kwargs.get('src')
        file_list = kwargs.get('files')
        if source is not None:
            if os.path.isfile(source):
                shutil.copyfile(source, destination)
            else:
                shutil.copytree(source, destination)
        elif file_list is not None:
            for srcfile in file_list:
                shutil.copy(srcfile, destination)
    return (
        (__execute__, {'src': src, 'dstdir': dstdir, 'files': files,}),
    )

def __move__(src, dst):
    """Build a task that moves `src` to `dst`

    When executed, the task logs the operation and relocates the source file
    or directory (*src*) to the destination (*dst*) via ``shutil.move``.

    :param src: Source file or folder to move
    :param dst: Destination file or folder
    :return: One-element tuple holding a ``(callable, kwargs)`` pair
    """
    def __execute__(**kwargs):
        '''Perform move'''
        source, target = kwargs['src'], kwargs['dst']
        LOGGER.info("Moving %s to %s", source, target)
        shutil.move(source, target)
    return ((__execute__, {'src': src, 'dst': dst}),)

def __mkdir__(directory, mode=0o755):
    """Make a directory with mode

    Create the directory specified by *directory* with the default mode
    (where supported) or with the specified *mode*.

    *Notice*, if the directory already exists, *mkdir* will log a warning and
    return. An ``OSError`` raised while creating the directory is logged
    rather than propagated.

    :param directory: New directory to create
    :param mode: Mode to create the directory (where supported).
                 Default: `0o755`
    :return: One-element tuple holding a ``(callable, kwargs)`` pair
    """
    def __execute__(**kwargs):
        '''Perform directory creation'''
        # Read the target from the task arguments everywhere, so the check
        # and the creation always refer to the same path.
        target = kwargs['directory']
        if os.path.exists(target):
            LOGGER.warning("Given directory (%s) already exists", target)
            return
        LOGGER.info(
            "Making directory %s with mode %o", target, kwargs['mode'])
        try:
            os.mkdir(target, kwargs['mode'])
        except OSError as os_error:
            # os.mkdir reports failures as OSError (IOError is the same
            # class on Python 3); anything else propagates untouched.
            LOGGER.error(os_error)
    return ((__execute__, {'directory': directory, 'mode': mode,}),)

def __remove__(*fileset):
    """Build a task that removes a set of files and/or directories

    When executed, each element of *fileset* is first glob-expanded (through
    ``__expandpath__``) and every existing match is deleted: directories
    with ``shutil.rmtree``, anything else with ``os.remove``. An ``OSError``
    is logged and stops the removal of the remaining entries.

    :param fileset: Glob patterns / paths to remove
    :return: One-element tuple holding a ``(callable, kwargs)`` pair
    """
    def __execute__(**kwargs):
        '''Perform file/ folder removal'''
        try:
            for pattern in kwargs['fileset']:
                for candidate in __apply__(__expandpath__(pattern)):
                    if os.path.exists(candidate):
                        LOGGER.info("Removing %s", candidate)
                        if os.path.isdir(candidate):
                            remover = shutil.rmtree
                        else:
                            remover = os.remove
                        remover(candidate)
        except OSError as os_error:
            LOGGER.error(os_error)
    return ((__execute__, {'fileset': fileset}),)

def __zip__(directory, zipfilename):
    """Compress (Zip) folder

    Zip the specified *directory* into the zip file named *zipfilename*.
    Entries are stored under their path relative to *directory*.

    :param directory: Directory to zip
    :param zipfilename: Name of resulting compression
    :return: One-element tuple holding a ``(callable, kwargs)`` pair
    :raises AssertionError: (on execution) if *directory* is not a directory
                            or *zipfilename* is empty
    """
    def __execute__(**kwargs):
        '''Perform zip'''
        root = kwargs['directory']
        assert os.path.isdir(root) and kwargs['zipfile']
        LOGGER.info("Zipping %s as %s", root, kwargs['zipfile'])
        with contextlib.closing(zipfile.ZipFile(
            kwargs['zipfile'],
            "w",
            zipfile.ZIP_DEFLATED)) as zip_file:
            for paths in os.walk(root):
                for file_name in paths[2]:
                    absfn = os.path.join(paths[0], file_name)
                    # relpath instead of slicing by len(directory): slicing
                    # drops the first character of every entry when the
                    # directory is given with a trailing separator.
                    zip_file_name = os.path.relpath(absfn, root)
                    zip_file.write(absfn, zip_file_name)
    return ((__execute__, {'directory': directory, 'zipfile': zipfilename,}),)

#Misc Tasks
def __echo__(msg, tofile):
    """Build a task that writes a string to a file

    When executed, the task opens *tofile* for writing and writes *msg*
    into it.

    *Notice*, `echo` will overwrite the file if it already exists

    :param msg: Message to write to file
    :param tofile: file to which the message is written
    :return: One-element tuple holding a ``(callable, kwargs)`` pair
    """
    def __execute__(**kwargs):
        '''Perform echo to file'''
        handle = open(kwargs['tofile'], "w")
        with handle:
            handle.write(kwargs['msg'])
    task_args = {'msg': msg, 'tofile': tofile}
    return ((__execute__, task_args),)

def __log__(msg, lvl=logging.INFO):
    """Build a task that logs *msg* through the module logger

    When executed, the message (*msg*) is emitted on this module's logger at
    the given level, which defaults to *INFO*.

    :param msg: Message to log
    :param lvl: Log Level of message. Default `INFO`
    :return: One-element tuple holding a ``(callable, kwargs)`` pair
    """
    def __execute__(**kwargs):
        '''Perform logging operation'''
        level, text = kwargs['lvl'], kwargs['msg']
        LOGGER.log(level, text)
    task_args = {'msg': msg, 'lvl': lvl}
    return ((__execute__, task_args),)

def __load_build__(buildfile="./build.py"):
    """Load build file

    Import the module named by *buildfile* from its own directory and return
    it. While importing, the build file's directory is appended to
    ``sys.path`` and made the current working directory; both changes, and
    the module's ``sys.modules`` entry, are undone before returning, whether
    the import succeeds or fails.

    :param buildfile: path to the build file. Default: `./build.py`
    :return: the imported build module, or ``None`` if the import raised
             ``ImportError``
    :raises SystemExit: if the build file does not exist
    """
    path = os.path.dirname(buildfile)
    build = os.path.basename(buildfile)
    buildmodule = os.path.splitext(build)[0]
    path = os.path.abspath(path) if path else os.getcwd()
    cwd = os.getcwd()
    sys.path.append(path)
    try:
        # Everything that can fail sits inside the try so that sys.path and
        # the working directory are always restored.
        os.chdir(path)
        if not os.path.exists(build):
            LOGGER.error("There was no build file")
            sys.exit(1)
        try:
            return __import__(buildmodule, fromlist=[])
        except ImportError:
            LOGGER.error("HOW?!")
            return None
    finally:
        try:
            sys.path.remove(path)
        except ValueError:
            # The build module itself may have edited sys.path.
            pass
        # A failed import leaves no sys.modules entry; a plain ``del`` would
        # raise KeyError here, hide the result and skip the chdir below.
        sys.modules.pop(buildmodule, None)
        os.chdir(cwd)

#pylint: disable=R0912
def __xntcall__(buildfile, targets=None, props=None):
    """Invoke xnt on another build file in a different directory

    When executed, the task loads *buildfile*, changes into its directory
    and calls each requested target in order, stopping at the first target
    that returns a non-zero/truthy error code.

    :param: buildfile - path to the build file (including build file)
    :param: targets - list of targets to execute. Default: ``['default']``
    :param: props - properties to pass to the build module, either as a
            dictionary or as a list of ``"name=value"`` strings
    :return: One-element tuple holding a ``(callable, kwargs)`` pair; the
             callable returns the resulting status code
    """
    def __execute__(**kwargs):
        '''Perform xntcall'''
        def invoke_build(build, targets=None, props=None):
            """Invoke Build with `targets` passing `props`"""
            def call_target(target_name, props):
                """Call target on build module"""
                def process_params(params, existing_props=None):
                    """Parse and separate properties"""
                    properties = existing_props if existing_props else {}
                    if isinstance(params, dict):
                        # Already name -> value; merge as-is.
                        properties.update(params)
                        return properties
                    for param in params:
                        # Split on the first '=' only, so that values may
                        # themselves contain '='.
                        name, value = param.split("=", 1)
                        properties[name] = value
                    return properties
                def __get_properties():
                    """Return the properties dictionary of the build module"""
                    try:
                        return getattr(build, "PROPERTIES")
                    except AttributeError:
                        LOGGER.warning("Build file specifies no properties")
                        return None
                try:
                    if props:
                        setattr(build,
                                "PROPERTIES",
                                process_params(props, __get_properties()))
                    target = getattr(build, target_name)
                    error_code = target()
                    return error_code if error_code else SUCCESS
                except AttributeError:
                    # NOTE: this also catches an AttributeError raised from
                    # inside the target itself.
                    LOGGER.error("There was no target: %s", target_name)
                    return ERROR
                except RuntimeError as ex:
                    LOGGER.critical(ex)
                    return UNKNOWN_ERROR
            if not targets:
                targets = ['default',]
            for target in targets:
                error_code = call_target(target, props)
                if error_code:
                    return error_code
            return SUCCESS

        build = __load_build__(kwargs['buildfile'])
        # A bare file name has an empty dirname, which os.chdir rejects;
        # in that case the build file lives in the current directory.
        path = os.path.dirname(kwargs['buildfile']) or os.getcwd()
        def closure():
            '''closure around build invocation'''
            return invoke_build(
                build,
                targets=kwargs['targets'],
                props=kwargs['props'])
        return __run_in__(closure, path)
    args = {'buildfile': buildfile, 'targets': targets, 'props': props, }
    return ((__execute__, args),)

def __xnt_list_targets__(buildfile):
    '''Build a task that lists the targets of the provided build module

    When executed, the build file is loaded and every attribute whose
    ``decorator`` attribute equals ``"target"`` is printed as ``name:``
    followed by its docstring (when it has one) and an empty line.

    :param buildfile: path to the build file to inspect
    :return: One-element tuple holding a ``(callable, kwargs)`` pair; the
             callable returns a status code
    '''
    def __execute__(**kwargs):
        '''Perform listing'''
        module = __load_build__(kwargs['buildfile'])
        try:
            for name in dir(module):
                try:
                    candidate = getattr(module, name)
                    is_target = candidate.decorator == "target"
                    if not is_target:
                        continue
                    print(name + ":")
                    docstring = candidate.__doc__
                    if docstring:
                        print(docstring)
                    print("")
                except AttributeError:
                    # Attributes without a ``decorator`` are not targets.
                    continue
        except AttributeError as error:
            LOGGER.error(error)
            return ERROR
        return SUCCESS
    return ((__execute__, {'buildfile': buildfile}),)

def __run_in__(func, path):
    """Call *func* with *path* as the current working directory

    The previous working directory is restored afterwards, even when *func*
    raises. An empty or ``None`` *path* means "stay where we are".

    :param func: zero-argument callable to invoke
    :param path: directory to change into for the duration of the call
    :return: whatever *func* returns
    """
    if not path:
        return func()
    cwd = os.getcwd()
    os.chdir(path)
    try:
        return func()
    finally:
        os.chdir(cwd)

def __call__(command, stdout=None, stderr=None, path=None):
    """Build a task that runs *command* as a subprocess

    When executed, the command is run with ``subprocess.call`` from *path*
    (or from the current working directory when *path* is not given), with
    *stdout* and *stderr* handed through to ``subprocess.call`` unchanged.

    :param: command - list of command and arguments
    :param: stdout - file to redirect standard output to, if given
    :param: stderr - file to redirect standard error to, if given
    :param: path - directory to execute process
    :return: One-element tuple holding a ``(callable, kwargs)`` pair; the
             callable returns the error code of the subbed out call, `$?`
    """
    def __execute__(**kwargs):
        '''Perform subprocess call'''
        def run_subprocess():
            '''closure around subprocess call'''
            return subprocess.call(
                args=kwargs['command'],
                stdout=kwargs['stdout'],
                stderr=kwargs['stderr'])
        workdir = kwargs['path'] if kwargs['path'] else os.getcwd()
        return __run_in__(run_subprocess, workdir)
    task_args = {
        'command': command,
        'stdout': stdout,
        'stderr': stderr,
        'path': path,
    }
    return ((__execute__, task_args),)

def __setup__(command=None, commands=None, directory=None):
    """Invoke the ``setup.py`` file in the current or specified directory

    The task runs ``setup.py`` with the running interpreter
    (``sys.executable``), passing *commands* followed by *command* (when
    given) as arguments. At least one of the two must be supplied.

    :param: command - a single command to run
    :param: commands - list of commands and options to run/ append
    :param: directory - (optional) directory to run from
    :return: One-element tuple holding a ``(callable, kwargs)`` pair; the
             callable returns the error code of the execution, `$?`
    :raises AssertionError: if neither *command* nor *commands* is given
    """
    def __execute__(**kwargs):
        '''Perform python setup.py commands'''
        cmd = [sys.executable, "setup.py",]
        cmd.extend(kwargs['commands'])
        return __apply__(__call__(cmd, path=kwargs['directory']))
    # Work on a copy: appending *command* must not modify the caller's list.
    commands = list(commands) if commands else []
    if command:
        commands.append(command)
    assert len(commands) > 0
    args = {'commands': commands, 'directory': directory,}
    return ((__execute__, args),)

def __which__(program):
    """Similar to Linux/Unix `which`: return (first) path of executable

    When executed, a *program* that includes a directory part is checked
    directly; otherwise every entry of the ``PATH`` environment variable is
    searched in order.

    :param program: program name to search for in PATH
    :return: One-element tuple holding a ``(callable, kwargs)`` pair; the
             callable returns the path of `program` or None
    """
    def __execute__(**kwargs):
        '''Perform which lookup'''
        name = kwargs['program']

        def runnable(candidate):
            """Tell whether *candidate* is an existing, executable file"""
            return os.path.isfile(candidate) and os.access(candidate, os.X_OK)

        if os.path.dirname(name):
            # Explicit location given: no PATH search.
            return name if runnable(name) else None
        for entry in os.environ["PATH"].split(os.pathsep):
            candidate = os.path.join(entry.strip('"'), name)
            if runnable(candidate):
                return candidate
        return None

    return ((__execute__, {'program': program}),)

def __in_path__(program):
    """Build a task that tests whether *program* can be found

    When executed, the task runs the ``__which__`` lookup for *program* and
    hands back its result, so the outcome can be used as a truth value.

    :param program: Program name to search for in PATH
    :return: One-element tuple holding a ``(callable, kwargs)`` pair; the
             callable returns the path of `program` or None
    """
    def __execute__(**kwargs):
        '''Perform which test'''
        lookup = __which__(kwargs['program'])
        return __apply__(lookup)
    task_args = {'program': program}
    return ((__execute__, task_args),)

def __apply__(func_tuple):
    '''Execute function tuple

    Each element of *func_tuple* is a ``(callable, kwargs)`` pair; the pairs
    are called in order with their keyword arguments.

    :param func_tuple: iterable of ``(callable, kwargs)`` pairs
    :return: the result of the last call, or None when there were no pairs
    '''
    outcome = None
    for statement in func_tuple:
        action, arguments = statement[0], statement[1]
        outcome = action(**arguments)
    return outcome