import re
from datetime import datetime, date

import pandas as pd
import pymongo
import yaml
from jobspy import scrape_jobs


def load_search_config(config_path):
    """Load the YAML search configuration from disk."""
    with open(config_path, 'r') as file:
        return yaml.safe_load(file)


def process_jobs(search_params):
    """Run a single jobspy scrape using the parameters of one named search."""
    print(f"Scraping jobs with parameters: {search_params['name']}")
    jobs = scrape_jobs(**search_params['params'])
    return jobs


def date_to_datetime(d):
    """Convert a plain date to a datetime; BSON has no date-only type."""
    if isinstance(d, date) and not isinstance(d, datetime):
        return datetime.combine(d, datetime.min.time())
    return d


def main():
    # Load configuration
    config = load_search_config('/opt/airflow/dags/search_criteria.yaml')

    jobs = pd.DataFrame()

    # Process each search configuration
    for search in config['searches']:
        try:
            _jobs = process_jobs(search)
            if len(_jobs) > 0:
                # Apply filters from the search configuration if they exist
                if 'filter' in search:
                    filter_field = search['filter']['field']
                    exclude_list = search['filter']['exclude']
                    # Escape each term so regex metacharacters in the
                    # exclude list (e.g. "C++") are matched literally.
                    pattern = '|'.join(re.escape(term) for term in exclude_list)
                    _jobs = _jobs[~_jobs[filter_field].str.contains(pattern, case=False, na=False)]
                jobs = pd.concat([jobs, _jobs])
        except Exception as e:
            print(f"Error processing search '{search['name']}': {e}")
            continue

    # Basic stats
    print(f"Found {len(jobs)} jobs")
    jobs['processed'] = False

    # Connect to MongoDB
    mongo_username = "root"
    mongo_password = "passwd"
    client = pymongo.MongoClient(
        f"mongodb://{mongo_username}:{mongo_password}@job-bot.localdomain:27017/"
    )
    db = client["jobs-etl"]
    collection = db["incoming_jobs"]

    # Replace NaN with None so MongoDB stores nulls instead of NaN floats.
    # Casting to object first is needed because .where() on numeric columns
    # would otherwise coerce None back to NaN.
    jobs = jobs.astype(object).where(pd.notnull(jobs), None)
    jobs_data = jobs.to_dict('records')

    # BSON can encode datetime.datetime but not datetime.date
    for item in jobs_data:
        item['date_posted'] = date_to_datetime(item['date_posted'])

    # Upsert each job, keyed on the scraper-assigned "id" field
    for job in jobs_data:
        result = collection.update_one(
            {"id": job["id"]},  # filter by id field
            {"$set": job},      # update/insert the full job document
            upsert=True,        # create if it doesn't exist, update if it does
        )
        print(f"Job {job['id']}: "
              f"{'Updated' if result.modified_count > 0 else 'Inserted' if result.upserted_id else 'No change'}")


if __name__ == "__main__":
    main()
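# --- Example search_criteria.yaml (illustrative) ---
# A minimal sketch of the configuration this script expects, inferred from
# the keys it reads: a top-level 'searches' list, each entry with a 'name',
# a 'params' mapping passed straight through to jobspy.scrape_jobs, and an
# optional 'filter' with 'field' and 'exclude'. The parameter names under
# 'params' (site_name, search_term, location, results_wanted) are common
# jobspy arguments, but the exact set accepted depends on your jobspy version.
#
# searches:
#   - name: "python-remote"
#     params:
#       site_name: ["indeed", "linkedin"]
#       search_term: "python developer"
#       location: "Remote"
#       results_wanted: 50
#     filter:
#       field: "title"
#       exclude: ["senior", "staff"]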
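# --- Downstream consumption (illustrative) ---
# The 'processed' flag set above implies a later pipeline stage that picks
# up newly inserted documents. A minimal sketch of such a consumer, assuming
# the same connection details; handle_job is a hypothetical placeholder, not
# part of this script.
#
# for job in collection.find({"processed": False}):
#     handle_job(job)  # hypothetical downstream handler
#     collection.update_one({"_id": job["_id"]}, {"$set": {"processed": True}})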