# Stage 2 — Payload Sync Class Diagram
Status: 🟠 In Progress
```mermaid
classDiagram
  class JobsAustriaCacheSynchronizer {
    -db_url: str
    -engine: Engine
    +synchronize() None
    +run_cycle() bool
    +synchronize_fk_id(df) DataFrame
    +synchronize_company_id(df_enriched) None
    -_fetch_fk_pending_batch() DataFrame
    -_fetch_payload_pending_batch() DataFrame
    -_bulk_update_scrape_cache_fk(to_update) None
    -_unpack_payload(df) DataFrame
    -_sync_companies(df) DataFrame
    -_sync_locations(df) DataFrame
    -_update_jobs(df) None
    -_extract_portal(url_str)$ str
    -_parse_date(val)$ date
    -_str_or_none(val)$ str
  }
  class scrape_cache {
    <<MySQL Table>>
    +id
    +url
    +url_hash
    +data_payload
    +fk_job_id
    +scraped_at
  }
  class jobs {
    <<MySQL Table>>
    +id
    +url
    +url_hash
    +position
    +company_id
    +location_id
    +publication_date
    +portal
  }
  class companies {
    <<MySQL Table>>
    +id
    +company_crawler_name
  }
  class locations {
    <<MySQL Table>>
    +id
    +zipcode
  }
  JobsAustriaCacheSynchronizer ..> scrape_cache : reads + UPDATE fk_job_id
  JobsAustriaCacheSynchronizer ..> jobs : UPDATE company_id, location_id, portal...
  JobsAustriaCacheSynchronizer ..> companies : INSERT IGNORE + lookup
  JobsAustriaCacheSynchronizer ..> locations : lookup zipcode → id
  click JobsAustriaCacheSynchronizer href "../../src/pipelines/JobsAustria/jobs_austria_cache_synchronizer.py"
```
## Responsibilities
Two-queue design:

- **Queue 1** — `synchronize_fk_id()`: matches `scrape_cache` rows to `jobs` via `url_hash`, writes `jobs.id` back into `scrape_cache.fk_job_id`, and marks the row as processed.
- **Queue 2** — `synchronize_company_id()`: unpacks the full `data_payload` JSON, syncs companies, maps locations via zipcode, and bulk-updates `jobs` with `company_id`, `location_id`, `publication_date`, `portal`, `salary`, `education`.
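The Queue 1 match can be pictured as a join on `url_hash`. A minimal sketch using pandas, with hypothetical standalone signatures (the real method works on batches fetched from MySQL):

```python
import pandas as pd


def synchronize_fk_id(cache_df: pd.DataFrame, jobs_df: pd.DataFrame) -> pd.DataFrame:
    """Match pending scrape_cache rows to jobs via url_hash.

    Returns (cache_id, fk_job_id) pairs ready for the bulk UPDATE on
    scrape_cache.fk_job_id.
    """
    merged = cache_df.merge(
        jobs_df[["id", "url_hash"]].rename(columns={"id": "fk_job_id"}),
        on="url_hash",
        how="inner",  # cache rows without a matching job stay pending
    )
    return merged[["id", "fk_job_id"]].rename(columns={"id": "cache_id"})
```

An inner merge keeps unmatched cache rows in the pending queue, so they are retried on the next polling cycle.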
The polling loop runs every 5 seconds and stops automatically when both queues are empty.
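The stop condition can be sketched as follows — a minimal, hypothetical version of `synchronize()` that assumes `run_cycle()` returns `True` while either queue still has pending work:

```python
import time


def synchronize(run_cycle, poll_interval: float = 5.0, sleep=time.sleep) -> int:
    """Drain both queues: repeat cycles until run_cycle() reports both empty.

    run_cycle() is assumed to return True while pending work remains.
    Returns the number of cycles executed (useful for tests/monitoring).
    """
    cycles = 0
    while run_cycle():          # False once both queues are empty
        cycles += 1
        sleep(poll_interval)    # injectable for testing
    return cycles
```

Injecting `sleep` keeps the loop testable without waiting out the 5-second interval.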
## Known issues / refactor targets
- `_extract_portal()`, `_parse_date()`, `_str_or_none()` are duplicated in `jobs_austria_details_scraping.py` — move to `utils/parsing.py`
- `_update_jobs()` builds the record dict inline inside `itertuples()` — extract to `_build_job_record(row)`
- `_sync_companies()` and `_sync_locations()` follow an identical lookup-insert-remap pattern — could be a generic helper
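The shared lookup-insert-remap pattern could be factored into one generic helper. A minimal sketch with a hypothetical signature, using a plain dict to stand in for the SELECT + INSERT IGNORE round trip against `companies`/`locations`:

```python
def lookup_insert_remap(values, table: dict, next_id) -> dict:
    """Map each value to its table id, inserting unseen values first.

    `table` maps value -> id (stand-in for the DB lookup table);
    `next_id` is an iterator yielding fresh ids for inserted rows.
    Returns value -> id for every requested value.
    """
    mapping = {}
    for value in values:
        if value not in table:           # INSERT IGNORE path: new row
            table[value] = next(next_id)
        mapping[value] = table[value]    # lookup/remap path: existing row
    return mapping
```

With this in place, `_sync_companies()` and `_sync_locations()` would differ only in the column they remap (`company_crawler_name` vs. `zipcode`).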
## Source files

- `src/pipelines/JobsAustria/jobs_austria_cache_synchronizer.py`