Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 48 additions & 28 deletions linkedin_scraper/company.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from .person import Person
import time
import os
from urllib.parse import quote_plus
import json

AD_BANNER_CLASSNAME = ('ad-banner-container', '__ad')
Expand Down Expand Up @@ -49,7 +50,7 @@ class Company(Scraper):
employees = []
headcount = None

def __init__(self, linkedin_url = None, name = None, about_us =None, website = None, phone = None, headquarters = None, founded = None, industry = None, company_type = None, company_size = None, specialties = None, showcase_pages =[], affiliated_companies = [], driver = None, scrape = True, get_employees = True, close_on_complete = True):
def __init__(self, linkedin_url = None, name = None, about_us =None, website = None, phone = None, headquarters = None, founded = None, industry = None, company_type = None, company_size = None, specialties = None, showcase_pages =[], affiliated_companies = [], driver = None, scrape = True, get_employees = True, close_on_complete = True, employee_search_keywords = None, timeout = 10):
self.linkedin_url = linkedin_url
self.name = name
self.about_us = about_us
Expand All @@ -63,6 +64,12 @@ def __init__(self, linkedin_url = None, name = None, about_us =None, website = N
self.specialties = specialties
self.showcase_pages = showcase_pages
self.affiliated_companies = affiliated_companies
self.employee_search_keywords = employee_search_keywords
self.timeout = timeout

# Validation: Check if keywords provided but get_employees is False
if employee_search_keywords and not get_employees:
raise ValueError("Cannot use employee_search_keywords when get_employees=False. Set get_employees=True to filter employees by keywords.")

if driver is None:
try:
Expand Down Expand Up @@ -118,53 +125,67 @@ def __parse_employee__(self, employee_raw):
# print(e)
return None

def get_employees(self, wait_time=10):

def get_employees(self, keywords=None):
total = []
list_css = "list-style-none"
employee_xpath = '//div[contains(@class, "artdeco-entity-lockup")]'
next_xpath = '//button[@aria-label="Next"]'
driver = self.driver

try:
see_all_employees = driver.find_element(By.XPATH,'//a[@data-control-name="topcard_see_all_employees"]')
except:
pass
driver.get(os.path.join(self.linkedin_url, "people"))

# Construct URL with keyword search if provided
people_url = f"{self.linkedin_url}/people"
if keywords:
# Join keywords and URL encode them
keyword_string = " ".join(keywords) if isinstance(keywords, list) else str(keywords)
encoded_keywords = quote_plus(keyword_string)
people_url = f"{people_url}/?keywords={encoded_keywords}"

driver.get(people_url)

_ = WebDriverWait(driver, 3).until(EC.presence_of_all_elements_located((By.XPATH, '//span[@dir="ltr"]')))
_ = WebDriverWait(driver, self.timeout).until(EC.presence_of_all_elements_located((By.XPATH, employee_xpath)))

driver.execute_script("window.scrollTo(0, Math.ceil(document.body.scrollHeight/2));")
time.sleep(1)
driver.execute_script("window.scrollTo(0, Math.ceil(document.body.scrollHeight*3/4));")
time.sleep(1)

results_list = driver.find_element(By.CLASS_NAME, list_css)
results_li = results_list.find_elements(By.TAG_NAME, "li")
for res in results_li:
total.append(self.__parse_employee__(res))
# Get employee elements directly
employee_elements = driver.find_elements(By.XPATH, employee_xpath)
for res in employee_elements:
employee = self.__parse_employee__(res)
if employee:
total.append(employee)

def is_loaded(previous_results):
loop = 0
driver.execute_script("window.scrollTo(0, Math.ceil(document.body.scrollHeight));")
results_li = results_list.find_elements(By.TAG_NAME, "li")
while len(results_li) == previous_results and loop <= 5:
employee_elements = driver.find_elements(By.XPATH, employee_xpath)
while len(employee_elements) == previous_results and loop <= 5:
time.sleep(1)
driver.execute_script("window.scrollTo(0, Math.ceil(document.body.scrollHeight));")
results_li = results_list.find_elements(By.TAG_NAME, "li")
employee_elements = driver.find_elements(By.XPATH, employee_xpath)
loop += 1
return loop <= 5

def get_data(previous_results):
results_li = results_list.find_elements(By.TAG_NAME, "li")
for res in results_li[previous_results:]:
total.append(self.__parse_employee__(res))

results_li_len = len(results_li)
while is_loaded(results_li_len):
employee_elements = driver.find_elements(By.XPATH, employee_xpath)
for res in employee_elements[previous_results:]:
employee = self.__parse_employee__(res)
if employee:
total.append(employee)

employee_count = len(employee_elements)
while is_loaded(employee_count):
try:
driver.find_element(By.XPATH,next_xpath).click()
except:
pass
_ = WebDriverWait(driver, wait_time).until(EC.presence_of_element_located((By.CLASS_NAME, list_css)))
_ = WebDriverWait(driver, self.timeout).until(EC.presence_of_element_located((By.XPATH, employee_xpath)))

driver.execute_script("window.scrollTo(0, Math.ceil(document.body.scrollHeight/2));")
time.sleep(1)
Expand All @@ -175,8 +196,8 @@ def get_data(previous_results):
driver.execute_script("window.scrollTo(0, Math.ceil(document.body.scrollHeight));")
time.sleep(1)

get_data(results_li_len)
results_li_len = len(total)
get_data(employee_count)
employee_count = len(total)
return total


Expand All @@ -186,7 +207,7 @@ def scrape_logged_in(self, get_employees = True, close_on_complete = True):

driver.get(self.linkedin_url)

_ = WebDriverWait(driver, 3).until(EC.presence_of_all_elements_located((By.XPATH, '//div[@dir="ltr"]')))
_ = WebDriverWait(driver, self.timeout).until(EC.presence_of_all_elements_located((By.XPATH, '//div[@dir="ltr"]')))

navigation = driver.find_element(By.CLASS_NAME, "org-page-navigation__items ")

Expand All @@ -201,7 +222,7 @@ def scrape_logged_in(self, get_employees = True, close_on_complete = True):
except:
driver.get(os.path.join(self.linkedin_url, "about"))

_ = WebDriverWait(driver, 3).until(EC.presence_of_all_elements_located((By.TAG_NAME, 'section')))
_ = WebDriverWait(driver, self.timeout).until(EC.presence_of_all_elements_located((By.TAG_NAME, 'section')))
time.sleep(3)

if 'Cookie Policy' in driver.find_elements(By.TAG_NAME, "section")[1].text or any(classname in driver.find_elements(By.TAG_NAME, "section")[1].get_attribute('class') for classname in AD_BANNER_CLASSNAME):
Expand Down Expand Up @@ -255,9 +276,8 @@ def scrape_logged_in(self, get_employees = True, close_on_complete = True):

driver.execute_script("window.scrollTo(0, Math.ceil(document.body.scrollHeight/2));")


try:
_ = WebDriverWait(driver, 3).until(EC.presence_of_element_located((By.CLASS_NAME, 'company-list')))
_ = WebDriverWait(driver, self.timeout).until(EC.presence_of_element_located((By.CLASS_NAME, 'company-list')))
showcase, affiliated = driver.find_elements(By.CLASS_NAME, "company-list")
driver.find_element(By.ID,"org-related-companies-module__show-more-btn").click()

Expand All @@ -284,7 +304,7 @@ def scrape_logged_in(self, get_employees = True, close_on_complete = True):
pass

if get_employees:
self.employees = self.get_employees()
self.employees = self.get_employees(keywords=self.employee_search_keywords)

driver.get(self.linkedin_url)

Expand Down Expand Up @@ -313,7 +333,7 @@ def scrape_not_logged_in(self, close_on_complete = True, retry_limit = 10, get_e
# get showcase
try:
driver.find_element(By.ID,"view-other-showcase-pages-dialog").click()
WebDriverWait(driver, 3).until(EC.presence_of_element_located((By.ID, 'dialog')))
WebDriverWait(driver, self.timeout).until(EC.presence_of_element_located((By.ID, 'dialog')))

showcase_pages = driver.find_elements(By.CLASS_NAME, "company-showcase-pages")[1]
for showcase_company in showcase_pages.find_elements(By.TAG_NAME, "li"):
Expand Down Expand Up @@ -344,7 +364,7 @@ def scrape_not_logged_in(self, close_on_complete = True, retry_limit = 10, get_e
pass

if get_employees:
self.employees = self.get_employees()
self.employees = self.get_employees(keywords=self.employee_search_keywords)

driver.get(self.linkedin_url)

Expand Down