Mirror of https://github.com/believethehype/nostrdvm.git (synced 2025-03-19 14:21:46 +01:00)
fixing media converter tools
This commit is contained in:
parent 39821ac42e
commit 3043c188f5
@@ -4,7 +4,7 @@ from nostr_dvm.interfaces.dvmtaskinterface import DVMTaskInterface
 from nostr_dvm.utils.admin_utils import AdminConfig
 from nostr_dvm.utils.definitions import EventDefinitions
 from nostr_dvm.utils.dvmconfig import DVMConfig, build_default_config
-from nostr_dvm.utils.nip89_utils import NIP89Config, check_and_set_d_tag
+from nostr_dvm.utils.nip89_utils import NIP89Config
 from nostr_dvm.utils.mediasource_utils import organize_input_media_data
 from nostr_dvm.utils.output_utils import upload_media_to_hoster
@@ -20,7 +20,7 @@ def input_data_file_duration(event, dvm_config, client, start=0, end=0):
     input_type = tag.as_vec()[2]

     if input_type == "text":
-        # For now, ingore length of any text, just return 1.
+        # For now, ignore length of any text, just return 1.
         return 1

     if input_type == "event": # NIP94 event
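For context, the tag inspected in this hunk is the NIP-90 job request's 'i' (input) tag, whose third element declares the input type. A minimal, made-up illustration of that dispatch (the values and the duration handling below are illustrative, not part of the commit):

# Hypothetical 'i' tag as a plain list: ["i", <data>, <input type>]
tag_vec = ["i", "note1example", "event"]

input_type = tag_vec[2]
if input_type == "text":
    duration = 1  # length of plain text is ignored for now, as in the hunk above
elif input_type == "event":  # NIP94 event
    duration = None  # would be resolved from the referenced event's metadata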
@@ -18,7 +18,15 @@ def XitterDownload(source_url, target_location):
     features, variables = request_details["features"], request_details["variables"]

     def get_tokens(tweet_url):
-        html = requests.get(tweet_url)
+        headers = {
+            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:84.0) Gecko/20100101 Firefox/84.0",
+            "Accept": "*/*",
+            "Accept-Language": "de,en-US;q=0.7,en;q=0.3",
+            "Accept-Encoding": "gzip, deflate, br",
+            "TE": "trailers",
+        }
+
+        html = requests.get(tweet_url, headers=headers)

         assert (
             html.status_code == 200
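The change above sends browser-like headers with the initial page fetch; X/Twitter tends to serve different or blocked content to the default python-requests User-Agent, which then breaks the token extraction further down. A minimal sketch of the same idea, with fetch_page as a hypothetical helper that is not part of the commit:

import requests

BROWSER_HEADERS = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:84.0) Gecko/20100101 Firefox/84.0",
    "Accept": "*/*",
    "Accept-Language": "de,en-US;q=0.7,en;q=0.3",
}

def fetch_page(url: str) -> str:
    # Hypothetical helper: fetch a page with browser-like headers and fail loudly
    # on non-200 responses, mirroring the assert on html.status_code above.
    resp = requests.get(url, headers=BROWSER_HEADERS)
    resp.raise_for_status()
    return resp.text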
@@ -34,7 +42,6 @@ def XitterDownload(source_url, target_location):
        ), f"Failed to find main.js file. If you are using the correct Twitter URL this suggests a bug in the script. Please open a GitHub issue and copy and paste this message. Tweet url: {tweet_url}"

        mainjs_url = mainjs_url[0]

        mainjs = requests.get(mainjs_url)

        assert (
@@ -80,9 +87,11 @@ def XitterDownload(source_url, target_location):
        variables["tweetId"] = tweet_id

        return f"https://twitter.com/i/api/graphql/0hWvDhmW8YQ-S_ib3azIrw/TweetResultByRestId?variables={urllib.parse.quote(json.dumps(variables))}&features={urllib.parse.quote(json.dumps(features))}"
        # return f"https://api.twitter.com/graphql/ncDeACNGIApPMaqGVuF_rw/TweetResultByRestId?variables={urllib.parse.quote(json.dumps(variables))}&features={urllib.parse.quote(json.dumps(features))}"

    def get_tweet_details(tweet_url, guest_token, bearer_token):
        tweet_id = re.findall(r"(?<=status/)\d+", tweet_url)

        assert (
            tweet_id is not None and len(tweet_id) == 1
        ), f"Could not parse tweet id from your url. Make sure you are using the correct url. If you are, then file a GitHub issue and copy and paste this message. Tweet url: {tweet_url}"
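The TweetResultByRestId URL above packs the variables and features dictionaries into the query string. A minimal sketch of that encoding step (the helper name and endpoint argument are illustrative only):

import json
import urllib.parse

def build_graphql_url(endpoint: str, variables: dict, features: dict) -> str:
    # JSON-encode both dicts, then percent-encode them for use as query parameters,
    # the same pattern used in the return statement above.
    qs_variables = urllib.parse.quote(json.dumps(variables))
    qs_features = urllib.parse.quote(json.dumps(features))
    return f"{endpoint}?variables={qs_variables}&features={qs_features}"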
@@ -172,7 +181,7 @@ def XitterDownload(source_url, target_location):
         pattern = (
             r'"expanded_url"\s*:\s*"https://x\.com/[^/]+/status/'
             + sid
-            + '/[^"]+",\s*"id_str"\s*:\s*"\d+",'
+            + r'/[^"]+",\s*"id_str"\s*:\s*"\d+",'
         )
         matches = re.findall(pattern, j)
         if len(matches) > 0:
@@ -240,17 +249,49 @@ def XitterDownload(source_url, target_location):
-        return [x["url"] for x in results.values()]
+        urls = [x["url"] for x in results.values()]
+        urls += container_matches
+        return urls
+
+    def extract_mp4_fmp4(j):
+        """
+        Extract the URL of the MP4 video from the detailed information of the tweet.
+        Returns a list of URLs, tweet IDs, and resolution information (dictionary type)
+        and a list of tweet IDs as return values.
+        """
+
+        # Empty list to store tweet IDs
+        tweet_id_list = []
+        mp4_info_dict_list = []
+        amplitude_pattern = re.compile(
+            r"(https://video.twimg.com/amplify_video/(\d+)/vid/(avc1/)(\d+x\d+)/[^.]+.mp4\?tag=\d+)"
+        )
+        ext_tw_pattern = re.compile(
+            r"(https://video.twimg.com/ext_tw_video/(\d+)/pu/vid/(avc1/)?(\d+x\d+)/[^.]+.mp4\?tag=\d+)"
+        )
+        tweet_video_pattern = re.compile(r'https://video.twimg.com/tweet_video/[^"]+')
+        container_pattern = re.compile(r'https://video.twimg.com/[^"]*container=fmp4')
+
+        matches = amplitude_pattern.findall(j)
+        matches += ext_tw_pattern.findall(j)
+        container_matches = container_pattern.findall(j)
+        tweet_video_url_list = tweet_video_pattern.findall(j)
+
+        for match in matches:
+            url, tweet_id, _, resolution = match
+            tweet_id_list.append(int(tweet_id))
+            mp4_info_dict_list.append({"resolution": resolution, "url": url})
+
+        tweet_id_list = list(dict.fromkeys(tweet_id_list))
+
+        if len(container_matches) > 0:
+            for url in container_matches:
+                mp4_info_dict_list.append({"url": url})
+
+        return tweet_id_list, mp4_info_dict_list, tweet_video_url_list

     def download_parts(url, output_filename):
         resp = requests.get(url, stream=True)

         # container begins with / ends with fmp4 and has a resolution in it we want to capture
         pattern = re.compile(r"(/[^\n]*/(\d+x\d+)/[^\n]*container=fmp4)")

         matches = pattern.findall(resp.text)

         max_res = 0
         max_res_url = None
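The new extract_mp4_fmp4 helper above is plain regex work over the response body, so the core idea can be shown in isolation. A self-contained sketch (the helper itself is nested inside XitterDownload and not importable; the sample string and values below are made up):

import re

sample = 'foo "https://video.twimg.com/ext_tw_video/123/pu/vid/avc1/1280x720/abc.mp4?tag=12" bar'

ext_tw_pattern = re.compile(
    r"(https://video.twimg.com/ext_tw_video/(\d+)/pu/vid/(avc1/)?(\d+x\d+)/[^.]+.mp4\?tag=\d+)"
)

for url, tweet_id, _, resolution in ext_tw_pattern.findall(sample):
    # prints: 123 1280x720 https://video.twimg.com/ext_tw_video/123/pu/vid/avc1/1280x720/abc.mp4?tag=12
    print(int(tweet_id), resolution, url)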
@@ -301,12 +342,9 @@ def XitterDownload(source_url, target_location):

    def repost_check(j, exclude_replies=True):
        try:
            # This line extract the index of the first reply
            reply_index = j.index('"conversationthread-')
        except ValueError:
            # If there are no replies we use the enrire response data length
            reply_index = len(j)
        # We truncate the response data to exclude replies
        if exclude_replies:
            j = j[0:reply_index]
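repost_check above keeps only the part of the response that precedes the first reply thread. The same truncation idea in isolation (function name and sample data are illustrative, not from the commit):

def truncate_at_first_reply(j: str, marker: str = '"conversationthread-') -> str:
    # Everything after the first reply marker belongs to replies, so cut it off.
    try:
        return j[: j.index(marker)]
    except ValueError:  # no replies present
        return j

print(truncate_at_first_reply('tweet data "conversationthread-abc": 1'))  # -> 'tweet data '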
@@ -360,6 +398,7 @@ def XitterDownload(source_url, target_location):
    bearer_token, guest_token = get_tokens(tweet_url)
    resp = get_tweet_details(tweet_url, guest_token, bearer_token)
    mp4s = extract_mp4s(resp.text, tweet_url, target_all_videos)

    if target_all_videos:
        video_counter = 1
        original_urls = repost_check(resp.text, exclude_replies=False)
@@ -377,6 +416,7 @@
                download_parts(mp4, output_file)

            else:
                # use a stream to download the file
                r = requests.get(mp4, stream=True)
                with open(output_file, "wb") as f:
                    for chunk in r.iter_content(chunk_size=1024):
@@ -475,7 +515,7 @@ def TiktokDownloadAll(linkList, path) -> str:
     for i in linkList:
         try:
             data['url'] = i
-            result = TikTokDownload(cookies, headers, data, "tiktok", path) # str(linkList.index(i))
+            result = TikTokDownload(cookies, headers, data, "tiktok", path) # str(linkList.index(i))
             return result
         except IndexError:
             parseDict = getDict()