PNC Virtual Wallet Statement Parser
For some reason PNC Bank will only give me my bank statements as PDFs. This makes it annoying to import transactions into my accounting software GnuCash.
A few years ago I wrote a regex parser that pulled transactions from the text of the PDF. Recently I've been doing a lot of structured data extraction using LLMs, and I wanted to see how well they could do.
It turns out gpt-4o and gpt-4o-mini can zero-shot this task.
I've included the code below so anyone can try it out on their own statements.
"""Extracts transactions from pdf PNC Virtual Wallet Statements.
Pulls the balance summary from statement and then validates that the sum of
all transaction deposits and withdrawals matches the balance summary deposits
and withdrawals.
Python plus pdfplumber, pydantic, instructor and openai are the dependencies
$ pip install pdfplumber pydantic instructor openai
Extract transactions from all statements in a directory
$ python pnc_vw.py ~/Documents/accounting/2020/statements/
Extract transactions from a single statement
$ python pnc_vw.py ~/Documents/accounting/2020/statements/Statement_Feb_19_2020.pdf
"""
import argparse
import csv
import itertools
import re
import sys
from datetime import date as Date
from decimal import Decimal
from functools import partial
from pathlib import Path
import instructor
import pdfplumber
from openai import OpenAI
from pydantic.json_schema import SkipJsonSchema
from pydantic import BaseModel, ValidationError, model_validator, Field
# Process exit codes passed to sys.exit() by main().
SUCCESS = 0
PARSING_ERROR = 1  # the extract function raised ParsingError
VALIDATION_ERROR = 2  # the extract function raised pydantic's ValidationError
UNKNOWN_ERROR = 3  # the extract function raised any other exception
ARGUMENT_ERROR = 4  # bad command-line argument (e.g. unknown --gpt model)
class ParsingError(Exception):
    """Raised when a required part of the statement cannot be extracted."""
class BalanceSummary(BaseModel):
    # The four totals printed in a statement's balance summary.
    # (Documented with comments rather than a class docstring so the
    # generated pydantic JSON schema stays exactly as it was.)
    beginning: Decimal = Field(
        description="Account balance at the start of the statement period"
    )
    deposits: Decimal = Field(description="Total deposits over the statement period")
    deductions: Decimal = Field(
        description="Total withdrawals over the statement period"
    )
    ending: Decimal = Field(
        description="Account balance at the end of the statement period"
    )

    @classmethod
    def from_slice(cls, slice):
        """Build a summary from four values ordered beginning, deposits,
        deductions, ending.

        Each value is converted with ``Decimal``.

        Raises:
            ValueError: if ``slice`` does not hold exactly four values.
        """
        if len(slice) != 4:
            raise ValueError("Should be exactly 4 values")
        field_names = ("beginning", "deposits", "deductions", "ending")
        return cls(**{name: Decimal(raw) for name, raw in zip(field_names, slice)})
class Transaction(BaseModel):
    # A single statement line: either a deposit or a withdrawal, the other
    # amount defaulting to zero.
    # (Documented with comments rather than a class docstring so the
    # generated pydantic JSON schema stays exactly as it was.)
    date: Date
    description: str
    deposit: Decimal = Field(default=Decimal(0))
    withdrawal: Decimal = Field(default=Decimal(0))
    # Ledger account written to the output; SkipJsonSchema keeps it out of
    # the JSON schema.
    account: SkipJsonSchema[str] = "Assets:Current Assets:Checking Account"

    @classmethod
    def from_check(cls, num: int, date: Date, amount: Decimal):
        """Return the withdrawal transaction for check number ``num``."""
        check_fields = {
            "date": date,
            "description": f"Check #{num}",
            "withdrawal": amount,
        }
        return cls(**check_fields)
class Period(BaseModel):
    # First and last day covered by a statement.
    # (Comment, not a docstring, so the pydantic JSON schema is unchanged.)
    start: Date
    end: Date
