Skip to content

run()

async Class: JobsAustriaDetailsETL
File: jobs_austria_details_scraping.py · line 219

Signature

Parameters: none
Returns: not annotated
Async: Yes
Visibility: Public

Implementation

async def run(self):
    """Fetch pending URLs and process them as actor batches, one chunk at a time.

    The pending URLs are turned into actor configs, which are then handled in
    consecutive slices of at most ``MAX_CONCURRENT_ACTORS`` configs. For each
    slice the actors are started, ``self.generators`` and ``self.queue`` are
    replaced with fresh objects, and ``produce_and_consume()`` is awaited
    before the next slice begins.

    Returns:
        None. Returns early, after logging, when no pending URLs are found.

    Raises:
        BaseException: anything raised by ``produce_and_consume()`` is
            re-raised after ``abort_run()`` has been awaited on every
            generator of the current chunk.
    """
    pending_urls = await self._fetch_pending_urls()
    if not pending_urls:
        logs.info("No pending jobs found. Exiting.")
        return

    configs = self._build_configs(pending_urls)
    # Ceiling division: number of slices needed to cover every config.
    total_chunks = (len(configs) + MAX_CONCURRENT_ACTORS - 1) // MAX_CONCURRENT_ACTORS
    logs.info(
        f"Found {len(pending_urls)} jobs → {len(configs)} actor batches of {URL_BATCH_SIZE} URLs, "
        f"firing max {MAX_CONCURRENT_ACTORS} actors at a time ({total_chunks} chunks)."
    )

    slice_starts = range(0, len(configs), MAX_CONCURRENT_ACTORS)
    for chunk_num, start in enumerate(slice_starts, start=1):
        chunk = configs[start:start + MAX_CONCURRENT_ACTORS]

        started_run_ids = self._start_actors(chunk)
        self.generators = [ApifyGenerator(run_id) for run_id in started_run_ids]
        # Each chunk gets its own queue rather than reusing the previous one.
        self.queue = asyncio.Queue()

        logs.info(f"Chunk {chunk_num}/{total_chunks}: {len(chunk)} actors running.")
        try:
            await self.produce_and_consume()
        except BaseException:
            # BaseException (not just Exception) so the abort also runs for
            # non-Exception errors; failures of the aborts themselves are
            # collected by gather instead of masking the original error.
            abort_calls = [generator.abort_run() for generator in self.generators]
            await asyncio.gather(*abort_calls, return_exceptions=True)
            raise
        else:
            logs.info(f"Chunk {chunk_num}/{total_chunks} complete.")