[zorg] r320453 - Add debugsign and test suite

Tim Hammerquist via llvm-commits llvm-commits at lists.llvm.org
Mon Dec 11 18:36:18 PST 2017


Author: penryu
Date: Mon Dec 11 18:36:17 2017
New Revision: 320453

URL: http://llvm.org/viewvc/llvm-project?rev=320453&view=rev
Log:
Add debugsign and test suite

Added:
    zorg/trunk/codesign/
    zorg/trunk/codesign/debugsign/
    zorg/trunk/codesign/debugsign/.coveragerc
    zorg/trunk/codesign/debugsign/.gitignore
    zorg/trunk/codesign/debugsign/README.md
    zorg/trunk/codesign/debugsign/dbsign/
    zorg/trunk/codesign/debugsign/dbsign/__init__.py
    zorg/trunk/codesign/debugsign/dbsign/ansi.py
    zorg/trunk/codesign/debugsign/dbsign/commands.py
    zorg/trunk/codesign/debugsign/dbsign/logger.py
    zorg/trunk/codesign/debugsign/dbsign/result.py
    zorg/trunk/codesign/debugsign/dbsign/security.py
    zorg/trunk/codesign/debugsign/dbsign/shell.py
    zorg/trunk/codesign/debugsign/debugsign   (with props)
    zorg/trunk/codesign/debugsign/requirements.txt
    zorg/trunk/codesign/debugsign/unittests/
    zorg/trunk/codesign/debugsign/unittests/__init__.py
    zorg/trunk/codesign/debugsign/unittests/test_ansi.py
    zorg/trunk/codesign/debugsign/unittests/test_authdb.py
    zorg/trunk/codesign/debugsign/unittests/test_commands.py
    zorg/trunk/codesign/debugsign/unittests/test_logger.py
    zorg/trunk/codesign/debugsign/unittests/test_result.py
    zorg/trunk/codesign/debugsign/unittests/test_security.py
    zorg/trunk/codesign/debugsign/unittests/test_shell.py

Added: zorg/trunk/codesign/debugsign/.coveragerc
URL: http://llvm.org/viewvc/llvm-project/zorg/trunk/codesign/debugsign/.coveragerc?rev=320453&view=auto
==============================================================================
--- zorg/trunk/codesign/debugsign/.coveragerc (added)
+++ zorg/trunk/codesign/debugsign/.coveragerc Mon Dec 11 18:36:17 2017
@@ -0,0 +1,24 @@
+[run]
+branch = True
+source =
+  dbsign
+
+[report]
+# Regexes for lines to exclude from consideration
+exclude_lines =
+    # Have to re-enable the standard pragma
+    pragma: no cover
+    # Don't complain about missing debug-only code:
+    def __repr__
+    if self\.debug
+    # Don't complain if tests don't hit defensive assertion code:
+    raise AssertionError
+    raise NotImplementedError
+    # Don't complain if non-runnable code isn't run:
+    if 0:
+    if __name__ == .__main__.:
+ignore_errors = True
+show_missing = True
+
+[html]
+directory = coverage_html_report

Added: zorg/trunk/codesign/debugsign/.gitignore
URL: http://llvm.org/viewvc/llvm-project/zorg/trunk/codesign/debugsign/.gitignore?rev=320453&view=auto
==============================================================================
--- zorg/trunk/codesign/debugsign/.gitignore (added)
+++ zorg/trunk/codesign/debugsign/.gitignore Mon Dec 11 18:36:17 2017
@@ -0,0 +1,8 @@
+*.pyc
+/.coverage
+
+# pycharm noise
+/.idea/
+
+# virtualenv noise
+/venv*

Added: zorg/trunk/codesign/debugsign/README.md
URL: http://llvm.org/viewvc/llvm-project/zorg/trunk/codesign/debugsign/README.md?rev=320453&view=auto
==============================================================================
--- zorg/trunk/codesign/debugsign/README.md (added)
+++ zorg/trunk/codesign/debugsign/README.md Mon Dec 11 18:36:17 2017
@@ -0,0 +1,47 @@
+# LLDB Codesigning Identities
+
+The **`debugsign`** tool contains the logic necessary to configure
+and enable automation-friendly codesigning for LLDB builds on Darwin
+systems.
+
+The **import** sub-command requires a P12 archive containing a
+codesigning identity. A P12 archive compatible with the
+**`debugsign`** tool can be produced using the following procedure:
+
+# Generating a Codesigning Identity
+
+* Open **Keychain Access**
+* -> **App Menu**
+* -> **Certificate Assistant**
+* -> **Create a Certificate**
+
+In the dialog that appears:
+
+* Name: **`lldb_codesign`**
+* Certificate Type: **`Code Signing`**
+* use defaults for all other values and complete the dialog sequence
+
+The identity (private key, public key, self-signed certificate) will
+be created in the default login keychain. Export the identity:
+
+* Select **login** keychain
+* Select **My Certificates**
+* Select entire **lldb_codesign** item (not sub-components)
+* **File** -> **Export Items**
+* Select File Format: **`Personal Information Exchange (.p12)`**
+* When asked for a password to protect the exported archive, enter
+  **`lldb_codesign`** for both options.
+* If **Keychain Access** asks for permission to export, click **Allow**
+
+The resulting ``.p12`` file can be passed to **`debugsign`**'s import
+sub-command.
+
+# Using the debugsign Script
+
+Once you've created the **`.p12`** archive, you can use it to enable
+codesigning on any number of machines for as long as the generated
+certificate is valid.
+
+The general use case for ``debugsign`` is described in the script's
+help output: **`debugsign help`**
+

Added: zorg/trunk/codesign/debugsign/dbsign/__init__.py
URL: http://llvm.org/viewvc/llvm-project/zorg/trunk/codesign/debugsign/dbsign/__init__.py?rev=320453&view=auto
==============================================================================
    (empty)

Added: zorg/trunk/codesign/debugsign/dbsign/ansi.py
URL: http://llvm.org/viewvc/llvm-project/zorg/trunk/codesign/debugsign/dbsign/ansi.py?rev=320453&view=auto
==============================================================================
--- zorg/trunk/codesign/debugsign/dbsign/ansi.py (added)
+++ zorg/trunk/codesign/debugsign/dbsign/ansi.py Mon Dec 11 18:36:17 2017
@@ -0,0 +1,41 @@
+# COPYRIGHT LINE: FIXME
+
+"""
+dbsign.ansi
+"""
+
+from __future__ import print_function
+
+import sys
+
+
+def ANSI(color, msg):  # type: (str, str) -> str
+    """Wrap ``msg`` in the escape codes for ``color`` when stdout is a TTY.
+
+    When stdout is not a terminal, ``msg`` is returned unchanged.
+    """
+    if not sys.stdout.isatty():
+        return msg
+    prefix = _ANSI_CODES[color]
+    suffix = _ANSI_CODES['clear']
+    return "{}{}{}".format(prefix, msg, suffix)
+
+
+def OK(msg):  # type: (str) -> str
+    """Return ``msg`` passed through ``ANSI`` with the 'green' color."""
+    return ANSI(color='green', msg=msg)
+
+
+def INFO(msg):  # type: (str) -> str
+    """Return ``msg`` passed through ``ANSI`` with the 'blue' color."""
+    return ANSI(color='blue', msg=msg)
+
+
+def WARN(msg):  # type: (str) -> str
+    """Return ``msg`` passed through ``ANSI`` with the 'purple' color."""
+    return ANSI(color='purple', msg=msg)
+
+
+def ERROR(msg):  # type: (str) -> str
+    """Return ``msg`` passed through ``ANSI`` with the 'red' color."""
+    return ANSI(color='red', msg=msg)
+
+
+# Escape sequences looked up by ANSI(); 'clear' is appended after the
+# message, the other keys are the color names accepted by ANSI().
+_ANSI_CODES = {
+    'clear':    '\033[0m',
+    'blue':     '\033[1;34m',
+    'green':    '\033[1;32m',
+    'purple':   '\033[1;35m',
+    'red':      '\033[1;31m',
+}

