"""Windows Utilities"""
import logging
import os
import platform
import socket
from subprocess import PIPE, Popen
import regex as re
import pathlib
logger = logging.getLogger(__name__)
__all__ = [
'run_command',
'psexec_session',
'file_share_session',
'mount_admin_share',
'mount_file_share',
'parse_wmic_output',
'exit_cmd',
]
_HAS_WIN32COM = False
if 'Win' in platform.system():
try:
from win32com.client import GetObject
_HAS_WIN32COM = True
except ImportError:
pass
[docs]
def run_command(cmd, workingdir=None, raise_on_error=True, hidearg=None):
"""Execute a shell command and return output.
:param cmd: Command as string or list of arguments.
:param str workingdir: Directory to execute in (optional).
:param bool raise_on_error: Raise exception on non-zero return code.
:param str hidearg: Argument value to mask in logs (for passwords).
:returns: Combined stdout and stderr output.
:rtype: bytes
:raises Exception: If command fails and raise_on_error is True.
.. seealso::
See ``tests/test_win.py`` for usage examples.
"""
def hide(cmd):
for bit in cmd:
if bit == hidearg:
yield '******'
else:
yield bit
if not isinstance(cmd, (list, tuple)):
cmd = cmd.split(' ')
logger.info(f"Running: {' '.join(hide(cmd))}")
if workingdir:
curdir = pathlib.Path.cwd()
os.chdir(workingdir)
try:
p = Popen(cmd, stdout=PIPE, stderr=PIPE)
out, err = p.communicate()
if out:
logger.info(out)
if p.returncode != 0 and raise_on_error:
msg = f"Error executing: {' '.join(hide(cmd))}"
if workingdir:
msg += f' in {workingdir}'
logger.error(msg)
raise Exception(err)
if err:
logger.info(err)
return out + err
finally:
if workingdir:
os.chdir(curdir)
[docs]
class psexec_session:
"""Context manager for running psexec commands.
Mounts admin share before commands and unmounts on exit.
:param str host: Remote host name or IP.
:param str password: Password for authentication.
Example::
with shell.psexec_session(host, password):
for cmd in commands:
out = shell.run_command(cmd)
"""
def __init__(self, host, password):
self.host = host
self.password = password
def __enter__(self):
mount_admin_share(self.host, self.password)
def __exit__(self, type, value, traceback):
mount_admin_share(self.host, self.password, unmount=True)
[docs]
class file_share_session:
"""Context manager for temporarily mounting a file share.
Mounts share before commands and unmounts on exit.
:param str host: Remote host name or IP.
:param str password: Password for authentication.
:param str drive: Local drive letter to mount to.
:param str share: Remote share name.
Example::
with shell.file_share_session(host, password, 'Z:', 'data'):
for cmd in commands:
out = shell.run_command(cmd)
"""
def __init__(self, host, password, drive, share):
self.host = host
self.password = password
self.drive = drive
self.share = share
def __enter__(self):
mount_file_share(self.host, self.password, self.drive, self.share)
def __exit__(self, type, value, traceback):
mount_file_share(self.host, self.password, self.drive, self.share, unmount=True)
[docs]
def mount_admin_share(host, password, unmount=False):
"""Mount or unmount the admin$ share required for psexec commands.
Resolves host to IP address to avoid Windows multiple-connection errors.
:param str host: Remote host name.
:param str password: Password for authentication.
:param bool unmount: If True, unmount instead of mount.
.. note::
Connects by IP address to work around Windows complaining about
multiple connections to a share by the same user.
"""
user = os.environ['USERNAME'].lower()
hostip = socket.gethostbyname(host)
if not unmount:
run_command(['net', 'use', r'\\' + hostip + r'\admin$', rf'/user:TENOR\{user}', password], hidearg=password)
else:
run_command(['net', 'use', r'\\' + hostip + r'\admin$', '/del'])
[docs]
def mount_file_share(host, password, drive, share, unmount=False):
"""Mount or unmount a Windows file share.
:param str host: Remote host name.
:param str password: Password for authentication.
:param str drive: Local drive letter to mount to.
:param str share: Remote share name.
:param bool unmount: If True, unmount instead of mount.
"""
user = os.environ['USERNAME'].lower()
hostip = socket.gethostbyname(host)
if not unmount:
run_command(
['net', 'use', drive, r'\\' + hostip + '\\' + share, rf'/user:TENOR\{user}', password], hidearg=password
)
else:
run_command(['net', 'use', drive, '/del'])
[docs]
def parse_wmic_output(output):
"""Parse output from WMIC query into list of dicts.
:param str output: Raw WMIC output string.
:returns: List of dictionaries with column headers as keys.
:rtype: list[dict]
Example::
>> wmic_output = os.popen('wmic product where name="Python 2.7.11" get Caption, Description, Vendor').read()
>> result = parse_wmic_output(wmic_output)
>> result[0]['Caption']
>> result[0]['Vendor']
"""
result = []
lines = [s for s in output.splitlines() if s.strip()]
if len(lines) == 0:
return result
header_line = lines[0]
headers = re.findall(r'\S+\s+|\S$', header_line)
pos = [0]
for header in headers:
pos.append(pos[-1] + len(header))
for i in range(len(headers)):
headers[i] = headers[i].strip()
for r in range(1, len(lines)):
row = {}
for i in range(len(pos) - 1):
row[headers[i]] = lines[r][pos[i] : pos[i + 1]].strip()
result.append(row)
return result
[docs]
def exit_cmd():
"""Kill all running cmd.exe processes via WMI.
Requires pywin32 to be installed on Windows.
"""
if not _HAS_WIN32COM:
raise ImportError('exit_cmd requires pywin32: pip install pywin32')
WMI = GetObject('winmgmts:')
processes = WMI.InstancesOf('Win32_Process')
for p in WMI.ExecQuery('select * from Win32_Process where Name="cmd.exe"'):
logger.debug(f'Killing PID: {p.Properties_("ProcessId").Value}')
os.system('taskkill /pid ' + str(p.Properties_('ProcessId').Value))
if __name__ == '__main__':
__import__('doctest').testmod(optionflags=4 | 8 | 32)