_client
TOC
- Attributes:
- 🅰 logger
- 🅰 connection - assign this connection to websocket log writer
- 🅰 LogWriters
- Functions:
- Classes:
Attributes
🅰 logger
logger = Logger(name_alias="CLIENT", skip_writers_names=["ws"])
🅰 connection
connection = NeetboxClient() #assign this connection to websocket log writer
🅰 LogWriters
LogWriters = Registry("LOG_WRITERS")
Functions
🅵 addr_of_api
def addr_of_api(api, http_root=None):
🅵 log_writer_ws
def log_writer_ws(log: RawLog):
Classes
🅲 NeetboxClient
class NeetboxClient:
🅼 __init__
def __init__(self) -> None:
🅼 wait_should_online
def wait_should_online(self):
🅼 post_check_online
def post_check_online(self, api: str, root: str = None, *args, **kwargs):
🅼 post
def post(self, api: str, root: str = None, *args, **kwargs):
🅼 get_check_online
def get_check_online(self, api: str, root: str = None, *args, **kwargs):
🅼 get
def get(self, api: str, root: str = None, *args, **kwargs):
🅼 put_check_online
def put_check_online(self, api: str, root: str = None, *args, **kwargs):
🅼 put
def put(self, api: str, root: str = None, *args, **kwargs):
🅼 delete_check_online
def delete_check_online(self, api: str, root: str = None, *args, **kwargs):
🅼 delete
def delete(self, api: str, root: str = None, *args, **kwargs):
🅼 subscribe
def subscribe(self, event_type_name: str, callback):
🅼 unsubscribe
def unsubscribe(self, event_type_name: str, callback):
🅼 ws_subscribe
def ws_subscribe(self, event_type_name: str):
let a function subscribe to ws messages with event type name.
!!! dfor inner APIs only, do not use this in your code! !!! developers should contorl blocking on their own functions
Parameters:
- function (Callable): who is subscribing the event type
- event_type_name (str) (default to
None
): Which event to listen. Defaults to None.
🅼 check_server_connectivity
def check_server_connectivity(self, config=None):
🅼 initialize_connection
def initialize_connection(self, config=None):
🅼 on_ws_open
def on_ws_open(self, ws: WebsocketClient):
🅼 on_ws_err
def on_ws_err(self, ws: WebsocketClient, msg):
🅼 on_ws_close
def on_ws_close(self, ws: WebsocketClient, close_status_code, close_msg):
🅼 on_ws_message
def on_ws_message(self, ws: WebsocketClient, message):
🅼 ws_send
def ws_send(
self,
event_type: str,
payload: dict,
series=None,
timestamp: str = None,
event_id=-1,
identity_type=IdentityType.CLI,
_history_len=-1,
):