Source code for models.base_model

"""
Models initializer.
"""
from functools import wraps
from flask_sqlalchemy import SQLAlchemy
from server import app
from utils import time_to_json
from errors import DataBaseException

# Module-wide Flask-SQLAlchemy handle bound to the Flask ``app`` imported from
# ``server``; ``db_factory_func`` below draws its connections from ``DB.engine``.
DB = SQLAlchemy(app)


def db_factory_func(func):
    """
    Database connection decorator.

    Opens a connection from ``DB.engine``, hands it to the decorated function
    as the ``conn`` keyword argument (wrapped in try/finally) and always
    closes the connection afterwards.

    Result handling:

    * ``None`` is returned unchanged.
    * A result object whose ``returns_rows`` attribute is false (for example
      an UPDATE/DELETE executed without RETURNING) yields an empty list
      instead of failing while being iterated.
    * Anything else is iterated and every row is converted with
      ``time_to_json(dict(row))``.

    :param func: function to decorate; it must accept a ``conn`` keyword.
    :return: the wrapping function.
    """
    @wraps(func)
    def wrapper(*args, **kw):
        """
        Wrapper function for database operations.
        """
        # Bind the name before entering the try block: if connect() raises,
        # the cleanup below must not fail with UnboundLocalError and hide
        # the original error.
        conn = None
        try:
            conn = DB.engine.connect()
            result = func(*args, conn=conn, **kw)
            if result is None:
                return None
            # Results without rows cannot be iterated; objects lacking the
            # attribute are treated as ordinary iterables.
            if not getattr(result, "returns_rows", True):
                return []
            return [time_to_json(dict(row)) for row in result]
        finally:
            if conn is not None:
                conn.close()
    return wrapper
