Dan Davis

PNC Virtual Wallet Statement Parser

For some reason PNC Bank will only give me my bank statements as PDFs. This makes it annoying to import transactions into my accounting software GnuCash.

A few years ago I wrote a regex parser that pulled transactions from the text of the PDF. Recently I've been doing a lot of structured data extraction using LLMs, and I wanted to see how well they could do.

It turns out gpt-4o and gpt-4o-mini can zero-shot this task. I've included the code below so anyone can try it out on their own statements.

"""Extracts transactions from pdf PNC Virtual Wallet Statements.

Pulls the balance summary from statement and then validates that the sum of
all transaction deposits and withdrawals matches the balance summary deposits
and withdrawals.

Third-party dependencies: pdfplumber, pydantic, instructor and openai
$ pip install pdfplumber pydantic instructor openai

Extract transactions from all statements in a directory
$ python pnc_vw.py ~/Documents/accounting/2020/statements/

Extract transactions from a single statement
$ python pnc_vw.py ~/Documents/accounting/2020/statements/Statement_Feb_19_2020.pdf
"""

import argparse
import csv
import itertools
import re
import sys
from datetime import date as Date
from decimal import Decimal
from functools import partial
from pathlib import Path

import instructor
import pdfplumber
from openai import OpenAI
from pydantic.json_schema import SkipJsonSchema
from pydantic import BaseModel, ValidationError, model_validator, Field


# Process exit codes handed to sys.exit() by main().
SUCCESS = 0  # every statement was extracted
PARSING_ERROR = 1  # extraction raised ParsingError
VALIDATION_ERROR = 2  # extraction raised pydantic's ValidationError
UNKNOWN_ERROR = 3  # extraction raised any other exception
ARGUMENT_ERROR = 4  # invalid command-line argument (e.g. unknown --gpt model)


class ParsingError(Exception):
    """Raised when a required part of a statement cannot be found in its text."""


class BalanceSummary(BaseModel):
    # The four totals from a statement's balance summary.
    # Documented with comments rather than a class docstring so the model's
    # generated JSON schema stays exactly as it was.
    beginning: Decimal = Field(
        description="Account balance at the start of the statement period"
    )
    deposits: Decimal = Field(
        description="Total deposits over the statement period"
    )
    deductions: Decimal = Field(
        description="Total withdrawals over the statement period"
    )
    ending: Decimal = Field(
        description="Account balance at the end of the statement period"
    )

    @classmethod
    def from_slice(cls, slice):
        """Build a summary from four values.

        Args:
            slice: Exactly four values accepted by ``Decimal``, ordered
                beginning, deposits, deductions, ending.

        Raises:
            ValueError: If ``slice`` does not hold exactly four values.
        """
        field_names = ("beginning", "deposits", "deductions", "ending")
        if len(slice) != 4:
            raise ValueError("Should be exactly 4 values")
        values = {
            name: Decimal(slice[position])
            for position, name in enumerate(field_names)
        }
        return cls(**values)


class Transaction(BaseModel):
    # One statement line: either a deposit or a withdrawal (the other side
    # keeps its default of zero).
    # Documented with comments rather than a class docstring so the model's
    # generated JSON schema stays exactly as it was.
    date: Date
    description: str
    deposit: Decimal = Field(default=Decimal(0))
    withdrawal: Decimal = Field(default=Decimal(0))
    # Wrapped in SkipJsonSchema, so this field is left out of the JSON schema.
    account: SkipJsonSchema[str] = "Assets:Current Assets:Checking Account"

    @classmethod
    def from_check(cls, num: int, date: Date, amount: Decimal):
        """Build the withdrawal transaction for check number ``num``."""
        check_fields = {
            "date": date,
            "description": f"Check #{num}",
            "withdrawal": amount,
        }
        return cls(**check_fields)


class Period(BaseModel):
    # Date range covered by a statement.
    start: Date  # first day of the statement period
    end: Date  # last day of the statement period


