Toy task system in PostgreSQL

I’ve been reading the SQL Performance Explained book recently and got curious how performant PostgreSQL is with properly configured partial indices. So in this blog post I’ll construct a toy task-management-system example which supports inter-task dependencies.

To get started I’m using postgres 13:

CREATE TABLE tasks (
    id uuid PRIMARY KEY DEFAULT gen_random_uuid(),
    data JSONB NOT NULL,
    deps uuid[],
    finished bool NOT NULL default false
);

The database schema is quite simple. Let’s see the producer code:

// producer generates task trees until the context is cancelled or an
// insert fails. Each send on rateLimit blocks until the worker consumes
// a matching token, keeping the producer from running too far ahead of
// the consumer.
func producer(ctx context.Context) error {
	for {
		// Respect cancellation instead of blocking forever on the
		// rate-limit channel.
		select {
		case <-ctx.Done():
			return ctx.Err()
		case rateLimit <- struct{}{}:
		}
		// Start at depth 0 so genFn can actually build a small tree;
		// with the original genFn(ctx, 3) the depth cap (depth >= 3)
		// forced numDep to 0 and no dependencies were ever created.
		if _, err := genFn(ctx, 0); err != nil {
			return err
		}
	}
}

// genFn inserts one task with a random set of recursively generated
// dependency tasks and returns the new task's id. depth bounds the
// recursion: at depth >= 3 no further dependencies are created, which
// caps the height of the generated tree.
func genFn(ctx context.Context, depth int) (*uuid.UUID, error) {
	t := time.Now()
	id := uuid.New()

	// 0-3 dependencies, unless we hit the depth cap.
	numDep := 0
	if depth < 3 {
		numDep = int(rand.Int31n(4))
	}

	// Children are generated (and their connections released) before we
	// acquire our own connection below, so recursion cannot exhaust the
	// pool by holding nested connections.
	deps := make([]uuid.UUID, 0, numDep)
	for i := 0; i < numDep; i++ {
		u, err := genFn(ctx, depth+1)
		if err != nil {
			return nil, err
		}
		deps = append(deps, *u)
	}

	conn, err := pool.Acquire(ctx)
	if err != nil {
		return nil, err
	}
	defer conn.Release()

	_, err = conn.Exec(ctx, `
		INSERT INTO tasks(id, data, deps) 
		VALUES ($1, '{}', $2)
	`, id, deps)
	if err != nil {
		return nil, err
	}
	addedProcessed.Inc()
	addedDuration.Observe(float64(time.Since(t)) / float64(time.Second))
	return &id, nil
}

It’s quite straightforward how it builds a random small tree of tasks to be executed. There’s a rate-limiting component so that the producer doesn’t get too far ahead of the worker, since I don’t wish to fill my database with unprocessed entries too quickly.

Anyhow this is the code for processing a single entry:

// processEntry marks the task as finished and removes its id from every
// other task's dependency array, atomically within one transaction.
func processEntry(ctx context.Context, id uuid.UUID) error {
	t := time.Now()
	conn, err := pool.Acquire(ctx)
	if err != nil {
		return err
	}
	defer conn.Release()

	tx, err := conn.Begin(ctx)
	if err != nil {
		return err
	}
	// Ensure the transaction is rolled back on every error path (the
	// original leaked an open tx on the pooled connection). This is a
	// no-op after a successful Commit.
	defer tx.Rollback(ctx)

	if _, err := tx.Exec(ctx, `
UPDATE tasks
SET finished = TRUE
WHERE id = $1`, id); err != nil {
		return err
	}
	if _, err := tx.Exec(ctx, `
UPDATE tasks
SET deps = array_remove(deps, $1)
WHERE $1 = ANY (deps)
				`, id); err != nil {
		return err
	}
	if err := tx.Commit(ctx); err != nil {
		return err
	}
	opsProcessed.Inc()
	atomic.AddInt64(&totalProcessed, 1)
	opsLatency.Observe(float64(time.Since(t)) / float64(time.Second))
	return nil
}

And a small runner for a worker:

// worker repeatedly polls for ready tasks (finished = FALSE with no
// remaining dependencies) and processes each one.
func worker(ctx context.Context) error {
	for {
		if err := workerPass(ctx); err != nil {
			return err
		}
	}
}

