Fix major bug in the NCZarr cache management

re: PR https://github.com/Unidata/netcdf-c/pull/2734
re: Issue https://github.com/Unidata/netcdf-c/issues/2733

As a result of an investigation by https://github.com/uweschulzweida,
I discovered a significant bug in the NCZarr cache management.
This PR extends the above PR to fix that bug.

## Change Overview
* Insert extra checks for cache overflow.
* Added test cases contingent on the --enable-large-file-tests option.
* The Columbia server is down, so it has been temporarily disabled.
This commit is contained in:
Dennis Heimbigner 2023-08-16 23:07:05 -06:00
parent 032b910edf
commit 9094d25409
8 changed files with 296 additions and 17 deletions

View File

@ -21,12 +21,14 @@
#define LEAFLEN 32
#define USEPARAMSIZE 0xffffffffffffffff
/* Forward */
static int get_chunk(NCZChunkCache* cache, NCZCacheEntry* entry);
static int put_chunk(NCZChunkCache* cache, NCZCacheEntry*);
static int makeroom(NCZChunkCache* cache);
static int verifycache(NCZChunkCache* cache);
static int flushcache(NCZChunkCache* cache);
static int constraincache(NCZChunkCache* cache);
static int constraincache(NCZChunkCache* cache, size64_t needed);
/**************************************************/
/* Dispatch table per-var cache functions */
@ -298,7 +300,7 @@ NCZ_read_cache_chunk(NCZChunkCache* cache, const size64_t* indices, void** datap
if((stat=get_chunk(cache,entry))) goto done;
assert(entry->data != NULL);
/* Ensure cache constraints not violated; but do it before entry is added */
if((stat=makeroom(cache))) goto done;
if((stat=verifycache(cache))) goto done;
nclistpush(cache->mru,entry);
if((stat = ncxcacheinsert(cache->xcache,entry->hashkey,entry))) goto done;
}
@ -348,7 +350,7 @@ fprintf(stderr,"|cache.write|=%ld\n",nclistlength(cache->mru));
entry = NULL;
/* Ensure cache constraints not violated */
if((stat=makeroom(cache))) goto done;
if((stat=verifycache(cache))) goto done;
done:
if(entry) free_cache_entry(cache,entry);
@ -356,16 +358,18 @@ done:
}
#endif
/* Constrain cache, but allow at least one entry */
/* Constrain cache */
static int
makeroom(NCZChunkCache* cache)
verifycache(NCZChunkCache* cache)
{
int stat = NC_NOERR;
#if 0
/* Sanity check; make sure at least one entry is always allowed */
if(nclistlength(cache->mru) == 1)
goto done;
stat = constraincache(cache);
#endif
if((stat = constraincache(cache,USEPARAMSIZE))) goto done;
done:
return stat;
}
@ -376,10 +380,14 @@ static int
flushcache(NCZChunkCache* cache)
{
int stat = NC_NOERR;
#if 0
size_t oldsize = cache->params.size;
cache->params.size = 0;
stat = constraincache(cache);
stat = constraincache(cache,USEPARAMSIZE);
cache->params.size = oldsize;
#else
stat = constraincache(cache,USEPARAMSIZE);
#endif
return stat;
}
@ -388,21 +396,32 @@ flushcache(NCZChunkCache* cache)
violating any of its constraints.
On entry, constraints might be violated.
Make sure that the entryinuse (NULL => no constraint) is not reclaimed.
@param cache
@param needed make sure there is room for this much space; USEPARAMSIZE => ensure no more than cache params is used.
*/
static int
constraincache(NCZChunkCache* cache)
constraincache(NCZChunkCache* cache, size64_t needed)
{
int stat = NC_NOERR;
size64_t final_size;
/* If the cache is empty then do nothing */
if(cache->used == 0) goto done;
if(needed == USEPARAMSIZE)
final_size = cache->params.size;
else if(cache->used > needed)
final_size = cache->used - needed;
else
final_size = 0;
/* Flush from LRU end if we are at capacity */
while(nclistlength(cache->mru) > cache->params.nelems || cache->used > cache->params.size) {
while(nclistlength(cache->mru) > cache->params.nelems || cache->used > final_size) {
int i;
void* ptr;
NCZCacheEntry* e = ncxcachelast(cache->xcache); /* last entry is the least recently used */
if(e == NULL) break;
if((stat = ncxcacheremove(cache->xcache,e->hashkey,&ptr))) goto done;
assert(e == ptr);
for(i=0;i<nclistlength(cache->mru);i++) {
@ -427,6 +446,12 @@ done:
return stat;
}
/**
Push modified cache entries to disk.
Also make sure the cache size is correct.
@param cache
@return NC_EXXX error
*/
int
NCZ_flush_chunk_cache(NCZChunkCache* cache)
{
@ -441,15 +466,21 @@ NCZ_flush_chunk_cache(NCZChunkCache* cache)
for(i=0;i<nclistlength(cache->mru);i++) {
NCZCacheEntry* entry = nclistget(cache->mru,i);
if(entry->modified) {
/* Make cache used be consistent across filter application */
cache->used -= entry->size;
/* Write out this chunk in toto*/
if((stat=put_chunk(cache,entry)))
goto done;
cache->used += entry->size;
}
entry->modified = 0;
}
/* Re-compute space used */
cache->used = 0;
for(i=0;i<nclistlength(cache->mru);i++) {
NCZCacheEntry* entry = nclistget(cache->mru,i);
cache->used += entry->size;
}
/* Make sure cache size and nelems are correct */
if((stat=verifycache(cache))) goto done;
done:
return ZUNTRACE(stat);
@ -726,6 +757,9 @@ get_chunk(NCZChunkCache* cache, NCZCacheEntry* entry)
default: goto done;
}
/* make room in the cache */
if((stat = constraincache(cache,size))) goto done;
if(!empty) {
/* Make sure we have a place to read it */
if((entry->data = (void*)calloc(1,entry->size)) == NULL)
@ -795,6 +829,9 @@ get_chunk(NCZChunkCache* cache, NCZCacheEntry* entry)
entry->isfixedstring = 0;
}
/* track new chunk */
cache->used += entry->size;
done:
nullfree(strchunk);
nullfree(path);
@ -892,5 +929,4 @@ NCZ_printxcache(NCZChunkCache* cache)
strlcat(xs,ncbytescontents(buf),sizeof(xs));
ncbytesfree(buf);
fprintf(stderr,"%s\n",xs);
// return xs;
}

View File

@ -48,13 +48,17 @@ IF(ENABLE_TESTS)
SET_TESTS_PROPERTIES(ncdap_tst_remote3 PROPERTIES RUN_SERIAL TRUE)
ENDIF(HAVE_BASH)
add_sh_test(ncdap tst_zero_len_var)
add_sh_test(ncdap tst_encode)
# not yet add_sh_test(ncdap tst_hyrax)
add_sh_test(ncdap tst_fillmismatch)
IF(ENABLE_DAP_LONG_TESTS)
add_sh_test(ncdap tst_longremote3)
SET_TESTS_PROPERTIES(ncdap_tst_longremote3 PROPERTIES RUN_SERIAL TRUE)
ENDIF(ENABLE_DAP_LONG_TESTS)
IF(FALSE)
# Apparently iridl.ldeo.columbia.edu is down for now
add_sh_test(ncdap tst_encode)
# not yet fixed
add_sh_test(ncdap tst_hyrax)
ENDIF()
ENDIF(BUILD_UTILITIES)
IF(ENABLE_EXTERNAL_SERVER_TESTS)
add_bin_test(ncdap test_manyurls)

View File

@ -44,7 +44,12 @@ findtestserver_SOURCES = findtestserver.c
pingurl_SOURCES = pingurl.c
if BUILD_UTILITIES
TESTS += tst_ber.sh tst_remote3.sh tst_formatx.sh testurl.sh tst_fillmismatch.sh tst_zero_len_var.sh tst_encode.sh
TESTS += tst_ber.sh tst_remote3.sh tst_formatx.sh testurl.sh tst_fillmismatch.sh tst_zero_len_var.sh
endif
if AX_IGNORE
# Apparently iridl.ldeo.columbia.edu is down for now
TESTS += tst_encode.sh
endif
TESTS += test_partvar

View File

@ -120,6 +120,12 @@ IF(ENABLE_TESTS)
TARGET_INCLUDE_DIRECTORIES(tst_chunkcases PUBLIC ../libnczarr)
add_sh_test(nczarr_test run_chunkcases)
if(LARGE_FILE_TESTS)
BUILD_BIN_TEST(test_readcaching})
BUILD_BIN_TEST(test_writecaching})
add_sh_test(nczarr_test run_cachetest)
ENDIF()
add_sh_test(nczarr_test run_purezarr)
add_sh_test(nczarr_test run_interop)
add_sh_test(nczarr_test run_misc)

