Skip to main content

_signal_and_slot

TOC

Attributes

🅰 __TIME_CTR_MAX_CYCLE

__TIME_CTR_MAX_CYCLE = Decimal("99999.0")

🅰 __TIME_UNIT_SEC

__TIME_UNIT_SEC = Decimal("0.1")

🅰 _WATCH_QUERY_DICT

_WATCH_QUERY_DICT = Registry("__pipeline_watch")

🅰 _LISTEN_QUERY_DICT

_LISTEN_QUERY_DICT = collections.defaultdict(lambda: {})

🅰 _UPDATE_VALUE_DICT

_UPDATE_VALUE_DICT = collections.defaultdict(lambda: {})

🅰 DATETIME_FORMAT

DATETIME_FORMAT = """%Y-%m-%dT%H:%M:%S.%f""" #YYYY-MM-DDTHH:MM:SS.ffffff (%f is zero-padded 6-digit microseconds)

🅰 _DEFAULT_CHANNEL

_DEFAULT_CHANNEL = str(uuid4()) #Default watch/listen channel, used when no channel is specified. The name is a freshly generated UUID, so it differs on every start.

🅰 SYSTEM_CHANNEL

SYSTEM_CHANNEL = """__system""" #Values on this channel are uploaded via the HTTP client.

🅰 update_thread

update_thread = Thread(target=_update_thread, daemon=True)

Functions

🅵 __get

def __get(name: str, channel):

🅵 __update_and_get

def __update_and_get(name: str, *args, **kwargs):

🅵 _watch

def _watch(
func: Callable,
name: Optional[str],
interval: float,
initiative: bool = False,
overwrite: bool = False,
_channel: str = None,
):

🅵 watch

def watch(
name: str = None,
interval: float = None,
initiative: bool = True,
overwrite: bool = False,
_channel: str = None,
):

🅵 _listen

def _listen(
func: Callable,
target: Union[str, Callable],
listener_name: Optional[str] = None,
overwrite: bool = False,
):

🅵 listen

def listen(
target: Union[str, Any],
listener_name: Optional[str] = None,
overwrite: bool = False,
):

🅵 _update_thread

def _update_thread():

Classes

🅲 _WatchConfig

class _WatchConfig(dict):

🅲 _WatchedFun

class _WatchedFun:

🅼 __init__

def __init__(self, func: Callable, cfg: _WatchConfig) -> None:

🅼 __call__

def __call__(self, *args, **kwargs) -> Any:

🅼 __repr__

def __repr__(self) -> str: