fix path?
This commit is contained in:
parent
6056ce116a
commit
ee8101a248
71
extract.py
Normal file
71
extract.py
Normal file
@ -0,0 +1,71 @@
|
||||
from jobspy import scrape_jobs
|
||||
from datetime import datetime, date
|
||||
import pymongo
|
||||
import pandas as pd
|
||||
import yaml
|
||||
|
||||
def load_search_config(config_path):
|
||||
with open(config_path, 'r') as file:
|
||||
return yaml.safe_load(file)
|
||||
|
||||
def process_jobs(search_params):
|
||||
print(f"Scraping jobs with parameters: {search_params['name']}")
|
||||
jobs = scrape_jobs(**search_params['params'])
|
||||
return jobs
|
||||
|
||||
def date_to_datetime(d):
|
||||
if isinstance(d, date) and not isinstance(d, datetime):
|
||||
return datetime.combine(d, datetime.min.time())
|
||||
return d
|
||||
|
||||
def main():
|
||||
# Load configuration
|
||||
config = load_search_config('/opt/airflow/dags/search_criteria.yaml')
|
||||
|
||||
jobs = pd.DataFrame()
|
||||
|
||||
# Process each search configuration
|
||||
for search in config['searches']:
|
||||
try:
|
||||
_jobs = process_jobs(search)
|
||||
if len(_jobs) > 0:
|
||||
# Apply filters from search configuration if they exist
|
||||
if 'filter' in search:
|
||||
filter_field = search['filter']['field']
|
||||
exclude_list = search['filter']['exclude']
|
||||
_jobs = _jobs[~_jobs[filter_field].str.contains('|'.join(exclude_list), case=False, na=False)]
|
||||
jobs = pd.concat([jobs, _jobs])
|
||||
except Exception as e:
|
||||
print(f"Error processing search '{search['name']}': {str(e)}")
|
||||
continue
|
||||
# Basic stats
|
||||
print(f"Found {len(jobs)} jobs")
|
||||
|
||||
|
||||
|
||||
jobs['processed'] = False
|
||||
|
||||
# Connect to MongoDB
|
||||
mongo_username = "root"
|
||||
mongo_password = "passwd"
|
||||
client = pymongo.MongoClient(f"mongodb://{mongo_username}:{mongo_password}@job-bot.localdomain:27017/")
|
||||
db = client["jobs-etl"]
|
||||
collection = db["incoming_jobs"]
|
||||
|
||||
# Insert into MongoDB
|
||||
jobs = jobs.where(pd.notnull(jobs), None)
|
||||
jobs_data = jobs.to_dict('records')
|
||||
for item in jobs_data:
|
||||
item['date_posted'] = date_to_datetime(item['date_posted'])
|
||||
|
||||
for job in jobs_data:
|
||||
result = collection.update_one(
|
||||
{"id": job["id"]}, # filter by id field
|
||||
{"$set": job}, # update/insert the full job document
|
||||
upsert=True # create if doesn't exist, update if it does
|
||||
)
|
||||
print(f"Job {job['id']}: {'Updated' if result.modified_count > 0 else 'Inserted' if result.upserted_id else 'No change'}")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Loading…
x
Reference in New Issue
Block a user