Hour 8: Retrieving all content
In my last entry, I shared that I'd finally managed to figure out the API powering the Eyad Qunaibi app. Today, I'll retrieve all the content and save it locally.
We'll then figure out a way to use AI to map that content to the questions people are asking Google, and to generate answers to them from the entire corpus of materials. This'll be fun!
Retrieving all the info locally
This bit was fairly straightforward. I simply fed the mitmproxy logs into the o3 model (GPT-4o couldn't handle the format, but o3 could for some reason) and asked it to write the code for me.
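If you want to reproduce that step, mitmproxy's Python API can turn a saved capture into plain text you can paste straight into a prompt. This is only a rough sketch of one way to do it, and the capture filename is made up:

# dump_flows.py - print each captured request plus a truncated response as plain text
from mitmproxy import http, io

with open("forqan_capture.flows", "rb") as fh:  # hypothetical capture file
    for flow in io.FlowReader(fh).stream():
        if not isinstance(flow, http.HTTPFlow) or flow.response is None:
            continue
        print(flow.request.method, flow.request.pretty_url)
        for name, value in flow.request.headers.items():
            print(f"  {name}: {value}")
        body = flow.response.get_text(strict=False) or ""
        print(f"  -> {flow.response.status_code} {body[:500]}")  # keep bodies short
        print()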
The code it came up with was partially correct, and I had to fix a few minor bugs with Cursor.
import argparse, json, os, sys, time
from pathlib import Path
import requests
import shutil # Added for potential future use, e.g. sanitizing filenames
try:
from tqdm import tqdm
except ImportError:
tqdm = None
# ----------------- CONFIG -----------------
BASE = os.getenv("FORQAN_BASE", "https://www.forqan-cast.com")
API = f"{BASE}/api/References"
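# These mirror the headers seen in the app's own requests; the key is supplied via the FORQAN_KEY env var instead of being hard-coded.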
KEY = os.getenv("FORQAN_KEY")
HEADERS = {"User-Agent": "Dart/3.0", "app-key": KEY}
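# Arabic CategoryName values the API expects for each content type; the "true"/"false" keys correspond to IsSeries.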
_DEF_NAMES = {
"Essay": {"false": "مقالات", "true": "سلاسل مقالات"},
"TranslatedEssay": {"false": "مقالات مترجمة"},
"Tweet": {"false": "تغريدات"},
"Video": {"false": "مرئيات", "true": "سلاسل مرئية"},
"ImageComment": {"false": "صور وتعليقات"},
"Book": {"false": "كتب"},
"TranslatedBook": {"false": "كتب مترجمة"},
"Brochure": {"false": "مطويات"},
"NewsComment": {"false": "تعليقات إخبارية"}
}
NAMES = json.loads(os.getenv("FORQAN_NAMES", json.dumps(_DEF_NAMES)))
TYPES = list(NAMES)
OUT = Path("data"); OUT.mkdir(exist_ok=True)
MEDIA_OUT = Path("media_files")  # created on demand in download_media_from_local_files()
# -------------- HELPERS -------------------
def out_path(typ, series=False):
return OUT / f"{typ.lower()}s{'_series' if series else ''}.jsonl"
def write_jsonl(path, obj):
with open(path, "a", encoding="utf-8") as fh:
fh.write(json.dumps(obj, ensure_ascii=False) + "\n")
def ref_request(session, params):
r = session.get(API, params=params, timeout=15)
if r.status_code == 401:
raise SystemExit("401 Unauthorized – bad or missing FORQAN_KEY")
r.raise_for_status()
return r.json()
def download_media(session, media_id, target_path, item_title="item"):
"""Downloads media file from /api/Media/{media_id}."""
media_url = f"{BASE}/api/Media/{media_id}"
progress_bar = None
try:
r = session.get(media_url, stream=True, timeout=60) # Increased timeout for media
if r.status_code == 401:
print(f"Warning: 401 Unauthorized for media ID {media_id}. Skipping.", file=sys.stderr)
return False # Indicate failure
r.raise_for_status()
file_size = int(r.headers.get('content-length', 0))
# Setup tqdm progress bar for download
if tqdm:
progress_bar = tqdm(
total=file_size,
unit='B',
unit_scale=True,
desc=f"DL {item_title[:30]}...", # Truncate long titles
leave=False
)
with open(target_path, "wb") as f:
for chunk in r.iter_content(chunk_size=8192):
f.write(chunk)
if progress_bar:
progress_bar.update(len(chunk))
if progress_bar:
progress_bar.close()
return True # Indicate success
except requests.exceptions.RequestException as e:
print(f"Error downloading media ID {media_id} to {target_path}: {e}", file=sys.stderr)
if target_path.exists(): # Clean up partial file
try:
os.remove(target_path)
except OSError:
pass # Ignore if removal fails
return False # Indicate failure
finally:
if progress_bar:
progress_bar.close()
# ----------- FETCH LOGIC ------------------
def page_iter(session, typ=None, is_series=None, start=0, parent=None, delay=0.2):
"""Yield slice by slice until empty.
If `parent` is given, `childrenOfId` is used to fetch child items."""
count = 20
while True:
if parent:
params = {"childrenOfId": str(parent)}
else:
params = {
"CategoryType": typ,
"CategoryName": NAMES[typ]["true" if is_series else "false"],
"IsSeries": str(is_series).lower(),
"count": str(count),
"startfrom": str(start)
}
        data = ref_request(session, params)
        if not data:
            break
        yield from data
        if parent:
            # childrenOfId takes no paging params, so the request never changes;
            # stop after the first page instead of refetching the same children forever
            break
        start += count
        time.sleep(delay)
# ---------------- MAIN PROCESSING FUNCTIONS --------------------
def fetch_data_from_api(args, sess):
seen = set(); total = 0
print("Fetching data from API...")
for typ in TYPES:
print(f"\n### {typ}")
tqdm_common_kwargs = {"unit": "item", "leave": True}
raw_singles_iter = page_iter(sess, typ, False, delay=args.sleep)
if tqdm:
singles_iter_with_progress = tqdm(raw_singles_iter, desc=f"{typ} singles (data)", **tqdm_common_kwargs)
else:
singles_iter_with_progress = raw_singles_iter
for obj in singles_iter_with_progress:
if obj["Id"] in seen:
if tqdm and isinstance(singles_iter_with_progress, tqdm): singles_iter_with_progress.set_postfix_str("seen, skipped")
continue
write_jsonl(out_path(typ), obj)
seen.add(obj["Id"]); total += 1
if tqdm and isinstance(singles_iter_with_progress, tqdm): singles_iter_with_progress.set_postfix_str(f"wrote {obj['Id']}")
if "true" in NAMES[typ]:
parent_ids_and_titles = []
raw_series_iter = page_iter(sess, typ, True, delay=args.sleep)
if tqdm:
series_iter_with_progress = tqdm(raw_series_iter, desc=f"{typ} series (data)", **tqdm_common_kwargs)
else:
series_iter_with_progress = raw_series_iter
for series_obj in series_iter_with_progress:
pid = series_obj["Id"]
series_title = series_obj.get("Title", f"series_{pid}")
if pid not in seen:
write_jsonl(out_path(typ, True), series_obj)
if tqdm and isinstance(series_iter_with_progress, tqdm): series_iter_with_progress.set_postfix_str(f"wrote {pid}")
seen.add(pid)
total +=1
parent_ids_and_titles.append((pid, series_title))
        for pid, _ in parent_ids_and_titles:  # only the id is needed to fetch episodes
raw_ep_iter = page_iter(sess, parent=pid, delay=args.sleep)
ep_desc = f"{typ}:{pid} eps (data)"
if tqdm:
ep_iter_with_progress = tqdm(raw_ep_iter, desc=ep_desc, **tqdm_common_kwargs)
else:
ep_iter_with_progress = raw_ep_iter
for ep in ep_iter_with_progress:
if ep["Id"] in seen:
if tqdm and isinstance(ep_iter_with_progress, tqdm): ep_iter_with_progress.set_postfix_str("seen, skipped")
continue
write_jsonl(out_path(typ), ep)
seen.add(ep["Id"]); total += 1
if tqdm and isinstance(ep_iter_with_progress, tqdm): ep_iter_with_progress.set_postfix_str(f"wrote {ep['Id']}")
print(f"\nFinished fetching data. {total} objects written.")
def download_media_from_local_files(args, sess):
MEDIA_OUT.mkdir(exist_ok=True)
print(f"Downloading media based on local data files. Media will be saved to: {MEDIA_OUT.resolve()}")
parent_id_to_series_title_map = {}
media_types_for_download = frozenset(["Book", "TranslatedBook", "Brochure"])
print("Scanning existing series data for titles...")
# Scan all types for series, as a series of one type might link to an episode that is a book/brochure - though unlikely
for typ_scan in TYPES:
series_file_path = OUT / f"{typ_scan.lower()}s_series.jsonl"
if series_file_path.exists() and "true" in NAMES.get(typ_scan, {}):
try:
with open(series_file_path, "r", encoding="utf-8") as fh_series:
for line in fh_series:
try:
series_obj = json.loads(line)
if "Id" in series_obj: # Title is optional, Id is key
parent_id_to_series_title_map[str(series_obj["Id"])] = series_obj.get("Title", f"series_{series_obj['Id']}")
except json.JSONDecodeError:
# Optional: log this error if needed
pass
except IOError as e:
print(f"Warning: Could not read series file {series_file_path}: {e}", file=sys.stderr)
print(f"Found {len(parent_id_to_series_title_map)} series titles from existing data.")
total_downloaded_media = 0
tqdm_item_kwargs = {"unit": "item", "leave": True}
for typ_process_name in media_types_for_download:
data_file_path = OUT / f"{typ_process_name.lower()}s.jsonl"
print(f"\n### Processing for media: {data_file_path}")
if not data_file_path.exists():
print(f"Data file not found, skipping: {data_file_path}")
continue
num_lines = 0
try:
with open(data_file_path, "r", encoding="utf-8") as f_count:
num_lines = sum(1 for _ in f_count)
except IOError: pass
try:
with open(data_file_path, "r", encoding="utf-8") as fh_data:
file_iterator = fh_data
if tqdm:
file_iterator = tqdm(fh_data, total=num_lines, desc=f"Scanning {typ_process_name} data", **tqdm_item_kwargs)
for line_num, line in enumerate(file_iterator):
try:
item = json.loads(line)
except json.JSONDecodeError:
if tqdm and isinstance(file_iterator, tqdm): file_iterator.set_postfix_str(f"JSON err L{line_num+1}")
# print(f"Warning: JSON decode error in {data_file_path} line {line_num+1}. Skipping.", file=sys.stderr)
continue
item_id = item.get("Id")
item_title_for_pbar = item.get("Title", f"Item {item_id}") # Use ID if title missing for pbar
if not item_id: # Only skip if ID is missing
if tqdm and isinstance(file_iterator, tqdm): file_iterator.set_postfix_str("no Id")
continue
# Construct filename from ID
target_file_name = f"{item_id}.pdf"
parent_ref_id = item.get("ParentReferenceId")
item_type_for_path = typ_process_name.lower()
if parent_ref_id:
series_title = parent_id_to_series_title_map.get(str(parent_ref_id), str(parent_ref_id))
sane_series_folder_name = "".join(c if c.isalnum() or c in (' ', '_', '-') else '_' for c in series_title).strip().replace(' ', '_')
if not sane_series_folder_name: sane_series_folder_name = str(parent_ref_id)
media_item_dir = MEDIA_OUT / item_type_for_path / sane_series_folder_name
else:
media_item_dir = MEDIA_OUT / item_type_for_path
media_item_dir.mkdir(parents=True, exist_ok=True)
target_file_path = media_item_dir / target_file_name # Use constructed filename
current_status = "exists"
if not target_file_path.exists():
if tqdm and isinstance(file_iterator, tqdm): file_iterator.set_postfix_str(f"DLing {target_file_name}...")
if download_media(sess, str(item_id), target_file_path, item_title=item_title_for_pbar):
total_downloaded_media += 1
current_status = f"OK: {target_file_name}"
time.sleep(args.sleep)
else:
current_status = f"FAIL: {target_file_name}"
if tqdm and isinstance(file_iterator, tqdm): file_iterator.set_postfix_str(current_status)
except IOError as e:
print(f"Error reading data file {data_file_path}: {e}", file=sys.stderr)
continue
print(f"\nFinished media download attempt. {total_downloaded_media} new media files downloaded.")
# ---------------- MAIN --------------------
def main():
ap = argparse.ArgumentParser()
ap.add_argument("--sleep", type=float, default=0.2)
ap.add_argument(
"--download-type",
choices=["data", "media"],
required=True,
        help=(
            "Specify the operation mode: "
            "'data' to fetch/update metadata from the API and save to the data/ directory. "
            "'media' to download actual files (e.g., PDFs for Books, Brochures) based on "
            "existing .jsonl files in the data/ directory, saving to media_files/."
        )
)
args = ap.parse_args()
sess = requests.Session(); sess.headers.update(HEADERS)
if args.download_type == "data":
fetch_data_from_api(args, sess)
elif args.download_type == "media":
download_media_from_local_files(args, sess)
if __name__ == "__main__":
try:
main()
except KeyboardInterrupt:
sys.exit()
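Running it with --download-type data (and FORQAN_KEY set) writes every category to data/ as JSONL files; a second run with --download-type media then walks the book and brochure records and downloads their PDFs into media_files/. The --sleep flag controls the pause between requests.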
The thing works like a charm!
I could now download all the materials as JSON, as well as the PDFs (which will come in handy later, when I convert them to text and vectorize them!)
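The conversion itself is for a later entry, but it'll probably start out as something simple like this (a sketch assuming pypdf; extracting Arabic text cleanly may well need a heavier tool):

# pdf_to_text.py - rough first pass at turning the downloaded PDFs into plain text
from pathlib import Path

from pypdf import PdfReader  # assumption: pypdf handles these files well enough

for pdf_path in Path("media_files").rglob("*.pdf"):
    try:
        reader = PdfReader(pdf_path)
        text = "\n".join(page.extract_text() or "" for page in reader.pages)
    except Exception as exc:
        print(f"Could not read {pdf_path}: {exc}")
        continue
    pdf_path.with_suffix(".txt").write_text(text, encoding="utf-8")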
An hour well spent!