Added: zorg/trunk/codesign/debugsign/dbsign/commands.py
URL: http://llvm.org/viewvc/llvm-project/zorg/trunk/codesign/debugsign/dbsign/commands.py?rev=320453&view=auto
==============================================================================
--- zorg/trunk/codesign/debugsign/dbsign/commands.py (added)
+++ zorg/trunk/codesign/debugsign/dbsign/commands.py Mon Dec 11 18:36:17 2017
@@ -0,0 +1,422 @@
+# COPYRIGHT LINE: FIXME
+
+"""
+dbsign.commands
+"""
+
+from __future__ import print_function
+
+import os
+import sys
+
+from dbsign.ansi import ERROR, INFO, OK, WARN
+import dbsign.logger as L
+import dbsign.security as S
+import dbsign.shell as sh
+
+
+#
+# Globals and configurables
+#
+
+# Template printed by cmd_help(). Placeholder {0} receives the executable
+# name (CFG['executable']) and {1} the argument parser's formatted help.
+OVERVIEW_TEXT = '''\
+
+{1}
+OVERVIEW:
+
+    To configure code signing on a new system, do the following (in order):
+
+        {0} setup
+        {0} import P12_FILE     # MUST BE DONE FROM GUI CONSOLE!
+
+    To verify the configuration:
+
+        {0} check               # Not foolproof, but catches most issues
+
+    To enable access to the identity for code signing (eg, from Jenkins job):
+
+        {0} prep
+
+    To replace the configured identity with a new one:
+
+        {0} remove              # Removes the whole keychain!
+        {0} import NEW_P12      # Must be done from GUI console!
+
+    Note that this script currently assumes the following:
+
+      * The identity's common name will be "lldb_codesign"
+      * The keychain will be named "lldb_codesign"
+      * The keychain will be locked using the password "lldb_codesign"
+      * The P12 archive will be encrypted with the password "lldb_codesign"
+
+    This is intended to make it trivial to codesign utilities using the
+    imported certificate, without exposing any local account information
+    (eg, user's login keychain password). Please take these factors into
+    account when evaluating security.
+'''
+
+# Module-level logger obtained through dbsign.logger.
+log = L.get_logger(__name__)
+
+# Configuration read by the cmd_* functions in this module.
+# NOTE(review): 'keydb' and 'id_file' start as None and are presumably
+# filled in by the calling script before any command runs -- confirm.
+CFG = {
+    'debug':        False,
+    'executable':   os.path.basename(sys.argv[0]),
+    'identity':     'lldb_codesign',
+    'id_file':      None,       # from command line argument
+    'keynick':      'lldb',
+    'keydb':        None,
+    'keypass':      'lldb_codesign',
+    'privileges':   ['system.privilege.taskport'],
+}
+
+
+#
+# Top-Level Commands
+#
+
+def cmd_check():  # type: () -> int
+    """Verify the code signing configuration step by step.
+
+    Checks every configured privilege, then unlocks the keychain, confirms
+    the keychain file exists, searches for the identity and verifies it.
+    Progress and hints are printed to stdout.
+
+    Returns:
+        0 when all steps after the privilege loop succeed; otherwise
+        1 (unlock failed), 2 (keychain file missing), 3 (identity not
+        installed) or 4 (identity failed verification).
+    """
+    keydb = CFG['keydb']
+    keypass = CFG['keypass']
+    identity = CFG['identity']
+    exe = CFG['executable']
+
+    # A privilege that is not set only produces a warning; the remaining
+    # checks still run.
+    for priv in CFG['privileges']:
+        print('Verifying privilege {} ... '.format(priv), end='')
+        res_priv = S.verify_privilege(priv)
+        if res_priv:
+            print(OK('OK'))
+        else:
+            print(WARN('NOT SET'))
+            log.debug(res_priv.value)
+            print(WARN('WARNING'), 'Privileges have not been set.')
+            print(INFO('To set, run: {} --unsafe setup'.format(exe)))
+
+    print("Unlocking keychain ... ", end='')
+    res_unlock = S.unlock_keychain(keydb, keypass)
+    if res_unlock:
+        print(OK('OK'))
+    else:
+        print(ERROR('FAILED'))
+        log.debug(res_unlock.value)
+        print(WARN('WARNING'), 'Keychain not configured.')
+        print(INFO('Please run: {} setup'.format(exe)))
+        return 1
+
+    print("Verifying keychain ... ", end='')
+    res_find = S.keychain_exists(keydb)
+    if res_find:
+        print(OK('OK'))
+    else:
+        print(ERROR('FAILED'))
+        log.debug(res_find)
+        print(INFO(res_find.value))
+        return 2
+
+    print("Searching for identity in keychain ... ", end='')
+    # res_find is reused here for the identity lookup result.
+    res_find = S.identity_installed(identity, keydb)
+    if res_find:
+        print(OK('OK'))
+    else:
+        print(ERROR('FAILED'))
+        log.debug(res_find)
+        print(WARN('WARNING'), res_find.value)
+        print(INFO("Please run: {} import".format(exe)))
+        return 3
+
+    print('Verifying identity ... ', end='')
+    res_id = S.verify_identity(identity, keydb)
+    if res_id:
+        print(OK('OK'))
+    else:
+        print(ERROR('FAILED'))
+        log.debug(res_id.value)
+        print(WARN('WARNING'), "Unable to verify identity")
+        print(INFO('Please run: {} import'.format(exe)))
+        return 4
+
+    return 0
+
+
+def cmd_clean():  # type: () -> int
+    """Remove the identity, then back up and delete the keychain.
+
+    Every step is best-effort: a failing step is reported as a warning on
+    stdout and the next step still runs.
+
+    Returns:
+        Always 0.
+    """
+    identity = CFG['identity']
+    keydb = CFG['keydb']
+    keypass = CFG['keypass']
+
+    print("Unlocking keychain ... ", end='')
+    res_unlock = S.unlock_keychain(keydb, keypass)
+    if res_unlock:
+        print(OK('OK'))
+    else:
+        print(WARN('FAILED'))
+        log.debug(res_unlock.value)
+        print(INFO('Failed to unlock keychain.'))
+
+    print('Removing identity and trust settings ... ', end='')
+    res_id = S.delete_identity(identity, keydb)
+    if res_id:
+        print(OK('OK'))
+    else:
+        print(WARN('Failed to remove identity'))
+        log.debug(res_id.value)
+
+    print('Backing up and removing keychain ... ', end='')
+    res_key = S.delete_keychain(keydb, backup=True)
+    if res_key:
+        print(OK('OK'))
+    else:
+        print(WARN('Failed to remove keychain'))
+        # Log the keychain failure itself, not the identity result.
+        log.debug(res_key.value)
+
+    return 0
+
+
+def cmd_help(parser):  # type: (argparse.ArgumentParser) -> int
+    """Print the overview text combined with the parser's help; return 0."""
+    exe = CFG['executable']
+    parser_help = parser.format_help()
+    print(OVERVIEW_TEXT.format(exe, parser_help))
+    return 0
+
+
+def cmd_import():  # type: () -> int
+    """Import the identity file into the keychain and trust it.
+
+    Steps: authenticate with sudo, warn when the session looks remote,
+    check privileges (warning only), unlock the keychain, import the
+    identity and finally trust it. When trusting fails, the freshly
+    imported identity is rolled back.
+
+    Returns:
+        0 on success; 1 (unlock failed), 2 (import failed), 3 (trust
+        failed, rollback succeeded) or 4 (trust failed and rollback failed).
+    """
+    exe = CFG['executable']
+    identity = CFG['identity']
+    keydb = CFG['keydb']
+    keypass = CFG['keypass']
+    id_file = CFG['id_file']
+    # The P12 archive password is the same string as the identity name.
+    id_pass = identity
+
+    _auth_sudo()
+
+    if 'SSH_CONNECTION' in os.environ or 'TERM_SESSION_ID' not in os.environ:
+        print(WARN('WARNING!'), "Remote console session detected!",
+              "This procedure must be performed from the system console.")
+
+    print('Verifying privileges ... ', end='')
+    res_verify_privs = S.verify_privileges(CFG['privileges'])
+    if res_verify_privs:
+        print(OK('OK'))
+    else:
+        print(WARN("WARNING"))
+        log.debug(res_verify_privs)
+        print(WARN("Privileges have not been set. Trust may fail."))
+        print(INFO("To set privileges, run: {} --unsafe setup".format(exe)))
+
+    print("Unlocking keychain ... ", end='')
+    res_unlock = S.unlock_keychain(keydb, keypass)
+    if res_unlock:
+        print(OK('OK'))
+    else:
+        print(ERROR('FAILED'))
+        log.debug(res_unlock)
+        print(INFO("Failed to unlock keychain."),
+              "Run: {} check".format(exe))
+        return 1
+
+    print("Importing new identity {} ... ".format(identity), end='')
+    res_import = S.import_identity(keydb, keypass, identity, id_file, id_pass)
+    if res_import:
+        print(OK('OK'))
+        log.debug(res_import.value)
+    else:
+        print(ERROR('FAILED'))
+        log.debug(res_import)
+        print(ERROR('ERROR'), res_import.value)
+        if 'exists' in res_import.value:
+            print(WARN("To remove existing identity:"),
+                  "{} remove".format(exe))
+        return 2
+
+    print(WARN("This will test codesigning with the configured identity"))
+    print(WARN("Please authenticate (if requested) and click 'Always Allow'"))
+
+    print("Trusting identity ... ", end='')
+    res_trust = S.trust_identity(identity, keydb)
+    if res_trust:
+        print(OK('OK'))
+    else:
+        print(ERROR('FAILED'))
+        log.debug(res_trust)
+        print(INFO("Trust unsuccessful:"), res_trust.value)
+        if 'unknown error' in res_trust.value:
+            print(WARN("Please ensure this step is performed"
+                       " from the system console!"))
+
+        print("Rolling back imported identity ... ", end='')
+        res_remove = S.delete_identity(identity, keydb)
+        if res_remove:
+            print(OK('OK'))
+        else:
+            print(ERROR('FAILED'))
+            log.debug(res_remove)
+            # Report why the rollback failed; the trust failure was
+            # already printed above.
+            print(res_remove.value)
+            return 4
+        return 3
+
+    return 0
+
+
+def cmd_lint():  # type: () -> int
+    """Run the linter and log every reported problem.
+
+    Returns:
+        The number of lint problems (0 when the code is clean).
+    """
+    print(OK('Running linters... '))
+    lint_problems = _run_linter()
+    if lint_problems:
+        print(WARN('Lint:'), len(lint_problems))
+        # Explicit loop: map() is lazy on Python 3 and would log nothing.
+        for problem in lint_problems:
+            log.warning(problem)
+
+    return len(lint_problems)
+
+
+def cmd_prep():  # type: () -> int
+    """Unlock the configured keychain with minimal output (for CI use).
+
+    Returns 0 on success, 1 when the keychain cannot be unlocked.
+    """
+    res_unlock = S.unlock_keychain(CFG['keydb'], CFG['keypass'])
+    if res_unlock:
+        return 0
+
+    log.debug(res_unlock.value)
+    print(ERROR('ERROR'), 'Unable to access signing identity')
+    return 1
+
+
+def cmd_remove():  # type: () -> int
+    """Unlock the keychain and delete the configured identity from it.
+
+    Failures are reported on stdout; the function always returns 0.
+    """
+    keydb, keypass = CFG['keydb'], CFG['keypass']
+    identity = CFG['identity']
+
+    print("Unlocking keychain ... ", end='')
+    res_unlock = S.unlock_keychain(keydb, keypass)
+    if not res_unlock:
+        log.debug(res_unlock.value)
+        print(ERROR('ERROR'), 'Failed to unlock keychain')
+    else:
+        print(OK('OK'))
+
+    print("Removing identity from keychain ... ", end='')
+    res_rm_id = S.delete_identity(identity, keydb)
+    if not res_rm_id:
+        print(WARN('FAILED'))
+        log.debug(res_rm_id)
+        print(WARN('WARNING'), "Failed to delete identity from keychain.")
+        print(INFO(res_rm_id.value))
+    else:
+        print(OK('OK'))
+
+    return 0
+
+
+def cmd_setup():  # type: () -> int
+    """Create and configure the signing keychain and, optionally, privileges.
+
+    Steps: create the keychain, unlock it, add it to the search list and
+    check the configured privileges. Missing privileges are only written
+    when the environment variable named by S.UNSAFE_FLAG is set.
+
+    Returns:
+        0 on success (also when privileges are missing but the unsafe flag
+        is not set); 1 (keychain creation failed), 2 (unlock failed),
+        3 (search list update failed) or 4 (writing a privilege failed).
+    """
+    keydb = CFG['keydb']
+    keypass = CFG['keypass']
+    exe = CFG['executable']
+
+    print("Configuring keychain ... ", end='')
+    res_create = S.create_keychain(keydb, keypass)
+    if res_create:
+        print(OK('OK'))
+    else:
+        print(ERROR('FAILED'))
+        log.debug(res_create)
+        print(INFO('Keychain creation failed'))
+        return 1
+
+    print("Unlocking keychain ... ", end='')
+    res_unlock = S.unlock_keychain(keydb, keypass)
+    if res_unlock:
+        print(OK('OK'))
+    else:
+        print(ERROR('FAILED'))
+        log.debug(res_unlock)
+        # Distinguish a missing keychain from a plain unlock failure.
+        if 'keychain could not be found' in res_unlock.value:
+            print(INFO("Keychain creation failed"))
+        else:
+            print(INFO("Failed to unlock keychain"))
+        print(INFO(res_unlock.value))
+        return 2
+
+    print("Adding keychain to search list ... ", end='')
+    res_searchable = S.add_to_search_list(keydb)
+    if res_searchable:
+        print(OK('OK'))
+    else:
+        print(ERROR('FAILED'))
+        log.debug(res_searchable)
+        print(INFO("Failed to add keychain to search list"))
+        print(WARN("codesign will not be able to find the signing identity."))
+        return 3
+
+    privs = CFG['privileges']
+    print("Checking privileges ... ", end='')
+    if S.verify_privileges(privs):
+        print(OK('OK'))
+    else:
+        print(INFO('NOT SET'))
+
+        # sudo authentication is requested before the unsafe flag is checked.
+        _auth_sudo()
+        if not os.getenv(S.UNSAFE_FLAG, False):
+            print(INFO('NOTE'), 'Altering privileges may not be safe.')
+            print(INFO('NOTE'), 'Re-run with the --unsafe flag to enable.')
+        else:
+            priv_value = 'allow'
+            for priv in CFG['privileges']:
+                print('Setting privilege {} ... '.format(priv), end='')
+                res_priv = S.authdb_privilege_write(priv, priv_value)
+                if res_priv:
+                    print(OK('OK'))
+                else:
+                    print(INFO('not set'))
+                    log.debug(res_priv.value)
+                    print(INFO('Privileges have not been set.'))
+                    print(INFO('Please re-run: {} setup'.format(exe)))
+                    return 4
+
+    return 0
+
+
+def cmd_test():  # type: () -> int
+    """Run the unit tests and log every failing test.
+
+    Returns:
+        The number of tests that errored or failed (0 when all pass).
+    """
+    _auth_sudo()
+
+    print(OK('Running unittests... '))
+    test_problems = _run_unittests()
+    if test_problems:
+        print(ERROR('Failures:'), len(test_problems))
+        # Explicit loop: map() is lazy on Python 3 and would log nothing.
+        for problem in test_problems:
+            log.debug(problem)
+
+    return len(test_problems)
+
+
+def _auth_sudo():  # type: () -> Result
+    """Make sure sudo is authenticated, prompting the user when needed.
+
+    First runs ``sh.sudo_run(['-n'])``; when that fails, asks the user to
+    authenticate by running ``ls`` through sudo. Returns the result of
+    whichever sudo invocation ran last.
+    """
+    res_check = sh.sudo_run(['-n'])
+    if res_check:
+        return res_check
+
+    print(WARN("If prompted, authenticate with sudo ... "))
+    res_auth = sh.sudo_run(['ls'])
+    if not res_auth:
+        print(WARN("WARNING"), "sudo authentication failed")
+    return res_auth
+
+
+def _run_linter():  # type: () -> list(str)
+    """Run flake8 over the project sources and return its stdout lines."""
+    lint_paths = ['./debugsign', './dbsign/', './unittests/']
+    flake_args = [
+        'flake8',
+        '--tee', 'flake8_report.pep8.txt',
+        '--format={}'.format(
+            'lint: %(path)s:%(row)d:%(col)d: %(code)s %(text)s'),
+    ]
+
+    cmd_flake = sh.run(flake_args + lint_paths)
+    return cmd_flake.stdout.splitlines()
+
+
+def _run_unittests():  # type: () -> list(str)
+    """Discover and run the tests under 'unittests'.
+
+    Returns the string form of every test case that errored or failed.
+    """
+    try:
+        import unittest2 as unittest
+    except ImportError:
+        import unittest
+
+    suite = unittest.TestLoader().discover('unittests')
+    runner = unittest.TextTestRunner(stream=sys.stdout, verbosity=2)
+    outcome = runner.run(suite)
+
+    # Each entry is a (test, traceback) pair; only the test is reported.
+    return [str(entry[0]) for entry in outcome.errors + outcome.failures]

