Files
KARL/gitea_bot/poller.py
Kalle Bracht e18680544b Updates AI comment
The review comment was just a list of all files with their findings, which is redundant with the actual line comments and takes up a lot of space.

Co-authored-by: Copilot <copilot@github.com>
2026-05-02 10:56:53 +02:00

312 lines
11 KiB
Python

import os
import time
import json
import re
from pathlib import Path
from typing import List, Optional
import requests
from gitea_client import GiteaClient
from gemini_client import GeminiClient
from dotenv import load_dotenv
# Load environment variables from parent directory .env (project root)
# This must run before the os.getenv() calls below so values defined in the
# .env file are visible to them.
env_path = Path(__file__).resolve().parents[1] / ".env"
if env_path.exists():
    load_dotenv(dotenv_path=env_path)
# Configuration
API_URL = os.getenv("GITEA_API_URL")
TOKEN = os.getenv("GITEA_TOKEN")
BOT = os.getenv("BOT_USERNAME")
# Seconds to sleep between polling rounds (see run()); defaults to 60.
POLL_INTERVAL = int(os.getenv("POLL_INTERVAL", "60"))
# NOTE(review): POLL_OWNER and POLL_REPOS are read here but are not referenced
# anywhere else in the visible part of this file.
POLL_OWNER = os.getenv("POLL_OWNER")
POLL_REPOS = os.getenv("POLL_REPOS") # comma-separated owner/repo
ROOT = Path(__file__).resolve().parent.parent
# JSON file in which the keys of already-reviewed pull requests are persisted.
SEEN_PATH = ROOT / ".poller_seen.json"
# Fail fast at import time when the mandatory settings are missing.
if not (API_URL and TOKEN and BOT):
    raise RuntimeError("GITEA_API_URL, GITEA_TOKEN and BOT_USERNAME must be set for poller")
# Shared client instances used by handle_assignment() and run().
gitea = GiteaClient(API_URL, TOKEN)
gemini = GeminiClient()
def load_seen() -> set:
    """Load the set of already-handled pull request keys from ``SEEN_PATH``.

    The file is expected to contain a JSON list of lists; every inner list is
    converted to a tuple so it can be stored in a set.

    Returns:
        The stored keys, or an empty set when the file does not exist or
        cannot be read or parsed.
    """
    if not SEEN_PATH.exists():
        return set()
    try:
        with open(SEEN_PATH, "r", encoding="utf-8") as f:
            return {tuple(x) for x in json.load(f)}
    except Exception as e:
        # Loading stays best-effort so a damaged state file never prevents the
        # poller from starting, but the problem is reported: starting from an
        # empty set means every assigned PR will be reviewed again.
        print(f"failed to load seen state from {SEEN_PATH}: {e}; starting with empty state")
        return set()
def save_seen(seen: set):
    """Persist the set of handled pull request keys to ``SEEN_PATH`` as JSON.

    Each key (a tuple) is stored as a JSON list. The data is first written to
    a temporary file next to ``SEEN_PATH`` and then moved into place with
    ``os.replace``, so an interruption during the write cannot leave a
    truncated state file behind.

    Args:
        seen: Iterable keys to store; each key must itself be iterable and
            JSON-serializable.
    """
    tmp_path = SEEN_PATH.with_name(SEEN_PATH.name + ".tmp")
    with open(tmp_path, "w", encoding="utf-8") as f:
        json.dump([list(x) for x in seen], f)
    # Swap the finished file in; the previous state stays intact until here.
    os.replace(tmp_path, SEEN_PATH)
