_project_db
TOC
- Attributes:
- 🅰 logger - module-level `Logger` instance named "PROJECT DB"
- 🅰 DB_PROJECT_FILE_FOLDER - folder for project DB files: `<vault>/server/db/project`, where `<vault>` comes from the global config
- 🅰 DB_PROJECT_FILE_TYPE_NAME - type name used for project DB files (`projectdb`)
- Functions:
- 🅵 _CHECK_GET_PROJECT_FILE_FOLDER - Check whether the DB path is valid; if it is not, create it.
- Classes:
- 🅲 ProjectDB
Attributes
🅰 logger
logger = Logger("PROJECT DB", skip_writers_names=["ws"]) # module-level logger named "PROJECT DB"
🅰 DB_PROJECT_FILE_FOLDER
DB_PROJECT_FILE_FOLDER = f"""{get_global_config('vault')}/server/db/project""" # folder for project DB files, under the configured vault
🅰 DB_PROJECT_FILE_TYPE_NAME
DB_PROJECT_FILE_TYPE_NAME = """projectdb""" # type name used for project DB files
Functions
🅵 _CHECK_GET_PROJECT_FILE_FOLDER
def _CHECK_GET_PROJECT_FILE_FOLDER():
Check whether the DB path is valid; if it is not, create it.
Classes
🅲 ProjectDB
class ProjectDB(ManageableDB):
🅼 __new__
def __new__(
cls, project_id: str = None, path: str = None, **kwargs
) -> "ProjectDB":
🅼 __repr__
def __repr__(self):
🅼 size
def size(self):
Return the local storage used by this project DB, in bytes.
Returns:
- int: size in bytes
🅼 close
def close(self):
🅼 delete
def delete(self):
Delete the files related to this DB.
🅼 items
def items(cls):
🅼 of_project_id
def of_project_id(cls, project_id):
🅼 _execute
def _execute(self, query, *args, fetch: FetchType = FetchType.ALL, **kwargs):
🅼 _query
def _query(self, query, *args, fetch: FetchType = FetchType.ALL, **kwargs):
🅼 table_exist
def table_exist(self, table_name):
🅼 get_table_names
def get_table_names(self):
🅼 fetch_db_version
def fetch_db_version(self, default=None):
🅼 fetch_db_project_id
def fetch_db_project_id(self, default=None):
🅼 get_id_of_run_id
def get_id_of_run_id(self, run_id: str):
🅼 fetch_id_of_run_id
def fetch_id_of_run_id(self, run_id: str, timestamp: str = None):
🅼 fetch_metadata_of_run_id
def fetch_metadata_of_run_id(
self, run_id: str, metadata: Union[dict, str] = None
):
🅼 get_run_id_of_id
def get_run_id_of_id(self, id_of_run_id):
🅼 get_run_ids
def get_run_ids(self):
🅼 delete_run_id
def delete_run_id(self, run_id: str):
🅼 get_series_of_table
def get_series_of_table(self, table_name, run_id=None):
🅼 do_limit_num_row_for
def do_limit_num_row_for(
self, table_name: str, run_id: str, num_row_limit: int, series: str = None
):
🅼 write_json
def write_json(
self,
table_name: str,
json_data: str,
series: str = None,
run_id: str = None,
timestamp: str = None,
num_row_limit=-1,
):
🅼 read_json
def read_json(self, table_name: str, condition: ProjectDbQueryCondition = None):
🅼 set_status
def set_status(self, run_id: str, series: str, json_data):
🅼 get_status
def get_status(self, run_id: str = None, series: str = None):
🅼 write_blob
def write_blob(
self,
table_name: str,
meta_data: Union[str, dict],
blob_data: bytes,
series: str = None,
run_id: str = None,
timestamp: str = None,
num_row_limit=-1,
):
🅼 read_blob
def read_blob(
self,
table_name: str,
condition: ProjectDbQueryCondition = None,
meta_only=False,
):
🅼 load_db_of_path
def load_db_of_path(cls, path):
🅼 get_db_list
def get_db_list(cls):
🅼 get_db_of_id
def get_db_of_id(cls, project_id, rescan: bool = True):