# job-bot/job-bot.py

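"""Airflow DAG that polls a job-search API every 10 minutes, upserts the
results into MongoDB, screens newly inserted postings with an OpenAI model
against a set of dealbreaker rules, and sends a Pushbullet notification for
each posting the model marks as a good match."""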
from airflow import DAG
from airflow.decorators import task
from airflow.models import Variable
from datetime import datetime, timedelta
from pymongo import MongoClient
import requests
import os
import json
import openai
import graypy
import logging
DEBUG = False
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}
# Configure logging
logger = logging.getLogger('JobBot')
logger.setLevel(logging.INFO)
# Configure graypy handler for GELF UDP
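# (12201 is the standard GELF UDP input port on a Graylog server)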
graylog_handler = graypy.GELFUDPHandler('graylog.localdomain', 12201)
graylog_handler.include_logger_name = True
logger.addHandler(graylog_handler)
with DAG(
    'job_bot_api_dag',
    default_args=default_args,
    description='A DAG to fetch data from job-bot API and process it',
    schedule='*/10 * * * *',  # every 10 minutes
    start_date=datetime.now() - timedelta(days=1),  # start one day in the past
    catchup=False,
    max_active_runs=1,
    tags=['job-bot', 'api'],
) as dag:
    @task()
    def fetch_api_data():
        cache_file = '/tmp/job_bot_api_cache.json'
        api_results = None
        logger.info("Fetching data from job-bot API...")
        # In debug mode, reuse a cached API response if it is less than an hour old
        if DEBUG and os.path.exists(cache_file):
            cache_age = datetime.now() - datetime.fromtimestamp(os.path.getmtime(cache_file))
            if cache_age < timedelta(hours=1):
                logger.info("Using cached API response.")
                with open(cache_file, 'r') as f:
                    api_results = json.load(f)
        if api_results is None:
            url = 'http://job-bot:8000/api/v1/search_jobs'
            params = {
                'search_term': 'senior engineering manager',
                'location': 'San Francisco, CA',
                'description_format': 'html'
            }
            response = requests.get(url, params=params)
            response.raise_for_status()
            api_results = response.json()
            with open(cache_file, 'w') as f:
                json.dump(api_results, f)
        logger.info(f"API returned {len(api_results.get('jobs', []))} jobs")
        return api_results
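    # The search_jobs response schema isn't documented here; the shape below is
    # inferred from the fields the downstream tasks read, so treat it as an
    # illustrative sketch rather than the API contract:
    #
    #   {
    #     "jobs": [
    #       {
    #         "id": "in-20166e6a5b1aeede",
    #         "title": "Senior Engineering Manager",
    #         "company": "...",
    #         "location": "San Francisco, CA",
    #         "description": "<p>...</p>",
    #         "min_amount": 250000,
    #         "max_amount": 320000,
    #         "job_url": "https://..."
    #       }
    #     ]
    #   }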
    @task()
    def insert_into_mongodb(api_results):
        mongo_username = "root"
        mongo_password = "passwd"
        client = MongoClient(f"mongodb://{mongo_username}:{mongo_password}@job-bot.localdomain:27017/")
        db = client["job_bot_db"]
        collection = db["job-bot"]
        new_jobs = []
        # Upsert each job keyed on its API id; a set upserted_id means the
        # document was newly inserted rather than updated
        for job in api_results.get("jobs", []):
            result = collection.update_one(
                {"id": job["id"]},
                {"$set": job},
                upsert=True
            )
            if result.upserted_id:
                new_jobs.append(job)
        # Debug aid: look up a known job ID and use it to exercise the LLM
        test_job = collection.find_one({"id": "in-20166e6a5b1aeede"})
        if test_job and DEBUG:
            # Convert the MongoDB document to a plain dict and drop the _id field
            test_job = dict(test_job)
            test_job.pop('_id', None)
            logger.info("Found test job, using it for LLM testing")
            new_jobs = [test_job]
        # Debug aid: if nothing new was inserted, still pass the first job to the LLM
        if not new_jobs and api_results.get("jobs") and DEBUG:
            logger.info("No new jobs inserted, passing the first job to LLM for debugging.")
            new_jobs = [api_results["jobs"][0]]
        else:
            logger.info(f"Inserted/Updated {len(new_jobs)} new jobs into MongoDB.")
        return new_jobs
    @task()
    def query_llm(new_jobs):
        results = []  # Store results for downstream tasks
        openai.api_key = Variable.get("OPENAI_API_KEY")
        if not new_jobs:
            logger.info("No new jobs to process with LLM.")
            return results
        for job in new_jobs:
            prompt = f"""
Please analyze this job description and evaluate if it's a good match for my background as a senior engineering manager. Structure the response with a machine-readable first line followed by the detailed analysis.

START_RESPONSE::{{"success": boolean, "location_pass": boolean, "salary_pass": boolean, "title_pass": boolean, "match_rating": "Strong|Medium|Poor"}}

## DEAL BREAKERS:
1. **Location:** For non-remote/hybrid roles, must be within reasonable driving distance of Mill Valley, CA (acceptable areas: San Francisco, North Bay, East Bay within 1 hour drive)
2. **Company:** Exclude positions at Amazon/AWS as an employer (working with AWS technology is fine)
3. **Compensation:** Base salary must meet or exceed $250k
   - If salary range is provided: Check against minimum requirement
   - If salary not provided: Evaluate based on:
     * Company Tier Assessment:
       - Tier 1 (Automatic Pass): FAANG/MANGA (except Amazon), Major Tech Leaders (Uber, Lyft, Airbnb, Stripe, etc.), Well-funded Unicorns
       - Tier 2 (Likely Pass): Established public tech companies, Later-stage startups with significant funding ($500M+ valuation)
       - Tier 3 (Requires Verification): Early-stage startups, smaller public companies, non-tech enterprises
     * Additional Factors:
       * Location-based pay scales (SF Bay Area premium)
       * Industry standards for similar roles
       * Company's known compensation practices
       * Recent funding rounds or financial health
4. **Title/Role Type:** Must be engineering leadership position
   - Title MUST contain:
     * "Engineering Manager" OR "Engineering Director" OR "Director of Engineering" OR "VP of Engineering" OR "Head of Engineering"
     * If contains "Senior" or "Lead", must be in conjunction with above terms
   - AUTOMATICALLY REJECT if title contains:
     * "Structural", "Mechanical", "Civil", "Hardware", "Electrical"
     * "TPM" or "Technical Program Manager"
     * "Sales Engineer" or "Solutions Engineer"
     * "Technical Account Manager"
     * "Professional Services"
     * "Project Manager"
     * "Development Manager" (without "Software" or "Engineering")
     * "Product Manager"
     * "Site Reliability" or "SRE"
     * "DevOps Manager" (without "Engineering")
   - Valid Example Titles:
     * "Senior Engineering Manager"
     * "Engineering Manager"
     * "Director of Engineering"
     * "Senior Director, Engineering"
     * "VP, Engineering"
     * "Head of Engineering"
   - Invalid Example Titles:
     * "Senior Structural Engineer"
     * "Lead Software Developer"
     * "Technical Program Manager"
     * "Engineering Project Manager"
     * "Development Manager"
   - Exclude roles like:
     * Technical Program Manager (TPM)
     * Sales Engineering Manager
     * Solutions Engineering Manager
     * Technical Account Manager
     * Professional Services Manager
     * Engineering Project Manager

## Technical/Role Matches should include:
1. Cloud-native technology companies or enterprises with significant cloud presence
2. Leadership roles managing software engineering teams (8+ engineers)
3. Focus on web services, distributed systems, or enterprise SaaS applications
4. Technologies involving: Cloud platforms (AWS), microservices, modern web technologies (React, Node.js), or enterprise security
5. Responsibilities for technical strategy, team scaling, and organizational growth

## Flag as poor matches if the role:
1. Primarily focuses on gaming, embedded systems, or hardware engineering
2. Requires specific domain expertise I lack (like automotive, aerospace, or networking hardware)
3. Emphasizes technologies not in my stack (like .NET, Windows development)
4. Is focused on individual contributor work rather than engineering leadership
5. Requires industry-specific certifications I don't have

## Analysis should include:
1. Initial dealbreaker screening (Location, Company, Salary, Title)
2. Overall match rating (Strong/Medium/Poor)
3. Key matching points
4. Any significant gaps or misalignments
5. Final recommendation

## Success Criteria:
The success boolean should be true only if:
- All dealbreakers pass (location, company, salary, title)
- Match rating is Strong or Medium
- No significant gaps exist
- Final recommendation is positive

<job_description>
Job Title: {job['title']}
Company: {job['company']}
Location: {job['location']}
Description: {job['description']}
Salary Range: {job.get('min_amount', 'N/A')}-{job.get('max_amount', 'N/A')}
</job_description>
"""
            logger.info(f"Querying LLM for Job ID {job['id']} with prompt:\n{prompt}")
            try:
                response = openai.chat.completions.create(
                    model="gpt-4.1-mini",
                    messages=[
                        {"role": "system", "content": "You are a helpful assistant that reads job postings."},
                        {"role": "user", "content": prompt}
                    ],
                    max_tokens=512
                )
                response_text = response.choices[0].message.content
                logger.info(f"LLM Response for Job ID {job['id']}:\n{response_text}")
                # Store job and response for notification processing
                results.append({
                    "job": job,
                    "analysis": response_text
                })
            except Exception as e:
                logger.error(f"Error querying LLM for Job ID {job['id']}: {e}")
        return results
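    # The parsing in send_notifications below assumes the model's reply begins
    # with the machine-readable line requested in the prompt. A hypothetical
    # example of what that first line looks like:
    #
    #   START_RESPONSE::{"success": true, "location_pass": true, "salary_pass": true, "title_pass": true, "match_rating": "Strong"}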
    @task()
    def send_notifications(llm_results):
        api_token = Variable.get("PUSHBULLET_API_TOKEN")  # Token is stored in Airflow Variables
        for result in llm_results:
            try:
                # Extract the machine-readable JSON line from the analysis
                analysis_lines = result["analysis"].strip().split("\n")
                response_line = next((line for line in analysis_lines if "START_RESPONSE" in line), None)
                if response_line is None:
                    logger.error(f"No START_RESPONSE line in analysis for job ID {result['job']['id']}")
                    continue
                # Parse the JSON response
                json_str = response_line.split("START_RESPONSE::")[1].strip()
                analysis = json.loads(json_str)
                if analysis.get("success"):
                    job = result["job"]
                    message = f"Title: {job['title']}\nCompany: {job['company']}\nLocation: {job['location']}\nSalary: {job.get('min_amount', 'N/A')}-{job.get('max_amount', 'N/A')}"
                    headers = {
                        'Access-Token': api_token,
                        'Content-Type': 'application/json'
                    }
                    data = {
                        'body': message,
                        'title': 'Job Alert - Good Match Found!',
                        'type': 'link',
                        'url': job['job_url']
                    }
                    response = requests.post('https://api.pushbullet.com/v2/pushes',
                                             headers=headers,
                                             json=data)
                    if response.status_code == 200:
                        logger.info(f"Notification sent successfully for job ID {job['id']}")
                    else:
                        logger.error(f"Failed to send notification: {response.status_code}, {response.text}")
            except Exception as e:
                logger.error(f"Error processing notification: {e}")
    # TaskFlow dependencies
    api_results = fetch_api_data()
    new_jobs = insert_into_mongodb(api_results)
    llm_results = query_llm(new_jobs)
    send_notifications(llm_results)
if __name__ == "__main__":
    dag.test()
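    # dag.test() (available in Airflow 2.5+) runs a single DAG run in-process
    # without a scheduler, which is handy for local debugging; in a deployed
    # environment the scheduler triggers this DAG on the */10 cron schedule.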