def build_prompt_from_file(file_dict: dict) -> str:
    """Assemble the review prompt for the diff of one changed file.

    Args:
        file_dict: File entry; the name is taken from ``filename`` or ``path``
            (falling back to ``"unknown"``) and the diff text from ``patch``
            or ``diff`` (falling back to an empty string).

    Returns:
        The instruction text, the expected JSON schema, the file name and the
        diff, concatenated into a single prompt string. Diffs longer than
        30000 characters are cut off and marked as truncated.
    """
    name = file_dict.get("filename") or file_dict.get("path") or "unknown"
    diff_text = file_dict.get("patch") or file_dict.get("diff") or ""

    # Cap the diff size so the prompt stays bounded.
    if len(diff_text) > 30000:
        diff_text = diff_text[:30000] + "\n...TRUNCATED..."

    instructions = (
        "You are a senior code reviewer. Analyze exactly one file diff and return ONLY JSON.\n"
        "You review C++ code with the Qt framework\n"
        "Rules:\n"
        "1) Only report real issues or actionable improvements.\n"
        "2) Use diff positions (line index in the unified diff hunk) for comment anchoring.\n"
        "3) Keep each comment short and specific.\n"
        "4) If there are no findings, return an empty findings array.\n\n"
    )
    schema = (
        "JSON schema:\n"
        "{\n"
        ' "summary": "short summary",\n'
        ' "findings": [\n'
        " {\n"
        ' "diff_position": 12,\n'
        ' "severity": "high|medium|low",\n'
        ' "comment": "text"\n'
        " }\n"
        " ]\n"
        "}\n\n"
    )
    subject = f"File: {name}\n" + "Unified diff:\n" + f"{diff_text}"
    return instructions + schema + subject
def extract_json_object(text: str) -> Optional[dict]:
    """Pull a JSON object out of raw model output.

    A surrounding Markdown code fence is removed first (together with a
    leading ``json`` tag on the unwrapped content). The remaining text is
    parsed as a whole; if that fails, the span from the first ``{`` to the
    last ``}`` is parsed instead.

    Args:
        text: Raw model output; may be empty or ``None``.

    Returns:
        The decoded object when it is a ``dict``, otherwise ``None``.
    """
    if not text:
        return None

    def as_dict(blob: str) -> Optional[dict]:
        # Only JSON objects count; lists, strings, numbers etc. are rejected.
        value = json.loads(blob)
        return value if isinstance(value, dict) else None

    payload = text.strip()
    if payload.startswith("```"):
        rows = payload.splitlines()
        closed_fence = (
            len(rows) >= 3
            and rows[0].startswith("```")
            and rows[-1].strip() == "```"
        )
        if closed_fence:
            # Drop the opening and closing fence lines.
            payload = "\n".join(rows[1:-1]).strip()
            if payload.startswith("json"):
                payload = payload[4:].strip()

    # First attempt: the whole payload is the JSON document.
    try:
        return as_dict(payload)
    except json.JSONDecodeError:
        pass

    # Second attempt: the outermost brace-delimited span inside the payload.
    first_brace = payload.find("{")
    last_brace = payload.rfind("}")
    if first_brace == -1 or last_brace <= first_brace:
        return None
    try:
        return as_dict(payload[first_brace:last_brace + 1])
    except json.JSONDecodeError:
        return None
def parse_structured_review(ai_response: str) -> dict:
    """Parse model output into normalized review structure.

    Args:
        ai_response: Raw text returned by the model.

    Returns:
        A dict with two keys:

        * ``summary`` - the stripped summary text, or
          ``"No summary provided."`` when it is missing or blank.
        * ``findings`` - a list of dicts with ``diff_position`` (int),
          ``severity`` (lower-cased string, ``"low"`` when missing) and
          ``comment`` (non-empty string). Entries that are not dicts, have no
          usable integer position or have an empty comment are dropped.
    """
    parsed = extract_json_object(ai_response) or {}
    # Strip before applying the default so a whitespace-only summary does not
    # end up as an empty string.
    summary = str(parsed.get("summary") or "").strip() or "No summary provided."
    findings_raw = parsed.get("findings") or []
    findings = []
    if isinstance(findings_raw, list):
        for item in findings_raw:
            if not isinstance(item, dict):
                continue
            try:
                diff_position = int(item.get("diff_position"))
            except (TypeError, ValueError, OverflowError):
                # OverflowError: json.loads accepts "Infinity", and
                # int(float("inf")) raises it. One malformed finding must not
                # discard the remaining findings for the file.
                continue
            comment = str(item.get("comment") or "").strip()
            severity = str(item.get("severity") or "low").strip().lower()
            if not comment:
                continue
            findings.append(
                {
                    "diff_position": diff_position,
                    "severity": severity,
                    "comment": comment,
                }
            )
    return {"summary": summary, "findings": findings}
