# job_bot_api_dag.py — Airflow DAG (287 lines, 13 KiB, Python)
from airflow import DAG
|
|
from airflow.decorators import task
|
|
from airflow.models import Variable
|
|
from datetime import datetime, timedelta
|
|
from pymongo import MongoClient
|
|
|
|
import requests
|
|
import os
|
|
import json
|
|
import openai
|
|
import graypy
|
|
import logging
|
|
|
|
# When True, enables the cached-API-response and LLM test-job debug paths below.
DEBUG = False

# Default arguments applied to every task in this DAG.
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# Configure logging
logger = logging.getLogger('JobBot')
logger.setLevel(logging.INFO)

# Configure graypy handler for GELF UDP
# NOTE(review): the handler is attached at import time, so every scheduler
# parse of this file ships logs to Graylog — confirm that is intended.
graylog_handler = graypy.GELFUDPHandler('graylog.localdomain', 12201)
graylog_handler.include_logger_name = True
logger.addHandler(graylog_handler)
|
|
|
|
with DAG(
|
|
'job_bot_api_dag',
|
|
default_args=default_args,
|
|
description='A DAG to fetch data from job-bot API and process it',
|
|
schedule='*/10 * * * *', # Every 10 minutes
|
|
start_date=datetime.now() - timedelta(days=1), # Changed to today-1 day
|
|
catchup=False,
|
|
max_active_runs=1,
|
|
tags=['job-bot', 'api'],
|
|
) as dag:
|
|
|
|
@task()
|
|
def fetch_api_data():
|
|
cache_file = '/tmp/job_bot_api_cache.json'
|
|
api_results = None
|
|
|
|
logger.info("Fetching data from job-bot API...")
|
|
|
|
# # Check if cache file exists and is recent
|
|
if DEBUG and os.path.exists(cache_file):
|
|
cache_age = datetime.now() - datetime.fromtimestamp(os.path.getmtime(cache_file))
|
|
#if cache_age < timedelta(hours=1):
|
|
print("Using cached API response.")
|
|
with open(cache_file, 'r') as f:
|
|
api_results = json.load(f)
|
|
|
|
if api_results is None:
|
|
url = 'http://job-bot:8000/api/v1/search_jobs'
|
|
params = {
|
|
'search_term': 'senior engineering manager',
|
|
'location': 'San Francisco, CA',
|
|
'description_format': 'html'
|
|
}
|
|
response = requests.get(url, params=params)
|
|
response.raise_for_status()
|
|
api_results = response.json()
|
|
with open(cache_file, 'w') as f:
|
|
json.dump(api_results, f)
|
|
|
|
logger.info(f"API returned {len(api_results.get('jobs', []))} jobs")
|
|
return api_results
|
|
|
|
@task()
|
|
def insert_into_mongodb(api_results):
|
|
mongo_username = "root"
|
|
mongo_password = "passwd"
|
|
client = MongoClient(f"mongodb://{mongo_username}:{mongo_password}@job-bot.localdomain:27017/")
|
|
db = client["job_bot_db"]
|
|
collection = db["job-bot"]
|
|
|
|
new_jobs = []
|
|
for job in api_results.get("jobs", []):
|
|
result = collection.update_one(
|
|
{"id": job["id"]},
|
|
{"$set": job},
|
|
upsert=True
|
|
)
|
|
if result.upserted_id:
|
|
new_jobs.append(job)
|
|
|
|
# If no new jobs, always pass at least one job (the first one) to the LLM
|
|
# Test by searching for a specific job ID
|
|
test_job = collection.find_one({"id": "in-20166e6a5b1aeede"})
|
|
if test_job and DEBUG:
|
|
# Convert MongoDB document to dict and remove _id field
|
|
test_job = dict(test_job)
|
|
test_job.pop('_id', None)
|
|
logger.info("Found test job, using it for LLM testing")
|
|
new_jobs = [test_job]
|
|
if not new_jobs and api_results.get("jobs") and DEBUG:
|
|
logger.info("No new jobs inserted, passing the first job to LLM for debugging.")
|
|
new_jobs = [api_results["jobs"][0]]
|
|
else:
|
|
logger.info(f"Inserted/Updated {len(new_jobs)} new jobs into MongoDB.")
|
|
|
|
return new_jobs
|
|
|
|
    @task()
    def query_llm(new_jobs):
        """Ask the OpenAI API whether each new job matches the target profile.

        Builds one long evaluation prompt per job (deal-breakers, match
        criteria, required response format) and collects the raw LLM replies.
        The model is instructed to begin its reply with a machine-readable
        ``START_RESPONSE::{...}`` JSON line, which send_notifications parses.

        Args:
            new_jobs (list[dict]): jobs to evaluate; may be empty.

        Returns:
            list[dict]: items of the form ``{"job": <job>, "analysis": <text>}``
            for every job the LLM answered. A failed API call is logged and
            the job is skipped, so the list may be shorter than new_jobs.
        """
        results = []  # Store results for downstream tasks
        openai.api_key = Variable.get("OPENAI_API_KEY")
        if not new_jobs:
            logger.info("No new jobs to process with LLM.")
            return results

        for job in new_jobs:
            # NOTE: the doubled braces after START_RESPONSE:: are f-string
            # escapes — the model sees single braces.
            prompt = f"""
Please analyze this job description and evaluate if it's a good match for my background as a senior engineering manager. Structure the response with a machine-readable first line followed by the detailed analysis.

START_RESPONSE::{{"success": boolean, "location_pass": boolean, "salary_pass": boolean, "title_pass": boolean, "match_rating": "Strong|Medium|Poor"}}

## DEAL BREAKERS:
1. **Location:** For non-remote/hybrid roles, must be within reasonable driving distance of Mill Valley, CA (acceptable areas: San Francisco, North Bay, East Bay within 1 hour drive)
2. **Company:** Exclude positions at Amazon/AWS as an employer (working with AWS technology is fine)
3. **Compensation:** Base salary must meet or exceed $250k
- If salary range is provided: Check against minimum requirement
- If salary not provided: Evaluate based on:
* Company Tier Assessment:
- Tier 1 (Automatic Pass): FAANG/MANGA (except Amazon), Major Tech Leaders (Uber, Lyft, Airbnb, Stripe, etc.), Well-funded Unicorns
- Tier 2 (Likely Pass): Established public tech companies, Later-stage startups with significant funding ($500M+ valuation)
- Tier 3 (Requires Verification): Early-stage startups, smaller public companies, non-tech enterprises
* Additional Factors:
* Location-based pay scales (SF Bay Area premium)
* Industry standards for similar roles
* Company's known compensation practices
* Recent funding rounds or financial health
4. **Title/Role Type:** Must be engineering leadership position
- Title MUST contain:
* "Engineering Manager" OR "Engineering Director" OR "Director of Engineering" OR "VP of Engineering" OR "Head of Engineering"
* If contains "Senior" or "Lead", must be in conjunction with above terms

- AUTOMATICALLY REJECT if title contains:
* "Structural", "Mechanical", "Civil", "Hardware", "Electrical"
* "TPM" or "Technical Program Manager"
* "Sales Engineer" or "Solutions Engineer"
* "Technical Account Manager"
* "Professional Services"
* "Project Manager"
* "Development Manager" (without "Software" or "Engineering")
* "Product Manager"
* "Site Reliability" or "SRE"
* "DevOps Manager" (without "Engineering")

- Valid Example Titles:
* "Senior Engineering Manager"
* "Engineering Manager"
* "Director of Engineering"
* "Senior Director, Engineering"
* "VP, Engineering"
* "Head of Engineering"

- Invalid Example Titles:
* "Senior Structural Engineer"
* "Lead Software Developer"
* "Technical Program Manager"
* "Engineering Project Manager"
* "Development Manager"
- Exclude roles like:
* Technical Program Manager (TPM)
* Sales Engineering Manager
* Solutions Engineering Manager
* Technical Account Manager
* Professional Services Manager
* Engineering Project Manager

## Technical/Role Matches should include:
1. Cloud-native technology companies or enterprises with significant cloud presence
2. Leadership roles managing software engineering teams (8+ engineers)
3. Focus on web services, distributed systems, or enterprise SaaS applications
4. Technologies involving: Cloud platforms (AWS), microservices, modern web technologies (React, Node.js), or enterprise security
5. Responsibilities for technical strategy, team scaling, and organizational growth

## Flag as poor matches if the role:
1. Primarily focuses on gaming, embedded systems, or hardware engineering
2. Requires specific domain expertise I lack (like automotive, aerospace, or networking hardware)
3. Emphasizes technologies not in my stack (like .NET, Windows development)
4. Is focused on individual contributor work rather than engineering leadership
5. Requires industry-specific certifications I don't have

## Analysis should include:
1. Initial dealbreaker screening (Location, Company, Salary, Title)
2. Overall match rating (Strong/Medium/Poor)
3. Key matching points
4. Any significant gaps or misalignments
5. Final recommendation

## Success Criteria:
The success boolean should be true only if:
- All dealbreakers pass (location, company, salary, title)
- Match rating is Strong or Medium
- No significant gaps exist
- Final recommendation is positive

<job_description>
Job Title: {job['title']}
Company: {job['company']}
Location: {job['location']}
Description: {job['description']}
Salary Range: {job.get('min_amount', 'N/A')}-{job.get('max_amount', 'N/A')}
</job_description>
"""
            logger.info(f"Querying LLM for Job ID {job['id']} with prompt:\n{prompt}")
            try:
                response = openai.chat.completions.create(
                    model="gpt-4.1-mini",
                    messages=[
                        {"role": "system", "content": "You are a helpful assistant that reads job postings."},
                        {"role": "user", "content": prompt}
                    ],
                    max_tokens=512
                )
                response_text = response.choices[0].message.content
                logger.info(f"LLM Response for Job ID {job['id']}:\n{response_text}")

                # Store job and response for notification processing
                results.append({
                    "job": job,
                    "analysis": response_text
                })
            except Exception as e:
                # Best-effort: one failed LLM call must not fail the whole task.
                logger.error(f"Error querying LLM for Job ID {job['id']}: {e}")

        return results
