From f17e7324c1072ba08b38246c76edb5c4afd21e48 Mon Sep 17 00:00:00 2001 From: Dennis Heimbigner Date: Fri, 19 Apr 2013 19:55:43 +0000 Subject: [PATCH] Rebuilt the compose operation in dceconstraints.c. The revisions are simpler, and I hope, clearer than the all previous versions. Part of the code taken from the ucar.nc2.Range.compose function. Modified to handle edge cases not handled in the Range.compose function. --- libdap2/constraints3.c | 12 +-- libdap2/dapdump.c | 41 ++++--- libdap2/dceconstraints.c | 223 +++++++++++++++++---------------------- libdap2/dceconstraints.h | 4 +- libdap2/dceparse.c | 8 +- libdap2/env | 4 +- nc_test/tst_diskless4.c | 4 +- 7 files changed, 131 insertions(+), 165 deletions(-) diff --git a/libdap2/constraints3.c b/libdap2/constraints3.c index 017a63dd8..8c349ac20 100644 --- a/libdap2/constraints3.c +++ b/libdap2/constraints3.c @@ -454,11 +454,11 @@ fprintf(stderr,"buildvaraprojection: skeleton: %s\n",dumpprojection(projection)) count = countp[dimindex+j]; slice->count = count; slice->length = count * slice->stride; - slice->stop = (slice->first + slice->length); - if(slice->stop > slice->declsize) { - slice->stop = slice->declsize; + slice->last = (slice->first + slice->length) - 1; + if(slice->last >= slice->declsize) { + slice->last = slice->declsize - 1; /* reverse compute the new length */ - slice->length = (slice->stop - slice->first); + slice->length = (slice->last - slice->first) + 1; } } dimindex += segment->rank; @@ -484,7 +484,7 @@ iswholeslice(DCEslice* slice, CDFnode* dim) { if(slice->first != 0 || slice->stride != 1) return 0; if(dim != NULL) { - if(slice->stop != dim->dim.declsize) return 0; + if(slice->length != dim->dim.declsize) return 0; } else if(dim == NULL) { size_t count = slice->count; if(slice->declsize == 0 @@ -843,7 +843,7 @@ dapshiftslice(DCEslice* slice) slice->first = 0; slice->stride = 1; slice->length = slice->count; - slice->stop = slice->length; + slice->last = slice->length - 1; } int diff --git a/libdap2/dapdump.c b/libdap2/dapdump.c index c2065ef5b..9b07aeeaa 100644 --- a/libdap2/dapdump.c +++ b/libdap2/dapdump.c @@ -566,27 +566,26 @@ dumpcache(NCcache* cache) char* dumpslice(DCEslice* slice) { - char buf[8192]; - char tmp[8192]; - size_t last = (slice->first+slice->length)-1; - buf[0] = '\0'; - if(last > slice->declsize && slice->declsize > 0) - last = slice->declsize - 1; - if(slice->count == 1) { - snprintf(tmp,sizeof(tmp),"[%lu]", - (unsigned long)slice->first); - } else if(slice->stride == 1) { - snprintf(tmp,sizeof(tmp),"[%lu:%lu]", - (unsigned long)slice->first, - (unsigned long)last); - } else { - snprintf(tmp,sizeof(tmp),"[%lu:%lu:%lu]", - (unsigned long)slice->first, - (unsigned long)slice->stride, - (unsigned long)last); - } - strcat(buf,tmp); - return strdup(tmp); + char buf[8192]; + char tmp[8192]; + buf[0] = '\0'; + if(slice->last > slice->declsize && slice->declsize > 0) + slice->last = slice->declsize - 1; + if(slice->count == 1) { + snprintf(tmp,sizeof(tmp),"[%lu]", + (unsigned long)slice->first); + } else if(slice->stride == 1) { + snprintf(tmp,sizeof(tmp),"[%lu:%lu]", + (unsigned long)slice->first, + (unsigned long)slice->last); + } else { + snprintf(tmp,sizeof(tmp),"[%lu:%lu:%lu]", + (unsigned long)slice->first, + (unsigned long)slice->stride, + (unsigned long)slice->last); + } + strcat(buf,tmp); + return strdup(tmp); } char* diff --git a/libdap2/dceconstraints.c b/libdap2/dceconstraints.c index 505e71d14..7d33945e8 100644 --- a/libdap2/dceconstraints.c +++ b/libdap2/dceconstraints.c @@ -82,184 +82,151 @@ dumpslice(const char* prefix, DCEslice* s) /* +Compose slice s1 with slice s2 -> sr +Compose means that the s2 constraint is applied +to the output of the s1 constraint. + Logical derivation of s1 compose s2 = sr +We have three index sequences to deal with, +where the sequence is the range [0..last]. -The composed value of first and stride are easy: -sr.first = s1.first + (s2.first * s1.stride) -sr.stride = s1.stride * s2.stride +1. the original data indices [0..N] +2. the indices resulting from applying slice s1; + this will be a subset of #1 with the sequence + [s1.first .. s1.last] where s1.last = s1.first + s1.length - 1 +3. the indices resulting from applying s2:[0..s2.last] wrt + to output of s2 + [s2.first .. s2.last] + We can convert #3 into index sequence wrt + #1 as follows + [s2.first .. s2.last] -> [map(s2.first)..map(s2.last)] + where map(int index) = s1.first + (s1.stride * index) + Note that map(i) is undefined if map(i) > s1.last -Finding sr.length, however, is significantly more -complex. In particular, the length specified by s2 -may be more or less than the number of values -provided by s1. -One way to compute sr.length is as follows -. Compute the count of the number of values provided by s1 - count1 = (s1.length+(s1.stride-1))/s1.stride - Note that count1 is effectively the number of elements - available to s2. -. Compute the stop point of s1 using count1 and wrt original data - (stop point is first + length) - stop1 = (s1.first + (count1 * s1.stride)) -. Compute the number of elements resulting from applying s2 - count2 = (s2.length+(s2.stride-1))/s2.stride -. Compute the stop point of s2 using count2 and wrt s1 output - stop2 = (s2.first + (count2 * s2.stride)) -. Now we can convert stop2 back into a stop point wrt the original data - stopx = (s1.first + (stop2*s1.stride)) -. Make sure we do not overrun the output of s2 by using the min() - stopr = min(stopx,stop1) -. Compute the result length wrt the original data - sr.length = (stopr - sr.first) +So, we can compute the result (sr) stride and first as follows + +. sr.stride = s1.stride * s2.stride + +. sr.first = map(s2.first) = s1.first * (s1.stride * s2.first) + This throws an exception if sr.first > s1.last + +. compute the s1.last wrt original data + last1 = s1.first + (s1.length - 1) + +. compute the s2.last wrt s2 + last2 = s2.first + (s2.length - 1) + +. compute candidate last wrt original index sequence + lastx = map(last2) = s1.first + (s1.stride * last2) + +. It is possible that lastx is outside of the range (s1.first..s1.last) + so we take the min of last2 and lastx to ensure no overrun wrt s1 + sr.last = min(last1,lastx) + +. If we want to be pedantic, we need to reduce + sr.last so that it is on an exact multiple of sr.stride + starting at sr.first + delta = ((sr.last + (sr.stride-1)) / sr.stride) * sr.stride + sr.last = sr.first + delta + +. compute result length using sr.last + sr.len = (sr.last + 1) - sr.first Example 1: 0000000000111111111 0123456789012345678 + xxxxxxxxx 0 1 2 3 4 s1=(f=1 st=2 len=9) 0 1 2 3 4 s2=(f=0 st=1 len=5) - xxxxxxxxxx - 0 1 3 4 5 sr=(f=1 st=2 len=9) -sr.f = 1+(0*2) = 1 -sr.st = 2*1 = 2 -count1 = (9+(2-1))/2 = 5 -stop1 = (1+(5*2)) = 11 -count2 = (5+(1-1))/1 = 5 -stop2 = 0+(5*1) = 5 -stopx = (1+(5*2)) = 11 -stopr = min(11,11) = 11 -sr.l = (11 - 2) = 9 + xxxxxxxxxy + 0 1 3 4 5 sr=(f=1 st=2 len=9..10) Example 2: 0000000000111111111122222222223 0123456789012345678901234567890 - 0 1 2 3 4 5 6 7 8 _ s1=(f=1 st=3 len=25) - 0 1 2 _ s2=(f=3 st=2 len=5) - xxxxxxxxxxxxxxxxxx - 0 1 2 sr=(f=10 st=6 l=18) - -sr.f = 1+(3*3) = 10 -sr.st = 3*2 =6 -count1 = (25+(3-1))/3 = 9 -stop1 = (1+(9*3)) = 28 -count2 = (5+(2-1))/2 = 3 -stop2 = (3+(3*2)) = 9 -stopx = (1+(9*3)) = 28 -stopr = min(28,28) = 28 -sr.l = 28 - 10 = 18 - + xxxxxxxxxxxxxxxxxxxxxxxxx + 0 1 2 3 4 5 6 7 8 _ _ _ s1=(f=1 st=3 len=25) + 0 1 2 3 4 s2=(f=3 st=2 len=5) + xxxxxxxxxxxxxyyyyy + 0 1 2 sr=(f=10 st=6 l=13..17) Example 3: 0000000000111111 0123456789012345 - 0 1 2 3 4 _ _ s1=(f=1 st=2 l=9) - 0 1 2 3 _ s2=(f=2 st=1 l=4) - xxxxxxp ---------------------------- - sr=(f=5 st=2 l=6) -sr.f = 1+(2*2) = 5 -sr.st = 2*1 = 2 -count1 = (9+(2-1))/2 = 5 -stop1 = 1+(5*2) = 11 -count2 = (4+(1-1))/1 = 4 -stop2 = 2+(4*1) = 6 -stopx = 1*(6*2) = 13 -stopr = min(13,11) = 11 -sr.l = 11 - 5 = 6 - + xxxxxxxxx + 0 1 2 3 4 _ _ s1=(f=1 st=2 len=9) + 0 1 2 3 4 s2=(f=2 st=1 len=4) + xxxxxy ---------------------------- + sr=(f=5 st=2 len=5..6) Example 4: 0000000000111111111 0123456789012345678 - 0 1 2 3 4 _ _ s1=(f=1 st=2 l=9) - 0 1 p s2=(f=2 st=2 l=4) - xxxxxx ---------------------------- - sr=(f=5 st=4 l=6) -sr.f = 1+(2*2) = 5 -sr.st = 2*2 = 4 -count1 = (9+(2-1))/2 = 5 -stop1 = 1+(5*2) = 11 -count2 = (4+(2-1))/2 = 2 -stop2 = 2+(2*2) = 6 -stopx = 1*(6*2) = 13 -stopr = min(13,11) = 11 -sr.l = 11 - 5 = 6 + xxxxxxxxx + 0 1 2 3 4 _ _ _ _ s1=(f=1 st=2 len=9) + 0 1 2 3 s2=(f=2 st=2 len=4) + xxxxxxyy + sr=(f=5 st=4 len=6..8) Example 5: 00000000001 01234567890 +xxx 012 s1=(f=0 st=1 l=3) 012 s2=(f=0 st=1 l=3) - ---------------------------- - sr=(f=0 st=1 l=3) -sr.f = 0+(0*1) = 0 -sr.st = 1*1 = 1 -count1 = (3+(1-1))/1 = 3 -stop1 = 0+(3*1) = 3 -count2 = (3+(1-1))/1 = 3 -stop2 = 0+(3*1) = 3 -stopx = 0+(3*1) = 3 -stopr = min(3,3) = 3 -sr.l = 3 - 0 = 3 +xxx ---------------------------- +012 sr=(f=0 st=1 l=3) +Example 6: 00000 01234 +xx 01 s1=(f=0 st=1 l=2) 0 s2=(f=0 st=1 l=1) - ---------------------------- +x ---------------------------- sr=(f=0 st=1 l=1) -sr.f = 0+(0*1) = 0 -sr.st = 1*1 = 1 -count1 = (2+(1-1))/1 = 2 -stop1 = 0+(2*1) = 2 -count2 = (1+(1-1))/1 = 1 -stop2 = 0+(1*1) = 1 -stopx = 0+(1*1) = 1 -stopr = min(1,2) = 1 -sr.l = 1 - 0 = 1 */ -/* compose slice src into slice dst; dst != src. -Compose means that the src constraint is applied -to the output of the dst constraint. - */ +#define MAP(s1,i) ((s1)->first + ((s1)->stride*(i))) +#define XMIN(x,y) ((x) < (y) ? (x) : (y)) +#define XMAX(x,y) ((x) > (y) ? (x) : (y)) int -dceslicecompose(DCEslice* s1, DCEslice* s2) +dceslicecompose(DCEslice* s1, DCEslice* s2, DCEslice* result) { int err = NC_NOERR; - DCEslice tmp; - size_t count1, stop1, count2, stop2, stopx, stopr; + size_t lastx = 0; + DCEslice sr; /* For back compatability so s1 and result can be same object */ #ifdef DEBUG1 dumpslice("compose: s1",s1); dumpslice("compose: s2",s2); #endif - tmp.node.sort = CES_SLICE; - tmp.first = s1->first+(s2->first * s1->stride); - tmp.stride = s1->stride * s2->stride; - count1 = (s1->length + (s1->stride - 1))/s1->stride; - stop1 = s1->first + (count1 * s1->stride); - count2 = (s2->length + (s2->stride - 1))/s2->stride; - stop2 = s2->first + (count2 * s2->stride); - stopx = s1->first + (stop2 * s1->stride); - stopr = (stopx < stop1 ? stopx : stop1); /* min(stopx,stop1) */ - tmp.length = (stopr - tmp.first); - tmp.declsize = (s1->declsize < s2->declsize ? s2->declsize : s1->declsize); /* use max declsize */ + sr.node.sort = CES_SLICE; + sr.stride = s1->stride * s2->stride; + sr.first = MAP(s1,s2->first); + if(sr.first > s1->last) + return NC_EINVALCOORDS; + lastx = MAP(s1,s2->last); + sr.last = XMIN(s1->last,lastx); + sr.length = (sr.last + 1) - sr.first; + sr.declsize = XMAX(s1->declsize,s2->declsize); /* use max declsize */ /* fill in other fields */ - tmp.stop = tmp.first + tmp.length; - tmp.count = (tmp.length + (tmp.stride - 1))/tmp.stride; - *s1 = tmp; + sr.count = (sr.length + (sr.stride - 1))/sr.stride; #ifdef DEBUG1 -dumpslice("compose: result",&tmp); +dumpslice("compose: result",sr); #endif + *result = sr; return err; } - /* Given two projection lists, merge src into dst taking overlapping projections into acct. Dst will be modified. */ + int dcemergeprojectionlists(NClist* dst, NClist* src) { @@ -327,7 +294,7 @@ dcemergeprojections(DCEprojection* merged, DCEprojection* addition) /* If one segment has larger rank, then copy the extra slices unchanged */ for(j=0;jrank;j++) { if(j < mergedseg->rank) - dceslicecompose(mergedseg->slices+j,addedseg->slices+j); + dceslicecompose(mergedseg->slices+j,addedseg->slices+j,mergedseg->slices+j); else mergedseg->slices[j] = addedseg->slices[j]; } @@ -945,7 +912,7 @@ dceiswholeslice(DCEslice* slice) { if(slice->first != 0 || slice->stride != 1 - || slice->stop != slice->declsize) return 0; + || slice->length != slice->declsize) return 0; return 1; } @@ -968,9 +935,9 @@ dcemakewholeslice(DCEslice* slice, size_t declsize) slice->first = 0; slice->stride = 1; slice->length = declsize; - slice->stop = declsize; slice->declsize = declsize; slice->count = declsize; + slice->last = slice->length - 1; } /* Remove slicing from terminal segment of p */ @@ -1091,12 +1058,12 @@ dcedumpraw(DCEnode* node, NCbytes* buf) case CES_SLICE: { DCEslice* slice = (DCEslice*)node; snprintf(tmp,sizeof(tmp), - " [first=%lu count=%lu stride=%lu len=%lu stop=%lu size=%lu]", + " [first=%lu stride=%lu last=%lu len=%lu count=%lu size=%lu]", (unsigned long)slice->first, - (unsigned long)slice->count, (unsigned long)slice->stride, + (unsigned long)slice->last, (unsigned long)slice->length, - (unsigned long)slice->stop, + (unsigned long)slice->count, (unsigned long)slice->declsize); ncbytescat(buf,tmp); } break; diff --git a/libdap2/dceconstraints.h b/libdap2/dceconstraints.h index c081cd3da..d985eba27 100644 --- a/libdap2/dceconstraints.h +++ b/libdap2/dceconstraints.h @@ -24,7 +24,7 @@ typedef struct DCEslice { size_t first; size_t stride; size_t length; - size_t stop; /* first + length */ + size_t last; /* first + length - 1*/ size_t count; /* (length + (stride-1))/ stride == actual # of elements returned to client*/ size_t declsize; /* from defining dimension, if any.*/ } DCEslice; @@ -91,7 +91,7 @@ typedef struct DCEconstraint { extern int dceparseconstraints(char* constraints, DCEconstraint* DCEonstraint); -extern int dceslicecompose(DCEslice* dst, DCEslice* src); +extern int dceslicecompose(DCEslice* s1, DCEslice* s2, DCEslice* sr); extern int dcemergeprojectionlists(NClist* dst, NClist* src); extern DCEnode* dceclone(DCEnode* node); diff --git a/libdap2/dceparse.c b/libdap2/dceparse.c index 956f21911..355d95af6 100644 --- a/libdap2/dceparse.c +++ b/libdap2/dceparse.c @@ -144,8 +144,8 @@ range(DCEparsestate* state, Object sfirst, Object sstride, Object slast) dceerror(state,"Illegal index for range last index"); slice->first = first; slice->stride = (stride == 0 ? 1 : stride); - slice->stop = last + 1; - slice->length = slice->stop - slice->first; + slice->last = last; + slice->length = (slice->last - slice->first) + 1; slice->count = slice->length / slice->stride; #ifdef DEBUG fprintf(stderr," ce.slice: %s\n", @@ -207,9 +207,9 @@ array_indices(DCEparsestate* state, Object list0, Object indexno) slice = (DCEslice*)dcecreate(CES_SLICE); slice->first = start; slice->stride = 1; - slice->count = 1; slice->length = 1; - slice->stop = start+1; + slice->last = start; + slice->count = 1; nclistpush(list,(void*)slice); return list; } diff --git a/libdap2/env b/libdap2/env index c90ee1d04..65cecce40 100644 --- a/libdap2/env +++ b/libdap2/env @@ -10,8 +10,8 @@ PROG="$TOP/ncdump/ncdump" P=`pwd` -F="http://test.opendap.org:8080/dods/dts/test.03" -CON="i32[0:1][1:2][0:2]" +F="http://opendap.co-ops.nos.noaa.gov/dods/IOOS/SixMin_Verified_Water_Level" +CON="&WATERLEVEL_6MIN_VFD_PX._STATION_ID=%228779770%22&WATERLEVEL_6MIN_VFD_PX._DATUM=%22MSL%22&WATERLEVEL_6MIN_VFD_PX._BEGIN_DATE=%2220061001%22&WATERLEVEL_6MIN_VFD_PX._END_DATE=%2220061030%22" PARMS="[log]" #PARMS="${PARMS}[netcdf3]" diff --git a/nc_test/tst_diskless4.c b/nc_test/tst_diskless4.c index 36997795b..edde6fd8c 100644 --- a/nc_test/tst_diskless4.c +++ b/nc_test/tst_diskless4.c @@ -59,9 +59,9 @@ main(int argc, char **argv) filesize = atol(argv[1]); } else { if(sizeof(size_t) == 4) - filesize = 1000000000; + filesize = 1000000000L; else if(sizeof(size_t) == 8) - filesize = 3000000000; + filesize = 3000000000L; else { fprintf(stderr,"Cannot compute filesize\n"); exit(1);