feat: WIP

This commit is contained in:
yuhuihuang 2020-08-26 20:02:00 +08:00
parent 697ec45620
commit 46d502a5fd
3 changed files with 225 additions and 10 deletions

View File

@ -10,50 +10,68 @@ import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.logging.Level;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import static org.jackhuang.hmcl.util.Logging.LOG;
class DownloadManager {
static DownloadState ne(int contentLength, int initialParts) {
}
static DownloadTaskState download(List<String> urls, Path file, int initialParts) throws IOException {
Path downloadingFile = file.resolveSibling(FileUtils.getName(file) + ".download");
Path stateFile = file.resolveSibling(FileUtils.getName(file) + ".status");
DownloadState state;
DownloadState state = null;
if (Files.exists(downloadingFile) && Files.exists(stateFile)) {
// Resume downloading from state
try {
String status = FileUtils.readText(stateFile);
state = JsonUtils.fromNonNullJson(status, DownloadState.class);
} catch (JsonParseException e) {
state =
LOG.log(Level.WARNING, "Failed to parse download state file", e);
}
}
if (state == null || !urls.equals(state.urls)) {
return DownloadTaskState.newWithLengthUnknown(urls, initialParts);
} else {
return new DownloadTaskState(state);
}
}
protected static class DownloadTaskState {
private final List<String> urls;
private final List<DownloadSegment> segments;
private final List<Thread> threads;
private String fastestUrl;
private int retry = 0;
private boolean cancelled = false;
DownloadTaskState(DownloadState state) {
urls = new ArrayList<>(state.urls);
segments = new ArrayList<>(state.segments);
threads = IntStream.range(0, state.segments.size()).mapToObj(x -> (Thread) null).collect(Collectors.toList());
}
DownloadTaskState(List<String> urls, int contentLength, int initialParts) {
urls = new ArrayList<>(urls);
if (urls == null || urls.size() == 0) {
throw new IllegalArgumentException("DownloadTaskState requires at least one url candidate");
}
this.urls = new ArrayList<>(urls);
segments = new ArrayList<>(initialParts);
threads = new ArrayList<>(initialParts);
int partLength = contentLength / initialParts;
for (int i = 0; i < initialParts; i++) {
int begin = partLength * i;
int end = Math.min((partLength + 1) * i, contentLength);
segments.add(new DownloadSegment(begin, end, 0));
threads.add(null);
}
}
public static DownloadTaskState newWithLengthUnknown(List<String> urls, int initialParts) {
return
return new DownloadTaskState(urls, 0, initialParts);
}
public List<String> getUrls() {
@ -63,6 +81,52 @@ class DownloadManager {
public List<DownloadSegment> getSegments() {
return segments;
}
public String getFirstUrl() {
return urls.get(0);
}
/**
* Next url for download runnable to retry.
*
* If some download runnable fails to connect to url, it will call this method
* to acquire next url for retry. Making all download runnable try different
* candidates concurrently to speed up finding fastest download source.
*
* @return next url to retry
*/
public synchronized String getNextUrlToRetry() {
String url = urls.get(retry);
retry = (retry + 1) % urls.size();
return url;
}
/**
* One candidate that is accessible and best network connection qualified.
*
* When some download runnable have started downloading, DownloadManager will
* monitor download speed and make failed download runnable connect to the
* fastest url directly without retry.
*
* In some times, the fastest url may be the first url suceeded in connection.
*
* @return fastest url, null if no url have successfully connected yet.
*/
public synchronized String getFastestUrl() {
return fastestUrl;
}
public synchronized void setFastestUrl(String fastestUrl) {
this.fastestUrl = fastestUrl;
}
public synchronized void cancel() {
cancelled = true;
}
public synchronized boolean isCancelled() {
return cancelled;
}
}
protected static class DownloadState {

View File

@ -1,5 +1,156 @@
package org.jackhuang.hmcl.task;
class DownloadTask {
import org.jackhuang.hmcl.util.CacheRepository;
import org.jackhuang.hmcl.util.Logging;
import org.jackhuang.hmcl.util.io.IOUtils;
import org.jackhuang.hmcl.util.io.NetworkUtils;
import org.jackhuang.hmcl.util.io.ResponseCodeException;
import java.io.IOException;
import java.io.InputStream;
import java.io.RandomAccessFile;
import java.net.HttpURLConnection;
import java.net.URL;
import java.net.URLConnection;
import java.nio.file.Path;
import java.util.logging.Level;
abstract class DownloadTask implements Runnable {
private final DownloadManager.DownloadTaskState state;
private final RandomAccessFile file;
private URLConnection conn;
private DownloadManager.DownloadSegment segment;
protected boolean caching;
protected CacheRepository repository = CacheRepository.getInstance();
public DownloadTask(DownloadManager.DownloadTaskState state, RandomAccessFile file, DownloadManager.DownloadSegment segment) {
this.state = state;
this.file = file;
this.segment = segment;
}
public void setCaching(boolean caching) {
this.caching = caching;
}
public void setCacheRepository(CacheRepository repository) {
this.repository = repository;
}
protected void beforeDownload(URL url) throws IOException {
}
protected abstract void useCachedResult(Path cachedFile) throws IOException;
protected abstract FetchTask.EnumCheckETag shouldCheckETag();
protected abstract FetchTask.Context getContext(URLConnection conn, boolean checkETag) throws IOException;
@Override
public void run() {
Exception exception = null;
URL failedURL = null;
boolean checkETag;
switch (shouldCheckETag()) {
case CHECK_E_TAG:
checkETag = true;
break;
case NOT_CHECK_E_TAG:
checkETag = false;
break;
default:
return;
}
int repeat = 0;
while (true) {
if (state.isCancelled()) {
break;
}
String url = repeat == 0 ? state.getFirstUrl() : state.getNextUrlToRetry();
repeat++;
if (url == null) {
break;
}
try {
beforeDownload(url);
updateProgress(0);
URLConnection conn = NetworkUtils.createConnection(NetworkUtils.toURL(url);
if (checkETag) repository.injectConnection(conn);
if (conn instanceof HttpURLConnection) {
conn = NetworkUtils.resolveConnection((HttpURLConnection) conn);
int responseCode = ((HttpURLConnection) conn).getResponseCode();
if (responseCode == HttpURLConnection.HTTP_NOT_MODIFIED) {
// Handle cache
try {
Path cache = repository.getCachedRemoteFile(conn);
useCachedResult(cache);
return;
} catch (IOException e) {
Logging.LOG.log(Level.WARNING, "Unable to use cached file, redownload " + url, e);
repository.removeRemoteEntry(conn);
// Now we must reconnect the server since 304 may result in empty content,
// if we want to redownload the file, we must reconnect the server without etag settings.
repeat--;
continue;
}
} else if (responseCode / 100 == 4) {
break; // we will not try this URL again
} else if (responseCode / 100 != 2) {
throw new ResponseCodeException(url, responseCode);
}
}
long contentLength = conn.getContentLength();
try (FetchTask.Context context = getContext(conn, checkETag); InputStream stream = conn.getInputStream()) {
int lastDownloaded = 0, downloaded = 0;
byte[] buffer = new byte[IOUtils.DEFAULT_BUFFER_SIZE];
while (true) {
if (state.isCancelled()) break;
int len = stream.read(buffer);
if (len == -1) break;
context.write(buffer, 0, len);
downloaded += len;
if (contentLength >= 0) {
// Update progress information per second
updateProgress(downloaded, contentLength);
}
updateDownloadSpeed(downloaded - lastDownloaded);
lastDownloaded = downloaded;
}
if (state.isCancelled()) break;
updateDownloadSpeed(downloaded - lastDownloaded);
if (contentLength >= 0 && downloaded != contentLength)
throw new IOException("Unexpected file size: " + downloaded + ", expected: " + contentLength);
context.withResult(true);
}
return;
} catch (IOException ex) {
failedURL = url;
exception = ex;
Logging.LOG.log(Level.WARNING, "Failed to download " + url + ", repeat times: " + repeat, ex);
}
}
if (exception != null)
throw new DownloadException(failedURL, exception);
}
}

View File

@ -76,7 +76,7 @@ public final class GetTask extends FetchTask<String> {
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
@Override
public void write(byte[] buffer, int offset, int len) {
public synchronized void write(byte[] buffer, int offset, int len) {
baos.write(buffer, offset, len);
}