// Db session

use "promises"

actor DbSession
  """
  Actor wrapper around Connection. Every operation is a behaviour that takes
  a Promise and fulfils it with the outcome, so the calling actor is never
  blocked while the database call runs.

  Each DbSession owns one Connection. Operations are serialized through
  the actor's mailbox — no concurrent access to the underlying ODBC handles.

  If the initial connect fails, the ConnectError is kept and every later
  operation fulfils its promise with an error value instead of touching a
  connection.

  Usage:
  ```pony
  let db = DbSession(Dsn("DSN=mydb"))
  let p = Promise[(RowCount | ExecError)]
  p.next[None]({(result: (RowCount | ExecError)) =>
    match result
    | let rc: RowCount => env.out.print("done")
    | let e: ExecError => env.err.print(e.string())
    end
  })
  db.exec("CREATE TABLE t (id INTEGER)", p)
  ```
  """

  // Result of the connect attempt made in `create`; never reassigned.
  // NOTE(review): `close` does not clear this, so behaviours invoked after
  // `close` are still forwarded to the (closed) Connection.
  let _conn: (Connection | ConnectError)
  // Options the session was created with.
  let _opts: OdbcOptions

  new create(dsn: Dsn, opts: OdbcOptions = OdbcOptions) =>
    """
    Connect to `dsn` using `opts` and keep whatever `Odbc.connect` yields,
    be it a Connection or a ConnectError.
    """
    _opts = opts
    _conn = Odbc.connect(dsn, opts)

  be exec(sql: String val, promise: Promise[(RowCount | ExecError)]) =>
    """
    Execute DDL/DML and fulfill the promise with the result.

    Without a connection the promise is fulfilled with an ExecError of kind
    ConnectionClosed carrying `sql` and no diagnostic records.
    """
    match \exhaustive\ _conn
    | let conn: Connection => promise(conn.exec(sql))
    | let _: ConnectError => promise(_closed_error(sql))
    end

  be query(sql: String val,
    promise: Promise[(Array[Row val] val | ExecError)]) =>
    """
    Execute a SELECT and fulfill the promise with all rows.
    Fetches all rows into memory.

    The promise receives an ExecError when the query itself fails, when a
    fetch fails part-way through, or when there is no connection.
    """
    match \exhaustive\ _conn
    | let conn: Connection =>
      match \exhaustive\ conn.query(sql)
      | let cursor: Cursor => promise(_fetch_all(cursor, sql))
      | let e: ExecError => promise(e)
      end
    | let _: ConnectError => promise(_closed_error(sql))
    end

  be begin(promise: Promise[(TxBegun | TxBeginError)]) =>
    """
    Begin a transaction.

    Without a connection the promise is fulfilled with
    TxBeginError(TxBeginConnectionClosed).
    """
    match \exhaustive\ _conn
    | let conn: Connection => promise(conn.begin())
    | let _: ConnectError =>
      promise(TxBeginError(TxBeginConnectionClosed))
    end

  be commit(promise: Promise[(TxCommitted | TxCommitError)]) =>
    """
    Commit the current transaction.

    Without a connection the promise is fulfilled with
    TxCommitError(NotInTransaction).
    """
    match \exhaustive\ _conn
    | let conn: Connection => promise(conn.commit())
    | let _: ConnectError =>
      promise(TxCommitError(NotInTransaction))
    end

  be rollback(promise: Promise[(TxRolledBack | TxRollbackError)]) =>
    """
    Rollback the current transaction.

    Without a connection the promise is fulfilled with
    TxRollbackError(RollbackNotInTransaction).
    """
    match \exhaustive\ _conn
    | let conn: Connection => promise(conn.rollback())
    | let _: ConnectError =>
      promise(TxRollbackError(RollbackNotInTransaction))
    end

  be close() =>
    """
    Close the underlying connection. Does nothing when the session never
    obtained a connection.
    """
    match \exhaustive\ _conn
    | let conn: Connection => conn.close()
    | let _: ConnectError => None
    end

  fun _closed_error(sql: String val): ExecError =>
    """
    Build the ExecError reported when there is no usable connection:
    kind ConnectionClosed, an empty diagnostic list, and the offending SQL.
    """
    ExecError(ConnectionClosed, recover val Array[DiagRecord] end, sql)

  fun _fetch_all(cursor: Cursor, sql: String val)
    : (Array[Row val] val | ExecError)
  =>
    """
    Drain `cursor` into an array. The cursor is closed on every exit path.
    A fetch failure is turned into an ExecError classified from the
    failure's diagnostics and tagged with `sql`.
    """
    let rows = recover iso Array[Row val] end
    while true do
      match \exhaustive\ cursor.fetch()
      | let row: Row => rows.push(row)
      | EndOfRows => break
      | let e: FetchError =>
        // Release the cursor before reporting; rows read so far are dropped.
        cursor.close()
        let ediag = e.unsafe_diag()
        return ExecError(ExecErrorClassifier.classify(ediag), ediag, sql)
      end
    end
    cursor.close()
    consume rows