Skip to content

process_once()

Class: JobsAustriaCacheProcess
File: jobs_austria_cache_process_data_payload.py · line 36

Signature

Parameters none
Returns int
Async Yes
Visibility Public

Implementation

async def process_once(self) -> int:
    """Run one full pass over unlinked ``scrape_cache`` rows.

    Pages through ``scrape_cache`` rows whose ``fk_job_id`` is NULL in
    batches of ``BATCH_SIZE``, parses each JSON ``data_payload``, and
    inserts rows not already present in ``jobs`` (matched by ``url_hash``).

    Returns:
        int: total number of rows inserted into ``jobs`` during this pass.
    """
    total_inserted = 0
    offset = 0

    while True:
        # Fetch the next page of unprocessed cache rows. Offset paging is
        # stable here only because nothing in this loop changes fk_job_id,
        # so the filtered set does not shrink underneath us.
        async with self.engine.connect() as conn:
            rows = (await conn.execute(
                text("SELECT url_hash, data_payload FROM scrape_cache WHERE fk_job_id IS NULL LIMIT :limit OFFSET :offset"),
                {"limit": BATCH_SIZE, "offset": offset}
            )).fetchall()

        if not rows:
            break

        # Parse payloads; the walrus filter drops rows whose payload
        # deserializes to a falsy value (e.g. JSON null or {}).
        records = [
            {
                "url_hash": row.url_hash,
                "url": payload.get("url"),
                "position": payload.get("position"),
            }
            for row in rows
            if (payload := json.loads(row.data_payload))
        ]

        # BUG FIX: previously the existence query ran even when `records`
        # was empty, producing an invalid empty IN () clause.
        if records:
            hashes = tuple(r["url_hash"] for r in records)
            # NOTE(review): binding a tuple to a plain `:hashes` IN clause
            # typically requires an expanding bindparam in SQLAlchemy —
            # confirm this works with the driver in use.
            async with self.engine.connect() as conn:
                existing = {
                    r[0] for r in (await conn.execute(
                        text("SELECT url_hash FROM jobs WHERE url_hash IN :hashes"),
                        {"hashes": hashes}
                    )).fetchall()
                }

            # Skip hashes already in jobs, and also dedupe within the batch
            # itself (two cache rows can share one url_hash).
            seen = set(existing)
            new_records = []
            for r in records:
                if r["url_hash"] not in seen:
                    seen.add(r["url_hash"])
                    new_records.append(r)

            if new_records:
                async with self.engine.connect() as conn:
                    # BUG FIX: store url_hash with the job row. Without it
                    # the existence check above could never match rows
                    # inserted by earlier batches/runs, so every re-run
                    # kept inserting duplicates. (Assumes jobs.url_hash is
                    # a writable column — it is already SELECTed above.)
                    await conn.execute(
                        text("INSERT INTO jobs (url, position, url_hash) VALUES (:url, :position, :url_hash)"),
                        [
                            {"url": r["url"], "position": r["position"], "url_hash": r["url_hash"]}
                            for r in new_records
                        ]
                    )
                    await conn.commit()
                total_inserted += len(new_records)
                logs.info(f"Inserted {len(new_records)} rows at offset {offset}")

        offset += BATCH_SIZE

    return total_inserted