# Copyright (c) 2016, Yahoo Inc.
# Copyrights licensed under the BSD License
# See the accompanying LICENSE.txt file for terms.
"""
General utility functionality module
"""
from __future__ import print_function
import logging
import os
import textwrap
import sys
from jinja2 import Template
from distutils.spawn import find_executable
from .exceptions import CommandNotFound
logger = logging.getLogger(__name__) # pylint: disable=C0103
[docs]def get_terminal_size():
"""
Get the terminal rows and columns if we are running on an
interactive terminal.
Returns
-------
rows : int
The number of rows on the current terminal.
columns : int
The number of columns on the current terminal.
"""
try:
rows, columns = os.popen('stty size', 'r').read().split()
except ValueError: # pragma: no cover
rows = 24
columns = 80
return int(rows), int(columns)
[docs]def which(command):
"""
Function searches the path for the command executable
Parameters
----------
command : str
Returns
-------
str
Path to the command
Raises
------
CommandNotFound:
Command was not found
"""
bin_dir = os.path.dirname(sys.executable)
bin_exe = os.path.join(bin_dir, command)
if os.path.exists(bin_exe) and os.access(bin_exe, os.X_OK): # pragma: no cover
return bin_exe
result = find_executable(command)
if not result: # pragma: no cover
raise CommandNotFound('Command %r was not found in the path' % command)
return result
[docs]def csv_list(value):
"""
Convert a comma separated string into a list
Parameters
----------
value : str
The string object to convert to a list
Returns
-------
list
A list based on splitting the string on the ',' character
"""
if value:
result = []
for item in value.split(','):
item = item.strip()
if item:
result.append(item)
return result
return []
[docs]def str_to_bool(value):
"""
Convert a string too a bool object
Parameters
----------
value : str
The string object to convert to a bool. The following case insensitive
strings evaluate to True ['true', 1', 'up', 'on']
Returns
-------
bool
Boolean based on the string
"""
value = value.lower()
return value in ['true', '1', 'up', 'on']
[docs]def str_to_list(value):
"""
Convert a newline terminated string into a list. Any empty lines
will be removed from the result list.
Parameters
----------
value : str
The string object to convert to a list
Returns
-------
list
List based on the string
"""
result_list = []
for item in value.strip().split('\n'):
item = item.strip()
if item:
result_list.append(item)
return result_list
[docs]def str_to_dict(value):
"""
Convert a newline terminated string of key=value into a dictionary.
Parameters
----------
value : str
The string object to convert to a dictionary
Returns
-------
dict
Dictionary based on the string
"""
result_dict = {}
for item in str_to_list(value):
split_setting = item.split('=')
result_dict[split_setting[0]] = '='.join(split_setting[1:])
return result_dict
[docs]def change_uid_gid(user_uid=None, user_gid=None): # pragma: no cover
"""
preexec_fn to change the uid/gid when using subprocess
Parameters
----------
user_uid : int, optional
The user UID to set to, does not set user uid if this is not passed
user_gid : int, optional
The user GID to set to, does not set the gid if this is not passed
"""
if user_uid is not None:
os.setuid(user_uid)
if user_gid is not None:
os.setgid(user_gid)
[docs]def chown_recursive(path, uid, gid): # pragma: no cover
"""
Change the ownership of all files and directories in path
Parameters
----------
path : str
The root path to start changing the ownership at
uid, : int
The uid to change the ownership to
gid, : int
The gid to change the ownership to
"""
for root, dirs, files in os.walk(path):
for item in dirs:
os.chown(os.path.join(root, item), uid, gid)
for item in files:
os.chown(os.path.join(root, item), uid, gid)
[docs]def update_recursive_generator(basedict, updatedict):
"""
Recursively run update on a dictionary
Parameters
----------
basedict : dict
First dictionary, which will contain the updated result
updatedict : dict
Second dictionary that will be be updated to basedict, conflicts will
be replaced with the values from this dictionary.
Returns
-------
dict
basedict updated to contain the values from updatedict
"""
for key in set(basedict.keys()) | set(updatedict.keys()):
if key in basedict and key in updatedict:
# Both dicts have the same key
if isinstance(basedict[key], dict) and \
isinstance(updatedict[key], dict):
# Both values are dicts so merge them.
yield (
key,
dict(
update_recursive_generator(
basedict[key],
updatedict[key]
)
)
)
else:
# One of the values isn't a dict to merge so use the value from
# updatedict.
yield (key, updatedict[key])
elif key in basedict:
yield (key, basedict[key])
else:
yield (key, updatedict[key])
[docs]def update_recursive(basedict, updatedict):
"""
Recursively run update on a dictionary
Parameters
----------
basedict : dict
First dictionary, which will contain the updated result
updatedict : dict
Second dictionary that will be be updated to basedict, conflicts will
be replaced with the values from this dictionary.
Returns
-------
dict
basedict updated to contain the values from updatedict
"""
return dict(update_recursive_generator(basedict, updatedict))