import json import os import re import sys import urllib.parse from typing import Any from urllib.request import urlopen, Request import requests import instaloader from pytube import YouTube def XitterDownload(source_url, target_location): script_dir = os.path.dirname(os.path.realpath(__file__)) request_details_file = f"{script_dir}{os.sep}request_details.json" request_details = json.load(open(request_details_file, "r")) # test features, variables = request_details["features"], request_details["variables"] def get_tokens(tweet_url): html = requests.get(tweet_url) assert ( html.status_code == 200 ), f"Failed to get tweet page. If you are using the correct Twitter URL this suggests a bug in the script. Please open a GitHub issue and copy and paste this message. Status code: {html.status_code}. Tweet url: {tweet_url}" mainjs_url = re.findall( r"https://abs.twimg.com/responsive-web/client-web-legacy/main.[^\.]+.js", html.text, ) assert ( mainjs_url is not None and len(mainjs_url) > 0 ), f"Failed to find main.js file. If you are using the correct Twitter URL this suggests a bug in the script. Please open a GitHub issue and copy and paste this message. Tweet url: {tweet_url}" mainjs_url = mainjs_url[0] mainjs = requests.get(mainjs_url) assert ( mainjs.status_code == 200 ), f"Failed to get main.js file. If you are using the correct Twitter URL this suggests a bug in the script. Please open a GitHub issue and copy and paste this message. Status code: {mainjs.status_code}. Tweet url: {tweet_url}" bearer_token = re.findall(r'AAAAAAAAA[^"]+', mainjs.text) assert ( bearer_token is not None and len(bearer_token) > 0 ), f"Failed to find bearer token. If you are using the correct Twitter URL this suggests a bug in the script. Please open a GitHub issue and copy and paste this message. Tweet url: {tweet_url}, main.js url: {mainjs_url}" bearer_token = bearer_token[0] # get the guest token with requests.Session() as s: s.headers.update( { "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:84.0) Gecko/20100101 Firefox/84.0", "accept": "*/*", "accept-language": "de,en-US;q=0.7,en;q=0.3", "accept-encoding": "gzip, deflate, br", "te": "trailers", } ) s.headers.update({"authorization": f"Bearer {bearer_token}"}) # activate bearer token and get guest token guest_token = s.post("https://api.twitter.com/1.1/guest/activate.json").json()[ "guest_token" ] assert ( guest_token is not None ), f"Failed to find guest token. If you are using the correct Twitter URL this suggests a bug in the script. Please open a GitHub issue and copy and paste this message. Tweet url: {tweet_url}, main.js url: {mainjs_url}" return bearer_token, guest_token def get_details_url(tweet_id, features, variables): # create a copy of variables - we don't want to modify the original variables = {**variables} variables["tweetId"] = tweet_id return f"https://twitter.com/i/api/graphql/0hWvDhmW8YQ-S_ib3azIrw/TweetResultByRestId?variables={urllib.parse.quote(json.dumps(variables))}&features={urllib.parse.quote(json.dumps(features))}" def get_tweet_details(tweet_url, guest_token, bearer_token): tweet_id = re.findall(r"(?<=status/)\d+", tweet_url) assert ( tweet_id is not None and len(tweet_id) == 1 ), f"Could not parse tweet id from your url. Make sure you are using the correct url. If you are, then file a GitHub issue and copy and paste this message. Tweet url: {tweet_url}" tweet_id = tweet_id[0] # the url needs a url encoded version of variables and features as a query string url = get_details_url(tweet_id, features, variables) details = requests.get( url, headers={ "authorization": f"Bearer {bearer_token}", "x-guest-token": guest_token, }, ) max_retries = 10 cur_retry = 0 while details.status_code == 400 and cur_retry < max_retries: try: error_json = json.loads(details.text) except json.JSONDecodeError: assert ( False ), f"Failed to parse json from details error. details text: {details.text} If you are using the correct Twitter URL this suggests a bug in the script. Please open a GitHub issue and copy and paste this message. Status code: {details.status_code}. Tweet url: {tweet_url}" assert ( "errors" in error_json ), f"Failed to find errors in details error json. If you are using the correct Twitter URL this suggests a bug in the script. Please open a GitHub issue and copy and paste this message. Status code: {details.status_code}. Tweet url: {tweet_url}" needed_variable_pattern = re.compile(r"Variable '([^']+)'") needed_features_pattern = re.compile( r'The following features cannot be null: ([^"]+)' ) for error in error_json["errors"]: needed_vars = needed_variable_pattern.findall(error["message"]) for needed_var in needed_vars: variables[needed_var] = True needed_features = needed_features_pattern.findall(error["message"]) for nf in needed_features: for feature in nf.split(","): features[feature.strip()] = True url = get_details_url(tweet_id, features, variables) details = requests.get( url, headers={ "authorization": f"Bearer {bearer_token}", "x-guest-token": guest_token, }, ) cur_retry += 1 if details.status_code == 200: # save new variables request_details["variables"] = variables request_details["features"] = features with open(request_details_file, "w") as f: json.dump(request_details, f, indent=4) assert ( details.status_code == 200 ), f"Failed to get tweet details. If you are using the correct Twitter URL this suggests a bug in the script. Please open a GitHub issue and copy and paste this message. Status code: {details.status_code}. Tweet url: {tweet_url}" return details def get_tweet_status_id(tweet_url): sid_patern = r"https://x\.com/[^/]+/status/(\d+)" if tweet_url[len(tweet_url) - 1] != "/": tweet_url = tweet_url + "/" match = re.findall(sid_patern, tweet_url) if len(match) == 0: print("error, could not get status id from this tweet url :", tweet_url) exit() status_id = match[0] return status_id def get_associated_media_id(j, tweet_url): sid = get_tweet_status_id(tweet_url) pattern = ( r'"expanded_url"\s*:\s*"https://x\.com/[^/]+/status/' + sid + '/[^"]+",\s*"id_str"\s*:\s*"\d+",' ) matches = re.findall(pattern, j) if len(matches) > 0: target = matches[0] target = target[0: len(target) - 1] # remove the coma at the end return json.loads("{" + target + "}")["id_str"] return None def extract_mp4s(j, tweet_url, target_all_mp4s=False): # pattern looks like https://video.twimg.com/amplify_video/1638969830442237953/vid/1080x1920/lXSFa54mAVp7KHim.mp4?tag=16 or https://video.twimg.com/ext_tw_video/1451958820348080133/pu/vid/720x1280/GddnMJ7KszCQQFvA.mp4?tag=12 amplitude_pattern = re.compile( r"(https://video.twimg.com/amplify_video/(\d+)/vid/(\d+x\d+)/[^.]+.mp4\?tag=\d+)" ) ext_tw_pattern = re.compile( r"(https://video.twimg.com/ext_tw_video/(\d+)/pu/vid/(avc1/)?(\d+x\d+)/[^.]+.mp4\?tag=\d+)" ) # format - https://video.twimg.com/tweet_video/Fvh6brqWAAQhU9p.mp4 tweet_video_pattern = re.compile(r'https://video.twimg.com/tweet_video/[^"]+') # https://video.twimg.com/ext_tw_video/1451958820348080133/pu/pl/b-CiC-gZClIwXgDz.m3u8?tag=12&container=fmp4 container_pattern = re.compile(r'https://video.twimg.com/[^"]*container=fmp4') media_id = get_associated_media_id(j, tweet_url) # find all the matches matches = amplitude_pattern.findall(j) matches += ext_tw_pattern.findall(j) container_matches = container_pattern.findall(j) tweet_video_matches = tweet_video_pattern.findall(j) if len(matches) == 0 and len(tweet_video_matches) > 0: return tweet_video_matches results = {} for match in matches: url, tweet_id, _, resolution = match if tweet_id not in results: results[tweet_id] = {"resolution": resolution, "url": url} else: # if we already have a higher resolution video, then don't overwrite it my_dims = [int(x) for x in resolution.split("x")] their_dims = [int(x) for x in results[tweet_id]["resolution"].split("x")] if my_dims[0] * my_dims[1] > their_dims[0] * their_dims[1]: results[tweet_id] = {"resolution": resolution, "url": url} if media_id: all_urls = [] for twid in results: all_urls.append(results[twid]["url"]) all_urls += container_matches url_with_media_id = [] for url in all_urls: if url.__contains__(media_id): url_with_media_id.append(url) if len(url_with_media_id) > 0: return url_with_media_id if len(container_matches) > 0 and not target_all_mp4s: return container_matches if target_all_mp4s: urls = [x["url"] for x in results.values()] urls += container_matches return urls return [x["url"] for x in results.values()] def download_parts(url, output_filename): resp = requests.get(url, stream=True) # container begins with / ends with fmp4 and has a resolution in it we want to capture pattern = re.compile(r"(/[^\n]*/(\d+x\d+)/[^\n]*container=fmp4)") matches = pattern.findall(resp.text) max_res = 0 max_res_url = None for match in matches: url, resolution = match width, height = resolution.split("x") res = int(width) * int(height) if res > max_res: max_res = res max_res_url = url assert ( max_res_url is not None ), f"Could not find a url to download from. Make sure you are using the correct url. If you are, then file a GitHub issue and copy and paste this message. Tweet url: {url}" video_part_prefix = "https://video.twimg.com" resp = requests.get(video_part_prefix + max_res_url, stream=True) mp4_pattern = re.compile(r"(/[^\n]*\.mp4)") mp4_parts = mp4_pattern.findall(resp.text) assert ( len(mp4_parts) == 1 ), f"There should be exactly 1 mp4 container at this point. Instead, found {len(mp4_parts)}. Please open a GitHub issue and copy and paste this message into it. Tweet url: {url}" mp4_url = video_part_prefix + mp4_parts[0] m4s_part_pattern = re.compile(r"(/[^\n]*\.m4s)") m4s_parts = m4s_part_pattern.findall(resp.text) with open(output_filename, "wb") as f: r = requests.get(mp4_url, stream=True) for chunk in r.iter_content(chunk_size=1024): if chunk: f.write(chunk) f.flush() for part in m4s_parts: part_url = video_part_prefix + part r = requests.get(part_url, stream=True) for chunk in r.iter_content(chunk_size=1024): if chunk: f.write(chunk) f.flush() return True def repost_check(j, exclude_replies=True): try: # This line extract the index of the first reply reply_index = j.index('"conversationthread-') except ValueError: # If there are no replies we use the enrire response data length reply_index = len(j) # We truncate the response data to exclude replies if exclude_replies: j = j[0:reply_index] # We use this regular expression to extract the source status source_status_pattern = r'"source_status_id_str"\s*:\s*"\d+"' matches = re.findall(source_status_pattern, j) if len(matches) > 0 and exclude_replies: # We extract the source status id (ssid) ssid = json.loads("{" + matches[0] + "}")["source_status_id_str"] # We plug it in this regular expression to find expanded_url (the original tweet url) expanded_url_pattern = ( r'"expanded_url"\s*:\s*"https://x\.com/[^/]+/status/' + ssid + '[^"]+"' ) matches2 = re.findall(expanded_url_pattern, j) if len(matches2) > 0: # We extract the url and return it status_url = json.loads("{" + matches2[0] + "}")["expanded_url"] return status_url if not exclude_replies: # If we include replies we'll have to get all ssids and remove duplicates ssids = [] for match in matches: ssids.append(json.loads("{" + match + "}")["source_status_id_str"]) # we remove duplicates (this line is messy but it's the easiest way to do it) ssids = list(set(ssids)) if len(ssids) > 0: for ssid in ssids: expanded_url_pattern = ( r'"expanded_url"\s*:\s*"https://x\.com/[^/]+/status/' + ssid + '[^"]+"' ) matches2 = re.findall(expanded_url_pattern, j) if len(matches2) > 0: status_urls = [] for match in matches2: status_urls.append( json.loads("{" + match + "}")["expanded_url"] ) # We remove duplicates another time status_urls = list(set(status_urls)) return status_urls # If we don't find source_status_id_str, the tweet doesn't feature a reposted video return None def download_video_from_x(tweet_url, output_file, target_all_videos=False): bearer_token, guest_token = get_tokens(tweet_url) resp = get_tweet_details(tweet_url, guest_token, bearer_token) mp4s = extract_mp4s(resp.text, tweet_url, target_all_videos) if target_all_videos: video_counter = 1 original_urls = repost_check(resp.text, exclude_replies=False) if len(original_urls) > 0: for url in original_urls: download_video_from_x( url, output_file.replace(".mp4", f"_{video_counter}.mp4") ) video_counter += 1 if len(mp4s) > 0: for mp4 in mp4s: output_file = output_file.replace(".mp4", f"_{video_counter}.mp4") if "container" in mp4: download_parts(mp4, output_file) else: r = requests.get(mp4, stream=True) with open(output_file, "wb") as f: for chunk in r.iter_content(chunk_size=1024): if chunk: f.write(chunk) f.flush() video_counter += 1 else: original_url = repost_check(resp.text) if original_url: download_video_from_x(original_url, output_file) else: assert ( len(mp4s) > 0 ), f"Could not find any mp4s to download. Make sure you are using the correct url. If you are, then file a GitHub issue and copy and paste this message. Tweet url: {tweet_url}" mp4 = mp4s[0] if "container" in mp4: download_parts(mp4, output_file) else: # use a stream to download the file r = requests.get(mp4, stream=True) with open(output_file, "wb") as f: for chunk in r.iter_content(chunk_size=1024): if chunk: f.write(chunk) f.flush() return target_location return download_video_from_x(source_url, target_location) # TIKTOK/INSTA def getDict() -> dict: response = requests.get('https://ttdownloader.com/') point = response.text.find(' str: response = requests.post('https://ttdownloader.com/search/', cookies=cookies, headers=headers, data=data) parsed_link = [i for i in str(response.text).split() if i.startswith("href=")][0] response = requests.get(parsed_link[6:-10]) with open(path + "\\" + name + ".mp4", "wb") as f: f.write(response.content) return path + "\\" + name + ".mp4" def TiktokDownloadAll(linkList, path) -> str: parseDict = getDict() cookies, headers, data = createHeader(parseDict) # linkList = getLinkDict()['tiktok'] for i in linkList: try: data['url'] = i result = TikTokDownload(cookies, headers, data, "tiktok", path) # str(linkList.index(i)) return result except IndexError: parseDict = getDict() cookies, headers, data = createHeader(parseDict) except Exception as err: print(err) exit(1) def InstagramDownload(url, name, path) -> str: obj = instaloader.Instaloader() post = instaloader.Post.from_shortcode(obj.context, url.split("/")[-2]) photo_url = post.url video_url = post.video_url print(video_url) if video_url: response = requests.get(video_url) with open(path + "\\" + name + ".mp4", "wb") as f: f.write(response.content) return path + "\\" + name + ".mp4" elif photo_url: response = requests.get(photo_url) with open(path + "\\" + name + ".jpg", "wb") as f: f.write(response.content) return path + "\\" + name + ".jpg" def InstagramDownloadAll(linklist, path) -> str: for i in linklist: try: print(str(linklist.index(i))) print(str(linklist[i])) result = InstagramDownload(i, str(linklist.index(i)), path) return result except Exception as err: print(err) exit(1) # YOUTUBE def YouTubeDownload(link, path, audio_only=True): youtubeObject = YouTube(link) if audio_only: youtubeObject = youtubeObject.streams.get_audio_only() youtubeObject.download(path, "yt.mp3") print("Download is completed successfully") return path + "yt.mp3" else: youtubeObject = youtubeObject.streams.get_highest_resolution() youtubeObject.download(path, "yt.mp4") print("Download is completed successfully") return path + "yt.mp4" def checkYoutubeLinkValid(link): try: # TODO find a way to test without fully downloading the file youtubeObject = YouTube(link) youtubeObject = youtubeObject.streams.get_audio_only() youtubeObject.download(".", "yt.mp3") os.remove("yt.mp3") return True except Exception as e: print(str(e)) return False # OVERCAST def OvercastDownload(source_url, target_location): def get_title(html_str): """Get the title from the meta tags""" title = re.findall(r".... tag""" url = re.findall(r"