class VWStatement(BaseModel):
    # A fully parsed statement. Construction runs `run_the_numbers`, so an
    # instance only exists if its transactions reconcile with its balance
    # summary.
    # (Documented with comments rather than a class docstring so the
    # generated pydantic JSON schema stays exactly as it was.)
    account_number: str
    period: Period = Field(
        description="The start and end of the current statement period"
    )
    balance_summary: BalanceSummary
    transactions: list[Transaction]

    @model_validator(mode="after")
    def run_the_numbers(self):
        """Check that the transactions add up to the balance summary.

        Verifies, in order, that the transaction deposits equal the summary
        deposits, that the transaction withdrawals equal the summary
        deductions, and that beginning + deposits - withdrawals equals the
        summary ending balance.

        Raises:
            ValueError: on the first check that fails, with a message that
                states both sides of the mismatch.
        """
        deposits = sum((t.deposit for t in self.transactions), Decimal(0))
        withdrawals = sum((t.withdrawal for t in self.transactions), Decimal(0))
        bs = self.balance_summary

        if deposits != bs.deposits:
            raise ValueError(
                f"Deposits of {deposits} != balance summary deposits of {bs.deposits}"
            )
        if withdrawals != bs.deductions:
            raise ValueError(
                f"Withdrawals of {withdrawals} != balance summary withdrawals of {bs.deductions}"
            )

        computed_ending = bs.beginning + deposits - withdrawals
        if computed_ending != bs.ending:
            # Spell out the arithmetic: this text is all the user sees when
            # the resulting ValidationError is printed.
            raise ValueError(
                f"Beginning balance of {bs.beginning} + deposits of {deposits}"
                f" - withdrawals of {withdrawals} = {computed_ending}"
                f" != balance summary ending balance of {bs.ending}"
            )
        return self
def tokenize(text: str) -> list[tuple[str, str]]:
    """Split statement text into ``(kind, value)`` tokens.

    The text is scanned left to right with one big alternation; at each
    position the first pattern in ``spec`` that matches wins, so the order of
    ``spec`` is significant. Boilerplate, filler phrases, newlines and stray
    single characters are recognised but left out of the result.

    Returns:
        The surviving tokens in order of appearance. ``kind`` is one of
        DATE, BALANCE_SUMMARY, AMOUNT, ACCOUNT_NUMBER, PERIOD, PAGE, WORD or
        DOLLAR.
    """
    # Headings and filler that carry no transaction data.
    # NOTE(review): these are matched without word boundaries and are tried
    # before WORD, so a word that merely starts with one of them (e.g.
    # "island") loses that prefix.
    noise_phrases = (
        "Amount",
        "continued",
        "Description",
        "Primary",
        "account number",
        "Banking",
        "is",
        "Date",
        "Daily",
        "Balance",
        "totaling",
        "Additions",
        "Member FDIC Equal Housing Lender",
        "Deposits and Other",
        "on next page",
        "Virtual Wallet Spend Statement",
        "Online or Electronic",
        "Online and Electronic",
        "Deductions",
        "Detail",
        "other",
        "deductions",
        "For the period",
    )
    amount = r"(?:\d+\.\d+|\.\d+)"  # 100.00 or .08
    spec = (
        ("DATE", r"\d\d/\d\d"),  # 01/12
        # Four space-separated amounts in a row.
        ("BALANCE_SUMMARY", " ".join([amount] * 4)),
        ("AMOUNT", amount),
        ("ACCOUNT_NUMBER", r"(?<=Account Number: )\d+-\d+-\d+"),
        ("PERIOD", r"For the period \d\d/\d\d/\d\d\d\d to \d\d/\d\d/\d\d\d\d"),
        (
            "BOILERPLATE",
            r"Virtual Wallet Spend Statement PNC Bank.*Virtual Wallet Spend Account Summary",
        ),
        ("BOILERPLATE2", r"Overdraft Protection.*Balance Summary"),
        ("THERE", r"(?:There were \d+|There is \d+)"),
        ("TOTALING", r"totaling \$\d+\.\d+"),
        ("PAGE", r"Page \d+ of \d+"),
        ("FLUFF", "(?:" + "|".join(noise_phrases) + ")"),
        ("WORD", r"\w+"),
        ("DOLLAR", r"\$\d+\.\d+"),
        ("NEWLINE", r"\n"),  # line endings
        ("MISMATCH", r"."),  # any other character
    )
    pattern = "|".join(f"(?P<{name}>{regex})" for name, regex in spec)

    # Kinds that are matched only so they can be thrown away.
    discarded = frozenset(
        (
            "BOILERPLATE",
            "BOILERPLATE2",
            "THERE",
            "TOTALING",
            "FLUFF",
            "MISMATCH",
            "NEWLINE",
        )
    )
    return [
        (match.lastgroup, match.group())
        for match in re.finditer(pattern, text, re.DOTALL)
        if match.lastgroup not in discarded
    ]
def get_text(path: Path) -> str:
    """Return the text of every page of the PDF as one flat string.

    Within each page, newlines are replaced by spaces and commas are removed
    (so "1,234.56" reads as "1234.56"). Pages are joined with a single space
    so the last word of one page cannot fuse with the first word of the next.

    Args:
        path: location of the PDF statement.

    Returns:
        The concatenated page text; a page with no extractable text
        contributes an empty string.
    """
    with pdfplumber.open(path) as pdf:
        pages = [
            # `or ""` guards against extract_text() yielding None for a page
            # without text.
            (page.extract_text() or "").replace("\n", " ").replace(",", "")
            for page in pdf.pages
        ]
    return " ".join(pages)
