process_once()¶
Class: JobsAustriaCacheProcess
File: jobs_austria_cache_process_data_payload.py · line 36
Signature¶
| Property | Value |
| --- | --- |
| Parameters | none |
| Returns | `int` |
| Async | Yes |
| Visibility | Public |
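
Based on the implementation below: the method drains `scrape_cache` rows that are not yet linked to a job (`fk_job_id IS NULL`) in batches of `BATCH_SIZE`, decodes each JSON `data_payload`, skips `url_hash` values already present in `jobs`, inserts the remaining records, and returns the total number of rows inserted.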
Implementation¶
```python
async def process_once(self) -> int:
    # Module-level dependencies assumed: json, BATCH_SIZE, a `logs` logger,
    # and sqlalchemy's text/bindparam.
    total_inserted = 0
    offset = 0
    while True:
        # Fetch the next batch of cache rows not yet linked to a job.
        async with self.engine.connect() as conn:
            rows = (await conn.execute(
                text(
                    "SELECT url_hash, data_payload FROM scrape_cache "
                    "WHERE fk_job_id IS NULL LIMIT :limit OFFSET :offset"
                ),
                {"limit": BATCH_SIZE, "offset": offset},
            )).fetchall()
        if not rows:
            break
        # Decode each JSON payload; skip rows whose payload is empty.
        records = [
            {
                "url_hash": row.url_hash,
                "url": payload.get("url"),
                "position": payload.get("position"),
            }
            for row in rows
            if (payload := json.loads(row.data_payload))
        ]
        new_records = []
        if records:
            hashes = [r["url_hash"] for r in records]
            # An IN clause bound to a sequence needs an expanding bindparam;
            # binding a plain tuple to :hashes fails at execution time. The
            # empty-records guard above also avoids an invalid `IN ()` query.
            async with self.engine.connect() as conn:
                existing = {
                    r[0] for r in (await conn.execute(
                        text("SELECT url_hash FROM jobs WHERE url_hash IN :hashes")
                        .bindparams(bindparam("hashes", expanding=True)),
                        {"hashes": hashes},
                    )).fetchall()
                }
            new_records = [r for r in records if r["url_hash"] not in existing]
        if new_records:
            # executemany-style insert of only the unseen records.
            async with self.engine.connect() as conn:
                await conn.execute(
                    text("INSERT INTO jobs (url, position) VALUES (:url, :position)"),
                    [{"url": r["url"], "position": r["position"]} for r in new_records],
                )
                await conn.commit()
            total_inserted += len(new_records)
            logs.info(f"Inserted {len(new_records)} rows at offset {offset}")
        offset += BATCH_SIZE
    return total_inserted
```
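
A minimal invocation sketch. The constructor signature is not shown in this file; the engine-taking initializer, the DSN, and `main` are assumptions for illustration:

```python
import asyncio

from sqlalchemy.ext.asyncio import create_async_engine


async def main() -> None:
    # Hypothetical DSN and constructor; adjust to the real initializer.
    engine = create_async_engine("postgresql+asyncpg://user:pass@localhost/jobs")
    processor = JobsAustriaCacheProcess(engine)  # assumed to accept the engine
    inserted = await processor.process_once()
    print(f"inserted {inserted} new job rows")


asyncio.run(main())
```

Because `process_once()` commits once per batch, a failure partway through leaves earlier batches persisted; rerunning is safe, since already-inserted `url_hash` values are filtered out by the dedup query.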