Skip to content

_sync_companies()

Class: JobsAustriaCacheSynchronizer
File: jobs_austria_cache_synchronizer.py · line 149

Inserts missing company names into the companies table and maps company_id back onto every row. Max 3 DB roundtrips per batch.

Signature

Parameters df: pd.DataFrame
Returns pd.DataFrame
Async No
Visibility Private

Implementation

def _sync_companies(self, df: pd.DataFrame) -> pd.DataFrame:
    """
    Resolve every row's company name to a companies.id.

    Missing names are inserted into the companies table first, then the
    resulting ids are written back onto the frame as ``company_id``.
    Costs at most three database roundtrips per batch.

    :param df: batch of crawled rows; may or may not carry a ``company`` column
    :return: the same frame, mutated in place, with a ``company_id`` column added
    """
    # Guard: nothing to resolve without a company column.
    if 'company' not in df.columns:
        df['company_id'] = None
        return df

    # Normalise whitespace so names compare cleanly against stored values.
    df['company'] = df['company'].str.strip()
    names = [name for name in df['company'].dropna().unique() if name]
    if not names:
        df['company_id'] = None
        return df

    # Shared SELECT used for both the initial and the post-insert lookup;
    # the expanding bindparam turns the list into an IN (...) clause.
    lookup_stmt = text(
        "SELECT id, company_crawler_name FROM companies "
        "WHERE company_crawler_name IN :names"
    ).bindparams(bindparam("names", expanding=True))

    # Roundtrip 1: resolve the names that already exist.
    with self.engine.connect() as conn:
        found = conn.execute(lookup_stmt, {"names": names}).fetchall()
    name_to_id = {}
    for record in found:
        name_to_id[record.company_crawler_name] = record.id

    # Roundtrips 2 and 3: create whatever is still unknown, then fetch
    # the ids the database assigned to those fresh rows.
    missing = [name for name in names if name not in name_to_id]
    if missing:
        with self.engine.begin() as conn:
            conn.execute(
                text("INSERT IGNORE INTO companies (company_crawler_name) VALUES (:name)"),
                [{"name": name} for name in missing],
            )
        with self.engine.connect() as conn:
            found = conn.execute(lookup_stmt, {"names": missing}).fetchall()
        for record in found:
            name_to_id[record.company_crawler_name] = record.id
        logs.info(f"_sync_companies: inserted {len(missing)} new companies.")

    # Names absent from the mapping (blank / unmatched) become NaN here.
    df['company_id'] = df['company'].map(name_to_id)
    return df