class VWStatement(BaseModel):
    # A parsed statement: account, period, balance summary and transactions.
    # Documented with comments rather than a class docstring so the model's
    # generated JSON schema stays exactly as it was.
    account_number: str
    period: Period = Field(
        description="The start and end of the current statement period"
    )
    balance_summary: BalanceSummary
    transactions: list[Transaction]

    @model_validator(mode="after")
    def run_the_numbers(self):
        """Check that the transactions add up to the balance summary.

        Raises:
            ValueError: If the summed deposits or withdrawals differ from the
                balance summary, or if beginning + deposits - withdrawals
                does not equal the ending balance.
        """
        deposits, withdrawals = Decimal(0), Decimal(0)
        for t in self.transactions:
            deposits += t.deposit
            withdrawals += t.withdrawal

        bs = self.balance_summary
        if deposits != bs.deposits:
            raise ValueError(
                f"Deposits of {deposits} != balance summary deposits of {bs.deposits}"
            )
        if withdrawals != bs.deductions:
            raise ValueError(
                f"Withdrawals of {withdrawals} != balance summary withdrawals of {bs.deductions}"
            )
        if bs.beginning + deposits - withdrawals != bs.ending:
            # Totals match individually, so the summary itself is inconsistent
            # (most likely a mis-read beginning or ending balance).
            raise ValueError(
                f"Beginning balance of {bs.beginning} + deposits of {deposits}"
                f" - withdrawals of {withdrawals} != ending balance of {bs.ending}"
            )

        return self


def tokenize(text: str) -> list[tuple[str, str]]:
    """Split statement text into ``(kind, value)`` tokens.

    The text is scanned left to right with one combined regex; at each
    position the first pattern in ``token_specification`` that matches wins,
    so the order of the entries matters.

    Args:
        text: Statement text to scan.

    Returns:
        The tokens in order of appearance. Matches of kind BOILERPLATE,
        BOILERPLATE2, THERE, TOTALING, FLUFF, MISMATCH and NEWLINE are
        dropped; every other kind is kept.
    """
    fluff_words = [
        "Amount",
        "continued",
        "Description",
        "Primary",
        "account number",
        "Banking",
        "is",
        "Date",
        "Daily",
        "Balance",
        "totaling",
        "Additions",
        "Member FDIC Equal Housing Lender",
        "Deposits and Other",
        "on next page",
        "Virtual Wallet Spend Statement",
        "Online or Electronic",
        "Online and Electronic",
        "Deductions",
        "Detail",
        "other",
        "deductions",
        "For the period",
    ]
    # A single fluff word must end on a word boundary; otherwise it clips the
    # front off longer words ("Dateline" -> "line", "island" -> "land").
    # Multi-word phrases are distinctive enough not to clip real words, so
    # they keep matching as before even when fused to a following word.
    fluff_pattern = "|".join(
        word if " " in word else word + r"\b" for word in fluff_words
    )
    token_specification = [
        ("DATE", r"\d\d/\d\d"),  # 01/12
        (
            "BALANCE_SUMMARY",
            r"(?:\d+\.\d+|\.\d+) (?:\d+\.\d+|\.\d+) (?:\d+\.\d+|\.\d+) (?:\d+\.\d+|\.\d+)",
        ),
        ("AMOUNT", r"(?:\d+\.\d+|\.\d+)"),  # 100.00 or .08
        ("ACCOUNT_NUMBER", r"(?<=Account Number: )\d+-\d+-\d+"),
        ("PERIOD", r"For the period \d\d/\d\d/\d\d\d\d to \d\d/\d\d/\d\d\d\d"),
        (
            "BOILERPLATE",
            r"Virtual Wallet Spend Statement PNC Bank.*Virtual Wallet Spend Account Summary",
        ),
        ("BOILERPLATE2", r"Overdraft Protection.*Balance Summary"),
        ("THERE", r"(?:There were \d+|There is \d+)"),
        ("TOTALING", r"totaling \$\d+\.\d+"),
        ("PAGE", r"Page \d+ of \d+"),
        ("FLUFF", r"(?:" + fluff_pattern + ")"),
        ("WORD", r"\w+"),
        ("DOLLAR", r"\$\d+\.\d+"),
        ("NEWLINE", r"\n"),  # Line endings
        ("MISMATCH", r"."),  # Any other character
    ]
    tok_regex = "|".join(
        f"(?P<{name}>{pattern})" for name, pattern in token_specification
    )
    ignored_kinds = {
        "BOILERPLATE",
        "BOILERPLATE2",
        "THERE",
        "TOTALING",
        "FLUFF",
        "MISMATCH",
        "NEWLINE",
    }
    # re.DOTALL lets "." (and the ".*" in the boilerplate patterns) span newlines.
    return [
        (mo.lastgroup, mo.group())
        for mo in re.finditer(tok_regex, text, re.DOTALL)
        if mo.lastgroup not in ignored_kinds
    ]


