_signal_and_slot
TOC
- Attributes:
- 🅰 __TIME_CTR_MAX_CYCLE
- 🅰 __TIME_UNIT_SEC
- 🅰 _WATCH_QUERY_DICT
- 🅰 _LISTEN_QUERY_DICT
- 🅰 _UPDATE_VALUE_DICT
- 🅰 DATETIME_FORMAT - YYYY-MM-DDTHH:MM:SS.SSS
- 🅰 _DEFAULT_CHANNEL - Default channel for watch and listen; users are on this channel unless they specify another. The channel name is a fresh UUID on each start.
- 🅰 SYSTEM_CHANNEL - Values on this channel are uploaded via the HTTP client.
- 🅰 update_thread
- Functions:
- 🅵 __get
- 🅵 __update_and_get
- 🅵 _watch
- 🅵 watch
- 🅵 _listen
- 🅵 listen
- 🅵 _update_thread
- Classes:
- 🅲 _WatchConfig
- 🅲 _WatchedFun
Attributes
🅰 __TIME_CTR_MAX_CYCLE
__TIME_CTR_MAX_CYCLE = Decimal("99999.0")
🅰 __TIME_UNIT_SEC
__TIME_UNIT_SEC = Decimal("0.1")
🅰 _WATCH_QUERY_DICT
_WATCH_QUERY_DICT = Registry("__pipeline_watch")
🅰 _LISTEN_QUERY_DICT
_LISTEN_QUERY_DICT = collections.defaultdict(lambda: {})
🅰 _UPDATE_VALUE_DICT
_UPDATE_VALUE_DICT = collections.defaultdict(lambda: {})
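`collections.defaultdict(lambda: {})` creates an empty inner dict the first time a missing key is accessed, which behaves the same as `defaultdict(dict)`. The short sketch below shows that behavior; the `"channel-a"`, `"channel-b"`, and `"cpu"` keys are illustrative assumptions, since the actual key layout of these dictionaries is not documented here.

```python
import collections

# Same construction as in the attributes above.
values = collections.defaultdict(lambda: {})

# Writing through a missing outer key creates the inner dict on demand.
values["channel-a"]["cpu"] = 0.5

# Merely reading a missing key also inserts an empty inner dict.
missing = values["channel-b"]
```

Because a plain read inserts the key, membership checks such as `"x" in values` are the safer way to test for presence without growing the mapping.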
🅰 DATETIME_FORMAT
DATETIME_FORMAT = """%Y-%m-%dT%H:%M:%S.%f""" #YYYY-MM-DDTHH:MM:SS.SSS
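A small illustration of what this format string produces, using only the standard library. Note that `%f` emits six microsecond digits, so the output is longer than the `SSS` in the comment suggests. Trimming to milliseconds, as done below, is an assumption about intended use and not something this module is shown to do.

```python
from datetime import datetime

DATETIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%f"

stamp = datetime(2024, 1, 2, 3, 4, 5, 678900)

full = stamp.strftime(DATETIME_FORMAT)   # "2024-01-02T03:04:05.678900"
millis = full[:-3]                        # "2024-01-02T03:04:05.678"

# The same format string parses the formatted value back.
parsed = datetime.strptime(full, DATETIME_FORMAT)
```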
🅰 _DEFAULT_CHANNEL
_DEFAULT_CHANNEL = str(uuid4()) # default channel for watch and listen; users are on this channel unless they specify another. The name changes on each start
🅰 SYSTEM_CHANNEL
SYSTEM_CHANNEL = """__system""" # values on this channel are uploaded via the HTTP client
🅰 update_thread
update_thread = Thread(target=_update_thread, daemon=True)
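The attribute above only constructs the thread; the body of `_update_thread` is not shown in this reference. As a rough sketch of a daemon polling loop of this general shape, the example below uses a fixed tick length and a stop event. `TICK_SEC`, `stop`, `ticks`, and `_demo_update_loop` are all hypothetical names introduced for the demo, not part of the module.

```python
import threading
import time

TICK_SEC = 0.01            # hypothetical tick length for the demo
ticks = []                 # counter values recorded at each tick
stop = threading.Event()   # hypothetical stop signal


def _demo_update_loop():
    """Record a counter once per tick until asked to stop."""
    counter = 0
    while not stop.is_set():
        ticks.append(counter)
        counter += 1
        time.sleep(TICK_SEC)


# daemon=True means the thread will not keep the interpreter alive at exit.
demo_thread = threading.Thread(target=_demo_update_loop, daemon=True)
demo_thread.start()

time.sleep(0.1)            # let a few ticks happen
stop.set()
demo_thread.join(timeout=1.0)
```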
Functions
🅵 __get
def __get(name: str, channel):
🅵 __update_and_get
def __update_and_get(name: str, *args, **kwargs):
🅵 _watch
def _watch(
    func: Callable,
    name: Optional[str],
    interval: float,
    initiative: bool = False,
    overwrite: bool = False,
    _channel: str = None,
):
🅵 watch
def watch(
    name: str = None,
    interval: float = None,
    initiative: bool = True,
    overwrite: bool = False,
    _channel: str = None,
):
🅵 _listen
def _listen(
    func: Callable,
    target: Union[str, Callable],
    listener_name: Optional[str] = None,
    overwrite: bool = False,
):
🅵 listen
def listen(
    target: Union[str, Any],
    listener_name: Optional[str] = None,
    overwrite: bool = False,
):
🅵 _update_thread
def _update_thread():
Classes
🅲 _WatchConfig
class _WatchConfig(dict):
🅲 _WatchedFun
class _WatchedFun:
🅼 __init__
def __init__(self, func: Callable, cfg: _WatchConfig) -> None:
🅼 __call__
def __call__(self, *args, **kwargs) -> Any:
🅼 __repr__
def __repr__(self) -> str:
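The three methods listed for `_WatchedFun` match the usual shape of a callable wrapper: store a function and its configuration, forward calls, and provide a readable representation. A minimal sketch of that shape follows. `DemoConfig` and `DemoWatchedFun` are hypothetical names, and the forwarding and `repr` format are assumptions rather than the module's actual behavior.

```python
from typing import Any, Callable


class DemoConfig(dict):
    """Hypothetical config type: a plain dict subclass."""


class DemoWatchedFun:
    def __init__(self, func: Callable, cfg: DemoConfig) -> None:
        self.func = func
        self.cfg = cfg

    def __call__(self, *args, **kwargs) -> Any:
        # Forward the call unchanged to the wrapped function.
        return self.func(*args, **kwargs)

    def __repr__(self) -> str:
        return f"DemoWatchedFun({self.func.__name__}, {dict(self.cfg)})"


def double(x):
    return x * 2


wrapped = DemoWatchedFun(double, DemoConfig(interval=1.0))
result = wrapped(21)
```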