run()¶
Async method · Class: JobsAustriaDetailsETL
File: jobs_austria_details_scraping.py · line 219
Signature¶
| Property | Value |
| --- | --- |
| Parameters | none |
| Returns | not annotated |
| Async | Yes |
| Visibility | Public |
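run() takes no parameters and is a coroutine, so a caller only needs to construct the ETL object and await the method. The snippet below is a minimal sketch of such an entry point; it assumes the module is importable under its file name and that JobsAustriaDetailsETL can be constructed without arguments, neither of which is shown in this section.

```python
import asyncio

# Assumption: the module is importable by its file name.
from jobs_austria_details_scraping import JobsAustriaDetailsETL


async def main():
    # Assumption: no constructor arguments are required here;
    # adjust if __init__ expects credentials or configuration.
    etl = JobsAustriaDetailsETL()
    await etl.run()


if __name__ == "__main__":
    asyncio.run(main())
```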
Implementation¶
async def run(self):
    # Step 1: Fetch pending URLs
    urls = await self._fetch_pending_urls()
    if not urls:
        logs.info("No pending jobs found. Exiting.")
        return

    configs = self._build_configs(urls)
    total_chunks = (len(configs) + MAX_CONCURRENT_ACTORS - 1) // MAX_CONCURRENT_ACTORS
    logs.info(
        f"Found {len(urls)} jobs → {len(configs)} actor batches of {URL_BATCH_SIZE} URLs, "
        f"firing max {MAX_CONCURRENT_ACTORS} actors at a time ({total_chunks} chunks)."
    )

    # Step 2: Process configs in chunks of MAX_CONCURRENT_ACTORS
    for i in range(0, len(configs), MAX_CONCURRENT_ACTORS):
        chunk = configs[i:i + MAX_CONCURRENT_ACTORS]
        chunk_num = i // MAX_CONCURRENT_ACTORS + 1

        run_ids = self._start_actors(chunk)
        self.generators = [ApifyGenerator(run_id) for run_id in run_ids]
        self.queue = asyncio.Queue()  # fresh queue per chunk

        logs.info(f"Chunk {chunk_num}/{total_chunks}: {len(chunk)} actors running.")
        try:
            await self.produce_and_consume()
        except BaseException:
            await asyncio.gather(*[g.abort_run() for g in self.generators], return_exceptions=True)
            raise

        logs.info(f"Chunk {chunk_num}/{total_chunks} complete.")