def get_text(path: Path) -> str:
    """Return the text of the PDF as one string without newlines or commas.

    Within each page, newlines become spaces and commas are removed
    (presumably so amounts such as 1,234.56 read as a single number).
    Pages are joined with a single space so the last word of one page is not
    fused with the first word of the next.

    Args:
        path: Location of the PDF statement.
    """
    with pdfplumber.open(path) as pdf:
        page_texts = [
            # "or ''" guards against extract_text() returning None for a page
            # without text, which some pdfplumber versions may do.
            (page.extract_text() or "").replace("\n", " ").replace(",", "")
            for page in pdf.pages
        ]
    return " ".join(page_texts)


def _get_period(tokens: list[tuple[str, str]]) -> Period:
    for kind, value in tokens:
        if kind == "PERIOD":
            words = value.split(" ")
            _from, _to = words[3].split("/"), words[5].split("/")
            m, d, y = [int(v) for v in _from]
            start = Date(y, m, d)
            m, d, y = [int(v) for v in _to]
            end = Date(y, m, d)
            return Period(start=start, end=end)
    else:
        raise ParsingError("Did not extract period (start: date, end: date)")


def _get_date(date_str: str, period: Period) -> Date:
    m, d = [int(v) for v in date_str.split("/")]
    start, end = period.start, period.end
    y = start.year if m == 12 else end.year
    return Date(y, m, d)


def _is_transaction_description(token: tuple[str, str]) -> bool:
    kind, word = token
    if kind == "DATE":
        return False
    termination_words = ["Checks", "Machine", "Withdrawals", "For"]
    if word in termination_words:
        return False
    return True


def extract_with_regex(text: str) -> VWStatement:
    """Extract a statement from its text using the regex tokenizer.

    Every run of three consecutive tokens is examined:

    * ``WORD AMOUNT DATE`` with a numeric word is a check
      (number, amount, date).
    * ``DATE AMOUNT WORD`` starts an ordinary transaction. Its description
      is every following token up to the next DATE or section word. It is a
      deposit when the description contains one of the deposit markers,
      otherwise a withdrawal.

    Args:
        text: Full statement text.

    Raises:
        ParsingError: If the account number, period, balance summary or
            transactions could not be found.
        ValueError: Propagated from building the ``VWStatement`` when its
            validator rejects the totals (pydantic reports it as its
            ``ValidationError``).
    """
    deposit_markers = (
        "Deposit",
        "Debit Card Credit",
        "Transfer From",
        "Rtp Received Venmo",
    )
    balance_summary, account_number = None, None
    tokens = tokenize(text)
    period = _get_period(tokens)

    transactions = []
    for n, (kind_0, value_0) in enumerate(tokens):
        if kind_0 == "BALANCE_SUMMARY":
            balance_summary = BalanceSummary.from_slice(value_0.split(" "))
        elif kind_0 == "ACCOUNT_NUMBER":
            account_number = value_0

        # The last two tokens cannot start a three-token pattern.
        if n >= len(tokens) - 2:
            continue

        kind_1, value_1 = tokens[n + 1]
        kind_2, value_2 = tokens[n + 2]
        shape = (kind_0, kind_1, kind_2)

        if shape == ("WORD", "AMOUNT", "DATE"):
            try:
                check_num = int(value_0)
            except ValueError:
                # An ordinary word that happens to precede an amount and a
                # date is not a check number; skip it instead of crashing.
                continue
            date = _get_date(value_2, period)
            transactions.append(
                Transaction.from_check(check_num, date, Decimal(value_1))
            )
        elif shape == ("DATE", "AMOUNT", "WORD"):
            date = _get_date(value_0, period)
            # islice walks the remaining tokens without copying the list.
            description_tokens = itertools.takewhile(
                _is_transaction_description,
                itertools.islice(tokens, n + 2, None),
            )
            description = " ".join(value for _, value in description_tokens)
            amount = Decimal(value_1)
            if any(marker in description for marker in deposit_markers):
                transaction = Transaction(
                    date=date, description=description, deposit=amount
                )
            else:
                transaction = Transaction(
                    date=date, description=description, withdrawal=amount
                )
            transactions.append(transaction)

    if not account_number:
        raise ParsingError("account_number")
    if not period:
        raise ParsingError("period")
    if not balance_summary:
        raise ParsingError("balance_summary")
    if not transactions:
        raise ParsingError("transactions")

    return VWStatement(
        account_number=account_number,
        period=period,
        balance_summary=balance_summary,
        transactions=transactions,
    )