// workerPass runs a single polling pass. Extracting it lets the deferred
// Release fire after every pass: the original only released the pooled
// connection on error paths and leaked one per successful iteration.
func workerPass(ctx context.Context) error {
	conn, err := pool.Acquire(ctx)
	if err != nil {
		return err
	}
	defer conn.Release()

	rows, err := conn.Query(ctx, `
SELECT id, data
FROM tasks
WHERE finished = FALSE AND deps = '{}'
`)
	if err != nil {
		return err
	}
	defer rows.Close()

	for rows.Next() {
		var taskId uuid.UUID
		var data string
		if err := rows.Scan(&taskId, &data); err != nil {
			return err
		}
		// Take a rate-limit token before doing the work; respect
		// cancellation while blocked.
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-rateLimit:
		}
		if err := processEntry(ctx, taskId); err != nil {
			return err
		}
	}
	// rows.Next returns false on both exhaustion and failure; the
	// original never distinguished the two.
	return rows.Err()
}

For now I’ll just leave a single producer & a single worker. Since there are no indices (except the PK one) this is, well, just not ideal. On my home PC I get a throughput of 150-200 tasks processed per second with the generator running way ahead of the consumer. Though after reaching around 500 000 tasks in the system this crawls to a halt, at about 25 items per second.

Let’s inspect the table after running this a while:

(or you know run:

INSERT INTO tasks(data, deps, finished)
SELECT '{}', '{}', TRUE
FROM generate_series(1, 500000)

)

SELECT COUNT(*)
FROM tasks
500000

So we have some task entries in here. Not a lot, but some. Now let’s inspect the tasks ready to be executed. First, the query to find tasks ready to be executed:

