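"""Airflow DAG that scrapes job postings with JobSpy from YAML-defined searches,
filters them, and upserts the results into MongoDB, logging to Graylog via GELF UDP."""
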
from datetime import datetime, date, timedelta
import logging

from airflow import DAG
from airflow.decorators import task
from airflow.models import Variable

import graypy
import pandas as pd
import yaml
from jobspy import scrape_jobs
from pymongo import MongoClient

DEBUG = False

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# Configure logging
logger = logging.getLogger('JobBot')
logger.setLevel(logging.INFO)

# Configure graypy handler for GELF UDP
graylog_handler = graypy.GELFUDPHandler('graylog.localdomain', 12201)
graylog_handler.include_logger_name = True
logger.addHandler(graylog_handler)
logger.addHandler(logging.StreamHandler())


def load_search_config(config_path):
    with open(config_path, 'r') as file:
        return yaml.safe_load(file)


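# Illustrative sketch of the expected search_criteria.yaml layout (hypothetical values;
# keys under 'params' are passed straight through to jobspy.scrape_jobs, and the
# 'filter' block is optional):
#
#   searches:
#     - name: python-remote
#       params:
#         search_term: "python developer"
#         results_wanted: 20
#       filter:
#         field: title
#         exclude: ["senior", "staff"]

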
def process_jobs(search_params):
    """Run one configured search through JobSpy and return the resulting DataFrame."""
    logger.info(f"Scraping jobs for search: {search_params['name']}")
    jobs = scrape_jobs(**search_params['params'])
    return jobs


def date_to_datetime(d):
    """Convert a plain date to a datetime so pymongo can serialize it."""
    if isinstance(d, date) and not isinstance(d, datetime):
        return datetime.combine(d, datetime.min.time())
    return d


with DAG(
    'job_bot_api_dag2',
    default_args=default_args,
    description='A DAG to fetch data from job-bot API and process it',
    schedule='*/10 * * * *',  # Every 10 minutes
    start_date=datetime.now() - timedelta(days=1),  # dynamic start date: one day in the past
    catchup=False,
    max_active_runs=1,
    tags=['job-bot', 'api'],
) as dag:

    @task()
    def fetch_jobs():
        # Load configuration
        config = load_search_config('/opt/airflow/dags/search_criteria.yaml')

        jobs = pd.DataFrame()

        # Process each search configuration
        for search in config['searches']:
            try:
                _jobs = process_jobs(search)
                if len(_jobs) > 0:
                    # Apply filters from search configuration if they exist
                    if 'filter' in search:
                        filter_field = search['filter']['field']
                        exclude_list = search['filter']['exclude']
                        _jobs = _jobs[~_jobs[filter_field].str.contains('|'.join(exclude_list), case=False, na=False)]
                    jobs = pd.concat([jobs, _jobs])
                    logger.info(f"Found {len(_jobs)} jobs for search '{search['name']}'")
            except Exception as e:
                logger.error(f"Error processing search '{search['name']}': {str(e)}")
                continue

        # Connect to MongoDB
        mongo_username = "root"
        mongo_password = "passwd"
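        # Note: the Variable import above is unused as written. A sketch of pulling the
        # credentials from Airflow Variables instead (assuming variables named
        # "mongo_username" and "mongo_password" have been created) would look like:
        #   mongo_username = Variable.get("mongo_username", default_var="root")
        #   mongo_password = Variable.get("mongo_password", default_var="passwd")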
        client = MongoClient(f"mongodb://{mongo_username}:{mongo_password}@job-bot.localdomain:27017/")
        db = client["jobs-etl"]
        collection = db["incoming_jobs"]

        # Insert into MongoDB: replace NaN with None so documents store nulls,
        # then upsert each job keyed on its "id" field
        jobs = jobs.where(pd.notnull(jobs), None)
        jobs_data = jobs.to_dict('records')

        for job in jobs_data:
            job['date_posted'] = date_to_datetime(job['date_posted'])
            result = collection.update_one(
                {"id": job["id"]},  # filter by id field
                {
                    "$set": job,
                    "$setOnInsert": {"created_at": datetime.now()},  # set created_at on insert
                },
                upsert=True,
            )
            logger.info(f"Job {job['id']}: {'Updated' if result.modified_count > 0 else 'Inserted' if result.upserted_id else 'No change'}")

    # TaskFlow dependencies
    api_results = fetch_jobs()


if __name__ == "__main__":
    dag.test()