Flask by Example – Implementing a Redis Task Queue

This part of the tutorial details how to implement a Redis task queue to handle text processing.

Updates:

  • 02/12/2020: Upgraded to Python version 3.8.1 as well as the latest versions of Redis, Python Redis, and RQ. See below for details. Mentioned a bug in the latest RQ version and provided a solution. Fixed the http-before-https bug.
  • 03/22/2016: Upgraded to Python version 3.5.1 as well as the latest versions of Redis, Python Redis, and RQ. See below for details.
  • 02/22/2015: Added Python 3 support.

Remember, here’s what we’re building: a Flask app that calculates word-frequency pairs based on the text from a given URL.

  1. Part One: Set up a local development environment and then deploy both a staging and a production environment on Heroku.
  2. Part Two: Set up a PostgreSQL database along with SQLAlchemy and Alembic to handle migrations.
  3. Part Three: Add in the back-end logic to scrape and then process the word counts from a webpage using the requests, BeautifulSoup, and Natural Language Toolkit (NLTK) libraries.
  4. Part Four: Implement a Redis task queue to handle the text processing. (current)
  5. Part Five: Set up Angular on the front-end to continuously poll the back-end to see if the request is done processing.
  6. Part Six: Push to the staging server on Heroku - setting up Redis and detailing how to run two processes (web and worker) on a single Dyno.
  7. Part Seven: Update the front-end to make it more user-friendly.
  8. Part Eight: Create a custom Angular Directive to display a frequency distribution chart using JavaScript and D3.

Need the code? Grab it from the repo.

Install Requirements

Tools used:

  • Redis (5.0.7)
  • Python Redis (3.4.1)
  • RQ (1.2.2) - a simple library for creating a task queue

Start by downloading and installing Redis from either the official site or via Homebrew (brew install redis). Once installed, start the Redis server:

Shell
$ redis-server

Next install Python Redis and RQ in a new terminal window:

Shell
$ cd flask-by-example
$ python -m pip install redis==3.4.1 rq==1.2.2
$ python -m pip freeze > requirements.txt

Set up the Worker

Let’s start by creating a worker process to listen for queued tasks. Create a new file worker.py, and add this code:

Python
import os

import redis
from rq import Worker, Queue, Connection

listen = ['default']

redis_url = os.getenv('REDISTOGO_URL', 'redis://localhost:6379')

conn = redis.from_url(redis_url)

if __name__ == '__main__':
    with Connection(conn):
        worker = Worker(list(map(Queue, listen)))
        worker.work()

Here, we listened for a queue called default and established a connection to the Redis server on localhost:6379.
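
To see which server the worker would connect to, the URL lookup can be checked on its own. What follows is a minimal sketch using only the standard library. The helper name resolve_redis_url and the DEFAULT_REDIS_URL constant are illustrative assumptions and are not part of worker.py.

```python
import os
from urllib.parse import urlparse

# Same fallback value that worker.py passes to os.getenv().
DEFAULT_REDIS_URL = 'redis://localhost:6379'


def resolve_redis_url(environ=os.environ):
    """Return (url, host, port), mirroring the REDISTOGO_URL fallback.

    `environ` is any mapping of environment variables; it defaults to the
    real environment and can be replaced with a plain dict when experimenting.
    """
    url = environ.get('REDISTOGO_URL', DEFAULT_REDIS_URL)
    parsed = urlparse(url)
    return url, parsed.hostname, parsed.port
```

Calling resolve_redis_url() with no REDISTOGO_URL set falls back to localhost on port 6379, which is the local server started earlier with redis-server.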

Fire this up in another terminal window:

Shell
$ cd flask-by-example
$ python worker.py
17:01:29 RQ worker started, version 0.5.6
17:01:29
17:01:29 *** Listening on default...

Now we need to update our app.py to send jobs to the queue…

Update app.py

Add the following imports to app.py:

Python
from rq import Queue
from rq.job import Job
from worker import conn

Then update the configuration section:

Python
app = Flask(__name__)
app.config.from_object(os.environ['APP_SETTINGS'])
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = True
db = SQLAlchemy(app)

q = Queue(connection=conn)

from models import *

The line q = Queue(connection=conn) initializes a queue that uses the Redis connection we created in worker.py.

Move the text processing functionality out of our index route and into a new function called count_and_save_words(). This function accepts one argument, a URL, which we will pass to it when we call it from our index route.

Python
def count_and_save_words(url):

    errors = []

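    try:
        r = requests.get(url)
    except requests.exceptions.RequestException:
        # covers invalid URLs, connection failures, and timeouts
        errors.append(
            "Unable to get URL. Please make sure it's valid and try again."
        )
        return {"error": errors}

    # text processing
    # name the parser explicitly so BeautifulSoup does not have to guess
    raw = BeautifulSoup(r.text, 'html.parser').get_text()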
    nltk.data.path.append('./nltk_data/')  # set the path
    tokens = nltk.word_tokenize(raw)
    text = nltk.Text(tokens)

    # remove punctuation, count raw words
    nonPunct = re.compile('.*[A-Za-z].*')
    raw_words = [w for w in text if nonPunct.match(w)]
    raw_word_count = Counter(raw_words)

    # stop words
    no_stop_words = [w for w in raw_words if w.lower() not in stops]
    no_stop_words_count = Counter(no_stop_words)

    # save the results
    try:
        result = Result(
            url=url,
            result_all=raw_word_count,
            result_no_stop_words=no_stop_words_count
        )
        db.session.add(result)
        db.session.commit()
        return result.id
    except Exception:
        errors.append("Unable to add item to database.")
        return {"error": errors}


@app.route('/', methods=['GET', 'POST'])
def index():
    results = {}
    if request.method == "POST":
        # this import solves a rq bug which currently exists
        from app import count_and_save_words

        # get url that the person has entered
        url = request.form['url']
        if not url.startswith(('https://', 'http://')):
            url = 'http://' + url
        job = q.enqueue_call(
            func=count_and_save_words, args=(url,), result_ttl=5000
        )
        print(job.get_id())

    return render_template('index.html', results=results)
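
The scheme check in the index route can also be tried on its own. The following is a small sketch of that one step as a standalone function. The name ensure_scheme and its default_scheme parameter are hypothetical and do not appear in app.py.

```python
def ensure_scheme(url, default_scheme='http://'):
    """Prefix `url` with `default_scheme` unless it already has http(s)."""
    # str.startswith() accepts a tuple and returns True if any prefix matches.
    if not url.startswith(('https://', 'http://')):
        url = default_scheme + url
    return url


print(ensure_scheme('realpython.com'))          # http://realpython.com
print(ensure_scheme('https://realpython.com'))  # https://realpython.com
```

A URL that already carries either scheme passes through unchanged, so an https:// address is never turned into http://https://.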

Take note of the following code:

Python
job = q.enqueue_call(
    func=count_and_save_words, args=(url,), result_ttl=5000
)
print(job.get_id())

Note: We need to import the count_and_save_words function inside our index function because the RQ package currently has a bug where it won’t find functions in the same module.

Here we used the queue that we initialized earlier and called the enqueue_call() function. This added a new job to the queue, and that job ran the count_and_save_words() function with the URL as the argument. The result_ttl=5000 argument tells RQ how long to hold on to the result of the job: 5,000 seconds, in this case. Then we printed the job id to the terminal. This id is needed to see if the job is done processing.
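
To make the idea of a result time-to-live concrete, here is a toy, in-memory sketch. It is an analogy only: RQ keeps job results in Redis, and the ToyResultStore class, its methods, and the injectable clock are all assumptions invented for this illustration.

```python
import time
import uuid


class ToyResultStore:
    """In-memory stand-in showing how a result expires after a TTL.

    This is NOT how RQ stores results; it only illustrates the expiry idea.
    """

    def __init__(self, clock=time.monotonic):
        self._clock = clock   # callable returning the current time in seconds
        self._results = {}    # job_id -> (result, expires_at)

    def save(self, result, result_ttl):
        """Store a result for `result_ttl` seconds and return a new job id."""
        job_id = str(uuid.uuid4())
        self._results[job_id] = (result, self._clock() + result_ttl)
        return job_id

    def fetch(self, job_id):
        """Return the stored result, or None if it is unknown or expired."""
        entry = self._results.get(job_id)
        if entry is None:
            return None
        result, expires_at = entry
        if self._clock() >= expires_at:
            del self._results[job_id]
            return None
        return result
```

With result_ttl=5000, a result saved at time 0 is still returned at 4,999 seconds and is gone at 5,000 seconds, which is why the job id has to be used before the TTL runs out.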

Let’s set up a new route for that…

Get Results

Python
@app.route("/results/<job_key>", methods=['GET'])
def get_results(job_key):

    job = Job.fetch(job_key, connection=conn)

    if job.is_finished:
        return str(job.result), 200
    else:
        return "Nay!", 202

Let’s test this out.

Fire up the server, navigate to http://localhost:5000/, use the URL https://realpython.com, and grab the job id from the terminal. Then use that id in the ‘/results/’ endpoint - i.e., http://localhost:5000/results/ef600206-3503-4b87-a436-ddd9438f2197.
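
The same check can be scripted instead of done in the browser. Below is a rough sketch that polls the endpoint with the standard library until it answers with a 200 status. The function names fetch_status and wait_for_result, the attempt count, and the delay are assumptions made for this example; the base URL is the local development address used above.

```python
import time
import urllib.request


def fetch_status(job_id, base_url='http://localhost:5000'):
    """Return (status_code, body) from the /results/<job_key> endpoint."""
    with urllib.request.urlopen(f'{base_url}/results/{job_id}') as response:
        return response.status, response.read().decode('utf-8')


def wait_for_result(job_id, fetch=fetch_status, attempts=10, delay=1.0,
                    sleep=time.sleep):
    """Poll until the endpoint returns 200, or give up after `attempts` tries.

    Returns the response body on success and None if the job never finished.
    """
    for attempt in range(attempts):
        status, body = fetch(job_id)
        if status == 200:
            return body
        # A 202 means the job is still running; pause before the next try.
        if attempt < attempts - 1:
            sleep(delay)
    return None
```

With the Flask server and the worker both running, wait_for_result('<job id from the terminal>') would return the response body once the job is finished. The fetch and sleep parameters exist only so the loop can be exercised without a live server.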

As long as you check the status within 5,000 seconds of the job finishing, you should see an id number, which is generated when we add the results to the database:

Python
# save the results
try:
    from models import Result
    result = Result(
        url=url,
        result_all=raw_word_count,
        result_no_stop_words=no_stop_words_count
    )
    db.session.add(result)
    db.session.commit()
    return result.id
except Exception:
    errors.append("Unable to add item to database.")
    return {"error": errors}

Now, let’s refactor the route slightly to return the actual results from the database in JSON:

Python
@app.route("/results/<job_key>", methods=['GET'])
def get_results(job_key):

    job = Job.fetch(job_key, connection=conn)

    if job.is_finished:
        result = Result.query.filter_by(id=job.result).first()
        results = sorted(
            result.result_no_stop_words.items(),
            key=operator.itemgetter(1),
            reverse=True
        )[:10]
        return jsonify(results)
    else:
        return "Nay!", 202

Make sure to add the import:

Python
from flask import jsonify

Test this out again. If all went well, you should see something similar to this in your browser:

JSON
[
  [
    "Python", 
    315
  ], 
  [
    "intermediate", 
    167
  ], 
  [
    "python", 
    161
  ], 
  [
    "basics", 
    118
  ], 
  [
    "web-dev", 
    108
  ], 
  [
    "data-science", 
    51
  ], 
  [
    "best-practices", 
    49
  ], 
  [
    "advanced", 
    45
  ], 
  [
    "django", 
    43
  ], 
  [
    "flask", 
    41
  ]
]
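
The sorting step in the refactored route can be tried in isolation. This is a minimal sketch with made-up sample counts; the helper name top_words and the sample data are assumptions for illustration and are not part of app.py.

```python
import operator
from collections import Counter


def top_words(counts, limit=10):
    """Return the `limit` most frequent (word, count) pairs, highest first."""
    return sorted(
        counts.items(),
        key=operator.itemgetter(1),  # sort on the count, not the word
        reverse=True
    )[:limit]


# Hypothetical counts, standing in for result.result_no_stop_words.
sample = Counter({'python': 5, 'flask': 3, 'redis': 2, 'queue': 1})
print(top_words(sample, limit=2))  # [('python', 5), ('flask', 3)]
```

Each (word, count) tuple is serialized as a two-element JSON array, which is why the response above is a list of lists.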

What’s Next?

In Part 5 we’ll bring the client and server together by adding Angular into the mix to create a poller, which will send a request every five seconds to the /results/<job_key> endpoint asking for updates. Once the data is available, we’ll add it to the DOM.

Cheers!


This is a collaboration piece between Cam Linke, co-founder of Startup Edmonton, and the folks at Real Python.