Added: zorg/trunk/codesign/debugsign/dbsign/logger.py
URL: http://llvm.org/viewvc/llvm-project/zorg/trunk/codesign/debugsign/dbsign/logger.py?rev=320453&view=auto
==============================================================================
--- zorg/trunk/codesign/debugsign/dbsign/logger.py (added)
+++ zorg/trunk/codesign/debugsign/dbsign/logger.py Mon Dec 11 18:36:17 2017
@@ -0,0 +1,45 @@
+# COPYRIGHT LINE: FIXME
+
+"""
+dbsign.logger
+
+logging configuration for debugsign
+"""
+
+
+from __future__ import print_function
+
+import logging
+import os
+
+
+# Timestamp and record layout used for every log line.
+DATE_FORMAT = "%H:%M:%S"
+LOG_FORMAT = "%(asctime)s %(name)13s:%(lineno)-4s %(levelname)7s %(message)s"
+BASE_LOGLEVEL = logging.WARNING
+# DEBUGSIGN_LOGLEVEL overrides the default level. A value read from the
+# environment is a string, while the fallback is an int.
+# NOTE(review): logging accepts level names given as strings, but a numeric
+# string such as "10" is presumably rejected -- confirm how callers set it.
+LOGLEVEL = os.getenv('DEBUGSIGN_LOGLEVEL', BASE_LOGLEVEL)
+
+# Configures logging as a side effect of importing this module.
+logging.basicConfig(format=LOG_FORMAT,
+                    datefmt=DATE_FORMAT,
+                    level=LOGLEVEL)
+
+_root = logging.getLogger()
+log = logging.getLogger(__name__)
+
+
+def get_logger(name):  # type: (str) -> logging.Logger
+    """Return the ``logging`` logger registered under ``name``."""
+    log.debug("Fetching logger for '%s'", name)
+    named_logger = logging.getLogger(name)
+    return named_logger
+
+
+def set_level(level):  # type: (int) -> int
+    """Set the root logger's level and return its effective level."""
+    # Applied to the root logger, otherwise the level would not
+    # propagate to the other loggers.
+    root = _root
+    root.setLevel(level)
+    return root.getEffectiveLevel()
+
+
+def normalize(level):  # type: (int) -> int
+    """Clamp ``level`` to the range logging.DEBUG .. logging.CRITICAL."""
+    lowest, highest = logging.DEBUG, logging.CRITICAL
+    if level < lowest:
+        return lowest
+    return highest if level > highest else level

Added: zorg/trunk/codesign/debugsign/dbsign/result.py
URL: http://llvm.org/viewvc/llvm-project/zorg/trunk/codesign/debugsign/dbsign/result.py?rev=320453&view=auto
==============================================================================
--- zorg/trunk/codesign/debugsign/dbsign/result.py (added)
+++ zorg/trunk/codesign/debugsign/dbsign/result.py Mon Dec 11 18:36:17 2017
@@ -0,0 +1,59 @@
+# COPYRIGHT LINE: FIXME
+
+"""
+dbsign.result
+
+result classes for debugsign
+"""
+
+from __future__ import print_function
+
+import dbsign.logger as logger
+
+
+log = logger.get_logger(__name__)
+
+
+#
+# Result class
+#
+
+class Result(object):
+    """Base class for an outcome that must be inspected before it is dropped.
+
+    A result wraps a value and tracks whether it has been "checked".
+    Reading ``value`` marks it checked; subclasses also mark it checked when
+    truth-tested. ``__del__`` asserts that the result was checked.
+    The base class itself refuses boolean evaluation.
+    """
+    def __init__(self, value):  # type: (object) -> None
+        self._checked = False
+        self._value = value
+
+    def __del__(self):  # type: () -> None
+        assert self._checked
+
+    def __nonzero__(self):  # type: () -> bool
+        raise NotImplementedError("{} does not support boolean evaluation".
+                                  format(self.__class__.__name__))
+
+    def __bool__(self):  # type: () -> bool
+        # Python 3 calls __bool__ instead of __nonzero__; without this hook
+        # every result (including failures) would be truthy there.
+        # Delegating keeps subclasses that only override __nonzero__ correct.
+        return self.__nonzero__()
+
+    def __repr__(self):  # type: () -> str
+        return "{0.__class__.__name__}({0._value!r})".format(self)
+
+    @property
+    def checked(self):  # type: () -> bool
+        """Whether the result has been inspected since creation/renewal."""
+        return self._checked
+
+    @property
+    def value(self):  # type: () -> str
+        """The wrapped value; reading it marks the result as checked."""
+        self._checked = True
+        return self._value
+
+    def renew(self):  # type: () -> Result
+        """Reset the checked flag so a caller must inspect it again."""
+        self._checked = False
+        return self
+
+
+class Failure(Result):
+    """Result of a failed operation; evaluates as False."""
+    def __nonzero__(self):  # type: () -> bool
+        # Truth-testing counts as inspecting the result.
+        self._checked = True
+        return False
+
+    # Python 3 looks up __bool__ rather than __nonzero__.
+    __bool__ = __nonzero__
+
+
+class Success(Result):
+    """Result of a successful operation; evaluates as True."""
+    def __nonzero__(self):  # type: () -> bool
+        # Truth-testing counts as inspecting the result.
+        self._checked = True
+        return True
+
+    # Python 3 looks up __bool__ rather than __nonzero__.
+    __bool__ = __nonzero__

