Skip to main content

_client

TOC

Attributes

🅰 logger

logger = Logger(name_alias="CLIENT", skip_writers_names=["ws"])

🅰 connection

connection = NeetboxClient() #assign this connection to websocket log writer

🅰 LogWriters

LogWriters = Registry("LOG_WRITERS")

Functions

🅵 addr_of_api

def addr_of_api(api, http_root=None):

🅵 log_writer_ws

def log_writer_ws(log: RawLog):

Classes

🅲 NeetboxClient

class NeetboxClient:

🅼 __init__

def __init__(self) -> None:

🅼 wait_should_online

def wait_should_online(self):

🅼 post_check_online

def post_check_online(self, api: str, root: str = None, *args, **kwargs):

🅼 post

def post(self, api: str, root: str = None, *args, **kwargs):

🅼 get_check_online

def get_check_online(self, api: str, root: str = None, *args, **kwargs):

🅼 get

def get(self, api: str, root: str = None, *args, **kwargs):

🅼 put_check_online

def put_check_online(self, api: str, root: str = None, *args, **kwargs):

🅼 put

def put(self, api: str, root: str = None, *args, **kwargs):

🅼 delete_check_online

def delete_check_online(self, api: str, root: str = None, *args, **kwargs):

🅼 delete

def delete(self, api: str, root: str = None, *args, **kwargs):

🅼 subscribe

def subscribe(self, event_type_name: str, callback):

🅼 unsubscribe

def unsubscribe(self, event_type_name: str, callback):

🅼 ws_subscribe

def ws_subscribe(self, event_type_name: str):

Let a function subscribe to ws messages with the given event type name.

!!! For inner APIs only, do not use this in your code! !!! Developers should control blocking in their own functions.

Parameters:

  • function (Callable): the function that is subscribing to the event type
  • event_type_name (str) (default to None): Which event to listen for. Defaults to None.

🅼 check_server_connectivity

def check_server_connectivity(self, config=None):

🅼 initialize_connection

def initialize_connection(self, config=None):

🅼 on_ws_open

def on_ws_open(self, ws: WebsocketClient):

🅼 on_ws_err

def on_ws_err(self, ws: WebsocketClient, msg):

🅼 on_ws_close

def on_ws_close(self, ws: WebsocketClient, close_status_code, close_msg):

🅼 on_ws_message

def on_ws_message(self, ws: WebsocketClient, message):

🅼 ws_send

def ws_send(
self,
event_type: str,
payload: dict,
series=None,
timestamp: str = None,
event_id=-1,
identity_type=IdentityType.CLI,
_history_len=-1,
):