mirror of
https://github.com/vitorpamplona/amethyst.git
synced 2025-11-11 04:26:45 +01:00
Add streaming hash utility function to quartz multiplatform, follow the existing pool/worker design
Change hashing in ImageDownloader.kt to use streaming
This commit is contained in:
@@ -20,6 +20,9 @@
|
|||||||
*/
|
*/
|
||||||
package com.vitorpamplona.amethyst.service.uploads
|
package com.vitorpamplona.amethyst.service.uploads
|
||||||
|
|
||||||
|
import com.vitorpamplona.quartz.nip01Core.core.HexKey
|
||||||
|
import com.vitorpamplona.quartz.nip01Core.core.toHexKey
|
||||||
|
import com.vitorpamplona.quartz.utils.sha256.sha256Stream
|
||||||
import kotlinx.coroutines.CancellationException
|
import kotlinx.coroutines.CancellationException
|
||||||
import kotlinx.coroutines.Dispatchers
|
import kotlinx.coroutines.Dispatchers
|
||||||
import kotlinx.coroutines.delay
|
import kotlinx.coroutines.delay
|
||||||
@@ -34,6 +37,46 @@ class ImageDownloader {
|
|||||||
val contentType: String?,
|
val contentType: String?,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Result of streaming verification - hash and metadata without storing full file
|
||||||
|
*/
|
||||||
|
class StreamVerification(
|
||||||
|
val hash: HexKey,
|
||||||
|
val size: Long,
|
||||||
|
val contentType: String?,
|
||||||
|
)
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Stream download and calculate hash for verification without loading entire file into memory.
|
||||||
|
* This is memory-efficient for large files (videos, high-res images, etc.)
|
||||||
|
*/
|
||||||
|
suspend fun waitAndVerifyStream(
|
||||||
|
imageUrl: String,
|
||||||
|
okHttpClient: (url: String) -> OkHttpClient,
|
||||||
|
): StreamVerification? =
|
||||||
|
withContext(Dispatchers.IO) {
|
||||||
|
var verification: StreamVerification? = null
|
||||||
|
var tentatives = 0
|
||||||
|
|
||||||
|
// Servers are usually not ready, so tries to download it for 15 times/seconds.
|
||||||
|
while (verification == null && tentatives < 15) {
|
||||||
|
verification =
|
||||||
|
try {
|
||||||
|
tryStreamAndVerify(imageUrl, okHttpClient)
|
||||||
|
} catch (e: Exception) {
|
||||||
|
if (e is CancellationException) throw e
|
||||||
|
null
|
||||||
|
}
|
||||||
|
|
||||||
|
if (verification == null) {
|
||||||
|
tentatives++
|
||||||
|
delay(1000)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return@withContext verification
|
||||||
|
}
|
||||||
|
|
||||||
suspend fun waitAndGetImage(
|
suspend fun waitAndGetImage(
|
||||||
imageUrl: String,
|
imageUrl: String,
|
||||||
okHttpClient: (url: String) -> OkHttpClient,
|
okHttpClient: (url: String) -> OkHttpClient,
|
||||||
@@ -61,6 +104,78 @@ class ImageDownloader {
|
|||||||
return@withContext imageData
|
return@withContext imageData
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private suspend fun tryStreamAndVerify(
|
||||||
|
imageUrl: String,
|
||||||
|
okHttpClient: (url: String) -> OkHttpClient,
|
||||||
|
): StreamVerification? =
|
||||||
|
withContext(Dispatchers.IO) {
|
||||||
|
// TODO: Migrate to OkHttp
|
||||||
|
HttpURLConnection.setFollowRedirects(true)
|
||||||
|
var url = URL(imageUrl)
|
||||||
|
var clientProxy = okHttpClient(imageUrl).proxy
|
||||||
|
var huc =
|
||||||
|
if (clientProxy != null) {
|
||||||
|
url.openConnection(clientProxy) as HttpURLConnection
|
||||||
|
} else {
|
||||||
|
url.openConnection() as HttpURLConnection
|
||||||
|
}
|
||||||
|
huc.instanceFollowRedirects = true
|
||||||
|
var responseCode = huc.responseCode
|
||||||
|
|
||||||
|
if (responseCode in 300..400) {
|
||||||
|
val newUrl: String = huc.getHeaderField("Location")
|
||||||
|
|
||||||
|
// open the new connection again
|
||||||
|
url = URL(newUrl)
|
||||||
|
clientProxy = okHttpClient(newUrl).proxy
|
||||||
|
huc =
|
||||||
|
if (clientProxy != null) {
|
||||||
|
url.openConnection(clientProxy) as HttpURLConnection
|
||||||
|
} else {
|
||||||
|
url.openConnection() as HttpURLConnection
|
||||||
|
}
|
||||||
|
responseCode = huc.responseCode
|
||||||
|
}
|
||||||
|
|
||||||
|
return@withContext if (responseCode in 200..300) {
|
||||||
|
var totalBytes = 0L
|
||||||
|
|
||||||
|
// Wrap the input stream to count bytes while hashing
|
||||||
|
val countingStream =
|
||||||
|
object : java.io.InputStream() {
|
||||||
|
val inner = huc.inputStream
|
||||||
|
|
||||||
|
override fun read(): Int {
|
||||||
|
val byte = inner.read()
|
||||||
|
if (byte != -1) totalBytes++
|
||||||
|
return byte
|
||||||
|
}
|
||||||
|
|
||||||
|
override fun read(
|
||||||
|
b: ByteArray,
|
||||||
|
off: Int,
|
||||||
|
len: Int,
|
||||||
|
): Int {
|
||||||
|
val bytesRead = inner.read(b, off, len)
|
||||||
|
if (bytesRead > 0) totalBytes += bytesRead
|
||||||
|
return bytesRead
|
||||||
|
}
|
||||||
|
|
||||||
|
override fun close() = inner.close()
|
||||||
|
}
|
||||||
|
|
||||||
|
val hash = countingStream.use { sha256Stream(it).toHexKey() }
|
||||||
|
|
||||||
|
StreamVerification(
|
||||||
|
hash = hash,
|
||||||
|
size = totalBytes,
|
||||||
|
contentType = huc.headerFields.get("Content-Type")?.firstOrNull(),
|
||||||
|
)
|
||||||
|
} else {
|
||||||
|
null
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private suspend fun tryGetTheImage(
|
private suspend fun tryGetTheImage(
|
||||||
imageUrl: String,
|
imageUrl: String,
|
||||||
okHttpClient: (url: String) -> OkHttpClient,
|
okHttpClient: (url: String) -> OkHttpClient,
|
||||||
|
|||||||
@@ -231,23 +231,26 @@ class UploadOrchestrator {
|
|||||||
|
|
||||||
updateState(0.6, UploadingState.Downloading)
|
updateState(0.6, UploadingState.Downloading)
|
||||||
|
|
||||||
val imageData: ImageDownloader.Blob? = ImageDownloader().waitAndGetImage(uploadResult.url, okHttpClient)
|
// Use streaming verification for memory efficiency with large files
|
||||||
|
val verification: ImageDownloader.StreamVerification? = ImageDownloader().waitAndVerifyStream(uploadResult.url, okHttpClient)
|
||||||
|
|
||||||
if (imageData != null) {
|
if (verification != null) {
|
||||||
updateState(0.8, UploadingState.Hashing)
|
updateState(0.8, UploadingState.Hashing)
|
||||||
|
|
||||||
val result =
|
// Create FileHeader with hash from streaming verification
|
||||||
FileHeader.prepare(
|
// Note: We skip blurhash/dimensions since we already have them from upload
|
||||||
imageData.bytes,
|
val fileHeader =
|
||||||
uploadResult.type ?: localContentType ?: imageData.contentType,
|
FileHeader(
|
||||||
uploadResult.dimension,
|
mimeType = uploadResult.type ?: localContentType ?: verification.contentType,
|
||||||
|
hash = verification.hash,
|
||||||
|
size = verification.size.toInt(),
|
||||||
|
dim = uploadResult.dimension,
|
||||||
|
blurHash = null, // Skip blurhash generation for verification
|
||||||
)
|
)
|
||||||
|
|
||||||
result.fold(
|
|
||||||
onSuccess = {
|
|
||||||
return finish(
|
return finish(
|
||||||
OrchestratorResult.ServerResult(
|
OrchestratorResult.ServerResult(
|
||||||
it,
|
fileHeader,
|
||||||
uploadResult.url,
|
uploadResult.url,
|
||||||
uploadResult.magnet,
|
uploadResult.magnet,
|
||||||
uploadResult.sha256,
|
uploadResult.sha256,
|
||||||
@@ -255,11 +258,6 @@ class UploadOrchestrator {
|
|||||||
originalHash,
|
originalHash,
|
||||||
),
|
),
|
||||||
)
|
)
|
||||||
},
|
|
||||||
onFailure = {
|
|
||||||
return error(R.string.could_not_prepare_local_file_to_upload, it.message ?: it.javaClass.simpleName)
|
|
||||||
},
|
|
||||||
)
|
|
||||||
} else {
|
} else {
|
||||||
return error(R.string.could_not_download_from_the_server)
|
return error(R.string.could_not_download_from_the_server)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -36,6 +36,7 @@ import com.vitorpamplona.quartz.nip01Core.core.toHexKey
|
|||||||
import com.vitorpamplona.quartz.nipB7Blossom.BlossomAuthorizationEvent
|
import com.vitorpamplona.quartz.nipB7Blossom.BlossomAuthorizationEvent
|
||||||
import com.vitorpamplona.quartz.nipB7Blossom.BlossomUploadResult
|
import com.vitorpamplona.quartz.nipB7Blossom.BlossomUploadResult
|
||||||
import com.vitorpamplona.quartz.utils.RandomInstance
|
import com.vitorpamplona.quartz.utils.RandomInstance
|
||||||
|
import com.vitorpamplona.quartz.utils.sha256.sha256Stream
|
||||||
import kotlinx.coroutines.Dispatchers
|
import kotlinx.coroutines.Dispatchers
|
||||||
import kotlinx.coroutines.withContext
|
import kotlinx.coroutines.withContext
|
||||||
import okhttp3.MediaType.Companion.toMediaType
|
import okhttp3.MediaType.Companion.toMediaType
|
||||||
@@ -47,7 +48,6 @@ import okio.BufferedSink
|
|||||||
import okio.source
|
import okio.source
|
||||||
import java.io.File
|
import java.io.File
|
||||||
import java.io.InputStream
|
import java.io.InputStream
|
||||||
import java.security.MessageDigest
|
|
||||||
import java.util.Base64
|
import java.util.Base64
|
||||||
|
|
||||||
class BlossomUploader {
|
class BlossomUploader {
|
||||||
@@ -61,17 +61,29 @@ class BlossomUploader {
|
|||||||
* to avoid loading the entire file into memory.
|
* to avoid loading the entire file into memory.
|
||||||
*/
|
*/
|
||||||
private fun calculateHashAndSize(inputStream: InputStream): StreamInfo {
|
private fun calculateHashAndSize(inputStream: InputStream): StreamInfo {
|
||||||
val digest = MessageDigest.getInstance("SHA-256")
|
|
||||||
val buffer = ByteArray(8192) // 8KB buffer
|
|
||||||
var bytesRead: Int
|
|
||||||
var totalBytes = 0L
|
var totalBytes = 0L
|
||||||
|
|
||||||
while (inputStream.read(buffer).also { bytesRead = it } != -1) {
|
// Wrap the input stream to count bytes while hashing
|
||||||
digest.update(buffer, 0, bytesRead)
|
val countingStream =
|
||||||
totalBytes += bytesRead
|
object : InputStream() {
|
||||||
|
override fun read(): Int {
|
||||||
|
val byte = inputStream.read()
|
||||||
|
if (byte != -1) totalBytes++
|
||||||
|
return byte
|
||||||
}
|
}
|
||||||
|
|
||||||
val hash = digest.digest().toHexKey()
|
override fun read(
|
||||||
|
b: ByteArray,
|
||||||
|
off: Int,
|
||||||
|
len: Int,
|
||||||
|
): Int {
|
||||||
|
val bytesRead = inputStream.read(b, off, len)
|
||||||
|
if (bytesRead > 0) totalBytes += bytesRead
|
||||||
|
return bytesRead
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
val hash = sha256Stream(countingStream).toHexKey()
|
||||||
return StreamInfo(hash, totalBytes)
|
return StreamInfo(hash, totalBytes)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -20,6 +20,22 @@
|
|||||||
*/
|
*/
|
||||||
package com.vitorpamplona.quartz.utils.sha256
|
package com.vitorpamplona.quartz.utils.sha256
|
||||||
|
|
||||||
|
import java.io.InputStream
|
||||||
|
|
||||||
val pool = Sha256Pool(5) // max parallel operations
|
val pool = Sha256Pool(5) // max parallel operations
|
||||||
|
|
||||||
actual fun sha256(data: ByteArray) = pool.hash(data)
|
actual fun sha256(data: ByteArray) = pool.hash(data)
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Calculate SHA256 hash by streaming the input in chunks.
|
||||||
|
* This avoids loading the entire input into memory at once.
|
||||||
|
* Useful for hashing large files without running out of memory.
|
||||||
|
*
|
||||||
|
* @param inputStream The input stream to hash
|
||||||
|
* @param bufferSize Size of chunks to read (default 8KB)
|
||||||
|
* @return SHA256 hash bytes
|
||||||
|
*/
|
||||||
|
fun sha256Stream(
|
||||||
|
inputStream: InputStream,
|
||||||
|
bufferSize: Int = 8192,
|
||||||
|
) = pool.hashStream(inputStream, bufferSize)
|
||||||
|
|||||||
@@ -20,6 +20,7 @@
|
|||||||
*/
|
*/
|
||||||
package com.vitorpamplona.quartz.utils.sha256
|
package com.vitorpamplona.quartz.utils.sha256
|
||||||
|
|
||||||
|
import java.io.InputStream
|
||||||
import java.security.MessageDigest
|
import java.security.MessageDigest
|
||||||
|
|
||||||
class Sha256Hasher {
|
class Sha256Hasher {
|
||||||
@@ -30,4 +31,26 @@ class Sha256Hasher {
|
|||||||
fun digest(byteArray: ByteArray) = digest.digest(byteArray)
|
fun digest(byteArray: ByteArray) = digest.digest(byteArray)
|
||||||
|
|
||||||
fun reset() = digest.reset()
|
fun reset() = digest.reset()
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Calculate SHA256 hash by streaming the input in chunks.
|
||||||
|
* This avoids loading the entire input into memory at once.
|
||||||
|
*
|
||||||
|
* @param inputStream The input stream to hash
|
||||||
|
* @param bufferSize Size of chunks to read (default 8KB)
|
||||||
|
* @return SHA256 hash bytes
|
||||||
|
*/
|
||||||
|
fun hashStream(
|
||||||
|
inputStream: InputStream,
|
||||||
|
bufferSize: Int = 8192,
|
||||||
|
): ByteArray {
|
||||||
|
val buffer = ByteArray(bufferSize)
|
||||||
|
var bytesRead: Int
|
||||||
|
|
||||||
|
while (inputStream.read(buffer).also { bytesRead = it } != -1) {
|
||||||
|
digest.update(buffer, 0, bytesRead)
|
||||||
|
}
|
||||||
|
|
||||||
|
return digest.digest().also { digest.reset() }
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -21,6 +21,7 @@
|
|||||||
package com.vitorpamplona.quartz.utils.sha256
|
package com.vitorpamplona.quartz.utils.sha256
|
||||||
|
|
||||||
import com.vitorpamplona.quartz.utils.Log
|
import com.vitorpamplona.quartz.utils.Log
|
||||||
|
import java.io.InputStream
|
||||||
import java.util.concurrent.ArrayBlockingQueue
|
import java.util.concurrent.ArrayBlockingQueue
|
||||||
|
|
||||||
class Sha256Pool(
|
class Sha256Pool(
|
||||||
@@ -54,4 +55,24 @@ class Sha256Pool(
|
|||||||
release(hasher)
|
release(hasher)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Calculate SHA256 hash by streaming the input in chunks.
|
||||||
|
* This avoids loading the entire input into memory at once.
|
||||||
|
*
|
||||||
|
* @param inputStream The input stream to hash
|
||||||
|
* @param bufferSize Size of chunks to read (default 8KB)
|
||||||
|
* @return SHA256 hash bytes
|
||||||
|
*/
|
||||||
|
fun hashStream(
|
||||||
|
inputStream: InputStream,
|
||||||
|
bufferSize: Int = 8192,
|
||||||
|
): ByteArray {
|
||||||
|
val hasher = acquire()
|
||||||
|
try {
|
||||||
|
return hasher.hashStream(inputStream, bufferSize)
|
||||||
|
} finally {
|
||||||
|
release(hasher)
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user