Skip to main content

_project_db

TOC

Attributes

🅰 logger

logger = Logger("PROJECT DB", skip_writers_names=["ws"]) #check if the db path is valid. if not, create it

🅰 DB_PROJECT_FILE_FOLDER

DB_PROJECT_FILE_FOLDER = f"""{get_global_config('vault')}/server/db/project""" #check if the db path is valid. if not, create it

🅰 DB_PROJECT_FILE_TYPE_NAME

DB_PROJECT_FILE_TYPE_NAME = """projectdb""" #check if the db path is valid. if not, create it

Functions

🅵 _CHECK_GET_PROJECT_FILE_FOLDER

def _CHECK_GET_PROJECT_FILE_FOLDER():

Check whether the DB path is valid; if it is not, create it.

Classes

🅲 ProjectDB

class ProjectDB(ManageableDB):

🅼 __new__

def __new__(
cls, project_id: str = None, path: str = None, **kwargs
) -> "ProjectDB":

🅼 __repr__

def __repr__(self):

🅼 size

def size(self):

Return the local storage usage of this project DB, in bytes.

Returns:

  • int: size in bytes

🅼 close

def close(self):

🅼 delete

def delete(self):

Delete the files related to this DB.

🅼 items

def items(cls):

🅼 of_project_id

def of_project_id(cls, project_id):

🅼 _execute

def _execute(self, query, *args, fetch: FetchType = FetchType.ALL, **kwargs):

🅼 _query

def _query(self, query, *args, fetch: FetchType = FetchType.ALL, **kwargs):

🅼 table_exist

def table_exist(self, table_name):

🅼 get_table_names

def get_table_names(self):

🅼 fetch_db_version

def fetch_db_version(self, default=None):

🅼 fetch_db_project_id

def fetch_db_project_id(self, default=None):

🅼 get_id_of_run_id

def get_id_of_run_id(self, run_id: str):

🅼 fetch_id_of_run_id

def fetch_id_of_run_id(self, run_id: str, timestamp: str = None):

🅼 fetch_metadata_of_run_id

def fetch_metadata_of_run_id(
self, run_id: str, metadata: Union[dict, str] = None
):

🅼 get_run_id_of_id

def get_run_id_of_id(self, id_of_run_id):

🅼 get_run_ids

def get_run_ids(self):

🅼 delete_run_id

def delete_run_id(self, run_id: str):

🅼 get_series_of_table

def get_series_of_table(self, table_name, run_id=None):

🅼 do_limit_num_row_for

def do_limit_num_row_for(
self, table_name: str, run_id: str, num_row_limit: int, series: str = None
):

🅼 write_json

def write_json(
self,
table_name: str,
json_data: str,
series: str = None,
run_id: str = None,
timestamp: str = None,
num_row_limit=-1,
):

🅼 read_json

def read_json(self, table_name: str, condition: ProjectDbQueryCondition = None):

🅼 set_status

def set_status(self, run_id: str, series: str, json_data):

🅼 get_status

def get_status(self, run_id: str = None, series: str = None):

🅼 write_blob

def write_blob(
self,
table_name: str,
meta_data: Union[str, dict],
blob_data: bytes,
series: str = None,
run_id: str = None,
timestamp: str = None,
num_row_limit=-1,
):

🅼 read_blob

def read_blob(
self,
table_name: str,
condition: ProjectDbQueryCondition = None,
meta_only=False,
):

🅼 load_db_of_path

def load_db_of_path(cls, path):

🅼 get_db_list

def get_db_list(cls):

🅼 get_db_of_id

def get_db_of_id(cls, project_id, rescan: bool = True):