Added: zorg/trunk/codesign/debugsign/dbsign/security.py
URL: http://llvm.org/viewvc/llvm-project/zorg/trunk/codesign/debugsign/dbsign/security.py?rev=320453&view=auto
==============================================================================
--- zorg/trunk/codesign/debugsign/dbsign/security.py (added)
+++ zorg/trunk/codesign/debugsign/dbsign/security.py Mon Dec 11 18:36:17 2017
@@ -0,0 +1,411 @@
+# COPYRIGHT LINE: FIXME
+
+"""
+dbsign.security
+"""
+
+import os
+import plistlib
+import re
+import tempfile
+
+import dbsign.logger as L
+from dbsign.result import Failure, Success
+from dbsign.shell import run, sudo_run
+
+
+log = L.get_logger(__name__)
+
+UNSAFE_FLAG = "DEBUGSIGN_UNSAFE"
+
+#
+# Helper functions
+#
+
+
+def derive_keychain_extension():  # type: () -> str
+    """Work out which file extension keychain files use on this system.
+
+    Starts from "keychain-db", falls back to "keychain" when the reported
+    OS version is below 10.12, and finally adopts the extension of the
+    default keychain whenever that query succeeds.
+    """
+    log.debug("Determining keychain file extension")
+
+    ext = "keychain-db"     # used by macOS 10.12+
+    log.debug("Starting with default extension: '%s'", ext)
+
+    sw_vers = run(['sw_vers', '-productVersion'])
+    if not sw_vers:  # pragma: no cover
+        log.warn("Failed to query OS version: %s", sw_vers)
+    else:
+        version_string = sw_vers.stdout
+        version = [int(part) for part in version_string.decode().split('.')]
+        if version < [10, 12]:  # pragma: no cover
+            ext = 'keychain'
+            log.debug("Pre-Sierra OS detected (%s). Using: '%s'",
+                      version_string, ext)
+
+    sec_default = run(['security', 'default-keychain'])
+    if sec_default:
+        # reduce the command output to the bare keychain filename
+        keydb = strip_security_noise(sec_default.stdout)
+        _, dotted_ext = os.path.splitext(keydb)
+        ext = dotted_ext.lstrip('.')
+        log.debug("Derived extension '%s' from login keychain '%s'",
+                  ext, keydb)
+
+    log.debug("Using keychain file extension: %s", ext)
+    return ext
+
+
+def keychain_to_file(name):  # type: (str) -> str
+    """Return the full path of the user keychain called ``name``."""
+    log.debug('keychain_to_file({!r})'.format(name))
+    key_dir = os.path.expanduser('~/Library/Keychains')
+    filename = "{}.{}".format(name, _KEYCHAIN_EXT)
+    keydb = "{}/{}".format(key_dir, filename)
+    log.debug('keychain_to_file => {!r}'.format(keydb))
+    return keydb
+
+
+def rules_from(data):  # type: (str) -> list(str)
+    """Parse plist ``data`` and return the contents of its 'rule' key.
+
+    Returns an empty list when the plist has no 'rule' entry.
+    """
+    log.debug("rules_from({!r})".format(data))
+    # plistlib.loads is the Python 3.4+ API; readPlistFromString exists
+    # only on Python 2, so fall back to it when loads is unavailable.
+    parse = getattr(plistlib, 'loads', None)
+    if parse is None:
+        parse = plistlib.readPlistFromString
+    dict_ = parse(data)
+    log.debug(dict_)
+    return dict_.get('rule', [])
+
+
+def strip_security_noise(s):  # type: (str) -> str
+    """Decode ``s`` and trim surrounding spaces, quotes and newlines."""
+    noise = ' "\'\n'
+    decoded = s.decode()
+    return decoded.strip(noise)
+
+
+_KEYCHAIN_EXT = derive_keychain_extension()
+
+
+#
+# authorizationdb Operations
+#
+
+
+def authdb_privilege_read(privilege):  # type: (str) -> Result
+    """Read ``privilege`` from the authorization database.
+
+    Returns Success with the command's stdout, or Failure with a message.
+    """
+    log.debug('authdb_privilege_read("%s")', privilege)
+
+    res = run(["security", "authorizationdb", "read", privilege])
+    if res:
+        return Success(res.stdout)
+
+    err_msg = 'Failed to read privilege {}: {}'.format(
+        privilege, res.stdout)
+    log.warn(err_msg)
+    return Failure(err_msg)
+
+
+def authdb_privilege_write(privilege, value):  # type: (str, str) -> Result
+    """Write ``value`` for ``privilege`` into the authorization database.
+
+    Refuses to run (returning Failure) unless the environment variable
+    named by UNSAFE_FLAG is set. The write itself goes through sudo.
+    """
+    log.debug('authdb_privilege_write("%s", "%s")', privilege, value)
+
+    unsafe_enabled = os.getenv(UNSAFE_FLAG, False)
+    if not unsafe_enabled:
+        log.info("Unauthorized unsafe operation")
+        log.info("To enable running this command, pass the --unsafe flag or"
+                 " set the %s environment variable.", UNSAFE_FLAG)
+        return Failure("Setting privileges requires --unsafe flag")
+
+    write_args = ['security', 'authorizationdb', 'write', privilege, value]
+    res = sudo_run(write_args)
+    if res:
+        return Success('{} => {}'.format(privilege, value))
+
+    err_msg = 'Failed to set authdb privilege {} => {}'.format(
+        privilege, value)
+    log.warn(err_msg)
+    return Failure(err_msg)
+
+
+def verify_privilege(priv):  # type: (str) -> Result
+    """Check that the rules stored for ``priv`` include 'allow'.
+
+    Returns the failed read result when the privilege cannot be read,
+    Failure carrying the rules when 'allow' is absent, else Success.
+    """
+    log.debug('verify_privilege(%s)', repr(priv))
+
+    res_read = authdb_privilege_read(priv)
+    if not res_read:
+        log.debug(res_read)
+        return res_read
+
+    rules = rules_from(res_read.value)
+    if 'allow' in rules:
+        return Success(priv)
+
+    return Failure(rules)
+
+
+def verify_privileges(privs):  # type: (list(str)) -> Result
+    """Verify every privilege in ``privs``, stopping at the first failure.
+
+    The first failing result is renewed and returned so that the caller
+    has to inspect it; otherwise Success wrapping ``privs`` is returned.
+    """
+    log.debug('verify_privileges(%s)', repr(privs))
+
+    for privilege in privs:
+        outcome = verify_privilege(privilege)
+        if outcome:
+            continue
+        log.debug(outcome)
+        return outcome.renew()
+
+    return Success(privs)
+
+
+#
+# Keychain Operations
+#
+
+def add_to_search_list(keydb):  # type: (str) -> Result
+    """
+    Adds keychain to security search list.
+    codesign will only search keychains on this list.
+
+    Returns Success(keydb) when the keychain is (or already was) on the
+    list, the renewed failure from get_search_list() when the list cannot
+    be read, or Failure wrapping the command result when updating fails.
+    """
+    res_list = get_search_list()
+    if not res_list:
+        return res_list.renew()
+
+    # Copy into a real list: the value may be a lazy iterable (e.g. a
+    # Python 3 map object), which supports neither repeated membership
+    # tests, append() nor list concatenation.
+    keylist = list(res_list.value)
+    if keydb in keylist:
+        # keychain already present; noop
+        log.debug(keylist)
+        log.debug("{} already present in search list: {}".
+                  format(keydb, keylist))
+        return Success(keydb)
+
+    keylist.append(keydb)
+    cmd_params = ['security', 'list-keychains', '-d', 'user', '-s']
+    sec_add = run(cmd_params + keylist)
+    if not sec_add:  # pragma: no cover
+        return Failure(sec_add)
+
+    return Success(keydb)
+
+
+def create_keychain(keydb, password):  # type: (str, str) -> Result
+    """Create the keychain if needed and configure it for signing.
+
+    The keychain is created only when no file exists at ``keydb``; it is
+    then added to the search list and its settings are reset.
+    """
+    log.debug("Creating keychain: %s", keydb)
+
+    already_there = keychain_exists(keydb)
+    if not already_there:
+        create_args = ['security', 'create-keychain', '-p', password, keydb]
+        sec_make = run(create_args)
+        if not sec_make:  # pragma: no cover
+            return Failure(sec_make)
+
+    sec_add = add_to_search_list(keydb)
+    if not sec_add:  # pragma: no cover
+        return Failure(sec_add)
+
+    # Called without timeout arguments, which sets timeout=infinite and
+    # keeps the keychain from locking after a timeout.
+    settings_args = ['security', 'set-keychain-settings', keydb]
+    sec_settings = run(settings_args)
+    if sec_settings:
+        return Success(sec_settings)
+
+    return Failure(sec_settings)  # pragma: no cover
+
+
+def delete_keychain(keydb, backup=True):  # type: (str, bool) -> Result
+    """Delete the keychain at ``keydb``, optionally keeping a backup copy.
+
+    With ``backup`` set, the file is first renamed to
+    ``<keydb>.bak-<pid>``; a failing rename aborts with Failure.
+
+    Returns:
+        Success wrapping the backup path when a backup was made, or
+        wrapping ``keydb`` when ``backup`` is false.
+    """
+    log.debug('delete_keychain({!r}'.format(keydb))
+
+    # Defined up front so the final return is valid when backup is false
+    # (previously this raised NameError).
+    new_keydb = None
+    if backup:
+        new_keydb = "{}.bak-{}".format(keydb, str(os.getpid()))
+        try:
+            os.rename(keydb, new_keydb)
+            log.info("Backed up {!r} to {!r}".format(keydb, new_keydb))
+        except OSError as ose:  # pragma: no cover
+            log.debug(ose)
+            return Failure(ose)
+
+    sec_delete = run(['security', 'delete-keychain', keydb])
+    if not (sec_delete or backup):  # pragma: no cover
+        # If backed up, security WILL return non-zero.
+        # However, it will also remove the keychain from the search list.
+        log.debug(sec_delete)
+
+    if os.path.exists(keydb):   # pragma: no cover
+        os.unlink(keydb)        # if it's still around, brute force rm it
+
+    return Success(new_keydb if backup else keydb)
+
+
+def get_search_list():  # type: () -> Result
+    """Return the keychain search list as Success wrapping a list of paths.
+
+    Returns Failure with a message when the list cannot be queried.
+    """
+    sec_list = run(['security', 'list-keychains'])
+    if not sec_list:  # pragma: no cover
+        return Failure("Failed to get search list: {}".format(sec_list))
+
+    # Build a real list: on Python 3 map() yields a one-shot iterator,
+    # which callers cannot append to or search more than once.
+    list_lines = [strip_security_noise(line)
+                  for line in sec_list.stdout.splitlines()]
+    return Success(list_lines)
+
+
+def keychain_exists(keydb):  # type: (str) -> Result
+    """Return Success(keydb) when a file exists at ``keydb``, else Failure."""
+    log.debug("keychain_exists(%s)", repr(keydb))
+
+    if os.path.exists(keydb):
+        log.debug('keychain_exists: found keychain file at `%s`', keydb)
+        return Success(keydb)
+
+    return Failure('No file at keychain path {!r}'.format(keydb))
+
+
+def unlock_keychain(keydb, password):  # type: (str, str) -> Result
+    """Unlock ``keydb`` with ``password`` via the security tool.
+
+    The password is masked in the debug log.
+    """
+    log.debug('unlock_keychain({!r}, {!r})'.format(keydb, '********'))
+
+    unlock_args = ['security', 'unlock-keychain', '-p', password, keydb]
+    sec_unlock = run(unlock_args)
+    if sec_unlock:
+        return Success(keydb)
+
+    return Failure(sec_unlock)
+
+
+#
+# Identity Operations
+#
+
+def delete_identity(identity, keydb):  # type: (str, str) -> Result
+    """Delete ``identity`` from ``keydb`` via ``security delete-identity``.
+
+    Returns Success(identity), or Failure carrying the command's stderr.
+    """
+    log.debug('delete_identity({!r}'.format(identity))
+
+    delete_args = ['security', 'delete-identity', '-tc', identity, keydb]
+    sec_delete = run(delete_args)
+    if sec_delete:
+        return Success(identity)
+
+    return Failure(sec_delete.stderr)
+
+
+def find_identity(identity, keydb, valid=False):
+    # type: (str, str, bool) -> Result
+    """Search ``keydb`` for a codesigning identity named ``identity``.
+
+    Args:
+        identity: name to look for as a whole word in the tool's output.
+        keydb: keychain passed to ``security find-identity``.
+        valid: when true, pass ``-vp`` instead of ``-p`` to the tool.
+
+    Returns:
+        Success(identity) when found; Failure when the command fails or
+        the name does not appear in its output.
+    """
+    log.debug('find_identity({!r}, {!r}, valid={!r})'.
+              format(identity, keydb, valid))
+
+    flags = '-vp' if valid else '-p'
+
+    sec = run(['security', 'find-identity', flags, 'codesigning', keydb])
+    if not sec:
+        log.debug(sec)
+        return Failure(sec)
+
+    output = sec.stdout
+    # A str pattern cannot be matched against bytes on Python 3; on
+    # Python 2 bytes is str, so this branch is skipped there.
+    if isinstance(output, bytes) and not isinstance(output, str):
+        output = output.decode()
+
+    # Escape the name so regex metacharacters in it are matched literally.
+    pattern = r'\b{}\b'.format(re.escape(identity))
+    m = re.search(pattern, output)
+    if not m:
+        log.debug("{}: {!r}".format(identity, sec.stdout))
+        return Failure('{!r} not found in {!r}'.format(identity, sec.stdout))
+
+    return Success(identity)
+
+
+def identity_installed(identity, keydb):  # type: (str, str) -> Result
+    """Report whether ``identity`` is present in ``keydb`` and valid.
+
+    Returns Failure('Identity not found') when it is absent,
+    Failure('Identity present but invalid') when only the unfiltered
+    search finds it, and Success(identity) otherwise.
+    """
+    log.debug('identity_installed({!r}, {!r})'.format(identity, keydb))
+
+    # Search all identities first, then only the valid ones.
+    if not find_identity(identity, keydb, valid=False):
+        return Failure('Identity not found')
+
+    if not find_identity(identity, keydb, valid=True):
+        return Failure('Identity present but invalid')
+
+    return Success(identity)
+
+
+def import_identity(keydb, key_pass, identity, id_file, id_pass):
+    # type: (str, str, str, str, str) -> Result
+    """Import the PKCS#12 file ``id_file`` into the keychain ``keydb``.
+
+    Fails early when the file is missing, the keychain cannot be added to
+    the search list, or the identity is already installed. After the
+    import, the key partition list is set through sudo.
+    """
+    log.debug('import_identity({!r}, {!r}, {!r}, {!r})'.format(
+        keydb, identity, id_file, '********'))
+
+    # the identity file has to exist before anything else is attempted
+    if not os.path.exists(id_file):
+        return Failure('Identity file {!r} not found'.format(id_file))
+
+    sec_add = add_to_search_list(keydb)
+    if not sec_add:
+        return Failure(sec_add)
+
+    # refuse to import over an identity that is already installed
+    res_find_id = identity_installed(identity, keydb)
+    if res_find_id:
+        return Failure("Identity exists: {!r}".format(res_find_id.value))
+
+    import_args = ['security', 'import', id_file,
+                   '-k', keydb, '-f', 'pkcs12',
+                   '-P', id_pass, '-T', '/usr/bin/codesign']
+    cmd_import = run(import_args)
+    if not cmd_import:
+        return Failure(cmd_import.stderr)
+
+    partition_args = ['security', 'set-key-partition-list',
+                      '-S', 'apple', '-k', key_pass, keydb]
+    sec_part = sudo_run(partition_args)
+    if sec_part:
+        return Success(
+            "Imported identity {} from {}".format(identity, id_file))
+
+    return Failure("Failed to authorize identity: {}".
+                   format(sec_part.stderr))
+
+
+def trust_identity(identity, keydb):  # type: (str) -> Result
+    fd, tmpfile = tempfile.mkstemp()
+    fh = os.fdopen(fd, 'w')
+    log.debug("Temp filename: %s", tmpfile)
+
+    sec_extract = run(['security', 'find-certificate', '-p',
+                       '-c', identity, keydb])
+    if not sec_extract:
+        log.debug(sec_extract)
+        return Failure(sec_extract)
+
+    fh.write(sec_extract.stdout)
+    fh.close()
+
+    sec_add = sudo_run(['security', 'add-trusted-cert', '-d',
+                        '-p', 'basic', '-p', 'codeSign', tmpfile])
+    if not sec_add:
+        os.unlink(tmpfile)
+        return Failure("Failed to add cert:\n{}".format(sec_extract.stdout))
+
+    res_valid = find_identity(identity, keydb, valid=True)
+    if not res_valid:
+        return Failure("Invalid identity.")
+
+    sec_sign = run(['codesign', '--keychain', keydb, '-s', identity, tmpfile])
+    os.unlink(tmpfile)
+    if not sec_sign:
+        return Failure("Codesigning failed: {}".format(sec_sign.stderr))
+
+    return Success(identity)
+
+
+def verify_identity(identity, keydb):  # type: (str, str) -> Result
+    log.debug("verify_identity({!r}, {!r})".format(identity, keydb))
+    # create an empty scratch file to sign; only its path is passed on
+    fd, filename = tempfile.mkstemp(suffix='-temp')
+    log.debug('Test file: {}'.format(filename))
+    os.close(fd)
+
+    res = verify_identity_with_file(identity, keydb, filename)
+    if os.path.exists(filename):
+        os.remove(filename)
+
+    return res
+
+
+def verify_identity_with_file(identity, keydb, filename):
+    # type: (str, str) -> Result
+    log.debug("verify_identity_with_file({!r}, {!r}, {!r})".
+              format(identity, keydb, filename))
+
+    # ensure tempfile exists
+    if not os.path.exists(filename):
+        return Failure("File {!r} doesn't exist!".format(filename))
+
+    # ensure signing identity exists and is valid
+    res_find = identity_installed(identity, keydb)
+    if not res_find:
+        log.debug(res_find)
+
+    # use identity to codesign the tempfile
+    cmd_sign = run(['codesign', '-fs', identity, filename])
+    if not cmd_sign:
+        return Failure(cmd_sign)
+
+    # check signing authority of file signature
+    cmd_check = run(['codesign', '-dvv', filename])
+    if not cmd_check:
+        return Failure(cmd_check)
+
+    # ensure signing authority matches the identity name
+    re_auth = r"^Authority=(?P<authority>{})$".format(identity)
+    m = re.search(re_auth, cmd_check.stderr, re.MULTILINE)
+    if not m:
+        log.debug(cmd_check)
+        return Failure('Identity not found in signature')
+
+    return Success(m.group('authority'))