def _get_period(tokens: list[tuple[str, str]]) -> Period:
    """Return the statement period taken from the first PERIOD token.

    Raises:
        ParsingError: if ``tokens`` contains no PERIOD token.
    """
    for kind, value in tokens:
        if kind != "PERIOD":
            continue
        # value reads "For the period MM/DD/YYYY to MM/DD/YYYY", so the two
        # dates are the 4th and 6th space-separated pieces.
        pieces = value.split(" ")
        bounds = []
        for raw_date in (pieces[3], pieces[5]):
            month, day, year = (int(part) for part in raw_date.split("/"))
            bounds.append(Date(year, month, day))
        return Period(start=bounds[0], end=bounds[1])
    raise ParsingError("Did not extract period (start: date, end: date)")
def _get_date(date_str: str, period: Period) -> Date:
m, d = [int(v) for v in date_str.split("/")]
start, end = period.start, period.end
y = start.year if m == 12 else end.year
return Date(y, m, d)
def _is_transaction_description(token: tuple[str, str]) -> bool:
kind, word = token
if kind == "DATE":
return False
termination_words = ["Checks", "Machine", "Withdrawals", "For"]
if word in termination_words:
return False
return True
def extract_with_regex(text: str) -> VWStatement:
    """Parse statement text into a ``VWStatement`` using the regex tokenizer.

    Walks the token stream looking at each token and the two that follow it:

    * WORD, AMOUNT, DATE with a numeric WORD is a paid check
      (number, amount, date paid);
    * DATE, AMOUNT, WORD starts an ordinary transaction whose description
      runs until ``_is_transaction_description`` says stop.

    BALANCE_SUMMARY and ACCOUNT_NUMBER tokens are picked up along the way;
    if either occurs more than once the last one wins.

    Raises:
        ParsingError: if the period, account number, balance summary or
            transactions could not be found.
        pydantic.ValidationError: from ``VWStatement`` when the transactions
            do not reconcile with the balance summary.
    """
    balance_summary, account_number = None, None
    tokens = tokenize(text)
    period = _get_period(tokens)
    transactions = []
    # A description containing any of these marks money coming in.
    deposit_markers = (
        "Deposit",
        "Debit Card Credit",
        "Transfer From",
        "Rtp Received Venmo",
    )

    for n, (kind_0, value_0) in enumerate(tokens):
        if kind_0 == "BALANCE_SUMMARY":
            balance_summary = BalanceSummary.from_slice(value_0.split(" "))
        elif kind_0 == "ACCOUNT_NUMBER":
            account_number = value_0

        # The patterns below need two tokens of lookahead.
        if n >= len(tokens) - 2:
            continue
        kind_1, value_1 = tokens[n + 1]
        kind_2, value_2 = tokens[n + 2]
        kinds = (kind_0, kind_1, kind_2)

        if kinds == ("WORD", "AMOUNT", "DATE"):
            try:
                check_num = int(value_0)
            except ValueError:
                # An ordinary word that happens to precede an amount and a
                # date is not a check number; skip it instead of aborting
                # the whole statement.
                continue
            date = _get_date(value_2, period)
            transactions.append(
                Transaction.from_check(check_num, date, Decimal(value_1))
            )
        elif kinds == ("DATE", "AMOUNT", "WORD"):
            date = _get_date(value_0, period)
            description = " ".join(
                value
                for _, value in itertools.takewhile(
                    _is_transaction_description, tokens[n + 2 :]
                )
            )
            amount = Decimal(value_1)
            if any(marker in description for marker in deposit_markers):
                transaction = Transaction(
                    date=date, description=description, deposit=amount
                )
            else:
                transaction = Transaction(
                    date=date, description=description, withdrawal=amount
                )
            transactions.append(transaction)

    if not account_number:
        raise ParsingError("account_number")
    # Defensive only: _get_period above either returned a period or raised.
    if not period:
        raise ParsingError("period")
    if not balance_summary:
        raise ParsingError("balance_summary")
    if not transactions:
        raise ParsingError("transactions")

    return VWStatement(
        account_number=account_number,
        period=period,
        balance_summary=balance_summary,
        transactions=transactions,
    )
