first commit
commit b4c2d0922a

Dockerfile (new file, 27 lines)
@@ -0,0 +1,27 @@
FROM python:3.12-slim

# Install system dependencies
RUN apt-get update && \
    apt-get install -y --no-install-recommends \
    build-essential \
    python3-dev \
    && apt-get clean \
    && rm -rf /var/lib/apt/lists/*

# Create and set working directory
WORKDIR /app

# Install Python packages
RUN pip install --no-cache-dir \
    apache-airflow==2.7.3 \
    pymongo==4.6.1 \
    graypy==2.1.0 \
    openai==1.12.0 \
    requests==2.31.0

# Create a non-root user
RUN useradd -m airflow
USER airflow

# Keep container running
CMD ["sleep", "infinity"]

job-bot.py (new file, 292 lines)
@@ -0,0 +1,292 @@
from airflow import DAG
from airflow.decorators import task
from airflow.models import Variable
from datetime import datetime, timedelta
from pymongo import MongoClient

import requests
import os
import json
import openai
import graypy
import logging
from airflow.operators.python import PythonVirtualenvOperator

DEBUG = True

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# Configure logging
logger = logging.getLogger('JobBot')
logger.setLevel(logging.INFO)

# Configure graypy handler for GELF UDP
graylog_handler = graypy.GELFUDPHandler('graylog.localdomain', 12201)
graylog_handler.include_logger_name = True
logger.addHandler(graylog_handler)
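# Note: GELF over UDP is fire-and-forget, so if graylog.localdomain is unreachable
# these log records are simply dropped rather than raising errors in the tasks.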

requirements = [
    "pymongo==4.6.1",
    "graypy==2.1.0",
    "openai==1.12.0"
]
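# Currently unused: neither this list (which mirrors the Dockerfile pins) nor the
# PythonVirtualenvOperator import above is referenced below; presumably they are kept
# around for later running these tasks in isolated virtualenvs.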

with DAG(
    'job_bot_api_dag',
    default_args=default_args,
    description='A DAG to fetch data from job-bot API and process it',
    schedule=timedelta(hours=1),
    start_date=datetime.now() - timedelta(days=1),  # Changed to today-1 day
    catchup=False,
    tags=['job-bot', 'api'],
) as dag:

    @task()
    def fetch_api_data():
        cache_file = '/tmp/job_bot_api_cache.json'
        api_results = None

        logger.info("Fetching data from job-bot API...")

        # Check if cache file exists and is recent
        if os.path.exists(cache_file):
            cache_age = datetime.now() - datetime.fromtimestamp(os.path.getmtime(cache_file))
            #if cache_age < timedelta(hours=1):
            print("Using cached API response.")
            with open(cache_file, 'r') as f:
                api_results = json.load(f)
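            # Note: the cache-age check above is commented out, so any existing cache file
            # is reused regardless of age; delete /tmp/job_bot_api_cache.json to force a
            # fresh API call.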

        if api_results is None:
            url = 'http://job-bot:8000/api/v1/search_jobs'
            params = {
                'search_term': 'senior engineering manager',
                'location': 'San Francisco, CA',
                'description_format': 'html'
            }
            response = requests.get(url, params=params)
            response.raise_for_status()
            api_results = response.json()
            with open(cache_file, 'w') as f:
                json.dump(api_results, f)

        return api_results

    @task()
    def insert_into_mongodb(api_results):
        mongo_username = "root"
        mongo_password = "passwd"
        client = MongoClient(f"mongodb://{mongo_username}:{mongo_password}@localhost:27017/")
        db = client["job_bot_db"]
        collection = db["job-bot"]

        new_jobs = []
        for job in api_results.get("jobs", []):
            result = collection.update_one(
                {"id": job["id"]},
                {"$set": job},
                upsert=True
            )
            if result.upserted_id:
                new_jobs.append(job)
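        # update_one(..., upsert=True) sets upserted_id only when a document was actually
        # inserted, so new_jobs ends up holding just the previously unseen postings.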

        # If no new jobs, always pass at least one job (the first one) to the LLM
        # Test by searching for a specific job ID
        test_job = collection.find_one({"id": "in-20166e6a5b1aeede"})
        if test_job and DEBUG:
            # Convert MongoDB document to dict and remove _id field
            test_job = dict(test_job)
            test_job.pop('_id', None)
            logger.info("Found test job, using it for LLM testing")
            new_jobs = [test_job]
        if not new_jobs and api_results.get("jobs") and DEBUG:
            logger.info("No new jobs inserted, passing the first job to LLM for debugging.")
            new_jobs = [api_results["jobs"][0]]
        else:
            logger.info(f"Inserted/Updated {len(new_jobs)} new jobs into MongoDB.")

        return new_jobs

    @task()
    def query_llm(new_jobs):
        results = []  # Store results for downstream tasks
        openai.api_key = Variable.get("OPENAI_API_KEY")
        if not new_jobs:
            logger.info("No new jobs to process with LLM.")
            return results

        for job in new_jobs:
            prompt = f"""
Please analyze this job description and evaluate whether it is a good match for my background as a senior engineering manager. Structure the response with a machine-readable first line followed by the detailed analysis.

START_RESPONSE::{{"success": boolean, "location_pass": boolean, "salary_pass": boolean, "title_pass": boolean, "match_rating": "Strong|Medium|Poor"}}

## DEAL BREAKERS:
1. **Location:** For non-remote/hybrid roles, must be within reasonable driving distance of Mill Valley, CA (acceptable areas: San Francisco, North Bay, East Bay within 1 hour drive)
2. **Company:** Exclude positions at Amazon/AWS as an employer (working with AWS technology is fine)
3. **Compensation:** Base salary must meet or exceed $250k
   - If salary range is provided: Check against minimum requirement
   - If salary not provided: Evaluate based on:
     * Company Tier Assessment:
       - Tier 1 (Automatic Pass): FAANG/MANGA (except Amazon), Major Tech Leaders (Uber, Lyft, Airbnb, Stripe, etc.), Well-funded Unicorns
       - Tier 2 (Likely Pass): Established public tech companies, Later-stage startups with significant funding ($500M+ valuation)
       - Tier 3 (Requires Verification): Early-stage startups, smaller public companies, non-tech enterprises
     * Additional Factors:
       * Location-based pay scales (SF Bay Area premium)
       * Industry standards for similar roles
       * Company's known compensation practices
       * Recent funding rounds or financial health
4. **Title/Role Type:** Must be an engineering leadership position
   - Title MUST contain:
     * "Engineering Manager" OR "Engineering Director" OR "Director of Engineering" OR "VP of Engineering" OR "Head of Engineering"
     * If the title contains "Senior" or "Lead", it must be in conjunction with the above terms

   - AUTOMATICALLY REJECT if title contains:
     * "Structural", "Mechanical", "Civil", "Hardware", "Electrical"
     * "TPM" or "Technical Program Manager"
     * "Sales Engineer" or "Solutions Engineer"
     * "Technical Account Manager"
     * "Professional Services"
     * "Project Manager"
     * "Development Manager" (without "Software" or "Engineering")
     * "Product Manager"
     * "Site Reliability" or "SRE"
     * "DevOps Manager" (without "Engineering")

   - Valid Example Titles:
     * "Senior Engineering Manager"
     * "Engineering Manager"
     * "Director of Engineering"
     * "Senior Director, Engineering"
     * "VP, Engineering"
     * "Head of Engineering"

   - Invalid Example Titles:
     * "Senior Structural Engineer"
     * "Lead Software Developer"
     * "Technical Program Manager"
     * "Engineering Project Manager"
     * "Development Manager"
   - Exclude roles like:
     * Technical Program Manager (TPM)
     * Sales Engineering Manager
     * Solutions Engineering Manager
     * Technical Account Manager
     * Professional Services Manager
     * Engineering Project Manager

## Technical/Role Matches should include:
1. Cloud-native technology companies or enterprises with a significant cloud presence
2. Leadership roles managing software engineering teams (8+ engineers)
3. Focus on web services, distributed systems, or enterprise SaaS applications
4. Technologies involving: Cloud platforms (AWS), microservices, modern web technologies (React, Node.js), or enterprise security
5. Responsibility for technical strategy, team scaling, and organizational growth

## Flag as a poor match if the role:
1. Primarily focuses on gaming, embedded systems, or hardware engineering
2. Requires specific domain expertise I lack (such as automotive, aerospace, or networking hardware)
3. Emphasizes technologies not in my stack (such as .NET or Windows development)
4. Is focused on individual contributor work rather than engineering leadership
5. Requires industry-specific certifications I don't have

## Analysis should include:
1. Initial dealbreaker screening (Location, Company, Salary, Title)
2. Overall match rating (Strong/Medium/Poor)
3. Key matching points
4. Any significant gaps or misalignments
5. Final recommendation

## Success Criteria:
The success boolean should be true only if:
- All dealbreakers pass (location, company, salary, title)
- Match rating is Strong or Medium
- No significant gaps exist
- Final recommendation is positive

<job_description>
Job Title: {job['title']}
Company: {job['company']}
Location: {job['location']}
Description: {job['description']}
Salary Range: {job.get('min_amount', 'N/A')}-{job.get('max_amount', 'N/A')}
</job_description>
"""
            logger.info(f"Querying LLM for Job ID {job['id']} with prompt:\n{prompt}")
            try:
                response = openai.chat.completions.create(
                    model="gpt-4.1-mini",
                    messages=[
                        {"role": "system", "content": "You are a helpful assistant that reads job postings."},
                        {"role": "user", "content": prompt}
                    ],
                    max_tokens=512
                )
                response_text = response.choices[0].message.content
                logger.info(f"LLM Response for Job ID {job['id']}:\n{response_text}")

                # Store job and response for notification processing
                results.append({
                    "job": job,
                    "analysis": response_text
                })
            except Exception as e:
                logger.error(f"Error querying LLM for Job ID {job['id']}: {e}")

        return results

    @task()
    def send_notifications(llm_results):
        api_token = Variable.get("PUSHBULLET_API_TOKEN")  # Store token in Airflow Variables

        for result in llm_results:
            try:
                # Extract the JSON response line
                analysis_lines = result["analysis"].strip().split("\n")
                response_line = next(line for line in analysis_lines if "START_RESPONSE" in line)
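                # Expected first line per the prompt, e.g.:
                # START_RESPONSE::{"success": true, "location_pass": true, "salary_pass": true, "title_pass": true, "match_rating": "Strong"}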

                # Parse the JSON response
                json_str = response_line.split("START_RESPONSE::")[1].strip()
                analysis = json.loads(json_str)

                if analysis.get("success"):
                    job = result["job"]
                    message = f"Title: {job['title']}\nCompany: {job['company']}\nLocation: {job['location']}\nSalary: {job.get('min_amount', 'N/A')}-{job.get('max_amount', 'N/A')}"

                    headers = {
                        'Access-Token': api_token,
                        'Content-Type': 'application/json'
                    }

                    data = {
                        'body': message,
                        'title': 'Job Alert - Good Match Found!',
                        'type': 'link',
                        'url': job['job_url']
                    }

                    response = requests.post('https://api.pushbullet.com/v2/pushes',
                                             headers=headers,
                                             json=data)

                    if response.status_code == 200:
                        logger.info(f"Notification sent successfully for job ID {job['id']}")
                    else:
                        logger.error(f"Failed to send notification: {response.status_code}, {response.text}")

            except Exception as e:
                logger.error(f"Error processing notification: {e}")

    # TaskFlow dependencies
    api_results = fetch_api_data()
    new_jobs = insert_into_mongodb(api_results)
    llm_results = query_llm(new_jobs)
    send_notifications(llm_results)
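    # With the TaskFlow API, passing each task's return value into the next call is what
    # wires the dependency chain: fetch_api_data -> insert_into_mongodb -> query_llm -> send_notifications.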

if __name__ == "__main__":
    dag.test()
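# dag.test() (available since Airflow 2.5) runs the DAG serially in this process,
# so the pipeline can be debugged by running this file directly inside the container.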