Module ti.kit.helpers
Expand source code Browse git
import os
import sys
import time
import re
from subprocess import run as run_cmd, CalledProcessError
from datetime import datetime as dt, timezone, UTC
import unicodedata
from .files import readYaml, unexpanduser as ux
# Trailing time zone designator of an ISO 8601 timestamp:
# either `Z` or a numeric offset such as `+00:00`.
TZ_RE = re.compile(r"""(?:Z|(?:[+-][0-9:]+))$""")
# A run of separator characters: newlines, tabs, spaces and commas.
SEP_RE = re.compile(r"[\n\t ,]+")
# A run of separator characters at the very start or the very end of a string.
STRIP_RE = re.compile(r"(?:^[\n\t ,]+)|(?:[\n\t ,]+$)", re.S)
# An optionally space-padded number, a single space, and the rest of the line.
MSG_LINE_RE = re.compile(r"^( *[0-9]+) (.*)$")
# Splits a string in leading digits, the following non-digits, and the remainder.
NUM_ALFA_RE = re.compile(r"^([0-9]*)([^0-9]*)(.*)$")
# A run of whitespace.
WHITE_RE = re.compile(r"""\s+""")
# NOTE: `LETTER` and `VAR_RE` used to get a first value here. Both are
# unconditionally reassigned further down in this module before anything
# reads them, so those dead first assignments have been dropped.
TO_ASCII_DEF = {
    "ñ": "n",
    "ø": "o",
    "ç": "c",
}
"""Undecomposable UNICODE characters mapped to their related ASCII characters."""

# Lookup table with both the lowercase entries of `TO_ASCII_DEF` and their
# uppercase counterparts. It is built in a single expression so that no loop
# variables are left behind in the module namespace.
TO_ASCII = {
    **TO_ASCII_DEF,
    **{u.upper(): a.upper() for (u, a) in TO_ASCII_DEF.items()},
}
# The lowercase ASCII letters.
ALPHABET = "abcdefghijklmnopqrstuvwxyz"
# ASCII letters in both cases, plus the punctuation `(`, `)` and `-`.
LETTER = set(ALPHABET + ALPHABET.upper() + "()-")
# The letters above, plus the digits and the underscore.
VALID = LETTER.union("_0123456789")
# A string that ends in a parenthesised number: group 1 is everything before
# the parentheses, group 2 is the number.
SEQ_RE = re.compile(r"""^(.*)\(([0-9]+)\)$""", re.S)
# A variable reference `{name}` or `${name}`: group 1 is the optional `$`,
# group 2 is the name. The pattern is compiled in verbose mode, so the line
# breaks inside it carry no meaning.
VAR_RE = re.compile(
    r"""
(\$?)
\{
([a-z0-9_]+)
\}
""",
    flags=re.I | re.S | re.X,
)
def makeVarReplace(info, found, notFound):
    """Build a replacement function for variable references.

    The returned function is meant to be passed to the `sub` method of a regular
    expression whose group 1 is an optional `$` and whose group 2 is a
    variable name, such as `VAR_RE`.

    Parameters
    ----------
    info: dict
        Source of the values for references without a `$`.
    found: dict
        Will be updated: every resolved reference is stored here with its value,
        keyed by the reference without its braces.
    notFound: list
        Will be updated: every unresolved reference is appended here,
        without its braces.

    Returns
    -------
    function
        Takes a match object and returns the replacement string:
        the value of the variable, or, if it has no value, the reference
        without its braces.
    """

    def varReplace(match):
        (fullName, kind, name) = match.group(0, 1, 2)
        # report `{x}` as `x` and `${x}` as `$x`
        fullName = fullName.replace("{", "").replace("}", "")
        # a `$` marks an environment variable, otherwise we look in `info`
        value = var(name) if kind else info.get(name)

        if value is None:
            notFound.append(fullName)
            return fullName

        found[fullName] = value
        # `re.sub` raises a TypeError if the replacement is not a string,
        # and values from a config dict may well be numbers or booleans
        return value if isinstance(value, str) else str(value)

    return varReplace
def normalize(text):
    """Produce a normalized version of a string.

    Leading and trailing whitespace is removed, every remaining run of
    whitespace becomes a single space, and the result is lower-cased.

    Parameters
    ----------
    text: string
        The input text

    Returns
    -------
    string
        The lower-cased, whitespace normalized version of the input.
    """
    trimmed = text.strip()
    collapsed = WHITE_RE.sub(" ", trimmed)
    return collapsed.lower()
def toAscii(text, lowercase=True):
    """Transforms a text with diacritical marks into a plain ASCII text.

    Characters with diacritics are replaced by their base character.
    Some characters with diacritics are considered by UNICODE to be undecomposable
    characters, such as `ø` and `ñ`.
    We use a table (`TO_ASCII_DEF`) to map these on their related ASCII characters.

    We replace all consecutive whitespace by `_`, and we replace every character
    that is not in `VALID` (letters, digits, `_`, `(`, `)` and `-`) by `x`.

    Parameters
    ----------
    text: string
        The text to be translated
    lowercase: boolean, optional True
        Convert the text to lowercase

    Returns
    -------
    string
        The translated text.
    """
    underscored = WHITE_RE.sub("_", text.strip())

    if lowercase:
        underscored = underscored.lower()

    pieces = []

    # after canonical decomposition the diacritics are separate characters
    # of category Mn (nonspacing mark), which we leave out
    for char in unicodedata.normalize("NFD", underscored):
        if unicodedata.category(char) == "Mn":
            continue
        pieces.append(TO_ASCII.get(char, char))

    mapped = "".join(pieces)
    return "".join(char if char in VALID else "x" for char in mapped)
def makeAsciiUnique(text, among):
    """Transforms a text into plain ASCII and makes it unique among alternatives.

    If the ASCII version of the text already occurs among the alternatives,
    a sequence number between parentheses is put at the end, replacing
    a sequence number that may already be there. The new number is one more
    than the highest sequence number among the alternatives with the same base,
    where alternatives without sequence number do not count.

    Parameters
    ----------
    text: string
        The text to be translated
    among: set of string
        The strings among which the result should be unique.
        It is assumed that these strings are themselves the results of `toAscii`.

    Returns
    -------
    string
        The ASCII version of the text, with a sequence number if needed.
    """
    candidate = toAscii(text)

    if candidate not in among:
        return candidate

    match = SEQ_RE.match(candidate)
    base = match.group(1) if match else candidate

    # From here on the candidate is itself one of the alternatives, so there is
    # always a clash: the only thing left to find out is the highest sequence
    # number in use for this base.
    highest = 0

    for other in among:
        match = SEQ_RE.match(other)

        if match and match.group(1) == base:
            highest = max(highest, int(match.group(2)))

    return f"{base}({highest + 1})"
def htmlEsc(val, math=False):
    """Escape certain HTML characters by HTML entities.

    To prevent them to be interpreted as HTML
    in cases where you need them literally.

    The characters `&`, `<` and `>` are replaced by their entities.

    Parameters
    ----------
    val: string
        The input value. Other values are converted to string first;
        `None` yields the empty string.
    math: boolean, optional False
        Whether to retain TeX notation.
        If True, `$` is not escaped, if False, every `$` is wrapped in a
        `<span>` element.

    Returns
    -------
    string
        The escaped value.
    """
    if val is None:
        return ""

    # `&` goes first, otherwise the `&` of the entities produced for `<` and `>`
    # would be escaped again
    escaped = str(val).replace("&", "&amp;").replace("<", "&lt;").replace(">", "&gt;")

    if not math:
        escaped = escaped.replace("$", "<span>$</span>")

    return escaped
def utcnow():
    """The current moment in time in the UTC time zone.

    Returns
    -------
    datetime
        An aware datetime object (in the sense of: having the timezone included
        in its value).
    """
    return dt.now(tz=timezone.utc)
def isonow():
    """The current moment in time as an ISO 8601 string value.

    Details:

    *   the precision is up to the second;
    *   the separator between the date part and the time part is `T`;
    *   the timezone is UTC, marked as `Z` directly after the time part.

    Returns
    -------
    string
        E.g. `2024-11-13T10:53:15Z`
    """
    stamp = utcnow().isoformat(sep="T", timespec="seconds")
    # swap the trailing time zone designator for `Z`
    return TZ_RE.sub("Z", stamp)
def pseudoisonow():
    """The current moment in time as an ISO-like string value.

    It is like `isonow()`, but the time separators (`:`) are
    replaced by `-`, so that the string can be included in urls.

    Returns
    -------
    string
        E.g. `2024-11-13T10-53-15Z`
    """
    pieces = isonow().split(":")
    return "-".join(pieces)
def getDelta(days, refDate, iso=True):
    """Compute how many days ago a reference date was.

    Parameters
    ----------
    days: number
        Not used in the computation.
    refDate: string or number or void
        The reference date: an ISO 8601 string if `iso` is True, otherwise
        a POSIX timestamp. An ISO string without time zone is taken to be in UTC.
    iso: boolean, optional True
        How to interpret `refDate`.

    Returns
    -------
    number
        The number of days, fractions included, between the reference date and
        now. It is negative if the reference date lies in the future.
        It is `0` if the reference date is `None`.
    """
    if refDate is None:
        # there is nothing to measure, we report a zero delta;
        # callers that treat a missing date specially must test for None themselves
        return 0

    if iso:
        ref = dt.fromisoformat(refDate)

        if ref.tzinfo is None:
            # an aware and a naive datetime cannot be subtracted from each other
            ref = ref.replace(tzinfo=timezone.utc)
    else:
        ref = dt.fromtimestamp(refDate, tz=timezone.utc)

    # the same aware UTC clock reading as `utcnow()` gives
    delta = dt.now(timezone.utc) - ref
    return delta.total_seconds() / 86400
def lessAgo(days, refDate, iso=True):
    """Whether a reference date lies less than a number of days in the past.

    Parameters
    ----------
    days: number
        The size of the time window, in days.
    refDate: string or number or void
        The reference date, passed on to `getDelta()`.
    iso: boolean, optional True
        How to interpret `refDate`, passed on to `getDelta()`.

    Returns
    -------
    boolean
        True if the reference date is not in the future and less than `days`
        days ago. False if the reference date is `None`.
    """
    if refDate is None:
        # undefined dates count as not recent; without this guard the zero
        # delta that `getDelta()` reports for them would pass the test below
        return False

    return 0 <= getDelta(days, refDate, iso=iso) < days
def versionSort(x):
    """Turn a dotted version string into a sort key.

    Parameters
    ----------
    x: string
        The version, with its parts separated by `.`

    Returns
    -------
    tuple
        One triple per part: the leading number as integer (`0` if the part
        does not start with a digit), the non-digits that follow,
        and the remainder of the part.
    """

    def partKey(part):
        (num, alfa, rest) = NUM_ALFA_RE.match(part).group(1, 2, 3)
        return (int(num) if num else 0, alfa, rest)

    return tuple(partKey(part) for part in x.split("."))
def plainify(value):
    """Reduce a value to strings and lists.

    *   `None` becomes the empty string;
    *   a list becomes the list of its plainified members;
    *   a dict becomes a list of strings of the form `key: value`,
        where the value is plainified before it is put in the string;
    *   anything else is converted to string.

    Only values whose type is exactly `list` or `dict` are treated as such.
    """
    if value is None:
        return ""

    kind = type(value)

    if kind is dict:
        return [f"{key}: {plainify(item)}" for (key, item) in value.items()]

    if kind is list:
        return list(map(plainify, value))

    return str(value)
def ucFirst(x):
    """Put the first character of a string in upper case.

    The rest of the string is left as it is.
    An empty or `None` value yields the empty string.
    """
    return x[0].upper() + x[1:] if x else ""
def prettify(x):
    """Turn an underscore-separated name into space-separated, capitalized words.

    Each part between underscores gets its first character in upper case.
    """
    words = x.split("_")
    return " ".join(map(ucFirst, words))
def var(envVar):
    """Retrieves the value of an environment variable.

    Parameters
    ----------
    envVar: string
        The name of the environment variable.

    Returns
    -------
    string or void
        The value of the environment variable if it exists, otherwise `None`.
    """
    environment = os.environ
    return environment.get(envVar)
def fillin(pairs, config):
    """Fills in variable parts into values of a dict.

    Values may contain strings of the form `{var}`.
    These `var` names will be looked up in a source dict, and their values
    will be substituted.

    Values may also contain strings of the form `${var}`.
    These `var` names refer to environment variables and will be looked up as well.

    Only values that are strings are filled in, other values are taken over
    unchanged. The input dict is not modified.

    Parameters
    ----------
    pairs: dict
        The dict whose values must be filled in
    config: dict
        Source of the values used for filling

    Returns
    -------
    list, dict, dict
        The list contains the variables that could not be looked up;
        the first dict contains the variables that could be looked up, with
        their values;
        the second dict is the filled-in version of `pairs`.
    """
    notFound = []
    found = {}
    replacer = makeVarReplace(config, found, notFound)

    result = {
        key: VAR_RE.sub(replacer, value) if type(value) is str else value
        for (key, value) in pairs.items()
    }
    return notFound, found, result
def addToDict(source, additions):
    """Add material from one dict to another dict without destroying information.

    Values in the additions will be added to the source only if the key in question
    does not exist in the source, or its value is `None`.

    If the key for a value exists in the source, and the source and addition values
    are both dicts, the addition will proceed recursively.

    In all other cases, no replacement takes place.

    Parameters
    ----------
    source: dict
        The source dictionary, which will be modified by the additions.
    additions: dict
        The additions, itself a dictionary.
    """
    for key, extra in additions.items():
        # a missing key and a key with value None are treated alike
        current = source.get(key)

        if current is None:
            source[key] = extra
        elif type(current) is dict and type(extra) is dict:
            addToDict(current, extra)
def console(*msg, error=False, newline=True, sleep=None, indent=""):
    """Write a message to the console and flush it immediately.

    Parameters
    ----------
    msg: any
        The pieces of the message. Strings are used as they are, other values
        are represented by their `repr`. The pieces are joined by single spaces.
        A non-empty message is passed through `unexpanduser`, and one leading and
        one trailing newline are removed from it.
    error: boolean, optional False
        Whether to write to standard error instead of standard output.
    newline: boolean, optional True
        Whether to end the message with a newline.
    sleep: number, optional None
        If given and nonzero, the number of seconds to wait after writing.
    indent: string, optional ""
        If non-empty, it is put before every line of the message.
    """
    text = " ".join(part if type(part) is str else repr(part) for part in msg)

    if text:
        text = ux(text)

    if text.startswith("\n"):
        text = text[1:]

    if text.endswith("\n"):
        text = text[:-1]

    if indent:
        text = indent + text.replace("\n", f"\n{indent}")

    ending = "\n" if newline else ""
    stream = sys.stderr if error else sys.stdout
    stream.write(f"{text}{ending}")
    stream.flush()

    if sleep:
        time.sleep(sleep)
def consoleT01(msg, error=False, newline=True):
    """Write a single message with `console()` and wait a tenth of a second.

    Parameters
    ----------
    msg: any
        The message.
    error: boolean, optional False
        Passed on to `console()`.
    newline: boolean, optional True
        Passed on to `console()`.
    """
    console(msg, sleep=0.1, error=error, newline=newline)
def run(cmdline, workDir=None):
    """Runs a shell command and returns all relevant info.

    The function runs a command-line in a shell, and returns
    whether the command was successful, and also what the output was, separately for
    standard error and standard output.

    Parameters
    ----------
    cmdline: string
        The command-line to execute.
        It is handed to the shell as it is, so it must not contain
        untrusted input.
    workDir: string, optional None
        The working directory where the command should be executed.
        If `None` the current directory is used.

    Returns
    -------
    tuple
        `(good, returnCode, stdOut, stdErr)`, where `good` is whether the return
        code is zero, and the outputs are decoded as UTF-8 and stripped.
    """
    # no `check=True`: we inspect the return code ourselves instead of
    # catching the exception that a nonzero return code would trigger
    finished = run_cmd(
        cmdline,
        shell=True,
        cwd=workDir,
        check=False,
        capture_output=True,
    )
    returnCode = finished.returncode
    stdOut = finished.stdout.decode("utf8").strip()
    stdErr = finished.stderr.decode("utf8").strip()
    return (returnCode == 0, returnCode, stdOut, stdErr)
def readCfg(settingsFile, label, verbose=0, **kwargs):
    """Read settings from a YAML file and report on the console.

    Parameters
    ----------
    settingsFile: string
        The file to read, passed to `readYaml` as `asFile`.
    label: string
        Name of the kind of settings, used in the console messages.
    verbose: integer, optional 0
        If `1`, a successful read is reported on the console.
        A failure is always reported, on standard error.
    kwargs: dict
        Additional arguments, passed on to `readYaml`.

    Returns
    -------
    tuple
        `(good, settings)`, where `good` is whether non-empty settings were
        obtained, and `settings` is whatever `readYaml` returned.
    """
    settings = readYaml(asFile=settingsFile, **kwargs)

    if not settings:
        console(f"No {label} settings found, looked for {settingsFile}", error=True)
        return (False, settings)

    if verbose == 1:
        console(f"{label} settings read from {settingsFile}")

    return (True, settings)
Global variables
var TO_ASCII_DEF-
Undecomposable UNICODE characters mapped to their related ASCII characters.
Functions
def addToDict(source, additions)-
Add material from one dict to another dict without destroying information.
Values in the additions will be added to the source only if the key in question does not exist in the source, or its value is
None. If the key for a value exists in the source, and the source and addition values are both dicts, the addition will proceed recursively.
In all other cases, no replacement takes place.
Parameters
source:dict- The source dictionary, which will be modified by the additions.
additions:dict- The additions, itself a dictionary.
Expand source code Browse git
def addToDict(source, additions): """Add material from an dict to an other dict without destroying information. Values in the additions will be added to the source only if the key in question does not exist in the source, or its value is `None`. If the key for a value exists in the source, and the source and addition values are both dicts, the addition will proceed recursively. In all other cases, no replacement takes place. Parameters ---------- source: dict The source dictionary, which will be modified by the additions. additions: dict The additions, itself a dictionary. """ for k, v in additions.items(): if k not in source or source[k] is None: source[k] = v else: sv = source[k] if type(sv) is dict and type(v) is dict: addToDict(sv, v) def console(*msg, error=False, newline=True, sleep=None, indent='')-
Expand source code Browse git
def console(*msg, error=False, newline=True, sleep=None, indent=""): msg = " ".join(m if type(m) is str else repr(m) for m in msg) msg = "" if not msg else ux(msg) msg = msg[1:] if msg.startswith("\n") else msg msg = msg[0:-1] if msg.endswith("\n") else msg target = sys.stderr if error else sys.stdout nl = "\n" if newline else "" if indent: msg = indent + msg.replace("\n", f"\n{indent}") target.write(f"{msg}{nl}") target.flush() if sleep: time.sleep(sleep) def consoleT01(msg, error=False, newline=True)-
Expand source code Browse git
def consoleT01(msg, error=False, newline=True): console(msg, error=error, newline=newline, sleep=0.1) def fillin(pairs, config)-
Fills in variable parts into values of a dict.
Values may contain strings of the form
{var}. These var names will be looked up in a source dict, and their values will be substituted. Values may also contain strings of the form
${var}. These var names refer to environment variables and will be looked up as well.
Parameters
pairs:dict- The dict whose values must be filled in
config:dict- Source of the values used for filling
Returns
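list, dict, dict - The list contains the variables that could not be looked up; the first dict contains the variables that could be looked up, with their values; the second dict is the filled-in version of pairs.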
Expand source code Browse git
def fillin(pairs, config): """Fills in variable parts into values of a dict. Values may contain strings of the form `{var}`. These `var` names will be looked up in a source dict, and their values will be substituted. Values may also contain strings of the form `${var}`. These `var` names refer to environment variables and will be looked up as well. Parameters ---------- pairs: dict The dict whose values must be filled in config: dict Source of the values used for filling Returns ------- list, dict `list` is a list of variables that could not be looked up. """ notFound = [] found = {} result = {} varReplace = makeVarReplace(config, found, notFound) for (k, v) in pairs.items(): if type(v) is str: v = VAR_RE.sub(varReplace, v) result[k] = v return notFound, found, result def getDelta(days, refDate, iso=True)-
Expand source code Browse git
def getDelta(days, refDate, iso=True): if refDate is None: # undefined dates count as not recent return 0 delta = utcnow() - ( dt.fromisoformat(refDate) if iso else dt.fromtimestamp(refDate, tz=UTC) ) deltaDays = delta.days + delta.seconds / 86400 return deltaDays def htmlEsc(val, math=False)-
Escape certain HTML characters by HTML entities.
To prevent them to be interpreted as HTML in cases where you need them literally.
Parameters
val:string- The input value
math: boolean, optional False - Whether to retain TeX notation.
If True,
$ is not escaped; if False, it is escaped.
Expand source code Browse git
def htmlEsc(val, math=False): """Escape certain HTML characters by HTML entities. To prevent them to be interpreted as HTML in cases where you need them literally. Parameters ---------- val: string The input value math: boolean, optional False Whether retain TeX notation. If True, `$` is not escaped, if False, it is not escaped. """ return ( "" if val is None else ( (str(val).replace("&", "&").replace("<", "<").replace(">", ">")) if math else ( str(val) .replace("&", "&") .replace("<", "<") .replace(">", ">") .replace("$", "<span>$</span>") ) ) ) def isonow()-
The current moment in time as an ISO 8601 string value.
Details:
- the precision is up to the second;
- the separator between the date part and the time part is
T;
- the timezone is UTC, marked as
Z directly after the time part.
Returns
string- E.g.
2024-11-13T10:53:15Z
Expand source code Browse git
def isonow(): """The current moment in time as an ISO 8601 string value. Details: * the precision is up to the second; * the separator between the date part and the timpe part is `T`; * the timezone is UTC, marked as `Z` directly after the time part. Returns ------- string E.g. `2024-11-13T10:53:15Z` """ return TZ_RE.sub("Z", utcnow().isoformat(timespec="seconds", sep="T")) def lessAgo(days, refDate, iso=True)-
Expand source code Browse git
def lessAgo(days, refDate, iso=True): return 0 <= getDelta(days, refDate, iso=iso) < days def makeAsciiUnique(text, among)-
Transforms a text into plain ASCII and makes it unique among alternatives.
Parameters
text: string The text to be translated
among: set of string The strings among which the result should be unique. It is assumed that these strings are themselves the results of
toAscii().Expand source code Browse git
def makeAsciiUnique(text, among): """Transforms a text into plain ASCII and makes it unique among alternatives. Paramaters ---------- text: string The text to be translated among: set of string The strings among which the result should be unique. It is assumed that these strings are themselves the results of `toAscii`. """ candidate = toAscii(text) if candidate not in among: return candidate match = SEQ_RE.match(candidate) (base, seq) = match.group(1, 2) if match else (candidate, None) if seq is not None: seq = int(seq) oSeqs = set() for other in sorted(among): match = SEQ_RE.match(other) (oBase, oSeq) = match.group(1, 2) if match else (other, None) if oBase == base: oSeqs.add(None if oSeq is None else int(oSeq)) if len(oSeqs): if seq in oSeqs: newSeqRep = ( f"({max((oSeq for oSeq in oSeqs if oSeq is not None), default=0) + 1})" ) else: newSeqRep = "" if seq is None else f"({seq})" else: newSeqRep = "(1)" return f"{base}{newSeqRep}" def makeVarReplace(info, found, notFound)-
Expand source code Browse git
def makeVarReplace(info, found, notFound): def varReplace(match): (fullName, kind, name) = match.group(0, 1, 2) fullName = fullName.replace("{", "").replace("}", "") result = var(name) if kind else info.get(name, None) if result is None: notFound.append(fullName) result = fullName else: found[fullName] = result return result return varReplace def normalize(text)-
Produce a normalized version of a string.
Parameters
text:string- The input text
Returns
string- The lower-cased, whitespace normalized version of the input.
Expand source code Browse git
def normalize(text): """Produce a normalized version of a string. Parameters ---------- text: string The input text Returns ------- string The lower-cased, whitespace normalized version of the input. """ return WHITE_RE.sub(" ", text.strip()).lower() def plainify(value)-
Make sure that the value is either a string or a list of strings.
If it is a dict, turn it into a list of stringified key-value pairs.
Expand source code Browse git
def plainify(value): """Make sure that the value is either a string or a list of strings. If it is a dict, turn it into a list of stringified key-value pairs. """ if value is None: return "" tp = type(value) if tp is list: return [plainify(v) for v in value] if tp is dict: return [f"{k}: {plainify(v)}" for (k, v) in value.items()] return str(value) def prettify(x)-
Expand source code Browse git
def prettify(x): return " ".join(ucFirst(w) for w in x.split("_")) def pseudoisonow()-
The current moment in time as a isolike string value.
It is like
isonow(), but the time separators (:) are replaced by -, so that the string can be included in urls.
Returns
string- E.g.
2024-11-13T10-53-15Z
Expand source code Browse git
def pseudoisonow(): """The current moment in time as a isolike string value. It is like `isonow()`, but the time separators (`:`) are replaced by `-`, so that the string can be included in urls. Returns ------- string E.g. `2024-11-13T10-53-15Z` """ return isonow().replace(":", "-") def readCfg(settingsFile, label, verbose=0, **kwargs)-
Expand source code Browse git
def readCfg(settingsFile, label, verbose=0, **kwargs): settings = readYaml(asFile=settingsFile, **kwargs) if settings: if verbose == 1: console(f"{label} settings read from {settingsFile}") good = True else: console(f"No {label} settings found, looked for {settingsFile}", error=True) good = False return (good, settings) def run(cmdline, workDir=None)-
Runs a shell command and returns all relevant info.
The function runs a command-line in a shell, and returns whether the command was successful, and also what the output was, separately for standard error and standard output.
Parameters
cmdline:string- The command-line to execute.
workDir:string, optionalNone- The working directory where the command should be executed.
If
None, the current directory is used.
Expand source code Browse git
def run(cmdline, workDir=None): """Runs a shell command and returns all relevant info. The function runs a command-line in a shell, and returns whether the command was successful, and also what the output was, separately for standard error and standard output. Parameters ---------- cmdline: string The command-line to execute. workDir: string, optional None The working directory where the command should be executed. If `None` the current directory is used. """ try: result = run_cmd( cmdline, shell=True, cwd=workDir, check=True, capture_output=True, ) stdOut = result.stdout.decode("utf8").strip() stdErr = result.stderr.decode("utf8").strip() returnCode = 0 good = True except CalledProcessError as e: stdOut = e.stdout.decode("utf8").strip() stdErr = e.stderr.decode("utf8").strip() returnCode = e.returncode good = False return (good, returnCode, stdOut, stdErr) def toAscii(text, lowercase=True)-
Transforms a text with diacritical marks into a plain ASCII text.
Characters with diacritics are replaced by their base character. Some characters with diacritics are considered by UNICODE to be undecomposable characters, such as
ø and ñ. We use a table (TO_ASCII_DEF) to map these on their related ASCII characters. We replace all consecutive whitespace by
_, and we replace all characters other than letters, digits, _, (, ) and - by x.
Parameters
text:string- The text to be translated
lowercase:boolean, optionalTrue- Convert the text to lowercase
Returns
string- The translated text.
Expand source code Browse git
def toAscii(text, lowercase=True): """Transforms a text with diacritical marks into a plain ASCII text. Characters with diacritics are replaced by their base character. Some characters with diacritics are considered by UNICODE to be undecomposable characters, such as `ø` and `ñ`. We use a table (`TO_ASCII_DEF`) to map these on their related ASCII characters. We replace all consecutive whitespace by `_`, en we replace all non-alphanumeric characters except `()` by `x`. Parameters ---------- text: string The text to be translated lowercase: boolean, optional True Convert the text to lowercase Returns ------- string The translated text. """ text = WHITE_RE.sub("_", text.strip()) if lowercase: text = text.lower() text = "".join( TO_ASCII.get(c, c) for c in unicodedata.normalize("NFD", text) if unicodedata.category(c) != "Mn" ) text = "".join(c if c in VALID else "x" for c in text) return text def ucFirst(x)-
Expand source code Browse git
def ucFirst(x): if not x: return "" return x[0].upper() + x[1:] def utcnow()-
The current moment in time in the UTC time zone.
Returns
datetime - An aware datetime object (in the sense of: having the timezone included in its value).
Expand source code Browse git
def utcnow(): """The current moment in time in the UTC time zone. Returns ------- datetime An aware datetime object (in the sense of: having the timezone included in its value. """ return dt.now(timezone.utc) def var(envVar)-
Retrieves the value of an environment variable.
Parameters
envVar:string- The name of the environment variable.
Returns
stringorvoid- The value of the environment variable if it exists, otherwise
None.
Expand source code Browse git
def var(envVar): """Retrieves the value of an environment variable. Parameters ---------- envVar: string The name of the environment variable. Returns ------- string or void The value of the environment variable if it exists, otherwise `None`. """ return os.environ.get(envVar, None) def versionSort(x)-
Expand source code Browse git
def versionSort(x): parts = [] for p in x.split("."): match = NUM_ALFA_RE.match(p) (num, alfa, rest) = match.group(1, 2, 3) parts.append((int(num) if num else 0, alfa, rest)) return tuple(parts)