JobsAustriaETLCache
File: jobs_austria_cache_import.py
Inherits: —
Class Diagram
classDiagram
class JobsAustriaETLCache {
+__init__()
-_create_engine()
-_create_generators()
+async connect()
+async disconnect()
-async _drain_generator()
+async produce_apify_results()
+async consume_apify_results()
+async produce_and_consume()
+async run()
}
Sequence Diagram
Shows the full runtime flow from actor startup through concurrent scraping and database insertion.
sequenceDiagram
autonumber
actor Caller
participant ETL as JobsAustriaETLCache
participant Loader as ApifyActorLoader
participant Apify as Apify Platform
participant Gen as ApifyGenerator(s)
participant Q as asyncio.Queue
participant MySQL as MySQLAsyncLoader
participant DB as scrape_cache 🗄️
Caller->>ETL: run()
note over ETL,Apify: Fire Apify actors
ETL->>Loader: run() — connect → load config → start actors
Loader->>Apify: actor.start(run_input) × N configs
Apify-->>Loader: run_info (run IDs)
ETL->>ETL: _create_generators(), one per dataset ID
note over ETL,DB: @with_session wraps produce_and_consume()
ETL->>ETL: connect() — open async DB connection
par produce_apify_results()
loop each ApifyGenerator (concurrent via asyncio.gather)
ETL->>Gen: _drain_generator(generator)
loop async for item in generator.run()
Gen->>Apify: poll dataset page
Apify-->>Gen: items
Gen-->>ETL: item
ETL->>Q: queue.put(item)
end
end
ETL->>Q: queue.put(None) ← poison pill signals end
and consume_apify_results()
ETL->>MySQL: load(queue, insert_strategy)
MySQL->>MySQL: start queue monitor (logs size every 5s)
loop until None received
MySQL->>Q: queue.get()
Q-->>MySQL: item
note over MySQL: accumulate — flush when queue empty or batch ≥ 100
MySQL->>DB: INSERT IGNORE (url, scraped_at, data_payload)
DB-->>MySQL: inserted / ignored counts
end
MySQL->>MySQL: cancel queue monitor
end
ETL->>ETL: disconnect() — close connection, dispose engine
note over ETL: On any exception — abort all Apify runs before re-raising
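The producer/consumer handoff in the diagram can be sketched in isolation. This is a minimal illustration under stated assumptions, not the project's implementation: `fake_source` stands in for `ApifyGenerator.run()`, the `flush` callback stands in for the `INSERT IGNORE` step, and the batching rule (flush when the queue is empty or the batch reaches 100) is taken from the diagram note, since the internals of `MySQLAsyncLoader` are not shown here.

```python
import asyncio

BATCH_SIZE = 100  # threshold taken from the diagram note


async def fake_source(start, count):
    """Hypothetical stand-in for ApifyGenerator.run(): yields `count` items."""
    for i in range(start, start + count):
        await asyncio.sleep(0)  # yield control so other tasks can run
        yield {"url": f"https://example.com/job/{i}"}


async def drain(source, queue):
    """Push every item from one source onto the shared queue."""
    async for item in source:
        await queue.put(item)


async def produce(sources, queue):
    """Drain all sources concurrently, then signal the end exactly once."""
    await asyncio.gather(*(drain(s, queue) for s in sources))
    await queue.put(None)  # poison pill: sent only after every source is exhausted


async def consume(queue, flush):
    """Accumulate items and flush when the queue is empty or the batch is full."""
    batch = []
    while True:
        item = await queue.get()
        if item is None:
            break
        batch.append(item)
        if queue.empty() or len(batch) >= BATCH_SIZE:
            flush(batch)
            batch = []
    if batch:  # items collected just before the poison pill arrived
        flush(batch)


async def demo():
    queue = asyncio.Queue()
    flushed = []  # each entry is one flushed batch
    sources = [fake_source(0, 150), fake_source(150, 150)]
    await asyncio.gather(produce(sources, queue), consume(queue, flushed.append))
    return flushed


if __name__ == "__main__":
    batches = asyncio.run(demo())
    print(len(batches), "batches,", sum(len(b) for b in batches), "items")
```

Sending the poison pill only after `asyncio.gather` returns means a single sentinel is enough, regardless of how many generators feed the queue.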
Source
class JobsAustriaETLCache:
    def __init__(self, apify_input_config_path: str, insert_strategy: Callable):
        self.queue = asyncio.Queue()
        self.insert_strategy = insert_strategy
        self.apify_actor_loader = ApifyActorLoader(apify_input_config_path)
        self.generators = None  # deferred until run()
        self.mysql_loader = None  # deferred until connect()
        self.engine = None
        self.connection = None

    @staticmethod
    def _create_engine():
        return create_async_engine(os.getenv('DATABASE_URL_JOBS_INTELLIGENCE_AUSTRIA'))

    def _create_generators(self):
        return [ApifyGenerator(dataset_id) for dataset_id in self.apify_actor_loader.return_dataset_id()]

    async def connect(self):
        self.engine = self._create_engine()
        self.connection = await self.engine.connect()
        self.mysql_loader = MySQLAsyncLoader(self.connection)
        return self

    async def disconnect(self):
        await self.connection.close()
        await self.engine.dispose()

    async def _drain_generator(self, generator: ApifyGenerator):
        async for item in generator.run():
            await self.queue.put(item)

    async def produce_apify_results(self):
        await asyncio.gather(*[self._drain_generator(g) for g in self.generators])
        await self.queue.put(None)  # poison pill to stop consumer

    async def consume_apify_results(self):
        await self.mysql_loader.load(self.queue, self.insert_strategy)

    @with_session
    async def produce_and_consume(self):
        await asyncio.gather(
            self.produce_apify_results(),
            self.consume_apify_results()
        )

    async def run(self):
        self.apify_actor_loader.run()
        self.generators = self._create_generators()
        try:
            await self.produce_and_consume()
        except BaseException:
            await asyncio.gather(*[g.abort_run() for g in self.generators], return_exceptions=True)
            raise
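The listing uses `with_session` without showing its definition. The sketch below is one plausible shape for such a decorator, combined with the abort-on-failure pattern from `run()`. Everything in it is an assumption for illustration: this `with_session` is a hypothetical stand-in that calls `connect()` before the wrapped coroutine and `disconnect()` afterwards, and `FakeRun` and `DemoETL` are invented helpers that only record what happened.

```python
import asyncio
import functools


def with_session(method):
    """Hypothetical stand-in: connect before the call, always disconnect after."""
    @functools.wraps(method)
    async def wrapper(self, *args, **kwargs):
        await self.connect()
        try:
            return await method(self, *args, **kwargs)
        finally:
            await self.disconnect()  # runs on success and on failure
    return wrapper


class FakeRun:
    """Invented stand-in for an ApifyGenerator; records whether it was aborted."""
    def __init__(self, fail_on_abort=False):
        self.aborted = False
        self.fail_on_abort = fail_on_abort

    async def abort_run(self):
        if self.fail_on_abort:
            raise RuntimeError("abort failed")
        self.aborted = True


class DemoETL:
    """Invented class mirroring only the control flow of run()."""
    def __init__(self, runs):
        self.generators = runs
        self.events = []

    async def connect(self):
        self.events.append("connect")

    async def disconnect(self):
        self.events.append("disconnect")

    @with_session
    async def produce_and_consume(self):
        self.events.append("work")
        raise ValueError("simulated pipeline failure")

    async def run(self):
        try:
            await self.produce_and_consume()
        except BaseException:
            # return_exceptions=True collects abort errors instead of raising them,
            # so a failing abort cannot replace the original exception.
            await asyncio.gather(
                *[g.abort_run() for g in self.generators], return_exceptions=True
            )
            raise


async def demo():
    runs = [FakeRun(), FakeRun(fail_on_abort=True), FakeRun()]
    etl = DemoETL(runs)
    try:
        await etl.run()
    except ValueError as exc:
        error = str(exc)
    return etl.events, [r.aborted for r in runs], error


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

In this sketch the session is closed before the abort calls run, because the `finally` block inside the wrapper executes before the exception reaches `run()`. Whether the real decorator orders things the same way depends on its actual definition.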
Methods
| Method | Parameters | Returns |
|---|---|---|
| `__init__()` | `apify_input_config_path: str, insert_strategy: Callable` | — |
| `_create_engine()` | — | — |
| `_create_generators()` | — | — |
| `connect()` | — | — |
| `disconnect()` | — | — |
| `_drain_generator()` | `generator: ApifyGenerator` | — |
| `produce_apify_results()` | — | — |
| `consume_apify_results()` | — | — |
| `produce_and_consume()` | — | — |
| `run()` | — | — |
Attributes
No class-level attributes.