Added: zorg/trunk/codesign/debugsign/dbsign/shell.py
URL: http://llvm.org/viewvc/llvm-project/zorg/trunk/codesign/debugsign/dbsign/shell.py?rev=320453&view=auto
==============================================================================
--- zorg/trunk/codesign/debugsign/dbsign/shell.py (added)
+++ zorg/trunk/codesign/debugsign/dbsign/shell.py Mon Dec 11 18:36:17 2017
@@ -0,0 +1,81 @@
+# COPYRIGHT LINE: FIXME
+
+"""
+dbsign.shell
+
+shell routines for debugsign
+"""
+
+from __future__ import print_function
+
+import os
+from subprocess import PIPE, Popen
+
+import dbsign.logger
+
+
+log = dbsign.logger.get_logger(__name__)
+
+
+class ShellCommand(object):
+    """
+    Represents the result of a shell command
+    """
+    def __init__(self, args, code, stdout, stderr):
+        # type: (list[str], int, str, str) -> ()
+        self.data = {
+            'args': args,
+            'code': code,
+            'stdout': stdout,
+            'stderr': stderr,
+        }
+
+    def __eq__(self, rhs):  # type: (ShellCommand) -> bool
+        return self.data == rhs.data
+
+    def __getattr__(self, attr):  # type: (str) -> T
+        if attr in self.data:
+            return self.data[attr]
+        raise AttributeError(attr)
+
+    def __nonzero__(self):  # type: () -> bool
+        return self.code == 0
+
+    def __repr__(self):  # type: () -> str
+        repr_fmt = "{0}(args={1.args!r}, code={1.code!r},"
+        repr_fmt += " stdout={1.stdout!r}, stderr={1.stderr!r})"
+        return repr_fmt.format(self.__class__.__name__, self)
+
+
+def __run(args, stdin=None):  # type: (list[str], str) -> ShellCommand
+    """internal function to run shell commands"""
+    try:
+        p = Popen(args, stdin=PIPE, stdout=PIPE, stderr=PIPE)
+        stdout, stderr = p.communicate(input=stdin)
+    except OSError as os_err:
+        log.debug('Unable to execute command: %s: %s', args, os_err)
+        raise
+    # a non-zero exit status is not raised; it is reported via cmd.code
+    cmd = ShellCommand(args, code=p.returncode, stdout=stdout, stderr=stderr)
+    log.debug(cmd)
+    return cmd
+
+
+def run(args, stdin=None):  # type: (list[str], str) -> ShellCommand
+    """Run a regular (non-sudo) command"""
+    log.debug("run(args=%s)", repr(args))
+    # prefix test: rejects every executable whose basename starts with "su"
+    if os.path.basename(args[0]).startswith('su'):
+        log.info('run() called with illegal command `%s`', args)
+        raise RuntimeError('Unauthorized use of run; use sudo_run')
+
+    return __run(args, stdin)
+
+
+def sudo_run(args, stdin=None):  # type: (list[str]) -> ShellCommand
+    """Run a command with root privileges using sudo"""
+    log.debug("sudo_run(args=%s)", repr(args))
+
+    args.insert(0, 'sudo')
+
+    return __run(args, stdin)

Added: zorg/trunk/codesign/debugsign/debugsign
URL: http://llvm.org/viewvc/llvm-project/zorg/trunk/codesign/debugsign/debugsign?rev=320453&view=auto
==============================================================================
--- zorg/trunk/codesign/debugsign/debugsign (added)
+++ zorg/trunk/codesign/debugsign/debugsign Mon Dec 11 18:36:17 2017
@@ -0,0 +1,97 @@
+#!/usr/bin/env python -tt
+
+import argparse
+import os
+import sys
+
+import dbsign.commands as C
+import dbsign.logger as L
+import dbsign.security as S
+
+from dbsign.commands import CFG
+
+
+log = L.get_logger(CFG['executable'])
+
+
+def make_parser():  # type: () -> argparse.ArgumentParser
+    parser = argparse.ArgumentParser(prog=CFG['executable'], add_help=False)
+    parser.set_defaults(func=lambda: C.cmd_help(parser))
+    parser.add_argument('--debug', '-d', '--verbose', '-v',
+                        action='count', default=0,
+                        help='be more verbose')
+    parser.add_argument('--quiet', '-q',
+                        action='count', default=0,
+                        help='be quieter')
+    parser.add_argument('--unsafe', action='store_true',
+                        help='permit unsafe operations')
+    subparsers = parser.add_subparsers(help='Subcommands')
+
+    parser_help = subparsers.add_parser('help', help='Display usage info')
+    parser_help.set_defaults(func=lambda: C.cmd_help(parser))
+
+    parser_check = subparsers.add_parser(
+        'check', help='Validate current system settings')
+    parser_check.set_defaults(func=C.cmd_check)
+
+    parser_clean = subparsers.add_parser(
+        'clean', help='Remove identity and keychain')
+    parser_clean.set_defaults(func=C.cmd_clean)
+
+    parser_import = subparsers.add_parser(
+        'import', help='Import and trust new identity from .P12 file')
+    parser_import.set_defaults(func=C.cmd_import)
+    parser_import.add_argument('id_file', metavar='P12_FILE',
+                               help='Identity in .p12 format')
+
+    parser_setup = subparsers.add_parser(
+        'setup', help='Initialize keychain and privilege settings')
+    parser_setup.set_defaults(func=C.cmd_setup)
+
+    parser_unlock = subparsers.add_parser(
+        'prep', help='Unlock keychain and verify identity is valid')
+    parser_unlock.set_defaults(func=C.cmd_prep)
+
+    parser_remove = subparsers.add_parser(
+        'remove', help='Remove the installed identity')
+    parser_remove.set_defaults(func=C.cmd_remove)
+
+    parser_lint = subparsers.add_parser(
+        'lint', help='Lint the program source code')
+    parser_lint.set_defaults(func=C.cmd_lint)
+
+    parser_test = subparsers.add_parser(
+        'test', help='Test the program')
+    parser_test.set_defaults(func=C.cmd_test)
+
+    return parser
+
+
+def main(main_args):  # type: (list[str]) -> int
+    parser = make_parser()
+    args = parser.parse_args(main_args)
+
+    if args.unsafe:
+        os.environ[S.UNSAFE_FLAG] = 'YES'
+
+    # Logging setup: each -q adds 10 to the level, each -d/-v subtracts 10
+    level = L.BASE_LOGLEVEL + (args.quiet - args.debug) * 10
+    L.set_level(L.normalize(level))
+
+    CFG['debug'] = (args.debug > 0)
+    CFG['keydb'] = '{}/Library/Keychains/{}.{}'.format(
+        os.environ['HOME'], CFG['keynick'], S.derive_keychain_extension())
+    if hasattr(args, 'id_file'):
+        CFG['id_file'] = args.id_file
+
+    log.debug('Command line parameters: %s', sys.argv)
+    log.debug('Arguments to main: %s', main_args)
+    log.debug('Parsed args: %s', args)
+    log.debug('Configuration: %s', CFG)
+    # dispatch to the handler installed via set_defaults(func=...)
+    return args.func()
+
+
+if __name__ == '__main__':
+    return_code = main(sys.argv[1:])
+    sys.exit(return_code)

Propchange: zorg/trunk/codesign/debugsign/debugsign
------------------------------------------------------------------------------
    svn:executable = *

Added: zorg/trunk/codesign/debugsign/requirements.txt
URL: http://llvm.org/viewvc/llvm-project/zorg/trunk/codesign/debugsign/requirements.txt?rev=320453&view=auto
==============================================================================
--- zorg/trunk/codesign/debugsign/requirements.txt (added)
+++ zorg/trunk/codesign/debugsign/requirements.txt Mon Dec 11 18:36:17 2017
@@ -0,0 +1,4 @@
+coverage
+flake8
+mock
+unittest2

