Overview
Storage layer for PR analytics records with concurrency-safe update operations, provided by the OpenHands enterprise storage layer.
Description
OpenhandsPRStore manages the persistence of pull request analytics data. It tracks PRs that OpenHands has interacted with and records statistics about its processing activity.
The store provides four key operations: insert_pr() creates a new PR record, increment_process_attempts() safely bumps the retry counter, update_pr_openhands_stats() updates analytics fields using SELECT...FOR UPDATE to prevent lost updates under concurrent access, and get_unprocessed_prs() retrieves PRs that have not yet been fully analyzed.
The use of SELECT...FOR UPDATE in update_pr_openhands_stats() is critical for data integrity. Because multiple workers or background jobs may attempt to update the same PR record simultaneously, the row-level lock ensures that concurrent updates are serialized rather than overwriting each other. This is a standard pessimistic locking pattern for counters and aggregate statistics.
Usage
Use OpenhandsPRStore in background analytics pipelines that process PR data. The typical workflow is: retrieve unprocessed PRs with get_unprocessed_prs(), process each one, and update stats with update_pr_openhands_stats(). If processing fails partway, increment_process_attempts() tracks how many times the system has tried.
Code Reference
Source Location
Signature
class OpenhandsPRStore:
def __init__(self, session: Session):
...
def insert_pr(self, pr_data: dict) -> OpenhandsPR:
...
def increment_process_attempts(self, pr_id: str) -> None:
...
def update_pr_openhands_stats(self, pr_id: str, stats: dict) -> OpenhandsPR:
"""Uses SELECT...FOR UPDATE to prevent concurrent overwrites."""
...
def get_unprocessed_prs(self, limit: int = 100) -> List[OpenhandsPR]:
...
Import
from enterprise.storage.openhands_pr_store import OpenhandsPRStore
I/O Contract
Inputs
Constructor
| Name |
Type |
Required |
Description
|
| session |
Session |
Yes |
SQLAlchemy database session for executing queries
|
insert_pr()
| Name |
Type |
Required |
Description
|
| pr_data |
dict |
Yes |
Dictionary containing PR metadata (e.g., repo, pr_number, author, created_at)
|
increment_process_attempts()
| Name |
Type |
Required |
Description
|
| pr_id |
str |
Yes |
The ID of the PR record to increment the attempt counter for
|
update_pr_openhands_stats()
| Name |
Type |
Required |
Description
|
| pr_id |
str |
Yes |
The ID of the PR record to update
|
| stats |
dict |
Yes |
Dictionary of analytics fields to update (e.g., lines_changed, files_touched)
|
get_unprocessed_prs()
| Name |
Type |
Required |
Description
|
| limit |
int |
No |
Maximum number of unprocessed PRs to return (default: 100)
|
Outputs
| Method |
Return Type |
Description
|
| insert_pr() |
OpenhandsPR |
The newly created PR analytics record
|
| increment_process_attempts() |
None |
Increments the attempt counter in place
|
| update_pr_openhands_stats() |
OpenhandsPR |
The updated PR record with new stats (row-locked during update)
|
| get_unprocessed_prs() |
List[OpenhandsPR] |
List of PR records that have not been fully processed
|
Usage Examples
Inserting and Processing PRs
from enterprise.storage.openhands_pr_store import OpenhandsPRStore
store = OpenhandsPRStore(session=db_session)
# Record a new PR
pr = store.insert_pr(pr_data={
"repo": "All-Hands-AI/OpenHands",
"pr_number": 1234,
"author": "contributor",
"created_at": "2026-01-15T10:00:00Z"
})
# Later, in a background pipeline:
unprocessed = store.get_unprocessed_prs(limit=50)
for pr in unprocessed:
try:
stats = analyze_pr(pr)
store.update_pr_openhands_stats(pr_id=pr.id, stats=stats)
except Exception:
store.increment_process_attempts(pr_id=pr.id)
Concurrency-Safe Stats Update
# The SELECT...FOR UPDATE ensures that concurrent workers
# do not overwrite each other's stats updates
updated_pr = store.update_pr_openhands_stats(
pr_id="pr-abc-123",
stats={
"lines_changed": 42,
"files_touched": 5,
"processing_complete": True
}
)
Related Pages
Environment