View File

@ -66,6 +66,11 @@ TESTS += run_nulls.sh
TESTS += run_notzarr.sh
TESTS += run_external.sh
if LARGE_FILE_TESTS
check_PROGRAMS += test_writecaching test_readcaching
TESTS += run_cachetest.sh
endif
endif #BUILD_UTILITIES
if BUILD_UTILITIES

40
nczarr_test/run_cachetest.sh Executable file
View File

@ -0,0 +1,40 @@
#!/bin/sh
if test "x$srcdir" = x ; then srcdir=`pwd`; fi
. ../test_common.sh
. "$top_srcdir/nczarr_test/test_nczarr.sh"
set -e
s3isolate "testdir_cachtest"
THISDIR=`pwd`
cd $ISOPATH
# This shell script tests support for the NC_STRING type
testcase() {
zext=$1
echo "*** Test: cache operation"
# Get pure zarr args
fileargs tmp_scalar_zarr "mode=zarr,$zext"
zarrurl="$fileurl"
zarrfile="$file"
# setup
deletemap $zext $zarrfile
echo "*** write cache"
${execdir}/test_writecaching
echo "*** read cache"
${execdir}/test_readcaching
}
testcase file
if test "x$FEATURE_NCZARR_ZIP" = xyes ; then testcase zip; fi
if test "x$FEATURE_S3TESTS" = xyes ; then testcase s3; fi
if test "x$FEATURE_S3TESTS" = xyes ; then s3sdkdelete "/${S3ISOPATH}" ; fi # Cleanup