class BaseModel:
    """
    Base class for the data manipulation and database operations.

    Builds SQL strings for a single table and executes them on connections
    supplied by ``db_factory_func``.

    NOTE(review): ``query``, ``sort_by`` and the ``_id`` arguments of the
    ``*_by_id`` helpers are interpolated into the SQL text as-is; callers
    must only pass trusted values.

    :param table: name of the table.
    :param fields: dict mapping column name to its SQL column definition.
    :param primary_key: column names forming the primary key (an iterable of
        names, or a single name as a string).
    :param init_table: when true, create the table on construction if it is
        not present yet.
    """

    def __init__(self, table, fields, primary_key=None, init_table=False):
        self.table_name = table
        self.fields = fields
        self.primary_key = primary_key
        if init_table:
            self.__init_table()

    @db_factory_func
    def __init_table(self, conn=None):
        """
        Checks if the table exists and, if it is not created yet, creates it.

        Errors are printed and swallowed so that a failed table check does
        not prevent the model from being constructed.

        :param conn: Connection from the db_factory_func.
        """
        try:
            rows = conn.execute("""
                SELECT table_name FROM information_schema.tables
                WHERE table_schema='public' AND table_type='BASE TABLE';
                """)
            existing = {dict(row).get("table_name") for row in rows}
            if self.table_name in existing:
                return
            columns = [
                "{} {}".format(name, definition)
                for name, definition in self.fields.items()
            ]
            if self.primary_key:
                keys = self.primary_key
                # A bare string would otherwise be joined letter by letter.
                if isinstance(keys, str):
                    keys = [keys]
                columns.append("PRIMARY KEY ({})".format(", ".join(keys)))
            conn.execute("CREATE TABLE {} ( {} )".format(
                self.table_name, ", ".join(columns)))
        except Exception as sql_err:
            print(sql_err)

    def _returning_clause(self, return_cols, error_message):
        """
        Builds the optional RETURNING clause shared by update and delete.

        :param return_cols: requested column names, may be empty or None.
        :param error_message: message raised when none of them is known.
        :return: ``""`` or a string of the form ``" RETURNING a, b"``.
        :raises DataBaseException: columns were requested but none is known.
        """
        if not return_cols:
            return ""
        known = [col for col in return_cols if col in self.fields]
        if not known:
            raise DataBaseException(error_message)
        return " RETURNING {}".format(", ".join(known))

    @db_factory_func
    def create(self, conn=None, data=None):
        """
        Creates and inserts a database row.

        Only keys of ``data`` that are known table fields are inserted;
        unknown keys are ignored.

        :param conn: Connection from the db_factory_func.
        :param data: Relevant data to insert.
        :raises DataBaseException: ``data`` is empty, contains no known
            field, or the statement fails.
        """
        if not data:
            raise DataBaseException("data is empty.")
        # The column list and the placeholder list are derived from the same
        # filtered names so that their lengths always match.
        columns = [name for name in data if self.fields.get(name) is not None]
        if not columns:
            raise DataBaseException("data provided is not correct.")
        placeholders = ["%({})s".format(name) for name in columns]
        sql_statement = "INSERT INTO {} ({}) VALUES({})".format(
            self.table_name, ", ".join(columns), ", ".join(placeholders))
        try:
            conn.execute(sql_statement, data)
        except Exception as sql_err:
            print(sql_err)
            raise DataBaseException("sql query error.") from sql_err

    @db_factory_func
    def find(self, conn=None, query="", limit=0, sort_by="",
             return_cols=None, offset=None):
        """
        Finds and retrieves data with given query from database.

        :param conn: Connection from the db_factory_func.
        :param query: where sql query string.
        :param limit: sql limit value for select; applied only when positive.
        :param sort_by: sql sort information.
        :param return_cols: array of fields to return; unknown names are
            reported on stdout and skipped.
        :param offset: sql offset value.
        :return: list of rows as converted by ``db_factory_func``.
        :raises DataBaseException: no requested column is known, or the
            statement fails.
        """
        selected_cols = "*"
        if return_cols:
            valid_cols = []
            for col in return_cols:
                if self.fields.get(col) is not None:
                    valid_cols.append(col)
                else:
                    print("%s is not a valid column name!" % col)
            if not valid_cols:
                raise DataBaseException("Invalid column selection.")
            selected_cols = ",".join(valid_cols)

        # Clause order matters: ORDER BY has to precede LIMIT and OFFSET.
        clauses = ["SELECT {} FROM {}".format(selected_cols, self.table_name)]
        if query:
            clauses.append("WHERE {}".format(query))
        if sort_by:
            clauses.append("ORDER BY {}".format(sort_by))
        if limit and limit > 0:
            clauses.append("LIMIT {}".format(limit))
        if offset:
            clauses.append("OFFSET {}".format(offset))
        sql_statement = " ".join(clauses)

        try:
            print(sql_statement)
            return conn.execute(sql_statement)
        except Exception as sql_err:
            print(sql_err)
            raise DataBaseException("sql query error.") from sql_err

    def find_one(self, query=""):
        """
        Finds one element from database with a given query.

        :param query: where sql query string.
        :return: result of ``find`` limited to one row, i.e. a list holding
            at most one element.
        """
        return self.find(query=query, limit=1)

    def find_by_id(self, _id):
        """
        Finds one element from database with a given id.

        :param _id: element id number.
        :return: same shape as ``find_one``.
        """
        # NOTE(review): _id is formatted straight into the SQL text.
        return self.find_one(query="id=%s" % _id)

    @db_factory_func
    def update(self, conn=None, data=None, query="", return_cols=None):
        """
        Finds and updates the rows with given query.

        :param conn: Connection from the db_factory_func.
        :param data: Relevant data to update; keys that are not table fields
            are ignored.
        :param query: where sql query string; when empty every row is
            updated.
        :param return_cols: array of fields to return.
        :raises DataBaseException: ``data`` is empty, holds no known field,
            no requested return column is known, or the statement fails.
        """
        if not data:
            raise DataBaseException("No data to update.")
        assignments = [
            "{0}=%({0})s".format(name)
            for name in data
            if self.fields.get(name) is not None
        ]
        if not assignments:
            raise DataBaseException("No valid field is provided.")
        sql_statement = "UPDATE {} SET {}".format(
            self.table_name, ", ".join(assignments))
        if query:
            sql_statement += " WHERE {}".format(query)
        sql_statement += self._returning_clause(
            return_cols, "Invalid column return!")
        try:
            return conn.execute(sql_statement, data)
        except Exception as sql_err:
            print(sql_err)
            raise DataBaseException("sql query error.") from sql_err

    def update_by_id(self, _id, data=None, return_cols=None):
        """
        Update a row with id.

        :param _id: element id number.
        :param data: Relevant data to update.
        :param return_cols: array of fields to return.
        :return: the first returned row, or ``None`` when the update
            produced no rows.
        """
        # NOTE(review): _id is formatted straight into the SQL text.
        rows = self.update(data=data, query=("id=%s" % _id),
                           return_cols=return_cols)
        return rows[0] if rows else None

    @db_factory_func
    def delete(self, conn=None, query="", return_cols=None):
        """
        Deletes rows matching the given query.

        :param conn: Connection from the db_factory_func.
        :param query: where sql query string; when empty every row of the
            table is deleted.
        :param return_cols: array of fields to return.
        :raises DataBaseException: no requested return column is known, or
            the statement fails.
        """
        sql_statement = "DELETE FROM {}".format(self.table_name)
        if query:
            sql_statement += " WHERE {}".format(query)
        sql_statement += self._returning_clause(
            return_cols, "Invalid field return!")
        try:
            return conn.execute(sql_statement)
        except Exception as sql_err:
            print(sql_err)
            raise DataBaseException("sql query error.") from sql_err

    def delete_by_id(self, _id, return_cols=None):
        """
        Deletes a row with the given id.

        :param _id: element id number.
        :param return_cols: array of fields to return.
        :return: the first returned row, or ``None`` when the delete
            produced no rows.
        """
        # NOTE(review): _id is formatted straight into the SQL text.
        rows = self.delete(query=("id=%s" % _id), return_cols=return_cols)
        return rows[0] if rows else None