diff --git a/extract_v2.py b/extract_v2.py
index a277d09..7d57cb6 100644
--- a/extract_v2.py
+++ b/extract_v2.py
@@ -80,6 +80,52 @@ with DAG(
 
         # Basic stats
         print(f"Found {len(jobs)} jobs")
+        # Start every fetched row as unprocessed.
+        # NOTE(review): "processed" is part of the "$set" payload below, so a
+        # re-run resets it to False on documents that already exist -- confirm
+        # that is intended (otherwise move it to "$setOnInsert").
+        jobs['processed'] = False
+
+        # Build the MongoDB URI. Credentials are read from the environment so
+        # they are not tied to the source; the literals are the previous
+        # hard-coded values, kept only as a fallback.
+        # TODO: drop the fallbacks once MONGO_USERNAME/MONGO_PASSWORD are set.
+        import os
+        from urllib.parse import quote_plus
+
+        # PyMongo requires the user name and password to be percent-escaped.
+        mongo_username = quote_plus(os.environ.get("MONGO_USERNAME", "root"))
+        mongo_password = quote_plus(os.environ.get("MONGO_PASSWORD", "passwd"))
+        mongo_uri = f"mongodb://{mongo_username}:{mongo_password}@job-bot.localdomain:27017/"
+
+        # Replace missing values with None so they are stored as null. Cast to
+        # object first: on recent pandas a plain .where(..., None) can leave NaN
+        # in numeric columns.
+        jobs = jobs.astype(object).where(pd.notnull(jobs), None)
+        jobs_data = jobs.to_dict('records')
+        # NOTE(review): date_posted may be None here -- confirm that
+        # date_to_datetime accepts it.
+        for item in jobs_data:
+            item['date_posted'] = date_to_datetime(item['date_posted'])
+
+        # Upsert each job keyed on "id". The context manager closes the client
+        # even if an upsert raises.
+        with MongoClient(mongo_uri) as client:
+            collection = client["jobs-etl"]["incoming_jobs"]
+            for job in jobs_data:
+                result = collection.update_one(
+                    {"id": job["id"]},  # filter by id field
+                    {"$set": job},  # update/insert the full job document
+                    upsert=True,  # create if doesn't exist, update if it does
+                )
+                if result.upserted_id is not None:
+                    status = "Inserted"
+                elif result.modified_count > 0:
+                    status = "Updated"
+                else:
+                    status = "No change"
+                print(f"Job {job['id']}: {status}")
+
 
     # TaskFlow dependencies
     api_results = fetch_jobs()