mirror of
https://git.postgresql.org/git/postgresql.git
synced 2025-01-18 18:44:06 +08:00
Implement pipeline mode in libpq
Pipeline mode in libpq lets an application avoid the Sync messages in the FE/BE protocol that are implicit in the old libpq API after each query. The application can then insert Sync at its leisure with a new libpq function PQpipelineSync. This can lead to substantial reductions in query latency. Co-authored-by: Craig Ringer <craig.ringer@enterprisedb.com> Co-authored-by: Matthieu Garrigues <matthieu.garrigues@gmail.com> Co-authored-by: Álvaro Herrera <alvherre@alvh.no-ip.org> Reviewed-by: Andres Freund <andres@anarazel.de> Reviewed-by: Aya Iwata <iwata.aya@jp.fujitsu.com> Reviewed-by: Daniel Vérité <daniel@manitou-mail.org> Reviewed-by: David G. Johnston <david.g.johnston@gmail.com> Reviewed-by: Justin Pryzby <pryzby@telsasoft.com> Reviewed-by: Kirk Jamison <k.jamison@fujitsu.com> Reviewed-by: Michael Paquier <michael.paquier@gmail.com> Reviewed-by: Nikhil Sontakke <nikhils@2ndquadrant.com> Reviewed-by: Vaishnavi Prabakaran <VaishnaviP@fast.au.fujitsu.com> Reviewed-by: Zhihong Yu <zyu@yugabyte.com> Discussion: https://postgr.es/m/CAMsr+YFUjJytRyV4J-16bEoiZyH=4nj+sQ7JP9ajwz=B4dMMZw@mail.gmail.com Discussion: https://postgr.es/m/CAJkzx4T5E-2cQe3dtv2R78dYFvz+in8PY7A8MArvLhs_pg75gg@mail.gmail.com
This commit is contained in:
parent
146cb3889c
commit
acb7e4eb6b
@ -3180,6 +3180,33 @@ ExecStatusType PQresultStatus(const PGresult *res);
|
||||
</para>
|
||||
</listitem>
|
||||
</varlistentry>
|
||||
|
||||
<varlistentry id="libpq-pgres-pipeline-sync">
|
||||
<term><literal>PGRES_PIPELINE_SYNC</literal></term>
|
||||
<listitem>
|
||||
<para>
|
||||
The <structname>PGresult</structname> represents a
|
||||
synchronization point in pipeline mode, requested by
|
||||
<xref linkend="libpq-PQpipelineSync"/>.
|
||||
This status occurs only when pipeline mode has been selected.
|
||||
</para>
|
||||
</listitem>
|
||||
</varlistentry>
|
||||
|
||||
<varlistentry id="libpq-pgres-pipeline-aborted">
|
||||
<term><literal>PGRES_PIPELINE_ABORTED</literal></term>
|
||||
<listitem>
|
||||
<para>
|
||||
The <structname>PGresult</structname> represents a pipeline that has
|
||||
received an error from the server. <function>PQgetResult</function>
|
||||
must be called repeatedly, and each time it will return this status code
|
||||
until the end of the current pipeline, at which point it will return
|
||||
<literal>PGRES_PIPELINE_SYNC</literal> and normal processing can
|
||||
resume.
|
||||
</para>
|
||||
</listitem>
|
||||
</varlistentry>
|
||||
|
||||
</variablelist>
|
||||
|
||||
If the result status is <literal>PGRES_TUPLES_OK</literal> or
|
||||
@ -4677,8 +4704,9 @@ int PQsendDescribePortal(PGconn *conn, const char *portalName);
|
||||
<xref linkend="libpq-PQsendQueryParams"/>,
|
||||
<xref linkend="libpq-PQsendPrepare"/>,
|
||||
<xref linkend="libpq-PQsendQueryPrepared"/>,
|
||||
<xref linkend="libpq-PQsendDescribePrepared"/>, or
|
||||
<xref linkend="libpq-PQsendDescribePortal"/>
|
||||
<xref linkend="libpq-PQsendDescribePrepared"/>,
|
||||
<xref linkend="libpq-PQsendDescribePortal"/>, or
|
||||
<xref linkend="libpq-PQpipelineSync"/>
|
||||
call, and returns it.
|
||||
A null pointer is returned when the command is complete and there
|
||||
will be no more results.
|
||||
@ -4702,6 +4730,19 @@ PGresult *PQgetResult(PGconn *conn);
|
||||
<xref linkend="libpq-PQconsumeInput"/>.
|
||||
</para>
|
||||
|
||||
<para>
|
||||
In pipeline mode, <function>PQgetResult</function> will return normally
|
||||
unless an error occurs; for any subsequent query sent after the one
|
||||
that caused the error until (and excluding) the next synchronization point,
|
||||
a special result of type <literal>PGRES_PIPELINE_ABORTED</literal> will
|
||||
be returned, and a null pointer will be returned after it.
|
||||
When the pipeline synchronization point is reached, a result of type
|
||||
<literal>PGRES_PIPELINE_SYNC</literal> will be returned.
|
||||
The result of the next query after the synchronization point follows
|
||||
immediately (that is, no null pointer is returned after
|
||||
the synchronization point.)
|
||||
</para>
|
||||
|
||||
<note>
|
||||
<para>
|
||||
Even when <xref linkend="libpq-PQresultStatus"/> indicates a fatal
|
||||
@ -4926,6 +4967,476 @@ int PQflush(PGconn *conn);
|
||||
|
||||
</sect1>
|
||||
|
||||
<sect1 id="libpq-pipeline-mode">
|
||||
<title>Pipeline Mode</title>
|
||||
|
||||
<indexterm zone="libpq-pipeline-mode">
|
||||
<primary>libpq</primary>
|
||||
<secondary>pipeline mode</secondary>
|
||||
</indexterm>
|
||||
|
||||
<indexterm zone="libpq-pipeline-mode">
|
||||
<primary>pipelining</primary>
|
||||
<secondary>in libpq</secondary>
|
||||
</indexterm>
|
||||
|
||||
<indexterm zone="libpq-pipeline-mode">
|
||||
<primary>batch mode</primary>
|
||||
<secondary>in libpq</secondary>
|
||||
</indexterm>
|
||||
|
||||
<para>
|
||||
<application>libpq</application> pipeline mode allows applications to
|
||||
send a query without having to read the result of the previously
|
||||
sent query. Taking advantage of the pipeline mode, a client will wait
|
||||
less for the server, since multiple queries/results can be
|
||||
sent/received in a single network transaction.
|
||||
</para>
|
||||
|
||||
<para>
|
||||
While pipeline mode provides a significant performance boost, writing
|
||||
clients using the pipeline mode is more complex because it involves
|
||||
managing a queue of pending queries and finding which result
|
||||
corresponds to which query in the queue.
|
||||
</para>
|
||||
|
||||
<para>
|
||||
Pipeline mode also generally consumes more memory on both the client and server,
|
||||
though careful and aggressive management of the send/receive queue can mitigate
|
||||
this. This applies whether or not the connection is in blocking or non-blocking
|
||||
mode.
|
||||
</para>
|
||||
|
||||
<para>
|
||||
While the pipeline API was introduced in
|
||||
<productname>PostgreSQL</productname> 14, it is a client-side feature
|
||||
which doesn't require special server support, and works on any server
|
||||
that supports the v3 extended query protocol.
|
||||
</para>
|
||||
|
||||
<sect2 id="libpq-pipeline-using">
|
||||
<title>Using Pipeline Mode</title>
|
||||
|
||||
<para>
|
||||
To issue pipelines, the application must switch the connection
|
||||
into pipeline mode,
|
||||
which is done with <xref linkend="libpq-PQenterPipelineMode"/>.
|
||||
<xref linkend="libpq-PQpipelineStatus"/> can be used
|
||||
to test whether pipeline mode is active.
|
||||
In pipeline mode, only <link linkend="libpq-async">asynchronous operations</link>
|
||||
are permitted, and <literal>COPY</literal> is disallowed.
|
||||
Using synchronous command execution functions
|
||||
such as <function>PQfn</function>,
|
||||
<function>PQexec</function>,
|
||||
<function>PQexecParams</function>,
|
||||
<function>PQprepare</function>,
|
||||
<function>PQexecPrepared</function>,
|
||||
<function>PQdescribePrepared</function>,
|
||||
<function>PQdescribePortal</function>,
|
||||
is an error condition.
|
||||
Once all dispatched commands have had their results processed, and
|
||||
the end pipeline result has been consumed, the application may return
|
||||
to non-pipelined mode with <xref linkend="libpq-PQexitPipelineMode"/>.
|
||||
</para>
|
||||
|
||||
<note>
|
||||
<para>
|
||||
It is best to use pipeline mode with <application>libpq</application> in
|
||||
<link linkend="libpq-PQsetnonblocking">non-blocking mode</link>. If used
|
||||
in blocking mode it is possible for a client/server deadlock to occur.
|
||||
<footnote>
|
||||
<para>
|
||||
The client will block trying to send queries to the server, but the
|
||||
server will block trying to send results to the client from queries
|
||||
it has already processed. This only occurs when the client sends
|
||||
enough queries to fill both its output buffer and the server's receive
|
||||
buffer before it switches to processing input from the server,
|
||||
but it's hard to predict exactly when that will happen.
|
||||
</para>
|
||||
</footnote>
|
||||
</para>
|
||||
</note>
|
||||
|
||||
<sect3 id="libpq-pipeline-sending">
|
||||
<title>Issuing Queries</title>
|
||||
|
||||
<para>
|
||||
After entering pipeline mode, the application dispatches requests using
|
||||
<xref linkend="libpq-PQsendQuery"/>,
|
||||
<xref linkend="libpq-PQsendQueryParams"/>,
|
||||
or its prepared-query sibling
|
||||
<xref linkend="libpq-PQsendQueryPrepared"/>.
|
||||
These requests are queued on the client-side until flushed to the server;
|
||||
this occurs when <xref linkend="libpq-PQpipelineSync"/> is used to
|
||||
establish a synchronization point in the pipeline,
|
||||
or when <xref linkend="libpq-PQflush"/> is called.
|
||||
The functions <xref linkend="libpq-PQsendPrepare"/>,
|
||||
<xref linkend="libpq-PQsendDescribePrepared"/>, and
|
||||
<xref linkend="libpq-PQsendDescribePortal"/> also work in pipeline mode.
|
||||
Result processing is described below.
|
||||
</para>
|
||||
|
||||
<para>
|
||||
The server executes statements, and returns results, in the order the
|
||||
client sends them. The server will begin executing the commands in the
|
||||
pipeline immediately, not waiting for the end of the pipeline.
|
||||
If any statement encounters an error, the server aborts the current
|
||||
transaction and does not execute any subsequent command in the queue
|
||||
until the next synchronization point established by
|
||||
<function>PQpipelineSync</function>;
|
||||
a <literal>PGRES_PIPELINE_ABORTED</literal> result is produced for
|
||||
each such command.
|
||||
(This remains true even if the commands in the pipeline would rollback
|
||||
the transaction.)
|
||||
Query processing resumes after the synchronization point.
|
||||
</para>
|
||||
|
||||
<para>
|
||||
It's fine for one operation to depend on the results of a
|
||||
prior one; for example, one query may define a table that the next
|
||||
query in the same pipeline uses. Similarly, an application may
|
||||
create a named prepared statement and execute it with later
|
||||
statements in the same pipeline.
|
||||
</para>
|
||||
</sect3>
|
||||
|
||||
<sect3 id="libpq-pipeline-results">
|
||||
<title>Processing Results</title>
|
||||
|
||||
<para>
|
||||
To process the result of one query in a pipeline, the application calls
|
||||
<function>PQgetResult</function> repeatedly and handles each result
|
||||
until <function>PQgetResult</function> returns null.
|
||||
The result from the next query in the pipeline may then be retrieved using
|
||||
<function>PQgetResult</function> again and the cycle repeated.
|
||||
The application handles individual statement results as normal.
|
||||
When the results of all the queries in the pipeline have been
|
||||
returned, <function>PQgetResult</function> returns a result
|
||||
containing the status value <literal>PGRES_PIPELINE_SYNC</literal>
|
||||
</para>
|
||||
|
||||
<para>
|
||||
The client may choose to defer result processing until the complete
|
||||
pipeline has been sent, or interleave that with sending further
|
||||
queries in the pipeline; see <xref linkend="libpq-pipeline-interleave"/>.
|
||||
</para>
|
||||
|
||||
<para>
|
||||
To enter single-row mode, call <function>PQsetSingleRowMode</function>
|
||||
before retrieving results with <function>PQgetResult</function>.
|
||||
This mode selection is effective only for the query currently
|
||||
being processed. For more information on the use of
|
||||
<function>PQsetSingleRowMode</function>,
|
||||
refer to <xref linkend="libpq-single-row-mode"/>.
|
||||
</para>
|
||||
|
||||
<para>
|
||||
<function>PQgetResult</function> behaves the same as for normal
|
||||
asynchronous processing except that it may contain the new
|
||||
<type>PGresult</type> types <literal>PGRES_PIPELINE_SYNC</literal>
|
||||
and <literal>PGRES_PIPELINE_ABORTED</literal>.
|
||||
<literal>PGRES_PIPELINE_SYNC</literal> is reported exactly once for each
|
||||
<function>PQpipelineSync</function> at the corresponding point
|
||||
in the pipeline.
|
||||
<literal>PGRES_PIPELINE_ABORTED</literal> is emitted in place of a normal
|
||||
query result for the first error and all subsequent results
|
||||
until the next <literal>PGRES_PIPELINE_SYNC</literal>;
|
||||
see <xref linkend="libpq-pipeline-errors"/>.
|
||||
</para>
|
||||
|
||||
<para>
|
||||
<function>PQisBusy</function>, <function>PQconsumeInput</function>, etc
|
||||
operate as normal when processing pipeline results.
|
||||
</para>
|
||||
|
||||
<para>
|
||||
<application>libpq</application> does not provide any information to the
|
||||
application about the query currently being processed (except that
|
||||
<function>PQgetResult</function> returns null to indicate that we start
|
||||
returning the results of next query). The application must keep track
|
||||
of the order in which it sent queries, to associate them with their
|
||||
corresponding results.
|
||||
Applications will typically use a state machine or a FIFO queue for this.
|
||||
</para>
|
||||
|
||||
</sect3>
|
||||
|
||||
<sect3 id="libpq-pipeline-errors">
|
||||
<title>Error Handling</title>
|
||||
|
||||
<para>
|
||||
From the client's perspective, after <function>PQresultStatus</function>
|
||||
returns <literal>PGRES_FATAL_ERROR</literal>,
|
||||
the pipeline is flagged as aborted.
|
||||
<function>PQresultStatus</function> will report a
|
||||
<literal>PGRES_PIPELINE_ABORTED</literal> result for each remaining queued
|
||||
operation in an aborted pipeline. The result for
|
||||
<function>PQpipelineSync</function> is reported as
|
||||
<literal>PGRES_PIPELINE_SYNC</literal> to signal the end of the aborted pipeline
|
||||
and resumption of normal result processing.
|
||||
</para>
|
||||
|
||||
<para>
|
||||
The client <emphasis>must</emphasis> process results with
|
||||
<function>PQgetResult</function> during error recovery.
|
||||
</para>
|
||||
|
||||
<para>
|
||||
If the pipeline used an implicit transaction, then operations that have
|
||||
already executed are rolled back and operations that were queued to follow
|
||||
the failed operation are skipped entirely. The same behavior holds if the
|
||||
pipeline starts and commits a single explicit transaction (i.e. the first
|
||||
statement is <literal>BEGIN</literal> and the last is
|
||||
<literal>COMMIT</literal>) except that the session remains in an aborted
|
||||
transaction state at the end of the pipeline. If a pipeline contains
|
||||
<emphasis>multiple explicit transactions</emphasis>, all transactions that
|
||||
committed prior to the error remain committed, the currently in-progress
|
||||
transaction is aborted, and all subsequent operations are skipped completely,
|
||||
including subsequent transactions. If a pipeline synchronization point
|
||||
occurs with an explicit transaction block in aborted state, the next pipeline
|
||||
will become aborted immediately unless the next command puts the transaction
|
||||
in normal mode with <command>ROLLBACK</command>.
|
||||
</para>
|
||||
|
||||
<note>
|
||||
<para>
|
||||
The client must not assume that work is committed when it
|
||||
<emphasis>sends</emphasis> a <literal>COMMIT</literal> — only when the
|
||||
corresponding result is received to confirm the commit is complete.
|
||||
Because errors arrive asynchronously, the application needs to be able to
|
||||
restart from the last <emphasis>received</emphasis> committed change and
|
||||
resend work done after that point if something goes wrong.
|
||||
</para>
|
||||
</note>
|
||||
</sect3>
|
||||
|
||||
<sect3 id="libpq-pipeline-interleave">
|
||||
<title>Interleaving Result Processing and Query Dispatch</title>
|
||||
|
||||
<para>
|
||||
To avoid deadlocks on large pipelines the client should be structured
|
||||
around a non-blocking event loop using operating system facilities
|
||||
such as <function>select</function>, <function>poll</function>,
|
||||
<function>WaitForMultipleObjectEx</function>, etc.
|
||||
</para>
|
||||
|
||||
<para>
|
||||
The client application should generally maintain a queue of work
|
||||
remaining to be dispatched and a queue of work that has been dispatched
|
||||
but not yet had its results processed. When the socket is writable
|
||||
it should dispatch more work. When the socket is readable it should
|
||||
read results and process them, matching them up to the next entry in
|
||||
its corresponding results queue. Based on available memory, results from the
|
||||
socket should be read frequently: there's no need to wait until the
|
||||
pipeline end to read the results. Pipelines should be scoped to logical
|
||||
units of work, usually (but not necessarily) one transaction per pipeline.
|
||||
There's no need to exit pipeline mode and re-enter it between pipelines,
|
||||
or to wait for one pipeline to finish before sending the next.
|
||||
</para>
|
||||
|
||||
<para>
|
||||
An example using <function>select()</function> and a simple state
|
||||
machine to track sent and received work is in
|
||||
<filename>src/test/modules/libpq_pipeline/libpq_pipeline.c</filename>
|
||||
in the PostgreSQL source distribution.
|
||||
</para>
|
||||
</sect3>
|
||||
</sect2>
|
||||
|
||||
<sect2 id="libpq-pipeline-functions">
|
||||
<title>Functions Associated with Pipeline Mode</title>
|
||||
|
||||
<variablelist>
|
||||
|
||||
<varlistentry id="libpq-PQpipelineStatus">
|
||||
<term><function>PQpipelineStatus</function><indexterm><primary>PQpipelineStatus</primary></indexterm></term>
|
||||
|
||||
<listitem>
|
||||
<para>
|
||||
Returns the current pipeline mode status of the
|
||||
<application>libpq</application> connection.
|
||||
<synopsis>
|
||||
PGpipelineStatus PQpipelineStatus(const PGconn *conn);
|
||||
</synopsis>
|
||||
</para>
|
||||
|
||||
<para>
|
||||
<function>PQpipelineStatus</function> can return one of the following values:
|
||||
|
||||
<variablelist>
|
||||
<varlistentry>
|
||||
<term>
|
||||
<literal>PQ_PIPELINE_ON</literal>
|
||||
</term>
|
||||
<listitem>
|
||||
<para>
|
||||
The <application>libpq</application> connection is in
|
||||
pipeline mode.
|
||||
</para>
|
||||
</listitem>
|
||||
</varlistentry>
|
||||
|
||||
<varlistentry>
|
||||
<term>
|
||||
<literal>PQ_PIPELINE_OFF</literal>
|
||||
</term>
|
||||
<listitem>
|
||||
<para>
|
||||
The <application>libpq</application> connection is
|
||||
<emphasis>not</emphasis> in pipeline mode.
|
||||
</para>
|
||||
</listitem>
|
||||
</varlistentry>
|
||||
|
||||
<varlistentry>
|
||||
<term>
|
||||
<literal>PQ_PIPELINE_ABORTED</literal>
|
||||
</term>
|
||||
<listitem>
|
||||
<para>
|
||||
The <application>libpq</application> connection is in pipeline
|
||||
mode and an error occurred while processing the current pipeline.
|
||||
The aborted flag is cleared when <function>PQgetResult</function>
|
||||
returns a result of type <literal>PGRES_PIPELINE_SYNC</literal>.
|
||||
</para>
|
||||
</listitem>
|
||||
</varlistentry>
|
||||
|
||||
</variablelist>
|
||||
</para>
|
||||
</listitem>
|
||||
</varlistentry>
|
||||
|
||||
<varlistentry id="libpq-PQenterPipelineMode">
|
||||
<term><function>PQenterPipelineMode</function><indexterm><primary>PQenterPipelineMode</primary></indexterm></term>
|
||||
|
||||
<listitem>
|
||||
<para>
|
||||
Causes a connection to enter pipeline mode if it is currently idle or
|
||||
already in pipeline mode.
|
||||
|
||||
<synopsis>
|
||||
int PQenterPipelineMode(PGconn *conn);
|
||||
</synopsis>
|
||||
|
||||
</para>
|
||||
<para>
|
||||
Returns 1 for success.
|
||||
Returns 0 and has no effect if the connection is not currently
|
||||
idle, i.e., it has a result ready, or it is waiting for more
|
||||
input from the server, etc.
|
||||
This function does not actually send anything to the server,
|
||||
it just changes the <application>libpq</application> connection
|
||||
state.
|
||||
</para>
|
||||
</listitem>
|
||||
</varlistentry>
|
||||
|
||||
<varlistentry id="libpq-PQexitPipelineMode">
|
||||
<term><function>PQexitPipelineMode</function><indexterm><primary>PQexitPipelineMode</primary></indexterm></term>
|
||||
|
||||
<listitem>
|
||||
<para>
|
||||
Causes a connection to exit pipeline mode if it is currently in pipeline mode
|
||||
with an empty queue and no pending results.
|
||||
<synopsis>
|
||||
int PQexitPipelineMode(PGconn *conn);
|
||||
</synopsis>
|
||||
</para>
|
||||
<para>
|
||||
Returns 1 for success. Returns 1 and takes no action if not in
|
||||
pipeline mode. If the current statement isn't finished processing,
|
||||
or <function>PQgetResult</function> has not been called to collect
|
||||
results from all previously sent query, returns 0 (in which case,
|
||||
use <xref linkend="libpq-PQerrorMessage"/> to get more information
|
||||
about the failure).
|
||||
</para>
|
||||
</listitem>
|
||||
</varlistentry>
|
||||
|
||||
<varlistentry id="libpq-PQpipelineSync">
|
||||
<term><function>PQpipelineSync</function><indexterm><primary>PQpipelineSync</primary></indexterm></term>
|
||||
|
||||
<listitem>
|
||||
<para>
|
||||
Marks a synchronization point in a pipeline by sending a
|
||||
<link linkend="protocol-flow-ext-query">sync message</link>
|
||||
and flushing the send buffer. This serves as
|
||||
the delimiter of an implicit transaction and an error recovery
|
||||
point; see <xref linkend="libpq-pipeline-errors"/>.
|
||||
|
||||
<synopsis>
|
||||
int PQpipelineSync(PGconn *conn);
|
||||
</synopsis>
|
||||
</para>
|
||||
<para>
|
||||
Returns 1 for success. Returns 0 if the connection is not in
|
||||
pipeline mode or sending a
|
||||
<link linkend="protocol-flow-ext-query">sync message</link>
|
||||
failed.
|
||||
</para>
|
||||
</listitem>
|
||||
</varlistentry>
|
||||
</variablelist>
|
||||
</sect2>
|
||||
|
||||
<sect2 id="libpq-pipeline-tips">
|
||||
<title>When to Use Pipeline Mode</title>
|
||||
|
||||
<para>
|
||||
Much like asynchronous query mode, there is no meaningful performance
|
||||
overhead when using pipeline mode. It increases client application complexity,
|
||||
and extra caution is required to prevent client/server deadlocks, but
|
||||
pipeline mode can offer considerable performance improvements, in exchange for
|
||||
increased memory usage from leaving state around longer.
|
||||
</para>
|
||||
|
||||
<para>
|
||||
Pipeline mode is most useful when the server is distant, i.e., network latency
|
||||
(<quote>ping time</quote>) is high, and also when many small operations
|
||||
are being performed in rapid succession. There is usually less benefit
|
||||
in using pipelined commands when each query takes many multiples of the client/server
|
||||
round-trip time to execute. A 100-statement operation run on a server
|
||||
300ms round-trip-time away would take 30 seconds in network latency alone
|
||||
without pipelining; with pipelining it may spend as little as 0.3s waiting for
|
||||
results from the server.
|
||||
</para>
|
||||
|
||||
<para>
|
||||
Use pipelined commands when your application does lots of small
|
||||
<literal>INSERT</literal>, <literal>UPDATE</literal> and
|
||||
<literal>DELETE</literal> operations that can't easily be transformed
|
||||
into operations on sets, or into a <literal>COPY</literal> operation.
|
||||
</para>
|
||||
|
||||
<para>
|
||||
Pipeline mode is not useful when information from one operation is required by
|
||||
the client to produce the next operation. In such cases, the client
|
||||
would have to introduce a synchronization point and wait for a full client/server
|
||||
round-trip to get the results it needs. However, it's often possible to
|
||||
adjust the client design to exchange the required information server-side.
|
||||
Read-modify-write cycles are especially good candidates; for example:
|
||||
<programlisting>
|
||||
BEGIN;
|
||||
SELECT x FROM mytable WHERE id = 42 FOR UPDATE;
|
||||
-- result: x=2
|
||||
-- client adds 1 to x:
|
||||
UPDATE mytable SET x = 3 WHERE id = 42;
|
||||
COMMIT;
|
||||
</programlisting>
|
||||
could be much more efficiently done with:
|
||||
<programlisting>
|
||||
UPDATE mytable SET x = x + 1 WHERE id = 42;
|
||||
</programlisting>
|
||||
</para>
|
||||
|
||||
<para>
|
||||
Pipelining is less useful, and more complex, when a single pipeline contains
|
||||
multiple transactions (see <xref linkend="libpq-pipeline-errors"/>).
|
||||
</para>
|
||||
</sect2>
|
||||
</sect1>
|
||||
|
||||
<sect1 id="libpq-single-row-mode">
|
||||
<title>Retrieving Query Results Row-by-Row</title>
|
||||
|
||||
@ -4966,6 +5477,13 @@ int PQflush(PGconn *conn);
|
||||
Each object should be freed with <xref linkend="libpq-PQclear"/> as usual.
|
||||
</para>
|
||||
|
||||
<para>
|
||||
When using pipeline mode, single-row mode needs to be activated for each
|
||||
query in the pipeline before retrieving results for that query
|
||||
with <function>PQgetResult</function>.
|
||||
See <xref linkend="libpq-pipeline-mode"/> for more information.
|
||||
</para>
|
||||
|
||||
<para>
|
||||
<variablelist>
|
||||
<varlistentry id="libpq-PQsetSingleRowMode">
|
||||
|
@ -130,6 +130,10 @@
|
||||
<application>libpq</application> library.
|
||||
</para>
|
||||
|
||||
<para>
|
||||
Client applications cannot use these functions while a libpq connection is in pipeline mode.
|
||||
</para>
|
||||
|
||||
<sect2 id="lo-create">
|
||||
<title>Creating a Large Object</title>
|
||||
|
||||
|
@ -1019,6 +1019,12 @@ libpqrcv_exec(WalReceiverConn *conn, const char *query,
|
||||
walres->err = _("empty query");
|
||||
break;
|
||||
|
||||
case PGRES_PIPELINE_SYNC:
|
||||
case PGRES_PIPELINE_ABORTED:
|
||||
walres->status = WALRCV_ERROR;
|
||||
walres->err = _("unexpected pipeline mode");
|
||||
break;
|
||||
|
||||
case PGRES_NONFATAL_ERROR:
|
||||
case PGRES_FATAL_ERROR:
|
||||
case PGRES_BAD_RESPONSE:
|
||||
|
@ -929,6 +929,8 @@ should_processing_continue(PGresult *res)
|
||||
case PGRES_COPY_IN:
|
||||
case PGRES_COPY_BOTH:
|
||||
case PGRES_SINGLE_TUPLE:
|
||||
case PGRES_PIPELINE_SYNC:
|
||||
case PGRES_PIPELINE_ABORTED:
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
|
@ -179,3 +179,7 @@ PQgetgssctx 176
|
||||
PQsetSSLKeyPassHook_OpenSSL 177
|
||||
PQgetSSLKeyPassHook_OpenSSL 178
|
||||
PQdefaultSSLKeyPassHook_OpenSSL 179
|
||||
PQenterPipelineMode 180
|
||||
PQexitPipelineMode 181
|
||||
PQpipelineSync 182
|
||||
PQpipelineStatus 183
|
||||
|
@ -522,6 +522,23 @@ pqDropConnection(PGconn *conn, bool flushInput)
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* pqFreeCommandQueue
|
||||
* Free all the entries of PGcmdQueueEntry queue passed.
|
||||
*/
|
||||
static void
|
||||
pqFreeCommandQueue(PGcmdQueueEntry *queue)
|
||||
{
|
||||
while (queue != NULL)
|
||||
{
|
||||
PGcmdQueueEntry *cur = queue;
|
||||
|
||||
queue = cur->next;
|
||||
if (cur->query)
|
||||
free(cur->query);
|
||||
free(cur);
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* pqDropServerData
|
||||
@ -553,6 +570,12 @@ pqDropServerData(PGconn *conn)
|
||||
}
|
||||
conn->notifyHead = conn->notifyTail = NULL;
|
||||
|
||||
pqFreeCommandQueue(conn->cmd_queue_head);
|
||||
conn->cmd_queue_head = conn->cmd_queue_tail = NULL;
|
||||
|
||||
pqFreeCommandQueue(conn->cmd_queue_recycle);
|
||||
conn->cmd_queue_recycle = NULL;
|
||||
|
||||
/* Reset ParameterStatus data, as well as variables deduced from it */
|
||||
pstatus = conn->pstatus;
|
||||
while (pstatus != NULL)
|
||||
@ -2459,6 +2482,7 @@ keep_going: /* We will come back to here until there is
|
||||
/* Drop any PGresult we might have, too */
|
||||
conn->asyncStatus = PGASYNC_IDLE;
|
||||
conn->xactStatus = PQTRANS_IDLE;
|
||||
conn->pipelineStatus = PQ_PIPELINE_OFF;
|
||||
pqClearAsyncResult(conn);
|
||||
|
||||
/* Reset conn->status to put the state machine in the right state */
|
||||
@ -3917,6 +3941,7 @@ makeEmptyPGconn(void)
|
||||
|
||||
conn->status = CONNECTION_BAD;
|
||||
conn->asyncStatus = PGASYNC_IDLE;
|
||||
conn->pipelineStatus = PQ_PIPELINE_OFF;
|
||||
conn->xactStatus = PQTRANS_IDLE;
|
||||
conn->options_valid = false;
|
||||
conn->nonblocking = false;
|
||||
@ -4084,8 +4109,6 @@ freePGconn(PGconn *conn)
|
||||
if (conn->connip)
|
||||
free(conn->connip);
|
||||
/* Note that conn->Pfdebug is not ours to close or free */
|
||||
if (conn->last_query)
|
||||
free(conn->last_query);
|
||||
if (conn->write_err_msg)
|
||||
free(conn->write_err_msg);
|
||||
if (conn->inBuffer)
|
||||
@ -4174,6 +4197,7 @@ closePGconn(PGconn *conn)
|
||||
conn->status = CONNECTION_BAD; /* Well, not really _bad_ - just absent */
|
||||
conn->asyncStatus = PGASYNC_IDLE;
|
||||
conn->xactStatus = PQTRANS_IDLE;
|
||||
conn->pipelineStatus = PQ_PIPELINE_OFF;
|
||||
pqClearAsyncResult(conn); /* deallocate result */
|
||||
resetPQExpBuffer(&conn->errorMessage);
|
||||
release_conn_addrinfo(conn);
|
||||
@ -6726,6 +6750,15 @@ PQbackendPID(const PGconn *conn)
|
||||
return conn->be_pid;
|
||||
}
|
||||
|
||||
PGpipelineStatus
|
||||
PQpipelineStatus(const PGconn *conn)
|
||||
{
|
||||
if (!conn)
|
||||
return PQ_PIPELINE_OFF;
|
||||
|
||||
return conn->pipelineStatus;
|
||||
}
|
||||
|
||||
int
|
||||
PQconnectionNeedsPassword(const PGconn *conn)
|
||||
{
|
||||
|
@ -39,7 +39,9 @@ char *const pgresStatus[] = {
|
||||
"PGRES_NONFATAL_ERROR",
|
||||
"PGRES_FATAL_ERROR",
|
||||
"PGRES_COPY_BOTH",
|
||||
"PGRES_SINGLE_TUPLE"
|
||||
"PGRES_SINGLE_TUPLE",
|
||||
"PGRES_PIPELINE_SYNC",
|
||||
"PGRES_PIPELINE_ABORTED"
|
||||
};
|
||||
|
||||
/*
|
||||
@ -71,6 +73,8 @@ static PGresult *PQexecFinish(PGconn *conn);
|
||||
static int PQsendDescribe(PGconn *conn, char desc_type,
|
||||
const char *desc_target);
|
||||
static int check_field_number(const PGresult *res, int field_num);
|
||||
static void pqPipelineProcessQueue(PGconn *conn);
|
||||
static int pqPipelineFlush(PGconn *conn);
|
||||
|
||||
|
||||
/* ----------------
|
||||
@ -1171,7 +1175,7 @@ pqRowProcessor(PGconn *conn, const char **errmsgp)
|
||||
conn->next_result = conn->result;
|
||||
conn->result = res;
|
||||
/* And mark the result ready to return */
|
||||
conn->asyncStatus = PGASYNC_READY;
|
||||
conn->asyncStatus = PGASYNC_READY_MORE;
|
||||
}
|
||||
|
||||
return 1;
|
||||
@ -1184,6 +1188,87 @@ fail:
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* pqAllocCmdQueueEntry
|
||||
* Get a command queue entry for caller to fill.
|
||||
*
|
||||
* If the recycle queue has a free element, that is returned; if not, a
|
||||
* fresh one is allocated. Caller is responsible for adding it to the
|
||||
* command queue (pqAppendCmdQueueEntry) once the struct is filled in, or
|
||||
* releasing the memory (pqRecycleCmdQueueEntry) if an error occurs.
|
||||
*
|
||||
* If allocation fails, sets the error message and returns NULL.
|
||||
*/
|
||||
static PGcmdQueueEntry *
|
||||
pqAllocCmdQueueEntry(PGconn *conn)
|
||||
{
|
||||
PGcmdQueueEntry *entry;
|
||||
|
||||
if (conn->cmd_queue_recycle == NULL)
|
||||
{
|
||||
entry = (PGcmdQueueEntry *) malloc(sizeof(PGcmdQueueEntry));
|
||||
if (entry == NULL)
|
||||
{
|
||||
appendPQExpBufferStr(&conn->errorMessage,
|
||||
libpq_gettext("out of memory\n"));
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
entry = conn->cmd_queue_recycle;
|
||||
conn->cmd_queue_recycle = entry->next;
|
||||
}
|
||||
entry->next = NULL;
|
||||
entry->query = NULL;
|
||||
|
||||
return entry;
|
||||
}
|
||||
|
||||
/*
|
||||
* pqAppendCmdQueueEntry
|
||||
* Append a caller-allocated command queue entry to the queue.
|
||||
*
|
||||
* The query itself must already have been put in the output buffer by the
|
||||
* caller.
|
||||
*/
|
||||
static void
|
||||
pqAppendCmdQueueEntry(PGconn *conn, PGcmdQueueEntry *entry)
|
||||
{
|
||||
Assert(entry->next == NULL);
|
||||
|
||||
if (conn->cmd_queue_head == NULL)
|
||||
conn->cmd_queue_head = entry;
|
||||
else
|
||||
conn->cmd_queue_tail->next = entry;
|
||||
|
||||
conn->cmd_queue_tail = entry;
|
||||
}
|
||||
|
||||
/*
|
||||
* pqRecycleCmdQueueEntry
|
||||
* Push a command queue entry onto the freelist.
|
||||
*/
|
||||
static void
|
||||
pqRecycleCmdQueueEntry(PGconn *conn, PGcmdQueueEntry *entry)
|
||||
{
|
||||
if (entry == NULL)
|
||||
return;
|
||||
|
||||
/* recyclable entries should not have a follow-on command */
|
||||
Assert(entry->next == NULL);
|
||||
|
||||
if (entry->query)
|
||||
{
|
||||
free(entry->query);
|
||||
entry->query = NULL;
|
||||
}
|
||||
|
||||
entry->next = conn->cmd_queue_recycle;
|
||||
conn->cmd_queue_recycle = entry;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* PQsendQuery
|
||||
* Submit a query, but don't wait for it to finish
|
||||
@ -1209,9 +1294,15 @@ PQsendQueryContinue(PGconn *conn, const char *query)
|
||||
static int
|
||||
PQsendQueryInternal(PGconn *conn, const char *query, bool newQuery)
|
||||
{
|
||||
PGcmdQueueEntry *entry = NULL;
|
||||
|
||||
if (!PQsendQueryStart(conn, newQuery))
|
||||
return 0;
|
||||
|
||||
entry = pqAllocCmdQueueEntry(conn);
|
||||
if (entry == NULL)
|
||||
return 0; /* error msg already set */
|
||||
|
||||
/* check the argument */
|
||||
if (!query)
|
||||
{
|
||||
@ -1220,37 +1311,75 @@ PQsendQueryInternal(PGconn *conn, const char *query, bool newQuery)
|
||||
return 0;
|
||||
}
|
||||
|
||||
/* construct the outgoing Query message */
|
||||
if (pqPutMsgStart('Q', conn) < 0 ||
|
||||
pqPuts(query, conn) < 0 ||
|
||||
pqPutMsgEnd(conn) < 0)
|
||||
/* Send the query message(s) */
|
||||
if (conn->pipelineStatus == PQ_PIPELINE_OFF)
|
||||
{
|
||||
/* error message should be set up already */
|
||||
return 0;
|
||||
/* construct the outgoing Query message */
|
||||
if (pqPutMsgStart('Q', conn) < 0 ||
|
||||
pqPuts(query, conn) < 0 ||
|
||||
pqPutMsgEnd(conn) < 0)
|
||||
{
|
||||
/* error message should be set up already */
|
||||
return 0;
|
||||
}
|
||||
|
||||
/* remember we are using simple query protocol */
|
||||
entry->queryclass = PGQUERY_SIMPLE;
|
||||
/* and remember the query text too, if possible */
|
||||
entry->query = strdup(query);
|
||||
}
|
||||
else
|
||||
{
|
||||
/*
|
||||
* In pipeline mode we cannot use the simple protocol, so we send
|
||||
* Parse, Bind, Describe Portal, Execute.
|
||||
*/
|
||||
if (pqPutMsgStart('P', conn) < 0 ||
|
||||
pqPuts("", conn) < 0 ||
|
||||
pqPuts(query, conn) < 0 ||
|
||||
pqPutInt(0, 2, conn) < 0 ||
|
||||
pqPutMsgEnd(conn) < 0)
|
||||
goto sendFailed;
|
||||
if (pqPutMsgStart('B', conn) < 0 ||
|
||||
pqPuts("", conn) < 0 ||
|
||||
pqPuts("", conn) < 0 ||
|
||||
pqPutInt(0, 2, conn) < 0 ||
|
||||
pqPutInt(0, 2, conn) < 0 ||
|
||||
pqPutInt(0, 2, conn) < 0 ||
|
||||
pqPutMsgEnd(conn) < 0)
|
||||
goto sendFailed;
|
||||
if (pqPutMsgStart('D', conn) < 0 ||
|
||||
pqPutc('P', conn) < 0 ||
|
||||
pqPuts("", conn) < 0 ||
|
||||
pqPutMsgEnd(conn) < 0)
|
||||
goto sendFailed;
|
||||
if (pqPutMsgStart('E', conn) < 0 ||
|
||||
pqPuts("", conn) < 0 ||
|
||||
pqPutInt(0, 4, conn) < 0 ||
|
||||
pqPutMsgEnd(conn) < 0)
|
||||
goto sendFailed;
|
||||
|
||||
/* remember we are using simple query protocol */
|
||||
conn->queryclass = PGQUERY_SIMPLE;
|
||||
|
||||
/* and remember the query text too, if possible */
|
||||
/* if insufficient memory, last_query just winds up NULL */
|
||||
if (conn->last_query)
|
||||
free(conn->last_query);
|
||||
conn->last_query = strdup(query);
|
||||
entry->queryclass = PGQUERY_EXTENDED;
|
||||
entry->query = strdup(query);
|
||||
}
|
||||
|
||||
/*
|
||||
* Give the data a push. In nonblock mode, don't complain if we're unable
|
||||
* to send it all; PQgetResult() will do any additional flushing needed.
|
||||
*/
|
||||
if (pqFlush(conn) < 0)
|
||||
{
|
||||
/* error message should be set up already */
|
||||
return 0;
|
||||
}
|
||||
if (pqPipelineFlush(conn) < 0)
|
||||
goto sendFailed;
|
||||
|
||||
/* OK, it's launched! */
|
||||
conn->asyncStatus = PGASYNC_BUSY;
|
||||
pqAppendCmdQueueEntry(conn, entry);
|
||||
if (conn->pipelineStatus == PQ_PIPELINE_OFF)
|
||||
conn->asyncStatus = PGASYNC_BUSY;
|
||||
return 1;
|
||||
|
||||
sendFailed:
|
||||
pqRecycleCmdQueueEntry(conn, entry);
|
||||
/* error message should be set up already */
|
||||
return 0;
|
||||
}
|
||||
|
||||
/*
|
||||
@ -1307,6 +1436,8 @@ PQsendPrepare(PGconn *conn,
|
||||
const char *stmtName, const char *query,
|
||||
int nParams, const Oid *paramTypes)
|
||||
{
|
||||
PGcmdQueueEntry *entry = NULL;
|
||||
|
||||
if (!PQsendQueryStart(conn, true))
|
||||
return 0;
|
||||
|
||||
@ -1330,6 +1461,10 @@ PQsendPrepare(PGconn *conn,
|
||||
return 0;
|
||||
}
|
||||
|
||||
entry = pqAllocCmdQueueEntry(conn);
|
||||
if (entry == NULL)
|
||||
return 0; /* error msg already set */
|
||||
|
||||
/* construct the Parse message */
|
||||
if (pqPutMsgStart('P', conn) < 0 ||
|
||||
pqPuts(stmtName, conn) < 0 ||
|
||||
@ -1356,32 +1491,38 @@ PQsendPrepare(PGconn *conn,
|
||||
if (pqPutMsgEnd(conn) < 0)
|
||||
goto sendFailed;
|
||||
|
||||
/* construct the Sync message */
|
||||
if (pqPutMsgStart('S', conn) < 0 ||
|
||||
pqPutMsgEnd(conn) < 0)
|
||||
goto sendFailed;
|
||||
/* Add a Sync, unless in pipeline mode. */
|
||||
if (conn->pipelineStatus == PQ_PIPELINE_OFF)
|
||||
{
|
||||
if (pqPutMsgStart('S', conn) < 0 ||
|
||||
pqPutMsgEnd(conn) < 0)
|
||||
goto sendFailed;
|
||||
}
|
||||
|
||||
/* remember we are doing just a Parse */
|
||||
conn->queryclass = PGQUERY_PREPARE;
|
||||
entry->queryclass = PGQUERY_PREPARE;
|
||||
|
||||
/* and remember the query text too, if possible */
|
||||
/* if insufficient memory, last_query just winds up NULL */
|
||||
if (conn->last_query)
|
||||
free(conn->last_query);
|
||||
conn->last_query = strdup(query);
|
||||
/* if insufficient memory, query just winds up NULL */
|
||||
entry->query = strdup(query);
|
||||
|
||||
pqAppendCmdQueueEntry(conn, entry);
|
||||
|
||||
if (conn->pipelineStatus == PQ_PIPELINE_OFF)
|
||||
conn->asyncStatus = PGASYNC_BUSY;
|
||||
|
||||
/*
|
||||
* Give the data a push. In nonblock mode, don't complain if we're unable
|
||||
* to send it all; PQgetResult() will do any additional flushing needed.
|
||||
* Give the data a push (in pipeline mode, only if we're past the size
|
||||
* threshold). In nonblock mode, don't complain if we're unable to send
|
||||
* it all; PQgetResult() will do any additional flushing needed.
|
||||
*/
|
||||
if (pqFlush(conn) < 0)
|
||||
if (pqPipelineFlush(conn) < 0)
|
||||
goto sendFailed;
|
||||
|
||||
/* OK, it's launched! */
|
||||
conn->asyncStatus = PGASYNC_BUSY;
|
||||
return 1;
|
||||
|
||||
sendFailed:
|
||||
pqRecycleCmdQueueEntry(conn, entry);
|
||||
/* error message should be set up already */
|
||||
return 0;
|
||||
}
|
||||
@ -1429,7 +1570,8 @@ PQsendQueryPrepared(PGconn *conn,
|
||||
}
|
||||
|
||||
/*
|
||||
* Common startup code for PQsendQuery and sibling routines
|
||||
* PQsendQueryStart
|
||||
* Common startup code for PQsendQuery and sibling routines
|
||||
*/
|
||||
static bool
|
||||
PQsendQueryStart(PGconn *conn, bool newQuery)
|
||||
@ -1450,20 +1592,57 @@ PQsendQueryStart(PGconn *conn, bool newQuery)
|
||||
libpq_gettext("no connection to the server\n"));
|
||||
return false;
|
||||
}
|
||||
/* Can't send while already busy, either. */
|
||||
if (conn->asyncStatus != PGASYNC_IDLE)
|
||||
|
||||
/* Can't send while already busy, either, unless enqueuing for later */
|
||||
if (conn->asyncStatus != PGASYNC_IDLE &&
|
||||
conn->pipelineStatus == PQ_PIPELINE_OFF)
|
||||
{
|
||||
appendPQExpBufferStr(&conn->errorMessage,
|
||||
libpq_gettext("another command is already in progress\n"));
|
||||
return false;
|
||||
}
|
||||
|
||||
/* initialize async result-accumulation state */
|
||||
pqClearAsyncResult(conn);
|
||||
if (conn->pipelineStatus != PQ_PIPELINE_OFF)
|
||||
{
|
||||
/*
|
||||
* When enqueuing commands we don't change much of the connection
|
||||
* state since it's already in use for the current command. The
|
||||
* connection state will get updated when pqPipelineProcessQueue()
|
||||
* advances to start processing the queued message.
|
||||
*
|
||||
* Just make sure we can safely enqueue given the current connection
|
||||
* state. We can enqueue behind another queue item, or behind a
|
||||
* non-queue command (one that sends its own sync), but we can't
|
||||
* enqueue if the connection is in a copy state.
|
||||
*/
|
||||
switch (conn->asyncStatus)
|
||||
{
|
||||
case PGASYNC_IDLE:
|
||||
case PGASYNC_READY:
|
||||
case PGASYNC_READY_MORE:
|
||||
case PGASYNC_BUSY:
|
||||
/* ok to queue */
|
||||
break;
|
||||
case PGASYNC_COPY_IN:
|
||||
case PGASYNC_COPY_OUT:
|
||||
case PGASYNC_COPY_BOTH:
|
||||
appendPQExpBufferStr(&conn->errorMessage,
|
||||
libpq_gettext("cannot queue commands during COPY\n"));
|
||||
return false;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
/*
|
||||
* This command's results will come in immediately. Initialize async
|
||||
* result-accumulation state
|
||||
*/
|
||||
pqClearAsyncResult(conn);
|
||||
|
||||
/* reset single-row processing mode */
|
||||
conn->singleRowMode = false;
|
||||
/* reset single-row processing mode */
|
||||
conn->singleRowMode = false;
|
||||
|
||||
}
|
||||
/* ready to send command message */
|
||||
return true;
|
||||
}
|
||||
@ -1487,10 +1666,16 @@ PQsendQueryGuts(PGconn *conn,
|
||||
int resultFormat)
|
||||
{
|
||||
int i;
|
||||
PGcmdQueueEntry *entry;
|
||||
|
||||
entry = pqAllocCmdQueueEntry(conn);
|
||||
if (entry == NULL)
|
||||
return 0; /* error msg already set */
|
||||
|
||||
/*
|
||||
* We will send Parse (if needed), Bind, Describe Portal, Execute, Sync,
|
||||
* using specified statement name and the unnamed portal.
|
||||
* We will send Parse (if needed), Bind, Describe Portal, Execute, Sync
|
||||
* (if not in pipeline mode), using specified statement name and the
|
||||
* unnamed portal.
|
||||
*/
|
||||
|
||||
if (command)
|
||||
@ -1600,35 +1785,38 @@ PQsendQueryGuts(PGconn *conn,
|
||||
pqPutMsgEnd(conn) < 0)
|
||||
goto sendFailed;
|
||||
|
||||
/* construct the Sync message */
|
||||
if (pqPutMsgStart('S', conn) < 0 ||
|
||||
pqPutMsgEnd(conn) < 0)
|
||||
goto sendFailed;
|
||||
/* construct the Sync message if not in pipeline mode */
|
||||
if (conn->pipelineStatus == PQ_PIPELINE_OFF)
|
||||
{
|
||||
if (pqPutMsgStart('S', conn) < 0 ||
|
||||
pqPutMsgEnd(conn) < 0)
|
||||
goto sendFailed;
|
||||
}
|
||||
|
||||
/* remember we are using extended query protocol */
|
||||
conn->queryclass = PGQUERY_EXTENDED;
|
||||
entry->queryclass = PGQUERY_EXTENDED;
|
||||
|
||||
/* and remember the query text too, if possible */
|
||||
/* if insufficient memory, last_query just winds up NULL */
|
||||
if (conn->last_query)
|
||||
free(conn->last_query);
|
||||
/* if insufficient memory, query just winds up NULL */
|
||||
if (command)
|
||||
conn->last_query = strdup(command);
|
||||
else
|
||||
conn->last_query = NULL;
|
||||
entry->query = strdup(command);
|
||||
|
||||
/*
|
||||
* Give the data a push. In nonblock mode, don't complain if we're unable
|
||||
* to send it all; PQgetResult() will do any additional flushing needed.
|
||||
* Give the data a push (in pipeline mode, only if we're past the size
|
||||
* threshold). In nonblock mode, don't complain if we're unable to send
|
||||
* it all; PQgetResult() will do any additional flushing needed.
|
||||
*/
|
||||
if (pqFlush(conn) < 0)
|
||||
if (pqPipelineFlush(conn) < 0)
|
||||
goto sendFailed;
|
||||
|
||||
/* OK, it's launched! */
|
||||
conn->asyncStatus = PGASYNC_BUSY;
|
||||
pqAppendCmdQueueEntry(conn, entry);
|
||||
if (conn->pipelineStatus == PQ_PIPELINE_OFF)
|
||||
conn->asyncStatus = PGASYNC_BUSY;
|
||||
return 1;
|
||||
|
||||
sendFailed:
|
||||
pqRecycleCmdQueueEntry(conn, entry);
|
||||
/* error message should be set up already */
|
||||
return 0;
|
||||
}
|
||||
@ -1647,8 +1835,9 @@ PQsetSingleRowMode(PGconn *conn)
|
||||
return 0;
|
||||
if (conn->asyncStatus != PGASYNC_BUSY)
|
||||
return 0;
|
||||
if (conn->queryclass != PGQUERY_SIMPLE &&
|
||||
conn->queryclass != PGQUERY_EXTENDED)
|
||||
if (!conn->cmd_queue_head ||
|
||||
(conn->cmd_queue_head->queryclass != PGQUERY_SIMPLE &&
|
||||
conn->cmd_queue_head->queryclass != PGQUERY_EXTENDED))
|
||||
return 0;
|
||||
if (conn->result)
|
||||
return 0;
|
||||
@ -1726,14 +1915,17 @@ PQisBusy(PGconn *conn)
|
||||
return conn->asyncStatus == PGASYNC_BUSY || conn->write_failed;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* PQgetResult
|
||||
* Get the next PGresult produced by a query. Returns NULL if no
|
||||
* query work remains or an error has occurred (e.g. out of
|
||||
* memory).
|
||||
*
|
||||
* In pipeline mode, once all the result of a query have been returned,
|
||||
* PQgetResult returns NULL to let the user know that the next
|
||||
* query is being processed. At the end of the pipeline, returns a
|
||||
* result with PQresultStatus(result) == PGRES_PIPELINE_SYNC.
|
||||
*/
|
||||
|
||||
PGresult *
|
||||
PQgetResult(PGconn *conn)
|
||||
{
|
||||
@ -1803,8 +1995,62 @@ PQgetResult(PGconn *conn)
|
||||
{
|
||||
case PGASYNC_IDLE:
|
||||
res = NULL; /* query is complete */
|
||||
if (conn->pipelineStatus != PQ_PIPELINE_OFF)
|
||||
{
|
||||
/*
|
||||
* We're about to return the NULL that terminates the round of
|
||||
* results from the current query; prepare to send the results
|
||||
* of the next query when we're called next. Also, since this
|
||||
* is the start of the results of the next query, clear any
|
||||
* prior error message.
|
||||
*/
|
||||
resetPQExpBuffer(&conn->errorMessage);
|
||||
pqPipelineProcessQueue(conn);
|
||||
}
|
||||
break;
|
||||
case PGASYNC_READY:
|
||||
|
||||
/*
|
||||
* For any query type other than simple query protocol, we advance
|
||||
* the command queue here. This is because for simple query
|
||||
* protocol we can get the READY state multiple times before the
|
||||
* command is actually complete, since the command string can
|
||||
* contain many queries. In simple query protocol, the queue
|
||||
* advance is done by fe-protocol3 when it receives ReadyForQuery.
|
||||
*/
|
||||
if (conn->cmd_queue_head &&
|
||||
conn->cmd_queue_head->queryclass != PGQUERY_SIMPLE)
|
||||
pqCommandQueueAdvance(conn);
|
||||
res = pqPrepareAsyncResult(conn);
|
||||
if (conn->pipelineStatus != PQ_PIPELINE_OFF)
|
||||
{
|
||||
/*
|
||||
* We're about to send the results of the current query. Set
|
||||
* us idle now, and ...
|
||||
*/
|
||||
conn->asyncStatus = PGASYNC_IDLE;
|
||||
|
||||
/*
|
||||
* ... in cases when we're sending a pipeline-sync result,
|
||||
* move queue processing forwards immediately, so that next
|
||||
* time we're called, we're prepared to return the next result
|
||||
* received from the server. In all other cases, leave the
|
||||
* queue state change for next time, so that a terminating
|
||||
* NULL result is sent.
|
||||
*
|
||||
* (In other words: we don't return a NULL after a pipeline
|
||||
* sync.)
|
||||
*/
|
||||
if (res && res->resultStatus == PGRES_PIPELINE_SYNC)
|
||||
pqPipelineProcessQueue(conn);
|
||||
}
|
||||
else
|
||||
{
|
||||
/* Set the state back to BUSY, allowing parsing to proceed. */
|
||||
conn->asyncStatus = PGASYNC_BUSY;
|
||||
}
|
||||
break;
|
||||
case PGASYNC_READY_MORE:
|
||||
res = pqPrepareAsyncResult(conn);
|
||||
/* Set the state back to BUSY, allowing parsing to proceed. */
|
||||
conn->asyncStatus = PGASYNC_BUSY;
|
||||
@ -1985,6 +2231,13 @@ PQexecStart(PGconn *conn)
|
||||
if (!conn)
|
||||
return false;
|
||||
|
||||
if (conn->pipelineStatus != PQ_PIPELINE_OFF)
|
||||
{
|
||||
appendPQExpBufferStr(&conn->errorMessage,
|
||||
libpq_gettext("synchronous command execution functions are not allowed in pipeline mode\n"));
|
||||
return false;
|
||||
}
|
||||
|
||||
/*
|
||||
* Since this is the beginning of a query cycle, reset the error buffer.
|
||||
*/
|
||||
@ -2148,6 +2401,8 @@ PQsendDescribePortal(PGconn *conn, const char *portal)
|
||||
static int
|
||||
PQsendDescribe(PGconn *conn, char desc_type, const char *desc_target)
|
||||
{
|
||||
PGcmdQueueEntry *entry = NULL;
|
||||
|
||||
/* Treat null desc_target as empty string */
|
||||
if (!desc_target)
|
||||
desc_target = "";
|
||||
@ -2155,6 +2410,10 @@ PQsendDescribe(PGconn *conn, char desc_type, const char *desc_target)
|
||||
if (!PQsendQueryStart(conn, true))
|
||||
return 0;
|
||||
|
||||
entry = pqAllocCmdQueueEntry(conn);
|
||||
if (entry == NULL)
|
||||
return 0; /* error msg already set */
|
||||
|
||||
/* construct the Describe message */
|
||||
if (pqPutMsgStart('D', conn) < 0 ||
|
||||
pqPutc(desc_type, conn) < 0 ||
|
||||
@ -2163,32 +2422,32 @@ PQsendDescribe(PGconn *conn, char desc_type, const char *desc_target)
|
||||
goto sendFailed;
|
||||
|
||||
/* construct the Sync message */
|
||||
if (pqPutMsgStart('S', conn) < 0 ||
|
||||
pqPutMsgEnd(conn) < 0)
|
||||
goto sendFailed;
|
||||
|
||||
/* remember we are doing a Describe */
|
||||
conn->queryclass = PGQUERY_DESCRIBE;
|
||||
|
||||
/* reset last_query string (not relevant now) */
|
||||
if (conn->last_query)
|
||||
if (conn->pipelineStatus == PQ_PIPELINE_OFF)
|
||||
{
|
||||
free(conn->last_query);
|
||||
conn->last_query = NULL;
|
||||
if (pqPutMsgStart('S', conn) < 0 ||
|
||||
pqPutMsgEnd(conn) < 0)
|
||||
goto sendFailed;
|
||||
}
|
||||
|
||||
/* remember we are doing a Describe */
|
||||
entry->queryclass = PGQUERY_DESCRIBE;
|
||||
|
||||
/*
|
||||
* Give the data a push. In nonblock mode, don't complain if we're unable
|
||||
* to send it all; PQgetResult() will do any additional flushing needed.
|
||||
* Give the data a push (in pipeline mode, only if we're past the size
|
||||
* threshold). In nonblock mode, don't complain if we're unable to send
|
||||
* it all; PQgetResult() will do any additional flushing needed.
|
||||
*/
|
||||
if (pqFlush(conn) < 0)
|
||||
if (pqPipelineFlush(conn) < 0)
|
||||
goto sendFailed;
|
||||
|
||||
/* OK, it's launched! */
|
||||
conn->asyncStatus = PGASYNC_BUSY;
|
||||
pqAppendCmdQueueEntry(conn, entry);
|
||||
if (conn->pipelineStatus == PQ_PIPELINE_OFF)
|
||||
conn->asyncStatus = PGASYNC_BUSY;
|
||||
return 1;
|
||||
|
||||
sendFailed:
|
||||
pqRecycleCmdQueueEntry(conn, entry);
|
||||
/* error message should be set up already */
|
||||
return 0;
|
||||
}
|
||||
@ -2327,7 +2586,8 @@ PQputCopyEnd(PGconn *conn, const char *errormsg)
|
||||
* If we sent the COPY command in extended-query mode, we must issue a
|
||||
* Sync as well.
|
||||
*/
|
||||
if (conn->queryclass != PGQUERY_SIMPLE)
|
||||
if (conn->cmd_queue_head &&
|
||||
conn->cmd_queue_head->queryclass != PGQUERY_SIMPLE)
|
||||
{
|
||||
if (pqPutMsgStart('S', conn) < 0 ||
|
||||
pqPutMsgEnd(conn) < 0)
|
||||
@ -2541,6 +2801,13 @@ PQfn(PGconn *conn,
|
||||
*/
|
||||
resetPQExpBuffer(&conn->errorMessage);
|
||||
|
||||
if (conn->pipelineStatus != PQ_PIPELINE_OFF)
|
||||
{
|
||||
appendPQExpBufferStr(&conn->errorMessage,
|
||||
libpq_gettext("PQfn not allowed in pipeline mode\n"));
|
||||
return NULL;
|
||||
}
|
||||
|
||||
if (conn->sock == PGINVALID_SOCKET || conn->asyncStatus != PGASYNC_IDLE ||
|
||||
conn->result != NULL)
|
||||
{
|
||||
@ -2555,6 +2822,277 @@ PQfn(PGconn *conn,
|
||||
args, nargs);
|
||||
}
|
||||
|
||||
/* ====== Pipeline mode support ======== */
|
||||
|
||||
/*
|
||||
* PQenterPipelineMode
|
||||
* Put an idle connection in pipeline mode.
|
||||
*
|
||||
* Returns 1 on success. On failure, errorMessage is set and 0 is returned.
|
||||
*
|
||||
* Commands submitted after this can be pipelined on the connection;
|
||||
* there's no requirement to wait for one to finish before the next is
|
||||
* dispatched.
|
||||
*
|
||||
* Queuing of a new query or syncing during COPY is not allowed.
|
||||
*
|
||||
* A set of commands is terminated by a PQpipelineSync. Multiple sync
|
||||
* points can be established while in pipeline mode. Pipeline mode can
|
||||
* be exited by calling PQexitPipelineMode() once all results are processed.
|
||||
*
|
||||
* This doesn't actually send anything on the wire, it just puts libpq
|
||||
* into a state where it can pipeline work.
|
||||
*/
|
||||
int
|
||||
PQenterPipelineMode(PGconn *conn)
|
||||
{
|
||||
if (!conn)
|
||||
return 0;
|
||||
|
||||
/* succeed with no action if already in pipeline mode */
|
||||
if (conn->pipelineStatus != PQ_PIPELINE_OFF)
|
||||
return 1;
|
||||
|
||||
if (conn->asyncStatus != PGASYNC_IDLE)
|
||||
{
|
||||
appendPQExpBufferStr(&conn->errorMessage,
|
||||
libpq_gettext("cannot enter pipeline mode, connection not idle\n"));
|
||||
return 0;
|
||||
}
|
||||
|
||||
conn->pipelineStatus = PQ_PIPELINE_ON;
|
||||
|
||||
return 1;
|
||||
}
|
||||
|
||||
/*
|
||||
* PQexitPipelineMode
|
||||
* End pipeline mode and return to normal command mode.
|
||||
*
|
||||
* Returns 1 in success (pipeline mode successfully ended, or not in pipeline
|
||||
* mode).
|
||||
*
|
||||
* Returns 0 if in pipeline mode and cannot be ended yet. Error message will
|
||||
* be set.
|
||||
*/
|
||||
int
|
||||
PQexitPipelineMode(PGconn *conn)
|
||||
{
|
||||
if (!conn)
|
||||
return 0;
|
||||
|
||||
if (conn->pipelineStatus == PQ_PIPELINE_OFF)
|
||||
return 1;
|
||||
|
||||
switch (conn->asyncStatus)
|
||||
{
|
||||
case PGASYNC_READY:
|
||||
case PGASYNC_READY_MORE:
|
||||
/* there are some uncollected results */
|
||||
appendPQExpBufferStr(&conn->errorMessage,
|
||||
libpq_gettext("cannot exit pipeline mode with uncollected results\n"));
|
||||
return 0;
|
||||
|
||||
case PGASYNC_BUSY:
|
||||
appendPQExpBufferStr(&conn->errorMessage,
|
||||
libpq_gettext("cannot exit pipeline mode while busy\n"));
|
||||
return 0;
|
||||
|
||||
default:
|
||||
/* OK */
|
||||
break;
|
||||
}
|
||||
|
||||
/* still work to process */
|
||||
if (conn->cmd_queue_head != NULL)
|
||||
{
|
||||
appendPQExpBufferStr(&conn->errorMessage,
|
||||
libpq_gettext("cannot exit pipeline mode with uncollected results\n"));
|
||||
return 0;
|
||||
}
|
||||
|
||||
conn->pipelineStatus = PQ_PIPELINE_OFF;
|
||||
conn->asyncStatus = PGASYNC_IDLE;
|
||||
|
||||
/* Flush any pending data in out buffer */
|
||||
if (pqFlush(conn) < 0)
|
||||
return 0; /* error message is setup already */
|
||||
return 1;
|
||||
}
|
||||
|
||||
/*
|
||||
* pqCommandQueueAdvance
|
||||
* Remove one query from the command queue, when we receive
|
||||
* all results from the server that pertain to it.
|
||||
*/
|
||||
void
|
||||
pqCommandQueueAdvance(PGconn *conn)
|
||||
{
|
||||
PGcmdQueueEntry *prevquery;
|
||||
|
||||
if (conn->cmd_queue_head == NULL)
|
||||
return;
|
||||
|
||||
/* delink from queue */
|
||||
prevquery = conn->cmd_queue_head;
|
||||
conn->cmd_queue_head = conn->cmd_queue_head->next;
|
||||
|
||||
/* and make it recyclable */
|
||||
prevquery->next = NULL;
|
||||
pqRecycleCmdQueueEntry(conn, prevquery);
|
||||
}
|
||||
|
||||
/*
|
||||
* pqPipelineProcessQueue: subroutine for PQgetResult
|
||||
* In pipeline mode, start processing the results of the next query in the queue.
|
||||
*/
|
||||
void
|
||||
pqPipelineProcessQueue(PGconn *conn)
|
||||
{
|
||||
switch (conn->asyncStatus)
|
||||
{
|
||||
case PGASYNC_COPY_IN:
|
||||
case PGASYNC_COPY_OUT:
|
||||
case PGASYNC_COPY_BOTH:
|
||||
case PGASYNC_READY:
|
||||
case PGASYNC_READY_MORE:
|
||||
case PGASYNC_BUSY:
|
||||
/* client still has to process current query or results */
|
||||
return;
|
||||
case PGASYNC_IDLE:
|
||||
/* next query please */
|
||||
break;
|
||||
}
|
||||
|
||||
/* Nothing to do if not in pipeline mode, or queue is empty */
|
||||
if (conn->pipelineStatus == PQ_PIPELINE_OFF ||
|
||||
conn->cmd_queue_head == NULL)
|
||||
return;
|
||||
|
||||
/* Initialize async result-accumulation state */
|
||||
pqClearAsyncResult(conn);
|
||||
|
||||
/*
|
||||
* Reset single-row processing mode. (Client has to set it up for each
|
||||
* query, if desired.)
|
||||
*/
|
||||
conn->singleRowMode = false;
|
||||
|
||||
if (conn->pipelineStatus == PQ_PIPELINE_ABORTED &&
|
||||
conn->cmd_queue_head->queryclass != PGQUERY_SYNC)
|
||||
{
|
||||
/*
|
||||
* In an aborted pipeline we don't get anything from the server for
|
||||
* each result; we're just discarding commands from the queue until we
|
||||
* get to the next sync from the server.
|
||||
*
|
||||
* The PGRES_PIPELINE_ABORTED results tell the client that its queries
|
||||
* got aborted.
|
||||
*/
|
||||
conn->result = PQmakeEmptyPGresult(conn, PGRES_PIPELINE_ABORTED);
|
||||
if (!conn->result)
|
||||
{
|
||||
appendPQExpBufferStr(&conn->errorMessage,
|
||||
libpq_gettext("out of memory\n"));
|
||||
pqSaveErrorResult(conn);
|
||||
return;
|
||||
}
|
||||
conn->asyncStatus = PGASYNC_READY;
|
||||
}
|
||||
else
|
||||
{
|
||||
/* allow parsing to continue */
|
||||
conn->asyncStatus = PGASYNC_BUSY;
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* PQpipelineSync
|
||||
* Send a Sync message as part of a pipeline, and flush to server
|
||||
*
|
||||
* It's legal to start submitting more commands in the pipeline immediately,
|
||||
* without waiting for the results of the current pipeline. There's no need to
|
||||
* end pipeline mode and start it again.
|
||||
*
|
||||
* If a command in a pipeline fails, every subsequent command up to and including
|
||||
* the result to the Sync message sent by PQpipelineSync gets set to
|
||||
* PGRES_PIPELINE_ABORTED state. If the whole pipeline is processed without
|
||||
* error, a PGresult with PGRES_PIPELINE_SYNC is produced.
|
||||
*
|
||||
* Queries can already have been sent before PQpipelineSync is called, but
|
||||
* PQpipelineSync need to be called before retrieving command results.
|
||||
*
|
||||
* The connection will remain in pipeline mode and unavailable for new
|
||||
* synchronous command execution functions until all results from the pipeline
|
||||
* are processed by the client.
|
||||
*/
|
||||
int
|
||||
PQpipelineSync(PGconn *conn)
|
||||
{
|
||||
PGcmdQueueEntry *entry;
|
||||
|
||||
if (!conn)
|
||||
return 0;
|
||||
|
||||
if (conn->pipelineStatus == PQ_PIPELINE_OFF)
|
||||
{
|
||||
appendPQExpBufferStr(&conn->errorMessage,
|
||||
libpq_gettext("cannot send pipeline when not in pipeline mode\n"));
|
||||
return 0;
|
||||
}
|
||||
|
||||
switch (conn->asyncStatus)
|
||||
{
|
||||
case PGASYNC_COPY_IN:
|
||||
case PGASYNC_COPY_OUT:
|
||||
case PGASYNC_COPY_BOTH:
|
||||
/* should be unreachable */
|
||||
appendPQExpBufferStr(&conn->errorMessage,
|
||||
"internal error: cannot send pipeline while in COPY\n");
|
||||
return 0;
|
||||
case PGASYNC_READY:
|
||||
case PGASYNC_READY_MORE:
|
||||
case PGASYNC_BUSY:
|
||||
case PGASYNC_IDLE:
|
||||
/* OK to send sync */
|
||||
break;
|
||||
}
|
||||
|
||||
entry = pqAllocCmdQueueEntry(conn);
|
||||
if (entry == NULL)
|
||||
return 0; /* error msg already set */
|
||||
|
||||
entry->queryclass = PGQUERY_SYNC;
|
||||
entry->query = NULL;
|
||||
|
||||
/* construct the Sync message */
|
||||
if (pqPutMsgStart('S', conn) < 0 ||
|
||||
pqPutMsgEnd(conn) < 0)
|
||||
goto sendFailed;
|
||||
|
||||
pqAppendCmdQueueEntry(conn, entry);
|
||||
|
||||
/*
|
||||
* Give the data a push. In nonblock mode, don't complain if we're unable
|
||||
* to send it all; PQgetResult() will do any additional flushing needed.
|
||||
*/
|
||||
if (PQflush(conn) < 0)
|
||||
goto sendFailed;
|
||||
|
||||
/*
|
||||
* Call pqPipelineProcessQueue so the user can call start calling
|
||||
* PQgetResult.
|
||||
*/
|
||||
pqPipelineProcessQueue(conn);
|
||||
|
||||
return 1;
|
||||
|
||||
sendFailed:
|
||||
pqRecycleCmdQueueEntry(conn, entry);
|
||||
/* error message should be set up already */
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
/* ====== accessor funcs for PGresult ======== */
|
||||
|
||||
@ -2569,7 +3107,7 @@ PQresultStatus(const PGresult *res)
|
||||
char *
|
||||
PQresStatus(ExecStatusType status)
|
||||
{
|
||||
if ((unsigned int) status >= sizeof pgresStatus / sizeof pgresStatus[0])
|
||||
if ((unsigned int) status >= lengthof(pgresStatus))
|
||||
return libpq_gettext("invalid ExecStatusType code");
|
||||
return pgresStatus[status];
|
||||
}
|
||||
@ -3152,6 +3690,23 @@ PQflush(PGconn *conn)
|
||||
return pqFlush(conn);
|
||||
}
|
||||
|
||||
/*
|
||||
* pqPipelineFlush
|
||||
*
|
||||
* In pipeline mode, data will be flushed only when the out buffer reaches the
|
||||
* threshold value. In non-pipeline mode, it behaves as stock pqFlush.
|
||||
*
|
||||
* Returns 0 on success.
|
||||
*/
|
||||
static int
|
||||
pqPipelineFlush(PGconn *conn)
|
||||
{
|
||||
if ((conn->pipelineStatus != PQ_PIPELINE_ON) ||
|
||||
(conn->outCount >= OUTBUFFER_THRESHOLD))
|
||||
return pqFlush(conn);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* PQfreemem - safely frees memory allocated
|
||||
|
@ -158,6 +158,18 @@ pqParseInput3(PGconn *conn)
|
||||
if (conn->asyncStatus != PGASYNC_IDLE)
|
||||
return;
|
||||
|
||||
/*
|
||||
* We're also notionally not-IDLE when in pipeline mode the state
|
||||
* says "idle" (so we have completed receiving the results of one
|
||||
* query from the server and dispatched them to the application)
|
||||
* but another query is queued; yield back control to caller so
|
||||
* that they can initiate processing of the next query in the
|
||||
* queue.
|
||||
*/
|
||||
if (conn->pipelineStatus != PQ_PIPELINE_OFF &&
|
||||
conn->cmd_queue_head != NULL)
|
||||
return;
|
||||
|
||||
/*
|
||||
* Unexpected message in IDLE state; need to recover somehow.
|
||||
* ERROR messages are handled using the notice processor;
|
||||
@ -179,6 +191,7 @@ pqParseInput3(PGconn *conn)
|
||||
}
|
||||
else
|
||||
{
|
||||
/* Any other case is unexpected and we summarily skip it */
|
||||
pqInternalNotice(&conn->noticeHooks,
|
||||
"message type 0x%02x arrived from server while idle",
|
||||
id);
|
||||
@ -217,10 +230,37 @@ pqParseInput3(PGconn *conn)
|
||||
return;
|
||||
conn->asyncStatus = PGASYNC_READY;
|
||||
break;
|
||||
case 'Z': /* backend is ready for new query */
|
||||
case 'Z': /* sync response, backend is ready for new
|
||||
* query */
|
||||
if (getReadyForQuery(conn))
|
||||
return;
|
||||
conn->asyncStatus = PGASYNC_IDLE;
|
||||
if (conn->pipelineStatus != PQ_PIPELINE_OFF)
|
||||
{
|
||||
conn->result = PQmakeEmptyPGresult(conn,
|
||||
PGRES_PIPELINE_SYNC);
|
||||
if (!conn->result)
|
||||
{
|
||||
appendPQExpBufferStr(&conn->errorMessage,
|
||||
libpq_gettext("out of memory"));
|
||||
pqSaveErrorResult(conn);
|
||||
}
|
||||
else
|
||||
{
|
||||
conn->pipelineStatus = PQ_PIPELINE_ON;
|
||||
conn->asyncStatus = PGASYNC_READY;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
/*
|
||||
* In simple query protocol, advance the command queue
|
||||
* (see PQgetResult).
|
||||
*/
|
||||
if (conn->cmd_queue_head &&
|
||||
conn->cmd_queue_head->queryclass == PGQUERY_SIMPLE)
|
||||
pqCommandQueueAdvance(conn);
|
||||
conn->asyncStatus = PGASYNC_IDLE;
|
||||
}
|
||||
break;
|
||||
case 'I': /* empty query */
|
||||
if (conn->result == NULL)
|
||||
@ -238,7 +278,8 @@ pqParseInput3(PGconn *conn)
|
||||
break;
|
||||
case '1': /* Parse Complete */
|
||||
/* If we're doing PQprepare, we're done; else ignore */
|
||||
if (conn->queryclass == PGQUERY_PREPARE)
|
||||
if (conn->cmd_queue_head &&
|
||||
conn->cmd_queue_head->queryclass == PGQUERY_PREPARE)
|
||||
{
|
||||
if (conn->result == NULL)
|
||||
{
|
||||
@ -285,7 +326,8 @@ pqParseInput3(PGconn *conn)
|
||||
conn->inCursor += msgLength;
|
||||
}
|
||||
else if (conn->result == NULL ||
|
||||
conn->queryclass == PGQUERY_DESCRIBE)
|
||||
(conn->cmd_queue_head &&
|
||||
conn->cmd_queue_head->queryclass == PGQUERY_DESCRIBE))
|
||||
{
|
||||
/* First 'T' in a query sequence */
|
||||
if (getRowDescriptions(conn, msgLength))
|
||||
@ -316,7 +358,8 @@ pqParseInput3(PGconn *conn)
|
||||
* instead of PGRES_TUPLES_OK. Otherwise we can just
|
||||
* ignore this message.
|
||||
*/
|
||||
if (conn->queryclass == PGQUERY_DESCRIBE)
|
||||
if (conn->cmd_queue_head &&
|
||||
conn->cmd_queue_head->queryclass == PGQUERY_DESCRIBE)
|
||||
{
|
||||
if (conn->result == NULL)
|
||||
{
|
||||
@ -445,7 +488,7 @@ handleSyncLoss(PGconn *conn, char id, int msgLength)
|
||||
id, msgLength);
|
||||
/* build an error result holding the error message */
|
||||
pqSaveErrorResult(conn);
|
||||
conn->asyncStatus = PGASYNC_READY; /* drop out of GetResult wait loop */
|
||||
conn->asyncStatus = PGASYNC_READY; /* drop out of PQgetResult wait loop */
|
||||
/* flush input data since we're giving up on processing it */
|
||||
pqDropConnection(conn, true);
|
||||
conn->status = CONNECTION_BAD; /* No more connection to backend */
|
||||
@ -471,7 +514,9 @@ getRowDescriptions(PGconn *conn, int msgLength)
|
||||
* PGresult created by getParamDescriptions, and we should fill data into
|
||||
* that. Otherwise, create a new, empty PGresult.
|
||||
*/
|
||||
if (conn->queryclass == PGQUERY_DESCRIBE)
|
||||
if (!conn->cmd_queue_head ||
|
||||
(conn->cmd_queue_head &&
|
||||
conn->cmd_queue_head->queryclass == PGQUERY_DESCRIBE))
|
||||
{
|
||||
if (conn->result)
|
||||
result = conn->result;
|
||||
@ -568,7 +613,9 @@ getRowDescriptions(PGconn *conn, int msgLength)
|
||||
* If we're doing a Describe, we're done, and ready to pass the result
|
||||
* back to the client.
|
||||
*/
|
||||
if (conn->queryclass == PGQUERY_DESCRIBE)
|
||||
if ((!conn->cmd_queue_head) ||
|
||||
(conn->cmd_queue_head &&
|
||||
conn->cmd_queue_head->queryclass == PGQUERY_DESCRIBE))
|
||||
{
|
||||
conn->asyncStatus = PGASYNC_READY;
|
||||
return 0;
|
||||
@ -841,6 +888,10 @@ pqGetErrorNotice3(PGconn *conn, bool isError)
|
||||
PQExpBufferData workBuf;
|
||||
char id;
|
||||
|
||||
/* If in pipeline mode, set error indicator for it */
|
||||
if (isError && conn->pipelineStatus != PQ_PIPELINE_OFF)
|
||||
conn->pipelineStatus = PQ_PIPELINE_ABORTED;
|
||||
|
||||
/*
|
||||
* If this is an error message, pre-emptively clear any incomplete query
|
||||
* result we may have. We'd just throw it away below anyway, and
|
||||
@ -897,8 +948,8 @@ pqGetErrorNotice3(PGconn *conn, bool isError)
|
||||
* might need it for an error cursor display, which is only true if there
|
||||
* is a PG_DIAG_STATEMENT_POSITION field.
|
||||
*/
|
||||
if (have_position && conn->last_query && res)
|
||||
res->errQuery = pqResultStrdup(res, conn->last_query);
|
||||
if (have_position && res && conn->cmd_queue_head && conn->cmd_queue_head->query)
|
||||
res->errQuery = pqResultStrdup(res, conn->cmd_queue_head->query);
|
||||
|
||||
/*
|
||||
* Now build the "overall" error message for PQresultErrorMessage.
|
||||
@ -1817,7 +1868,8 @@ pqEndcopy3(PGconn *conn)
|
||||
* If we sent the COPY command in extended-query mode, we must issue a
|
||||
* Sync as well.
|
||||
*/
|
||||
if (conn->queryclass != PGQUERY_SIMPLE)
|
||||
if (conn->cmd_queue_head &&
|
||||
conn->cmd_queue_head->queryclass != PGQUERY_SIMPLE)
|
||||
{
|
||||
if (pqPutMsgStart('S', conn) < 0 ||
|
||||
pqPutMsgEnd(conn) < 0)
|
||||
@ -1897,6 +1949,9 @@ pqFunctionCall3(PGconn *conn, Oid fnid,
|
||||
int avail;
|
||||
int i;
|
||||
|
||||
/* already validated by PQfn */
|
||||
Assert(conn->pipelineStatus == PQ_PIPELINE_OFF);
|
||||
|
||||
/* PQfn already validated connection state */
|
||||
|
||||
if (pqPutMsgStart('F', conn) < 0 || /* function call msg */
|
||||
|
@ -96,7 +96,10 @@ typedef enum
|
||||
PGRES_NONFATAL_ERROR, /* notice or warning message */
|
||||
PGRES_FATAL_ERROR, /* query failed */
|
||||
PGRES_COPY_BOTH, /* Copy In/Out data transfer in progress */
|
||||
PGRES_SINGLE_TUPLE /* single tuple from larger resultset */
|
||||
PGRES_SINGLE_TUPLE, /* single tuple from larger resultset */
|
||||
PGRES_PIPELINE_SYNC, /* pipeline synchronization point */
|
||||
PGRES_PIPELINE_ABORTED, /* Command didn't run because of an abort
|
||||
* earlier in a pipeline */
|
||||
} ExecStatusType;
|
||||
|
||||
typedef enum
|
||||
@ -136,6 +139,16 @@ typedef enum
|
||||
PQPING_NO_ATTEMPT /* connection not attempted (bad params) */
|
||||
} PGPing;
|
||||
|
||||
/*
|
||||
* PGpipelineStatus - Current status of pipeline mode
|
||||
*/
|
||||
typedef enum
|
||||
{
|
||||
PQ_PIPELINE_OFF,
|
||||
PQ_PIPELINE_ON,
|
||||
PQ_PIPELINE_ABORTED
|
||||
} PGpipelineStatus;
|
||||
|
||||
/* PGconn encapsulates a connection to the backend.
|
||||
* The contents of this struct are not supposed to be known to applications.
|
||||
*/
|
||||
@ -327,6 +340,7 @@ extern int PQserverVersion(const PGconn *conn);
|
||||
extern char *PQerrorMessage(const PGconn *conn);
|
||||
extern int PQsocket(const PGconn *conn);
|
||||
extern int PQbackendPID(const PGconn *conn);
|
||||
extern PGpipelineStatus PQpipelineStatus(const PGconn *conn);
|
||||
extern int PQconnectionNeedsPassword(const PGconn *conn);
|
||||
extern int PQconnectionUsedPassword(const PGconn *conn);
|
||||
extern int PQclientEncoding(const PGconn *conn);
|
||||
@ -434,6 +448,11 @@ extern PGresult *PQgetResult(PGconn *conn);
|
||||
extern int PQisBusy(PGconn *conn);
|
||||
extern int PQconsumeInput(PGconn *conn);
|
||||
|
||||
/* Routines for pipeline mode management */
|
||||
extern int PQenterPipelineMode(PGconn *conn);
|
||||
extern int PQexitPipelineMode(PGconn *conn);
|
||||
extern int PQpipelineSync(PGconn *conn);
|
||||
|
||||
/* LISTEN/NOTIFY support */
|
||||
extern PGnotify *PQnotifies(PGconn *conn);
|
||||
|
||||
|
@ -217,21 +217,16 @@ typedef enum
|
||||
{
|
||||
PGASYNC_IDLE, /* nothing's happening, dude */
|
||||
PGASYNC_BUSY, /* query in progress */
|
||||
PGASYNC_READY, /* result ready for PQgetResult */
|
||||
PGASYNC_READY, /* query done, waiting for client to fetch
|
||||
* result */
|
||||
PGASYNC_READY_MORE, /* query done, waiting for client to fetch
|
||||
* result, more results expected from this
|
||||
* query */
|
||||
PGASYNC_COPY_IN, /* Copy In data transfer in progress */
|
||||
PGASYNC_COPY_OUT, /* Copy Out data transfer in progress */
|
||||
PGASYNC_COPY_BOTH /* Copy In/Out data transfer in progress */
|
||||
} PGAsyncStatusType;
|
||||
|
||||
/* PGQueryClass tracks which query protocol we are now executing */
|
||||
typedef enum
|
||||
{
|
||||
PGQUERY_SIMPLE, /* simple Query protocol (PQexec) */
|
||||
PGQUERY_EXTENDED, /* full Extended protocol (PQexecParams) */
|
||||
PGQUERY_PREPARE, /* Parse only (PQprepare) */
|
||||
PGQUERY_DESCRIBE /* Describe Statement or Portal */
|
||||
} PGQueryClass;
|
||||
|
||||
/* Target server type (decoded value of target_session_attrs) */
|
||||
typedef enum
|
||||
{
|
||||
@ -305,6 +300,29 @@ typedef enum pg_conn_host_type
|
||||
CHT_UNIX_SOCKET
|
||||
} pg_conn_host_type;
|
||||
|
||||
/*
|
||||
* PGQueryClass tracks which query protocol is in use for each command queue
|
||||
* entry, or special operation in execution
|
||||
*/
|
||||
typedef enum
|
||||
{
|
||||
PGQUERY_SIMPLE, /* simple Query protocol (PQexec) */
|
||||
PGQUERY_EXTENDED, /* full Extended protocol (PQexecParams) */
|
||||
PGQUERY_PREPARE, /* Parse only (PQprepare) */
|
||||
PGQUERY_DESCRIBE, /* Describe Statement or Portal */
|
||||
PGQUERY_SYNC /* Sync (at end of a pipeline) */
|
||||
} PGQueryClass;
|
||||
|
||||
/*
|
||||
* An entry in the pending command queue.
|
||||
*/
|
||||
typedef struct PGcmdQueueEntry
|
||||
{
|
||||
PGQueryClass queryclass; /* Query type */
|
||||
char *query; /* SQL command, or NULL if none/unknown/OOM */
|
||||
struct PGcmdQueueEntry *next; /* list link */
|
||||
} PGcmdQueueEntry;
|
||||
|
||||
/*
|
||||
* pg_conn_host stores all information about each of possibly several hosts
|
||||
* mentioned in the connection string. Most fields are derived by splitting
|
||||
@ -389,12 +407,11 @@ struct pg_conn
|
||||
ConnStatusType status;
|
||||
PGAsyncStatusType asyncStatus;
|
||||
PGTransactionStatusType xactStatus; /* never changes to ACTIVE */
|
||||
PGQueryClass queryclass;
|
||||
char *last_query; /* last SQL command, or NULL if unknown */
|
||||
char last_sqlstate[6]; /* last reported SQLSTATE */
|
||||
bool options_valid; /* true if OK to attempt connection */
|
||||
bool nonblocking; /* whether this connection is using nonblock
|
||||
* sending semantics */
|
||||
PGpipelineStatus pipelineStatus; /* status of pipeline mode */
|
||||
bool singleRowMode; /* return current query result row-by-row? */
|
||||
char copy_is_binary; /* 1 = copy binary, 0 = copy text */
|
||||
int copy_already_done; /* # bytes already returned in COPY OUT */
|
||||
@ -407,6 +424,19 @@ struct pg_conn
|
||||
pg_conn_host *connhost; /* details about each named host */
|
||||
char *connip; /* IP address for current network connection */
|
||||
|
||||
/*
|
||||
* The pending command queue as a singly-linked list. Head is the command
|
||||
* currently in execution, tail is where new commands are added.
|
||||
*/
|
||||
PGcmdQueueEntry *cmd_queue_head;
|
||||
PGcmdQueueEntry *cmd_queue_tail;
|
||||
|
||||
/*
|
||||
* To save malloc traffic, we don't free entries right away; instead we
|
||||
* save them in this list for possible reuse.
|
||||
*/
|
||||
PGcmdQueueEntry *cmd_queue_recycle;
|
||||
|
||||
/* Connection data */
|
||||
pgsocket sock; /* FD for socket, PGINVALID_SOCKET if
|
||||
* unconnected */
|
||||
@ -622,6 +652,7 @@ extern void pqSaveMessageField(PGresult *res, char code,
|
||||
extern void pqSaveParameterStatus(PGconn *conn, const char *name,
|
||||
const char *value);
|
||||
extern int pqRowProcessor(PGconn *conn, const char **errmsgp);
|
||||
extern void pqCommandQueueAdvance(PGconn *conn);
|
||||
extern int PQsendQueryContinue(PGconn *conn, const char *query);
|
||||
|
||||
/* === in fe-protocol3.c === */
|
||||
@ -795,6 +826,11 @@ extern ssize_t pg_GSS_read(PGconn *conn, void *ptr, size_t len);
|
||||
*/
|
||||
#define pqIsnonblocking(conn) ((conn)->nonblocking)
|
||||
|
||||
/*
|
||||
* Connection's outbuffer threshold, for pipeline mode.
|
||||
*/
|
||||
#define OUTBUFFER_THRESHOLD 65536
|
||||
|
||||
#ifdef ENABLE_NLS
|
||||
extern char *libpq_gettext(const char *msgid) pg_attribute_format_arg(1);
|
||||
extern char *libpq_ngettext(const char *msgid, const char *msgid_plural, unsigned long n) pg_attribute_format_arg(1) pg_attribute_format_arg(2);
|
||||
|
@ -10,6 +10,7 @@ SUBDIRS = \
|
||||
delay_execution \
|
||||
dummy_index_am \
|
||||
dummy_seclabel \
|
||||
libpq_pipeline \
|
||||
plsample \
|
||||
snapshot_too_old \
|
||||
test_bloomfilter \
|
||||
|
5
src/test/modules/libpq_pipeline/.gitignore
vendored
Normal file
5
src/test/modules/libpq_pipeline/.gitignore
vendored
Normal file
@ -0,0 +1,5 @@
|
||||
# Generated subdirectories
|
||||
/log/
|
||||
/results/
|
||||
/tmp_check/
|
||||
/libpq_pipeline
|
20
src/test/modules/libpq_pipeline/Makefile
Normal file
20
src/test/modules/libpq_pipeline/Makefile
Normal file
@ -0,0 +1,20 @@
|
||||
# src/test/modules/libpq_pipeline/Makefile
|
||||
|
||||
PROGRAM = libpq_pipeline
|
||||
OBJS = libpq_pipeline.o
|
||||
|
||||
PG_CPPFLAGS = -I$(libpq_srcdir)
|
||||
PG_LIBS_INTERNAL += $(libpq_pgport)
|
||||
|
||||
TAP_TESTS = 1
|
||||
|
||||
ifdef USE_PGXS
|
||||
PG_CONFIG = pg_config
|
||||
PGXS := $(shell $(PG_CONFIG) --pgxs)
|
||||
include $(PGXS)
|
||||
else
|
||||
subdir = src/test/modules/libpq_pipeline
|
||||
top_builddir = ../../../..
|
||||
include $(top_builddir)/src/Makefile.global
|
||||
include $(top_srcdir)/contrib/contrib-global.mk
|
||||
endif
|
1
src/test/modules/libpq_pipeline/README
Normal file
1
src/test/modules/libpq_pipeline/README
Normal file
@ -0,0 +1 @@
|
||||
Test programs and libraries for libpq
|
1303
src/test/modules/libpq_pipeline/libpq_pipeline.c
Normal file
1303
src/test/modules/libpq_pipeline/libpq_pipeline.c
Normal file
File diff suppressed because it is too large
Load Diff
28
src/test/modules/libpq_pipeline/t/001_libpq_pipeline.pl
Normal file
28
src/test/modules/libpq_pipeline/t/001_libpq_pipeline.pl
Normal file
@ -0,0 +1,28 @@
|
||||
use strict;
|
||||
use warnings;
|
||||
|
||||
use Config;
|
||||
use PostgresNode;
|
||||
use TestLib;
|
||||
use Test::More tests => 8;
|
||||
use Cwd;
|
||||
|
||||
my $node = get_new_node('main');
|
||||
$node->init;
|
||||
$node->start;
|
||||
|
||||
my $numrows = 10000;
|
||||
$ENV{PATH} = "$ENV{PATH}:" . getcwd();
|
||||
|
||||
my ($out, $err) = run_command(['libpq_pipeline', 'tests']);
|
||||
die "oops: $err" unless $err eq '';
|
||||
my @tests = split(/\s/, $out);
|
||||
|
||||
for my $testname (@tests)
|
||||
{
|
||||
$node->command_ok(
|
||||
[ 'libpq_pipeline', $testname, $node->connstr('postgres'), $numrows ],
|
||||
"libpq_pipeline $testname");
|
||||
}
|
||||
|
||||
$node->stop('fast');
|
@ -33,10 +33,11 @@ my @unlink_on_exit;
|
||||
|
||||
# Set of variables for modules in contrib/ and src/test/modules/
|
||||
my $contrib_defines = { 'refint' => 'REFINT_VERBOSE' };
|
||||
my @contrib_uselibpq = ('dblink', 'oid2name', 'postgres_fdw', 'vacuumlo');
|
||||
my @contrib_uselibpgport = ('oid2name', 'vacuumlo');
|
||||
my @contrib_uselibpgcommon = ('oid2name', 'vacuumlo');
|
||||
my $contrib_extralibs = undef;
|
||||
my @contrib_uselibpq =
|
||||
('dblink', 'oid2name', 'postgres_fdw', 'vacuumlo', 'libpq_pipeline');
|
||||
my @contrib_uselibpgport = ('libpq_pipeline', 'oid2name', 'vacuumlo');
|
||||
my @contrib_uselibpgcommon = ('libpq_pipeline', 'oid2name', 'vacuumlo');
|
||||
my $contrib_extralibs = { 'libpq_pipeline' => ['ws2_32.lib'] };
|
||||
my $contrib_extraincludes = { 'dblink' => ['src/backend'] };
|
||||
my $contrib_extrasource = {
|
||||
'cube' => [ 'contrib/cube/cubescan.l', 'contrib/cube/cubeparse.y' ],
|
||||
|
@ -1563,10 +1563,12 @@ PG_Locale_Strategy
|
||||
PG_Lock_Status
|
||||
PG_init_t
|
||||
PGcancel
|
||||
PGcmdQueueEntry
|
||||
PGconn
|
||||
PGdataValue
|
||||
PGlobjfuncs
|
||||
PGnotify
|
||||
PGpipelineStatus
|
||||
PGresAttDesc
|
||||
PGresAttValue
|
||||
PGresParamDesc
|
||||
|
Loading…
Reference in New Issue
Block a user