_sync_companies()¶
Class: JobsAustriaCacheSynchronizer
File: jobs_austria_cache_synchronizer.py · line 149
Inserts missing company names into the companies table and maps company_id back onto every row. Max 3 DB roundtrips per batch.
Signature¶
| Parameters | df |
| Returns | pd.DataFrame |
| Async | No |
| Visibility | Private |
Implementation¶
def _sync_companies(self, df: pd.DataFrame) -> pd.DataFrame:
    """
    Ensure every company name in *df* exists in the companies table and
    attach the matching company_id to each row.

    Uses at most three database roundtrips per batch: one lookup of
    already-known names, one bulk insert of the missing ones, and one
    follow-up lookup to pick up the freshly assigned ids.
    """
    # No company column at all: nothing to resolve.
    if 'company' not in df.columns:
        df['company_id'] = None
        return df

    df['company'] = df['company'].str.strip()
    distinct_names = [name for name in df['company'].dropna().unique() if name]
    if not distinct_names:
        df['company_id'] = None
        return df

    # Shared lookup statement; the expanding bindparam turns the list
    # into an IN (...) clause of the right arity at execution time.
    name_query = text(
        "SELECT id, company_crawler_name FROM companies "
        "WHERE company_crawler_name IN :names"
    ).bindparams(bindparam("names", expanding=True))

    # Roundtrip 1: resolve the names we already know about.
    with self.engine.connect() as conn:
        records = conn.execute(name_query, {"names": distinct_names}).fetchall()
    id_by_name = {record.company_crawler_name: record.id for record in records}

    new_names = [name for name in distinct_names if name not in id_by_name]
    if new_names:
        # Roundtrip 2: bulk-insert the unknown names; INSERT IGNORE
        # makes concurrent duplicate inserts harmless.
        with self.engine.begin() as conn:
            conn.execute(
                text("INSERT IGNORE INTO companies (company_crawler_name) VALUES (:name)"),
                [{"name": name} for name in new_names],
            )
        # Roundtrip 3: fetch the ids the insert just assigned.
        with self.engine.connect() as conn:
            records = conn.execute(name_query, {"names": new_names}).fetchall()
        id_by_name.update(
            {record.company_crawler_name: record.id for record in records}
        )
        logs.info(f"_sync_companies: inserted {len(new_names)} new companies.")

    # Names absent from the lookup (shouldn't happen after insert) map to NaN.
    df['company_id'] = df['company'].map(id_by_name)
    return df