Rebuilt the compose operation in dceconstraints.c. The

revisions are simpler, and I hope, clearer than the all
previous versions.  Part of the code taken from the
ucar.nc2.Range.compose function.  Modified to handle edge
cases not handled in the Range.compose function.
This commit is contained in:
Dennis Heimbigner 2013-04-19 19:55:43 +00:00
parent b5697ae20f
commit f17e7324c1
7 changed files with 131 additions and 165 deletions

View File

@ -454,11 +454,11 @@ fprintf(stderr,"buildvaraprojection: skeleton: %s\n",dumpprojection(projection))
count = countp[dimindex+j];
slice->count = count;
slice->length = count * slice->stride;
slice->stop = (slice->first + slice->length);
if(slice->stop > slice->declsize) {
slice->stop = slice->declsize;
slice->last = (slice->first + slice->length) - 1;
if(slice->last >= slice->declsize) {
slice->last = slice->declsize - 1;
/* reverse compute the new length */
slice->length = (slice->stop - slice->first);
slice->length = (slice->last - slice->first) + 1;
}
}
dimindex += segment->rank;
@ -484,7 +484,7 @@ iswholeslice(DCEslice* slice, CDFnode* dim)
{
if(slice->first != 0 || slice->stride != 1) return 0;
if(dim != NULL) {
if(slice->stop != dim->dim.declsize) return 0;
if(slice->length != dim->dim.declsize) return 0;
} else if(dim == NULL) {
size_t count = slice->count;
if(slice->declsize == 0
@ -843,7 +843,7 @@ dapshiftslice(DCEslice* slice)
slice->first = 0;
slice->stride = 1;
slice->length = slice->count;
slice->stop = slice->length;
slice->last = slice->length - 1;
}
int

View File

@ -566,27 +566,26 @@ dumpcache(NCcache* cache)
char*
dumpslice(DCEslice* slice)
{
char buf[8192];
char tmp[8192];
size_t last = (slice->first+slice->length)-1;
buf[0] = '\0';
if(last > slice->declsize && slice->declsize > 0)
last = slice->declsize - 1;
if(slice->count == 1) {
snprintf(tmp,sizeof(tmp),"[%lu]",
(unsigned long)slice->first);
} else if(slice->stride == 1) {
snprintf(tmp,sizeof(tmp),"[%lu:%lu]",
(unsigned long)slice->first,
(unsigned long)last);
} else {
snprintf(tmp,sizeof(tmp),"[%lu:%lu:%lu]",
(unsigned long)slice->first,
(unsigned long)slice->stride,
(unsigned long)last);
}
strcat(buf,tmp);
return strdup(tmp);
char buf[8192];
char tmp[8192];
buf[0] = '\0';
if(slice->last > slice->declsize && slice->declsize > 0)
slice->last = slice->declsize - 1;
if(slice->count == 1) {
snprintf(tmp,sizeof(tmp),"[%lu]",
(unsigned long)slice->first);
} else if(slice->stride == 1) {
snprintf(tmp,sizeof(tmp),"[%lu:%lu]",
(unsigned long)slice->first,
(unsigned long)slice->last);
} else {
snprintf(tmp,sizeof(tmp),"[%lu:%lu:%lu]",
(unsigned long)slice->first,
(unsigned long)slice->stride,
(unsigned long)slice->last);
}
strcat(buf,tmp);
return strdup(tmp);
}
char*

View File

@ -82,184 +82,151 @@ dumpslice(const char* prefix, DCEslice* s)
/*
Compose slice s1 with slice s2 -> sr
Compose means that the s2 constraint is applied
to the output of the s1 constraint.
Logical derivation of s1 compose s2 = sr
We have three index sequences to deal with,
where the sequence is the range [0..last].
The composed value of first and stride are easy:
sr.first = s1.first + (s2.first * s1.stride)
sr.stride = s1.stride * s2.stride
1. the original data indices [0..N]
2. the indices resulting from applying slice s1;
this will be a subset of #1 with the sequence
[s1.first .. s1.last] where s1.last = s1.first + s1.length - 1
3. the indices resulting from applying s2:[0..s2.last] wrt
to output of s2
[s2.first .. s2.last]
We can convert #3 into index sequence wrt
#1 as follows
[s2.first .. s2.last] -> [map(s2.first)..map(s2.last)]
where map(int index) = s1.first + (s1.stride * index)
Note that map(i) is undefined if map(i) > s1.last
Finding sr.length, however, is significantly more
complex. In particular, the length specified by s2
may be more or less than the number of values
provided by s1.
One way to compute sr.length is as follows
. Compute the count of the number of values provided by s1
count1 = (s1.length+(s1.stride-1))/s1.stride
Note that count1 is effectively the number of elements
available to s2.
. Compute the stop point of s1 using count1 and wrt original data
(stop point is first + length)
stop1 = (s1.first + (count1 * s1.stride))
. Compute the number of elements resulting from applying s2
count2 = (s2.length+(s2.stride-1))/s2.stride
. Compute the stop point of s2 using count2 and wrt s1 output
stop2 = (s2.first + (count2 * s2.stride))
. Now we can convert stop2 back into a stop point wrt the original data
stopx = (s1.first + (stop2*s1.stride))
. Make sure we do not overrun the output of s2 by using the min()
stopr = min(stopx,stop1)
. Compute the result length wrt the original data
sr.length = (stopr - sr.first)
So, we can compute the result (sr) stride and first as follows
. sr.stride = s1.stride * s2.stride
. sr.first = map(s2.first) = s1.first * (s1.stride * s2.first)
This throws an exception if sr.first > s1.last
. compute the s1.last wrt original data
last1 = s1.first + (s1.length - 1)
. compute the s2.last wrt s2
last2 = s2.first + (s2.length - 1)
. compute candidate last wrt original index sequence
lastx = map(last2) = s1.first + (s1.stride * last2)
. It is possible that lastx is outside of the range (s1.first..s1.last)
so we take the min of last2 and lastx to ensure no overrun wrt s1
sr.last = min(last1,lastx)
. If we want to be pedantic, we need to reduce
sr.last so that it is on an exact multiple of sr.stride
starting at sr.first
delta = ((sr.last + (sr.stride-1)) / sr.stride) * sr.stride
sr.last = sr.first + delta
. compute result length using sr.last
sr.len = (sr.last + 1) - sr.first
Example 1:
0000000000111111111
0123456789012345678
xxxxxxxxx
0 1 2 3 4 s1=(f=1 st=2 len=9)
0 1 2 3 4 s2=(f=0 st=1 len=5)
xxxxxxxxxx
0 1 3 4 5 sr=(f=1 st=2 len=9)
sr.f = 1+(0*2) = 1
sr.st = 2*1 = 2
count1 = (9+(2-1))/2 = 5
stop1 = (1+(5*2)) = 11
count2 = (5+(1-1))/1 = 5
stop2 = 0+(5*1) = 5
stopx = (1+(5*2)) = 11
stopr = min(11,11) = 11
sr.l = (11 - 2) = 9
xxxxxxxxxy
0 1 3 4 5 sr=(f=1 st=2 len=9..10)
Example 2:
0000000000111111111122222222223
0123456789012345678901234567890
0 1 2 3 4 5 6 7 8 _ s1=(f=1 st=3 len=25)
0 1 2 _ s2=(f=3 st=2 len=5)
xxxxxxxxxxxxxxxxxx
0 1 2 sr=(f=10 st=6 l=18)
sr.f = 1+(3*3) = 10
sr.st = 3*2 =6
count1 = (25+(3-1))/3 = 9
stop1 = (1+(9*3)) = 28
count2 = (5+(2-1))/2 = 3
stop2 = (3+(3*2)) = 9
stopx = (1+(9*3)) = 28
stopr = min(28,28) = 28
sr.l = 28 - 10 = 18
xxxxxxxxxxxxxxxxxxxxxxxxx
0 1 2 3 4 5 6 7 8 _ _ _ s1=(f=1 st=3 len=25)
0 1 2 3 4 s2=(f=3 st=2 len=5)
xxxxxxxxxxxxxyyyyy
0 1 2 sr=(f=10 st=6 l=13..17)
Example 3:
0000000000111111
0123456789012345
0 1 2 3 4 _ _ s1=(f=1 st=2 l=9)
0 1 2 3 _ s2=(f=2 st=1 l=4)
xxxxxxp ----------------------------
sr=(f=5 st=2 l=6)
sr.f = 1+(2*2) = 5
sr.st = 2*1 = 2
count1 = (9+(2-1))/2 = 5
stop1 = 1+(5*2) = 11
count2 = (4+(1-1))/1 = 4
stop2 = 2+(4*1) = 6
stopx = 1*(6*2) = 13
stopr = min(13,11) = 11
sr.l = 11 - 5 = 6
xxxxxxxxx
0 1 2 3 4 _ _ s1=(f=1 st=2 len=9)
0 1 2 3 4 s2=(f=2 st=1 len=4)
xxxxxy ----------------------------
sr=(f=5 st=2 len=5..6)
Example 4:
0000000000111111111
0123456789012345678
0 1 2 3 4 _ _ s1=(f=1 st=2 l=9)
0 1 p s2=(f=2 st=2 l=4)
xxxxxx ----------------------------
sr=(f=5 st=4 l=6)
sr.f = 1+(2*2) = 5
sr.st = 2*2 = 4
count1 = (9+(2-1))/2 = 5
stop1 = 1+(5*2) = 11
count2 = (4+(2-1))/2 = 2
stop2 = 2+(2*2) = 6
stopx = 1*(6*2) = 13
stopr = min(13,11) = 11
sr.l = 11 - 5 = 6
xxxxxxxxx
0 1 2 3 4 _ _ _ _ s1=(f=1 st=2 len=9)
0 1 2 3 s2=(f=2 st=2 len=4)
xxxxxxyy
sr=(f=5 st=4 len=6..8)
Example 5:
00000000001
01234567890
xxx
012 s1=(f=0 st=1 l=3)
012 s2=(f=0 st=1 l=3)
----------------------------
sr=(f=0 st=1 l=3)
sr.f = 0+(0*1) = 0
sr.st = 1*1 = 1
count1 = (3+(1-1))/1 = 3
stop1 = 0+(3*1) = 3
count2 = (3+(1-1))/1 = 3
stop2 = 0+(3*1) = 3
stopx = 0+(3*1) = 3
stopr = min(3,3) = 3
sr.l = 3 - 0 = 3
xxx ----------------------------
012 sr=(f=0 st=1 l=3)
Example 6:
00000
01234
xx
01 s1=(f=0 st=1 l=2)
0 s2=(f=0 st=1 l=1)
----------------------------
x ----------------------------
sr=(f=0 st=1 l=1)
sr.f = 0+(0*1) = 0
sr.st = 1*1 = 1
count1 = (2+(1-1))/1 = 2
stop1 = 0+(2*1) = 2
count2 = (1+(1-1))/1 = 1
stop2 = 0+(1*1) = 1
stopx = 0+(1*1) = 1
stopr = min(1,2) = 1
sr.l = 1 - 0 = 1
*/
/* compose slice src into slice dst; dst != src.
Compose means that the src constraint is applied
to the output of the dst constraint.
*/
#define MAP(s1,i) ((s1)->first + ((s1)->stride*(i)))
#define XMIN(x,y) ((x) < (y) ? (x) : (y))
#define XMAX(x,y) ((x) > (y) ? (x) : (y))
int
dceslicecompose(DCEslice* s1, DCEslice* s2)
dceslicecompose(DCEslice* s1, DCEslice* s2, DCEslice* result)
{
int err = NC_NOERR;
DCEslice tmp;
size_t count1, stop1, count2, stop2, stopx, stopr;
size_t lastx = 0;
DCEslice sr; /* For back compatability so s1 and result can be same object */
#ifdef DEBUG1
dumpslice("compose: s1",s1);
dumpslice("compose: s2",s2);
#endif
tmp.node.sort = CES_SLICE;
tmp.first = s1->first+(s2->first * s1->stride);
tmp.stride = s1->stride * s2->stride;
count1 = (s1->length + (s1->stride - 1))/s1->stride;
stop1 = s1->first + (count1 * s1->stride);
count2 = (s2->length + (s2->stride - 1))/s2->stride;
stop2 = s2->first + (count2 * s2->stride);
stopx = s1->first + (stop2 * s1->stride);
stopr = (stopx < stop1 ? stopx : stop1); /* min(stopx,stop1) */
tmp.length = (stopr - tmp.first);
tmp.declsize = (s1->declsize < s2->declsize ? s2->declsize : s1->declsize); /* use max declsize */
sr.node.sort = CES_SLICE;
sr.stride = s1->stride * s2->stride;
sr.first = MAP(s1,s2->first);
if(sr.first > s1->last)
return NC_EINVALCOORDS;
lastx = MAP(s1,s2->last);
sr.last = XMIN(s1->last,lastx);
sr.length = (sr.last + 1) - sr.first;
sr.declsize = XMAX(s1->declsize,s2->declsize); /* use max declsize */
/* fill in other fields */
tmp.stop = tmp.first + tmp.length;
tmp.count = (tmp.length + (tmp.stride - 1))/tmp.stride;
*s1 = tmp;
sr.count = (sr.length + (sr.stride - 1))/sr.stride;
#ifdef DEBUG1
dumpslice("compose: result",&tmp);
dumpslice("compose: result",sr);
#endif
*result = sr;
return err;
}
/*
Given two projection lists, merge
src into dst taking
overlapping projections into acct.
Dst will be modified.
*/
int
dcemergeprojectionlists(NClist* dst, NClist* src)
{
@ -327,7 +294,7 @@ dcemergeprojections(DCEprojection* merged, DCEprojection* addition)
/* If one segment has larger rank, then copy the extra slices unchanged */
for(j=0;j<addedseg->rank;j++) {
if(j < mergedseg->rank)
dceslicecompose(mergedseg->slices+j,addedseg->slices+j);
dceslicecompose(mergedseg->slices+j,addedseg->slices+j,mergedseg->slices+j);
else
mergedseg->slices[j] = addedseg->slices[j];
}
@ -945,7 +912,7 @@ dceiswholeslice(DCEslice* slice)
{
if(slice->first != 0
|| slice->stride != 1
|| slice->stop != slice->declsize) return 0;
|| slice->length != slice->declsize) return 0;
return 1;
}
@ -968,9 +935,9 @@ dcemakewholeslice(DCEslice* slice, size_t declsize)
slice->first = 0;
slice->stride = 1;
slice->length = declsize;
slice->stop = declsize;
slice->declsize = declsize;
slice->count = declsize;
slice->last = slice->length - 1;
}
/* Remove slicing from terminal segment of p */
@ -1091,12 +1058,12 @@ dcedumpraw(DCEnode* node, NCbytes* buf)
case CES_SLICE: {
DCEslice* slice = (DCEslice*)node;
snprintf(tmp,sizeof(tmp),
" [first=%lu count=%lu stride=%lu len=%lu stop=%lu size=%lu]",
" [first=%lu stride=%lu last=%lu len=%lu count=%lu size=%lu]",
(unsigned long)slice->first,
(unsigned long)slice->count,
(unsigned long)slice->stride,
(unsigned long)slice->last,
(unsigned long)slice->length,
(unsigned long)slice->stop,
(unsigned long)slice->count,
(unsigned long)slice->declsize);
ncbytescat(buf,tmp);
} break;

View File

@ -24,7 +24,7 @@ typedef struct DCEslice {
size_t first;
size_t stride;
size_t length;
size_t stop; /* first + length */
size_t last; /* first + length - 1*/
size_t count; /* (length + (stride-1))/ stride == actual # of elements returned to client*/
size_t declsize; /* from defining dimension, if any.*/
} DCEslice;
@ -91,7 +91,7 @@ typedef struct DCEconstraint {
extern int dceparseconstraints(char* constraints, DCEconstraint* DCEonstraint);
extern int dceslicecompose(DCEslice* dst, DCEslice* src);
extern int dceslicecompose(DCEslice* s1, DCEslice* s2, DCEslice* sr);
extern int dcemergeprojectionlists(NClist* dst, NClist* src);
extern DCEnode* dceclone(DCEnode* node);

View File

@ -144,8 +144,8 @@ range(DCEparsestate* state, Object sfirst, Object sstride, Object slast)
dceerror(state,"Illegal index for range last index");
slice->first = first;
slice->stride = (stride == 0 ? 1 : stride);
slice->stop = last + 1;
slice->length = slice->stop - slice->first;
slice->last = last;
slice->length = (slice->last - slice->first) + 1;
slice->count = slice->length / slice->stride;
#ifdef DEBUG
fprintf(stderr," ce.slice: %s\n",
@ -207,9 +207,9 @@ array_indices(DCEparsestate* state, Object list0, Object indexno)
slice = (DCEslice*)dcecreate(CES_SLICE);
slice->first = start;
slice->stride = 1;
slice->count = 1;
slice->length = 1;
slice->stop = start+1;
slice->last = start;
slice->count = 1;
nclistpush(list,(void*)slice);
return list;
}

View File

@ -10,8 +10,8 @@ PROG="$TOP/ncdump/ncdump"
P=`pwd`
F="http://test.opendap.org:8080/dods/dts/test.03"
CON="i32[0:1][1:2][0:2]"
F="http://opendap.co-ops.nos.noaa.gov/dods/IOOS/SixMin_Verified_Water_Level"
CON="&WATERLEVEL_6MIN_VFD_PX._STATION_ID=%228779770%22&WATERLEVEL_6MIN_VFD_PX._DATUM=%22MSL%22&WATERLEVEL_6MIN_VFD_PX._BEGIN_DATE=%2220061001%22&WATERLEVEL_6MIN_VFD_PX._END_DATE=%2220061030%22"
PARMS="[log]"
#PARMS="${PARMS}[netcdf3]"

View File

@ -59,9 +59,9 @@ main(int argc, char **argv)
filesize = atol(argv[1]);
} else {
if(sizeof(size_t) == 4)
filesize = 1000000000;
filesize = 1000000000L;
else if(sizeof(size_t) == 8)
filesize = 3000000000;
filesize = 3000000000L;
else {
fprintf(stderr,"Cannot compute filesize\n");
exit(1);