From b4c2d0922a22e22621f8c60ba4922d26c52bc805 Mon Sep 17 00:00:00 2001
From: Jason
Date: Tue, 20 May 2025 00:42:12 -0700
Subject: [PATCH] first commit

---
 Dockerfile |  27 +++++
 job-bot.py | 292 +++++++++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 319 insertions(+)
 create mode 100644 Dockerfile
 create mode 100644 job-bot.py

diff --git a/Dockerfile b/Dockerfile
new file mode 100644
index 0000000..b76053f
--- /dev/null
+++ b/Dockerfile
@@ -0,0 +1,27 @@
+FROM python:3.12-slim
+
+# Install system dependencies
+RUN apt-get update && \
+    apt-get install -y --no-install-recommends \
+    build-essential \
+    python3-dev \
+    && apt-get clean \
+    && rm -rf /var/lib/apt/lists/*
+
+# Create and set working directory
+WORKDIR /app
+
+# Install Python packages
+RUN pip install --no-cache-dir \
+    apache-airflow==2.7.3 \
+    pymongo==4.6.1 \
+    graypy==2.1.0 \
+    openai==1.12.0 \
+    requests==2.31.0
+
+# Create a non-root user
+RUN useradd -m airflow
+USER airflow
+
+# Keep container running
+CMD ["sleep", "infinity"]
\ No newline at end of file
diff --git a/job-bot.py b/job-bot.py
new file mode 100644
index 0000000..6790a22
--- /dev/null
+++ b/job-bot.py
@@ -0,0 +1,292 @@
+from airflow import DAG
+from airflow.decorators import task
+from airflow.models import Variable
+from datetime import datetime, timedelta
+from pymongo import MongoClient
+
+import requests
+import os
+import json
+import openai
+import graypy
+import logging
+from airflow.operators.python import PythonVirtualenvOperator
+
+DEBUG = True
+
+default_args = {
+    'owner': 'airflow',
+    'depends_on_past': False,
+    'email_on_failure': False,
+    'email_on_retry': False,
+    'retries': 1,
+    'retry_delay': timedelta(minutes=5),
+}
+
+# Configure logging
+logger = logging.getLogger('JobBot')
+logger.setLevel(logging.INFO)
+
+# Configure graypy handler for GELF UDP
+graylog_handler = graypy.GELFUDPHandler('graylog.localdomain', 12201)
+graylog_handler.include_logger_name = True
+logger.addHandler(graylog_handler)
+
+requirements = [
+    "pymongo==4.6.1",
+    "graypy==2.1.0",
+    "openai==1.12.0"
+]
+
+with DAG(
+    'job_bot_api_dag',
+    default_args=default_args,
+    description='A DAG to fetch data from job-bot API and process it',
+    schedule=timedelta(hours=1),
+    start_date=datetime.now() - timedelta(days=1),  # Changed to today-1 day
+    catchup=False,
+    tags=['job-bot', 'api'],
+) as dag:
+
+    @task()
+    def fetch_api_data():
+        cache_file = '/tmp/job_bot_api_cache.json'
+        api_results = None
+
+        logger.info("Fetching data from job-bot API...")
+
+        # Reuse the cached response only if it is fresher than the DAG's hourly cadence
+        if os.path.exists(cache_file):
+            cache_age = datetime.now() - datetime.fromtimestamp(os.path.getmtime(cache_file))
+            if cache_age < timedelta(hours=1):
+                logger.info("Using cached API response.")
+                with open(cache_file, 'r') as f:
+                    api_results = json.load(f)
+
+        if api_results is None:
+            url = 'http://job-bot:8000/api/v1/search_jobs'
+            params = {
+                'search_term': 'senior engineering manager',
+                'location': 'San Francisco, CA',
+                'description_format': 'html'
+            }
+            response = requests.get(url, params=params)
+            response.raise_for_status()
+            api_results = response.json()
+            with open(cache_file, 'w') as f:
+                json.dump(api_results, f)
+
+        return api_results
+
+    @task()
+    def insert_into_mongodb(api_results):
+        mongo_username = Variable.get("MONGO_USERNAME", default_var="root")
+        mongo_password = Variable.get("MONGO_PASSWORD", default_var="passwd")
+        client = MongoClient(f"mongodb://{mongo_username}:{mongo_password}@localhost:27017/")
+        db = client["job_bot_db"]
+        collection = db["job-bot"]
+
+        new_jobs = []
+        for job in api_results.get("jobs", []):
+            result = collection.update_one(
+                {"id": job["id"]},
+                {"$set": job},
+                upsert=True
+            )
+            if result.upserted_id:
+                new_jobs.append(job)
+
+        # If no new jobs, always pass at least one job (the first one) to the LLM
+        # Test by searching for a specific job ID
+        test_job = collection.find_one({"id": "in-20166e6a5b1aeede"})
+        if test_job and DEBUG:
+            # Convert MongoDB document to dict and remove _id field
+            test_job = dict(test_job)
+            test_job.pop('_id', None)
+            logger.info("Found test job, using it for LLM testing")
+            new_jobs = [test_job]
+        if not new_jobs and api_results.get("jobs") and DEBUG:
+            logger.info("No new jobs inserted, passing the first job to LLM for debugging.")
+            new_jobs = [api_results["jobs"][0]]
+        else:
+            logger.info(f"Inserted/Updated {len(new_jobs)} new jobs into MongoDB.")
+
+        return new_jobs
+
+    @task()
+    def query_llm(new_jobs):
+        results = []  # Store results for downstream tasks
+        openai.api_key = Variable.get("OPENAI_API_KEY")
+        if not new_jobs:
+            logger.info("No new jobs to process with LLM.")
+            return results
+
+        for job in new_jobs:
+            prompt = f"""
+            Please analyze this job description and evaluate if it's a good match for my background as a senior engineering manager. Structure the response with a machine-readable first line followed by the detailed analysis.
+
+            START_RESPONSE::{{"success": boolean, "location_pass": boolean, "salary_pass": boolean, "title_pass": boolean, "match_rating": "Strong|Medium|Poor"}}
+
+            ## DEAL BREAKERS:
+            1. **Location:** For non-remote/hybrid roles, must be within reasonable driving distance of Mill Valley, CA (acceptable areas: San Francisco, North Bay, East Bay within 1 hour drive)
+            2. **Company:** Exclude positions at Amazon/AWS as an employer (working with AWS technology is fine)
+            3. **Compensation:** Base salary must meet or exceed $250k
+               - If salary range is provided: Check against minimum requirement
+               - If salary not provided: Evaluate based on:
+                 * Company Tier Assessment:
+                   - Tier 1 (Automatic Pass): FAANG/MANGA (except Amazon), Major Tech Leaders (Uber, Lyft, Airbnb, Stripe, etc.), Well-funded Unicorns
+                   - Tier 2 (Likely Pass): Established public tech companies, Later-stage startups with significant funding ($500M+ valuation)
+                   - Tier 3 (Requires Verification): Early-stage startups, smaller public companies, non-tech enterprises
+                 * Additional Factors:
+                   * Location-based pay scales (SF Bay Area premium)
+                   * Industry standards for similar roles
+                   * Company's known compensation practices
+                   * Recent funding rounds or financial health
+            4. **Title/Role Type:** Must be engineering leadership position
+               - Title MUST contain:
+                 * "Engineering Manager" OR "Engineering Director" OR "Director of Engineering" OR "VP of Engineering" OR "Head of Engineering"
+                 * If contains "Senior" or "Lead", must be in conjunction with above terms
+
+               - AUTOMATICALLY REJECT if title contains:
+                 * "Structural", "Mechanical", "Civil", "Hardware", "Electrical"
+                 * "TPM" or "Technical Program Manager"
+                 * "Sales Engineer" or "Solutions Engineer"
+                 * "Technical Account Manager"
+                 * "Professional Services"
+                 * "Project Manager"
+                 * "Development Manager" (without "Software" or "Engineering")
+                 * "Product Manager"
+                 * "Site Reliability" or "SRE"
+                 * "DevOps Manager" (without "Engineering")
+
+               - Valid Example Titles:
+                 * "Senior Engineering Manager"
+                 * "Engineering Manager"
+                 * "Director of Engineering"
+                 * "Senior Director, Engineering"
+                 * "VP, Engineering"
+                 * "Head of Engineering"
+
+               - Invalid Example Titles:
+                 * "Senior Structural Engineer"
+                 * "Lead Software Developer"
+                 * "Technical Program Manager"
+                 * "Engineering Project Manager"
+                 * "Development Manager"
+               - Exclude roles like:
+                 * Technical Program Manager (TPM)
+                 * Sales Engineering Manager
+                 * Solutions Engineering Manager
+                 * Technical Account Manager
+                 * Professional Services Manager
+                 * Engineering Project Manager
+
+            ## Technical/Role Matches should include:
+            1. Cloud-native technology companies or enterprises with significant cloud presence
+            2. Leadership roles managing software engineering teams (8+ engineers)
+            3. Focus on web services, distributed systems, or enterprise SaaS applications
+            4. Technologies involving: Cloud platforms (AWS), microservices, modern web technologies (React, Node.js), or enterprise security
+            5. Responsibilities for technical strategy, team scaling, and organizational growth
+
+            ## Flag as poor matches if the role:
+            1. Primarily focuses on gaming, embedded systems, or hardware engineering
+            2. Requires specific domain expertise I lack (like automotive, aerospace, or networking hardware)
+            3. Emphasizes technologies not in my stack (like .NET, Windows development)
+            4. Is focused on individual contributor work rather than engineering leadership
+            5. Requires industry-specific certifications I don't have
+
+            ## Analysis should include:
+            1. Initial dealbreaker screening (Location, Company, Salary, Title)
+            2. Overall match rating (Strong/Medium/Poor)
+            3. Key matching points
+            4. Any significant gaps or misalignments
+            5. Final recommendation
+
+            ## Success Criteria:
+            The success boolean should be true only if:
+            - All dealbreakers pass (location, company, salary, title)
+            - Match rating is Strong or Medium
+            - No significant gaps exist
+            - Final recommendation is positive
+
+
+            Job Title: {job['title']}
+            Company: {job['company']}
+            Location: {job['location']}
+            Description: {job['description']}
+            Salary Range: {job.get('min_amount', 'N/A')}-{job.get('max_amount', 'N/A')}
+
+            """
+            logger.info(f"Querying LLM for Job ID {job['id']} with prompt:\n{prompt}")
+            try:
+                response = openai.chat.completions.create(
+                    model="gpt-4.1-mini",
+                    messages=[
+                        {"role": "system", "content": "You are a helpful assistant that reads job postings."},
+                        {"role": "user", "content": prompt}
+                    ],
+                    max_tokens=512
+                )
+                response_text = response.choices[0].message.content
+                logger.info(f"LLM Response for Job ID {job['id']}:\n{response_text}")
+
+                # Store job and response for notification processing
+                results.append({
+                    "job": job,
+                    "analysis": response_text
+                })
+            except Exception as e:
+                logger.error(f"Error querying LLM for Job ID {job['id']}: {e}")
+
+        return results
+
+    @task()
+    def send_notifications(llm_results):
+        api_token = Variable.get("PUSHBULLET_API_TOKEN")  # Store token in Airflow variables
+
+        for result in llm_results:
+            try:
+                # Extract the JSON response line
+                analysis_lines = result["analysis"].strip().split("\n")
+                response_line = next(line for line in analysis_lines if "START_RESPONSE" in line)
+
+                # Parse the JSON response
+                json_str = response_line.split("START_RESPONSE::")[1].strip()
+                analysis = json.loads(json_str)
+
+                if analysis.get("success"):
+                    job = result["job"]
+                    message = f"Title: {job['title']}\nCompany: {job['company']}\nLocation: {job['location']}\nSalary: {job.get('min_amount', 'N/A')}-{job.get('max_amount', 'N/A')}"
+
+                    headers = {
+                        'Access-Token': api_token,
+                        'Content-Type': 'application/json'
+                    }
+
+                    data = {
+                        'body': message,
+                        'title': 'Job Alert - Good Match Found!',
+                        'type': 'link',
+                        'url': job['job_url']
+                    }
+
+                    response = requests.post('https://api.pushbullet.com/v2/pushes',
+                                             headers=headers,
+                                             json=data)
+
+                    if response.status_code == 200:
+                        logger.info(f"Notification sent successfully for job ID {job['id']}")
+                    else:
+                        logger.error(f"Failed to send notification: {response.status_code}, {response.text}")
+
+            except Exception as e:
+                logger.error(f"Error processing notification: {e}")
+
+    # TaskFlow dependencies
+    api_results = fetch_api_data()
+    new_jobs = insert_into_mongodb(api_results)
+    llm_results = query_llm(new_jobs)
+    send_notifications(llm_results)
+
+    if __name__ == "__main__":
+        dag.test()
\ No newline at end of file