from airflow import DAG
from airflow.decorators import task
from airflow.models import Variable
from datetime import datetime, date, timedelta
from pymongo import MongoClient
from jobspy import scrape_jobs
import pandas as pd
import yaml
import graypy
import logging

DEBUG = False

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# Configure logging
logger = logging.getLogger('JobBot')
logger.setLevel(logging.INFO)

# Configure graypy handler for GELF UDP
graylog_handler = graypy.GELFUDPHandler('graylog.localdomain', 12201)
graylog_handler.include_logger_name = True
logger.addHandler(graylog_handler)


def load_search_config(config_path):
    """Load search definitions from a YAML file."""
    with open(config_path, 'r') as file:
        return yaml.safe_load(file)


def process_jobs(search_params):
    """Run a single jobspy search and return the results as a DataFrame."""
    logger.info(f"Scraping jobs for search: {search_params['name']}")
    return scrape_jobs(**search_params['params'])


def date_to_datetime(d):
    """Promote a date to a datetime at midnight; pass datetimes through."""
    if isinstance(d, date) and not isinstance(d, datetime):
        return datetime.combine(d, datetime.min.time())
    return d


with DAG(
    'job_bot_api_dag',
    default_args=default_args,
    description='A DAG to fetch data from job-bot API and process it',
    schedule='*/10 * * * *',  # Every 10 minutes
    start_date=datetime.now() - timedelta(days=1),  # One day back; catchup=False prevents backfill
    catchup=False,
    max_active_runs=1,
    tags=['job-bot', 'api'],
) as dag:

    @task()
    def fetch_jobs():
        # Load configuration
        config = load_search_config('search_criteria.yaml')
        jobs = pd.DataFrame()

        # Process each search configuration
        for search in config['searches']:
            try:
                _jobs = process_jobs(search)
                if len(_jobs) > 0:
                    # Apply exclusion filters from the search configuration, if present
                    if 'filter' in search:
                        filter_field = search['filter']['field']
                        exclude_list = search['filter']['exclude']
                        _jobs = _jobs[~_jobs[filter_field].str.contains(
                            '|'.join(exclude_list), case=False, na=False)]
                    jobs = pd.concat([jobs, _jobs])
            except Exception as e:
                logger.error(f"Error processing search '{search['name']}': {e}")
                continue

        # Basic stats
        logger.info(f"Found {len(jobs)} jobs")

    # TaskFlow dependencies
    api_results = fetch_jobs()

if __name__ == "__main__":
    dag.test()
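
# For reference, a minimal sketch of the search_criteria.yaml structure that
# fetch_jobs() above expects. The top-level keys ('searches', 'name', 'params',
# 'filter.field', 'filter.exclude') are inferred from the code; the keys under
# 'params' are assumptions, since they are passed straight through to
# jobspy.scrape_jobs() and can be any of its keyword arguments:
#
# searches:
#   - name: remote-python            # hypothetical search name
#     params:
#       site_name: ["indeed", "linkedin"]
#       search_term: "python developer"
#       location: "Remote"
#       results_wanted: 20
#     filter:                        # optional exclusion filter
#       field: title                 # DataFrame column to match against
#       exclude: ["senior", "lead"]  # case-insensitive substrings to drop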