Added: zorg/trunk/codesign/debugsign/unittests/__init__.py
URL: http://llvm.org/viewvc/llvm-project/zorg/trunk/codesign/debugsign/unittests/__init__.py?rev=320453&view=auto
==============================================================================
    (empty)

Added: zorg/trunk/codesign/debugsign/unittests/test_ansi.py
URL: http://llvm.org/viewvc/llvm-project/zorg/trunk/codesign/debugsign/unittests/test_ansi.py?rev=320453&view=auto
==============================================================================
--- zorg/trunk/codesign/debugsign/unittests/test_ansi.py (added)
+++ zorg/trunk/codesign/debugsign/unittests/test_ansi.py Mon Dec 11 18:36:17 2017
@@ -0,0 +1,68 @@
+#!/usr/bin/env python -tt
+
+"""
+unittests.test_ansi
+"""
+
+try:
+    import unittest2 as unittest
+except ImportError:
+    import unittest
+
+import mock
+import re
+import StringIO
+import sys
+
+import dbsign.ansi as A
+
+
+class TestAnsi(unittest.TestCase):
+    def setUp(self):
+        self.test_text = "The quick brown fox, etc."
+        self.codes = A._ANSI_CODES
+        re_ansi = r"\033\[.*?m"
+        ansi_msg_pattern = r'(?P<begin>{0})(?P<middle>.*?)(?P<end>{0})'
+        self.re_ansi_msg = re.compile(ansi_msg_pattern.format(re_ansi))
+
+    @unittest.skipUnless(sys.stdout.isatty(), A.WARN("requires tty"))
+    def test_ansi_tty(self):
+        msg = self.test_text
+
+        for color in self.codes:
+            with self.subTest(color):
+                ansi_msg = A.ANSI(color, msg)
+                self.assertTrue(ansi_msg.startswith(self.codes[color]))
+                self.assertTrue(ansi_msg.endswith(self.codes['clear']))
+
+    @unittest.skipUnless(sys.stdout.isatty(), A.WARN("requires tty"))
+    def test_ansi_convenience_tty(self):
+        msg = self.test_text
+        funcs = [A.OK, A.INFO, A.WARN, A.ERROR]
+
+        for func in funcs:
+            with self.subTest(func=func.func_name):
+                ansi_msg = func(msg)
+                m = self.re_ansi_msg.match(ansi_msg)
+                self.assertTrue(m)
+                self.assertIn(m.group('begin'), self.codes.values())
+                self.assertEqual(m.group('middle'), msg)
+                self.assertEqual(m.group('end'), self.codes['clear'])
+
+    @mock.patch('sys.stdout', new_callable=StringIO.StringIO)
+    def test_ansi_notty(self, mock_stdout):
+        msg = self.test_text
+
+        for color in self.codes:
+            with self.subTest(color):
+                ansi_msg = A.ANSI(color, msg)
+                self.assertEqual(msg, ansi_msg)
+
+    @mock.patch('sys.stdout', new_callable=StringIO.StringIO)
+    def test_ansi_convenience_notty(self, mock_stdout):
+        msg = self.test_text
+        funcs = [A.OK, A.INFO, A.WARN, A.ERROR]
+
+        for func in funcs:
+            with self.subTest(func=func.func_name):
+                self.assertEqual(msg, func(msg))

Added: zorg/trunk/codesign/debugsign/unittests/test_authdb.py
URL: http://llvm.org/viewvc/llvm-project/zorg/trunk/codesign/debugsign/unittests/test_authdb.py?rev=320453&view=auto
==============================================================================
--- zorg/trunk/codesign/debugsign/unittests/test_authdb.py (added)
+++ zorg/trunk/codesign/debugsign/unittests/test_authdb.py Mon Dec 11 18:36:17 2017
@@ -0,0 +1,197 @@
+#!/usr/bin/env python -tt
+
+"""
+unittests.test_authdb
+"""
+
+try:
+    import unittest2 as unittest
+except ImportError:
+    import unittest
+
+import logging
+import mock
+import os
+import sys
+
+import dbsign.logger as L
+from dbsign.result import Failure, Success
+import dbsign.shell as sh
+import dbsign.security as S
+
+log = L.get_logger(__name__)
+
+
+@unittest.skipUnless(sys.platform == 'darwin',
+                     "The authorizationdb is used only by macOS systems.")
+class TestAuthDB(unittest.TestCase):
+    def setUp(self):
+        # Use system.privilege.taskport.debug since it can
+        # be restored using the DevToolsSecurity tool
+        self.invalid_priv = "foo.bar.baz"
+        self.test_priv = "system.privilege.taskport.debug"
+
+        # stash for later
+        self.orig_unsafe_flag = os.getenv(S.UNSAFE_FLAG, None)
+
+    def tearDown(self):
+        if self.orig_unsafe_flag:
+            os.environ[S.UNSAFE_FLAG] = self.orig_unsafe_flag
+
+    def test_privilege_read(self):
+        # assert read of valid privilege succeeds and matches
+        priv = self.test_priv
+
+        res_read = S.authdb_privilege_read(priv)
+        self.assertTrue(res_read)
+        rules = S.rules_from(res_read.value)
+
+        cmd_read = sh.run(['security', 'authorizationdb', 'read', priv])
+        self.assertTrue(cmd_read)
+
+        for rule in rules:
+            self.assertIn(rule, cmd_read.stdout)
+
+    @unittest.skipUnless(os.getenv(S.UNSAFE_FLAG), "Requires --unsafe")
+    def test_privilege_read_negative(self):
+        """
+        - assert read of invalid privilege fails
+
+        Consecutive unsuccessful read operations can
+        result in delayed or denied authdb calls.
+        Require --unsafe to prevent inadvertent lockouts.
+        """
+        priv = self.invalid_priv
+        log_level = L._root.level
+
+        L.set_level(logging.CRITICAL)
+        res_read = S.authdb_privilege_read(priv)
+        L.set_level(log_level)
+        self.assertFalse(res_read)
+
+    def test_privilege_write_safe(self):
+        # make sure we have a value backed up, then delete it
+        if S.UNSAFE_FLAG in os.environ:
+            del os.environ[S.UNSAFE_FLAG]
+
+        res_write = S.authdb_privilege_write(self.test_priv, '42')
+        self.assertFalse(res_write)
+        self.assertIn('--unsafe', res_write.value)
+
+    @unittest.skipUnless(os.getenv(S.UNSAFE_FLAG), "Requires --unsafe")
+    def test_privilege_write(self):
+        """
+        - read current privilege value
+        - write and verify new value
+        - restore and verify original value
+        - in case of failure, re-run DevToolsSecurity to ensure safe value
+        """
+        priv = self.test_priv
+        test_rule = "is-admin"
+
+        # read current value
+        res_read = S.authdb_privilege_read(priv)
+        self.assertTrue(res_read)
+        # stash for later
+        orig_value = res_read.value
+
+        # write new value
+        res_write = S.authdb_privilege_write(priv, "is-admin")
+        self.assertTrue(res_write)
+
+        # read new value
+        res_verify = S.authdb_privilege_read(priv)
+        self.assertTrue(res_verify)
+        new_rules = S.rules_from(res_verify.value)
+        self.assertEqual([test_rule], new_rules)
+
+        # restore original value
+        cmdline = ['security', 'authorizationdb', 'write', priv]
+        res_restore = sh.sudo_run(cmdline, stdin=orig_value)
+        self.assertTrue(res_restore)
+
+        # verify restored value
+        res_verify2 = S.authdb_privilege_read(priv)
+        self.assertTrue(res_verify2)
+        orig_rules = S.rules_from(orig_value)
+        restored_rules = S.rules_from(res_verify2.value)
+        restored_check = orig_rules == restored_rules
+        self.assertEqual(orig_rules, restored_rules,
+                         "There was an error restoring "+priv+". Falling back"
+                         " to system defaults. You may need to re-run"
+                         " DevToolsSecurity or re-init debugsign.")
+
+        if not restored_check:
+            cmd_dts = sh.sudo_run(['/usr/sbin/DevToolsSecurity', '-disable'])
+            self.assertTrue(cmd_dts, "Failed to re-run DevToolsSecurity")
+
+    @unittest.skipUnless(os.getenv(S.UNSAFE_FLAG), "Requires --unsafe")
+    def test_privilege_write_negative(self):
+        """
+        - assert write of invalid privilege fails
+
+        Consecutive unsuccessful write operations can
+        result in delayed or denied authdb calls.
+        Require --unsafe to prevent inadvertent lockouts.
+        """
+        priv = self.invalid_priv
+        log_level = L._root.level
+
+        L.set_level(logging.CRITICAL)
+        res_write = S.authdb_privilege_write(priv, priv)
+        L.set_level(log_level)
+        self.assertFalse(res_write)
+
+    def test_rules_from(self):
+        for test_name in test_xml:
+            with self.subTest(name=test_name):
+                expected_rules, xml = test_xml[test_name]
+                rules = S.rules_from(xml)
+                self.assertEqual(expected_rules, rules)
+
+    @mock.patch('plistlib.readPlistFromString')
+    @mock.patch('dbsign.security.authdb_privilege_read')
+    def test_verify_privilege(self, mock_auth_priv_read, mock_plist_read):
+        mock_auth_priv_read.return_value = Failure('error1')
+        res1 = S.verify_privilege('test1')
+        self.assertFalse(res1)
+        self.assertEqual('error1', res1.value)
+
+        mock_auth_priv_read.return_value = Success('succ1')
+        mock_plist_read.return_value = {'rule': ['allow']}
+        res2 = S.verify_privilege('test2')
+        self.assertTrue(res2)
+        self.assertEqual('test2', res2.value)
+
+    @mock.patch('dbsign.security.verify_privilege')
+    def test_verify_privileges(self, mock_verify_priv):
+        mock_verify_priv.side_effect = [
+                Failure('1'),
+                Success('2'), Failure('3'),
+                Success('4'), Success('5')]
+
+        res1 = S.verify_privileges('one')
+        self.assertFalse(res1)
+        self.assertEqual('1', res1.value)
+
+        res2 = S.verify_privileges(['two', 'three'])
+        self.assertFalse(res2)
+        self.assertEqual('3', res2.value)
+
+        privs = ['four', 'five']
+        res3 = S.verify_privileges(list(privs))
+        self.assertTrue(res3)
+        self.assertEqual(privs, res3.value)
+
+
+test_xml = {
+    # Default value of system.privilege.taskport circa 10.13
+    'sample1': ([],
+        """<?xml version="1.0" encoding="UTF-8"?><!DOCTYPE plist PUBLIC "-//Apple//DTD PLIST 1.0//EN" "http://www.apple.com/DTDs/PropertyList-1.0.dtd"><plist version="1.0"><dict><key>allow-root</key><false/><key>authenticate-user</key><true/><key>class</key><string>user</string><key>comment</key><string>Used by task_for_pid(...).  Task_for_pid is called by programs requesting full control over another program for things like debugging or performance analysis. This authorization only applies if the requesting and target programs are run by the same user; it will never authorize access to the program of another user.  WARNING: administrators are advised not to modify this right.</string><key>created</key><real>529441603.70259798</real><key>group</key><string>_developer</string><key>modified</key><real>529441603.70259798</real><key>session-owner</key><false/><key>shared</key><true/><key>timeout</key><integer>36000</integer><key>tries</key><integer>10000</integer><key>version</key><integer>0</integer></dict></plist>"""),  # noqa: E501
+    # Default value of system.privilege.taskport.debug
+    'sample2': (['is-admin', 'is-developer', 'authenticate-developer'],
+                """<?xml version="1.0" encoding="UTF-8"?><!DOCTYPE plist PUBLIC "-//Apple//DTD PLIST 1.0//EN" "http://www.apple.com/DTDs/PropertyList-1.0.dtd"><plist version="1.0"><dict><key>class</key><string>rule</string><key>created</key><real>505914196.93768197</real><key>k-of-n</key><integer>1</integer><key>modified</key><real>531881084.83930701</real><key>rule</key><array><string>is-admin</string><string>is-developer</string><string>authenticate-developer</string></array><key>version</key><integer>0</integer></dict></plist>"""),  # noqa: E501
+    # Example of plist after running debugsign
+    'sample3': (['allow'],
+                """<?xml version="1.0" encoding="UTF-8"?><!DOCTYPE plist PUBLIC "-//Apple//DTD PLIST 1.0//EN" "http://www.apple.com/DTDs/PropertyList-1.0.dtd"><plist version="1.0"><dict><key>class</key><string>rule</string><key>created</key><real>505914196.93768197</real><key>modified</key><real>531785828.17900801</real><key>rule</key><array><string>allow</string></array><key>version</key><integer>0</integer></dict></plist>"""),  # noqa: E501
+}

