Skip to content

JobsAustriaETLCache

File: jobs_austria_cache_import.py
Inherits: object (no explicit base class)

Class Diagram

classDiagram
    class JobsAustriaETLCache {
        +__init__()
        -_create_engine()
        -_create_generators()
        +async connect()
        +async disconnect()
        -async _drain_generator()
        +async produce_apify_results()
        +async consume_apify_results()
        +async produce_and_consume()
        +async run()
    }
Hold "Ctrl" to enable pan & zoom

Sequence Diagram

Shows the full runtime flow from actor startup through concurrent scraping and database insertion.

sequenceDiagram
    autonumber

    actor Caller
    participant ETL as JobsAustriaETLCache
    participant Loader as ApifyActorLoader
    participant Apify as Apify Platform
    participant Gen as ApifyGenerator(s)
    participant Q as asyncio.Queue
    participant MySQL as MySQLAsyncLoader
    participant DB as scrape_cache 🗄️

    Caller->>ETL: run()

    note over ETL,Apify: Fire Apify actors
    ETL->>Loader: run() — connect → load config → start actors
    Loader->>Apify: actor.start(run_input) × N configs
    Apify-->>Loader: run_info (run IDs)
    ETL->>ETL: _create_generators() — one per run ID

    note over ETL,DB: @with_session wraps produce_and_consume()
    ETL->>ETL: connect() — open async DB connection

    par produce_apify_results()
        loop each ApifyGenerator (concurrent via asyncio.gather)
            ETL->>Gen: _drain_generator(generator)
            loop async for item in generator.run()
                Gen->>Apify: poll dataset page
                Apify-->>Gen: items
                Gen-->>ETL: item
                ETL->>Q: queue.put(item)
            end
        end
        ETL->>Q: queue.put(None) ← poison pill signals end
    and consume_apify_results()
        ETL->>MySQL: load(queue, insert_strategy)
        MySQL->>MySQL: start queue monitor (logs size every 5s)
        loop until None received
            MySQL->>Q: queue.get()
            Q-->>MySQL: item
            note over MySQL: accumulate — flush when queue empty or batch ≥ 100
            MySQL->>DB: INSERT IGNORE (url, scraped_at, data_payload)
            DB-->>MySQL: inserted / ignored counts
        end
        MySQL->>MySQL: cancel queue monitor
    end

    ETL->>ETL: disconnect() — close connection, dispose engine

    note over ETL: On any exception — abort all Apify runs before re-raising
Hold "Ctrl" to enable pan & zoom

Source
class JobsAustriaETLCache:
    """ETL pipeline that streams Apify scrape results into a MySQL cache table.

    Producer/consumer design: one coroutine per Apify dataset pushes items
    onto a shared ``asyncio.Queue`` while a MySQL loader drains the queue
    concurrently and inserts in batches.
    """

    def __init__(self, apify_input_config_path: str, insert_strategy: Callable):
        """
        Args:
            apify_input_config_path: Path to the Apify actor input config file.
            insert_strategy: Callable implementing the DB insert, handed to the
                MySQL loader together with the queue.
        """
        self.queue: asyncio.Queue = asyncio.Queue()
        self.insert_strategy = insert_strategy
        self.apify_actor_loader = ApifyActorLoader(apify_input_config_path)
        self.generators = None  # deferred until run()
        self.mysql_loader = None  # deferred until connect()
        self.engine = None
        self.connection = None

    @staticmethod
    def _create_engine():
        # NOTE(review): assumes DATABASE_URL_JOBS_INTELLIGENCE_AUSTRIA is set in
        # the environment; create_async_engine would receive None otherwise.
        return create_async_engine(os.getenv('DATABASE_URL_JOBS_INTELLIGENCE_AUSTRIA'))

    def _create_generators(self):
        """Build one ApifyGenerator per dataset ID reported by the actor loader."""
        return [ApifyGenerator(dataset_id) for dataset_id in self.apify_actor_loader.return_dataset_id()]

    async def connect(self):
        """Open the async DB connection and wire up the MySQL loader. Returns self."""
        self.engine = self._create_engine()
        self.connection = await self.engine.connect()
        self.mysql_loader = MySQLAsyncLoader(self.connection)
        return self

    async def disconnect(self):
        """Close the connection and dispose of the engine.

        Guarded so it is safe to call even if connect() never ran or failed
        part-way (previously this raised on the None attributes).
        """
        if self.connection is not None:
            await self.connection.close()
        if self.engine is not None:
            await self.engine.dispose()

    async def _drain_generator(self, generator: ApifyGenerator):
        """Forward every item yielded by one generator onto the shared queue."""
        async for item in generator.run():
            await self.queue.put(item)

    async def produce_apify_results(self):
        """Drain all generators concurrently, then signal end-of-stream."""
        await asyncio.gather(*[self._drain_generator(g) for g in self.generators])
        await self.queue.put(None)  # poison pill to stop consumer

    async def consume_apify_results(self):
        """Consume queued items (until the poison pill) via the MySQL loader."""
        await self.mysql_loader.load(self.queue, self.insert_strategy)

    @with_session
    async def produce_and_consume(self):
        """Run producer and consumer concurrently inside a managed DB session."""
        await asyncio.gather(
            self.produce_apify_results(),
            self.consume_apify_results()
        )

    async def run(self):
        """Start the Apify actors, then stream their results into the cache.

        On any failure, abort all in-flight Apify runs before re-raising so
        no actors are left running remotely.
        """
        self.apify_actor_loader.run()
        self.generators = self._create_generators()
        try:
            await self.produce_and_consume()
        except BaseException:
            # BaseException on purpose: also catches asyncio.CancelledError,
            # so remote runs are aborted even when this task is cancelled.
            # return_exceptions=True ensures every abort is attempted.
            await asyncio.gather(*[g.abort_run() for g in self.generators], return_exceptions=True)
            raise

Methods

Method Parameters Returns
__init__() apify_input_config_path: str, insert_strategy: Callable
_create_engine()
_create_generators()
connect()
disconnect()
_drain_generator() generator: ApifyGenerator
produce_apify_results()
consume_apify_results()
produce_and_consume()
run()

Attributes

No class-level attributes.