def extract_with_gpt(text: str, model: str) -> VWStatement:
    """Extract a statement by asking an OpenAI chat model to do the work.

    The statement text is sent as the user message through an
    instructor-wrapped OpenAI client, with ``VWStatement`` as the response
    model.

    Args:
        text: Full statement text.
        model: Name of the OpenAI model to call.
    """
    system_message = {
        "role": "system",
        "content": "Your task is to extract data from a PNC Bank Virtual Wallet statement.",
    }
    user_message = {"role": "user", "content": text}

    llm = instructor.from_openai(OpenAI())
    return llm.chat.create(
        model=model,
        # Fixed seed and a low top_p: try to be reproducible.
        seed=32**3,
        top_p=0.1,
        messages=[system_message, user_message],
        response_model=VWStatement,  # type: ignore[]
    )


def write_json(statement: VWStatement):
    """Write each transaction to stdout as one JSON object per line.

    The trailing newline keeps consecutive objects apart; without it the
    output is one unparseable run of ``{...}{...}``.
    """
    for t in statement.transactions:
        sys.stdout.write(t.model_dump_json() + "\n")


def write_csv(statement: VWStatement):
    """Write the transactions to stdout as CSV, preceded by a header row."""
    columns = ("date", "description", "deposit", "withdrawal", "account")
    out = csv.writer(sys.stdout)
    out.writerow(columns)
    out.writerows(
        [getattr(transaction, column) for column in columns]
        for transaction in statement.transactions
    )


def main():
    """Command-line entry point.

    Extracts the transactions of one PDF statement, or of every ``.pdf``
    file in a directory (in sorted order), and writes them to stdout as CSV
    or JSON. Error messages go to stderr so they never mix with the data.

    Exits with SUCCESS, or with PARSING_ERROR, VALIDATION_ERROR,
    UNKNOWN_ERROR or ARGUMENT_ERROR on the first failure.
    """
    parser = argparse.ArgumentParser(description="Process some arguments.")
    parser.add_argument("path", type=str, help="The path to a file or directory.")
    parser.add_argument("--gpt", type=str, help="Any chatgpt model")
    parser.add_argument("--quiet", "-q", action="store_true", help="suppress output")
    # NOTE(review): --directory is parsed but nothing in this function uses it.
    parser.add_argument(
        "--directory", "-d", type=str, help="Optional directory to write transaction to"
    )
    parser.add_argument(
        "--json",
        action="store_true",
        help="Write transactions out as json instead of csv",
    )
    args = parser.parse_args()

    path = Path(args.path)
    # An explicit check rather than assert, which is stripped under -O.
    if not path.exists():
        print(f"Path does not exist: {path}", file=sys.stderr)
        sys.exit(ARGUMENT_ERROR)

    if path.is_dir():
        # Only PDFs are statements; skip sub-directories and other files.
        files = sorted(
            entry
            for entry in path.iterdir()
            if entry.is_file() and entry.suffix.lower() == ".pdf"
        )
    else:
        files = [path]

    if args.gpt:
        model = {
            "gpt4omini": "gpt-4o-mini-2024-07-18",
            "gpt4o": "gpt-4o-2024-08-06",
            "gpt-4o-mini": "gpt-4o-mini-2024-07-18",
            "gpt-4o": "gpt-4o-2024-08-06",
            "4o-mini": "gpt-4o-mini-2024-07-18",
            "4o": "gpt-4o-2024-08-06",
        }.get(args.gpt)
        if not model:
            print(f"Invalid GPT model: {args.gpt}", file=sys.stderr)
            sys.exit(ARGUMENT_ERROR)
        extract_func = partial(extract_with_gpt, model=model)
    else:
        extract_func = extract_with_regex

    write_func = write_json if args.json else write_csv

    for file in files:
        try:
            # Reading the PDF happens inside the try so that an unreadable
            # file is reported like any other failure.
            statement = extract_func(get_text(file))
        except ParsingError as e:
            print(f"ParsingError in {file}: {e}", file=sys.stderr)
            sys.exit(PARSING_ERROR)
        except ValidationError as e:
            print(f"ValidationError in {file}: {e}", file=sys.stderr)
            sys.exit(VALIDATION_ERROR)
        except Exception as e:
            print(f"Exception in {file}: {e}", file=sys.stderr)
            sys.exit(UNKNOWN_ERROR)

        if not args.quiet:
            write_func(statement)
    sys.exit(SUCCESS)


if __name__ == "__main__":
    main()