Added: zorg/trunk/codesign/debugsign/unittests/test_commands.py
URL: http://llvm.org/viewvc/llvm-project/zorg/trunk/codesign/debugsign/unittests/test_commands.py?rev=320453&view=auto
==============================================================================
--- zorg/trunk/codesign/debugsign/unittests/test_commands.py (added)
+++ zorg/trunk/codesign/debugsign/unittests/test_commands.py Mon Dec 11 18:36:17 2017
@@ -0,0 +1,103 @@
+#!/usr/bin/env python -tt
+
+"""
+unittests.test_commands
+"""
+
+from __future__ import print_function
+
+try:
+    import unittest2 as unittest
+except ImportError:
+    import unittest
+
+import mock
+import os
+import StringIO
+
+import dbsign.commands as C
+import dbsign.logger as L
+from dbsign.result import Failure, Success
+import dbsign.shell as sh
+
+
+log = L.get_logger(__name__)
+
+
+def dummy_sudo_run(params, *args):
+    if sh.UNSAFE_FLAG in os.environ:
+        return dummy_run(params, *args)
+    else:
+        raise RuntimeError()
+
+
+def dummy_run(params, *args):
+    return sh.ShellCommand(params, 0, 'dummy_stdout', 'dummy_stderr')
+
+
+class TestCommands(unittest.TestCase):
+    def setUp(self):
+        pass
+
+    def tearDown(self):
+        pass
+
+    @mock.patch('dbsign.shell.sudo_run')
+    @mock.patch('__builtin__.print')
+    def test_auth_sudo(self, _print, _sudo_run):
+        _sudo_run.side_effect = [
+            Success('good1'),                   # run 1
+            Failure('bad2'), Success('good2'),  # run 2
+            Failure('bad3'), Failure('bad3'),   # run 3
+        ]
+        self.assertTrue(C._auth_sudo())
+        self.assertTrue(C._auth_sudo())
+        self.assertFalse(C._auth_sudo())
+
+    @mock.patch('dbsign.shell.run')
+    @mock.patch('sys.stdout', new_callable=StringIO.StringIO)
+    def test_run_linter(self, mock_stdout, new_run):
+        new_run.side_effect = dummy_run
+        C._run_linter()
+        new_run.assert_called()
+        self.assertIn('flake8', new_run.call_args[0][0])
+
+    @mock.patch('__builtin__.print')
+    def test_cmd_help(self, _print):
+        parser = mock.MagicMock()
+        code = C.cmd_help(parser)
+        self.assertTrue(_print.called)
+        self.assertTrue(parser.format_help.called)
+        self.assertEqual(0, code)
+
+    @mock.patch('dbsign.security.unlock_keychain')
+    @mock.patch('__builtin__.print')
+    def test_cmd_prep(self, _print, _unlock):
+        _unlock.side_effect = (Success('good'), Failure('bad'))
+        self.assertEqual(0, C.cmd_prep())
+        self.assertEqual(1, C.cmd_prep())
+
+    @mock.patch('dbsign.commands._run_unittests')
+    @mock.patch('dbsign.commands._auth_sudo')
+    @mock.patch('__builtin__.print')
+    def test_cmd_test(self, _print, _auth_sudo, _run_unit):
+        _run_unit.side_effect = [(), (1, 2, 3)]
+        self.assertEqual(0, C.cmd_test())
+        self.assertEqual(1, _print.call_count)
+
+        _print.reset_mock()
+        self.assertEqual(3, C.cmd_test())
+        self.assertEqual(2, _print.call_count)
+
+    @mock.patch('unittest.TextTestRunner().run()')
+    @mock.patch('unittest.TestLoader().discover()')
+    @unittest.skip('TODO')
+    def test_run_unittests(self, _test_discover, _test_run):
+        _test_discover.return_value = ('test1', 'test2', 'test3')
+        _test_run.return_value = mock.MagicMock()
+
+        problems = C._run_unittests()
+        self.assertEqual(0, len(problems))
+
+        problems = C._run_unittests()
+        self.assertEqual(0, len(problems))

Added: zorg/trunk/codesign/debugsign/unittests/test_logger.py
URL: http://llvm.org/viewvc/llvm-project/zorg/trunk/codesign/debugsign/unittests/test_logger.py?rev=320453&view=auto
==============================================================================
--- zorg/trunk/codesign/debugsign/unittests/test_logger.py (added)
+++ zorg/trunk/codesign/debugsign/unittests/test_logger.py Mon Dec 11 18:36:17 2017
@@ -0,0 +1,46 @@
+#!/usr/bin/env python -tt
+
+"""
+unittests.test_logger
+"""
+
+try:
+    import unittest2 as unittest
+except ImportError:
+    import unittest
+
+import logging
+
+import dbsign.logger as L
+
+
+class TestLogger(unittest.TestCase):
+    def test_logger(self):  # type: () -> ()
+        # This test assumes the initial, intended semantics of the various
+        # loggers returned. If semantics change, so must the tests.
+        init_level = L._root.level
+        test_level = 7
+        log_name = "TestLoggerLogger"
+        log = L.get_logger(log_name)
+
+        with self.subTest(method=L.get_logger):
+            self.assertEqual(log.__class__, logging.Logger)
+            self.assertEqual(log_name, log.name)
+            # Returned loggers should default to 0 (inherit parent level)
+            self.assertEqual(0, log.level)
+
+        with self.subTest(method=L.set_level):
+            # Assert set_level() sets root level, not sublogger level
+            L.set_level(test_level)
+            self.assertEqual(test_level, L._root.level)
+            self.assertEqual(0, log.level)
+
+            # Restore and verify
+            L.set_level(init_level)
+            self.assertEqual(init_level, L._root.level)
+            self.assertEqual(0, log.level)
+
+    def test_normalize(self):  # type: () -> ()
+        self.assertEqual(logging.WARN, L.normalize(logging.WARN))
+        self.assertEqual(logging.DEBUG, L.normalize(logging.DEBUG - 1))
+        self.assertEqual(logging.CRITICAL, L.normalize(logging.CRITICAL + 1))

Added: zorg/trunk/codesign/debugsign/unittests/test_result.py
URL: http://llvm.org/viewvc/llvm-project/zorg/trunk/codesign/debugsign/unittests/test_result.py?rev=320453&view=auto
==============================================================================
--- zorg/trunk/codesign/debugsign/unittests/test_result.py (added)
+++ zorg/trunk/codesign/debugsign/unittests/test_result.py Mon Dec 11 18:36:17 2017
@@ -0,0 +1,80 @@
+#!/usr/bin/env python -tt
+
+"""
+unittests.test_result
+"""
+
+try:
+    import unittest2 as unittest
+except ImportError:
+    import unittest
+
+import mock
+import StringIO
+
+import dbsign.result as R
+
+
+class TestResult(unittest.TestCase):
+    def setUp(self):
+        self.msg = "The quick brown fox."
+
+    def test_result_failure(self):
+        value = self.msg
+        failure = R.Failure(value)
+
+        self.assertFalse(failure.checked)
+        self.assertEqual(value, failure.value)
+        self.assertFalse(failure)
+        self.assertTrue(failure.checked)
+        self.assertRegexpMatches(repr(failure), value)
+
+    def test_result_success(self):
+        value = self.msg
+        success = R.Success(value)
+
+        self.assertFalse(success.checked)
+        self.assertEqual(success.value, value)
+        self.assertTrue(success)
+        self.assertTrue(success.checked)
+        self.assertRegexpMatches(repr(success), value)
+
+    def test_abstract(self):
+        text = self.msg
+        error_text = "does not support boolean evaluation"
+        res = R.Result(text)
+
+        self.assertEqual(text, res.value)
+        with self.assertRaisesRegexp(NotImplementedError, error_text):
+            bool(res)
+
+    def test_renew(self):
+        msg = self.msg
+
+        def fn(res_type):
+            # create instance
+            res = res_type(msg)
+            self.assertFalse(res.checked)
+            # invoke __nonzero__()
+            bool(res)
+            self.assertTrue(res.checked)
+            # return "renewed" object
+            return res.renew()
+
+        for res_type in (R.Success, R.Failure):
+            with self.subTest(resultType=res_type.__name__):
+                res = fn(res_type)
+                self.assertEqual(res_type, res.__class__)
+                self.assertFalse(res.checked)
+                bool(res)
+                self.assertTrue(res.checked)
+
+    @mock.patch('sys.stderr', new_callable=StringIO.StringIO)
+    def test_unchecked_asserts(self, mock_stderr):
+        for res_type in (R.Result, R.Success, R.Failure):
+            with self.subTest(resultType=res_type.__name__):
+                with self.assertRaises(AssertionError):
+                    # generate an instance of the passed class
+                    res = res_type(res_type.__name__)
+                    # call destructor to simulate program termination
+                    res.__del__()

