Implementation Details — JobsAustria Pipeline¶
Stage 1b — Full Refresh (Built, needs improvements)¶
Class: JobsAustriaETLCache in jobs_austria_cache_import.py
What it does: Fires one or more Apify actors with a batch of AMS URLs, streams results asynchronously via a producer/consumer queue, and inserts raw items into scrape_cache using INSERT IGNORE.
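The producer/consumer streaming described above can be sketched with an `asyncio.Queue`. This is a minimal stand-in, not the actual class: the producer simulates an Apify actor streaming items, the consumer drains the queue into `scrape_cache`, and sqlite's `INSERT OR IGNORE` stands in for MySQL's `INSERT IGNORE`. Table columns here are illustrative only.

```python
import asyncio
import json
import sqlite3

async def producer(queue, items):
    # Stand-in for streaming results from a running Apify actor.
    for item in items:
        await queue.put(item)
    await queue.put(None)  # sentinel: the actor run is finished

async def consumer(queue, conn):
    # Drain the queue and insert raw items; duplicate URLs are
    # skipped (INSERT OR IGNORE mirrors MySQL's INSERT IGNORE).
    while (item := await queue.get()) is not None:
        conn.execute(
            "INSERT OR IGNORE INTO scrape_cache (url, data_payload) VALUES (?, ?)",
            (item["url"], json.dumps(item)),
        )
    conn.commit()

async def main():
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE scrape_cache (url TEXT PRIMARY KEY, data_payload TEXT)")
    queue = asyncio.Queue(maxsize=100)
    # Duplicate URL on purpose, to show the ignore behaviour.
    items = [{"url": "https://jobs.ams.at/1"}, {"url": "https://jobs.ams.at/1"}]
    await asyncio.gather(producer(queue, items), consumer(queue, conn))
    return conn.execute("SELECT COUNT(*) FROM scrape_cache").fetchone()[0]

count = asyncio.run(main())  # → 1, the duplicate is ignored
```

The sentinel value lets the consumer terminate cleanly once the producer finishes, without a separate stop flag.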
Known issues:
- Engine is created twice (in __init__ and in connect())
- run() has inconsistent indentation
- No stop-signal mechanism yet for incremental mode
- Hardcoded local path in __main__ block
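For the double engine creation noted above, one possible fix is a lazily created, cached engine that both `__init__` and `connect()` share. A sketch under assumed names, with `sqlite3.connect` standing in for the real SQLAlchemy `create_engine` call:

```python
import sqlite3

class JobsAustriaETLCache:
    """Sketch of the fix: the engine is created exactly once, lazily."""

    def __init__(self, db_url=":memory:"):
        self.db_url = db_url
        self._engine = None  # not created here, and not again in connect()

    @property
    def engine(self):
        # connect() and every other caller share this single instance.
        if self._engine is None:
            self._engine = sqlite3.connect(self.db_url)
        return self._engine

etl = JobsAustriaETLCache()
assert etl.engine is etl.engine  # same object on every access
```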
Stage 2 — Payload Sync (In progress)¶
Classes: JobsAustriaCacheProcess (cache_process_data_payload.py) + JobsAustriaCacheSynchronizer (cache_synchronizer.py)
What it does:
1. Reads scrape_cache rows where fk_job_id IS NULL
2. Unpacks data_payload JSON — extracts url, position, company, location, dates
3. Inserts new rows into jobs table (deduplicates via url_hash)
4. Writes jobs.id back into scrape_cache.fk_job_id (marks row as processed)
5. Enriches jobs with company_id, location_id, publication_date, portal
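Steps 1–4 above can be sketched as a single pass over the unprocessed cache rows. This is a simplified stand-in using sqlite: the `jobs` column set is reduced to a few illustrative fields, and the `UNIQUE` constraint on `url_hash` provides the deduplication.

```python
import hashlib
import json
import sqlite3

def process_once(conn):
    # 1. Read unprocessed cache rows.
    rows = conn.execute(
        "SELECT id, data_payload FROM scrape_cache WHERE fk_job_id IS NULL"
    ).fetchall()
    for cache_id, payload in rows:
        data = json.loads(payload)  # 2. Unpack the JSON payload.
        url_hash = hashlib.sha256(data["url"].encode()).hexdigest()
        # 3. Insert into jobs; the UNIQUE url_hash deduplicates.
        conn.execute(
            "INSERT OR IGNORE INTO jobs (url_hash, url, position) VALUES (?, ?, ?)",
            (url_hash, data["url"], data.get("position")),
        )
        job_id = conn.execute(
            "SELECT id FROM jobs WHERE url_hash = ?", (url_hash,)
        ).fetchone()[0]
        # 4. Write jobs.id back, marking the cache row as processed.
        conn.execute(
            "UPDATE scrape_cache SET fk_job_id = ? WHERE id = ?", (job_id, cache_id)
        )
    conn.commit()

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE scrape_cache (id INTEGER PRIMARY KEY, data_payload TEXT, fk_job_id INTEGER)"
)
conn.execute(
    "CREATE TABLE jobs (id INTEGER PRIMARY KEY, url_hash TEXT UNIQUE, url TEXT, position TEXT)"
)
conn.execute(
    "INSERT INTO scrape_cache (data_payload) VALUES (?)",
    (json.dumps({"url": "https://jobs.ams.at/1", "position": "Dev"}),),
)
process_once(conn)
```

Splitting this into the focused helpers mentioned under known issues would mean one function per numbered step.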
Known issues:
- JobsAustriaCacheProcess and JobsAustriaCacheSynchronizer overlap — need to consolidate into one class
- process_once() does too many things — needs to be split into focused helpers
- _extract_portal() and _parse_date() duplicated across files — move to utils/parsing.py
- cache_key_sync.py (JobsAustriaCacheProcessRework) is a leftover draft — safe to delete
Stage 3 — Detail Enrichment (In progress)¶
Class: JobsAustriaDetailsETL in jobs_austria_details_scraping.py
What it does:
1. Polls jobs for rows where order_number IS NULL
2. Routes URLs by portal via PortalRouter (currently only AMS supported)
3. Fires Apify detail actors in batches of 100 URLs, max 3 concurrent
4. Streams results back and writes to jobs (order_number, education, salary, employment_relationship)
5. Inserts job descriptions into descriptions table
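The batching in step 3 (100 URLs per actor run, at most 3 runs in flight) can be sketched with an `asyncio.Semaphore`. The actor call itself is mocked out; only the chunking and concurrency-limiting shape is shown:

```python
import asyncio

BATCH_SIZE = 100
MAX_CONCURRENT = 3

def chunked(urls, size=BATCH_SIZE):
    # Yield consecutive slices of at most `size` URLs.
    for i in range(0, len(urls), size):
        yield urls[i:i + size]

async def run_actor(batch, sem):
    async with sem:  # at most MAX_CONCURRENT actors in flight
        await asyncio.sleep(0)  # stand-in for the Apify actor call
        return len(batch)

async def scrape_details(urls):
    sem = asyncio.Semaphore(MAX_CONCURRENT)
    return await asyncio.gather(*(run_actor(b, sem) for b in chunked(urls)))

sizes = asyncio.run(scrape_details([f"u{i}" for i in range(250)]))
# 250 URLs → batches of 100, 100, 50
```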
Known issues:
- _fetch_pending_urls() creates its own engine instead of reusing self.engine
- PortalRouter._PORTAL_INPUTS accessed directly from outside the class
- _extract_portal() duplicated from CacheSynchronizer — move to shared utils
- Only the AMS portal is supported — URLs from other portals are silently grouped under "unknown" and skipped
Future extension points:
- Add crawl4ai_jobs run_input to PortalRouter._PORTAL_INPUTS for non-AMS portals
- Add salary scraping from external salary sites
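A hypothetical shape for `PortalRouter` that covers both the routing in step 2 and two of the known issues: a public accessor (so callers stop reaching into `_PORTAL_INPUTS` directly) and the "unknown" grouping for unsupported portals. The `run_input` contents and method names are assumptions, not the real class:

```python
class PortalRouter:
    # Hypothetical mapping: portal name → Apify actor run_input template.
    _PORTAL_INPUTS = {
        "ams": {"actor": "ams-details-actor"},
        # future: a "crawl4ai_jobs" run_input here for non-AMS portals
    }

    @classmethod
    def run_input_for(cls, portal):
        # Public accessor, so _PORTAL_INPUTS stays an implementation detail.
        return cls._PORTAL_INPUTS.get(portal)

    @classmethod
    def group_by_portal(cls, urls, extract_portal):
        # Bucket URLs by portal; unsupported portals land in "unknown".
        groups = {}
        for url in urls:
            portal = extract_portal(url)
            key = portal if portal in cls._PORTAL_INPUTS else "unknown"
            groups.setdefault(key, []).append(url)
        return groups
```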
Stage 4 — Additional Info (Not built)¶
Planned enrichments:
- LinkedIn company/job data
- Company firmographic details (size, industry, founded year)
- Possibly salary benchmarks from external sources
Architecture: Same polling pattern as stages 2 and 3 — a new class watches for null columns in jobs and fires the appropriate Apify actor.
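The shared polling pattern can be sketched generically. This is a sketch under stated assumptions: the `company_size` column and the handler are placeholders for whatever enrichment stage 4 ends up needing, and the handler stands in for firing the matching Apify actor.

```python
import sqlite3

def poll_null_column(conn, column, handler, batch=50):
    """Generic stage shape: find jobs rows where `column` IS NULL, fill them."""
    rows = conn.execute(
        f"SELECT id, url FROM jobs WHERE {column} IS NULL LIMIT ?", (batch,)
    ).fetchall()
    for job_id, url in rows:
        value = handler(url)  # stand-in for the appropriate Apify actor
        conn.execute(f"UPDATE jobs SET {column} = ? WHERE id = ?", (value, job_id))
    conn.commit()
    return len(rows)  # rows processed this cycle

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE jobs (id INTEGER PRIMARY KEY, url TEXT, company_size TEXT)")
conn.execute("INSERT INTO jobs (url) VALUES ('https://jobs.ams.at/1')")
handled = poll_null_column(conn, "company_size", lambda url: "11-50")
```

Returning the processed-row count lets the caller decide whether to poll again immediately or sleep, which keeps the loop logic out of the stage class itself.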
Shared utilities to extract (refactor task)¶
These functions currently exist in multiple files and should be moved to utils/parsing.py:
- _extract_portal(url) — extracts portal name from URL hostname
- _parse_date(val) — parses DD.MM.YYYY strings into date objects
- _str_or_none(val) — strips strings, returns None if empty or NaN