wiki:AdvancedApp

Version 1 (modified by 221511, 6 days ago) ( diff )

--

Advanced Application Development

Transactions

A database transaction groups multiple operations into a single atomic unit: either all operations succeed and are committed, or all are rolled back if any error occurs. This prevents the database from being left in an inconsistent state.

In the FRRUAS prototype (Python + psycopg2), we implement transactions using a context manager get_transaction() that automatically commits on success and rolls back on exception:

@contextmanager
def get_transaction():
    p = _get_pool()
    conn = p.getconn()
    try:
        conn.autocommit = False
        yield conn
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        p.putconn(conn)

Transaction 1: Make a Reservation (conflict check + insert)

When a user creates a reservation, two operations must happen atomically: checking for time conflicts and inserting the reservation. Without a transaction, a race condition exists: two users could both pass the conflict check simultaneously, and both insert overlapping reservations.

# uc_reserve.py — make_reservation()

with get_transaction() as conn:
    with conn.cursor() as cur:
        # Re-check conflicts inside the transaction to prevent
        # race conditions (another user inserting between our
        # check and insert)
        cur.execute("""
            SELECT COUNT(*) FROM reservations
            WHERE resource_id = %s
              AND status IN ('approved', 'pending')
              AND start_time < %s AND end_time > %s
        """, (rid, end_dt, start_dt))

        if cur.fetchone()[0] > 0:
            raise RuntimeError("Conflict: another reservation was just created for this time slot")

        cur.execute("""
            INSERT INTO reservations
                (start_time, end_time, status, purpose, created_at, user_id, resource_id)
            VALUES (%s, %s, 'pending', %s, CURRENT_TIMESTAMP, %s, %s)
            RETURNING reservation_id, status, created_at
        """, (start_dt, end_dt, purpose, user_id, rid))

        result = cur.fetchone()
# Transaction committed automatically on successful exit

Transaction 2: Approve or Reject a Reservation (conflict re-check + status update)

When an administrator approves a reservation, the system must verify no conflicting reservation was approved in the meantime. The conflict check and status update are wrapped in a single transaction to prevent two administrators from simultaneously approving overlapping reservations.

# uc_approve.py — _review_reservation()

with get_transaction() as conn:
    with conn.cursor() as cur:
        # For approvals, re-check conflicts within the transaction
        if new_status == "approved":
            cur.execute("""
                SELECT COUNT(*) FROM reservations
                WHERE resource_id = %s
                  AND reservation_id != %s
                  AND status = 'approved'
                  AND start_time < %s AND end_time > %s
            """, (res["resource_id"], res["reservation_id"],
                  res["end_time"], res["start_time"]))

            if cur.fetchone()[0] > 0:
                raise RuntimeError("Conflict: another reservation was approved for this time slot")

        cur.execute("""
            UPDATE reservations
            SET status = %s, approved_by = %s
            WHERE reservation_id = %s AND status = 'pending'
            RETURNING reservation_id, status
        """, (new_status, admin_user_id, res["reservation_id"]))

        result = cur.fetchone()
# Transaction committed automatically

Transaction 3: Register a New User (email uniqueness check + insert)

User registration checks email uniqueness and inserts the new user atomically. Without a transaction, two simultaneous registrations with the same email could both pass the uniqueness check and both insert, violating the UNIQUE constraint.

# uc_users.py — register_user()

with get_transaction() as conn:
    with conn.cursor() as cur:
        cur.execute("SELECT COUNT(*) FROM users WHERE email = %s", (email,))

        if cur.fetchone()[0] > 0:
            raise ValueError(f"A user with email '{email}' already exists.")

        cur.execute("""
            INSERT INTO users (first_name, last_name, email, password, type_id)
            VALUES (%s, %s, %s, %s, %s)
            RETURNING user_id, first_name, last_name, email
        """, (first_name, last_name, email, hashed, type_id))

        result = cur.fetchone()
# Transaction committed automatically

Database Connection Pooling

Connection pooling reuses open database connections instead of creating a new one for every query. Opening a database connection involves TCP handshake, authentication, and session setup, which adds overhead. With pooling, these costs are paid once, and subsequent operations borrow pre-established connections from the pool.

Implementation

The prototype uses psycopg2.pool.SimpleConnectionPool to maintain a pool of reusable connections:

# db.py

from psycopg2 import pool

_pool = None

def _get_pool():
    global _pool
    if _pool is None:
        _pool = pool.SimpleConnectionPool(minconn=2, maxconn=10, **DB_CONFIG)
    return _pool

Pool configuration:

ParameterValueMeaning
minconn2Minimum connections kept open and ready
maxconn10Maximum concurrent connections allowed

The pool is initialized lazily on first use and cleaned up when the application exits via close_pool().

Connection Usage

Two context managers provide connections from the pool:

Read operations use get_connection() with autocommit enabled:

@contextmanager
def get_connection():
    p = _get_pool()
    conn = p.getconn()
    try:
        conn.autocommit = True
        yield conn
    finally:
        conn.autocommit = False
        p.putconn(conn)

Write operations use get_transaction() with explicit commit/rollback:

@contextmanager
def get_transaction():
    p = _get_pool()
    conn = p.getconn()
    try:
        conn.autocommit = False
        yield conn
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        p.putconn(conn)

In both cases, the connection is automatically returned to the pool when the with block exits, ensuring no connection leaks.

Example: Read Operation Using Pool

# uc_browse.py — fetching resources

with get_connection() as conn:
    with conn.cursor() as cur:
        cur.execute("""
            SELECT r.resource_id, r.name, rt.type_name, l.building, l.room
            FROM resources r
            JOIN resource_types rt ON r.type_id = rt.type_id
            LEFT JOIN locations l ON r.location_id = l.location_id
            ORDER BY rt.type_name, r.name
        """)
        resources = cur.fetchall()
# Connection returned to pool automatically

Pool Lifecycle

# main.py

def main():
    try:
        while True:
            user = login()       # uses pooled connections
            if user is None:
                break
            main_menu(...)       # all operations use pooled connections
    except KeyboardInterrupt:
        pass
    finally:
        close_pool()             # close all connections on exit
Note: See TracWiki for help on using the wiki.