tracking dates and better logging

commit 86a6e7d388
parent 1cf89ef55e
Author: Jason
Date:   2025-05-30 21:50:11 -07:00


@@ -30,13 +30,14 @@ logger.setLevel(logging.INFO)
 graylog_handler = graypy.GELFUDPHandler('graylog.localdomain', 12201)
 graylog_handler.include_logger_name = True
 logger.addHandler(graylog_handler)
+logger.addHandler(logging.StreamHandler())
 
 
 def load_search_config(config_path):
     with open(config_path, 'r') as file:
         return yaml.safe_load(file)
 
 def process_jobs(search_params):
-    print(f"Scraping jobs with parameters: {search_params['name']}")
+    logger.info(f"Scraping jobs with parameters: {search_params['name']}")
     jobs = scrape_jobs(**search_params['params'])
     return jobs
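After this hunk, the module logs to Graylog and to the console at once. A minimal standalone sketch of the resulting setup, assuming the logger was created with logging.getLogger(__name__) above the hunk (host, port, and the include_logger_name flag come from the diff):

import logging

import graypy

logger = logging.getLogger(__name__)  # assumed; the hunk only shows setLevel
logger.setLevel(logging.INFO)

# GELF-over-UDP handler ships records to Graylog; include_logger_name tags
# each GELF message with the logger's name, as in the diff.
graylog_handler = graypy.GELFUDPHandler('graylog.localdomain', 12201)
graylog_handler.include_logger_name = True
logger.addHandler(graylog_handler)

# New in this commit: mirror every record to stderr so messages also show up
# in local/Airflow task output, not just Graylog.
logger.addHandler(logging.StreamHandler())

logger.info("logging wired up")  # goes to both handlers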
@@ -74,12 +75,13 @@ with DAG(
                 exclude_list = search['filter']['exclude']
                 _jobs = _jobs[~_jobs[filter_field].str.contains('|'.join(exclude_list), case=False, na=False)]
             jobs = pd.concat([jobs, _jobs])
-            print(f"Found {len(_jobs)} jobs for search '{search['name']}'")
+            logger.info(f"Found {len(_jobs)} jobs for search '{search['name']}'")
         except Exception as e:
-            print(f"Error processing search '{search['name']}': {str(e)}")
+            logger.error(f"Error processing search '{search['name']}': {str(e)}")
             continue
 
     jobs['processed'] = False
+    jobs['created_at'] = datetime.now()
 
     # Connect to MongoDB
     mongo_username = "root"
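The exclude filter above drops any row whose filter_field matches one of the configured terms. A self-contained sketch of just that step (column name and terms are invented for illustration):

import pandas as pd

_jobs = pd.DataFrame({'title': ['Data Engineer', 'Sales Intern', 'ML Engineer']})
filter_field = 'title'               # hypothetical config values
exclude_list = ['intern', 'sales']

# '|'.join(exclude_list) builds the regex alternation 'intern|sales';
# case=False matches case-insensitively, and na=False counts missing titles
# as non-matches so NaN rows survive the filter instead of matching.
mask = _jobs[filter_field].str.contains('|'.join(exclude_list), case=False, na=False)
_jobs = _jobs[~mask]
print(_jobs)  # 'Sales Intern' is gone

One caveat: the terms are treated as a regex, so exclude entries containing metacharacters (e.g. 'C++') should be passed through re.escape before joining.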
@@ -91,17 +93,18 @@ with DAG(
     # Insert into MongoDB
     jobs = jobs.where(pd.notnull(jobs), None)
     jobs_data = jobs.to_dict('records')
 
-    for item in jobs_data:
-        item['date_posted'] = date_to_datetime(item['date_posted'])
-
     for job in jobs_data:
+        job['date_posted'] = date_to_datetime(job['date_posted'])
         result = collection.update_one(
             {"id": job["id"]},  # filter by id field
-            {"$set": job},  # update/insert the full job document
-            upsert=True  # create if doesn't exist, update if it does
+            {
+                "$set": job,
+                "$setOnInsert": {"created_at": datetime.now()},
+            },  # set created_at on insert
+            upsert=True,
         )
-        print(f"Job {job['id']}: {'Updated' if result.modified_count > 0 else 'Inserted' if result.upserted_id else 'No change'}")
+        logger.info(f"Job {job['id']}: {'Updated' if result.modified_count > 0 else 'Inserted' if result.upserted_id else 'No change'}")
 
     # TaskFlow dependencies
     api_results = fetch_jobs()
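The last hunk turns the insert into an idempotent upsert keyed on the scraper's id, with created_at written only when a document is first inserted. A sketch of the pattern with pymongo (URI, credentials, and collection names are placeholders; date_to_datetime is stood in for by datetime.combine, since BSON cannot encode a plain datetime.date):

from datetime import date, datetime

from pymongo import MongoClient

client = MongoClient("mongodb://root:example@localhost:27017/")  # placeholder URI
collection = client["jobs"]["listings"]                          # placeholder names

job = {"id": "abc123", "title": "Data Engineer", "date_posted": date(2025, 5, 30)}

# BSON has no date-only type, so promote the date to a midnight datetime.
job['date_posted'] = datetime.combine(job['date_posted'], datetime.min.time())

result = collection.update_one(
    {"id": job["id"]},                                   # match on the scraper's id
    {
        "$set": job,                                     # refresh mutable fields
        "$setOnInsert": {"created_at": datetime.now()},  # first-seen time, set once
    },
    upsert=True,                                         # insert when nothing matches
)
print("Updated" if result.modified_count else "Inserted" if result.upserted_id else "No change")

$setOnInsert is ignored whenever the filter matches an existing document, which is what keeps created_at stable across reruns. Note that MongoDB rejects an update in which the same path appears in both $set and $setOnInsert, so created_at must not also be a key of the dict passed to $set.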