View File

@ -0,0 +1,84 @@
#include <stdlib.h>
#include <stdio.h>
#include <unistd.h>
#include <sys/resource.h>
#if defined(__APPLE__) && defined(__MACH__)
#include <mach/mach.h>
#endif
#include "netcdf.h"
#define filename "file://tmp_cachetest.zarr#mode=zarr,file"
#define chunkSize (size_t)(1<<17) /* 128k */
#define numCells (size_t)(50 * chunkSize)
#define numSteps (size_t)360
static float var[numCells];
size_t getPeakRSS(void)
{
struct rusage rusage;
getrusage( RUSAGE_SELF, &rusage );
#if defined(__APPLE__) && defined(__MACH__)
return (size_t)rusage.ru_maxrss;
#else
return (size_t)(rusage.ru_maxrss * 1024L);
#endif
}
static void
nce(int istat)
{
if (istat != NC_NOERR)
{
fprintf(stderr, "%s\n", nc_strerror(istat));
exit(-1);
}
}
int
main(void)
{
printf("read: chunkSize=%zu, numCells=%zu, numSteps=%zu, filename=%s\n", chunkSize, numCells, numSteps, filename);
int ncId;
nce(nc_open(filename, NC_NOWRITE, &ncId));
int varId;
nce(nc_inq_varid(ncId, "var", &varId));
size_t size, nelems;
float preemption;
nce(nc_get_var_chunk_cache(ncId, varId, &size, &nelems, &preemption));
printf("default chunk cache: size=%zu, nelems=%zu, preemption=%g\n", size, nelems, preemption);
size = 4 * numCells; // one float field at one time step
nelems = 1000;
preemption = 0.5;
nce(nc_set_var_chunk_cache(ncId, varId, size, nelems, preemption));
printf("set chunk cache: size=%zu, nelems=%zu, preemption=%g\n", size, nelems, preemption);
{
for (size_t i = 0; i < numCells; ++i) var[i] = 0.0f;
for (size_t i = 0; i < numSteps; ++i)
{
size_t start[2], count[2];
start[0] = i; start[1] = 0;
count[0] = 1; count[1] = numCells;
nce(nc_get_vara_float(ncId, varId, start, count, var));
}
}
nce(nc_close(ncId));
{
size_t mbused = getPeakRSS() / (1024 * 1024);
printf("Max mem: %zu MB\n", mbused);
if(mbused > 100) {
fprintf(stderr,"*** Failed: used: %luMB expected: < 100MB\n",mbused);
return (1);
}
}
return 0;
}

View File

@ -0,0 +1,99 @@
#include <stdlib.h>
#include <stdio.h>
#include <unistd.h>
#include <sys/resource.h>
#if defined(__APPLE__) && defined(__MACH__)
#include <mach/mach.h>
#endif
#include "netcdf.h"
#define filename "file://tmp_cachetest.zarr#mode=zarr,file"
#define chunkSize ((size_t)(1<<17)) /* 128k */
#define numCells ((size_t)(50 * chunkSize))
#define numSteps ((size_t)360)
static float var[numCells];
size_t getPeakRSS(void)
{
struct rusage rusage;
getrusage( RUSAGE_SELF, &rusage );
#if defined(__APPLE__) && defined(__MACH__)
return (size_t)rusage.ru_maxrss;
#else
return (size_t)(rusage.ru_maxrss * 1024L);
#endif
}
static void
nce(int istat)
{
if (istat != NC_NOERR)
{
fprintf(stderr, "%s\n", nc_strerror(istat));
exit(-1);
}
}
int
main(void)
{
printf("write: chunkSize=%zu, numCells=%zu, numSteps=%zu, filename=%s\n", chunkSize, numCells, numSteps, filename);
int ncId;
nce(nc_create(filename, NC_CLOBBER | NC_NETCDF4, &ncId));
int oldfill;
nce(nc_set_fill(ncId, NC_NOFILL, &oldfill));
int dims[2];
nce(nc_def_dim(ncId, "time", numSteps, &dims[0]));
nce(nc_def_dim(ncId, "cells", numCells, &dims[1]));
int varId;
nce(nc_def_var(ncId, "var", NC_FLOAT, 2, dims, &varId));
size_t chunks[2] = {1, chunkSize};
nc_def_var_chunking(ncId, varId, NC_CHUNKED, chunks);
int shuffle = 0, deflate = 1, level = 3;
nce(nc_def_var_deflate(ncId, varId, shuffle, deflate, level));
size_t size, nelems;
float preemption;
nce(nc_get_var_chunk_cache(ncId, varId, &size, &nelems, &preemption));
printf("default chunk cache: size=%zu, nelems=%zu, preemption=%g\n", size, nelems, preemption);
size = 4 * numCells; // one float field at one time step
nelems = 1000;
preemption = 0.5;
nce(nc_set_var_chunk_cache(ncId, varId, size, nelems, preemption));
printf("set chunk cache: size=%zu, nelems=%zu, preemption=%g\n", size, nelems, preemption);
nce(nc_enddef(ncId));
{
for (size_t i = 0; i < numCells; ++i) var[i] = 0.0f;
for (size_t i = 0; i < numSteps; ++i)
{
size_t start[2], count[2];
start[0] = i; start[1] = 0;
count[0] = 1; count[1] = numCells;
nce(nc_put_vara_float(ncId, varId, start, count, var));
}
}
nce(nc_close(ncId));
{
size_t mbused = getPeakRSS() / (1024 * 1024);
printf("Max mem: %zu MB\n", mbused);
if(mbused > 100) {
printf("*** Failed: used: %luMB expected: < 100MB\n",mbused);
return (1);
} else {
printf("*** Passed: used: %luMB (< 100MB)\n",mbused);
return 0;
}
}
}