change send connection api
Zuttam committed Oct 23, 2024
1 parent be97322 commit 5e144f9
Showing 1 changed file with 37 additions and 99 deletions.
136 changes: 37 additions & 99 deletions linkedin_api/linkedin.py
@@ -19,7 +19,6 @@
     get_list_posts_sorted_without_promoted,
     parse_list_raw_posts,
     parse_list_raw_urns,
-    generate_trackingId,
     generate_trackingId_as_charString,
 )
 
@@ -47,9 +46,7 @@ class Linkedin(object):
     _MAX_POST_COUNT = 100  # max seems to be 100 posts per page
     _MAX_UPDATE_COUNT = 100  # max seems to be 100
     _MAX_SEARCH_COUNT = 49  # max seems to be 49, and min seems to be 2
-    _MAX_REPEATED_REQUESTS = (
-        200  # VERY conservative max requests count to avoid rate-limit
-    )
+    _MAX_REPEATED_REQUESTS = 200  # VERY conservative max requests count to avoid rate-limit
 
     def __init__(
         self,
@@ -132,9 +129,7 @@ def get_profile_posts(
             profile_urn = f"urn:li:fsd_profile:{urn_id}"
         else:
             profile = self.get_profile(public_id=public_id)
-            profile_urn = profile["profile_urn"].replace(
-                "fs_miniProfile", "fsd_profile"
-            )
+            profile_urn = profile["profile_urn"].replace("fs_miniProfile", "fsd_profile")
         url_params["profileUrn"] = profile_urn
         url = f"/identity/profileUpdatesV2"
         res = self._fetch(url, params=url_params)
@@ -235,9 +230,7 @@ def search(self, params: Dict, limit=-1, offset=0) -> List:
         default_params.update(params)
 
         keywords = (
-            f"keywords:{default_params['keywords']},"
-            if "keywords" in default_params
-            else ""
+            f"keywords:{default_params['keywords']}," if "keywords" in default_params else ""
         )
 
         res = self._fetch(
@@ -271,10 +264,7 @@ def search(self, params: Dict, limit=-1, offset=0) -> List:
                 continue
 
             for el in it.get("items", []):
-                if (
-                    not el.get("_type", [])
-                    == "com.linkedin.voyager.dash.search.SearchItem"
-                ):
+                if not el.get("_type", []) == "com.linkedin.voyager.dash.search.SearchItem":
                     continue
 
                 e = el.get("item", {}).get("entityResult", [])
@@ -306,9 +296,7 @@ def search_people(
         self,
         keywords: Optional[str] = None,
         connection_of: Optional[str] = None,
-        network_depths: Optional[
-            List[Union[Literal["F"], Literal["S"], Literal["O"]]]
-        ] = None,
+        network_depths: Optional[List[Union[Literal["F"], Literal["S"], Literal["O"]]]] = None,
         current_company: Optional[List[str]] = None,
         past_companies: Optional[List[str]] = None,
         nonprofit_interests: Optional[List[str]] = None,
@@ -432,17 +420,13 @@ def search_people(
         for item in data:
             if (
                 not include_private_profiles
-                and (item.get("entityCustomTrackingInfo") or {}).get(
-                    "memberDistance", None
-                )
+                and (item.get("entityCustomTrackingInfo") or {}).get("memberDistance", None)
                 == "OUT_OF_NETWORK"
             ):
                 continue
             results.append(
                 {
-                    "urn_id": get_id_from_urn(
-                        get_urn_from_raw_update(item.get("entityUrn", None))
-                    ),
+                    "urn_id": get_id_from_urn(get_urn_from_raw_update(item.get("entityUrn", None))),
                     "distance": (item.get("entityCustomTrackingInfo") or {}).get(
                         "memberDistance", None
                     ),
@@ -562,9 +546,7 @@ def search_jobs(
         if limit is None:
             limit = -1
 
-        query: Dict[str, Union[str, Dict[str, str]]] = {
-            "origin": "JOB_SEARCH_PAGE_QUERY_EXPANSION"
-        }
+        query: Dict[str, Union[str, Dict[str, str]]] = {"origin": "JOB_SEARCH_PAGE_QUERY_EXPANSION"}
         if keywords:
             query["keywords"] = "KEYWORD_PLACEHOLDER"
         if location_name:
@@ -635,9 +617,7 @@ def search_jobs(
 
         elements = data.get("included", [])
         new_data = [
-            i
-            for i in elements
-            if i["$type"] == "com.linkedin.voyager.dash.jobs.JobPosting"
+            i for i in elements if i["$type"] == "com.linkedin.voyager.dash.jobs.JobPosting"
         ]
         # break the loop if we're done searching or no results returned
         if not new_data:
@@ -668,9 +648,7 @@ def get_profile_contact_info(
         :return: Contact data
         :rtype: dict
         """
-        res = self._fetch(
-            f"/identity/profiles/{public_id or urn_id}/profileContactInfo"
-        )
+        res = self._fetch(f"/identity/profiles/{public_id or urn_id}/profileContactInfo")
         data = res.json()
 
         contact_info = {
@@ -689,9 +667,9 @@ def get_profile_contact_info(
                     "com.linkedin.voyager.identity.profile.StandardWebsite"
                 ]["category"]
             elif "" in item["type"]:
-                item["label"] = item["type"][
-                    "com.linkedin.voyager.identity.profile.CustomWebsite"
-                ]["label"]
+                item["label"] = item["type"]["com.linkedin.voyager.identity.profile.CustomWebsite"][
+                    "label"
+                ]
 
             del item["type"]
 
@@ -714,9 +692,7 @@ def get_profile_skills(
         :rtype: list
         """
         params = {"count": 100, "start": 0}
-        res = self._fetch(
-            f"/identity/profiles/{public_id or urn_id}/skills", params=params
-        )
+        res = self._fetch(f"/identity/profiles/{public_id or urn_id}/skills", params=params)
         data = res.json()
 
         skills = data.get("elements", [])
@@ -725,9 +701,7 @@ def get_profile_skills(
 
         return skills
 
-    def get_profile(
-        self, public_id: Optional[str] = None, urn_id: Optional[str] = None
-    ) -> Dict:
+    def get_profile(self, public_id: Optional[str] = None, urn_id: Optional[str] = None) -> Dict:
         """Fetch data for a given LinkedIn profile.
 
         :param public_id: LinkedIn public ID for a profile
@@ -755,9 +729,9 @@ def get_profile(
                 "com.linkedin.common.VectorImage"
             ]["rootUrl"]
 
-            images_data = profile["miniProfile"]["picture"][
-                "com.linkedin.common.VectorImage"
-            ]["artifacts"]
+            images_data = profile["miniProfile"]["picture"]["com.linkedin.common.VectorImage"][
+                "artifacts"
+            ]
             for img in images_data:
                 w, h, url_segment = itemgetter(
                     "width", "height", "fileIdentifyingUrlPathSegment"
@@ -874,12 +848,8 @@ def get_profile_experiences(self, urn_id: str) -> List:
         :rtype: list
         """
         profile_urn = f"urn:li:fsd_profile:{urn_id}"
-        variables = ",".join(
-            [f"profileUrn:{quote(profile_urn)}", "sectionType:experience"]
-        )
-        query_id = (
-            "voyagerIdentityDashProfileComponents.7af5d6f176f11583b382e37e5639e69e"
-        )
+        variables = ",".join([f"profileUrn:{quote(profile_urn)}", "sectionType:experience"])
+        query_id = "voyagerIdentityDashProfileComponents.7af5d6f176f11583b382e37e5639e69e"
 
         res = self._fetch(
             f"/graphql?variables=({variables})&queryId={query_id}&includeWebMetadata=true",
@@ -910,11 +880,7 @@ def parse_item(item, is_group_item=False):
             duration_parts = duration_text.split(" · ")
             date_parts = duration_parts[0].split(" - ")
 
-            duration = (
-                duration_parts[1]
-                if duration_parts and len(duration_parts) > 1
-                else None
-            )
+            duration = duration_parts[1] if duration_parts and len(duration_parts) > 1 else None
             start_date = date_parts[0] if date_parts else None
             end_date = date_parts[1] if date_parts and len(date_parts) > 1 else None
 
@@ -933,9 +899,7 @@ def parse_item(item, is_group_item=False):
 
             # Extract additional description
             description = (
-                fixed_list_text_component["text"]["text"]
-                if fixed_list_text_component
-                else None
+                fixed_list_text_component["text"]["text"] if fixed_list_text_component else None
             )
 
             # Create a dictionary with the extracted information
@@ -955,19 +919,14 @@ def parse_item(item, is_group_item=False):
         def get_grouped_item_id(item):
             sub_components = item["components"]["entityComponent"]["subComponents"]
             sub_components_components = (
-                sub_components["components"][0]["components"]
-                if sub_components
-                else None
+                sub_components["components"][0]["components"] if sub_components else None
             )
             paged_list_component_id = (
                 sub_components_components.get("*pagedListComponent", "")
                 if sub_components_components
                 else None
             )
-            if (
-                paged_list_component_id
-                and "fsd_profilePositionGroup" in paged_list_component_id
-            ):
+            if paged_list_component_id and "fsd_profilePositionGroup" in paged_list_component_id:
                 pattern = r"urn:li:fsd_profilePositionGroup:\([A-z0-9]+,[A-z0-9]+\)"
                 match = re.search(pattern, paged_list_component_id)
                 return match.group(0) if match else None
@@ -984,16 +943,10 @@ def get_grouped_item_id(item):
                 # use the company and location from the main item
                 company = component["titleV2"]["text"]["text"]
 
-                location = (
-                    component["caption"]["text"] if component["caption"] else None
-                )
+                location = component["caption"]["text"] if component["caption"] else None
 
                 # find the group
-                group = [
-                    i
-                    for i in data["included"]
-                    if grouped_item_id in i.get("entityUrn", "")
-                ]
+                group = [i for i in data["included"] if grouped_item_id in i.get("entityUrn", "")]
                 if not group:
                     continue
                 for group_item in group[0]["components"]["elements"]:
@@ -1062,9 +1015,7 @@ def get_company_updates(
             max_results=max_results,
         )
 
-    def get_profile_updates(
-        self, public_id=None, urn_id=None, max_results=None, results=None
-    ):
+    def get_profile_updates(self, public_id=None, urn_id=None, max_results=None, results=None):
         """Fetch profile updates (newsfeed activity) for a given LinkedIn profile.
 
         :param public_id: LinkedIn public ID for a profile
@@ -1196,9 +1147,7 @@ def follow_company(self, following_state_urn, following=True):
         """
         payload = json.dumps({"patch": {"$set": {"following": following}}})
 
-        res = self._post(
-            f"/feed/dash/followingStates/{following_state_urn}", data=payload
-        )
+        res = self._post(f"/feed/dash/followingStates/{following_state_urn}", data=payload)
 
         return res.status_code != 200
 
@@ -1326,9 +1275,7 @@ def mark_conversation_as_seen(self, conversation_urn_id: str):
         """
         payload = json.dumps({"patch": {"$set": {"read": True}}})
 
-        res = self._post(
-            f"/messaging/conversations/{conversation_urn_id}", data=payload
-        )
+        res = self._post(f"/messaging/conversations/{conversation_urn_id}", data=payload)
 
         return res.status_code != 200
 
@@ -1429,27 +1376,20 @@ def add_connection(self, profile_public_id: str, message="", profile_urn=None):
             return False
 
         if not profile_urn:
-            profile_urn_string = self.get_profile(public_id=profile_public_id)[
-                "profile_urn"
-            ]
+            profile_urn_string = self.get_profile(public_id=profile_public_id)["profile_urn"]
             # Returns string of the form 'urn:li:fs_miniProfile:ACoAACX1hoMBvWqTY21JGe0z91mnmjmLy9Wen4w'
             # We extract the last part of the string
            profile_urn = profile_urn_string.split(":")[-1]
 
-        trackingId = generate_trackingId()
+        if not profile_urn.startswith("urn:li:fsd_profile:"):
+            profile_urn = f"urn:li:fsd_profile:{profile_urn}"
 
         payload = {
-            "trackingId": trackingId,
             "message": message,
             "invitations": [],
             "excludeInvitations": [],
-            "invitee": {
-                "com.linkedin.voyager.growth.invitation.InviteeProfile": {
-                    "profileId": profile_urn
-                }
-            },
+            "invitee": {"inviteeUnion": {"memberProfile": f"{profile_urn}"}},
         }
 
         res = self._post(
-            "/growth/normInvitations",
+            "voyagerRelationshipsDashMemberRelationships?action=verifyQuotaAndCreateV2&decorationId=com.linkedin.voyager.dash.deco.relationships.InvitationCreationResultWithInvitee-2",
             data=json.dumps(payload),
             headers={"accept": "application/vnd.linkedin.normalized+json+2.1"},
         )
@@ -1629,9 +1569,7 @@ def _get_list_feed_posts_and_list_feed_urns(
             l_raw_posts = res.json().get("included", {})
             l_raw_urns = res.json().get("data", {}).get("*elements", [])
 
-            l_new_posts = parse_list_raw_posts(
-                l_raw_posts, self.client.LINKEDIN_BASE_URL
-            )
+            l_new_posts = parse_list_raw_posts(l_raw_posts, self.client.LINKEDIN_BASE_URL)
             l_posts.extend(l_new_posts)
 
             l_urns.extend(parse_list_raw_urns(l_raw_urns))
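Note: aside from Black-style reflowing, the substantive change in this commit is in add_connection: it retires the legacy /growth/normInvitations endpoint in favor of the Voyager dash invitation endpoint (verifyQuotaAndCreateV2), normalizes the invitee to a full urn:li:fsd_profile URN, and drops the trackingId the old payload required. A minimal usage sketch of the updated method, assuming an authenticated session (the credentials and public profile ID below are placeholders):

    from linkedin_api import Linkedin

    # Authenticate with a valid LinkedIn account (placeholder credentials).
    api = Linkedin("user@example.com", "secret-password")

    # Sends the invitation through the new dash endpoint. By this library's
    # convention the method returns True when the request did NOT succeed,
    # so a falsy result means the invitation was created. The message is
    # validated to be at most 300 characters.
    failed = api.add_connection("john-doe-123", message="Hi, let's connect!")
    if failed:
        print("Invitation was not created (quota check or request error).")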
