/*
+ * Arrange to remove a dynamic shared memory mapping at cleanup time.
+ *
+ * dsm_keep_mapping() can be used to preserve a mapping for the entire
+ * lifetime of a process; this function reverses that decision, making
+ * the segment owned by the current resource owner. This may be useful
+ * just before performing some operation that will invalidate the segment
+ * for future use by this backend.
+ */
+void
+dsm_unkeep_mapping(dsm_segment *seg)
+{
+ Assert(seg->resowner == NULL);
+ ResourceOwnerEnlargeDSMs(CurrentResourceOwner);
+ seg->resowner = CurrentResourceOwner;
+ ResourceOwnerRememberDSM(seg->resowner, seg);
+}
Hm, I dislike the name unkeep. I guess you want to be symmetric to
dsm_keep_mapping? How about dsm_manage_mapping(), dsm_ensure_mapping_cleanup(),
or dsm_remember_mapping()?
From c835a06f20792556d35a0eee4c2fa21f5f23e8a3 Mon Sep 17 00:00:00 2001
Date: Fri, 11 Jul 2014 09:53:40 -0400
Subject: [PATCH 6/6] pg_background: Run commands in a background worker, and
get the results.
The currently-active GUC values from the user session will be copied
to the background worker. If the command returns a result set, you
can retrieve the result set; if not, you can retrieve the command
tags. If the command fails with an error, the same error will be
thrown in the launching process when the results are retrieved.
Warnings and other messages generated by the background worker, and
notifications received by it, are also propagated to the foreground
process.
I've got to ask: Why is it helpful that we have this in contrib? I have a
good share of blame to bear for that, but I think we need to stop
diluting contrib ever more with test programs. These have a place, but I
don't think it should be contrib.
+/* Table-of-contents constants for our dynamic shared memory segment. */
+#define PG_BACKGROUND_MAGIC 0x50674267
+#define PG_BACKGROUND_KEY_FIXED_DATA 0
+#define PG_BACKGROUND_KEY_SQL 1
+#define PG_BACKGROUND_KEY_GUC 2
+#define PG_BACKGROUND_KEY_QUEUE 3
+#define PG_BACKGROUND_NKEYS 4
+
+/* Fixed-size data passed via our dynamic shared memory segment. */
+typedef struct pg_background_fixed_data
+{
+ Oid database_id;
+ Oid authenticated_user_id;
+ Oid current_user_id;
+ int sec_context;
+ char database[NAMEDATALEN];
+ char authenticated_user[NAMEDATALEN];
+} pg_background_fixed_data;
Why not NameData?
+/* Private state maintained by the launching backend for IPC. */
+typedef struct pg_background_worker_info
+{
+ pid_t pid;
+ dsm_segment *seg;
+ BackgroundWorkerHandle *handle;
+ shm_mq_handle *responseq;
+ bool consumed;
+} pg_background_worker_info;
whitespace damage.
+static HTAB *worker_hash;
Hm. So the lifetime of this hash's contents is managed via
on_dsm_detach(), do I understand that correctly?
+PG_FUNCTION_INFO_V1(pg_background_launch);
+PG_FUNCTION_INFO_V1(pg_background_result);
+PG_FUNCTION_INFO_V1(pg_background_detach);
+void pg_background_worker_main(Datum);
+
+/*
+ * Start a dynamic background worker to run a user-specified SQL command.
+ */
+Datum
+pg_background_launch(PG_FUNCTION_ARGS)
+{
+ text *sql = PG_GETARG_TEXT_PP(0);
+ int32 queue_size = PG_GETARG_INT64(1);
+ int32 sql_len = VARSIZE_ANY_EXHDR(sql);
+ Size guc_len;
+ Size segsize;
+ dsm_segment *seg;
+ shm_toc_estimator e;
+ shm_toc *toc;
+ char *sqlp;
wrong indentation.
+ char *gucstate;
+ shm_mq *mq;
+ BackgroundWorker worker;
+ BackgroundWorkerHandle *worker_handle;
+ pg_background_fixed_data *fdata;
+ pid_t pid;
+ shm_mq_handle *responseq;
+ MemoryContext oldcontext;
+
+ /* Ensure a valid queue size. */
+ if (queue_size < 0 || ((uint64) queue_size) < shm_mq_minimum_size)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("queue size must be at least %zu bytes",
+ shm_mq_minimum_size)));
Hm. So every user can do this once the extension is created as the
functions are most likely to be PUBLIC. Is this a good idea?
+ /* Wait for the worker to start. */
+ switch (WaitForBackgroundWorkerStartup(worker_handle, &pid))
+ {
+ /* Success. */
+ break;
+ pfree(worker_handle);
+ ereport(ERROR,
+ (errcode(ERRCODE_INSUFFICIENT_RESOURCES),
+ errmsg("could not start background process"),
+ errhint("More details may be available in the server log.")));
+ break;
space vs. tab.
+/*
+ * Retrieve the results of a background query previously launched in this
+ * session.
+ */
+Datum
+pg_background_result(PG_FUNCTION_ARGS)
+{
+ int32 pid = PG_GETARG_INT32(0);
+ shm_mq_result res;
+ FuncCallContext *funcctx;
+ TupleDesc tupdesc;
+ StringInfoData msg;
+ pg_background_result_state *state;
+
+ /* First-time setup. */
+ if (SRF_IS_FIRSTCALL())
+ {
+ MemoryContext oldcontext;
+ pg_background_worker_info *info;
+
+ funcctx = SRF_FIRSTCALL_INIT();
+ oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
+
+ /* See if we have a connection to the specified PID. */
+ if ((info = find_worker_info(pid)) == NULL)
+ ereport(ERROR,
+ (errcode(ERRCODE_UNDEFINED_OBJECT),
+ errmsg("PID %d is not attached to this session", pid)));
+
+ /* Can't read results twice. */
+ if (info->consumed)
+ ereport(ERROR,
+ (errcode(ERRCODE_UNDEFINED_OBJECT),
+ errmsg("results for PID %d have already been consumed", pid)));
+ info->consumed = true;
trailing whitespace.
+ /*
+ * Whether we succeed or fail, a future invocation of this function
+ * may not try to read from the DSM once we've begun to do so.
+ * Accordingly, make arrangements to clean things up at end of query.
+ */
+ dsm_unkeep_mapping(info->seg);
+
+ /* Set up tuple-descriptor based on colum definition list. */
+ if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("function returning record called in context "
+ "that cannot accept type record"),
+ errhint("Try calling the function in the FROM clause "
+ "using a column definition list.")));
Hm, normally we don't add linebreaks inside error messages.
+ funcctx->tuple_desc = BlessTupleDesc(tupdesc);
+
+ /* Cache state that will be needed on every call. */
+ state = palloc0(sizeof(pg_background_result_state));
+ state->info = info;
+ if (funcctx->tuple_desc->natts > 0)
+ {
+ int natts = funcctx->tuple_desc->natts;
+ int i;
+
+ state->receive_functions = palloc(sizeof(FmgrInfo) * natts);
+ state->typioparams = palloc(sizeof(Oid) * natts);
+
+ for (i = 0; i < natts; ++i)
+ {
+ Oid receive_function_id;
+
+ getTypeBinaryInputInfo(funcctx->tuple_desc->attrs[i]->atttypid,
+ &receive_function_id,
+ &state->typioparams[i]);
+ fmgr_info(receive_function_id, &state->receive_functions[i]);
+ }
+ }
I'm unsure right now about the rules surrounding this, but shouldn't we
check that the user is allowed to execute these? And shouldn't we fall
back to non-binary functions if no binary ones are available?
+ /* Read and processes messages from the shared memory queue. */
+ for (;;)
+ {
+ char msgtype;
+ Size nbytes;
+ void *data;
+
+ /* Get next message. */
+ res = shm_mq_receive(state->info->responseq, &nbytes, &data, false);
+ if (res != SHM_MQ_SUCCESS)
+ break;
+
+ /*
+ * Message-parsing routines operate on a null-terminated StringInfo,
+ * so we must construct one.
+ */
+ resetStringInfo(&msg);
+ enlargeStringInfo(&msg, nbytes);
+ msg.len = nbytes;
+ memcpy(msg.data, data, nbytes);
+ msg.data[nbytes] = '\0';
+ msgtype = pq_getmsgbyte(&msg);
+
+ /* Dispatch on message type. */
+ switch (msgtype)
+ {
+ {
+ ErrorData edata;
+
+ /* Parse ErrorResponse or NoticeResponse. */
+ pq_parse_errornotice(&msg, &edata);
+
+ /*
+ * Limit the maximum error level to ERROR. We don't want
+ * a FATAL inside the background worker to kill the user
+ * session.
+ */
+ if (edata.elevel > ERROR)
+ edata.elevel = ERROR;
Hm. But we still should report that it FATALed? Maybe add an error context
notice about it? Not nice, but I don't have an immediately better idea. I
think it generally would be rather helpful to add the information that
this wasn't originally an error triggered by this process. The user
might otherwise be confused when looking for the origin of the error in
the log.
+ {
+ /* Propagate NotifyResponse. */
+ pq_putmessage(msg.data[0], &msg.data[1], nbytes - 1);
+ break;
Hm. Are we sure we're in a situation where the client expects these? And
are we sure their encoding is correct? The other data goes through
input/output methods checking for that, but here we don't. And the other
side AFAICS could have done a SET client_encoding.
+ /* If no data rows, return the command tags instead. */
+ if (!state->has_row_description)
+ {
+ if (tupdesc->natts != 1 || tupdesc->attrs[0]->atttypid != TEXTOID)
+ ereport(ERROR,
+ (errcode(ERRCODE_DATATYPE_MISMATCH),
+ errmsg("remote query did not return a result set, but "
+ "result rowtype is not a single text column")));
+ if (state->command_tags != NIL)
+ {
+ char *tag = linitial(state->command_tags);
+ Datum value;
+ bool isnull;
+ HeapTuple result;
+
+ state->command_tags = list_delete_first(state->command_tags);
+ value = PointerGetDatum(cstring_to_text(tag));
+ isnull = false;
+ result = heap_form_tuple(tupdesc, &value, &isnull);
+ SRF_RETURN_NEXT(funcctx, HeapTupleGetDatum(result));
+ }
trailing whitespace.
+/*
+ * Parse a DataRow message and form a result tuple.
+ */
+static HeapTuple
+form_result_tuple(pg_background_result_state *state, TupleDesc tupdesc,
+ StringInfo msg)
+{
+ /* Handle DataRow message. */
+ int16 natts = pq_getmsgint(msg, 2);
+ int16 i;
+ Datum *values = NULL;
+ bool *isnull = NULL;
+ StringInfoData buf;
+
+ if (!state->has_row_description)
+ elog(ERROR, "DataRow not preceded by RowDescription");
+ if (natts != tupdesc->natts)
+ elog(ERROR, "malformed DataRow");
+ if (natts > 0)
+ {
+ values = palloc(natts * sizeof(Datum));
+ isnull = palloc(natts * sizeof(bool));
+ }
+ initStringInfo(&buf);
+
+ for (i = 0; i < natts; ++i)
+ {
+ int32 bytes = pq_getmsgint(msg, 4);
+
+ if (bytes < 0)
+ {
+ values[i] = ReceiveFunctionCall(&state->receive_functions[i],
+ NULL,
+ state->typioparams[i],
+ tupdesc->attrs[i]->atttypmod);
+ isnull[i] = true;
+ }
+ else
+ {
+ resetStringInfo(&buf);
+ appendBinaryStringInfo(&buf, pq_getmsgbytes(msg, bytes), bytes);
+ values[i] = ReceiveFunctionCall(&state->receive_functions[i],
+ &buf,
+ state->typioparams[i],
+ tupdesc->attrs[i]->atttypmod);
+ isnull[i] = false;
+ }
+ }
Hm. I think you didn't check that the typemods are the same above.
+/*
+ * Detach from the dynamic shared memory segment used for communication with
+ * a background worker. This prevents the worker from stalling waiting for
+ * us to read its results.
+ */
+Datum
+pg_background_detach(PG_FUNCTION_ARGS)
+{
+ int32 pid = PG_GETARG_INT32(0);
+ pg_background_worker_info *info;
+
+ info = find_worker_info(pid);
+ if (info == NULL)
+ ereport(ERROR,
+ (errcode(ERRCODE_UNDEFINED_OBJECT),
+ errmsg("PID %d is not attached to this session", pid)));
+ dsm_detach(info->seg);
+
+ PG_RETURN_VOID();
+}
So there's really no limit on who is allowed to do stuff like
this. I.e. any SECURITY DEFINER and such may do the same.
+/*
+ * Background worker entrypoint.
+ */
+void
+pg_background_worker_main(Datum main_arg)
+{
+ dsm_segment *seg;
+ shm_toc *toc;
+ pg_background_fixed_data *fdata;
+ char *sql;
+ char *gucstate;
+ shm_mq *mq;
+ shm_mq_handle *responseq;
+
+ /* Establish signal handlers. */
+ pqsignal(SIGTERM, handle_sigterm);
Hm. No SIGINT?
+ /* Find data structures in dynamic shared memory. */
+ fdata = shm_toc_lookup(toc, PG_BACKGROUND_KEY_FIXED_DATA);
+ sql = shm_toc_lookup(toc, PG_BACKGROUND_KEY_SQL);
+ gucstate = shm_toc_lookup(toc, PG_BACKGROUND_KEY_GUC);
+ mq = shm_toc_lookup(toc, PG_BACKGROUND_KEY_QUEUE);
+ shm_mq_set_sender(mq, MyProc);
+ responseq = shm_mq_attach(mq, seg, NULL);
Don't these need to ensure that values have been found? shm_toc_lookup
returns NULL for unknown items and such?
+ /* Redirect protocol messages to responseq. */
+ pq_redirect_to_shm_mq(mq, responseq);
+
+ /*
+ * Initialize our user and database ID based on the strings version of
+ * the data, and then go back and check that we actually got the database
+ * and user ID that we intended to get. We do this because it's not
+ * impossible for the process that started us to die before we get here,
+ * and the user or database could be renamed in the meantime. We don't
+ * want to latch on the wrong object by accident. There should probably
+ * be a variant of BackgroundWorkerInitializeConnection that accepts OIDs
+ * rather than strings.
+ */
+ BackgroundWorkerInitializeConnection(fdata->database,
+ fdata->authenticated_user);
+ if (fdata->database_id != MyDatabaseId ||
+ fdata->authenticated_user_id != GetAuthenticatedUserId())
+ ereport(ERROR,
+ (errmsg("user or database renamed during pg_background startup")));
+
+ /* Restore GUC values from launching backend. */
+ StartTransactionCommand();
+ RestoreGUCState(gucstate);
+ CommitTransactionCommand();
I haven't read the guc save patch, but is it a) required to do this in a
transaction? We normally reload the config even without. b) correct to
do? What about SET LOCAL variables?
+ /* Restore user ID and security context. */
+ SetUserIdAndSecContext(fdata->current_user_id, fdata->sec_context);
+
+ /* Prepare to execute the query. */
+ SetCurrentStatementStartTimestamp();
+ debug_query_string = sql;
+ pgstat_report_activity(STATE_RUNNING, sql);
+ StartTransactionCommand();
+ if (StatementTimeout > 0)
+ enable_timeout_after(STATEMENT_TIMEOUT, StatementTimeout);
+ else
+ disable_timeout(STATEMENT_TIMEOUT, false);
I doubt that actually works correctly without a SIGINT handler as
statement timeout just falls back to kill(SIGINT)? Or does it, because
it falls back to just doing a proc_exit()? If so, is that actually safe?
+ /* Execute the query. */
+ execute_sql_string(sql);
+
+ /* Post-execution cleanup. */
+ disable_timeout(STATEMENT_TIMEOUT, false);
+ CommitTransactionCommand();
So, we're allowed to do nearly arbitrary nastiness here...
+/*
+ * Execute given SQL string.
+ *
+ * Using SPI here would preclude backgrounding commands like VACUUM which one
+ * might very well wish to launch in the background. So we do this instead.
+ */
+static void
+execute_sql_string(const char *sql)
+{
+ List *raw_parsetree_list;
+ ListCell *lc1;
+ bool isTopLevel;
+ int commands_remaining;
+ MemoryContext parsecontext;
+ MemoryContext oldcontext;
+
+ /*
+ * Parse the SQL string into a list of raw parse trees.
+ *
+ * Because we allow statements that perform internal transaction control,
+ * we can't do this in TopTransactionContext; the parse trees might get
+ * blown away before we're done executing them.
+ */
+ parsecontext = AllocSetContextCreate(TopMemoryContext,
+ "pg_background parse/plan",
+ ALLOCSET_DEFAULT_MINSIZE,
+ ALLOCSET_DEFAULT_INITSIZE,
+ ALLOCSET_DEFAULT_MAXSIZE);
Not that it hugely matters, but shouldn't this rather be
TopTransactionContext?
+ oldcontext = MemoryContextSwitchTo(parsecontext);
+ raw_parsetree_list = pg_parse_query(sql);
+ commands_remaining = list_length(raw_parsetree_list);
+ isTopLevel = commands_remaining == 1;
+ MemoryContextSwitchTo(oldcontext);
+
+ /*
+ * Do parse analysis, rule rewrite, planning, and execution for each raw
+ * parsetree. We must fully execute each query before beginning parse
+ * analysis on the next one, since there may be interdependencies.
+ */
+ foreach(lc1, raw_parsetree_list)
+ {
+ Node *parsetree = (Node *) lfirst(lc1);
+ const char *commandTag;
+ char completionTag[COMPLETION_TAG_BUFSIZE];
+ List *querytree_list,
+ *plantree_list;
borked indentation.
+ bool snapshot_set = false;
+ Portal portal;
+ DestReceiver *receiver;
+ int16 format = 1;
+
+ /*
+ * We don't allow transaction-control commands like COMMIT and ABORT
+ * here. The entire SQL statement is executed as a single transaction
+ * which commits if no errors are encountered.
+ */
+ if (IsA(parsetree, TransactionStmt))
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("transaction control statements are not allowed in pg_background")));
Hm. I don't think that goes far enough. This allows commands that
internally stop/start transactions like CREATE INDEX CONCURRENTLY. Are
you sure that's working right now?
+ /*
+ * Get the command name for use in status display (it also becomes the
+ * default completion tag, down inside PortalRun). Set ps_status and
+ * do any special start-of-SQL-command processing needed by the
+ * destination.
+ */
+ commandTag = CreateCommandTag(parsetree);
+ set_ps_display(commandTag, false);
+ BeginCommand(commandTag, DestNone);
indentation.
+ /* Set up a snapshot if parse analysis/planning will need one. */
+ if (analyze_requires_snapshot(parsetree))
+ {
+ PushActiveSnapshot(GetTransactionSnapshot());
+ snapshot_set = true;
+ }
+
+ /*
+ * OK to analyze, rewrite, and plan this query.
+ *
+ * As with parsing, we need to make sure this data outlives the
+ * transaction, because of the possibility that the statement might
+ * perform internal transaction control.
+ */
+ oldcontext = MemoryContextSwitchTo(parsecontext);
+ querytree_list = pg_analyze_and_rewrite(parsetree, sql, NULL, 0);
+ plantree_list = pg_plan_queries(querytree_list, 0, NULL);
+
+ /* Done with the snapshot used for parsing/planning */
+ if (snapshot_set)
+ PopActiveSnapshot();
+ /* If we got a cancel signal in analysis or planning, quit */
+ CHECK_FOR_INTERRUPTS();
+
+ /*
+ * Execute the query using the unnamed portal.
+ */
+ portal = CreatePortal("", true, true);
+ /* Don't display the portal in pg_cursors */
+ portal->visible = false;
indentation.
+ PortalDefineQuery(portal, NULL, sql, commandTag, plantree_list, NULL);
+ PortalStart(portal, NULL, 0, InvalidSnapshot);
+ PortalSetResultFormat(portal, 1, &format); /* binary format */
+
+ /*
+ * Tuples returned by any command other than the last are simply
+ * discarded; but those returned by the last (or only) command are
+ * redirected to the shared memory queue we're using for communication
+ * with the launching backend. If the launching backend is gone or has
+ * detached us, these messages will just get dropped on the floor.
+ */
+ --commands_remaining;
+ if (commands_remaining > 0)
+ receiver = CreateDestReceiver(DestNone);
+ else
+ {
+ receiver = CreateDestReceiver(DestRemote);
+ SetRemoteDestReceiverParams(receiver, portal);
+ }
+
+ /*
+ * Only once the portal and destreceiver have been established can
+ * we return to the transaction context. All that stuff needs to
+ * survive an internal commit inside PortalRun!
+ */
+ MemoryContextSwitchTo(oldcontext);
+
+ /* Here's where we actually execute the command. */
+ (void) PortalRun(portal, FETCH_ALL, isTopLevel, receiver, receiver,
+ completionTag);
+
+ /* Clean up the receiver. */
+ (*receiver->rDestroy) (receiver);
+
+ /*
+ * Send a CommandComplete message even if we suppressed the query
+ * results. The user backend will report these in the absence of
+ * any true query results.
+ */
+ EndCommand(completionTag, DestRemote);
Hm. This is a fair amount of code copied from postgres.c.
I think this is interesting work, but I doubt it's ready yet. I need to
read the preceding patches, to really understand where breakage lies
hidden.
Greetings,
Andres Freund
--
Andres Freund http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Training & Services
--
Sent via pgsql-hackers mailing list (pgsql-***@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers