pq as a "jobs queue" - items can not be read concurrently when using transactional get #33

Closed
elawdio opened this issue Mar 31, 2019 · 19 comments

elawdio commented Mar 31, 2019

The use case: multiple threads each pull an item from the queue and handle it concurrently. In addition, if the handling logic fails for any reason, the item should be returned to the queue and become available on the next iteration (by opening a new transaction for each pull).

I realized that my jobs are running synchronously: all threads are blocked while one thread works on an item, and they are released only after it finishes handling the item and releases the transaction.

I believe this happens because there's an "UPDATE" SQL statement in the "_pull_item" method in "pq/__init__.py", which probably locks the table until the transaction is released.

Any ideas on how to make the instances run concurrently?

Code example:

import time
from datetime import datetime
from threading import Thread, get_ident
from typing import Callable
from pq import PQ, Queue
from psycopg2 import connect


def _create_queue_instance(queue_name: str):
    conn = connect('<CONN_STRING>')
    pq = PQ(conn)
    queue = pq[queue_name]
    return queue


def _job(queue: Queue, handler: Callable):
    while True:
        try:
            with queue:
                item = queue.get()
                if not item:
                    continue

                print(f'[{datetime.utcnow()}] {get_ident()} (thread) | {queue.name} (queue) | Starting to handle an item')
                handler(item.data)
                print(f'[{datetime.utcnow()}] {get_ident()} (thread) | {queue.name} (queue) | Done')
        except Exception:  # the with-block has rolled back, so the item returns to the queue
            print(f'An exception was thrown while handling an item for queue: {queue.name}')


def _simple_handler(item):
    print(item)
    print('sleeping for 10 seconds')
    time.sleep(10)


def main():
    # Inserting some items
    q = _create_queue_instance('test_queue')
    for i in range(5):
        q.put(f'item-{i}')

    # Starting the job instances
    threads = []
    for _ in range(5):
        q = _create_queue_instance('test_queue')
        t = Thread(target=_job,
                   args=(q, _simple_handler),
                   daemon=True)
        t.start()
        threads.append(t)

    # Blocking the app from ending.
    for t in threads:
        t.join()


if __name__ == '__main__':
    main()

The output I get:

[2019-03-31 12:45:26.840991] 123145582637056 (thread) | test_queue (queue) | Starting to handle an item
item-4
sleeping for 10 seconds
[2019-03-31 12:45:36.845939] 123145582637056 (thread) | test_queue (queue) | Done
[2019-03-31 12:45:36.854240] 123145582637056 (thread) | test_queue (queue) | Starting to handle an item
item-3
sleeping for 10 seconds
[2019-03-31 12:45:46.858404] 123145582637056 (thread) | test_queue (queue) | Done
[2019-03-31 12:45:46.866138] 123145582637056 (thread) | test_queue (queue) | Starting to handle an item
item-2
sleeping for 10 seconds
[2019-03-31 12:45:56.871011] 123145582637056 (thread) | test_queue (queue) | Done
[2019-03-31 12:45:56.878916] 123145582637056 (thread) | test_queue (queue) | Starting to handle an item
item-1
sleeping for 10 seconds
[2019-03-31 12:46:06.882754] 123145582637056 (thread) | test_queue (queue) | Done
[2019-03-31 12:46:06.891333] 123145582637056 (thread) | test_queue (queue) | Starting to handle an item
item-0
sleeping for 10 seconds
[2019-03-31 12:46:16.893778] 123145582637056 (thread) | test_queue (queue) | Done

stas commented Mar 31, 2019

This looks more like a question. @elawdio, please check the test case for an example of how to handle jobs in parallel:
https://github.com/malthe/pq/blob/master/pq/tests.py#L326

Also consider using a connection pooler if you're running jobs in parallel (examples also available in the tests).

stas added the question label Mar 31, 2019

elawdio commented Apr 1, 2019

Thanks for the reply :)
I went through the testcase that you mentioned, but I don't see how it relates to my use case.
The test case iterates through the queue under a single transaction, so if there's a failure, it will affect all of the items that were pulled during the transaction.
In my use case, I want to open a separate transaction for each item so in case of a failure, only that item will be returned to the queue.

Regarding the connection pooler, it will not make a difference, as I establish a fresh connection to Postgres for each thread (see the usage of '_create_queue_instance' in my code example).

Am I still missing something?

malthe commented Apr 1, 2019

There is no table lock. In fact, the pq library uses advisory locks. In theory, one transaction finds the next queue item and holds an advisory lock on just that item until the transaction ends (see the sketch below). But last time I checked, the library has some rough edges on transaction management:

  • Documentation could be better
  • There are some warts in the API
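
For illustration, the advisory-lock pattern described above looks roughly like this in SQL. This is a sketch of the general technique, not pq's actual query; the table and column names here are assumptions:

-- Claim the next item whose advisory lock is free. The
-- transaction-scoped lock is released automatically when the
-- transaction commits or rolls back, and items locked by other
-- workers are simply filtered out.
SELECT id, data
FROM queue
WHERE pg_try_advisory_xact_lock(id)
ORDER BY id
LIMIT 1;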

elawdio commented Apr 1, 2019

Exactly. Before I started using this library, I was glad to see that it uses advisory locks, and that is why I was surprised by this behavior.

Do you have an idea of what can be done in my implementation or in pq itself to make it work?
If it has something to do with pq, I'll be glad to contribute a new version based on your guidance.

malthe commented Apr 1, 2019

Back when #19 was reported, I had a new look at how the whole thing is set up and tried to implement changes that would better support the presumably common scenario where you pull out an item from the queue, make other database queries in the same transaction and either commit or abort the transaction.

But I struggled to get the test cases to work out and ran out of time.
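
For reference, that scenario with the queue used as a transactional context manager looks roughly like this (a sketch; do_related_queries is a hypothetical placeholder):

with queue:
    item = queue.get()
    if item is not None:
        # Other queries here run in the same transaction; an
        # exception rolls everything back and the item stays queued.
        do_related_queries(item.data)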

elawdio commented Apr 1, 2019

To be honest, I didn't completely understand from your last comment whether I can solve my problem with the current version of pq.
It seems like it cannot be done; am I right?

I understand why #19 can be complex, but do you think my specific problem can be solved easily? If so, as I mentioned before, I'll be glad to help.

malthe commented Apr 1, 2019

@stas, why did you add LIMIT 2 here: 362681e – ?

It looks like that would limit the possible concurrency to 2.

elawdio commented Apr 1, 2019

Thanks for helping me investigate this!

Why would that limit the concurrency to 2? Correct me if I'm wrong, but the change to LIMIT 2 was made in the 'put' method, not in 'get'.

The problem I'm raising is about executing 'get' multiple times concurrently (in multiple threads), where each 'get' runs inside a separate transaction (with queue: ... handle_item_logic(queue.get()) ...), without the calls blocking each other.

malthe added a commit that referenced this issue Apr 1, 2019

malthe commented Apr 1, 2019

@elawdio – using SKIP LOCKED (available since PostgreSQL 9.5), this seems to work correctly (see https://blog.2ndquadrant.com/what-is-select-skip-locked-for-in-postgresql-9-5/).

Your test script now runs as expected. Can you confirm?
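
For anyone following along, the general shape of the SKIP LOCKED technique (a sketch, not pq's exact query; the table and column names are assumptions):

-- Rows locked by another transaction are skipped rather than
-- waited on, so each worker claims a different item and
-- concurrent gets no longer block each other.
SELECT id, data
FROM queue
ORDER BY id
LIMIT 1
FOR UPDATE SKIP LOCKED;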

elawdio commented Apr 1, 2019

It works amazingly! Thank you very much!
When do you think it can be merged so I can use it as a package?

stas commented Apr 1, 2019

Hmm, @elawdio, I'm still not clear on what you're trying to achieve here. Please correct me if I'm missing something below...

First of all, I don't know why you'd want to place a blocking queue into a thread with a DB connection that is shared with a bunch of other threads. This is clearly not safe, and it seems to be the reason why your first thread is blocking the other threads (based on your logs, the get_ident() is the same).

Another question I have is how and why you're fetching items from the queue. The Queue.get() method blocks by default (and yes, your DB connection will be in use; this is how LISTEN works, and it doesn't seem to have anything to do with transactions). If you want to reuse your connection, consider Queue.get(block=False); you're in an infinite loop anyway.

One more thing: when you do with queue:, you're starting a transaction as well. Just removing that will give different results.


A bit of context: we've been running pq in production, scaling over multiple processes with dozens of threads, and had no issues with transactions being blocked. My advice would be to strictly follow the best practices around parallelism when designing your queue (as in, use a connection pool, use a separate connection pool inside the job handlers, and so on). What worked for us is to wrap every queue into a separate process and let it manage a set of threads; see the sketch below. Obviously you can use processes all over, but that will cost you an expensive connection for every process (and more resources and stress on your DB).
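
A rough illustration of that layout (a sketch under assumptions: the DSN is a placeholder as in the original example, and handing PQ a psycopg2 connection pool follows the examples in the pq tests):

import time
import threading
from multiprocessing import Process

from psycopg2.pool import ThreadedConnectionPool
from pq import PQ

DSN = '<CONN_STRING>'  # placeholder, as in the original example


def handle(data):
    print(data)  # stand-in for the real job handler


def run_queue(queue_name, num_threads=4):
    # One process per queue; the process owns a small connection
    # pool that its worker threads share.
    pool = ThreadedConnectionPool(1, num_threads, DSN)
    pq = PQ(pool=pool)
    queue = pq[queue_name]

    def work():
        while True:
            with queue:  # one transaction per item
                item = queue.get(block=False)
                if item is not None:
                    handle(item.data)
            if item is None:
                time.sleep(0.5)  # avoid a tight polling loop

    threads = [threading.Thread(target=work, daemon=True)
               for _ in range(num_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()


if __name__ == '__main__':
    Process(target=run_queue, args=('test_queue',)).start()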

stas commented Apr 1, 2019

@stas, why did you add LIMIT 2 here: 362681e – ?

The two items are used to calculate the next_timeout; this way we can look ahead and say: don't bother running another query for this amount of time, since you won't get anything.

malthe commented Apr 2, 2019

@stas – he's not sharing the connection, each thread opens its own connection.

Which means that there's something incorrect about our tests because they didn't pick this up.

elawdio commented Apr 2, 2019

Hi @stas, as @malthe just mentioned,

  1. I am not sharing one DB connection among all threads; I open a new connection to the DB for each thread (please refer to the '_create_queue_instance' function and its usage in my code example).
    So why should the first thread block the others?

  2. Although my code example doesn't use queue.get(block=False), it shouldn't block, because LISTEN is not enabled in my dev environment. And anyway, as mentioned above, I use a different connection for each thread.

  3. I need the with queue: so that in case of a failure (of any kind) the item is returned to the queue.
    E.g., if the process is shut down while handling an item, using with queue: means the item will not be lost.

malthe commented Apr 2, 2019

@stas – re LIMIT 2 – gotcha. That's also supported by the code in this PR.

stas commented Apr 2, 2019

Apologies; indeed, I missed the part where the connection is initiated.

I started looking at the locks in the running database, and it looks like there's a race condition or something: there are 28 locks with the query RELEASE SAVEPOINT pq. I still believe that with queue: is the issue here.

Take a look at this query while running your example:

SELECT 
  datname,
  relation,
  mode,
  granted,
  query,
  state,
  pg_locks.pid
FROM pg_stat_activity
JOIN pg_locks on pg_locks.pid = pg_stat_activity.pid
WHERE state != 'idle'
AND query NOT LIKE '% FROM pg_stat_activity %' 
;

It would be great if we could find a technical explanation for the current use case. Though again, I doubt running pq in such a setup (one connection per queue in a thread) makes much sense; threads are just a pain to manage.

malthe commented Apr 2, 2019

@stas, well, truth be told, that's the whole point of pq – to be used in a concurrent scenario.

elawdio commented Apr 7, 2019

May I kindly ask when you are going to release a new version of pq that includes the SKIP LOCKED change?

malthe commented Apr 7, 2019

This has now been fixed and is available in release 1.7.0.

malthe closed this as completed Apr 7, 2019