Added: zorg/trunk/codesign/debugsign/unittests/test_security.py
URL: http://llvm.org/viewvc/llvm-project/zorg/trunk/codesign/debugsign/unittests/test_security.py?rev=320453&view=auto
==============================================================================
--- zorg/trunk/codesign/debugsign/unittests/test_security.py (added)
+++ zorg/trunk/codesign/debugsign/unittests/test_security.py Mon Dec 11 18:36:17 2017
@@ -0,0 +1,171 @@
+#!/usr/bin/env python -tt
+
+"""
+unittests.test_security
+"""
+
+try:
+    import unittest2 as unittest
+except ImportError:
+    import unittest
+
+import logging
+import os
+# import sys
+
+import dbsign.logger as L
+import dbsign.security as S
+import dbsign.shell as sh
+
+log = L.get_logger(__name__)  # module logger; unused by the tests below
+
+
+ at unittest.skip("need to mock identity/p12 file")
+class TestIdentity(unittest.TestCase):
+    def setUp(self):
+        self.keyfile = S.keychain_to_file("debugsign_test")
+        self.password = "12345"
+        self.identity = 'debug_codesign'
+
+        # store and override loglevel
+        self.init_loglevel = L._root.level
+        L.set_level(logging.CRITICAL)
+
+        # create test keychain
+        self.assertTrue(S.create_keychain(self.keyfile, self.password))
+
+    def tearDown(self):
+        # restore loglevel
+        L.set_level(self.init_loglevel)
+
+        # force remove test keychain
+        if os.path.exists(self.keyfile):
+            os.remove(self.keyfile)
+
+    @unittest.skip("need to mock p12 file")
+    def test_import_exists(self):  # type: () -> ()
+        keydb = self.keyfile
+        passwd = self.password
+        ident = self.identity
+        id_file = None
+
+        self.assertFalse(S.find_identity(keydb, ident))
+        self.assertTrue(S.import_identity(keydb, ident, id_file, passwd))
+        self.assertTrue(S.find_identity(keydb, ident))
+
+    @unittest.skip("need to mock p12 file")
+    @unittest.skipUnless(os.getenv(S.UNSAFE_FLAG), "Requires --unsafe")
+    def test_trust(self):  # type: () -> ()
+        keydb = self.keyfile
+        passwd = self.password
+        ident = self.identity
+        id_file = None
+
+        self.assertTrue(S.import_identity(keydb, ident, id_file, passwd))
+        self.assertTrue(S.find_identity(keydb, ident))
+        self.assertTrue(S.trust_identity(keydb, ident))
+
+    @unittest.skip("need to mock a trusted identity")
+    def test_verify(self):  # type: () -> ()
+        self.fail()
+
+    @unittest.skip("need to mock a trusted identity")
+    def test_verify_filename(self):  # type: () -> ()
+        self.fail()
+
+    @unittest.skip("need to mock p12 file")
+    def test_delete(self):  # type: () -> ()
+        self.fail()
+
+
+class TestKeychain(unittest.TestCase):
+    def setUp(self):
+        self.keyfile = S.keychain_to_file("debugsign_test")
+        self.password = "12345"
+        self.init_loglevel = L._root.level
+        L.set_level(logging.CRITICAL)
+
+    def tearDown(self):
+        L.set_level(self.init_loglevel)
+        if os.path.exists(self.keyfile):
+            os.remove(self.keyfile)
+
+    def test_keychain_to_file(self):
+        ext = S._KEYCHAIN_EXT
+        login_path = os.path.expanduser(
+            os.path.join("~/Library/Keychains", "login." + ext))
+        self.assertTrue(login_path, S.keychain_to_file('login'))
+
+    def test_derive_keychain_extension(self):
+        """
+        Because the tested method is itself guesswork,
+        the test is rather simplistic:
+        - generate expected location of user's login keychain
+        - verify that "$HOME/Library/Keychains/login.${keychain_extension}"
+          exists and is valid
+        """
+        login_keychain = os.path.expanduser(
+            "~/Library/Keychains/login.{}".format(
+                S.derive_keychain_extension()))
+        self.assertTrue(os.path.exists(login_keychain))
+        self.assertTrue(os.access(login_keychain, os.R_OK))
+
+    def test_keychain_exists(self):
+        """
+        - assert existing keychain => True
+        - assert non-existent keychain => False
+        """
+        valid_keychain = S.keychain_to_file("login")
+        with self.subTest(keychain=valid_keychain):
+            self.assertTrue(S.keychain_exists(valid_keychain))
+
+        invalid_keychain = S.keychain_to_file("invalid")
+        with self.subTest(keychain=invalid_keychain):
+            self.assertFalse(S.keychain_exists(invalid_keychain))
+
+    def test_keychain_operations(self):
+        """
+        - assert keychain does not exist
+        - create keychain and assert exists
+        - lock keychain
+        - unlock keychain and assert exists
+        """
+        keyfile = self.keyfile
+        password = self.password
+
+        # some assorted negatives
+        self.assertFalse(S.create_keychain('/tmp', password))
+
+        # assert keychain does not exist
+        self.assertFalse(S.keychain_exists(keyfile))
+
+        # create and assert success
+        res_create = S.create_keychain(keyfile, password)
+        self.assertTrue(res_create)
+        self.assertTrue(S.keychain_exists(keyfile))
+
+        # assert second creation succeeds
+        self.assertTrue(S.create_keychain(keyfile, password))
+
+        # keychain is unlocked at creation; lock it to test unlocking
+        cmd_lock = sh.run(['security', 'lock-keychain', keyfile])
+        self.assertTrue(cmd_lock)
+        self.assertFalse(S.unlock_keychain(keyfile, ''))
+        self.assertTrue(S.unlock_keychain(keyfile, password))
+
+        # ensure keychain settings were set correctly
+        cmd_info = sh.run(['security', 'show-keychain-info', keyfile])
+        self.assertTrue(cmd_info)
+        self.assertRegexpMatches(cmd_info.stderr, r"\bno-timeout\b", cmd_info)
+
+        # delete with backup
+        res_delete = S.delete_keychain(keyfile, backup=True)
+        self.assertTrue(res_delete)
+        backup_file = res_delete.value
+        # assert backup was made
+        self.assertTrue(os.path.exists(backup_file))
+        # assert keychain is gone
+        self.assertFalse(S.keychain_exists(keyfile))
+        self.assertFalse(os.path.exists(keyfile))
+        # cleanup backup file
+        os.unlink(backup_file)

Added: zorg/trunk/codesign/debugsign/unittests/test_shell.py
URL: http://llvm.org/viewvc/llvm-project/zorg/trunk/codesign/debugsign/unittests/test_shell.py?rev=320453&view=auto
==============================================================================
--- zorg/trunk/codesign/debugsign/unittests/test_shell.py (added)
+++ zorg/trunk/codesign/debugsign/unittests/test_shell.py Mon Dec 11 18:36:17 2017
@@ -0,0 +1,168 @@
+#!/usr/bin/env python -tt
+
+"""
+unittests.test_shell
+"""
+
+try:
+    import unittest2 as unittest
+except ImportError:
+    import unittest
+
+import os
+import sys
+
+import dbsign.shell as sh
+import dbsign.logger as logger
+
+log = logger.get_logger(__name__)  # TestRun logs command results through it
+
+
+class TestShellCommand(unittest.TestCase):
+    def setUp(self):
+        """Create some dummy ShellCommand objects"""
+        self.params = [
+            (["good", "command"], 0, 'OUT', 'ERR'),
+            (["bad", "command"], 1, 'OUT', 'ERR'),
+        ]
+
+    def test_attrs(self):
+        """
+        - assert data equivalence of each attribute for all commands
+        """
+        self.assertTrue(self.params, "Empty test set?")
+        for params in self.params:
+            with self.subTest(params=params):
+                cmd = sh.ShellCommand(*params)
+                with self.assertRaises(AttributeError):
+                    cmd.dummy_attribute
+                self.assertEqual(params[0], cmd.args)
+                self.assertEqual(params[1], cmd.code)
+                self.assertEqual(params[2], cmd.stdout)
+                self.assertEqual(params[3], cmd.stderr)
+
+    def test_repr(self):
+        self.assertTrue(self.params, "Empty test set?")
+        for params in self.params:
+            with self.subTest(params=params):
+                cmd = sh.ShellCommand(*params)
+                stringified = repr(cmd)
+                args = params[0]
+                self.assertGreaterEqual(len(args), 1)
+                for arg in args:
+                    self.assertIn(arg, stringified, )
+
+    def test_equal(self):
+        """
+        - assert objects with different data are not equal
+        - create duplicate ShellCommand objects
+        - assert object data is equal
+        - assert objects are equal
+        """
+        self.assertTrue(self.params, "Empty test set?")
+        cmd_params = self.params
+        cmd_a = sh.ShellCommand(*cmd_params[0])
+        cmd_b = sh.ShellCommand(*cmd_params[-1])
+        self.assertNotEqual(cmd_a, cmd_b)
+        for params in cmd_params:
+            with self.subTest(params=params):
+                cmd_a = sh.ShellCommand(*params)
+                cmd_b = sh.ShellCommand(*params)
+                self.assertEqual(cmd_a, cmd_b, params)
+                self.assertDictEqual(cmd_a.data, cmd_b.data, params)
+
+    def test_nonzero(self):
+        """
+        - assert ShellCommand objects are "True" if code == 0
+        - assert ShellCommand objects are "False" if code != 0
+        """
+        for params in self.params:
+            with self.subTest(params=params):
+                (args, code, stdout, stderr) = params
+                cmd = sh.ShellCommand(*params)
+                bool_value = bool(cmd)
+                self.assertEqual(code == 0, bool_value, params)
+
+
+class TestRun(unittest.TestCase):
+    def test_run_simple(self):
+        """
+        - ensure true returns 0
+        - ensure false returns non-zero
+        """
+        cmd_true = sh.run(['/usr/bin/true'])
+        log.debug(cmd_true)
+        self.assertTrue(cmd_true)
+        self.assertEqual(0, cmd_true.code)
+
+        cmd_false = sh.run(['/usr/bin/false'])
+        log.debug(cmd_false)
+        self.assertFalse(cmd_false)
+        self.assertNotEqual(0, cmd_false.code)
+
+    def test_sudo_run_simple(self):
+        """
+        - ensure true returns 0
+        - ensure false returns non-zero
+        """
+        cmd_true = sh.sudo_run(['/usr/bin/true'])
+        log.debug(cmd_true)
+        self.assertTrue(cmd_true)
+        self.assertEqual(0, cmd_true.code)
+
+        cmd_false = sh.sudo_run(['/usr/bin/false'])
+        log.debug(cmd_false)
+        self.assertFalse(cmd_false)
+        self.assertNotEqual(0, cmd_false.code)
+
+    def test_run_invalid_executable(self):
+        """
+        - run() should raise OSError if execution is impossible
+        """
+        with self.assertRaises(OSError):
+            sh.run(['/'])
+
+    def test_run_compound(self):
+        """
+        - describe expected output (stdout, stderr, return code)
+        - construct python program to emit expected output
+        - run python code (using the current python interpreter)
+        - ensure program executed successfully
+        - ensure expected output was received
+        """
+        sub_stdout = r"Some out text."
+        sub_stderr = r"Some err text."
+        sub_code = 7
+
+        sub_string = ('import sys;'
+                      ' sys.stdout.write("{0}");'
+                      ' sys.stderr.write("{1}");'
+                      ' sys.exit({2})').format(
+            sub_stdout, sub_stderr, sub_code)
+
+        # run above python code in current executable
+        sub_cmd = sh.run([sys.executable, '-c', sub_string])
+        log.debug(sub_cmd)
+
+        self.assertEqual(sub_code, sub_cmd.code)
+        self.assertEqual(sub_stderr, sub_cmd.stderr)
+        self.assertEqual(sub_stdout, sub_cmd.stdout)
+        if sub_code == 0:
+            self.assertTrue(sub_cmd)
+        else:
+            self.assertFalse(sub_cmd)
+
+    def test_run_with_sudo(self):
+        """
+        - run should raise if asked to perform su/sudo operations
+        - attempt to execute several illegal commands
+        - assert that each raises a RuntimeError
+        """
+        illegal_cmds = [
+            ['sudo', 'ls'],
+            ['su', os.getenv('USER'), '-c', 'ls'],
+        ]
+        for cmd in illegal_cmds:
+            with self.subTest(cmd=cmd):
+                with self.assertRaisesRegexp(RuntimeError, 'Unauthorized'):
+                    sh.run(cmd)




More information about the llvm-commits mailing list