Saturday, December 10, 2016

Job Recommendation System

Emily Rayfield, Shuai Hao

MATH 5800-031 Fall 2016

Link to Presentation: https://www.youtube.com/watch?v=zSgpmDkTJ3M&feature=youtu.be

Background

Our system recommends jobs to job seekers, based on their click history, when job ads are sent to users. We use a dataset from the Kaggle website that includes basic information about users and jobs. The objective of our system is to recommend jobs to users based on job descriptions. To realize this objective, we use Python and the Spark framework. The core methods are TF-IDF, with a separate stop-word removal function, and Naive Bayes classification. We use TF-IDF to mine the job text and reflect how important each term is to our users. The Naive Bayes classifiers are trained on the TF-IDF results and used to predict jobs.

Data

We use the datasets from the Kaggle website, which include information on jobs and users as well as users' click history. The 'users' dataset contains user IDs along with city, state, country, zip code, degree type, major, graduation date, work history, total years of experience, and whether the user is currently employed. The 'clicks' dataset contains the job IDs that each user clicked on. Finally, the 'jobs' dataset includes job title, description, requirements, city, state, country, zip code, start date, and end date. There are 389,708 users and 1,092,096 jobs in total. The system is built to predict whether a user will click on a given job. One challenge in analyzing the data is that several variables are non-numeric, and we need to determine how important those variables are.

Methodology

TF-IDF
Since several aspects influence users' choices, we use TF-IDF (term frequency - inverse document frequency) to mine the text and reflect how important a term is to a document in the corpus. Denote a term by t, a document by d, and the corpus by D. Term frequency TF(t, d) is the number of times that term t appears in document d, and document frequency DF(t, D) is the number of documents that contain term t. The IDF (inverse document frequency) is a numeric measure of how much information a term provides; Spark computes it as IDF(t, D) = log((|D| + 1) / (DF(t, D) + 1)), where |D| is the number of documents in the corpus, and the TF-IDF score of a term is the product TF(t, d) * IDF(t, D).
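As a rough illustration of these quantities, here is a plain-Python sketch on a made-up three-document corpus (the snippets are hypothetical; this is not part of our pipeline, which uses Spark's own implementation):

import math

# Toy corpus of three "documents" (hypothetical job snippets, for illustration only)
corpus = [
    "registered nurse night shift",
    "nurse assistant day shift",
    "java developer spark",
]

def tf(term, doc):
    # TF(t, d): number of times term t appears in document d
    return doc.split().count(term)

def df(term, docs):
    # DF(t, D): number of documents that contain term t
    return sum(1 for d in docs if term in d.split())

def idf(term, docs):
    # Smoothed IDF as used by Spark: log((|D| + 1) / (DF(t, D) + 1))
    return math.log((len(docs) + 1.0) / (df(term, docs) + 1.0))

for term in ["nurse", "shift", "spark"]:
    print("%s: tf=%d df=%d idf=%.3f" % (term, tf(term, corpus[0]), df(term, corpus), idf(term, corpus)))
# "nurse" and "shift" each appear in two documents, so they get lower IDF weights than "spark"
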
We use the Tokenizer from pyspark.ml.feature to split each document into a column of terms. We then use the HashingTF function to convert each set of terms into a fixed-length feature vector, fit an IDF model on those raw term-frequency vectors, and apply it to compute the TF-IDF features, which are used in the second part of the system.
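The same three steps look like this on a couple of toy documents (a minimal sketch assuming a pyspark shell where sc is already defined; the full pipeline over the jobs data is in the Code section below):

from pyspark.ml.feature import HashingTF, IDF, Tokenizer
from pyspark.sql import SQLContext

sqlContext = SQLContext(sc)

# Two toy documents with made-up job IDs, mirroring the (text, job ID) layout used later
toyDF = sqlContext.createDataFrame([
    ("registered nurse night shift", "1"),
    ("java developer spark", "2"),
], ["_1", "_2"])

tokenizer = Tokenizer(inputCol="_1", outputCol="terms")  # split text into terms
termsData = tokenizer.transform(toyDF)
tf = HashingTF(inputCol="terms", outputCol="rawFeatures").transform(termsData)  # hashed term counts
idf = IDF(inputCol="rawFeatures", outputCol="features").fit(tf)  # learn IDF weights from the corpus
tfidf = idf.transform(tf)  # rescale term counts into TF-IDF vectors
tfidf.select("_2", "features").show(truncate=False)
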
Naive Bayes Classifier
The Naive Bayes classifier is a probabilistic framework for solving classification problems. It is based on Bayes' theorem, which describes the probability of an event given prior knowledge of conditions that might be related to the event. Naive Bayes assumes that the features are conditionally independent of one another given the class. We use the Naive Bayes algorithm to predict whether a user will click on a job link, with labels 1 and 0 representing whether a user clicks a job (1 meaning yes and 0 meaning no). We then use the trained model to predict the outcome for each job, get a list of recommended jobs for each user, and save the output to a file.
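The scoring rule behind this is simple: each class is scored by its prior probability multiplied by the per-term likelihoods, which is only legitimate because the terms are assumed independent given the class. A toy sketch with made-up probabilities (not values learned from our data):

# Made-up prior and per-term likelihoods, purely for illustration
prior = {1.0: 0.2, 0.0: 0.8}             # P(click), P(no click)
likelihood = {
    1.0: {"nurse": 0.30, "java": 0.01},  # P(term | click)
    0.0: {"nurse": 0.05, "java": 0.10},  # P(term | no click)
}

def score(terms, label):
    # Naive Bayes: P(label) multiplied by P(term | label) for each term,
    # treating the terms as independent given the label
    s = prior[label]
    for t in terms:
        s *= likelihood[label].get(t, 1e-6)  # small floor for unseen terms
    return s

terms = ["nurse"]
scores = {label: score(terms, label) for label in (1.0, 0.0)}
print(scores)                       # score 0.06 for label 1.0 vs 0.04 for label 0.0
print(max(scores, key=scores.get))  # 1.0, i.e. the user is predicted to click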

Results

The code for all users is shown below, but we will take one user as an example. We ran the model on user ID 47 and it returned 2,453 recommended jobs in total. The first part of the output, showing the job IDs of the first 10 predicted jobs, was [Row(_2=u'245'), Row(_2=u'252'), Row(_2=u'964'), Row(_2=u'1095'), Row(_2=u'1732'), Row(_2=u'1832'), Row(_2=u'1838'), Row(_2=u'2095'), Row(_2=u'2100'), Row(_2=u'2106')].
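Those figures come from inspecting the predictions for that one user. If the loop body from the Code section is run for user '47' alone, so that jobs_predicted holds that user's filtered predictions, the count and the first rows can be pulled out like this:

print(jobs_predicted.count())   # 2453 recommended jobs for user 47
print(jobs_predicted.take(10))  # the first 10 Row(_2=...) job IDs listed above
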
Finally, our system can be improved. It currently uses only job titles and descriptions to predict jobs; factoring in additional user information such as degree type, major, or years of experience could help narrow down the recommendations.

Code

We first load the datasets and create a list of common words (stop words) to remove:
 
from pyspark.ml.feature import HashingTF, IDF, Tokenizer 
from pyspark.ml.classification import NaiveBayes 
from pyspark.sql import SQLContext, Row 
import re 
 
sqlContext = SQLContext(sc) 
 
PATTERN = re.compile(r'''((?:[^,"']|"[^"]*"|'[^']*')+)''') 
 
# List of stop words to remove
STOP_WORDS_LIST = ["a", "about", "above", "after", "again", "against", "all", "am", "an", "and", "any", "are", "aren't", 
                   "as", "at", "be", "because", "been", "before", "being", "below", "between", "both", "but", "by", 
                   "can't", "cannot", "could", "couldn't", "did", "didn't", "do", "does", "doesn't", "doing", "don't", 
                   "down", "during", "each", "few", "for", "from", "further", "had", "hadn't", "has", "hasn't", "have", 
                   "haven't", "having", "he", "he'd", "he'll", "he's", "her", "here", "here's", "hers", "herself", 
                   "him", "himself", "his", "how", "how's", "i", "i'd", "i'll", "i'm", "i've", "if", "in", "into", "is", 
                   "isn't", "it", "it's", "its", "itself", "let's", "me", "more", "most", "mustn't", "my", "myself", 
                   "no", "nor", "not", "of", "off", "on", "once", "only", "or", "other", "ought", "our", "ours", 
                   "ourselves", "out", "over", "own", "same", "shan't", "she", "she'd", "she'll", "she's", "should", 
                   "shouldn't", "so", "some", "such", "than", "that", "that's", "the", "their", "theirs", "them", 
                   "themselves", "then", "there", "there's", "these", "they", "they'd", "they'll", "they're", "they've", 
                   "this", "those", "through", "to", "too", "under", "until", "up", "very", "was", "wasn't", "we", 
                   "we'd", "we'll", "we're", "we've", "were", "weren't", "what", "what's", "when", "when's", "where", 
                   "where's", "which", "while", "who", "who's", "whom", "why", "why's", "with", "won't", "would", 
                   "wouldn't", "you", "you'd", "you'll", "you're", "you've", "your", "yours", "yourself", "yourselves"] 
 
# Load users dataset
users = sc.textFile("file:///home//emr15007//users.tsv").map(lambda line: line.split("\t")) 
header_users = users.first()  # Extract header 
usersRDD = users.filter(lambda row: row != header_users)  # Filter out header 
 
# Load clicks dataset
clicks = sc.textFile("file:///home//emr15007//clicks.tsv").map(lambda line: line.split("\t")) 
header_clicks = clicks.first()  # Extract header 
clicksRDD = clicks.filter(lambda row: row != header_clicks)  # Filter out header 
 
# Load jobs dataset
jobs = sc.textFile("file:///home//emr15007//jobs.tsv").map(lambda line: line.split("\t")) 
header_jobs = jobs.first()  # Extract header 
jobsRDD = jobs.filter(lambda row: row != header_jobs)  # Filter out header 
 
# Convert to DataFrame 
jobsDF = jobsRDD.toDF() 
 
Next we create two functions. The remover function removes stop words from a string, and the clean_html function removes HTML tags and other unnecessary characters:
 
 
def remover(cstr): 
    keywords_list = cstr.lower().split()  # Split a string into individual words 
    resarr = list(set(keywords_list).difference(set(STOP_WORDS_LIST)))  # Get list of non-stopwords 
    return " ".join(resarr)  # Combine non-stopwords back into one string 
 
 
# Remove unwanted characters 
def clean_html(raw_html): 
    clean_r = re.compile('<.*?>')  # Matches HTML tags left in the job text
    clean_text = re.sub(clean_r, '', raw_html) 
    clean_text = clean_text.replace('\\r', '').replace('\\n', '').replace('&nbsp;', '').replace('ojp’s', '')  # Drop literal \r/\n markers and stray entities
    return clean_text 
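For example, on a made-up description fragment (a hypothetical string, shown only to illustrate the two functions):

sample = "<b>Registered Nurse</b> needed for the night shift.\\r\\nMust have a license."
print(clean_html(sample))            # tags and literal \r\n markers stripped out
print(remover(clean_html(sample)))   # stop words such as "for", "the", "a", and "have" dropped; word order is arbitrary because a set is used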
 
In the next part of the code we clean the job descriptions using the above functions and compute the TF-IDF:
 
# Create a DataFrame with concatenated job title and description, as well as job ID 
jobs_features = jobsDF.rdd.map(lambda x: (remover(clean_html(x[2] + ' ' + x[3])), x[0])) 
jobs_featuresDF = sqlContext.createDataFrame(jobs_features) 
 
# Tokenizer to create a column of individual terms 
tokenizer = Tokenizer(inputCol="_1", outputCol="terms") 
termsData = tokenizer.transform(jobs_featuresDF) 
 
# Generate the term frequency vectors using HashingTF 
tf = HashingTF(inputCol="terms", outputCol="rawFeatures").transform(termsData) 
 
# IDF (down-weights columns which appear frequently in a corpus) 
idf = IDF(inputCol="rawFeatures", outputCol="features").fit(tf) 
 
# TF-IDF 
tfidf = idf.transform(tf) 
tfidf.cache() 

Finally we generate a Naive Bayes classifier for each user and use it to get a list of predicted jobs:
 
# RDDs to be used for labels 
one = sc.parallelize([1.0]) 
zero = sc.parallelize([0.0]) 
 
# Get a list of user IDs 
user_IDs = usersRDD.map(lambda x: x[0]) 
user_list = user_IDs.collect() 
 
# Convert to a set 
user_set = set(user_list) 
 
recommended_jobs = [] 
 
# Loop through all users to generate a Naive Bayes Classifier and predict jobs for each one 
for user in user_set: 
    # Get job IDs of jobs that the user clicked 
    clicks_sample = clicksRDD.filter(lambda row: row[0] == user) 
    jobs_clicked = [item[2] for item in clicks_sample.collect()] 
    # Get jobs that the user did and did not click on; set labels 1.0 = clicked on; 0.0 = did not click on 
    yes = jobsRDD.filter(lambda row: row[0] in jobs_clicked) 
    jobs_yes = yes.cartesian(one) 
    no = jobsRDD.filter(lambda row: row[0] not in jobs_clicked) 
    jobs_no = no.cartesian(zero) 
    jobs_labeled = jobs_yes.union(jobs_no) 
    # Put labeled jobs back in original order 
    jobs_sorted = jobs_labeled.sortBy(lambda x: x[0][0]) 
    labels = jobs_sorted.map(lambda x: (x[1], x[0][0]))  # Put labels into another RDD along with Job ID 
    labelsDF = labels.toDF()  # Convert to DataFrame 
    labelsDF_new = labelsDF.selectExpr('_1 as label', '_2 as _2')  # Rename label column 
    # Train a Naive Bayes model 
    trainingDF = labelsDF_new.join(tfidf, '_2', 'inner') 
    nb = NaiveBayes(featuresCol="features", labelCol="label") 
    model = nb.fit(trainingDF) 
    # Use model to predict outcome of each job 
    predictions = model.transform(tfidf) 
    # Get job IDs for only jobs that model predicts user will click on 
    jobs_predicted = predictions.filter(predictions.prediction == 1.0).select('_2') 
    user_and_jobs = (user, jobs_predicted.collect())  # Combine user ID and suggested job IDs into a tuple
    recommended_jobs.append(user_and_jobs) 
 
# Save predicted jobs for all users to file 
results = sc.parallelize(recommended_jobs) 
results.saveAsTextFile('file:///home//emr15007//jobs_output.txt')