|
|
|
|
@task()
|
|
def send_notifications(llm_results):
|
|
api_token = Variable.get("PUSHBULLET_API_TOKEN") # Store token in Airflow variables
|
|
|
|
for result in llm_results:
|
|
try:
|
|
# Extract the JSON response line
|
|
analysis_lines = result["analysis"].strip().split("\n")
|
|
response_line = next(line for line in analysis_lines if "START_RESPONSE" in line)
|
|
|
|
# Parse the JSON response
|
|
json_str = response_line.split("START_RESPONSE::")[1].strip()
|
|
analysis = json.loads(json_str)
|
|
|
|
if analysis.get("success"):
|
|
job = result["job"]
|
|
message = f"Title: {job['title']}\nCompany: {job['company']}\nLocation: {job['location']}\nSalary: {job.get('min_amount', 'N/A')}-{job.get('max_amount', 'N/A')}"
|
|
|
|
headers = {
|
|
'Access-Token': api_token,
|
|
'Content-Type': 'application/json'
|
|
}
|
|
|
|
data = {
|
|
'body': message,
|
|
'title': 'Job Alert - Good Match Found!',
|
|
'type': 'link',
|
|
'url': job['job_url']
|
|
}
|
|
|
|
response = requests.post('https://api.pushbullet.com/v2/pushes',
|
|
headers=headers,
|
|
json=data)
|
|
|
|
if response.status_code == 200:
|
|
logger.info(f"Notification sent successfully for job ID {job['id']}")
|
|
else:
|
|
logger.error(f"Failed to send notification: {response.status_code}, {response.text}")
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error processing notification: {e}")
|
|
|
|
    # TaskFlow dependencies
    # fetch -> insert -> LLM -> notify; each return value travels via XCom.
    api_results = fetch_api_data()
    new_jobs = insert_into_mongodb(api_results)
    llm_results = query_llm(new_jobs)
    send_notifications(llm_results)
|
|
|
|
# Allow running this DAG directly with `python <file>` for local debugging.
if __name__ == "__main__":
    dag.test()