def split_unified_diff_by_file(unified_diff: str) -> dict:
    """Split a PR unified diff into per-file diff chunks keyed by new path.

    A new chunk starts at every ``diff --git`` header line. The chunk's key is
    taken from the ``+++ b/<path>`` header line when present; otherwise (for
    example deleted files whose new side is ``/dev/null``, or chunks without
    a ``+++`` line at all) the ``b/`` path of the ``diff --git`` header is
    used.

    Args:
        unified_diff: Full diff text covering any number of files.

    Returns:
        Mapping of file path to the stripped text of that file's chunk,
        including its ``diff --git`` header line. Chunks for which no path
        could be determined are omitted.
    """
    file_diffs = {}
    current_lines: List[str] = []
    current_path: Optional[str] = None
    # True once the first "@@" hunk header of the current chunk was seen.
    in_hunk = False

    def flush_current() -> None:
        if current_path and current_lines:
            file_diffs[current_path] = "\n".join(current_lines).strip()

    for line in unified_diff.splitlines():
        if line.startswith("diff --git "):
            flush_current()
            current_lines = [line]
            in_hunk = False
            # Fallback path for rename/deletion edge cases; it is overridden
            # by a usable "+++ " header line below.
            header = re.match(r"diff --git a/(.+?) b/(.+)", line)
            current_path = header.group(2) if header else None
            continue
        current_lines.append(line)
        if line.startswith("@@"):
            in_hunk = True
            continue
        # Example: +++ b/src/main.cpp
        # Only honoured before the first hunk: inside a hunk, an added line
        # whose content starts with "++ " also begins with "+++ ".
        if not in_hunk and line.startswith("+++ "):
            raw_path = line[4:].strip()
            if raw_path == "/dev/null":
                # Deleted file; keep the path taken from the diff header.
                continue
            current_path = raw_path[2:] if raw_path.startswith("b/") else raw_path
    flush_current()
    return file_diffs
def handle_assignment(owner: str, repo: str, pr: dict) -> bool:
    """Review one pull request with the AI model and post the result to Gitea.

    Every changed file is analyzed separately: a prompt is built from its
    diff, sent to the model, and the parsed findings are collected as
    line-anchored comments. Finally a single review containing a fixed intro
    text, any per-file analysis errors and the collected comments is posted.

    Args:
        owner: Repository owner.
        repo: Repository name.
        pr: Pull request dict; its number is read from ``number``, ``index``
            or ``id`` (first truthy value).

    Returns:
        True when the review was posted. False when the file list could not
        be fetched, was empty, or posting the review failed.
    """
    pr_number = pr.get("number") or pr.get("index") or pr.get("id")
    try:
        files = gitea.list_pull_request_files(owner, repo, pr_number)
    except Exception as e:
        print(f"failed to fetch files for {owner}/{repo}#{pr_number}: {e}")
        return False
    if not files:
        print(f"No files found for {owner}/{repo}#{pr_number}")
        return False
    # Some Gitea setups return filenames but no patch in /pulls/{n}/files.
    # The whole-PR diff is only fetched when *every* file entry lacks a patch.
    fallback_patches = {}
    if files and all(not (f.get("patch") or f.get("diff") or "").strip() for f in files):
        try:
            unified_diff = gitea.get_pull_request_diff(owner, repo, pr_number)
            fallback_patches = split_unified_diff_by_file(unified_diff)
            print(
                f"Loaded fallback unified diff for {owner}/{repo}#{pr_number} "
                f"({len(fallback_patches)} file patches)"
            )
        except Exception as e:
            # Not fatal: the affected files are reported below as having no diff.
            print(f"failed to load fallback diff for {owner}/{repo}#{pr_number}: {e}")
    # Analyze each file individually based on its diff.
    review_comments: List[dict] = []
    file_errors: List[str] = []
    for file_dict in files:
        filename = file_dict.get("filename") or file_dict.get("path")
        if not filename:
            continue
        patch = file_dict.get("patch") or file_dict.get("diff") or ""
        if not patch.strip() and fallback_patches:
            patch = fallback_patches.get(filename, "")
        if not patch.strip():
            file_errors.append(f"**{filename}**: No textual diff available.")
            continue
        # Work on a copy so the entry returned by the client is not modified.
        file_for_prompt = dict(file_dict)
        file_for_prompt["patch"] = patch
        print(f"Analyzing {filename} for {owner}/{repo}#{pr_number}")
        prompt = build_prompt_from_file(file_for_prompt)
        try:
            ai_response = gemini.generate_review(prompt)
            parsed_review = parse_structured_review(ai_response)
            for finding in parsed_review["findings"]:
                severity = finding["severity"].upper()
                body = f"[{severity}] {finding['comment']}"
                # NOTE(review): the prompt asks the model for a position
                # inside the diff, but the value is sent as "new_position" -
                # confirm which numbering create_pull_request_review expects.
                review_comments.append({
                    "path": filename,
                    "new_position": finding["diff_position"],
                    "body": body,
                })
        except Exception as e:
            # A failure for one file is recorded and the other files continue.
            print(f"failed to generate review for {filename}: {e}")
            file_errors.append(f"**{filename}**: Error analyzing file - {e}")
    # Create one PR review containing summary + line-anchored comments.
    review_body = "### AI Code Review by [Karl der Computer](https://dev.skui.eu/SKUI/KARL)\n"
    review_body += "There are three severity levels for comments: HIGH (red) indicates critical issues, MEDIUM (orange) suggests improvements, and LOW (blue) points out minor concerns or style suggestions.\n"
    review_body += "Note: AI can make mistakes. Please review carefully.\n"
    review_body += "If there are any mistakes, please report to the [issue tracker](https://dev.skui.eu/SKUI/KARL/issues) of Karl\n"
    if file_errors:
        review_body += "\n#### Issues with file analysis:\n"
        review_body += "\n".join(file_errors)
    try:
        # An empty comment list is passed as None rather than [].
        gitea.create_pull_request_review(
            owner, repo, pr_number,
            body=review_body,
            comments=review_comments if review_comments else None
        )
        print(f"Posted review for {owner}/{repo}#{pr_number} with {len(review_comments)} line comments")
        return True
    except Exception as e:
        print(f"failed to post review for {owner}/{repo}#{pr_number}: {e}")
        return False
