added db code
This commit is contained in:
parent
fb62c02e5d
commit
2e7a3dca31
@ -80,6 +80,30 @@ with DAG(
|
|||||||
# Basic stats
|
# Basic stats
|
||||||
print(f"Found {len(jobs)} jobs")
|
print(f"Found {len(jobs)} jobs")
|
||||||
|
|
||||||
|
jobs['processed'] = False
|
||||||
|
|
||||||
|
# Connect to MongoDB
|
||||||
|
mongo_username = "root"
|
||||||
|
mongo_password = "passwd"
|
||||||
|
client = MongoClient(f"mongodb://{mongo_username}:{mongo_password}@job-bot.localdomain:27017/")
|
||||||
|
db = client["jobs-etl"]
|
||||||
|
collection = db["incoming_jobs"]
|
||||||
|
|
||||||
|
# Insert into MongoDB
|
||||||
|
jobs = jobs.where(pd.notnull(jobs), None)
|
||||||
|
jobs_data = jobs.to_dict('records')
|
||||||
|
for item in jobs_data:
|
||||||
|
item['date_posted'] = date_to_datetime(item['date_posted'])
|
||||||
|
|
||||||
|
for job in jobs_data:
|
||||||
|
result = collection.update_one(
|
||||||
|
{"id": job["id"]}, # filter by id field
|
||||||
|
{"$set": job}, # update/insert the full job document
|
||||||
|
upsert=True # create if doesn't exist, update if it does
|
||||||
|
)
|
||||||
|
print(f"Job {job['id']}: {'Updated' if result.modified_count > 0 else 'Inserted' if result.upserted_id else 'No change'}")
|
||||||
|
|
||||||
|
|
||||||
# TaskFlow dependencies
|
# TaskFlow dependencies
|
||||||
api_results = fetch_jobs()
|
api_results = fetch_jobs()
|
||||||
|
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user