# Source: extract.py as introduced by commit
# ee8101a248220ed97ff56be3df0105669dddf52b ("fix path?", Jason,
# Fri, 30 May 2025 11:12:35 -0700), reformatted from the collapsed patch text.
"""Scrape job postings for each configured search and upsert them into MongoDB.

The search definitions are read from a YAML file, every search is scraped
with ``jobspy.scrape_jobs``, optional title/field exclusions are applied, and
the resulting rows are upserted (keyed on ``id``) into the
``jobs-etl.incoming_jobs`` collection.
"""

import os
from datetime import date, datetime
from urllib.parse import quote_plus

import pandas as pd

# Default location of the search configuration (the path the script has
# always used); ``main`` accepts an override.
DEFAULT_CONFIG_PATH = '/opt/airflow/dags/search_criteria.yaml'


def load_search_config(config_path):
    """Return the parsed YAML document stored at *config_path*.

    Raises whatever ``open`` / ``yaml.safe_load`` raise for a missing or
    malformed file.
    """
    # Imported lazily so the pure helpers in this module can be imported
    # (and unit-tested) without the scraping/DB dependencies installed.
    import yaml

    with open(config_path, 'r', encoding='utf-8') as file:
        return yaml.safe_load(file)


def process_jobs(search_params):
    """Run one search and return whatever ``scrape_jobs`` returns.

    *search_params* must provide ``'name'`` (used for logging) and
    ``'params'`` (keyword arguments forwarded to ``scrape_jobs``).
    """
    from jobspy import scrape_jobs  # lazy: see load_search_config

    print(f"Scraping jobs with parameters: {search_params['name']}")
    return scrape_jobs(**search_params['params'])


def date_to_datetime(d):
    """Promote a plain ``date`` to a midnight ``datetime``.

    ``datetime`` instances and every non-date value (``None``, strings, ...)
    are returned unchanged.
    """
    if isinstance(d, date) and not isinstance(d, datetime):
        return datetime.combine(d, datetime.min.time())
    return d


def apply_filter(frame, filter_spec):
    """Drop rows of *frame* whose ``filter_spec['field']`` matches an excluded term.

    ``filter_spec['exclude']`` is a list of terms that are OR-ed together and
    matched case-insensitively as a regular expression (unchanged from the
    original behaviour, so regex metacharacters in terms stay significant).
    Rows with a missing field value are kept.

    An empty/missing exclude list (or empty terms) filters nothing: joining
    no terms would yield the pattern ``''``, which matches every row and
    would silently discard the whole search result.
    """
    terms = [
        str(term)
        for term in (filter_spec.get('exclude') or [])
        if term is not None and str(term) != ''
    ]
    if not terms:
        return frame
    excluded = frame[filter_spec['field']].str.contains(
        '|'.join(terms), case=False, na=False
    )
    return frame[~excluded]


def collect_jobs(searches):
    """Scrape every search in *searches* and return one combined DataFrame.

    A failing search is reported and skipped (best effort), so one bad
    search does not abort the run. Returns an empty DataFrame when nothing
    was scraped.
    """
    frames = []
    for search in searches:
        try:
            found = process_jobs(search)
            if len(found) > 0:
                # Apply filters from search configuration if they exist
                if search.get('filter'):
                    found = apply_filter(found, search['filter'])
                frames.append(found)
        except Exception as e:
            # .get() so that a search without a name cannot raise from
            # inside the error handler itself.
            print(f"Error processing search '{search.get('name')}': {str(e)}")
    if not frames:
        return pd.DataFrame()
    # Concatenate once instead of growing a DataFrame inside the loop.
    return pd.concat(frames, ignore_index=True)


def jobs_to_records(jobs):
    """Convert *jobs* to a list of dicts that are safe to store in MongoDB.

    Missing values (NaN/NaT/None) become ``None`` and ``date_posted`` values
    that are plain dates become datetimes (see ``date_to_datetime``).
    """
    # Cast to object first: on numeric columns ``where(..., None)`` keeps NaN
    # (pandas >= 1.3), so without the cast NaN would be written to MongoDB.
    cleaned = jobs.astype(object).where(pd.notnull(jobs), None)
    records = cleaned.to_dict('records')
    for record in records:
        if 'date_posted' in record:
            record['date_posted'] = date_to_datetime(record['date_posted'])
    return records


def build_upsert(job):
    """Return the ``(filter, update)`` pair used to upsert *job* by its ``id``.

    All scraped fields are written with ``$set``. ``processed`` is written
    with ``$setOnInsert`` only, so re-scraping a job that is already stored
    does not reset its flag back to ``False``.
    NOTE(review): assumes downstream consumers flip ``processed`` and expect
    it to persist across scraper runs - confirm.
    """
    fields = {key: value for key, value in job.items() if key != 'processed'}
    update = {"$set": fields, "$setOnInsert": {"processed": False}}
    return {"id": job["id"]}, update


def mongo_uri(env=None):
    """Build the MongoDB connection URI.

    Credentials and host are read from *env* (defaults to ``os.environ``)
    using ``MONGO_USERNAME``, ``MONGO_PASSWORD`` and ``MONGO_HOST``; the
    previously hard-coded values remain the defaults. User name and password
    are percent-escaped so special characters cannot break the URI.
    """
    env = os.environ if env is None else env
    username = quote_plus(env.get('MONGO_USERNAME', 'root'))
    password = quote_plus(env.get('MONGO_PASSWORD', 'passwd'))
    host = env.get('MONGO_HOST', 'job-bot.localdomain:27017')
    return f"mongodb://{username}:{password}@{host}/"


def main(config_path=DEFAULT_CONFIG_PATH):
    """Scrape all configured searches and upsert the results into MongoDB.

    *config_path* is the YAML file holding a top-level ``searches`` list.
    """
    # Load configuration
    config = load_search_config(config_path)

    # Process each search configuration
    jobs = collect_jobs(config['searches'])

    # Basic stats
    print(f"Found {len(jobs)} jobs")
    if jobs.empty:
        return  # nothing to store, so do not open a DB connection

    jobs_data = jobs_to_records(jobs)

    import pymongo  # lazy: see load_search_config

    # Connect to MongoDB
    client = pymongo.MongoClient(mongo_uri())
    try:
        collection = client["jobs-etl"]["incoming_jobs"]

        # Insert into MongoDB: create if it doesn't exist, update if it does
        for job in jobs_data:
            query, update = build_upsert(job)
            result = collection.update_one(query, update, upsert=True)
            print(f"Job {job['id']}: {'Updated' if result.modified_count > 0 else 'Inserted' if result.upserted_id else 'No change'}")
    finally:
        # Always release the connection pool, even if an upsert fails.
        client.close()


if __name__ == "__main__":
    main()