diff --git a/contrib/pg_dumplo/lo_export.c b/contrib/pg_dumplo/lo_export.c
index e18c3ef651..248cf831f5 100644
--- a/contrib/pg_dumplo/lo_export.c
+++ b/contrib/pg_dumplo/lo_export.c
@@ -94,7 +94,7 @@ pglo_export(LODumpMaster *pgLO)
* Query
* ----------
*/
- sprintf(Qbuff, "SELECT x.%s FROM %s x, pg_class c WHERE x.%s = c.oid and c.relkind = 'l'",
+ sprintf(Qbuff, "SELECT DISTINCT x.\"%s\" FROM \"%s\" x, pg_largeobject l WHERE x.\"%s\" = l.loid",
ll->lo_attr, ll->lo_table, ll->lo_attr);
/* puts(Qbuff); */
@@ -104,7 +104,8 @@ pglo_export(LODumpMaster *pgLO)
if ((tuples = PQntuples(pgLO->res)) == 0) {
if (!pgLO->quiet && pgLO->action == ACTION_EXPORT_ATTR)
- printf("%s: no large objets in '%s'\n", progname, ll->lo_table);
+ printf("%s: no large objects in '%s'\n",
+ progname, ll->lo_table);
continue;
} else if (check_res(pgLO)) {
diff --git a/contrib/vacuumlo/vacuumlo.c b/contrib/vacuumlo/vacuumlo.c
index 3f2c592c09..6e46caf8dd 100644
--- a/contrib/vacuumlo/vacuumlo.c
+++ b/contrib/vacuumlo/vacuumlo.c
@@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
- * $Header: /cvsroot/pgsql/contrib/vacuumlo/vacuumlo.c,v 1.5 2000/06/19 13:54:50 momjian Exp $
+ * $Header: /cvsroot/pgsql/contrib/vacuumlo/vacuumlo.c,v 1.6 2000/10/24 01:38:20 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@@ -59,10 +59,9 @@ vacuumlo(char *database, int verbose)
* First we create and populate the lo temp table
*/
buf[0] = '\0';
- strcat(buf, "SELECT oid AS lo ");
+ strcat(buf, "SELECT DISTINCT loid AS lo ");
strcat(buf, "INTO TEMP TABLE vacuum_l ");
- strcat(buf, "FROM pg_class ");
- strcat(buf, "WHERE relkind='l'");
+ strcat(buf, "FROM pg_largeobject ");
if (!(res = PQexec(conn, buf)))
{
fprintf(stderr, "Failed to create temp table.\n");
diff --git a/doc/src/sgml/ref/psql-ref.sgml b/doc/src/sgml/ref/psql-ref.sgml
index c8daa1f7a4..446449d95e 100644
--- a/doc/src/sgml/ref/psql-ref.sgml
+++ b/doc/src/sgml/ref/psql-ref.sgml
@@ -1,5 +1,5 @@
@@ -706,7 +706,8 @@ lo_import 152801
Shows a list of all Postgres large
-        objects currently stored in the database along with their owners.
+ objects currently stored in the database, along with any
+ comments provided for them.
diff --git a/src/backend/catalog/Makefile b/src/backend/catalog/Makefile
index 6a5beee94d..e17a37388c 100644
--- a/src/backend/catalog/Makefile
+++ b/src/backend/catalog/Makefile
@@ -2,7 +2,7 @@
#
# Makefile for catalog
#
-# $Header: /cvsroot/pgsql/src/backend/catalog/Makefile,v 1.30 2000/10/22 05:27:10 momjian Exp $
+# $Header: /cvsroot/pgsql/src/backend/catalog/Makefile,v 1.31 2000/10/24 01:38:23 tgl Exp $
#
#-------------------------------------------------------------------------
@@ -11,7 +11,8 @@ top_builddir = ../../..
include $(top_builddir)/src/Makefile.global
OBJS = catalog.o heap.o index.o indexing.o aclchk.o \
- pg_aggregate.o pg_operator.o pg_proc.o pg_type.o
+ pg_aggregate.o pg_largeobject.o pg_operator.o pg_proc.o \
+ pg_type.o
BKIFILES = global.bki template1.bki global.description template1.description
@@ -29,7 +30,7 @@ TEMPLATE1_BKI_SRCS := $(addprefix $(top_srcdir)/src/include/catalog/,\
pg_proc.h pg_type.h pg_attribute.h pg_class.h \
pg_inherits.h pg_index.h pg_statistic.h \
pg_operator.h pg_opclass.h pg_am.h pg_amop.h pg_amproc.h \
- pg_language.h \
+ pg_language.h pg_largeobject.h \
pg_aggregate.h pg_ipl.h pg_inheritproc.h \
pg_rewrite.h pg_listener.h pg_description.h indexing.h \
)
diff --git a/src/backend/catalog/indexing.c b/src/backend/catalog/indexing.c
index 342896a93b..1a96c3f5ea 100644
--- a/src/backend/catalog/indexing.c
+++ b/src/backend/catalog/indexing.c
@@ -9,7 +9,7 @@
*
*
* IDENTIFICATION
- * $Header: /cvsroot/pgsql/src/backend/catalog/indexing.c,v 1.71 2000/10/22 05:27:10 momjian Exp $
+ * $Header: /cvsroot/pgsql/src/backend/catalog/indexing.c,v 1.72 2000/10/24 01:38:22 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@@ -51,6 +51,8 @@ char *Name_pg_inherits_indices[Num_pg_inherits_indices] =
{InheritsRelidSeqnoIndex};
char *Name_pg_language_indices[Num_pg_language_indices] =
{LanguageOidIndex, LanguageNameIndex};
+char *Name_pg_largeobject_indices[Num_pg_largeobject_indices] =
+{LargeObjectLOidPNIndex};
char *Name_pg_listener_indices[Num_pg_listener_indices] =
{ListenerPidRelnameIndex};
char *Name_pg_opclass_indices[Num_pg_opclass_indices] =
diff --git a/src/backend/catalog/pg_largeobject.c b/src/backend/catalog/pg_largeobject.c
new file mode 100644
index 0000000000..c471a9ae13
--- /dev/null
+++ b/src/backend/catalog/pg_largeobject.c
@@ -0,0 +1,184 @@
+/*-------------------------------------------------------------------------
+ *
+ * pg_largeobject.c
+ * routines to support manipulation of the pg_largeobject relation
+ *
+ * Portions Copyright (c) 1996-2000, PostgreSQL, Inc
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ *
+ * IDENTIFICATION
+ * $Header: /cvsroot/pgsql/src/backend/catalog/pg_largeobject.c,v 1.5 2000/10/24 01:38:23 tgl Exp $
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "access/genam.h"
+#include "access/heapam.h"
+#include "catalog/catname.h"
+#include "catalog/indexing.h"
+#include "catalog/pg_largeobject.h"
+#include "miscadmin.h"
+#include "utils/builtins.h"
+#include "utils/fmgroids.h"
+
+
+/*
+ * Create a large object having the given LO identifier.
+ *
+ * We do this by inserting an empty first page, so that the object will
+ * appear to exist with size 0. Note that the unique index will reject
+ * an attempt to create a duplicate page.
+ *
+ * Return value is OID assigned to the page tuple (any use in it?)
+ */
+Oid
+LargeObjectCreate(Oid loid)
+{
+ Oid retval;
+ Relation pg_largeobject;
+ HeapTuple ntup;
+ Relation idescs[Num_pg_largeobject_indices];
+ Datum values[Natts_pg_largeobject];
+ char nulls[Natts_pg_largeobject];
+ int i;
+
+ pg_largeobject = heap_openr(LargeObjectRelationName, RowExclusiveLock);
+
+ /*
+ * Form new tuple
+ */
+ for (i = 0; i < Natts_pg_largeobject; i++)
+ {
+ values[i] = (Datum)NULL;
+ nulls[i] = ' ';
+ }
+
+ i = 0;
+ values[i++] = ObjectIdGetDatum(loid);
+ values[i++] = Int32GetDatum(0);
+ values[i++] = DirectFunctionCall1(byteain,
+ CStringGetDatum(""));
+
+ ntup = heap_formtuple(pg_largeobject->rd_att, values, nulls);
+
+ /*
+ * Insert it
+ */
+ retval = heap_insert(pg_largeobject, ntup);
+
+ /*
+ * Update indices
+ */
+ if (!IsIgnoringSystemIndexes())
+ {
+ CatalogOpenIndices(Num_pg_largeobject_indices, Name_pg_largeobject_indices, idescs);
+ CatalogIndexInsert(idescs, Num_pg_largeobject_indices, pg_largeobject, ntup);
+ CatalogCloseIndices(Num_pg_largeobject_indices, idescs);
+ }
+
+ heap_close(pg_largeobject, RowExclusiveLock);
+
+ heap_freetuple(ntup);
+
+ return retval;
+}
+
+void
+LargeObjectDrop(Oid loid)
+{
+ bool found = false;
+ Relation pg_largeobject;
+ Relation pg_lo_idx;
+ ScanKeyData skey[1];
+ IndexScanDesc sd;
+ RetrieveIndexResult indexRes;
+ HeapTupleData tuple;
+ Buffer buffer;
+
+ ScanKeyEntryInitialize(&skey[0],
+ (bits16) 0x0,
+ (AttrNumber) 1,
+ (RegProcedure) F_OIDEQ,
+ ObjectIdGetDatum(loid));
+
+ pg_largeobject = heap_openr(LargeObjectRelationName, RowShareLock);
+ pg_lo_idx = index_openr(LargeObjectLOidPNIndex);
+
+ sd = index_beginscan(pg_lo_idx, false, 1, skey);
+
+ tuple.t_datamcxt = CurrentMemoryContext;
+ tuple.t_data = NULL;
+
+ while ((indexRes = index_getnext(sd, ForwardScanDirection)))
+ {
+ tuple.t_self = indexRes->heap_iptr;
+ heap_fetch(pg_largeobject, SnapshotNow, &tuple, &buffer);
+ pfree(indexRes);
+ if (tuple.t_data != NULL)
+ {
+ heap_delete(pg_largeobject, &tuple.t_self, NULL);
+ ReleaseBuffer(buffer);
+ found = true;
+ }
+ }
+
+ index_endscan(sd);
+
+ index_close(pg_lo_idx);
+ heap_close(pg_largeobject, RowShareLock);
+
+ if (!found)
+ elog(ERROR, "LargeObjectDrop: large object %u not found", loid);
+}
+
+bool
+LargeObjectExists(Oid loid)
+{
+ bool retval = false;
+ Relation pg_largeobject;
+ Relation pg_lo_idx;
+ ScanKeyData skey[1];
+ IndexScanDesc sd;
+ RetrieveIndexResult indexRes;
+ HeapTupleData tuple;
+ Buffer buffer;
+
+ /*
+ * See if we can find any tuples belonging to the specified LO
+ */
+ ScanKeyEntryInitialize(&skey[0],
+ (bits16) 0x0,
+ (AttrNumber) 1,
+ (RegProcedure) F_OIDEQ,
+ ObjectIdGetDatum(loid));
+
+ pg_largeobject = heap_openr(LargeObjectRelationName, RowShareLock);
+ pg_lo_idx = index_openr(LargeObjectLOidPNIndex);
+
+ sd = index_beginscan(pg_lo_idx, false, 1, skey);
+
+ tuple.t_datamcxt = CurrentMemoryContext;
+ tuple.t_data = NULL;
+
+ while ((indexRes = index_getnext(sd, ForwardScanDirection)))
+ {
+ tuple.t_self = indexRes->heap_iptr;
+ heap_fetch(pg_largeobject, SnapshotNow, &tuple, &buffer);
+ pfree(indexRes);
+ if (tuple.t_data != NULL)
+ {
+ retval = true;
+ ReleaseBuffer(buffer);
+ break;
+ }
+ }
+
+ index_endscan(sd);
+
+ index_close(pg_lo_idx);
+ heap_close(pg_largeobject, RowShareLock);
+
+ return retval;
+}
diff --git a/src/backend/libpq/be-fsstubs.c b/src/backend/libpq/be-fsstubs.c
index bb5c7f6e55..7eff84e5d3 100644
--- a/src/backend/libpq/be-fsstubs.c
+++ b/src/backend/libpq/be-fsstubs.c
@@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
- * $Header: /cvsroot/pgsql/src/backend/libpq/be-fsstubs.c,v 1.54 2000/10/22 05:27:12 momjian Exp $
+ * $Header: /cvsroot/pgsql/src/backend/libpq/be-fsstubs.c,v 1.55 2000/10/24 01:38:26 tgl Exp $
*
* NOTES
* This should be moved to a more appropriate place. It is here
@@ -32,13 +32,13 @@
*-------------------------------------------------------------------------
*/
+#include "postgres.h"
+
#include <fcntl.h>
#include <sys/file.h>
#include <sys/stat.h>
#include <unistd.h>
-#include "postgres.h"
-
#include "catalog/pg_shadow.h"
#include "libpq/be-fsstubs.h"
#include "libpq/libpq-fs.h"
@@ -50,8 +50,7 @@
/*#define FSDB 1*/
#define MAX_LOBJ_FDS 256
-#define BUFSIZE 1024
-#define FNAME_BUFSIZE 8192
+#define BUFSIZE 8192
/*
* LO "FD"s are indexes into this array.
@@ -141,10 +140,10 @@ lo_close(PG_FUNCTION_ARGS)
inv_close(cookies[fd]);
- MemoryContextSwitchTo(currentContext);
-
deleteLOfd(fd);
+ MemoryContextSwitchTo(currentContext);
+
PG_RETURN_INT32(0);
}
@@ -267,7 +266,7 @@ lo_creat(PG_FUNCTION_ARGS)
PG_RETURN_OID(InvalidOid);
}
- lobjId = RelationGetRelid(lobjDesc->heap_r);
+ lobjId = lobjDesc->id;
inv_close(lobjDesc);
@@ -310,8 +309,8 @@ lo_unlink(PG_FUNCTION_ARGS)
* any LO-specific data structures at all. (Again, that's probably
* more than this module ought to be assuming.)
*
- * XXX there ought to be some code to clean up any open LOs that
- * reference the specified relation... as is, they remain "open".
+ * XXX there ought to be some code to clean up any open LO FDs that
+ * reference the specified LO... as is, they remain "open".
*/
PG_RETURN_INT32(inv_drop(lobjId));
}
@@ -367,7 +366,7 @@ lo_import(PG_FUNCTION_ARGS)
int nbytes,
tmp;
char buf[BUFSIZE];
- char fnamebuf[FNAME_BUFSIZE];
+ char fnamebuf[MAXPGPATH];
LargeObjectDesc *lobj;
Oid lobjOid;
@@ -382,8 +381,8 @@ lo_import(PG_FUNCTION_ARGS)
* open the file to be read in
*/
nbytes = VARSIZE(filename) - VARHDRSZ;
- if (nbytes >= FNAME_BUFSIZE)
- nbytes = FNAME_BUFSIZE-1;
+ if (nbytes >= MAXPGPATH)
+ nbytes = MAXPGPATH-1;
memcpy(fnamebuf, VARDATA(filename), nbytes);
fnamebuf[nbytes] = '\0';
fd = PathNameOpenFile(fnamebuf, O_RDONLY | PG_BINARY, 0666);
@@ -398,12 +397,7 @@ lo_import(PG_FUNCTION_ARGS)
if (lobj == NULL)
elog(ERROR, "lo_import: can't create inv object for \"%s\"",
fnamebuf);
-
- /*
- * the oid for the large object is just the oid of the relation
- * XInv??? which contains the data.
- */
- lobjOid = RelationGetRelid(lobj->heap_r);
+ lobjOid = lobj->id;
/*
* read in from the Unix file and write to the inversion file
@@ -411,7 +405,7 @@ lo_import(PG_FUNCTION_ARGS)
while ((nbytes = FileRead(fd, buf, BUFSIZE)) > 0)
{
tmp = inv_write(lobj, buf, nbytes);
- if (tmp < nbytes)
+ if (tmp != nbytes)
elog(ERROR, "lo_import: error while reading \"%s\"",
fnamebuf);
}
@@ -435,7 +429,7 @@ lo_export(PG_FUNCTION_ARGS)
int nbytes,
tmp;
char buf[BUFSIZE];
- char fnamebuf[FNAME_BUFSIZE];
+ char fnamebuf[MAXPGPATH];
LargeObjectDesc *lobj;
mode_t oumask;
@@ -461,8 +455,8 @@ lo_export(PG_FUNCTION_ARGS)
* world-writable export files doesn't seem wise.
*/
nbytes = VARSIZE(filename) - VARHDRSZ;
- if (nbytes >= FNAME_BUFSIZE)
- nbytes = FNAME_BUFSIZE-1;
+ if (nbytes >= MAXPGPATH)
+ nbytes = MAXPGPATH-1;
memcpy(fnamebuf, VARDATA(filename), nbytes);
fnamebuf[nbytes] = '\0';
oumask = umask((mode_t) 0022);
@@ -473,12 +467,12 @@ lo_export(PG_FUNCTION_ARGS)
fnamebuf);
/*
- * read in from the Unix file and write to the inversion file
+ * read in from the inversion file and write to the Unix file
*/
while ((nbytes = inv_read(lobj, buf, BUFSIZE)) > 0)
{
tmp = FileWrite(fd, buf, nbytes);
- if (tmp < nbytes)
+ if (tmp != nbytes)
elog(ERROR, "lo_export: error while writing \"%s\"",
fnamebuf);
}
@@ -513,7 +507,7 @@ lo_commit(bool isCommit)
if (cookies[i] != NULL)
{
if (isCommit)
- inv_cleanindex(cookies[i]);
+ inv_close(cookies[i]);
cookies[i] = NULL;
}
}
diff --git a/src/backend/storage/large_object/inv_api.c b/src/backend/storage/large_object/inv_api.c
index 5b7df0562a..607c4861dc 100644
--- a/src/backend/storage/large_object/inv_api.c
+++ b/src/backend/storage/large_object/inv_api.c
@@ -9,77 +9,51 @@
*
*
* IDENTIFICATION
- * $Header: /cvsroot/pgsql/src/backend/storage/large_object/inv_api.c,v 1.78 2000/10/22 05:27:15 momjian Exp $
+ * $Header: /cvsroot/pgsql/src/backend/storage/large_object/inv_api.c,v 1.79 2000/10/24 01:38:29 tgl Exp $
*
*-------------------------------------------------------------------------
*/
+#include "postgres.h"
+
+#include <sys/types.h>
#include <sys/file.h>
#include <sys/stat.h>
#include <unistd.h>
-#include "postgres.h"
-
#include "access/genam.h"
#include "access/heapam.h"
#include "access/nbtree.h"
+#include "access/htup.h"
#include "catalog/catalog.h"
+#include "catalog/catname.h"
#include "catalog/heap.h"
#include "catalog/index.h"
+#include "catalog/indexing.h"
#include "catalog/pg_opclass.h"
+#include "catalog/pg_largeobject.h"
#include "catalog/pg_type.h"
#include "libpq/libpq-fs.h"
#include "miscadmin.h"
#include "storage/large_object.h"
#include "storage/smgr.h"
#include "utils/fmgroids.h"
-#include "utils/relcache.h"
+#include "utils/builtins.h"
-/*
- * Warning, Will Robinson... In order to pack data into an inversion
- * file as densely as possible, we violate the class abstraction here.
- * When we're appending a new tuple to the end of the table, we check
- * the last page to see how much data we can put on it. If it's more
- * than IMINBLK, we write enough to fill the page. This limits external
- * fragmentation. In no case can we write more than IMAXBLK, since
- * the 8K postgres page size less overhead leaves only this much space
- * for data.
- */
-/*
- * In order to prevent buffer leak on transaction commit, large object
- * scan index handling has been modified. Indexes are persistant inside
- * a transaction but may be closed between two calls to this API (when
- * transaction is committed while object is opened, or when no
- * transaction is active). Scan indexes are thus now reinitialized using
- * the object current offset. [PA]
- *
- * Some cleanup has been also done for non freed memory.
- *
- * For subsequent notes, [PA] is Pascal André
- */
-
-#define IFREESPC(p) (PageGetFreeSpace(p) - \
- MAXALIGN(offsetof(HeapTupleHeaderData,t_bits)) - \
- MAXALIGN(sizeof(struct varlena) + sizeof(int32)) - \
- sizeof(double))
-#define IMAXBLK 8092
-#define IMINBLK 512
-
-/* non-export function prototypes */
-static HeapTuple inv_newtuple(LargeObjectDesc *obj_desc, Buffer buffer,
- Page page, char *dbuf, int nwrite);
-static void inv_fetchtup(LargeObjectDesc *obj_desc, HeapTuple tuple, Buffer *buffer);
-static int inv_wrnew(LargeObjectDesc *obj_desc, char *buf, int nbytes);
-static int inv_wrold(LargeObjectDesc *obj_desc, char *dbuf, int nbytes,
- HeapTuple tuple, Buffer buffer);
-static void inv_indextup(LargeObjectDesc *obj_desc, HeapTuple tuple);
-static int _inv_getsize(Relation hreln, TupleDesc hdesc, Relation ireln);
+static int32
+getbytealen(bytea *data)
+{
+ Assert(! VARATT_IS_EXTENDED(data));
+ if (VARSIZE(data) < VARHDRSZ)
+ elog(ERROR, "getbytealen: VARSIZE(data) < VARHDRSZ. This is internal error.");
+ return (VARSIZE(data) - VARHDRSZ);
+}
/*
* inv_create -- create a new large object.
*
* Arguments:
- * flags -- was archive, smgr
+ * flags
*
* Returns:
* large object descriptor, appropriately filled in.
@@ -87,168 +61,80 @@ static int _inv_getsize(Relation hreln, TupleDesc hdesc, Relation ireln);
LargeObjectDesc *
inv_create(int flags)
{
- LargeObjectDesc *retval;
Oid file_oid;
- Relation r;
- Relation indr;
- TupleDesc tupdesc;
- IndexInfo *indexInfo;
- Oid classObjectId[1];
- char objname[NAMEDATALEN];
- char indname[NAMEDATALEN];
+ LargeObjectDesc *retval;
/*
- * add one here since the pg_class tuple created will have the next
- * oid and we want to have the relation name to correspond to the
- * tuple OID
+ * Allocate an OID to be the LO's identifier.
*/
- file_oid = newoid() + 1;
+ file_oid = newoid();
- /* come up with some table names */
- sprintf(objname, "xinv%u", file_oid);
- sprintf(indname, "xinx%u", file_oid);
-
- if (RelnameFindRelid(objname) != InvalidOid)
- elog(ERROR,
- "internal error: %s already exists -- cannot create large obj",
- objname);
- if (RelnameFindRelid(indname) != InvalidOid)
- elog(ERROR,
- "internal error: %s already exists -- cannot create large obj",
- indname);
-
- /* this is pretty painful... want a tuple descriptor */
- tupdesc = CreateTemplateTupleDesc(2);
- TupleDescInitEntry(tupdesc, (AttrNumber) 1,
- "olastbye",
- INT4OID,
- -1, 0, false);
- TupleDescInitEntry(tupdesc, (AttrNumber) 2,
- "odata",
- BYTEAOID,
- -1, 0, false);
+ /* Check for duplicate (shouldn't happen) */
+ if (LargeObjectExists(file_oid))
+ elog(ERROR, "inv_create: large object %u already exists. This is internal error.", file_oid);
/*
- * First create the table to hold the inversion large object. It will
- * be located on whatever storage manager the user requested.
+ * Create the LO by writing an empty first page for it in pg_largeobject
*/
+ (void) LargeObjectCreate(file_oid);
- heap_create_with_catalog(objname, tupdesc, RELKIND_LOBJECT,
- false, false);
-
- /* make the relation visible in this transaction */
+ /*
+ * Advance command counter so that new tuple will be seen by later
+ * large-object operations in this transaction.
+ */
CommandCounterIncrement();
- /*--------------------
- * We hold AccessShareLock on any large object we have open
- * by inv_create or inv_open; it is released by inv_close.
- * Note this will not conflict with ExclusiveLock or ShareLock
- * that we acquire when actually reading/writing; it just prevents
- * deletion of the large object while we have it open.
- *--------------------
- */
- r = heap_openr(objname, AccessShareLock);
-
/*
- * Now create a btree index on the relation's olastbyte attribute to
- * make seeks go faster.
+ * Prepare LargeObjectDesc data structure for accessing LO
*/
- indexInfo = makeNode(IndexInfo);
- indexInfo->ii_NumIndexAttrs = 1;
- indexInfo->ii_NumKeyAttrs = 1;
- indexInfo->ii_KeyAttrNumbers[0] = 1;
- indexInfo->ii_Predicate = NULL;
- indexInfo->ii_FuncOid = InvalidOid;
- indexInfo->ii_Unique = false;
-
- classObjectId[0] = INT4_OPS_OID;
-
- index_create(objname, indname, indexInfo,
- BTREE_AM_OID, classObjectId,
- false, false, false);
-
- /* make the index visible in this transaction */
- CommandCounterIncrement();
-
- indr = index_openr(indname);
-
- if (!RelationIsValid(indr))
- {
- elog(ERROR, "cannot create index for large obj on %s under inversion",
- DatumGetCString(DirectFunctionCall1(smgrout,
- Int16GetDatum(DEFAULT_SMGR))));
- }
-
retval = (LargeObjectDesc *) palloc(sizeof(LargeObjectDesc));
- retval->heap_r = r;
- retval->index_r = indr;
- retval->iscan = (IndexScanDesc) NULL;
- retval->hdesc = RelationGetDescr(r);
- retval->idesc = RelationGetDescr(indr);
- retval->offset = retval->lowbyte = retval->highbyte = 0;
- ItemPointerSetInvalid(&(retval->htid));
- retval->flags = 0;
+ retval->id = file_oid;
+ retval->offset = 0;
- if (flags & INV_WRITE)
- {
- LockRelation(r, ExclusiveLock);
+ if (flags & INV_WRITE) {
retval->flags = IFS_WRLOCK | IFS_RDLOCK;
- }
- else if (flags & INV_READ)
- {
- LockRelation(r, ShareLock);
+ retval->heap_r = heap_openr(LargeObjectRelationName, RowExclusiveLock);
+ } else if (flags & INV_READ) {
retval->flags = IFS_RDLOCK;
- }
- retval->flags |= IFS_ATEOF; /* since we know the object is empty */
+ retval->heap_r = heap_openr(LargeObjectRelationName, AccessShareLock);
+ } else
+ elog(ERROR, "inv_create: invalid flags: %d", flags);
+
+ retval->index_r = index_openr(LargeObjectLOidPNIndex);
return retval;
}
+/*
+ * inv_open -- access an existing large object.
+ *
+ * Returns:
+ * large object descriptor, appropriately filled in.
+ */
LargeObjectDesc *
inv_open(Oid lobjId, int flags)
{
LargeObjectDesc *retval;
- Relation r;
- char *indname;
- Relation indrel;
-
- r = heap_open(lobjId, AccessShareLock);
-
- indname = pstrdup(RelationGetRelationName(r));
-
- /*
- * hack hack hack... we know that the fourth character of the
- * relation name is a 'v', and that the fourth character of the index
- * name is an 'x', and that they're otherwise identical.
- */
- indname[3] = 'x';
- indrel = index_openr(indname);
-
- if (!RelationIsValid(indrel))
- return (LargeObjectDesc *) NULL;
+ if (! LargeObjectExists(lobjId))
+ elog(ERROR, "inv_open: large object %u not found", lobjId);
+
retval = (LargeObjectDesc *) palloc(sizeof(LargeObjectDesc));
- retval->heap_r = r;
- retval->index_r = indrel;
- retval->iscan = (IndexScanDesc) NULL;
- retval->hdesc = RelationGetDescr(r);
- retval->idesc = RelationGetDescr(indrel);
- retval->offset = retval->lowbyte = retval->highbyte = 0;
- ItemPointerSetInvalid(&(retval->htid));
- retval->flags = 0;
+ retval->id = lobjId;
+ retval->offset = 0;
- if (flags & INV_WRITE)
- {
- LockRelation(r, ExclusiveLock);
+ if (flags & INV_WRITE) {
retval->flags = IFS_WRLOCK | IFS_RDLOCK;
- }
- else if (flags & INV_READ)
- {
- LockRelation(r, ShareLock);
+ retval->heap_r = heap_openr(LargeObjectRelationName, RowExclusiveLock);
+ } else if (flags & INV_READ) {
retval->flags = IFS_RDLOCK;
- }
+ retval->heap_r = heap_openr(LargeObjectRelationName, AccessShareLock);
+ } else
+ elog(ERROR, "inv_open: invalid flags: %d", flags);
+
+ retval->index_r = index_openr(LargeObjectLOidPNIndex);
return retval;
}
@@ -261,174 +147,129 @@ inv_close(LargeObjectDesc *obj_desc)
{
Assert(PointerIsValid(obj_desc));
- if (obj_desc->iscan != (IndexScanDesc) NULL)
- {
- index_endscan(obj_desc->iscan);
- obj_desc->iscan = NULL;
- }
-
+ if (obj_desc->flags & IFS_WRLOCK)
+ heap_close(obj_desc->heap_r, RowExclusiveLock);
+ else if (obj_desc->flags & IFS_RDLOCK)
+ heap_close(obj_desc->heap_r, AccessShareLock);
index_close(obj_desc->index_r);
- heap_close(obj_desc->heap_r, AccessShareLock);
pfree(obj_desc);
}
/*
- * Destroys an existing large object, and frees its associated pointers.
+ * Destroys an existing large object (not to be confused with a descriptor!)
*
* returns -1 if failed
*/
int
inv_drop(Oid lobjId)
{
- Relation r;
-
- r = RelationIdGetRelation(lobjId);
- if (!RelationIsValid(r))
- return -1;
-
- if (r->rd_rel->relkind != RELKIND_LOBJECT)
- {
- /* drop relcache refcount from RelationIdGetRelation */
- RelationDecrementReferenceCount(r);
- return -1;
- }
+ LargeObjectDrop(lobjId);
/*
- * Since heap_drop_with_catalog will destroy the relcache entry,
- * there's no need to drop the refcount in this path.
+ * Advance command counter so that tuple removal will be seen by later
+ * large-object operations in this transaction.
*/
- heap_drop_with_catalog(RelationGetRelationName(r), false);
+ CommandCounterIncrement();
+
return 1;
}
/*
- * inv_stat() -- do a stat on an inversion file.
+ * Determine size of a large object
*
- * For the time being, this is an insanely expensive operation. In
- * order to find the size of the file, we seek to the last block in
- * it and compute the size from that. We scan pg_class to determine
- * the file's owner and create time. We don't maintain mod time or
- * access time, yet.
- *
- * These fields aren't stored in a table anywhere because they're
- * updated so frequently, and postgres only appends tuples at the
- * end of relations. Once clustering works, we should fix this.
+ * NOTE: LOs can contain gaps, just like Unix files. We actually return
+ * the offset of the last byte + 1.
*/
-#ifdef NOT_USED
-
-struct pgstat
-{ /* just the fields we need from stat
- * structure */
- int st_ino;
- int st_mode;
- unsigned int st_size;
- unsigned int st_sizehigh; /* high order bits */
-/* 2^64 == 1.8 x 10^20 bytes */
- int st_uid;
- int st_atime_s; /* just the seconds */
- int st_mtime_s; /* since SysV and the new BSD both have */
- int st_ctime_s; /* usec fields.. */
-};
-
-int
-inv_stat(LargeObjectDesc *obj_desc, struct pgstat * stbuf)
+static uint32
+inv_getsize(LargeObjectDesc *obj_desc)
{
+ bool found = false;
+ uint32 lastbyte = 0;
+ uint32 thislastbyte;
+ ScanKeyData skey[1];
+ IndexScanDesc sd;
+ RetrieveIndexResult indexRes;
+ HeapTupleData tuple;
+ Buffer buffer;
+ Form_pg_largeobject data;
+ bytea *datafield;
+ bool pfreeit;
+
Assert(PointerIsValid(obj_desc));
- Assert(stbuf != NULL);
- /* need read lock for stat */
- if (!(obj_desc->flags & IFS_RDLOCK))
+ ScanKeyEntryInitialize(&skey[0],
+ (bits16) 0x0,
+ (AttrNumber) 1,
+ (RegProcedure) F_OIDEQ,
+ ObjectIdGetDatum(obj_desc->id));
+
+ sd = index_beginscan(obj_desc->index_r, true, 1, skey);
+
+ tuple.t_datamcxt = CurrentMemoryContext;
+ tuple.t_data = NULL;
+
+ while ((indexRes = index_getnext(sd, ForwardScanDirection)))
{
- LockRelation(obj_desc->heap_r, ShareLock);
- obj_desc->flags |= IFS_RDLOCK;
+ tuple.t_self = indexRes->heap_iptr;
+ heap_fetch(obj_desc->heap_r, SnapshotNow, &tuple, &buffer);
+ pfree(indexRes);
+ if (tuple.t_data == NULL)
+ continue;
+ found = true;
+ data = (Form_pg_largeobject) GETSTRUCT(&tuple);
+ datafield = &(data->data);
+ pfreeit = false;
+ if (VARATT_IS_EXTENDED(datafield))
+ {
+ datafield = (bytea *)
+ heap_tuple_untoast_attr((varattrib *) datafield);
+ pfreeit = true;
+ }
+ thislastbyte = data->pageno * LOBLKSIZE + getbytealen(datafield);
+ if (thislastbyte > lastbyte)
+ lastbyte = thislastbyte;
+ if (pfreeit)
+ pfree(datafield);
+ ReleaseBuffer(buffer);
}
+
+ index_endscan(sd);
- stbuf->st_ino = RelationGetRelid(obj_desc->heap_r);
-#if 1
- stbuf->st_mode = (S_IFREG | 0666); /* IFREG|rw-rw-rw- */
-#else
- stbuf->st_mode = 100666; /* IFREG|rw-rw-rw- */
-#endif
- stbuf->st_size = _inv_getsize(obj_desc->heap_r,
- obj_desc->hdesc,
- obj_desc->index_r);
-
- stbuf->st_uid = obj_desc->heap_r->rd_rel->relowner;
-
- /* we have no good way of computing access times right now */
- stbuf->st_atime_s = stbuf->st_mtime_s = stbuf->st_ctime_s = 0;
-
- return 0;
+ if (!found)
+ elog(ERROR, "inv_getsize: large object %u not found", obj_desc->id);
+ return lastbyte;
}
-#endif
-
int
inv_seek(LargeObjectDesc *obj_desc, int offset, int whence)
{
- int oldOffset;
- Datum d;
- ScanKeyData skey;
-
Assert(PointerIsValid(obj_desc));
- if (whence == SEEK_CUR)
+ switch (whence)
{
- offset += obj_desc->offset; /* calculate absolute position */
+ case SEEK_SET:
+ if (offset < 0)
+ elog(ERROR, "inv_seek: invalid offset: %d", offset);
+ obj_desc->offset = offset;
+ break;
+ case SEEK_CUR:
+ if ((obj_desc->offset + offset) < 0)
+ elog(ERROR, "inv_seek: invalid offset: %d", offset);
+ obj_desc->offset += offset;
+ break;
+ case SEEK_END:
+ {
+ uint32 size = inv_getsize(obj_desc);
+ if (offset < 0 || ((uint32) offset) > size)
+ elog(ERROR, "inv_seek: invalid offset");
+ obj_desc->offset = size - offset;
+ }
+ break;
+ default:
+ elog(ERROR, "inv_seek: invalid whence: %d", whence);
}
- else if (whence == SEEK_END)
- {
- /* need read lock for getsize */
- if (!(obj_desc->flags & IFS_RDLOCK))
- {
- LockRelation(obj_desc->heap_r, ShareLock);
- obj_desc->flags |= IFS_RDLOCK;
- }
- offset += _inv_getsize(obj_desc->heap_r,
- obj_desc->hdesc,
- obj_desc->index_r);
- }
- /* now we can assume that the operation is SEEK_SET */
-
- /*
- * Whenever we do a seek, we turn off the EOF flag bit to force
- * ourselves to check for real on the next read.
- */
-
- obj_desc->flags &= ~IFS_ATEOF;
- oldOffset = obj_desc->offset;
- obj_desc->offset = offset;
-
- /* try to avoid doing any work, if we can manage it */
- if (offset >= obj_desc->lowbyte
- && offset <= obj_desc->highbyte
- && oldOffset <= obj_desc->highbyte
- && obj_desc->iscan != (IndexScanDesc) NULL)
- return offset;
-
- /*
- * To do a seek on an inversion file, we start an index scan that will
- * bring us to the right place. Each tuple in an inversion file
- * stores the offset of the last byte that appears on it, and we have
- * an index on this.
- */
- if (obj_desc->iscan != (IndexScanDesc) NULL)
- {
- d = Int32GetDatum(offset);
- btmovescan(obj_desc->iscan, d);
- }
- else
- {
- ScanKeyEntryInitialize(&skey, 0x0, 1, F_INT4GE,
- Int32GetDatum(offset));
-
- obj_desc->iscan = index_beginscan(obj_desc->index_r,
- (bool) 0, (uint16) 1,
- &skey);
- }
-
- return offset;
+ return obj_desc->offset;
}
int
@@ -442,862 +283,306 @@ inv_tell(LargeObjectDesc *obj_desc)
int
inv_read(LargeObjectDesc *obj_desc, char *buf, int nbytes)
{
- HeapTupleData tuple;
- int nread;
- int off;
- int ncopy;
- Datum d;
- struct varlena *fsblock;
- bool isNull;
+ int nread = 0;
+ int n;
+ int off;
+ int len;
+ int32 pageno = (int32) (obj_desc->offset / LOBLKSIZE);
+ uint32 pageoff;
+ ScanKeyData skey[2];
+ IndexScanDesc sd;
+ RetrieveIndexResult indexRes;
+ HeapTupleData tuple;
+ Buffer buffer;
+ Form_pg_largeobject data;
+ bytea *datafield;
+ bool pfreeit;
Assert(PointerIsValid(obj_desc));
Assert(buf != NULL);
- /* if we're already at EOF, we don't need to do any work here */
- if (obj_desc->flags & IFS_ATEOF)
+ if (nbytes <= 0)
return 0;
- /* make sure we obey two-phase locking */
- if (!(obj_desc->flags & IFS_RDLOCK))
+ ScanKeyEntryInitialize(&skey[0],
+ (bits16) 0x0,
+ (AttrNumber) 1,
+ (RegProcedure) F_OIDEQ,
+ ObjectIdGetDatum(obj_desc->id));
+
+ ScanKeyEntryInitialize(&skey[1],
+ (bits16) 0x0,
+ (AttrNumber) 2,
+ (RegProcedure) F_INT4GE,
+ Int32GetDatum(pageno));
+
+ sd = index_beginscan(obj_desc->index_r, false, 2, skey);
+
+ tuple.t_datamcxt = CurrentMemoryContext;
+ tuple.t_data = NULL;
+
+ while ((indexRes = index_getnext(sd, ForwardScanDirection)))
{
- LockRelation(obj_desc->heap_r, ShareLock);
- obj_desc->flags |= IFS_RDLOCK;
- }
-
- nread = 0;
-
- /* fetch a block at a time */
- while (nread < nbytes)
- {
- Buffer buffer;
-
- /* fetch an inversion file system block */
- inv_fetchtup(obj_desc, &tuple, &buffer);
+ tuple.t_self = indexRes->heap_iptr;
+ heap_fetch(obj_desc->heap_r, SnapshotNow, &tuple, &buffer);
+ pfree(indexRes);
if (tuple.t_data == NULL)
- {
- obj_desc->flags |= IFS_ATEOF;
- break;
- }
-
- /* copy the data from this block into the buffer */
- d = heap_getattr(&tuple, 2, obj_desc->hdesc, &isNull);
- fsblock = (struct varlena *) DatumGetPointer(d);
- ReleaseBuffer(buffer);
+ continue;
+
+ data = (Form_pg_largeobject) GETSTRUCT(&tuple);
/*
- * If block starts beyond current seek point, then we are looking
- * at a "hole" (unwritten area) in the object. Return zeroes for
- * the "hole".
+ * We assume the indexscan will deliver pages in order. However,
+ * there may be missing pages if the LO contains unwritten "holes".
+ * We want missing sections to read out as zeroes.
*/
- if (obj_desc->offset < obj_desc->lowbyte)
+ pageoff = ((uint32) data->pageno) * LOBLKSIZE;
+ if (pageoff > obj_desc->offset)
{
- int nzeroes = obj_desc->lowbyte - obj_desc->offset;
-
- if (nzeroes > (nbytes - nread))
- nzeroes = (nbytes - nread);
- MemSet(buf, 0, nzeroes);
- buf += nzeroes;
- nread += nzeroes;
- obj_desc->offset += nzeroes;
- if (nread >= nbytes)
- break;
+ n = pageoff - obj_desc->offset;
+ n = (n <= (nbytes - nread)) ? n : (nbytes - nread);
+ MemSet(buf + nread, 0, n);
+ nread += n;
+ obj_desc->offset += n;
}
- off = obj_desc->offset - obj_desc->lowbyte;
- ncopy = obj_desc->highbyte - obj_desc->offset + 1;
- if (ncopy > (nbytes - nread))
- ncopy = (nbytes - nread);
- memmove(buf, &(fsblock->vl_dat[off]), ncopy);
+ if (nread < nbytes)
+ {
+ Assert(obj_desc->offset >= pageoff);
+ off = (int) (obj_desc->offset - pageoff);
+ Assert(off >= 0 && off < LOBLKSIZE);
- /* move pointers past the amount we just read */
- buf += ncopy;
- nread += ncopy;
- obj_desc->offset += ncopy;
+ datafield = &(data->data);
+ pfreeit = false;
+ if (VARATT_IS_EXTENDED(datafield))
+ {
+ datafield = (bytea *)
+ heap_tuple_untoast_attr((varattrib *) datafield);
+ pfreeit = true;
+ }
+ len = getbytealen(datafield);
+ if (len > off)
+ {
+ n = len - off;
+ n = (n <= (nbytes - nread)) ? n : (nbytes - nread);
+ memcpy(buf + nread, VARDATA(datafield) + off, n);
+ nread += n;
+ obj_desc->offset += n;
+ }
+ if (pfreeit)
+ pfree(datafield);
+ }
+
+ ReleaseBuffer(buffer);
+ if (nread >= nbytes)
+ break;
}
+ index_endscan(sd);
+
return nread;
}
int
inv_write(LargeObjectDesc *obj_desc, char *buf, int nbytes)
{
- HeapTupleData tuple;
- int nwritten;
- int tuplen;
+ int nwritten = 0;
+ int n;
+ int off;
+ int len;
+ int32 pageno = (int32) (obj_desc->offset / LOBLKSIZE);
+ ScanKeyData skey[2];
+ IndexScanDesc sd;
+ RetrieveIndexResult indexRes;
+ HeapTupleData oldtuple;
+ Buffer buffer;
+ Form_pg_largeobject olddata;
+ bool neednextpage;
+ bytea *datafield;
+ bool pfreeit;
+ char workbuf[LOBLKSIZE + VARHDRSZ];
+ char *workb = VARATT_DATA(workbuf);
+ HeapTuple newtup;
+ Datum values[Natts_pg_largeobject];
+ char nulls[Natts_pg_largeobject];
+ char replace[Natts_pg_largeobject];
+ bool write_indices;
+ Relation idescs[Num_pg_largeobject_indices];
Assert(PointerIsValid(obj_desc));
Assert(buf != NULL);
- /*
- * Make sure we obey two-phase locking. A write lock entitles you to
- * read the relation, as well.
- */
+ if (nbytes <= 0)
+ return 0;
- if (!(obj_desc->flags & IFS_WRLOCK))
- {
- LockRelation(obj_desc->heap_r, ExclusiveLock);
- obj_desc->flags |= (IFS_WRLOCK | IFS_RDLOCK);
- }
+ write_indices = ! IsIgnoringSystemIndexes();
+ if (write_indices)
+ CatalogOpenIndices(Num_pg_largeobject_indices,
+ Name_pg_largeobject_indices,
+ idescs);
- nwritten = 0;
+ ScanKeyEntryInitialize(&skey[0],
+ (bits16) 0x0,
+ (AttrNumber) 1,
+ (RegProcedure) F_OIDEQ,
+ ObjectIdGetDatum(obj_desc->id));
+
+ ScanKeyEntryInitialize(&skey[1],
+ (bits16) 0x0,
+ (AttrNumber) 2,
+ (RegProcedure) F_INT4GE,
+ Int32GetDatum(pageno));
+
+ sd = index_beginscan(obj_desc->index_r, false, 2, skey);
+
+ oldtuple.t_datamcxt = CurrentMemoryContext;
+ oldtuple.t_data = NULL;
+ olddata = NULL;
+ buffer = InvalidBuffer;
+ neednextpage = true;
- /* write a block at a time */
while (nwritten < nbytes)
{
- Buffer buffer;
-
/*
- * Fetch the current inversion file system block. We can skip
- * the work if we already know we are at EOF.
+ * If possible, get next pre-existing page of the LO. We assume
+ * the indexscan will deliver these in order --- but there may be
+ * holes.
*/
-
- if (obj_desc->flags & IFS_ATEOF)
- tuple.t_data = NULL;
- else
- inv_fetchtup(obj_desc, &tuple, &buffer);
-
- /* either append or replace a block, as required */
- if (tuple.t_data == NULL)
- tuplen = inv_wrnew(obj_desc, buf, nbytes - nwritten);
- else
+ if (neednextpage)
{
- if (obj_desc->offset > obj_desc->highbyte)
+ while ((indexRes = index_getnext(sd, ForwardScanDirection)))
{
- tuplen = inv_wrnew(obj_desc, buf, nbytes - nwritten);
- ReleaseBuffer(buffer);
+ oldtuple.t_self = indexRes->heap_iptr;
+ heap_fetch(obj_desc->heap_r, SnapshotNow, &oldtuple, &buffer);
+ pfree(indexRes);
+ if (oldtuple.t_data != NULL)
+ {
+ olddata = (Form_pg_largeobject) GETSTRUCT(&oldtuple);
+ Assert(olddata->pageno >= pageno);
+ break;
+ }
}
- else
- tuplen = inv_wrold(obj_desc, buf, nbytes - nwritten, &tuple, buffer);
-
- /*
- * inv_wrold() has already issued WriteBuffer() which has
- * decremented local reference counter (LocalRefCount). So we
- * should not call ReleaseBuffer() here. -- Tatsuo 99/2/4
- */
+ neednextpage = false;
}
-
- /* move pointers past the amount we just wrote */
- buf += tuplen;
- nwritten += tuplen;
- obj_desc->offset += tuplen;
- }
-
- /* that's it */
- return nwritten;
-}
-
-/*
- * inv_cleanindex
- * Clean opened indexes for large objects, and clears current result.
- * This is necessary on transaction commit in order to prevent buffer
- * leak.
- * This function must be called for each opened large object.
- * [ PA, 7/17/98 ]
- */
-void
-inv_cleanindex(LargeObjectDesc *obj_desc)
-{
- Assert(PointerIsValid(obj_desc));
-
- if (obj_desc->iscan == (IndexScanDesc) NULL)
- return;
-
- index_endscan(obj_desc->iscan);
- obj_desc->iscan = (IndexScanDesc) NULL;
-
- ItemPointerSetInvalid(&(obj_desc->htid));
-}
-
-/*
- * inv_fetchtup -- Fetch an inversion file system block.
- *
- * This routine finds the file system block containing the offset
- * recorded in the obj_desc structure. Later, we need to think about
- * the effects of non-functional updates (can you rewrite the same
- * block twice in a single transaction?), but for now, we won't bother.
- *
- * Parameters:
- * obj_desc -- the object descriptor.
- * bufP -- pointer to a buffer in the buffer cache; caller
- * must free this.
- *
- * Returns:
- * A heap tuple containing the desired block, or NULL if no
- * such tuple exists.
- */
-static void
-inv_fetchtup(LargeObjectDesc *obj_desc, HeapTuple tuple, Buffer *buffer)
-{
- RetrieveIndexResult res;
- Datum d;
- int firstbyte,
- lastbyte;
- struct varlena *fsblock;
- bool isNull;
-
- /*
- * If we've exhausted the current block, we need to get the next one.
- * When we support time travel and non-functional updates, we will
- * need to loop over the blocks, rather than just have an 'if', in
- * order to find the one we're really interested in.
- */
-
- if (obj_desc->offset > obj_desc->highbyte
- || obj_desc->offset < obj_desc->lowbyte
- || !ItemPointerIsValid(&(obj_desc->htid)))
- {
- ScanKeyData skey;
-
- ScanKeyEntryInitialize(&skey, 0x0, 1, F_INT4GE,
- Int32GetDatum(obj_desc->offset));
-
- /* initialize scan key if not done */
- if (obj_desc->iscan == (IndexScanDesc) NULL)
+ /*
+ * If we have a pre-existing page, see if it is the page we want
+ * to write, or a later one.
+ */
+ if (olddata != NULL && olddata->pageno == pageno)
{
-
/*
- * As scan index may be prematurely closed (on commit), we
- * must use object current offset (was 0) to reinitialize the
- * entry [ PA ].
+ * Update an existing page with fresh data.
+ *
+ * First, load old data into workbuf
*/
- obj_desc->iscan = index_beginscan(obj_desc->index_r,
- (bool) 0, (uint16) 1,
- &skey);
- }
- else
- index_rescan(obj_desc->iscan, false, &skey);
-
- do
- {
- res = index_getnext(obj_desc->iscan, ForwardScanDirection);
-
- if (res == (RetrieveIndexResult) NULL)
+ datafield = &(olddata->data);
+ pfreeit = false;
+ if (VARATT_IS_EXTENDED(datafield))
{
- ItemPointerSetInvalid(&(obj_desc->htid));
- tuple->t_datamcxt = NULL;
- tuple->t_data = NULL;
- return;
+ datafield = (bytea *)
+ heap_tuple_untoast_attr((varattrib *) datafield);
+ pfreeit = true;
}
-
+ len = getbytealen(datafield);
+ Assert(len <= LOBLKSIZE);
+ memcpy(workb, VARDATA(datafield), len);
+ if (pfreeit)
+ pfree(datafield);
/*
- * For time travel, we need to use the actual time qual here,
- * rather that NowTimeQual. We currently have no way to pass
- * a time qual in.
- *
- * This is now valid for snapshot !!! And should be fixed in some
- * way... - vadim 07/28/98
- *
+ * Fill any hole
+ */
+ off = (int) (obj_desc->offset % LOBLKSIZE);
+ if (off > len)
+ MemSet(workb + len, 0, off - len);
+ /*
+ * Insert appropriate portion of new data
+ */
+ n = LOBLKSIZE - off;
+ n = (n <= (nbytes - nwritten)) ? n : (nbytes - nwritten);
+ memcpy(workb + off, buf + nwritten, n);
+ nwritten += n;
+ obj_desc->offset += n;
+ off += n;
+ /* compute valid length of new page */
+ len = (len >= off) ? len : off;
+ VARATT_SIZEP(workbuf) = len + VARHDRSZ;
+ /*
+ * Form and insert updated tuple
+ */
+ memset(values, 0, sizeof(values));
+ memset(nulls, ' ', sizeof(nulls));
+ memset(replace, ' ', sizeof(replace));
+ values[Anum_pg_largeobject_data - 1] = PointerGetDatum(workbuf);
+ replace[Anum_pg_largeobject_data - 1] = 'r';
+ newtup = heap_modifytuple(&oldtuple, obj_desc->heap_r,
+ values, nulls, replace);
+ heap_update(obj_desc->heap_r, &newtup->t_self, newtup, NULL);
+ if (write_indices)
+ CatalogIndexInsert(idescs, Num_pg_largeobject_indices,
+ obj_desc->heap_r, newtup);
+ heap_freetuple(newtup);
+ /*
+ * We're done with this old page.
*/
- tuple->t_self = res->heap_iptr;
- heap_fetch(obj_desc->heap_r, SnapshotNow, tuple, buffer);
- pfree(res);
- } while (tuple->t_data == NULL);
-
- /* remember this tid -- we may need it for later reads/writes */
- ItemPointerCopy(&(tuple->t_self), &obj_desc->htid);
- }
- else
- {
- tuple->t_self = obj_desc->htid;
- heap_fetch(obj_desc->heap_r, SnapshotNow, tuple, buffer);
- if (tuple->t_data == NULL)
- elog(ERROR, "inv_fetchtup: heap_fetch failed");
- }
-
- /*
- * By here, we have the heap tuple we're interested in. We cache the
- * upper and lower bounds for this block in the object descriptor and
- * return the tuple.
- */
-
- d = heap_getattr(tuple, 1, obj_desc->hdesc, &isNull);
- lastbyte = (int32) DatumGetInt32(d);
- d = heap_getattr(tuple, 2, obj_desc->hdesc, &isNull);
- fsblock = (struct varlena *) DatumGetPointer(d);
-
- /*
- * order of + and - is important -- these are unsigned quantites near
- * 0
- */
- firstbyte = (lastbyte + 1 + sizeof(fsblock->vl_len)) - fsblock->vl_len;
-
- obj_desc->lowbyte = firstbyte;
- obj_desc->highbyte = lastbyte;
-
- return;
-}
-
-/*
- * inv_wrnew() -- append a new filesystem block tuple to the inversion
- * file.
- *
- * In response to an inv_write, we append one or more file system
- * blocks to the class containing the large object. We violate the
- * class abstraction here in order to pack things as densely as we
- * are able. We examine the last page in the relation, and write
- * just enough to fill it, assuming that it has above a certain
- * threshold of space available. If the space available is less than
- * the threshold, we allocate a new page by writing a big tuple.
- *
- * By the time we get here, we know all the parameters passed in
- * are valid, and that we hold the appropriate lock on the heap
- * relation.
- *
- * Parameters:
- * obj_desc: large object descriptor for which to append block.
- * buf: buffer containing data to write.
- * nbytes: amount to write
- *
- * Returns:
- * number of bytes actually written to the new tuple.
- */
-static int
-inv_wrnew(LargeObjectDesc *obj_desc, char *buf, int nbytes)
-{
- Relation hr;
- HeapTuple ntup;
- Buffer buffer;
- Page page;
- int nblocks;
- int nwritten;
-
- hr = obj_desc->heap_r;
-
- /*
- * Get the last block in the relation. If there's no data in the
- * relation at all, then we just get a new block. Otherwise, we check
- * the last block to see whether it has room to accept some or all of
- * the data that the user wants to write. If it doesn't, then we
- * allocate a new block.
- */
-
- nblocks = RelationGetNumberOfBlocks(hr);
-
- if (nblocks > 0)
- {
- buffer = ReadBuffer(hr, nblocks - 1);
- page = BufferGetPage(buffer);
- }
- else
- {
- buffer = ReadBuffer(hr, P_NEW);
- page = BufferGetPage(buffer);
- PageInit(page, BufferGetPageSize(buffer), 0);
- }
-
- /*
- * If the last page is too small to hold all the data, and it's too
- * small to hold IMINBLK, then we allocate a new page. If it will
- * hold at least IMINBLK, but less than all the data requested, then
- * we write IMINBLK here. The caller is responsible for noticing that
- * less than the requested number of bytes were written, and calling
- * this routine again.
- */
-
- nwritten = IFREESPC(page);
- if (nwritten < nbytes)
- {
- if (nwritten < IMINBLK)
- {
ReleaseBuffer(buffer);
- buffer = ReadBuffer(hr, P_NEW);
- page = BufferGetPage(buffer);
- PageInit(page, BufferGetPageSize(buffer), 0);
- if (nbytes > IMAXBLK)
- nwritten = IMAXBLK;
- else
- nwritten = nbytes;
- }
- }
- else
- nwritten = nbytes;
-
- /*
- * Insert a new file system block tuple, index it, and write it out.
- */
-
- ntup = inv_newtuple(obj_desc, buffer, page, buf, nwritten);
- inv_indextup(obj_desc, ntup);
- heap_freetuple(ntup);
-
- /* new tuple is inserted */
- WriteBuffer(buffer);
-
- return nwritten;
-}
-
-static int
-inv_wrold(LargeObjectDesc *obj_desc,
- char *dbuf,
- int nbytes,
- HeapTuple tuple,
- Buffer buffer)
-{
- Relation hr;
- HeapTuple ntup;
- Buffer newbuf;
- Page page;
- Page newpage;
- int tupbytes;
- Datum d;
- struct varlena *fsblock;
- int nwritten,
- nblocks,
- freespc;
- bool isNull;
- int keep_offset;
- RetrieveIndexResult res;
-
- /*
- * Since we're using a no-overwrite storage manager, the way we
- * overwrite blocks is to mark the old block invalid and append a new
- * block. First mark the old block invalid. This violates the tuple
- * abstraction.
- */
-
- TransactionIdStore(GetCurrentTransactionId(), &(tuple->t_data->t_xmax));
- tuple->t_data->t_cmax = GetCurrentCommandId();
- tuple->t_data->t_infomask &= ~(HEAP_XMAX_COMMITTED | HEAP_XMAX_INVALID);
-
- /*
- * If we're overwriting the entire block, we're lucky. All we need to
- * do is to insert a new block.
- */
-
- if (obj_desc->offset == obj_desc->lowbyte
- && obj_desc->lowbyte + nbytes >= obj_desc->highbyte)
- {
- WriteBuffer(buffer);
- return inv_wrnew(obj_desc, dbuf, nbytes);
- }
-
- /*
- * By here, we need to overwrite part of the data in the current
- * tuple. In order to reduce the degree to which we fragment blocks,
- * we guarantee that no block will be broken up due to an overwrite.
- * This means that we need to allocate a tuple on a new page, if
- * there's not room for the replacement on this one.
- */
-
- newbuf = buffer;
- page = BufferGetPage(buffer);
- newpage = BufferGetPage(newbuf);
- hr = obj_desc->heap_r;
- freespc = IFREESPC(page);
- d = heap_getattr(tuple, 2, obj_desc->hdesc, &isNull);
- fsblock = (struct varlena *) DatumGetPointer(d);
- tupbytes = fsblock->vl_len - sizeof(fsblock->vl_len);
-
- if (freespc < tupbytes)
- {
-
- /*
- * First see if there's enough space on the last page of the table
- * to put this tuple.
- */
-
- nblocks = RelationGetNumberOfBlocks(hr);
-
- if (nblocks > 0)
- {
- newbuf = ReadBuffer(hr, nblocks - 1);
- newpage = BufferGetPage(newbuf);
+ oldtuple.t_datamcxt = CurrentMemoryContext;
+ oldtuple.t_data = NULL;
+ olddata = NULL;
+ neednextpage = true;
}
else
{
- newbuf = ReadBuffer(hr, P_NEW);
- newpage = BufferGetPage(newbuf);
- PageInit(newpage, BufferGetPageSize(newbuf), 0);
- }
-
- freespc = IFREESPC(newpage);
-
- /*
- * If there's no room on the last page, allocate a new last page
- * for the table, and put it there.
- */
-
- if (freespc < tupbytes)
- {
- ReleaseBuffer(newbuf);
- newbuf = ReadBuffer(hr, P_NEW);
- newpage = BufferGetPage(newbuf);
- PageInit(newpage, BufferGetPageSize(newbuf), 0);
+ /*
+ * Write a brand new page.
+ *
+ * First, fill any hole
+ */
+ off = (int) (obj_desc->offset % LOBLKSIZE);
+ if (off > 0)
+ MemSet(workb, 0, off);
+ /*
+ * Insert appropriate portion of new data
+ */
+ n = LOBLKSIZE - off;
+ n = (n <= (nbytes - nwritten)) ? n : (nbytes - nwritten);
+ memcpy(workb + off, buf + nwritten, n);
+ nwritten += n;
+ obj_desc->offset += n;
+ /* compute valid length of new page */
+ len = off + n;
+ VARATT_SIZEP(workbuf) = len + VARHDRSZ;
+ /*
+ * Form and insert updated tuple
+ */
+ memset(values, 0, sizeof(values));
+ memset(nulls, ' ', sizeof(nulls));
+ values[Anum_pg_largeobject_loid - 1] = ObjectIdGetDatum(obj_desc->id);
+ values[Anum_pg_largeobject_pageno - 1] = Int32GetDatum(pageno);
+ values[Anum_pg_largeobject_data - 1] = PointerGetDatum(workbuf);
+ newtup = heap_formtuple(obj_desc->heap_r->rd_att, values, nulls);
+ heap_insert(obj_desc->heap_r, newtup);
+ if (write_indices)
+ CatalogIndexInsert(idescs, Num_pg_largeobject_indices,
+ obj_desc->heap_r, newtup);
+ heap_freetuple(newtup);
}
+ pageno++;
}
- nwritten = nbytes;
- if (nwritten > obj_desc->highbyte - obj_desc->offset + 1)
- nwritten = obj_desc->highbyte - obj_desc->offset + 1;
- memmove(VARDATA(fsblock) + (obj_desc->offset - obj_desc->lowbyte),
- dbuf, nwritten);
+ if (olddata != NULL)
+ ReleaseBuffer(buffer);
+
+ index_endscan(sd);
+
+ if (write_indices)
+ CatalogCloseIndices(Num_pg_largeobject_indices, idescs);
/*
- * we are rewriting the entire old block, therefore we reset offset to
- * the lowbyte of the original block before jumping into
- * inv_newtuple()
+ * Advance command counter so that my tuple updates will be seen by later
+ * large-object operations in this transaction.
*/
- keep_offset = obj_desc->offset;
- obj_desc->offset = obj_desc->lowbyte;
- ntup = inv_newtuple(obj_desc, newbuf, newpage, VARDATA(fsblock),
- tupbytes);
- /* after we are done, we restore to the true offset */
- obj_desc->offset = keep_offset;
+ CommandCounterIncrement();
- /*
- * By here, we have a page (newpage) that's guaranteed to have enough
- * space on it to put the new tuple. Call inv_newtuple to do the
- * work. Passing NULL as a buffer to inv_newtuple() keeps it from
- * copying any data into the new tuple. When it returns, the tuple is
- * ready to receive data from the old tuple and the user's data
- * buffer.
- */
-/*
- ntup = inv_newtuple(obj_desc, newbuf, newpage, (char *) NULL, tupbytes);
- dptr = ((char *) ntup) + ntup->t_hoff -
- (sizeof(HeapTupleData) - offsetof(HeapTupleData, t_bits)) +
- sizeof(int4)
- + sizeof(fsblock->vl_len);
-
- if (obj_desc->offset > obj_desc->lowbyte) {
- memmove(dptr,
- &(fsblock->vl_dat[0]),
- obj_desc->offset - obj_desc->lowbyte);
- dptr += obj_desc->offset - obj_desc->lowbyte;
- }
-
-
- nwritten = nbytes;
- if (nwritten > obj_desc->highbyte - obj_desc->offset + 1)
- nwritten = obj_desc->highbyte - obj_desc->offset + 1;
-
- memmove(dptr, dbuf, nwritten);
- dptr += nwritten;
-
- if (obj_desc->offset + nwritten < obj_desc->highbyte + 1) {
-*/
-/*
- loc = (obj_desc->highbyte - obj_desc->offset)
- + nwritten;
- sz = obj_desc->highbyte - (obj_desc->lowbyte + loc);
-
- what's going on here?? - jolly
-*/
-/*
- sz = (obj_desc->highbyte + 1) - (obj_desc->offset + nwritten);
- memmove(&(fsblock->vl_dat[0]), dptr, sz);
- }
-*/
-
-
- /* index the new tuple */
- inv_indextup(obj_desc, ntup);
- heap_freetuple(ntup);
-
- /*
- * move the scandesc forward so we don't reread the newly inserted
- * tuple on the next index scan
- */
- res = NULL;
- if (obj_desc->iscan)
- res = index_getnext(obj_desc->iscan, ForwardScanDirection);
-
- if (res)
- pfree(res);
-
- /*
- * Okay, by here, a tuple for the new block is correctly placed,
- * indexed, and filled. Write the changed pages out.
- */
-
- WriteBuffer(buffer);
- if (newbuf != buffer)
- WriteBuffer(newbuf);
-
- /* Tuple id is no longer valid */
- ItemPointerSetInvalid(&(obj_desc->htid));
-
- /* done */
return nwritten;
}
-
-static HeapTuple
-inv_newtuple(LargeObjectDesc *obj_desc,
- Buffer buffer,
- Page page,
- char *dbuf,
- int nwrite)
-{
- HeapTuple ntup = (HeapTuple) palloc(sizeof(HeapTupleData));
- PageHeader ph;
- int tupsize;
- int hoff;
- Offset lower;
- Offset upper;
- ItemId itemId;
- OffsetNumber off;
- OffsetNumber limit;
- char *attptr;
-
- /* compute tuple size -- no nulls */
- hoff = offsetof(HeapTupleHeaderData, t_bits);
- hoff = MAXALIGN(hoff);
-
- /* add in olastbyte, varlena.vl_len, varlena.vl_dat */
- tupsize = hoff + (2 * sizeof(int32)) + nwrite;
- tupsize = MAXALIGN(tupsize);
-
- /*
- * Allocate the tuple on the page, violating the page abstraction.
- * This code was swiped from PageAddItem().
- */
-
- ph = (PageHeader) page;
- limit = OffsetNumberNext(PageGetMaxOffsetNumber(page));
-
- /* look for "recyclable" (unused & deallocated) ItemId */
- for (off = FirstOffsetNumber; off < limit; off = OffsetNumberNext(off))
- {
- itemId = &ph->pd_linp[off - 1];
- if ((((*itemId).lp_flags & LP_USED) == 0) &&
- ((*itemId).lp_len == 0))
- break;
- }
-
- if (off > limit)
- lower = (Offset) (((char *) (&ph->pd_linp[off])) - ((char *) page));
- else if (off == limit)
- lower = ph->pd_lower + sizeof(ItemIdData);
- else
- lower = ph->pd_lower;
-
- upper = ph->pd_upper - tupsize;
-
- itemId = &ph->pd_linp[off - 1];
- (*itemId).lp_off = upper;
- (*itemId).lp_len = tupsize;
- (*itemId).lp_flags = LP_USED;
- ph->pd_lower = lower;
- ph->pd_upper = upper;
-
- ntup->t_datamcxt = NULL;
- ntup->t_data = (HeapTupleHeader) ((char *) page + upper);
-
- /*
- * Tuple is now allocated on the page. Next, fill in the tuple
- * header. This block of code violates the tuple abstraction.
- */
-
- ntup->t_len = tupsize;
- ItemPointerSet(&ntup->t_self, BufferGetBlockNumber(buffer), off);
- ntup->t_data->t_oid = newoid();
- TransactionIdStore(GetCurrentTransactionId(), &(ntup->t_data->t_xmin));
- ntup->t_data->t_cmin = GetCurrentCommandId();
- StoreInvalidTransactionId(&(ntup->t_data->t_xmax));
- ntup->t_data->t_cmax = 0;
- ntup->t_data->t_infomask = HEAP_XMAX_INVALID;
- ntup->t_data->t_natts = 2;
- ntup->t_data->t_hoff = hoff;
-
- /* if a NULL is passed in, avoid the calculations below */
- if (dbuf == NULL)
- return ntup;
-
- /*
- * Finally, copy the user's data buffer into the tuple. This violates
- * the tuple and class abstractions.
- */
-
- attptr = ((char *) ntup->t_data) + hoff;
- *((int32 *) attptr) = obj_desc->offset + nwrite - 1;
- attptr += sizeof(int32);
-
- /*
- * * mer fixed disk layout of varlenas to get rid of the need for
- * this. *
- *
- * ((int32 *) attptr) = nwrite + sizeof(int32); * attptr +=
- * sizeof(int32);
- */
-
- *((int32 *) attptr) = nwrite + sizeof(int32);
- attptr += sizeof(int32);
-
- /*
- * If a data buffer was passed in, then copy the data from the buffer
- * to the tuple. Some callers (eg, inv_wrold()) may not pass in a
- * buffer, since they have to copy part of the old tuple data and part
- * of the user's new data into the new tuple.
- */
-
- if (dbuf != (char *) NULL)
- memmove(attptr, dbuf, nwrite);
-
- /* keep track of boundary of current tuple */
- obj_desc->lowbyte = obj_desc->offset;
- obj_desc->highbyte = obj_desc->offset + nwrite - 1;
-
- /* new tuple is filled -- return it */
- return ntup;
-}
-
-static void
-inv_indextup(LargeObjectDesc *obj_desc, HeapTuple tuple)
-{
- InsertIndexResult res;
- Datum v[1];
- char n[1];
-
- n[0] = ' ';
- v[0] = Int32GetDatum(obj_desc->highbyte);
- res = index_insert(obj_desc->index_r, &v[0], &n[0],
- &(tuple->t_self), obj_desc->heap_r);
-
- if (res)
- pfree(res);
-}
-
-#ifdef NOT_USED
-
-static void
-DumpPage(Page page, int blkno)
-{
- ItemId lp;
- HeapTuple tup;
- int flags, i, nline;
- ItemPointerData pointerData;
-
- printf("\t[subblock=%d]:lower=%d:upper=%d:special=%d\n", 0,
- ((PageHeader)page)->pd_lower, ((PageHeader)page)->pd_upper,
- ((PageHeader)page)->pd_special);
-
- printf("\t:MaxOffsetNumber=%d\n",
- (int16) PageGetMaxOffsetNumber(page));
-
- nline = (int16) PageGetMaxOffsetNumber(page);
-
-{
- int i;
- char *cp;
-
- i = PageGetSpecialSize(page);
- cp = PageGetSpecialPointer(page);
-
- printf("\t:SpecialData=");
-
- while (i > 0) {
- printf(" 0x%02x", *cp);
- cp += 1;
- i -= 1;
- }
- printf("\n");
-}
- for (i = 0; i < nline; i++) {
- lp = ((PageHeader)page)->pd_linp + i;
- flags = (*lp).lp_flags;
- ItemPointerSet(&pointerData, blkno, 1 + i);
- printf("%s:off=%d:flags=0x%x:len=%d",
- ItemPointerFormExternal(&pointerData), (*lp).lp_off,
- flags, (*lp).lp_len);
-
- if (flags & LP_USED) {
- HeapTupleData htdata;
-
- printf(":USED");
-
- memmove((char *) &htdata,
- (char *) &((char *)page)[(*lp).lp_off],
- sizeof(htdata));
-
- tup = &htdata;
-
- printf("\n\t:ctid=%s:oid=%d",
- ItemPointerFormExternal(&tup->t_ctid),
- tup->t_oid);
- printf(":natts=%d:thoff=%d:",
- tup->t_natts,
- tup->t_hoff);
-
- printf("\n\t:cmin=%u:",
- tup->t_cmin);
-
- printf("xmin=%u:", tup->t_xmin);
-
- printf("\n\t:cmax=%u:",
- tup->t_cmax);
-
- printf("xmax=%u:\n", tup->t_xmax);
-
- } else
- putchar('\n');
- }
-}
-
-static char*
-ItemPointerFormExternal(ItemPointer pointer)
-{
- static char itemPointerString[32];
-
- if (!ItemPointerIsValid(pointer)) {
- memmove(itemPointerString, "<-,-,->", sizeof "<-,-,->");
- } else {
- sprintf(itemPointerString, "<%u,%u>",
- ItemPointerGetBlockNumber(pointer),
- ItemPointerGetOffsetNumber(pointer));
- }
-
- return itemPointerString;
-}
-
-#endif
-
-static int
-_inv_getsize(Relation hreln, TupleDesc hdesc, Relation ireln)
-{
- IndexScanDesc iscan;
- RetrieveIndexResult res;
- HeapTupleData tuple;
- Datum d;
- long size;
- bool isNull;
- Buffer buffer;
-
- /* scan backwards from end */
- iscan = index_beginscan(ireln, (bool) 1, 0, (ScanKey) NULL);
-
- do
- {
- res = index_getnext(iscan, BackwardScanDirection);
-
- /*
- * If there are no more index tuples, then the relation is empty,
- * so the file's size is zero.
- */
-
- if (res == (RetrieveIndexResult) NULL)
- {
- index_endscan(iscan);
- return 0;
- }
-
- /*
- * For time travel, we need to use the actual time qual here,
- * rather that NowTimeQual. We currently have no way to pass a
- * time qual in.
- */
- tuple.t_self = res->heap_iptr;
- heap_fetch(hreln, SnapshotNow, &tuple, &buffer);
- pfree(res);
- } while (tuple.t_data == NULL);
-
- /* don't need the index scan anymore */
- index_endscan(iscan);
-
- /* get olastbyte attribute */
- d = heap_getattr(&tuple, 1, hdesc, &isNull);
- size = DatumGetInt32(d) + 1;
- ReleaseBuffer(buffer);
-
- return size;
-}
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index a3a914f8af..0d2c161280 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -22,7 +22,7 @@
*
*
* IDENTIFICATION
- * $Header: /cvsroot/pgsql/src/bin/pg_dump/pg_dump.c,v 1.174 2000/10/22 23:16:55 pjw Exp $
+ * $Header: /cvsroot/pgsql/src/bin/pg_dump/pg_dump.c,v 1.175 2000/10/24 01:38:32 tgl Exp $
*
* Modifications - 6/10/96 - dave@bensoft.com - version 1.13.dhb
*
@@ -1104,7 +1104,7 @@ dumpBlobs(Archive *AH, char* junkOid, void *junkVal)
fprintf(stderr, "%s saving BLOBs\n", g_comment_start);
/* Cursor to get all BLOB tables */
- appendPQExpBuffer(oidQry, "Declare blobOid Cursor for SELECT oid from pg_class where relkind = '%c'", RELKIND_LOBJECT);
+ appendPQExpBuffer(oidQry, "Declare blobOid Cursor for SELECT DISTINCT loid FROM pg_largeobject");
res = PQexec(g_conn, oidQry->data);
if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
@@ -1874,8 +1874,7 @@ getTables(int *numTables, FuncInfo *finfo, int numFuncs)
* tables before the child tables when traversing the tblinfo*
*
* we ignore tables that are not type 'r' (ordinary relation) or 'S'
- * (sequence) or 'v' (view) --- in particular, Large Object
- * relations (type 'l') are ignored.
+ * (sequence) or 'v' (view).
*/
appendPQExpBuffer(query,
@@ -1886,7 +1885,6 @@ getTables(int *numTables, FuncInfo *finfo, int numFuncs)
"where relname !~ '^pg_' "
"and relkind in ('%c', '%c', '%c') "
"order by oid",
- RELKIND_VIEW,
RELKIND_RELATION, RELKIND_SEQUENCE, RELKIND_VIEW);
res = PQexec(g_conn, query->data);
@@ -2585,7 +2583,7 @@ getIndices(int *numIndices)
* find all the user-defined indices. We do not handle partial
* indices.
*
- * Notice we skip indices on inversion objects (relkind 'l')
+ * Notice we skip indices on system classes
*
* this is a 4-way join !!
*/
@@ -2597,8 +2595,8 @@ getIndices(int *numIndices)
"from pg_index i, pg_class t1, pg_class t2, pg_am a "
"WHERE t1.oid = i.indexrelid and t2.oid = i.indrelid "
"and t1.relam = a.oid and i.indexrelid > '%u'::oid "
- "and t2.relname !~ '^pg_' and t2.relkind != '%c' and not i.indisprimary",
- g_last_builtin_oid, RELKIND_LOBJECT);
+ "and t2.relname !~ '^pg_' and not i.indisprimary",
+ g_last_builtin_oid);
res = PQexec(g_conn, query->data);
if (!res ||
diff --git a/src/bin/pgtclsh/updateStats.tcl b/src/bin/pgtclsh/updateStats.tcl
index d97c8a7b67..9cb8384dc2 100644
--- a/src/bin/pgtclsh/updateStats.tcl
+++ b/src/bin/pgtclsh/updateStats.tcl
@@ -59,7 +59,7 @@ proc update_attnvals {conn rel} {
proc updateStats { dbName } {
# datnames is the list to be result
set conn [pg_connect $dbName]
- set res [pg_exec $conn "SELECT relname FROM pg_class WHERE relkind = 'r' and relname !~ '^pg_' and relname !~ '^xinv'"]
+ set res [pg_exec $conn "SELECT relname FROM pg_class WHERE relkind = 'r' and relname !~ '^pg_'"]
set ntups [pg_result $res -numTuples]
for {set i 0} {$i < $ntups} {incr i} {
set rel [pg_result $res -getTuple $i]
diff --git a/src/bin/psql/describe.c b/src/bin/psql/describe.c
index 26c54b366a..3db2eb95a6 100644
--- a/src/bin/psql/describe.c
+++ b/src/bin/psql/describe.c
@@ -3,7 +3,7 @@
*
* Copyright 2000 by PostgreSQL Global Development Group
*
- * $Header: /cvsroot/pgsql/src/bin/psql/describe.c,v 1.24 2000/09/07 04:55:27 ishii Exp $
+ * $Header: /cvsroot/pgsql/src/bin/psql/describe.c,v 1.25 2000/10/24 01:38:38 tgl Exp $
*/
#include "postgres.h"
#include "describe.h"
@@ -1020,10 +1020,6 @@ listTables(const char *infotype, const char *name, bool desc)
strcat(buf, "'S'");
strcat(buf, ")\n");
- /* ignore large-obj indices */
- if (showIndices)
- strcat(buf, " AND (c.relkind != 'i' OR c.relname !~ '^xinx')\n");
-
strcat(buf, showSystem ? " AND c.relname ~ '^pg_'\n" : " AND c.relname !~ '^pg_'\n");
if (name)
{
@@ -1050,10 +1046,6 @@ listTables(const char *infotype, const char *name, bool desc)
strcat(buf, "'S'");
strcat(buf, ")\n");
- /* ignore large-obj indices */
- if (showIndices)
- strcat(buf, " AND (c.relkind != 'i' OR c.relname !~ '^xinx')\n");
-
strcat(buf, showSystem ? " AND c.relname ~ '^pg_'\n" : " AND c.relname !~ '^pg_'\n");
if (name)
{
diff --git a/src/bin/psql/large_obj.c b/src/bin/psql/large_obj.c
index 020b0173eb..5cfd18c328 100644
--- a/src/bin/psql/large_obj.c
+++ b/src/bin/psql/large_obj.c
@@ -3,7 +3,7 @@
*
* Copyright 2000 by PostgreSQL Global Development Group
*
- * $Header: /cvsroot/pgsql/src/bin/psql/large_obj.c,v 1.10 2000/04/12 17:16:22 momjian Exp $
+ * $Header: /cvsroot/pgsql/src/bin/psql/large_obj.c,v 1.11 2000/10/24 01:38:39 tgl Exp $
*/
#include "postgres.h"
#include "large_obj.h"
@@ -193,7 +193,7 @@ do_lo_import(const char *filename_arg, const char *comment_arg)
/* insert description if given */
if (comment_arg)
{
- sprintf(buf, "INSERT INTO pg_description VALUES (%d, '", loid);
+ sprintf(buf, "INSERT INTO pg_description VALUES (%u, '", loid);
for (i = 0; i < strlen(comment_arg); i++)
if (comment_arg[i] == '\'')
strcat(buf, "\\'");
@@ -284,7 +284,7 @@ do_lo_unlink(const char *loid_arg)
}
/* remove the comment as well */
- sprintf(buf, "DELETE FROM pg_description WHERE objoid = %d", loid);
+ sprintf(buf, "DELETE FROM pg_description WHERE objoid = %u", loid);
if (!(res = PSQLexec(buf)))
{
if (own_transaction)
@@ -328,15 +328,9 @@ do_lo_list(void)
printQueryOpt myopt = pset.popt;
strcpy(buf,
- "SELECT usename as \"Owner\", substring(relname from 5) as \"ID\",\n"
- " obj_description(pg_class.oid) as \"Description\"\n"
- "FROM pg_class, pg_user\n"
- "WHERE usesysid = relowner AND relkind = 'l'\n"
- "UNION\n"
- "SELECT NULL as \"Owner\", substring(relname from 5) as \"ID\",\n"
- " obj_description(pg_class.oid) as \"Description\"\n"
- "FROM pg_class\n"
- "WHERE not exists (select 1 from pg_user where usesysid = relowner) AND relkind = 'l'\n"
+ "SELECT DISTINCT loid as \"ID\",\n"
+ " obj_description(loid) as \"Description\"\n"
+ "FROM pg_largeobject\n"
"ORDER BY \"ID\"");
res = PSQLexec(buf);
diff --git a/src/include/catalog/catname.h b/src/include/catalog/catname.h
index b82977d806..54b964e215 100644
--- a/src/include/catalog/catname.h
+++ b/src/include/catalog/catname.h
@@ -7,7 +7,7 @@
* Portions Copyright (c) 1996-2000, PostgreSQL, Inc
* Portions Copyright (c) 1994, Regents of the University of California
*
- * $Id: catname.h,v 1.16 2000/10/22 05:27:20 momjian Exp $
+ * $Id: catname.h,v 1.17 2000/10/24 01:38:41 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@@ -29,6 +29,7 @@
#define InheritsRelationName "pg_inherits"
#define InheritancePrecidenceListRelationName "pg_ipl"
#define LanguageRelationName "pg_language"
+#define LargeObjectRelationName "pg_largeobject"
#define ListenerRelationName "pg_listener"
#define LogRelationName "pg_log"
#define OperatorClassRelationName "pg_opclass"
diff --git a/src/include/catalog/catversion.h b/src/include/catalog/catversion.h
index c16c6ae83e..f6fd284f34 100644
--- a/src/include/catalog/catversion.h
+++ b/src/include/catalog/catversion.h
@@ -37,7 +37,7 @@
* Portions Copyright (c) 1996-2000, PostgreSQL, Inc
* Portions Copyright (c) 1994, Regents of the University of California
*
- * $Id: catversion.h,v 1.51 2000/10/22 17:55:49 pjw Exp $
+ * $Id: catversion.h,v 1.52 2000/10/24 01:38:41 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@@ -53,6 +53,6 @@
*/
/* yyyymmddN */
-#define CATALOG_VERSION_NO 200010231
+#define CATALOG_VERSION_NO 200010232
#endif
diff --git a/src/include/catalog/indexing.h b/src/include/catalog/indexing.h
index 6cc98bdc32..7150a43d2d 100644
--- a/src/include/catalog/indexing.h
+++ b/src/include/catalog/indexing.h
@@ -8,7 +8,7 @@
* Portions Copyright (c) 1996-2000, PostgreSQL, Inc
* Portions Copyright (c) 1994, Regents of the University of California
*
- * $Id: indexing.h,v 1.44 2000/10/22 05:27:20 momjian Exp $
+ * $Id: indexing.h,v 1.45 2000/10/24 01:38:41 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@@ -31,6 +31,7 @@
#define Num_pg_index_indices 2
#define Num_pg_inherits_indices 1
#define Num_pg_language_indices 2
+#define Num_pg_largeobject_indices 1
#define Num_pg_listener_indices 1
#define Num_pg_opclass_indices 2
#define Num_pg_operator_indices 2
@@ -62,6 +63,7 @@
#define InheritsRelidSeqnoIndex "pg_inherits_relid_seqno_index"
#define LanguageNameIndex "pg_language_name_index"
#define LanguageOidIndex "pg_language_oid_index"
+#define LargeObjectLOidPNIndex "pg_largeobject_loid_pn_index"
#define ListenerPidRelnameIndex "pg_listener_pid_relname_index"
#define OpclassDeftypeIndex "pg_opclass_deftype_index"
#define OpclassNameIndex "pg_opclass_name_index"
@@ -92,6 +94,7 @@ extern char *Name_pg_group_indices[];
extern char *Name_pg_index_indices[];
extern char *Name_pg_inherits_indices[];
extern char *Name_pg_language_indices[];
+extern char *Name_pg_largeobject_indices[];
extern char *Name_pg_listener_indices[];
extern char *Name_pg_opclass_indices[];
extern char *Name_pg_operator_indices[];
@@ -191,6 +194,7 @@ DECLARE_UNIQUE_INDEX(pg_index_indexrelid_index on pg_index using btree(indexreli
DECLARE_UNIQUE_INDEX(pg_inherits_relid_seqno_index on pg_inherits using btree(inhrelid oid_ops, inhseqno int4_ops));
DECLARE_UNIQUE_INDEX(pg_language_name_index on pg_language using btree(lanname name_ops));
DECLARE_UNIQUE_INDEX(pg_language_oid_index on pg_language using btree(oid oid_ops));
+DECLARE_UNIQUE_INDEX(pg_largeobject_loid_pn_index on pg_largeobject using btree(loid oid_ops, pageno int4_ops));
DECLARE_UNIQUE_INDEX(pg_listener_pid_relname_index on pg_listener using btree(listenerpid int4_ops, relname name_ops));
/* This column needs to allow multiple zero entries, but is in the cache */
DECLARE_INDEX(pg_opclass_deftype_index on pg_opclass using btree(opcdeftype oid_ops));
diff --git a/src/include/catalog/pg_class.h b/src/include/catalog/pg_class.h
index a9592e7ddb..68db583fe3 100644
--- a/src/include/catalog/pg_class.h
+++ b/src/include/catalog/pg_class.h
@@ -8,7 +8,7 @@
* Portions Copyright (c) 1996-2000, PostgreSQL, Inc
* Portions Copyright (c) 1994, Regents of the University of California
*
- * $Id: pg_class.h,v 1.43 2000/10/22 17:55:49 pjw Exp $
+ * $Id: pg_class.h,v 1.44 2000/10/24 01:38:41 tgl Exp $
*
* NOTES
* the genbki.sh script reads this file and generates .bki
@@ -174,7 +174,6 @@ DESCR("");
#define XactLockTableId 376
#define RELKIND_INDEX 'i' /* secondary index */
-#define RELKIND_LOBJECT 'l' /* large objects */
#define RELKIND_RELATION 'r' /* ordinary cataloged heap */
#define RELKIND_SPECIAL 's' /* special (non-heap) */
#define RELKIND_SEQUENCE 'S' /* SEQUENCE relation */
diff --git a/src/include/catalog/pg_largeobject.h b/src/include/catalog/pg_largeobject.h
new file mode 100644
index 0000000000..7777604e27
--- /dev/null
+++ b/src/include/catalog/pg_largeobject.h
@@ -0,0 +1,63 @@
+/*-------------------------------------------------------------------------
+ *
+ * pg_largeobject.h
+ * definition of the system "largeobject" relation (pg_largeobject)
+ * along with the relation's initial contents.
+ *
+ *
+ * Portions Copyright (c) 1996-2000, PostgreSQL, Inc
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * $Id: pg_largeobject.h,v 1.5 2000/10/24 01:38:41 tgl Exp $
+ *
+ * NOTES
+ * the genbki.sh script reads this file and generates .bki
+ * information from the DATA() statements.
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef PG_LARGEOBJECT_H
+#define PG_LARGEOBJECT_H
+
+/* ----------------
+ * postgres.h contains the system type definitions and the
+ * CATALOG(), BOOTSTRAP and DATA() sugar words so this file
+ * can be read by both genbki.sh and the C compiler.
+ * ----------------
+ */
+
+/* ----------------
+ * pg_largeobject definition. cpp turns this into
+ * typedef struct FormData_pg_largeobject. Large object id
+ * is stored in loid;
+ * ----------------
+ */
+
+CATALOG(pg_largeobject)
+{
+ Oid loid; /* Identifier of large object */
+ int4 pageno; /* Page number (starting from 0) */
+ bytea data; /* Data for page (may be zero-length) */
+} FormData_pg_largeobject;
+
+/* ----------------
+ * Form_pg_largeobject corresponds to a pointer to a tuple with
+ * the format of pg_largeobject relation.
+ * ----------------
+ */
+typedef FormData_pg_largeobject *Form_pg_largeobject;
+
+/* ----------------
+ * compiler constants for pg_largeobject
+ * ----------------
+ */
+#define Natts_pg_largeobject 3
+#define Anum_pg_largeobject_loid 1
+#define Anum_pg_largeobject_pageno 2
+#define Anum_pg_largeobject_data 3
+
+extern Oid LargeObjectCreate(Oid loid);
+extern void LargeObjectDrop(Oid loid);
+extern bool LargeObjectExists(Oid loid);
+
+#endif /* PG_LARGEOBJECT_H */
diff --git a/src/include/storage/large_object.h b/src/include/storage/large_object.h
index c480f5b787..6bb0c4fcf2 100644
--- a/src/include/storage/large_object.h
+++ b/src/include/storage/large_object.h
@@ -8,39 +8,54 @@
* Portions Copyright (c) 1996-2000, PostgreSQL, Inc
* Portions Copyright (c) 1994, Regents of the University of California
*
- * $Id: large_object.h,v 1.17 2000/10/22 05:27:23 momjian Exp $
+ * $Id: large_object.h,v 1.18 2000/10/24 01:38:43 tgl Exp $
*
*-------------------------------------------------------------------------
*/
#ifndef LARGE_OBJECT_H
#define LARGE_OBJECT_H
-#include
+#include "utils/rel.h"
-#include "access/relscan.h"
-/*
- * This structure will eventually have lots more stuff associated with it.
+/*----------
+ * Data about a currently-open large object.
+ *
+ * id is the logical OID of the large object
+ * offset is the current seek offset within the LO
+ * heap_r holds an open-relation reference to pg_largeobject
+ * index_r holds an open-relation reference to pg_largeobject_loid_pn_index
+ *
+ * NOTE: before 7.1, heap_r and index_r held references to the separate
+ * table and index of a specific large object. Now they all live in one rel.
+ *----------
*/
-typedef struct LargeObjectDesc
-{
- Relation heap_r; /* heap relation */
- Relation index_r; /* index relation on seqno attribute */
- IndexScanDesc iscan; /* index scan we're using */
- TupleDesc hdesc; /* heap relation tuple desc */
- TupleDesc idesc; /* index relation tuple desc */
- uint32 lowbyte; /* low byte on the current page */
- uint32 highbyte; /* high byte on the current page */
+typedef struct LargeObjectDesc {
+ Oid id;
uint32 offset; /* current seek pointer */
- ItemPointerData htid; /* tid of current heap tuple */
+ int flags; /* locking info, etc */
+/* flag bits: */
#define IFS_RDLOCK (1 << 0)
#define IFS_WRLOCK (1 << 1)
-#define IFS_ATEOF (1 << 2)
- u_long flags; /* locking info, etc */
+ Relation heap_r;
+ Relation index_r;
} LargeObjectDesc;
+
+/*
+ * Each "page" (tuple) of a large object can hold this much data
+ *
+ * Calculation is max tuple size less tuple header, loid field (Oid),
+ * pageno field (int32), and varlena header of data (int32). Note we
+ * assume none of the fields will be NULL, hence no need for null bitmap.
+ */
+#define LOBLKSIZE (MaxTupleSize \
+ - MAXALIGN(offsetof(HeapTupleHeaderData, t_bits)) \
+ - sizeof(Oid) - sizeof(int32) * 2)
+
+
/*
* Function definitions...
*/
@@ -55,7 +70,4 @@ extern int inv_tell(LargeObjectDesc *obj_desc);
extern int inv_read(LargeObjectDesc *obj_desc, char *buf, int nbytes);
extern int inv_write(LargeObjectDesc *obj_desc, char *buf, int nbytes);
-/* added for buffer leak prevention [ PA ] */
-extern void inv_cleanindex(LargeObjectDesc *obj_desc);
-
#endif /* LARGE_OBJECT_H */
diff --git a/src/interfaces/odbc/info.c b/src/interfaces/odbc/info.c
index 9c99a120ad..9d4e75a9e0 100644
--- a/src/interfaces/odbc/info.c
+++ b/src/interfaces/odbc/info.c
@@ -1007,8 +1007,7 @@ mylog("%s: entering...stmt=%u\n", func, stmt);
}
- /* filter out large objects unconditionally (they are not system tables) and match users */
- strcat(tables_query, " and relname !~ '^xinv[0-9]+'");
+ /* match users */
strcat(tables_query, " and usesysid = relowner");
strcat(tables_query, " order by relname");
diff --git a/src/test/regress/expected/opr_sanity.out b/src/test/regress/expected/opr_sanity.out
index f5d2427cfa..9fd96b2280 100644
--- a/src/test/regress/expected/opr_sanity.out
+++ b/src/test/regress/expected/opr_sanity.out
@@ -482,8 +482,8 @@ WHERE p1.aggtransfn = p2.oid AND
(p2.pronargs = 1 AND p1.aggbasetype = 0)));
oid | aggname | oid | proname
-------+---------+-----+-------------
- 16984 | max | 768 | int4larger
- 16998 | min | 769 | int4smaller
+ 16996 | max | 768 | int4larger
+ 17010 | min | 769 | int4smaller
(2 rows)
-- Cross-check finalfn (if present) against its entry in pg_proc.
diff --git a/src/test/regress/expected/sanity_check.out b/src/test/regress/expected/sanity_check.out
index 823d9e142d..f2412386d1 100644
--- a/src/test/regress/expected/sanity_check.out
+++ b/src/test/regress/expected/sanity_check.out
@@ -40,6 +40,7 @@ SELECT relname, relhasindex
pg_index | t
pg_inherits | t
pg_language | t
+ pg_largeobject | t
pg_listener | t
pg_opclass | t
pg_operator | t
@@ -54,5 +55,5 @@ SELECT relname, relhasindex
shighway | t
tenk1 | t
tenk2 | t
-(44 rows)
+(45 rows)