# job-bot/extract_v2.py

from airflow import DAG
from airflow.decorators import task
from airflow.models import Variable
from datetime import datetime, date, timedelta
from pymongo import MongoClient
from jobspy import scrape_jobs
import pandas as pd
import yaml
import graypy
import logging
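# Third-party dependencies implied by the imports above (assumed PyPI names, a sketch
# of a requirements list rather than a verified one):
#   apache-airflow, pymongo, python-jobspy, pandas, pyyaml, graypy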
DEBUG = False
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}
# Configure logging
logger = logging.getLogger('JobBot')
logger.setLevel(logging.INFO)
# Configure graypy handler for GELF UDP
graylog_handler = graypy.GELFUDPHandler('graylog.localdomain', 12201)
graylog_handler.include_logger_name = True
logger.addHandler(graylog_handler)
logger.addHandler(logging.StreamHandler())
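# 12201/UDP is Graylog's default GELF input port; records go to graylog.localdomain
# and, via the StreamHandler above, to the local console as well.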
def load_search_config(config_path):
    """Load the YAML file that defines the job searches."""
    with open(config_path, 'r') as file:
        return yaml.safe_load(file)
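# The YAML consumed above is expected to look roughly like the sketch below. The keys
# are inferred from how the config is used later in this file; the concrete values
# (sites, search terms, exclude words) are illustrative assumptions only:
#
#   searches:
#     - name: remote-python
#       params:                      # passed straight to jobspy.scrape_jobs(**params)
#         site_name: [indeed, linkedin]
#         search_term: python developer
#         results_wanted: 50
#       filter:                      # optional post-filter on the returned DataFrame
#         field: title
#         exclude: [senior, staff]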
def process_jobs(search_params):
    """Run a single configured search through jobspy and return the results."""
    logger.info(f"Scraping jobs for search: {search_params['name']}")
    jobs = scrape_jobs(**search_params['params'])
    return jobs
def date_to_datetime(d):
    """Promote a plain date to a midnight datetime; pass anything else through unchanged."""
    if isinstance(d, date) and not isinstance(d, datetime):
        return datetime.combine(d, datetime.min.time())
    return d
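# BSON has no encoding for plain datetime.date values, so pymongo raises InvalidDocument
# on them; the helper above promotes dates to midnight datetimes before documents are
# written to MongoDB.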
with DAG(
    'job_bot_api_dag2',
    default_args=default_args,
    description='A DAG to fetch data from job-bot API and process it',
    schedule='*/10 * * * *',  # Every 10 minutes
    start_date=datetime.now() - timedelta(days=1),  # Dynamic start_date (Airflow recommends a static value); catchup=False avoids backfill
    catchup=False,
    max_active_runs=1,
    tags=['job-bot', 'api'],
) as dag:
    @task()
    def fetch_jobs():
        # Load configuration
        config = load_search_config('/opt/airflow/dags/search_criteria.yaml')
        jobs = pd.DataFrame()
        # Process each search configuration
        for search in config['searches']:
            try:
                _jobs = process_jobs(search)
                if len(_jobs) > 0:
                    # Apply filters from search configuration if they exist
                    if 'filter' in search:
                        filter_field = search['filter']['field']
                        exclude_list = search['filter']['exclude']
                        # Drop rows whose filter field matches any exclude term (case-insensitive OR regex)
                        _jobs = _jobs[~_jobs[filter_field].str.contains('|'.join(exclude_list), case=False, na=False)]
                    jobs = pd.concat([jobs, _jobs])
                logger.info(f"Found {len(_jobs)} jobs for search '{search['name']}'")
            except Exception as e:
                logger.error(f"Error processing search '{search['name']}': {str(e)}")
                continue
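        # Optional guard (a sketch, not part of the original flow): bail out early when
        # every search came back empty, e.g.
        #   if jobs.empty:
        #       logger.info("No jobs scraped; skipping MongoDB write")
        #       return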
        # Connect to MongoDB
        mongo_username = "root"
        mongo_password = "passwd"
        client = MongoClient(f"mongodb://{mongo_username}:{mongo_password}@job-bot.localdomain:27017/")
        db = client["jobs-etl"]
        collection = db["incoming_jobs"]
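        # Hardening note (sketch only): the credentials above are hardcoded; they could
        # instead come from Airflow Variables (the otherwise-unused Variable import), e.g.
        #   mongo_username = Variable.get("jobbot_mongo_user", default_var="root")
        #   mongo_password = Variable.get("jobbot_mongo_password", default_var="passwd")
        # The variable names here are hypothetical; a MongoDB Connection managed in the
        # Airflow UI would work as well.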
        # Insert into MongoDB
        jobs = jobs.where(pd.notnull(jobs), None)  # replace NaN with None so Mongo stores nulls
        jobs_data = jobs.to_dict('records')
        for job in jobs_data:
            job['date_posted'] = date_to_datetime(job['date_posted'])
            result = collection.update_one(
                {"id": job["id"]},  # filter by id field
                {
                    "$set": job,
                    "$setOnInsert": {"created_at": datetime.now()},  # created_at only set when the upsert inserts a new document
                },
                upsert=True,
            )
            logger.info(f"Job {job['id']}: {'Updated' if result.modified_count > 0 else 'Inserted' if result.upserted_id else 'No change'}")
    # Instantiate the TaskFlow task (single task, no downstream dependencies yet)
    api_results = fetch_jobs()
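# dag.test() runs the DAG in-process, which is handy for local debugging
# (python extract_v2.py); it requires Airflow 2.5 or newer.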
if __name__ == "__main__":
    dag.test()