def extract_with_gpt(text: str, model: str) -> VWStatement:
    """Ask an OpenAI chat model to extract the statement from ``text``.

    The request goes through an instructor-wrapped OpenAI client with
    ``VWStatement`` as the response model.

    Args:
        text: full statement text.
        model: OpenAI model identifier to query.
    """
    conversation = [
        {
            "role": "system",
            "content": "Your task is to extract data from a PNC Bank Virtual Wallet statement.",
        },
        {"role": "user", "content": text},
    ]
    llm = instructor.from_openai(OpenAI())
    # A fixed seed and a low top_p are an attempt at reproducible output.
    return llm.chat.create(
        model=model,
        seed=32**3,
        top_p=0.1,
        messages=conversation,
        response_model=VWStatement,  # type: ignore[]
    )
def write_json(statement: VWStatement):
    """Write the statement's transactions to stdout as JSON Lines.

    Each transaction is serialised with ``model_dump_json`` and written on
    its own line, so the output can be split and parsed record by record
    (objects written back to back without a delimiter cannot be).
    """
    sys.stdout.writelines(
        f"{t.model_dump_json()}\n" for t in statement.transactions
    )
def write_csv(statement: VWStatement):
    """Write the statement's transactions to stdout as CSV.

    A header row is written first, then one row per transaction with the
    columns date, description, deposit, withdrawal and account.
    """
    columns = ["date", "description", "deposit", "withdrawal", "account"]
    out = csv.writer(sys.stdout)
    out.writerow(columns)
    # Column names double as the transaction attribute names.
    out.writerows(
        [getattr(txn, column) for column in columns]
        for txn in statement.transactions
    )
def main():
    """Command-line entry point.

    Reads one PDF statement, or every ``*.pdf`` file directly inside a
    directory, extracts the transactions with either the regex parser or a
    GPT model, and writes them to stdout as CSV (default) or JSON.

    Diagnostics go to stderr so they never mix with the data on stdout. The
    process always ends through ``sys.exit`` with one of the module's exit
    codes: SUCCESS, PARSING_ERROR, VALIDATION_ERROR, UNKNOWN_ERROR or
    ARGUMENT_ERROR. Processing stops at the first statement that fails.
    """
    parser = argparse.ArgumentParser(description="Process some arguments.")
    parser.add_argument("path", type=str, help="The path to a file or directory.")
    parser.add_argument("--gpt", type=str, help="Any chatgpt model")
    parser.add_argument("--quiet", "-q", action="store_true", help="suppress output")
    # NOTE(review): --directory is accepted but nothing below reads it yet;
    # output always goes to stdout.
    parser.add_argument(
        "--directory", "-d", type=str, help="Optional directory to write transaction to"
    )
    parser.add_argument(
        "--json",
        action="store_true",
        help="Write transactions out as json instead of csv",
    )
    args = parser.parse_args()

    path = Path(args.path)
    # Explicit check rather than `assert`, which disappears under `python -O`.
    if not path.exists():
        print(f"Path does not exist: {path}", file=sys.stderr)
        sys.exit(ARGUMENT_ERROR)

    if path.is_dir():
        # Only hand PDFs to the parser; sort for a reproducible order.
        files = sorted(
            entry
            for entry in path.iterdir()
            if entry.is_file() and entry.suffix.lower() == ".pdf"
        )
    else:
        files = [path]

    if args.gpt:
        # Accepted spellings mapped to pinned model snapshots.
        model = {
            "gpt4omini": "gpt-4o-mini-2024-07-18",
            "gpt4o": "gpt-4o-2024-08-06",
            "gpt-4o-mini": "gpt-4o-mini-2024-07-18",
            "gpt-4o": "gpt-4o-2024-08-06",
            "4o-mini": "gpt-4o-mini-2024-07-18",
            "4o": "gpt-4o-2024-08-06",
        }.get(args.gpt)
        if not model:
            print(f"Invalid GPT model: {args.gpt}", file=sys.stderr)
            sys.exit(ARGUMENT_ERROR)
        extract_func = partial(extract_with_gpt, model=model)
    else:
        extract_func = extract_with_regex

    write_func = write_json if args.json else write_csv

    for file in files:
        try:
            # Reading the PDF sits inside the try so an unreadable file is
            # reported like any other failure instead of as a traceback.
            statement = extract_func(get_text(file))
        except ParsingError as e:
            print(f"ParsingError: {e}", file=sys.stderr)
            sys.exit(PARSING_ERROR)
        except ValidationError as e:
            print(f"ValidationError: {e}", file=sys.stderr)
            sys.exit(VALIDATION_ERROR)
        except Exception as e:
            print(f"Exception: {e}", file=sys.stderr)
            sys.exit(UNKNOWN_ERROR)
        if not args.quiet:
            write_func(statement)
    sys.exit(SUCCESS)
# Run the CLI only when executed as a script, not when imported.
if __name__ == "__main__":
    main()