Index: Src/GoogleApis.Core/Apis/Requests/RequestBuilder.cs
===================================================================
--- a/Src/GoogleApis.Core/Apis/Requests/RequestBuilder.cs
+++ b/Src/GoogleApis.Core/Apis/Requests/RequestBuilder.cs
@@ -80,6 +80,7 @@
}
/// Construct a new request builder.
+ /// TODO(peleyal): Consider using the Factory pattern here.
public RequestBuilder()
{
PathParameters = new Dictionary>();
Index: Src/GoogleApis.Tests/Apis/Upload/ResumableUploadTest.cs
===================================================================
--- a/Src/GoogleApis.Tests/Apis/Upload/ResumableUploadTest.cs
+++ b/Src/GoogleApis.Tests/Apis/Upload/ResumableUploadTest.cs
@@ -117,7 +117,7 @@
///
public object ExpectedResponse { get; set; }
- /// Gets or sets the Serializer which is used to serialize and deserialize objects.
+ /// Gets or sets the serializer which is used to serialize and deserialize objects.
public ISerializer Serializer { get; set; }
protected override async Task SendAsyncCore(HttpRequestMessage request,
@@ -310,10 +310,14 @@
/// The request index we are going to cancel.
public int CancelRequestNum { get; set; }
- // on the 4th request - server returns error (if supportedError isn't none)
- // on the 5th request - server returns 308 with "Range" header is "bytes 0-299" (depends on supportedError)
+ // On the 4th request - server returns error (if supportedError isn't none)
+ // On the 5th request - server returns 308 with "Range" header is "bytes 0-299" (depends on supportedError)
internal const int ErrorOnCall = 4;
+ // When we resuming an upload, there should be 3 more calls after the failures.
+ // Uploading 3 more chunks: 200-299, 300-399, 400-453.
+ internal const int CallAfterResume = 3;
+
///
/// Gets or sets the number of bytes the server reads when error occurred. The default value is 0 ,
/// meaning that on server error it won't read any bytes from the stream.
@@ -331,6 +335,12 @@
private string uploadSize;
+ /// Gets or sets the call number after resuming.
+ public int ResumeFromCall { get; set; }
+
+ /// Get or sets indication if the first call after resuming should fail or not.
+ public bool ErrorOnResume { get; set; }
+
public MultipleChunksMessageHandler(bool knownSize, ServerError supportedError, int len, int chunkSize,
bool alwaysFailFromFirstError = false)
{
@@ -417,17 +427,21 @@
ReceivedData.Write(bytes, 0, read);
bytesRecieved += read;
}
- else if (Calls >= ErrorOnCall && alwaysFailFromFirstError)
+ else if ((Calls >= ErrorOnCall && alwaysFailFromFirstError && ResumeFromCall == 0) ||
+ (Calls == ResumeFromCall && ErrorOnResume))
{
if (supportedError == ServerError.Exception)
{
throw new Exception("ERROR");
}
+
Assert.That(request.Content.Headers.GetValues("Content-Range").First(), Is.EqualTo(
string.Format(@"bytes */{0}", uploadSize)));
response.StatusCode = HttpStatusCode.ServiceUnavailable;
}
- else if (Calls == ErrorOnCall + 1 && supportedError != ServerError.None)
+ else if ((Calls == ErrorOnCall + 1 && supportedError != ServerError.None) ||
+ (Calls == ResumeFromCall && !ErrorOnResume) ||
+ (Calls == ResumeFromCall + 1 && ErrorOnResume))
{
Assert.That(request.Content.Headers.GetValues("Content-Range").First(), Is.EqualTo(
string.Format(@"bytes */{0}", uploadSize)));
@@ -463,15 +477,11 @@
private class MockResumableUpload : ResumableUpload
{
public MockResumableUpload(IClientService service, Stream stream, string contentType)
- : this(service, "path", "PUT", stream, contentType)
- {
- }
+ : this(service, "path", "PUT", stream, contentType) { }
public MockResumableUpload(IClientService service, string path, string method, Stream stream,
string contentType)
- : base(service, path, method, stream, contentType)
- {
- }
+ : base(service, path, method, stream, contentType) { }
}
///
@@ -483,9 +493,7 @@
{
public MockResumableUploadWithResponse(IClientService service,
Stream stream, string contentType)
- : base(service, "path", "POST", stream, contentType)
- {
- }
+ : base(service, "path", "POST", stream, contentType) { }
}
/// A resumable upload class which contains query and path parameters.
@@ -566,9 +574,9 @@
{
var stream = new MemoryStream(Encoding.UTF8.GetBytes(UploadTestData));
var handler = new SingleChunkMessageHandler()
- {
- StreamLength = stream.Length
- };
+ {
+ StreamLength = stream.Length
+ };
using (var service = new MockClientService(new BaseClientService.Initializer()
{
HttpClientFactory = new MockHttpClientFactory(handler)
@@ -590,9 +598,9 @@
{
var stream = new MemoryStream(Encoding.UTF8.GetBytes(UploadTestData));
var handler = new SingleChunkMessageHandler()
- {
- StreamLength = stream.Length
- };
+ {
+ StreamLength = stream.Length
+ };
using (var service = new MockClientService(new BaseClientService.Initializer()
{
HttpClientFactory = new MockHttpClientFactory(handler)
@@ -728,9 +736,7 @@
SubtestTestChunkUpload(true, 4, ServerError.NotFound);
}
- ///
- /// Tests a single upload request
- ///
+ /// Tests a single upload request.
/// Defines if the stream size is known
/// How many HTTP calls should be made to the server
/// Defines the type of error this test tests. The default value is none
@@ -824,13 +830,15 @@
}
/// Test helper to test a fail uploading by with the given server error.
- private void SubtestChunkUploadFail(ServerError error)
+ /// The error kind.
+ /// Whether we should resume uploading the stream after the failure.
+ /// Whether the first call after resuming should fail.
+ private void SubtestChunkUploadFail(ServerError error, bool resume = false, bool errorOnResume = false)
{
int chunkSize = 100;
var payload = Encoding.UTF8.GetBytes(UploadTestData);
- var handler = new MultipleChunksMessageHandler(true, error, payload.Length,
- chunkSize, true);
+ var handler = new MultipleChunksMessageHandler(true, error, payload.Length, chunkSize, true);
using (var service = new MockClientService(new BaseClientService.Initializer()
{
HttpClientFactory = new MockHttpClientFactory(handler)
@@ -847,12 +855,31 @@
};
upload.Upload();
+ // Upload should fail.
var exepctedCalls = MultipleChunksMessageHandler.ErrorOnCall +
service.HttpClient.MessageHandler.NumTries - 1;
Assert.That(handler.Calls, Is.EqualTo(exepctedCalls));
Assert.NotNull(lastProgressStatus);
Assert.NotNull(lastProgressStatus.Exception);
Assert.That(lastProgressStatus.Status, Is.EqualTo(UploadStatus.Failed));
+
+ if (resume)
+ {
+ // Hack the handler, so when calling the resume method the upload should succeeded.
+ handler.ResumeFromCall = exepctedCalls + 1;
+ handler.ErrorOnResume = errorOnResume;
+
+ upload.Resume();
+
+ // The first request after resuming is to query the server where the media upload was interrupted.
+ // If errorOnResume is true, the server's first response will be 503.
+ exepctedCalls += MultipleChunksMessageHandler.CallAfterResume + 1 + (errorOnResume ? 1 : 0);
+ Assert.That(handler.Calls, Is.EqualTo(exepctedCalls));
+ Assert.NotNull(lastProgressStatus);
+ Assert.Null(lastProgressStatus.Exception);
+ Assert.That(lastProgressStatus.Status, Is.EqualTo(UploadStatus.Completed));
+ Assert.That(payload, Is.EqualTo(handler.ReceivedData.ToArray()));
+ }
}
}
@@ -865,9 +892,21 @@
SubtestChunkUploadFail(ServerError.ServerUnavailable);
}
- ///
- /// Tests failed uploading media (exception is thrown all the time from some request).
- ///
+ /// Tests the resume method.
+ [Test]
+ public void TestResumeAfterFail()
+ {
+ SubtestChunkUploadFail(ServerError.ServerUnavailable, true);
+ }
+
+ /// Tests the resume method. The first call after resuming returns server unavailable.
+ [Test]
+ public void TestResumeAfterFail_FirstCallAfterResumeIsServerUnavailable()
+ {
+ SubtestChunkUploadFail(ServerError.ServerUnavailable, true, true);
+ }
+
+ /// Tests failed uploading media (exception is thrown all the time from some request).
[Test]
public void TestChunkUploadFail_Exception()
{
@@ -958,11 +997,11 @@
const int id = 123;
var handler = new SingleChunkMessageHandler()
- {
- PathParameters = "testPath/" + id.ToString(),
- QueryParameters = "&queryA=valuea&queryB=VALUEB&time=2002-02-25T12%3A57%3A32.777Z",
- StreamLength = stream.Length
- };
+ {
+ PathParameters = "testPath/" + id.ToString(),
+ QueryParameters = "&queryA=valuea&queryB=VALUEB&time=2002-02-25T12%3A57%3A32.777Z",
+ StreamLength = stream.Length
+ };
using (var service = new MockClientService(new BaseClientService.Initializer()
{
@@ -987,22 +1026,22 @@
public void TestUploadWithRequestAndResponseBody()
{
var body = new TestRequest()
- {
- Name = "test object",
- Description = "the description",
- };
+ {
+ Name = "test object",
+ Description = "the description",
+ };
var handler = new RequestResponseMessageHandler()
- {
- ExpectedRequest = body,
- ExpectedResponse = new TestResponse
- {
- Name = "foo",
- Id = 100,
- Description = "bar",
- },
- Serializer = new NewtonsoftJsonSerializer()
- };
+ {
+ ExpectedRequest = body,
+ ExpectedResponse = new TestResponse
+ {
+ Name = "foo",
+ Id = 100,
+ Description = "bar",
+ },
+ Serializer = new NewtonsoftJsonSerializer()
+ };
using (var service = new MockClientService(new BaseClientService.Initializer()
{
Index: Src/GoogleApis/Apis/Requests/IClientServiceRequest.cs
===================================================================
--- a/Src/GoogleApis/Apis/Requests/IClientServiceRequest.cs
+++ b/Src/GoogleApis/Apis/Requests/IClientServiceRequest.cs
@@ -54,9 +54,8 @@
Task ExecuteAsStreamAsync();
/// Executes the request asynchronously and returns the result stream.
- /// A cancellation token for canceling the request in the middle of the
- /// execution. operation.
- Task ExecuteAsStreamAsync(CancellationToken cencellationToken);
+ /// A cancellation token to cancel operation.
+ Task ExecuteAsStreamAsync(CancellationToken cancellationToken);
/// Executes the request and returns the result stream.
Stream ExecuteAsStream();
@@ -72,9 +71,8 @@
Task ExecuteAsync();
/// Executes the request asynchronously and returns the result object.
- /// A cancellation token for canceling the request in the middle of execution.
- ///
- Task ExecuteAsync(CancellationToken cencellationToken);
+ /// A cancellation token to cancel operation.
+ Task ExecuteAsync(CancellationToken cancellationToken);
/// Executes the request and returns the result object.
TResponse Execute();
Index: Src/GoogleApis/Apis/[Media]/Upload/ResumableUpload.cs
===================================================================
--- a/Src/GoogleApis/Apis/[Media]/Upload/ResumableUpload.cs
+++ b/Src/GoogleApis/Apis/[Media]/Upload/ResumableUpload.cs
@@ -351,24 +351,108 @@
return UploadAsync(CancellationToken.None).Result;
}
+ /// Uploads the content asynchronously to the server.
+ public Task UploadAsync()
+ {
+ return UploadAsync(CancellationToken.None);
+ }
+
/// Uploads the content to the server using the given cancellation token.
///
/// In case the upload fails will contain the exception that
/// cause the failure. The only exception which will be thrown is
/// which indicates that the task was canceled.
///
+ /// A cancellation token to cancel operation.
public async Task UploadAsync(CancellationToken cancellationToken)
{
+ BytesServerReceived = 0;
+ UpdateProgress(new ResumableUploadProgress(UploadStatus.Starting, 0));
+ // Check if the stream length is known.
+ StreamLength = ContentStream.CanSeek ? ContentStream.Length : UnknownSize;
+
+ try
+ {
+ UploadUri = await InitializeUpload(cancellationToken).ConfigureAwait(false);
+ Logger.Debug("MediaUpload[{0}] - Start uploading...", UploadUri);
+ }
+ catch (Exception ex)
+ {
+ Logger.Error(ex, "MediaUpload - Exception occurred while initializing the upload");
+ UpdateProgress(new ResumableUploadProgress(ex, BytesServerReceived));
+ }
+
+ return await UploadCoreAsync(cancellationToken).ConfigureAwait(false);
+ }
+
+ /// Resumes the upload form the last point it was interrupted.
+ public IUploadProgress Resume()
+ {
+ return ResumeAsync(CancellationToken.None).Result;
+ }
+
+ /// Asynchronously resumes the upload form the last point it was interrupted.
+ public Task ResumeAsync()
+ {
+ return ResumeAsync(CancellationToken.None);
+ }
+
+ /// Asynchronously resumes the upload form the last point it was interrupted.
+ /// A cancellation token to cancel operation.
+ public async Task ResumeAsync(CancellationToken cancellationToken)
+ {
+ if (UploadUri == null)
+ {
+ Logger.Info("There isn't any upload in progress, so starting to upload again");
+ return await UploadAsync(cancellationToken).ConfigureAwait(false);
+ }
+ // The first "resuming" request is to query the server in which point the upload was interrupted.
+ var range = String.Format("bytes */{0}", StreamLength < 0 ? "*" : StreamLength.ToString());
+ HttpRequestMessage request = new RequestBuilder()
+ {
+ BaseUri = UploadUri,
+ Method = HttpConsts.Put
+ }.CreateRequest();
+ request.SetEmptyContent().Headers.Add("Content-Range", range);
+
+ try
+ {
+ HttpResponseMessage response;
+ using (var callback = new ServerErrorCallback(this))
+ {
+ response = await Service.HttpClient.SendAsync(request, cancellationToken)
+ .ConfigureAwait(false);
+ }
+
+ if (await HandleResponse(response).ConfigureAwait(false))
+ {
+ // All the media was successfully upload.
+ UpdateProgress(new ResumableUploadProgress(UploadStatus.Completed, BytesServerReceived));
+ return Progress;
+ }
+ }
+ catch (TaskCanceledException ex)
+ {
+ Logger.Error(ex, "MediaUpload[{0}] - Task was canceled", UploadUri);
+ UpdateProgress(new ResumableUploadProgress(ex, BytesServerReceived));
+ throw ex;
+ }
+ catch (Exception ex)
+ {
+ Logger.Error(ex, "MediaUpload[{0}] - Exception occurred while resuming uploading media", UploadUri);
+ UpdateProgress(new ResumableUploadProgress(ex, BytesServerReceived));
+ return Progress;
+ }
+
+ // Continue to upload the media stream.
+ return await UploadCoreAsync(cancellationToken).ConfigureAwait(false);
+ }
+
+ /// The core logic for uploading a stream. It is used by the upload and resume methods.
+ private async Task UploadCoreAsync(CancellationToken cancellationToken)
+ {
try
{
- BytesServerReceived = 0;
- UpdateProgress(new ResumableUploadProgress(UploadStatus.Starting, 0));
- // Check if the stream length is known.
- StreamLength = ContentStream.CanSeek ? ContentStream.Length : UnknownSize;
- UploadUri = await InitializeUpload(cancellationToken).ConfigureAwait(false);
-
- Logger.Debug("MediaUpload[{0}] - Start uploading...", UploadUri);
-
using (var callback = new ServerErrorCallback(this))
{
while (!await SendNextChunkAsync(ContentStream, cancellationToken).ConfigureAwait(false))
@@ -393,12 +477,6 @@
return Progress;
}
- /// Uploads the content asynchronously to the server.
- public Task UploadAsync()
- {
- return UploadAsync(CancellationToken.None);
- }
-
///
/// Initializes the resumable upload by calling the resumable rest interface to get a unique upload location.
/// See https://developers.google.com/drive/manage-uploads#start-resumable for more details.
@@ -447,8 +525,15 @@
Logger.Debug("MediaUpload[{0}] - Sending bytes={1}-{2}", UploadUri, BytesServerReceived,
BytesClientSent - 1);
- HttpResponseMessage response = await Service.HttpClient.SendAsync(
- request, cancellationToken).ConfigureAwait(false);
+ HttpResponseMessage response = await Service.HttpClient.SendAsync(request, cancellationToken)
+ .ConfigureAwait(false);
+ return await HandleResponse(response).ConfigureAwait(false);
+ }
+
+ /// Handles a media upload HTTP response.
+ /// True if the entire media has been completely uploaded.
+ private async Task HandleResponse(HttpResponseMessage response)
+ {
if (response.IsSuccessStatusCode)
{
MediaCompleted(response);
@@ -474,9 +559,9 @@
{
Logger.Debug("MediaUpload[{0}] - media was uploaded successfully", UploadUri);
ProcessResponse(response);
- BytesServerReceived += LastMediaLength;
+ BytesServerReceived = StreamLength;
- // clear the last request byte array
+ // Clear the last request byte array.
LastMediaRequest = null;
}
@@ -522,7 +607,7 @@
if (shouldRead)
{
int len = 0;
- // read bytes form the stream to lastMediaRequest byte array
+ // Read bytes form the stream to lastMediaRequest byte array.
while (true)
{
cancellationToken.ThrowIfCancellationRequested();
@@ -533,16 +618,16 @@
LastMediaLength += len;
}
- // check if there is still data to read from stream, and cache the first byte in catchedByte
+ // Check if there is still data to read from stream, and cache the first byte in catchedByte.
if (0 == stream.Read(CachedByte, 0, 1))
{
- // EOF - now we know the stream's length
+ // EOF - now we know the stream's length.
StreamLength = LastMediaLength + BytesServerReceived;
CachedByte = null;
}
}
- // set Content-Length and Content-Range
+ // Set Content-Length and Content-Range.
var byteArrayContent = new ByteArrayContent(LastMediaRequest, 0, LastMediaLength);
byteArrayContent.Headers.Add("Content-Range", GetContentRangeHeader(BytesServerReceived, LastMediaLength));
request.Content = byteArrayContent;
@@ -554,12 +639,12 @@
{
int chunkSize = (int)Math.Min(StreamLength - BytesServerReceived, (long)ChunkSize);
- // stream length is known and it supports seek and position operations.
- // We can change the stream position and read bytes from the last point
+ // Stream length is known and it supports seek and position operations.
+ // We can change the stream position and read bytes from the last point.
byte[] buffer = new byte[Math.Min(chunkSize, BufferSize)];
- // if the number of bytes received by the server isn't equal to the amount of bytes the client sent, we
- // need to change the position of the input stream, otherwise we can continue from the current position
+ // If the number of bytes received by the server isn't equal to the amount of bytes the client sent, we
+ // need to change the position of the input stream, otherwise we can continue from the current position.
if (BytesClientSent != BytesServerReceived)
{
stream.Position = BytesServerReceived;
@@ -571,15 +656,15 @@
{
cancellationToken.ThrowIfCancellationRequested();
- // read from input stream and write to output stream
- // TODO(peleyal): write a utility similar to (.NET 4 Stream.CopyTo method)
+ // Read from input stream and write to output stream.
+ // TODO(peleyal): write a utility similar to (.NET 4 Stream.CopyTo method).
int len = stream.Read(buffer, 0, (int)Math.Min(buffer.Length, chunkSize - bytesRead));
if (len == 0) break;
ms.Write(buffer, 0, len);
bytesRead += len;
}
- // set the stream position to beginning and wrap it with stream content
+ // Set the stream position to beginning and wrap it with stream content.
ms.Position = 0;
request.Content = new StreamContent(ms);
request.Content.Headers.Add("Content-Range", GetContentRangeHeader(BytesServerReceived, chunkSize));