def run():
    """Poll all accessible repositories forever and review assigned PRs.

    In every round the open pull requests of each repository are listed. A
    pull request is handled when ``BOT`` is among its requested reviewers and
    its key ``("owner/repo", number)`` has not been handled before. Keys of
    successfully reviewed pull requests are persisted right away via
    ``save_seen``. Between rounds the loop sleeps ``POLL_INTERVAL`` seconds.

    Errors while listing repositories or pull requests are printed and the
    poller keeps running. The loop ends on ``KeyboardInterrupt``.
    """
    seen = load_seen()
    print("Starting poller; checking repos...")
    try:
        while True:
            try:
                repos = list(gitea.available_repositories())
            except Exception as e:
                # A transient API or network failure must not stop the poller;
                # wait one interval and try again.
                print(f"failed to list repositories: {e}")
                time.sleep(POLL_INTERVAL)
                continue
            print(f"Found {len(repos)} accessible repositories")
            for owner, repo in repos:
                try:
                    prs = gitea.list_open_pull_requests(owner, repo)
                except requests.exceptions.HTTPError as e:
                    # e.response can be None; read the status defensively so
                    # the handler itself cannot raise.
                    status = getattr(e.response, "status_code", None)
                    if status == 404:
                        # Repo exists but is not accessible (permission or deleted)
                        continue
                    print(f"failed to list PRs for {owner}/{repo}: {e}")
                    continue
                except Exception as e:
                    print(f"failed to list PRs for {owner}/{repo}: {e}")
                    continue
                for pr in prs or []:
                    key = (f"{owner}/{repo}", pr.get("number"))
                    reviewers = [
                        r.get("login") or r.get("username")
                        for r in (pr.get("requested_reviewers") or [])
                    ]
                    if BOT in reviewers and key not in seen:
                        print(f"Detected assignment: {key}")
                        ok = handle_assignment(owner, repo, pr)
                        if ok:
                            # Persist immediately so a later crash does not
                            # cause this PR to be reviewed a second time.
                            seen.add(key)
                            save_seen(seen)
            time.sleep(POLL_INTERVAL)
    except KeyboardInterrupt:
        print("Poller stopped")
# Start the polling loop only when this file is executed as a script.
if __name__ == "__main__":
    run()