From 86a6e7d388aae5c5e58b0209f9c0e6f9c43691cd Mon Sep 17 00:00:00 2001
From: Jason
Date: Fri, 30 May 2025 21:50:11 -0700
Subject: [PATCH] tracking dates and better logging

---
 extract_v2.py | 20 +++++++++++---------
 1 file changed, 11 insertions(+), 9 deletions(-)

diff --git a/extract_v2.py b/extract_v2.py
index 5c2b30f..c63f90a 100644
--- a/extract_v2.py
+++ b/extract_v2.py
@@ -30,13 +30,14 @@ logger.setLevel(logging.INFO)
 graylog_handler = graypy.GELFUDPHandler('graylog.localdomain', 12201)
 graylog_handler.include_logger_name = True
 logger.addHandler(graylog_handler)
+logger.addHandler(logging.StreamHandler())
 
 def load_search_config(config_path):
     with open(config_path, 'r') as file:
         return yaml.safe_load(file)
 
 def process_jobs(search_params):
-    print(f"Scraping jobs with parameters: {search_params['name']}")
+    logger.info(f"Scraping jobs with parameters: {search_params['name']}")
     jobs = scrape_jobs(**search_params['params'])
     return jobs
 
@@ -74,12 +75,12 @@ with DAG(
                     exclude_list = search['filter']['exclude']
                     _jobs = _jobs[~_jobs[filter_field].str.contains('|'.join(exclude_list), case=False, na=False)]
                 jobs = pd.concat([jobs, _jobs])
-                print(f"Found {len(_jobs)} jobs for search '{search['name']}'")
+                logger.info(f"Found {len(_jobs)} jobs for search '{search['name']}'")
             except Exception as e:
-                print(f"Error processing search '{search['name']}': {str(e)}")
+                logger.error(f"Error processing search '{search['name']}': {str(e)}")
                 continue
 
         jobs['processed'] = False
 
         # Connect to MongoDB
         mongo_username = "root"
@@ -91,17 +92,18 @@ with DAG(
         # Insert into MongoDB
         jobs = jobs.where(pd.notnull(jobs), None)
         jobs_data = jobs.to_dict('records')
 
-        for item in jobs_data:
-            item['date_posted'] = date_to_datetime(item['date_posted'])
         for job in jobs_data:
+            job['date_posted'] = date_to_datetime(job['date_posted'])
             result = collection.update_one(
                 {"id": job["id"]},  # filter by id field
-                {"$set": job},  # update/insert the full job document
-                upsert=True  # create if doesn't exist, update if it does
+                {
+                    "$set": job,
+                    "$setOnInsert": {"created_at": datetime.now()},
+                },  # set created_at on insert
+                upsert=True,
             )
-            print(f"Job {job['id']}: {'Updated' if result.modified_count > 0 else 'Inserted' if result.upserted_id else 'No change'}")
-
+            logger.info(f"Job {job['id']}: {'Updated' if result.modified_count > 0 else 'Inserted' if result.upserted_id else 'No change'}")
 
 # TaskFlow dependencies
 api_results = fetch_jobs()