mirror of
https://github.com/Unidata/netcdf-c.git
synced 2025-01-18 15:55:12 +08:00
df3636b959
This PR started as an attempt to add unlimited dimensions to NCZarr. It did that, but this exposed significant problems with test interference. So this PR is mostly about fixing -- well mitigating anyway -- test interference. The problem of test interference is now documented in the document docs/internal.md. The solutions implemented here are also describe in that document. The solution is somewhat fragile but multiple cleanup mechanisms are provided. Note that this feature requires that the AWS command line utility must be installed. ## Unlimited Dimensions. The existing NCZarr extensions to Zarr are modified to support unlimited dimensions. NCzarr extends the Zarr meta-data for the ".zgroup" object to include netcdf-4 model extensions. This information is stored in ".zgroup" as dictionary named "_nczarr_group". Inside "_nczarr_group", there is a key named "dims" that stores information about netcdf-4 named dimensions. The value of "dims" is a dictionary whose keys are the named dimensions. The value associated with each dimension name has one of two forms Form 1 is a special case of form 2, and is kept for backward compatibility. Whenever a new file is written, it uses format 1 if possible, otherwise format 2. * Form 1: An integer representing the size of the dimension, which is used for simple named dimensions. * Form 2: A dictionary with the following keys and values" - "size" with an integer value representing the (current) size of the dimension. - "unlimited" with a value of either "1" or "0" to indicate if this dimension is an unlimited dimension. For Unlimited dimensions, the size is initially zero, and as variables extend the length of that dimension, the size value for the dimension increases. That dimension size is shared by all arrays referencing that dimension, so if one array extends an unlimited dimension, it is implicitly extended for all other arrays that reference that dimension. This is the standard semantics for unlimited dimensions. Adding unlimited dimensions required a number of other changes to the NCZarr code-base. These included the following. * Did a partial refactor of the slice handling code in zwalk.c to clean it up. * Added a number of tests for unlimited dimensions derived from the same test in nc_test4. * Added several NCZarr specific unlimited tests; more are needed. * Add test of endianness. ## Misc. Other Changes * Modify libdispatch/ncs3sdk_aws.cpp to optionally support use of the AWS Transfer Utility mechanism. This is controlled by the ```#define TRANSFER```` command in that file. It defaults to being disabled. * Parameterize both the standard Unidata S3 bucket (S3TESTBUCKET) and the netcdf-c test data prefix (S3TESTSUBTREE). * Fixed an obscure memory leak in ncdump. * Removed some obsolete unit testing code and test cases. * Uncovered a bug in the netcdf-c handling of big-endian floats and doubles. Have not fixed yet. See tst_h5_endians.c. * Renamed some nczarr_tests testcases to avoid name conflicts with nc_test4. * Modify the semantics of zmap\#ncsmap_write to only allow total rewrite of objects. * Modify the semantics of zodom to properly handle stride > 1. * Add a truncate operation to the libnczarr zmap code.
827 lines
25 KiB
C++
827 lines
25 KiB
C++
/*
|
|
* Copyright 2018, University Corporation for Atmospheric Research
|
|
* See netcdf/COPYRIGHT file for copying and redistribution conditions.
|
|
*/
|
|
|
|
/* WARNING: changes to this file may need to be propagated to libsrc/s3sdk.cpp */
|
|
|
|
#undef DEBUG
|
|
|
|
/* Use Aws::Transfer instead of direct REST API */
|
|
#undef TRANSFER
|
|
|
|
/* Disable bucket creation/deletion */
|
|
#define NOOPBUCKET
|
|
|
|
#include "awsincludes.h"
|
|
#include <stdlib.h>
|
|
#include <string.h>
|
|
#include <iostream>
|
|
#include <streambuf>
|
|
#include "netcdf.h"
|
|
#include "ncrc.h"
|
|
|
|
#ifdef TRANSFER
|
|
#include <aws/core/utils/memory/AWSMemory.h>
|
|
#include <aws/core/utils/memory/stl/AWSStreamFwd.h>
|
|
#include <aws/core/utils/stream/PreallocatedStreamBuf.h>
|
|
#include <aws/transfer/TransferManager.h>
|
|
#include <aws/transfer/TransferHandle.h>
|
|
#endif
|
|
|
|
#include "ncs3sdk.h"
|
|
|
|
#undef NCTRACING
|
|
#ifdef NCTRACING
|
|
#define NCTRACE(level,fmt,...) nctrace((level),__func__,fmt,##__VA_ARGS__)
|
|
#define NCTRACEMORE(level,fmt,...) nctracemore((level),fmt,##__VA_ARGS__)
|
|
#define NCUNTRACE(e) ncuntrace(__func__,THROW(e),NULL)
|
|
#define NCUNTRACEX(e,fmt,...) ncuntrace(__func__,THROW(e),fmt,##__VA_ARGS__)
|
|
#define NCUNTRACENOOP(e) NCUNTRACE(e)
|
|
#else
|
|
#define NCTRACE(level,fmt,...)
|
|
#define NCTRACEMORE(level,fmt,...)
|
|
#define NCUNTRACE(e) (e)
|
|
#define NCUNTRACEX(e,fmt,...) (e)
|
|
#define NCUNTRACENOOP(e)
|
|
#endif
|
|
|
|
#ifdef __CYGWIN__
|
|
extern char* strdup(const char*);
|
|
#endif
|
|
|
|
#define size64_t unsigned long long
|
|
|
|
#ifdef TRANSFER
|
|
#define AWSS3CLIENT std::shared_ptr<Aws::S3::S3Client>*
|
|
#define AWSS3GET(x) (((AWSS3CLIENT)(x))->get())
|
|
#else
|
|
#define AWSS3CLIENT Aws::S3::S3Client*
|
|
#define AWSS3GET(x) ((AWSS3CLIENT)(x))
|
|
#endif
|
|
|
|
struct KeySet {
|
|
size_t nkeys;
|
|
size_t alloc;
|
|
char** keys;
|
|
KeySet() {nkeys = 0; alloc = 0; keys = NULL;}
|
|
~KeySet() {clear();}
|
|
void push(char* key) {
|
|
if(alloc == 0) {
|
|
keys = (char**)calloc(10,sizeof(char*));
|
|
alloc = 10;
|
|
}
|
|
if(nkeys >= alloc) {
|
|
char** newkeys = (char**)calloc(alloc*2,sizeof(char*));
|
|
memcpy(newkeys,keys,sizeof(char*)*nkeys);
|
|
alloc *= 2;
|
|
free(keys);
|
|
keys = newkeys;
|
|
}
|
|
keys[nkeys++] = key;
|
|
}
|
|
char** extractkeys() {char** outkeys = keys; keys = NULL; clear(); return outkeys;}
|
|
char* extractithkey(size_t i) {
|
|
if(i >= nkeys) return NULL;
|
|
char* k = keys[i];
|
|
keys[i] = NULL;
|
|
return k;
|
|
}
|
|
size_t getnkeys() {return nkeys;}
|
|
void clear() {
|
|
if(keys) {
|
|
for(size_t i=0;i<nkeys;i++)
|
|
{if(keys[i] != NULL) free(keys[i]);}
|
|
free(keys);
|
|
}
|
|
nkeys = 0;
|
|
alloc = 0;
|
|
keys = NULL;
|
|
}
|
|
};
|
|
|
|
static int ncs3_initialized = 0;
|
|
static int ncs3_finalized = 0;
|
|
|
|
static Aws::SDKOptions ncs3options;
|
|
|
|
/* Forward */
|
|
static Aws::S3::Model::BucketLocationConstraint s3findregion(const char* name);
|
|
static int s3objectinfo1(const Aws::S3::Model::Object& s3_object, char** fullkeyp);
|
|
static int s3objectsinfo(Aws::Vector<Aws::S3::Model::Object> list, KeySet* keys);
|
|
static int s3commonprefixes(Aws::Vector<Aws::S3::Model::CommonPrefix> list, KeySet* keys);
|
|
static int makes3key(const char* pathkey, const char** keyp);
|
|
static int makes3keydir(const char* prefix, char** prefixdirp);
|
|
static int mergekeysets(KeySet* keys1, KeySet* keys2, KeySet* merge);
|
|
|
|
const char*
|
|
NCS3_dumps3info(NCS3INFO* info)
|
|
{
|
|
static char text[8192];
|
|
snprintf(text,sizeof(text),"host=%s region=%s bucket=%s rootkey=%s profile=%s",
|
|
(info->host?info->host:"null"),
|
|
(info->region?info->region:"null"),
|
|
(info->bucket?info->bucket:"null"),
|
|
(info->rootkey?info->rootkey:"null"),
|
|
(info->profile?info->profile:"null"));
|
|
return text;
|
|
}
|
|
|
|
EXTERNL int
|
|
NC_s3sdkinitialize(void)
|
|
{
|
|
if(!ncs3_initialized) {
|
|
ncs3_initialized = 1;
|
|
ncs3_finalized = 0;
|
|
NCTRACE(11,NULL);
|
|
Aws::InitAPI(ncs3options);
|
|
#ifdef DEBUG
|
|
ncs3options.loggingOptions.logLevel = Aws::Utils::Logging::LogLevel::Debug;
|
|
#endif
|
|
}
|
|
return NCUNTRACE(NC_NOERR);
|
|
}
|
|
|
|
EXTERNL int
|
|
NC_s3sdkfinalize(void)
|
|
{
|
|
if(!ncs3_finalized) {
|
|
ncs3_initialized = 0;
|
|
ncs3_finalized = 1;
|
|
NCTRACE(11,NULL);
|
|
Aws::ShutdownAPI(ncs3options);
|
|
}
|
|
return NCUNTRACE(NC_NOERR);
|
|
}
|
|
|
|
static char*
|
|
makeerrmsg(const Aws::Client::AWSError<Aws::S3::S3Errors> err, const char* key="")
|
|
{
|
|
char* errmsg;
|
|
size_t len;
|
|
len = strlen(err.GetExceptionName().c_str()) + strlen(err.GetMessage().c_str()) + strlen(key) + 10;
|
|
if((errmsg = (char*)malloc(len+1))==NULL)
|
|
return NULL;
|
|
snprintf(errmsg,len,"%s %s key=%s",
|
|
err.GetExceptionName().c_str(),
|
|
err.GetMessage().c_str(),
|
|
key);
|
|
return errmsg;
|
|
}
|
|
|
|
|
|
static Aws::Client::ClientConfiguration
|
|
s3sdkcreateconfig(NCS3INFO* info)
|
|
{
|
|
NCTRACE(11,"info=%s", dumps3info(info));
|
|
|
|
Aws::Client::ClientConfiguration config;
|
|
|
|
if(info->profile)
|
|
config.profileName = info->profile;
|
|
config.scheme = Aws::Http::Scheme::HTTPS;
|
|
config.connectTimeoutMs = 300000;
|
|
config.requestTimeoutMs = 600000;
|
|
if(info->region) config.region = info->region;
|
|
if(info->host) config.endpointOverride = info->host;
|
|
config.enableEndpointDiscovery = true;
|
|
config.followRedirects = Aws::Client::FollowRedirectsPolicy::ALWAYS;
|
|
NCUNTRACENOOP(NC_NOERR);
|
|
return config;
|
|
}
|
|
|
|
static AWSS3CLIENT
|
|
buildclient(Aws::Client::ClientConfiguration* config, Aws::Auth::AWSCredentials* creds)
|
|
{
|
|
AWSS3CLIENT s3client = NULL;
|
|
#ifdef TRANSFER
|
|
std::shared_ptr<Aws::S3::S3Client> client;
|
|
if(creds != NULL) {
|
|
client = Aws::MakeShared<Aws::S3::S3Client>("S3Client",*creds,*config,
|
|
Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::RequestDependent,
|
|
false);
|
|
} else {
|
|
client = Aws::MakeShared<Aws::S3::S3Client>("S3Client",*config,
|
|
Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::RequestDependent,
|
|
false);
|
|
}
|
|
// Create allocated client space
|
|
s3client = new std::shared_ptr<Aws::S3::S3Client>;
|
|
*s3client = client;
|
|
#else
|
|
if(creds != NULL) {
|
|
s3client = new Aws::S3::S3Client(*creds,*config,
|
|
Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::RequestDependent,
|
|
false);
|
|
} else {
|
|
s3client = new Aws::S3::S3Client(*config,
|
|
Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::RequestDependent,
|
|
false);
|
|
}
|
|
#endif
|
|
return s3client;
|
|
}
|
|
|
|
EXTERNL void*
|
|
NC_s3sdkcreateclient(NCS3INFO* info)
|
|
{
|
|
NCTRACE(11,NULL);
|
|
|
|
Aws::Client::ClientConfiguration config = s3sdkcreateconfig(info);
|
|
AWSS3CLIENT s3client;
|
|
|
|
if(info->profile == NULL || strcmp(info->profile,"none")==0) {
|
|
Aws::Auth::AWSCredentials creds;
|
|
creds.SetAWSAccessKeyId(Aws::String(""));
|
|
creds.SetAWSSecretKey(Aws::String(""));
|
|
|
|
s3client = buildclient(&config,&creds);
|
|
} else {
|
|
s3client = buildclient(&config,NULL);
|
|
}
|
|
// delete config;
|
|
NCUNTRACENOOP(NC_NOERR);
|
|
return (void*)s3client;
|
|
}
|
|
|
|
EXTERNL int
|
|
NC_s3sdkbucketexists(void* s3client0, const char* bucket, int* existsp, char** errmsgp)
|
|
{
|
|
int stat = NC_NOERR;
|
|
int exists = 0;
|
|
AWSS3CLIENT s3client = (AWSS3CLIENT)s3client0;
|
|
|
|
NCTRACE(11,"bucket=%s",bucket);
|
|
if(errmsgp) *errmsgp = NULL;
|
|
#if 0
|
|
char* errmsg = NULL;
|
|
auto result = s3client->ListBuckets();
|
|
if(!result.IsSuccess()) {
|
|
auto re = result.GetError();
|
|
errmsg = makeerrmsg(re);
|
|
fprintf(stderr,">>> errmsg=%s\n",errmsg);
|
|
if(errmsgp) {*errmsgp = errmsg; errmsg = NULL;}
|
|
stat = NC_ES3;
|
|
} else {
|
|
Aws::Vector<Aws::S3::Model::Bucket> bucket_list = result.GetResult().GetBuckets();
|
|
for(auto const &awsbucket : bucket_list) {
|
|
auto name = awsbucket.GetName();
|
|
if(name == bucket) {exists = 1; break;}
|
|
}
|
|
}
|
|
#else
|
|
Aws::S3::Model::HeadBucketRequest request;
|
|
request.SetBucket(bucket);
|
|
auto result = AWSS3GET(s3client)->HeadBucket(request);
|
|
exists = result.IsSuccess() ? 1 : 0;
|
|
#endif
|
|
if(existsp) *existsp = exists;
|
|
return NCUNTRACEX(stat,"exists=%d",*existsp);
|
|
}
|
|
|
|
EXTERNL int
|
|
NC_s3sdkbucketcreate(void* s3client0, const char* region, const char* bucket, char** errmsgp)
|
|
{
|
|
int stat = NC_NOERR;
|
|
|
|
NCTRACE(11,"region=%s bucket=%s",region,bucket);
|
|
AWSS3CLIENT s3client = (AWSS3CLIENT)s3client0;
|
|
if(errmsgp) *errmsgp = NULL;
|
|
const Aws::S3::Model::BucketLocationConstraint &awsregion = s3findregion(region);
|
|
if(awsregion == Aws::S3::Model::BucketLocationConstraint::NOT_SET)
|
|
return NC_EURL;
|
|
/* Set up the request */
|
|
Aws::S3::Model::CreateBucketRequest create_request;
|
|
create_request.SetBucket(bucket);
|
|
if(region) {
|
|
/* Specify the region as a location constraint */
|
|
Aws::S3::Model::CreateBucketConfiguration bucket_config;
|
|
bucket_config.SetLocationConstraint(awsregion);
|
|
create_request.SetCreateBucketConfiguration(bucket_config);
|
|
}
|
|
#ifdef NOOPBUCKET
|
|
/* Create the bucket */
|
|
auto create_result = AWSS3GET(s3client)->CreateBucket(create_request);
|
|
if(!create_result.IsSuccess()) {
|
|
if(errmsgp) *errmsgp = makeerrmsg(create_result.GetError());
|
|
stat = NC_ES3;;
|
|
}
|
|
#else
|
|
fprintf(stderr,"create bucket: %s\n",bucket); fflush(stderr);
|
|
#endif
|
|
|
|
return NCUNTRACE(stat);
|
|
}
|
|
|
|
EXTERNL int
|
|
NC_s3sdkbucketdelete(void* s3client0, NCS3INFO* info, char** errmsgp)
|
|
{
|
|
int stat = NC_NOERR;
|
|
|
|
NCTRACE(11,"info=%s%s",dumps3info(info));
|
|
|
|
AWSS3CLIENT s3client = (AWSS3CLIENT)s3client0;
|
|
|
|
if(errmsgp) *errmsgp = NULL;
|
|
const Aws::S3::Model::BucketLocationConstraint &awsregion = s3findregion(info->region);
|
|
if(awsregion == Aws::S3::Model::BucketLocationConstraint::NOT_SET)
|
|
return NC_EURL;
|
|
/* Set up the request */
|
|
Aws::S3::Model::DeleteBucketRequest request;
|
|
request.SetBucket(info->bucket);
|
|
|
|
#ifdef NOOPBUCKET
|
|
/* Delete the bucket */
|
|
auto result = AWSS3GET(s3client)->DeleteBucket(request);
|
|
if(!result.IsSuccess()) {
|
|
if(errmsgp) *errmsgp = makeerrmsg(result.GetError());
|
|
stat = NC_ES3;;
|
|
}
|
|
#else
|
|
fprintf(stderr,"delete bucket: %s\n",bucket); fflush(stderr);
|
|
#endif
|
|
|
|
return NCUNTRACE(stat);
|
|
}
|
|
|
|
/**************************************************/
|
|
/* Object API */
|
|
|
|
/*
|
|
@return NC_NOERR if key points to a content-bearing object.
|
|
@return NC_EEMPTY if object at key has no content.
|
|
@return NC_EXXX return true error
|
|
*/
|
|
EXTERNL int
|
|
NC_s3sdkinfo(void* s3client0, const char* bucket, const char* pathkey, size64_t* lenp, char** errmsgp)
|
|
{
|
|
int stat = NC_NOERR;
|
|
const char* key = NULL;
|
|
|
|
NCTRACE(11,"bucket=%s pathkey=%s",bucket,pathkey);
|
|
|
|
AWSS3CLIENT s3client = (AWSS3CLIENT)s3client0;
|
|
Aws::S3::Model::HeadObjectRequest head_request;
|
|
if(*pathkey != '/') return NC_EINTERNAL;
|
|
/* extract the true s3 key*/
|
|
if((stat = makes3key(pathkey,&key))) return NCUNTRACE(stat);
|
|
|
|
if(errmsgp) *errmsgp = NULL;
|
|
head_request.SetBucket(bucket);
|
|
head_request.SetKey(key);
|
|
auto head_outcome = AWSS3GET(s3client)->HeadObject(head_request);
|
|
if(head_outcome.IsSuccess()) {
|
|
long long l = head_outcome.GetResult().GetContentLength();
|
|
if(lenp) *lenp = (size64_t)l;
|
|
} else {
|
|
if(lenp) *lenp = 0;
|
|
/* Distinquish not-found from other errors */
|
|
switch (head_outcome.GetError().GetErrorType()) {
|
|
case Aws::S3::S3Errors::RESOURCE_NOT_FOUND:
|
|
stat = NC_EEMPTY;
|
|
break;
|
|
case Aws::S3::S3Errors::ACCESS_DENIED:
|
|
stat = NC_EACCESS;
|
|
/* fall thru */
|
|
default:
|
|
if(!stat) stat = NC_ES3;
|
|
if(errmsgp) *errmsgp = makeerrmsg(head_outcome.GetError(),key);
|
|
break;
|
|
}
|
|
}
|
|
return NCUNTRACEX(stat,"len=%d",(int)(lenp?*lenp:-1));
|
|
}
|
|
|
|
/**
|
|
* In-memory stream implementations
|
|
*/
|
|
class DownLoadStream : public Aws::IOStream
|
|
{
|
|
public:
|
|
using Base = Aws::IOStream;
|
|
// Provide a customer-controlled streambuf to hold data from the bucket.
|
|
explicit DownLoadStream(std::streambuf* buf) : Base(buf) {}
|
|
~DownLoadStream() override = default;
|
|
};
|
|
|
|
class UpLoadStream : public Aws::IOStream
|
|
{
|
|
public:
|
|
using Base = Aws::IOStream;
|
|
// Provide a customer-controlled streambuf to hold data from the bucket.
|
|
explicit UpLoadStream(std::streambuf* buf) : Base(buf) {}
|
|
~UpLoadStream() override = default;
|
|
};
|
|
|
|
/*
|
|
@return NC_NOERR if success
|
|
@return NC_EXXX if fail
|
|
*/
|
|
EXTERNL int
|
|
NC_s3sdkread(void* s3client0, const char* bucket, const char* pathkey, size64_t start, size64_t count, void* content, char** errmsgp)
|
|
{
|
|
int stat = NC_NOERR;
|
|
const char* key = NULL;
|
|
|
|
NCTRACE(11,"bucket=%s pathkey=%s start=%llu count=%llu content=%p",bucket,pathkey,start,count,content);
|
|
|
|
AWSS3CLIENT s3client = (AWSS3CLIENT)s3client0;
|
|
|
|
if(count == 0) return NCUNTRACE(stat);
|
|
if(errmsgp) *errmsgp = NULL;
|
|
if(*pathkey != '/') return NC_EINTERNAL;
|
|
if((stat = makes3key(pathkey,&key))) return NCUNTRACE(stat);
|
|
|
|
#ifdef TRANSFER
|
|
auto executor = Aws::MakeShared<Aws::Utils::Threading::PooledThreadExecutor>("executor", 25);
|
|
Aws::Transfer::TransferManagerConfiguration transfer_config(executor.get());
|
|
transfer_config.s3Client = *s3client;
|
|
// The local variable downloadBuffer' is captured by reference in a lambda.
|
|
// It must persist until all downloading by the 'transfer_manager' is complete.
|
|
Aws::Utils::Stream::PreallocatedStreamBuf downloadBuffer((unsigned char*)content, count - start);
|
|
auto transfer_manager = Aws::Transfer::TransferManager::Create(transfer_config);
|
|
auto creationFunction = [&downloadBuffer]() { //Define a lambda expression for the callback method parameter to stream back the data.
|
|
return Aws::New<DownLoadStream>("downloadHandle", &downloadBuffer);};
|
|
auto downloadHandle = transfer_manager->DownloadFile(bucket, key, start, count, creationFunction);
|
|
downloadHandle->WaitUntilFinished();
|
|
bool success = downloadHandle->GetStatus() == Aws::Transfer::TransferStatus::COMPLETED;
|
|
if (!success) {
|
|
auto err = downloadHandle->GetLastError();
|
|
if(errmsgp) *errmsgp = makeerrmsg(err,key);
|
|
stat = NC_ES3;
|
|
} else {
|
|
size64_t transferred = downloadHandle->GetBytesTransferred();
|
|
size64_t totalsize = downloadHandle->GetBytesTotalSize();
|
|
assert(count == totalsize && transferred == totalsize);
|
|
}
|
|
#else
|
|
char range[1024];
|
|
size64_t rangeend;
|
|
Aws::S3::Model::GetObjectRequest object_request;
|
|
object_request.SetBucket(bucket);
|
|
object_request.SetKey(key);
|
|
rangeend = (start+count)-1;
|
|
snprintf(range,sizeof(range),"bytes=%llu-%llu",start,rangeend);
|
|
object_request.SetRange(range);
|
|
auto get_object_result = AWSS3GET(s3client)->GetObject(object_request);
|
|
if(!get_object_result.IsSuccess()) {
|
|
if(errmsgp) *errmsgp = makeerrmsg(get_object_result.GetError(),key);
|
|
stat = NC_ES3;
|
|
} else {
|
|
/* Get the whole result */
|
|
Aws::IOStream &result = get_object_result.GetResultWithOwnership().GetBody();
|
|
std::string str((std::istreambuf_iterator<char>(result)),std::istreambuf_iterator<char>());
|
|
/* Verify actual result size */
|
|
size_t slen = str.size();
|
|
if(slen > count) return NC_ES3;
|
|
const char* s = str.c_str();
|
|
if(content)
|
|
memcpy(content,s,slen);
|
|
}
|
|
#endif
|
|
return NCUNTRACE(stat);
|
|
}
|
|
|
|
/*
|
|
For S3, I can see no way to do a byterange write;
|
|
so we are effectively writing the whole object
|
|
*/
|
|
EXTERNL int
|
|
NC_s3sdkwriteobject(void* s3client0, const char* bucket, const char* pathkey, size64_t count, const void* content, char** errmsgp)
|
|
{
|
|
int stat = NC_NOERR;
|
|
const char* key = NULL;
|
|
|
|
NCTRACE(11,"bucket=%s pathkey=%s count=%lld content=%p",bucket,pathkey,count,content);
|
|
|
|
AWSS3CLIENT s3client = (AWSS3CLIENT)s3client0;
|
|
|
|
if(errmsgp) *errmsgp = NULL;
|
|
if(*pathkey != '/') return NCUNTRACE(NC_EINTERNAL);
|
|
if((stat = makes3key(pathkey,&key))) return NCUNTRACE(stat);
|
|
|
|
#ifdef TRANSFER
|
|
auto executor = Aws::MakeShared<Aws::Utils::Threading::PooledThreadExecutor>("executor", 25);
|
|
Aws::Transfer::TransferManagerConfiguration transfer_config(executor.get());
|
|
transfer_config.s3Client = *s3client;
|
|
auto transfer_manager = Aws::Transfer::TransferManager::Create(transfer_config);
|
|
std::shared_ptr<Aws::IOStream> uploadStream = std::shared_ptr<Aws::IOStream>(new Aws::StringStream());
|
|
uploadStream->rdbuf()->pubsetbuf((char*)content,count);
|
|
auto uploadHandle = transfer_manager->UploadFile(uploadStream, bucket, key, "application/octet-stream", Aws::Map<Aws::String, Aws::String>());
|
|
uploadHandle->WaitUntilFinished();
|
|
bool success = uploadHandle->GetStatus() == Aws::Transfer::TransferStatus::COMPLETED;
|
|
if (!success) {
|
|
auto err = uploadHandle->GetLastError();
|
|
if(errmsgp) *errmsgp = makeerrmsg(err,key);
|
|
stat = NC_ES3;
|
|
} else {
|
|
size64_t transferred = uploadHandle->GetBytesTransferred();
|
|
size64_t totalsize = uploadHandle->GetBytesTotalSize();
|
|
assert(count == totalsize && transferred == totalsize);
|
|
}
|
|
#else
|
|
Aws::S3::Model::PutObjectRequest put_request;
|
|
put_request.SetBucket(bucket);
|
|
put_request.SetKey(key);
|
|
put_request.SetContentLength((long long)count);
|
|
|
|
std::shared_ptr<Aws::IOStream> data = std::shared_ptr<Aws::IOStream>(new Aws::StringStream());
|
|
data->rdbuf()->pubsetbuf((char*)content,count);
|
|
put_request.SetBody(data);
|
|
auto put_result = AWSS3GET(s3client)->PutObject(put_request);
|
|
if(!put_result.IsSuccess()) {
|
|
if(errmsgp) *errmsgp = makeerrmsg(put_result.GetError(),key);
|
|
stat = NC_ES3;
|
|
}
|
|
#endif
|
|
return NCUNTRACE(stat);
|
|
}
|
|
|
|
EXTERNL int
|
|
NC_s3sdkclose(void* s3client0, NCS3INFO* info, int deleteit, char** errmsgp)
|
|
{
|
|
int stat = NC_NOERR;
|
|
|
|
NCTRACE(11,"info=%s rootkey=%s deleteit=%d",dumps3info(info),deleteit);
|
|
|
|
AWSS3CLIENT s3client = (AWSS3CLIENT)s3client0;
|
|
if(deleteit) {
|
|
/* Delete the root key; ok it if does not exist */
|
|
switch (stat = NC_s3sdkdeletekey(s3client0,info->bucket,info->rootkey,errmsgp)) {
|
|
case NC_NOERR: break;
|
|
case NC_EEMPTY: case NC_ENOTFOUND: stat = NC_NOERR; break;
|
|
default: break;
|
|
}
|
|
}
|
|
#ifdef TRANSFER
|
|
delete s3client;
|
|
#else
|
|
delete s3client;
|
|
#endif
|
|
return NCUNTRACE(stat);
|
|
}
|
|
|
|
/*
|
|
Return a list of names of legal objects immediately below a specified key.
|
|
In theory, the returned list should be sorted in lexical order,
|
|
but it possible that it is not.
|
|
*/
|
|
static int
|
|
getkeys(void* s3client0, const char* bucket, const char* prefixkey0, const char* delim, size_t* nkeysp, char*** keysp, char** errmsgp)
|
|
{
|
|
int stat = NC_NOERR;
|
|
const char* prefix = NULL;
|
|
char* prefixdir = NULL;
|
|
bool istruncated = false;
|
|
char* continuetoken = NULL;
|
|
AWSS3CLIENT s3client = NULL;
|
|
KeySet commonkeys;
|
|
KeySet realkeys;
|
|
KeySet allkeys;
|
|
|
|
NCTRACE(11,"bucket=%s prefixkey0=%s",bucket,prefixkey0);
|
|
|
|
if(*prefixkey0 != '/') {stat = NC_EINTERNAL; goto done;}
|
|
if(errmsgp) *errmsgp = NULL;
|
|
|
|
s3client = (AWSS3CLIENT)s3client0;
|
|
|
|
do {
|
|
Aws::S3::Model::ListObjectsV2Request objects_request;
|
|
if(prefixdir != NULL) free(prefixdir);
|
|
realkeys.clear();
|
|
commonkeys.clear();
|
|
/* Make sure that the prefix ends with '/' */
|
|
if((stat = makes3keydir(prefixkey0,&prefixdir))) goto done;
|
|
/* remove leading '/' */
|
|
if((stat = makes3key(prefixdir,&prefix))) goto done;
|
|
objects_request.SetBucket(bucket);
|
|
objects_request.SetPrefix(prefix);
|
|
if(delim != NULL) {
|
|
objects_request.SetDelimiter(delim); /* Force return of common prefixes */
|
|
}
|
|
if(istruncated && continuetoken != NULL) {
|
|
objects_request.SetContinuationToken(continuetoken);
|
|
free(continuetoken); continuetoken = NULL;
|
|
}
|
|
auto objects_outcome = AWSS3GET(s3client)->ListObjectsV2(objects_request);
|
|
if(objects_outcome.IsSuccess()) {
|
|
const Aws::S3::Model::ListObjectsV2Result& result = objects_outcome.GetResult();
|
|
istruncated = result.GetIsTruncated();
|
|
if(istruncated)
|
|
continuetoken = strdup(result.GetNextContinuationToken().c_str());
|
|
Aws::Vector<Aws::S3::Model::Object> object_list =
|
|
objects_outcome.GetResult().GetContents();
|
|
if((stat = s3objectsinfo(object_list,&realkeys))) goto done;
|
|
/* Add common prefixes */
|
|
Aws::Vector<Aws::S3::Model::CommonPrefix> common_list =
|
|
objects_outcome.GetResult().GetCommonPrefixes();
|
|
if((stat = s3commonprefixes(common_list,&commonkeys))) goto done;
|
|
/* merge the two lists; target list might not be empty */
|
|
if((stat=mergekeysets(&realkeys, &commonkeys, &allkeys))) goto done;
|
|
} else {
|
|
if(errmsgp) *errmsgp = makeerrmsg(objects_outcome.GetError());
|
|
stat = NC_ES3;
|
|
goto done;
|
|
}
|
|
} while(istruncated);
|
|
if(nkeysp) {*nkeysp = allkeys.getnkeys();}
|
|
if(keysp) {*keysp = allkeys.extractkeys();}
|
|
done:
|
|
realkeys.clear();
|
|
commonkeys.clear();
|
|
allkeys.clear();
|
|
if(prefixdir != NULL) free(prefixdir);
|
|
if(continuetoken != NULL) free(continuetoken);
|
|
if(stat)
|
|
return NCUNTRACE(stat);
|
|
else
|
|
return NCUNTRACEX(stat,"nkeys=%u",(unsigned)*nkeysp);
|
|
}
|
|
|
|
/*
|
|
Return a list of full keys of legal objects immediately below a specified key.
|
|
Not necessarily sorted.
|
|
*/
|
|
EXTERNL int
|
|
NC_s3sdkgetkeys(void* s3client0, const char* bucket, const char* prefixkey0, size_t* nkeysp, char*** keysp, char** errmsgp)
|
|
{
|
|
return getkeys(s3client0, bucket, prefixkey0, "/", nkeysp, keysp, errmsgp);
|
|
}
|
|
|
|
/*
|
|
Return a list of full keys of legal objects immediately below a specified key.
|
|
Not necessarily sorted.
|
|
*/
|
|
EXTERNL int
|
|
NC_s3sdksearch(void* s3client0, const char* bucket, const char* prefixkey0, size_t* nkeysp, char*** keysp, char** errmsgp)
|
|
{
|
|
return getkeys(s3client0, bucket, prefixkey0, NULL, nkeysp, keysp, errmsgp);
|
|
}
|
|
|
|
EXTERNL int
|
|
NC_s3sdkdeletekey(void* s3client0, const char* bucket, const char* pathkey, char** errmsgp)
|
|
{
|
|
int stat = NC_NOERR;
|
|
const char* key = NULL;
|
|
|
|
NCTRACE(11,"bucket=%s pathkey=%s",bucket,pathkey);
|
|
|
|
AWSS3CLIENT s3client = (AWSS3CLIENT)s3client0;
|
|
Aws::S3::Model::DeleteObjectRequest delete_request;
|
|
|
|
assert(pathkey != NULL && *pathkey == '/');
|
|
if((stat = makes3key(pathkey,&key))) return NCUNTRACE(stat);
|
|
/* Delete this key object */
|
|
delete_request.SetBucket(bucket);
|
|
delete_request.SetKey(key);
|
|
auto delete_result = AWSS3GET(s3client)->DeleteObject(delete_request);
|
|
if(!delete_result.IsSuccess()) {
|
|
if(errmsgp) *errmsgp = makeerrmsg(delete_result.GetError(),key);
|
|
stat = NC_ES3;
|
|
}
|
|
return NCUNTRACE(stat);
|
|
}
|
|
|
|
/*
|
|
Get Info about a single object from a vector
|
|
*/
|
|
static int
|
|
s3objectinfo1(const Aws::S3::Model::Object& s3_object, char** fullkeyp)
|
|
{
|
|
int stat = NC_NOERR;
|
|
char* cstr = NULL;
|
|
|
|
if(fullkeyp) {
|
|
const char* key = NULL;
|
|
char* cstr = NULL;
|
|
auto s = s3_object.GetKey();
|
|
key = s.c_str();
|
|
if(key[0] == '/') {
|
|
cstr = strdup(key);
|
|
} else {
|
|
size_t len = strlen(key);
|
|
cstr = (char*)malloc(len+1+1);
|
|
if(cstr != NULL) {
|
|
cstr[0] = '/';
|
|
memcpy((void*)(cstr+1),(void*)key,len);
|
|
cstr[len+1] = '\0';
|
|
}
|
|
}
|
|
if(cstr == NULL) {
|
|
stat = NC_ENOMEM;
|
|
} else if(fullkeyp) {
|
|
*fullkeyp = cstr;
|
|
cstr = NULL;
|
|
}
|
|
}
|
|
if(cstr) free(cstr);
|
|
return stat;
|
|
}
|
|
|
|
/*
|
|
Get Info about a vector of objects; Keys are fixed up to start with a '/'
|
|
*/
|
|
static int
|
|
s3objectsinfo(Aws::Vector<Aws::S3::Model::Object> list, KeySet* keys)
|
|
{
|
|
int stat = NC_NOERR;
|
|
int i;
|
|
|
|
i = 0;
|
|
for (auto const &s3_object : list) {
|
|
char* akey = NULL;
|
|
stat = s3objectinfo1(s3_object,&akey);
|
|
i++;
|
|
if(stat) break;
|
|
keys->push(akey);
|
|
}
|
|
return stat;
|
|
}
|
|
|
|
static int
|
|
s3commonprefixes(Aws::Vector<Aws::S3::Model::CommonPrefix> list, KeySet* keys)
|
|
{
|
|
int stat = NC_NOERR;
|
|
|
|
for (auto const &s3prefix : list) {
|
|
char* p; const char* px; char* p1;
|
|
size_t len, alloc;
|
|
const Aws::String& prefix = s3prefix.GetPrefix();
|
|
len = prefix.length();
|
|
alloc = len + 1 + 1; /* for nul + leading '/' */
|
|
if((p = (char*) malloc(alloc))==NULL)
|
|
stat = NC_ENOMEM;
|
|
if(stat == NC_NOERR) {
|
|
px = prefix.c_str();
|
|
p1 = p;
|
|
if(*px != '/') *p1++ = '/';
|
|
memcpy(p1,px,len);
|
|
p1 += len;
|
|
*p1 = '\0';
|
|
keys->push(p);
|
|
}
|
|
}
|
|
return stat;
|
|
}
|
|
|
|
static Aws::S3::Model::BucketLocationConstraint
|
|
s3findregion(const char* name)
|
|
{
|
|
return Aws::S3::Model::BucketLocationConstraintMapper::GetBucketLocationConstraintForName(name);
|
|
}
|
|
|
|
struct MemBuf: std::streambuf {
|
|
MemBuf(char* base, size_t size) {
|
|
char* p(base);
|
|
this->setg(p, p, p + size);
|
|
}
|
|
};
|
|
|
|
#if 0
|
|
static void
|
|
freestringenvv(char** ss)
|
|
{
|
|
char** p;
|
|
if(ss != NULL) {
|
|
for(p=ss;*p;p++)
|
|
if(*p) free(*p);
|
|
free(ss);
|
|
}
|
|
}
|
|
#endif
|
|
|
|
static int
|
|
makes3key(const char* pathkey, const char** keyp)
|
|
{
|
|
assert(pathkey != NULL && pathkey[0] == '/');
|
|
if(keyp) *keyp = pathkey+1;
|
|
return NC_NOERR;
|
|
}
|
|
|
|
static int
|
|
makes3keydir(const char* prefix, char** prefixdirp)
|
|
{
|
|
size_t plen = strlen(prefix);
|
|
char* prefixdir = (char*)malloc(plen+1+1); /* '/' + nul */
|
|
if(prefixdir == NULL) return NC_ENOMEM;
|
|
memcpy(prefixdir,prefix,plen);
|
|
if(prefixdir[plen-1] != '/') {
|
|
prefixdir[plen++] = '/';
|
|
}
|
|
prefixdir[plen] = '\0';
|
|
*prefixdirp = prefixdir; prefixdir = NULL;
|
|
return NC_NOERR;
|
|
}
|
|
|
|
static int
|
|
mergekeysets(KeySet* keys1, KeySet* keys2, KeySet* merge)
|
|
{
|
|
size_t i;
|
|
for(i=0;i<keys1->getnkeys();i++)
|
|
merge->push(keys1->extractithkey(i));
|
|
for(i=0;i<keys2->getnkeys();i++)
|
|
merge->push(keys2->extractithkey(i));
|
|
return NC_NOERR;
|
|
}
|