EXPLAIN ANALYSE SELECT id, data
FROM tasks
WHERE finished = FALSE AND deps = '{}';
QUERY PLAN
Seq Scan on tasks (cost=0.00..20810.55 rows=205750 width=21) (actual time=31.042..31.042 rows=0 loops=1)
Filter: ((NOT finished) AND (deps = ‘{}'::uuid[]))
Rows Removed by Filter: 500000
Planning Time: 0.032 ms
Execution Time: 31.052 ms

hmmm that’s 31 ms, which is quite a lot for not a single entry being selected. Anyway, let’s try adding a partial index on this table:

CREATE INDEX tasks_ready ON tasks (id)
    INCLUDE (data)
    WHERE finished = FALSE AND deps = '{}';

This index includes the non-key column data in its leaf nodes, thus all necessary data for this query is stored within the index and the database won’t even have to access the table heap to satisfy this query. Due to its partial nature only ready tasks are in it; logically this improves the performance significantly:

QUERY PLAN
Index Only Scan using tasks_ready on tasks (cost=0.12..4.14 rows=1 width=21) (actual time=0.003..0.003 rows=0 loops=1)
Heap Fetches: 0
Planning Time: 0.045 ms
Execution Time: 0.009 ms

Did this help? Yes, but the bottleneck now is the update statement. Let’s inspect it in more details:

EXPLAIN ANALYSE UPDATE tasks
SET finished = TRUE
WHERE id = '004a10f5-ac3f-44f0-a199-d469fad0a913'
QUERY PLAN
Update on tasks (cost=0.42..8.44 rows=1 width=41) (actual time=0.023..0.023 rows=0 loops=1)
-> Index Scan using tasks_pkey on tasks (cost=0.42..8.44 rows=1 width=41) (actual time=0.013..0.014 rows=1 loops=1)
Index Cond: (id = ‘004a10f5-ac3f-44f0-a199-d469fad0a913’::uuid)
Planning Time: 0.038 ms
Execution Time: 0.036 ms

hmm…this is slower than I anticipated; it’s doing an index scan over the PK index and updating the table entry. With a prepared statement we can get more performance out of it:

PREPARE  mark_tasks(uuid) AS
    UPDATE tasks
    SET finished = TRUE
WHERE id = $1

EXPLAIN ANALYSE EXECUTE mark_tasks('004a10f5-ac3f-44f0-a199-d469fad0a913')
QUERY PLAN
Update on tasks (cost=0.42..8.44 rows=1 width=41) (actual time=0.024..0.024 rows=0 loops=1)
-> Index Scan using tasks_pkey on tasks (cost=0.42..8.44 rows=1 width=41) (actual time=0.014..0.014 rows=1 loops=1)
Index Cond: (id = $1)
Planning Time: 0.008 ms
Execution Time: 0.043 ms

by optimizing the planning time to a minimum.

Though the bigger bottleneck is, well…the other update statement:

PREPARE upd(uuid) AS UPDATE tasks
SET deps = array_remove(deps, $1)
WHERE $1 = ANY (deps);

EXPLAIN ANALYSE EXECUTE upd('004a10f5-ac3f-44f0-a199-d469fad0a913')
QUERY PLAN
Update on tasks (cost=0.00..19626.25 rows=2500 width=60) (actual time=42.364..42.365 rows=0 loops=1)
-> Seq Scan on tasks (cost=0.00..19626.25 rows=2500 width=60) (actual time=42.363..42.364 rows=0 loops=1)
Filter: (‘004a10f5-ac3f-44f0-a199-d469fad0a913’::uuid = ANY (deps))
Rows Removed by Filter: 529579
Planning Time: 0.059 ms
Execution Time: 42.382 ms

because this is doing a sequential scan over the whole tasks table…which isn’t nice when it has 500 000 entries. Let’s add a GIN index on the array column and change the query a bit:

CREATE INDEX task_deps ON tasks USING GIN (deps);

PREPARE upd(uuid) AS UPDATE tasks
SET deps = array_remove(deps, $1)
WHERE ARRAY[$1] <@ deps;

EXPLAIN ANALYSE EXECUTE upd('004a10f5-ac3f-44f0-a199-d469fad0a913')
QUERY PLAN
Update on tasks (cost=308.52..5909.85 rows=2648 width=60) (actual time=0.004..0.005 rows=0 loops=1)
-> Bitmap Heap Scan on tasks (cost=308.52..5909.85 rows=2648 width=60) (actual time=0.004..0.004 rows=0 loops=1)
Recheck Cond: ('{004a10f5-ac3f-44f0-a199-d469fad0a913}'::uuid[] <@ deps)
-> Bitmap Index Scan on task_deps (cost=0.00..307.86 rows=2648 width=0) (actual time=0.003..0.003 rows=0 loops=1)
Index Cond: (deps @> ‘{004a10f5-ac3f-44f0-a199-d469fad0a913}'::uuid[])
Planning Time: 0.100 ms
Execution Time: 0.020 ms

and we get the speedup we want. With those improvements we're back at 200 items/second despite our table being 500k entries large. This is with a single consumer
and single producer. Can we speed this up even more? Probably yes.

First let's get notified when a task is ready:

```sql
-- Trigger function: notify listeners on the 'ready_task' channel whenever
-- a row ends up with an empty dependency array, i.e. the task is ready to
-- be executed. The notification payload is the task id as text.
create or replace function tg_notify_ready_task ()
    returns trigger
    language plpgsql
as $$
begin
    -- Fires both for fresh dependency-free inserts and for updates that
    -- removed the last remaining dependency.
    if NEW.deps = '{}' then
        PERFORM pg_notify('ready_task', NEW.id::text);
    end if;
    RETURN NEW;
end;
$$;

CREATE TRIGGER notify_counters
    AFTER INSERT OR UPDATE
    ON tasks
    FOR EACH ROW
EXECUTE PROCEDURE tg_notify_ready_task();
```

and on the Go side of things:

// watchReadyTasks subscribes to the 'ready_task' notification channel and
// forwards each payload, parsed as a task id, onto the readyTasks channel.
// It returns only on error or context cancellation.
func watchReadyTasks(ctx context.Context) error {
	pooled, err := pool.Acquire(ctx)
	if err != nil {
		return err
	}
	defer pooled.Release()

	// LISTEN is connection-scoped, so keep working on this one conn.
	conn := pooled.Conn()
	if _, err := conn.Exec(ctx, `LISTEN ready_task`); err != nil {
		return err
	}

	for {
		n, err := conn.WaitForNotification(ctx)
		if err != nil {
			return err
		}
		notifCount.Inc()
		// NOTE(review): MustParse panics on a malformed payload; fine here
		// since the trigger always sends a uuid, but worth confirming.
		readyTasks <- uuid.MustParse(n.Payload)
	}
}

This alone doesn’t seem to do much though. When we increase the workers from 1 to 3, it increases the total throughput to 300 tasks/second.

This is the limit I’ve been able to optimize this in one afternoon. The original pg database hasn’t been specifically tuned. All code is available on github

I was kinda hoping I’d get 1000 tasks/s, though ACID has its costs, despite pretty efficient indices. And I’m sure a seasoned DBA is screaming internally at me for missing an obvious performance optimization…I’d appreciate any feedback I could get.

References