Flask


Introduction

A quick tour of the Python framework Flask. Most of this has been taken from https://blog.miguelgrinberg.com/post/the-flask-mega-tutorial-part-i-hello-world

When I started the project, flask was not found. I either started a new terminal to fix the problem or installed Flask with

python3 -m pip install flask --upgrade

Getting Started

from app import app

@app.route('/')
@app.route('/index')
def index():
    user = {'username': 'Miguel'}
    return '''
<html>
    <head>
        <title>Home Page - Microblog</title>
    </head>
    <body>
        <h1>Hello, ''' + user['username'] + '''!</h1>
    </body>
</html>'''

Routing

Templates

Like other template engines such as Pug or EJS, Flask has templates (it uses Jinja2), and they are very similar indeed. It supports inheritance, which is handy for navigation and footers

from flask import render_template
from app import app

@app.route('/')
@app.route('/index')
def index():
    user = {'username': 'Miguel'}
    posts = [
        {
            'author': {'username': 'John'},
            'body': 'Beautiful day in Portland!'
        },
        {
            'author': {'username': 'Susan'},
            'body': 'The Avengers movie was so cool!'
        }
    ]
    return render_template('index.html', title='Home', user=user, posts=posts)

And the template

<html>
    <head>
        {% if title %}
        <title>{{ title }} - Microblog</title>
        {% else %}
        <title>Welcome to Microblog</title>
        {% endif %}
    </head>
    <body>
        <h1>Hi, {{ user.username }}!</h1>
        {% for post in posts %}
        <div><p>{{ post.author.username }} says: <b>{{ post.body }}</b></p></div>
        {% endfor %}
    </body>
</html>

We can include subtemplates like this.

    {% for post in posts %}
        {% include '_post.html' %}
    {% endfor %}

Forms

Introduction

Flask uses Flask-WTF for forms, which is a wrapper for WTForms. To configure this we need a secret key, which is used to generate the token attached to each form to help prevent CSRF. This is stored in the root of the project, e.g. in config.py, and loaded by the application when it starts.

import os

class Config(object):
    SECRET_KEY = os.environ.get('SECRET_KEY') or 'you-will-never-guess'

And in __init__.py

from flask import Flask
from config import Config
app = Flask(__name__)

app.config.from_object(Config)
from app import routes
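
The "environment variable or default" pattern in the Config class is worth a second look, because or falls back when the variable is missing and also when it is set to an empty string. A minimal sketch using only the standard library; load_secret_key is a hypothetical helper for illustration and not part of Flask or the tutorial.

```python
import os


def load_secret_key(environ=os.environ, default='you-will-never-guess'):
    """Hypothetical helper mirroring the Config.SECRET_KEY expression."""
    # `or` uses the default when the variable is missing *or* empty
    return environ.get('SECRET_KEY') or default


print(load_secret_key({}))                         # variable not set
print(load_secret_key({'SECRET_KEY': ''}))         # set but empty
print(load_secret_key({'SECRET_KEY': 's3cret'}))   # set to a real value
```

The hard-coded default is only suitable for development; in production the variable should always be set.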

Form Example

I don't want to cut and paste too much from the example, but here are the basics for forms

  • Define the form in python
  • Define the template
  • Render the form

Python Code

from flask_wtf import FlaskForm
from wtforms import StringField, PasswordField, BooleanField, SubmitField
from wtforms.validators import DataRequired

class LoginForm(FlaskForm):
    username = StringField('Username', validators=[DataRequired()])
    password = PasswordField('Password', validators=[DataRequired()])
    remember_me = BooleanField('Remember Me')
    submit = SubmitField('Sign In')

Template

The novalidate attribute is used to tell the web browser to not apply validation to the fields in this form, which effectively leaves this task to the Flask application running in the server. Using novalidate is entirely optional.

The form.hidden_tag() template argument generates a hidden field that includes a token that is used to protect the form against CSRF attacks.

{% extends "base.html" %}

{% block content %}
    <h1>Sign In</h1>
    <form action="" method="post" novalidate>
        {{ form.hidden_tag() }}
        <p>
            {{ form.username.label }}<br>
            {{ form.username(size=32) }}<br>
            {% for error in form.username.errors %}
            <span style="color: red;">[{{ error }}]</span>
            {% endfor %}
        </p>
        <p>
            {{ form.password.label }}<br>
            {{ form.password(size=32) }}<br>
            {% for error in form.password.errors %}
            <span style="color: red;">[{{ error }}]</span>
            {% endfor %}
        </p>
        <p>{{ form.remember_me() }} {{ form.remember_me.label }}</p>
        <p>{{ form.submit() }}</p>
    </form>
{% endblock %}

Rendering

from flask import render_template, flash, redirect
from app import app
from app.forms import LoginForm

# ...

@app.route('/login', methods=['GET', 'POST'])
def login():
    form = LoginForm()
    if form.validate_on_submit():
        flash('Login requested for user {}, remember_me={}'.format(
            form.username.data, form.remember_me.data))
        return redirect('/index')
    return render_template('login.html', title='Sign In', form=form)

Databases

Tools of the Trade

For Flask we use Flask-SQLAlchemy, which is a wrapper for SQLAlchemy, which is in turn an ORM (Object Relational Mapper). Like Room, we can also use Flask-Migrate, which is a wrapper for Alembic, a database migration tool.

SQLite Config Example

First we add the connection string to the config

import os
basedir = os.path.abspath(os.path.dirname(__file__))

class Config(object):
    # ...
    SQLALCHEMY_DATABASE_URI = os.environ.get('DATABASE_URL') or \
        'sqlite:///' + os.path.join(basedir, 'app.db')
    SQLALCHEMY_TRACK_MODIFICATIONS = False

Then initialise in the __init__.py by adding the db and the migrate

from flask import Flask
from config import Config
from flask_sqlalchemy import SQLAlchemy
from flask_migrate import Migrate

app = Flask(__name__)
app.config.from_object(Config)
db = SQLAlchemy(app)
migrate = Migrate(app, db)

from app import routes, models

Table Definition Example

Nothing to see here except the __repr__, which is the Python equivalent of toString(). It really could use some decent formatting. Almost unreadable.

from app import db

class User(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(64), index=True, unique=True)
    email = db.Column(db.String(120), index=True, unique=True)
    password_hash = db.Column(db.String(128))

    def __repr__(self):
        return '<User {}>'.format(self.username)
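
To see what __repr__ buys you without a database, here is a minimal sketch with a plain class standing in for the SQLAlchemy model (the real User derives from db.Model; this stand-in is only an assumption for illustration). An f-string is arguably a little more readable than .format().

```python
class User:
    """Plain stand-in for the model, only to demonstrate __repr__."""

    def __init__(self, username):
        self.username = username

    def __repr__(self):
        # Used by repr(), the interactive shell, and when printing containers
        return f'<User {self.username}>'


u = User('susan')
print(repr(u))
print([u])
```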

Initialize

The flask command relies on FLASK_APP being set, so make sure this is set.

If flask is not found, look at the note at the top of this page. Make sure any migration comments are for your table and not users.
flask db init
flask db migrate
# flask db upgrade

Database Problems

It looks to me that Flask 2.0 does not have the concept of upgrade/downgrade in its current form, as it looks for an application factory to work with. I had to change the tutorial to use the following

microblog.py

The shell_context_processor sets up the flask shell.

from app import create_app, db
from app.models import User, Post

app = create_app()

@app.shell_context_processor
def make_shell_context():
    return {'db': db, 'User': User, 'Post': Post}

__init__.py

I had to change the __init__.py to the version below. This meant creating a blueprint and adding it to the app. The routes and forms were moved to app/main

from flask import Flask, current_app
from flask_sqlalchemy import SQLAlchemy
from flask_migrate import Migrate
from config import Config

db = SQLAlchemy()
migrate = Migrate()

def create_app(config_class=Config):
    app = Flask(__name__)
    app.config.from_object(config_class)
    db.init_app(app)
    migrate.init_app(app, db)

    from app.main import bp as main_bp
    app.register_blueprint(main_bp)

    return app

from app import models

main package

As said above, the blueprint is created in __init__.py. Working through these problems was a good way to learn the approach.

__init__.py

from flask import Blueprint
bp = Blueprint('main', __name__)
from app.main import routes

Forms

These are unchanged

Routes

The routes now point to the blueprint rather than the app.

from flask import render_template, flash, redirect
from app.main.forms import LoginForm
from app.main import bp

@bp.route('/login', methods=['GET', 'POST'])
def login():
    form = LoginForm()
    if form.validate_on_submit():
        flash('Login requested for user {}, remember_me={}'.format(
            form.username.data, form.remember_me.data))
        return redirect('/index')
    return render_template('login.html', title='Sign In', form=form)

Adding Data

Once the database problems were resolved I could add data with the flask shell command. E.g.

flask shell
db
u1 = User(username='john', email='john@example.com')
u2 = User(username='sum', email='sum@example.com')
db.session.add(u1)
db.session.add(u2)
db.session.commit()
u1.follow(u2)
u2.followers
u2.followers.all()

Relationships

Introduction

The tutorial built a messaging system between the users. So I thought it would be useful to show how this was achieved. This just shows how to implement the database changes.

Database

This is the table to hold the message. Nothing fancy here.

from datetime import datetime
from app import db

class Message(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    sender_id = db.Column(db.Integer, db.ForeignKey('user.id'))
    recipient_id = db.Column(db.Integer, db.ForeignKey('user.id'))
    body = db.Column(db.String(140))
    timestamp = db.Column(db.DateTime, index=True, default=datetime.utcnow)

    def __repr__(self):
        return '<Message {}>'.format(self.body)

One-to-Many Relationship

We add this against the User and provide a function to get the number of new messages.

    messages_sent = db.relationship('Message',
                                    foreign_keys='Message.sender_id',
                                    backref='author', lazy='dynamic')
    messages_received = db.relationship('Message',
                                        foreign_keys='Message.recipient_id',
                                        backref='recipient', lazy='dynamic')
    last_message_read_time = db.Column(db.DateTime)

    def new_messages(self):
        last_read_time = self.last_message_read_time or datetime(1900, 1, 1)
        return Message.query.filter_by(recipient=self).filter(
            Message.timestamp > last_read_time).count()

Update Database

flask db migrate -m "private messages"
flask db upgrade

Authentication

Log In

Flask-Login manages the logged-in state of users, so I guess it is a bit like Passport.

In the user model we specify we want to use the default implementations. The load_user is required as Flask-Login does not know about databases. Good example is provided.

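from flask_login import UserMixin
from app import db, login  # login is assumed to be the LoginManager created in app/__init__.py

@login.user_loader
def load_user(id):
    return User.query.get(int(id))

class User(UserMixin, db.Model):
    # ...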

Protecting Routes

We can protect routes by adding the following decorator.

from flask_login import login_required

@bp.route('/')
@bp.route('/index')
@login_required
def index():
    # ...

Next

There is some explanation of the use of next in the tutorial, so it is maybe worth logging here, as Passport does redirecting too.

    if form.validate_on_submit():
        user = User.query.filter_by(username=form.username.data).first()
        if user is None or not user.check_password(form.password.data):
            flash('Invalid username or password')
            return redirect(url_for('main.login'))
        login_user(user, remember=form.remember_me.data)
        next_page = request.args.get('next')
        if not next_page or url_parse(next_page).netloc != '':
            next_page = url_for('main.index')
        return redirect(next_page)

  • If the login URL does not have a next argument, then the user is redirected to the index page.
  • If the login URL includes a next argument that is set to a relative path (or in other words, a URL without the domain portion), then the user is redirected to that URL.
  • If the login URL includes a next argument that is set to a full URL that includes a domain name, then the user is redirected to the index page.


The third case is in place to make the application more secure. To determine if the URL is relative or absolute, I parse it with Werkzeug's url_parse() function and then check if the netloc component is set or not.
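
As far as I know, newer Werkzeug releases (3.0 onwards) no longer ship url_parse(), so the same check can be sketched with urlsplit() from the standard library. The helper name safe_next and the '/index' default are assumptions for illustration; in the app the default would come from url_for('main.index').

```python
from urllib.parse import urlsplit


def safe_next(next_page, default='/index'):
    """Hypothetical helper: return next_page only if it is a relative URL."""
    # A missing value, or a URL that carries a host (netloc), falls back
    # to the default so we never redirect to another site.
    if not next_page or urlsplit(next_page).netloc != '':
        return default
    return next_page


print(safe_next(None))
print(safe_next('/user/susan'))
print(safe_next('https://evil.example.com/phish'))
```

Note that a scheme-relative value such as //evil.example.com/x also has a netloc, so it is rejected as well.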

Sign Up

This consisted of

  • making a form
  • making a template
  • adding the route

All of the code was straightforward.

Error Handling

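First thing is probably to switch on debug mode. The flask run command takes the app from FLASK_APP rather than from an argument.

export FLASK_APP=myapp
export FLASK_DEBUG=1
flask run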

Error Handler Page

Here is the custom error page. You will need to register the blueprint and make an __init__.py

from flask import render_template
from app.errors import bp
from app import db

@bp.app_errorhandler(404)
def not_found_error(error):
    return render_template('404.html'), 404

@bp.app_errorhandler(500)
def internal_error(error):
    db.session.rollback()
    return render_template('500.html'), 500

Here is the 404

{% extends "base.html" %}

{% block content %}
    <h1>File Not Found</h1>
    <p><a href="{{ url_for('main.index') }}">Back</a></p>
{% endblock %}

And the 500

{% extends "base.html" %}

{% block content %}
    <h1>An unexpected error has occurred</h1>
    <p>The administrator has been notified. Sorry for the inconvenience!</p>
    <p><a href="{{ url_for('main.index') }}">Back</a></p>
{% endblock %}

Emailing the Error

Quite liked the proactive approach from the beginning. Let's define the extra config

    MAIL_SERVER = os.environ.get('MAIL_SERVER')
    MAIL_PORT = int(os.environ.get('MAIL_PORT') or 25)
    MAIL_USE_TLS = os.environ.get('MAIL_USE_TLS') is not None
    MAIL_USERNAME = os.environ.get('MAIL_USERNAME')
    MAIL_PASSWORD = os.environ.get('MAIL_PASSWORD')
    ADMINS = ['your-email@example.com']

And configure it at startup.

import logging
from logging.handlers import SMTPHandler
...
if not app.debug:
    if app.config['MAIL_SERVER']:
        auth = None
        if app.config['MAIL_USERNAME'] or app.config['MAIL_PASSWORD']:
            auth = (app.config['MAIL_USERNAME'], app.config['MAIL_PASSWORD'])
        secure = None
        if app.config['MAIL_USE_TLS']:
            secure = ()
        mail_handler = SMTPHandler(
            mailhost=(app.config['MAIL_SERVER'], app.config['MAIL_PORT']),
            fromaddr='no-reply@' + app.config['MAIL_SERVER'],
            toaddrs=app.config['ADMINS'], subject='Microblog Failure',
            credentials=auth, secure=secure)
        mail_handler.setLevel(logging.ERROR)
        app.logger.addHandler(mail_handler)

Run a fake Python SMTP server. The smtpd module was removed in Python 3.12, so this needs an older Python.

python -m smtpd -n -c DebuggingServer localhost:8025

Logging to File

Here is some sample code for logging to a file. This comes with log rotation.

    file_handler = RotatingFileHandler('logs/microblog.log', maxBytes=10240,
                                       backupCount=10)
    file_handler.setFormatter(logging.Formatter(
        '%(asctime)s %(levelname)s: %(message)s [in %(pathname)s:%(lineno)d]'))
    file_handler.setLevel(logging.INFO)
    app.logger.addHandler(file_handler)
    app.logger.setLevel(logging.INFO)
    app.logger.info('Microblog startup')

And some sample output

2021-05-25 03:51:27,196 INFO: Microblog startup [in /workspaces/bcnu1701flask/microblog/app/__init__.py:44]
2021-05-25 03:51:29,723 INFO: Adding Mail Handler [in /workspaces/bcnu1701flask/microblog/app/__init__.py:60]
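
The fragment above leaves out the import and assumes the logs directory already exists. Below is a self-contained sketch of the same handler setup using only the standard library; make_file_logger and the logger name are hypothetical, and a temporary directory stands in for logs/.

```python
import logging
import os
import tempfile
from logging.handlers import RotatingFileHandler


def make_file_logger(log_dir, name='microblog-sketch'):
    """Hypothetical helper: attach a rotating file handler to a logger."""
    os.makedirs(log_dir, exist_ok=True)  # the handler will not create it
    handler = RotatingFileHandler(os.path.join(log_dir, 'microblog.log'),
                                  maxBytes=10240, backupCount=10)
    handler.setFormatter(logging.Formatter(
        '%(asctime)s %(levelname)s: %(message)s [in %(pathname)s:%(lineno)d]'))
    handler.setLevel(logging.INFO)
    logger = logging.getLogger(name)
    logger.addHandler(handler)
    logger.setLevel(logging.INFO)
    return logger, handler


log_dir = tempfile.mkdtemp()
logger, handler = make_file_logger(log_dir)
logger.info('Microblog startup')

# Detach and close the handler so the file is flushed before reading it
logger.removeHandler(handler)
handler.close()
with open(os.path.join(log_dir, 'microblog.log')) as f:
    print(f.read())
```

With maxBytes=10240 and backupCount=10 the current file is rolled over at roughly 10 KB and at most ten old files are kept.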

Unit Testing

Example

The tutorial has a database and the relationships are like twitter. A follows B, B follows A and C etc. Below is an example of the testing for the database.

import unittest

from datetime import datetime, timedelta
from app import create_app, db
from app.models import User, Post
from config import Config

class TestConfig(Config):
    TESTING = True
    SQLALCHEMY_DATABASE_URI = 'sqlite://'

class UserModelCase(unittest.TestCase):
    def setUp(self):
        self.app = create_app(TestConfig)
        self.app_context = self.app.app_context()
        self.app_context.push()
        db.create_all()

    def tearDown(self):
        db.session.remove()
        db.drop_all()
        self.app_context.pop()

    def test_password_hashing(self):
        u = User(username='susan')
        u.set_password('cat')
        self.assertFalse(u.check_password('dog'))
        self.assertTrue(u.check_password('cat'))

    def test_avatar(self):
        u = User(username='john', email='john@example.com')
        self.assertEqual(u.avatar(128), ('https://www.gravatar.com/avatar/'
                                         'd4c74594d841139328695756648b6bd6'
                                         '?d=identicon&s=128'))

    def test_follow(self):
        u1 = User(username='john', email='john@example.com')
        u2 = User(username='susan', email='susan@example.com')
        db.session.add(u1)
        db.session.add(u2)
        db.session.commit()
        self.assertEqual(u1.followed.all(), [])
        self.assertEqual(u1.followers.all(), [])

        u1.follow(u2)
        db.session.commit()
        self.assertTrue(u1.is_following(u2))
        self.assertEqual(u1.followed.count(), 1)
        self.assertEqual(u1.followed.first().username, 'susan')
        self.assertEqual(u2.followers.count(), 1)
        self.assertEqual(u2.followers.first().username, 'john')

        u1.unfollow(u2)
        db.session.commit()
        self.assertFalse(u1.is_following(u2))
        self.assertEqual(u1.followed.count(), 0)
        self.assertEqual(u2.followers.count(), 0)

    def test_follow_posts(self):
        # create four users
        u1 = User(username='john', email='john@example.com')
        u2 = User(username='susan', email='susan@example.com')
        u3 = User(username='mary', email='mary@example.com')
        u4 = User(username='david', email='david@example.com')
        db.session.add_all([u1, u2, u3, u4])

        # create four posts
        now = datetime.utcnow()
        p1 = Post(body="post from john", author=u1,
                  timestamp=now + timedelta(seconds=1))
        p2 = Post(body="post from susan", author=u2,
                  timestamp=now + timedelta(seconds=4))
        p3 = Post(body="post from mary", author=u3,
                  timestamp=now + timedelta(seconds=3))
        p4 = Post(body="post from david", author=u4,
                  timestamp=now + timedelta(seconds=2))
        db.session.add_all([p1, p2, p3, p4])
        db.session.commit()

if __name__ == '__main__':
    unittest.main()

Outputting Results

I have not written many Python tests, but without the call to unittest.main() there is no output when the file is run directly.
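
The reason is that defining a TestCase class does not run anything; something has to load the tests and hand them to a runner. unittest.main() does that, as does running python -m unittest from the command line. A minimal sketch of doing the same by hand, with a hypothetical MathCase test and run_case helper:

```python
import io
import unittest


class MathCase(unittest.TestCase):
    """Hypothetical test case, just to have something to run."""

    def test_add(self):
        self.assertEqual(1 + 1, 2)


def run_case(case):
    # Load every test_* method from the class and run it, capturing the report
    suite = unittest.defaultTestLoader.loadTestsFromTestCase(case)
    stream = io.StringIO()
    result = unittest.TextTestRunner(stream=stream, verbosity=2).run(suite)
    return result, stream.getvalue()


result, report = run_case(MathCase)
print(report)
```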

Pagination In Flask

No alarms and no surprises here.

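This is managed by Flask-SQLAlchemy with the paginate() method. The arguments are the page number, the items per page and the error flag. Flask-SQLAlchemy 3.x only accepts them as keywords.

user.followed_posts().paginate(page=1, per_page=20, error_out=False).items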
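
What paginate does is essentially slice arithmetic. Here is a sketch over a plain list, assuming 1-based page numbers; this paginate function is a stand-in for illustration and not Flask-SQLAlchemy's implementation, which does the slicing in the SQL query.

```python
def paginate(items, page, per_page):
    """Return (items on this page, has_prev, has_next) for a 1-based page."""
    start = (page - 1) * per_page
    page_items = items[start:start + per_page]
    has_prev = page > 1
    has_next = start + per_page < len(items)
    return page_items, has_prev, has_next


posts = list(range(45))          # pretend these are 45 posts
first, has_prev, has_next = paginate(posts, 1, 20)
print(len(first), has_prev, has_next)
```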

Email

Introduction

The tutorial uses two packages here: Flask-Mail for sending email and PyJWT for the tokens. With my recent work on JWT I was interested in how this works.

pip install flask-mail
pip install pyjwt

As with Express and other frameworks, a Flask extension needs an instance to be created and then attached to the app

# ...
from flask_mail import Mail

# ...
mail = Mail()
# ...
def create_app(config_class=Config):
    app = Flask(__name__)
    # ...
    mail.init_app(app)
    # ...
    return app

The helper functions in app/email.py then send the mail, by default on a background thread

from threading import Thread

from flask import render_template, current_app
from flask_mail import Message
from app import mail

def send_async_email(app, msg):
    with app.app_context():
        mail.send(msg)
        
def send_email(subject, sender, recipients, text_body, html_body,
               attachments=None, sync=False):
    msg = Message(subject, sender=sender, recipients=recipients)
    msg.body = text_body
    msg.html = html_body
    if attachments:
        for attachment in attachments:
            msg.attach(*attachment)
    if sync:
        mail.send(msg)
    else:
        Thread(target=send_async_email,
            args=(current_app._get_current_object(), msg)).start()


def send_password_reset_email(user):
    token = user.get_reset_password_token()
    send_email('[Microblog] Reset Your Password',
               sender=current_app.config['ADMINS'][0],
               recipients=[user.email],
               text_body=render_template('email/reset_password.txt',
                                         user=user, token=token),
               html_body=render_template('email/reset_password.html',
                                         user=user, token=token))
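
The reason for the Thread is that sending mail is slow and the request should not wait for it. A minimal sketch of the pattern with the standard library only; slow_send is a hypothetical stand-in for mail.send(), and there is no application context here, which the real code has to push inside the thread.

```python
import time
from threading import Thread

sent = []


def slow_send(msg):
    """Hypothetical stand-in for mail.send(): slow, then records the message."""
    time.sleep(0.1)
    sent.append(msg)


def send_async(msg):
    # Start the slow work on a background thread and return immediately
    thread = Thread(target=slow_send, args=(msg,))
    thread.start()
    return thread


thread = send_async('hello')
print('request handler carries on while the mail is being sent')
thread.join()    # only needed here so the script can show the result
print(sent)
```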

Let's plug this into password reset.

Creating the Boiler Plate

As ever we need a

  • link
  • form
  • html
  • route

This is here as a reminder of how to add any page to a Flask app

Link

    <p>
        Forgot Your Password?
        <a href="{{ url_for('main.reset_password_request') }}">Click to Reset It</a>
    </p>

Form

from flask_wtf import FlaskForm
from wtforms import StringField, PasswordField, SubmitField
from wtforms.validators import DataRequired, Email, EqualTo

class ResetPasswordRequestForm(FlaskForm):
    email = StringField('Email', validators=[DataRequired(), Email()])
    submit = SubmitField('Request Password Reset')

class ResetPasswordForm(FlaskForm):
    password = PasswordField('Password', validators=[DataRequired()])
    password2 = PasswordField(
        'Repeat Password', validators=[DataRequired(), EqualTo('password')])
    submit = SubmitField('Request Password Reset')

HTML

First the template for the request (reset_password_request.html)

{% extends "base.html" %}

{% block content %}
    <h1>Reset Password</h1>
    <form action="" method="post">
        {{ form.hidden_tag() }}
        <p>
            {{ form.email.label }}<br>
            {{ form.email(size=64) }}<br>
            {% for error in form.email.errors %}
            <span style="color: red;">[{{ error }}]</span>
            {% endfor %}
        </p>
        <p>{{ form.submit() }}</p>
    </form>
{% endblock %}

And the template for the new password (reset_password.html)

{% extends "base.html" %}

{% block content %}
    <h1>Reset Your Password</h1>
    <form action="" method="post">
        {{ form.hidden_tag() }}
        <p>
            {{ form.password.label }}<br>
            {{ form.password(size=32) }}<br>
            {% for error in form.password.errors %}
            <span style="color: red;">[{{ error }}]</span>
            {% endfor %}
        </p>
        <p>
            {{ form.password2.label }}<br>
            {{ form.password2(size=32) }}<br>
            {% for error in form.password2.errors %}
            <span style="color: red;">[{{ error }}]</span>
            {% endfor %}
        </p>
        <p>{{ form.submit() }}</p>
    </form>
{% endblock %}

Route

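from flask import render_template, flash, redirect, url_for
from flask_login import current_user
from app import db
from app.models import User
from app.main import bp
from app.main.forms import ResetPasswordRequestForm, ResetPasswordForm
from app.email import send_password_reset_email

@bp.route('/reset_password_request', methods=['GET', 'POST'])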
def reset_password_request():
    if current_user.is_authenticated:
        return redirect(url_for('main.index'))
    form = ResetPasswordRequestForm()
    if form.validate_on_submit():
        user = User.query.filter_by(email=form.email.data).first()
        if user:
            send_password_reset_email(user)
        flash('Check your email for the instructions to reset your password')
        return redirect(url_for('main.login'))
    return render_template('reset_password_request.html',
                           title='Reset Password', form=form)

@bp.route('/reset_password/<token>', methods=['GET', 'POST'])
def reset_password(token):
    if current_user.is_authenticated:
        return redirect(url_for('main.index'))
    user = User.verify_reset_password_token(token)
    if not user:
        return redirect(url_for('main.index'))
    form = ResetPasswordForm()
    if form.validate_on_submit():
        user.set_password(form.password.data)
        db.session.commit()
        flash('Your password has been reset.')
        return redirect(url_for('main.login'))
    return render_template('reset_password.html', form=form)

Managing JWT

Here is an example of encode and decode using the Python implementation of JWT (PyJWT). Few surprises here

from time import time

import jwt
from flask import current_app

class User(UserMixin, db.Model):
    # ...

    def get_reset_password_token(self, expires_in=600):
        return jwt.encode(
            {'reset_password': self.id, 'exp': time() + expires_in},
            current_app.config['SECRET_KEY'], algorithm='HS256')

    @staticmethod
    def verify_reset_password_token(token):
        try:
            id = jwt.decode(token, current_app.config['SECRET_KEY'],
                            algorithms=['HS256'])['reset_password']
        except Exception:
            # expired, tampered with or malformed token
            return None
        return User.query.get(id)
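
To get a feel for what HS256 does under the hood, here is a rough sketch of a signed, expiring token built with the standard library only. This is not PyJWT and should not replace it; encode_token and decode_token are hypothetical names, and the sketch only covers the happy path plus the signature and exp checks.

```python
import base64
import hashlib
import hmac
import json
import time


def _b64(data):
    # URL-safe base64 without padding, as used in JWTs
    return base64.urlsafe_b64encode(data).rstrip(b'=')


def _unb64(data):
    return base64.urlsafe_b64decode(data + b'=' * (-len(data) % 4))


def encode_token(payload, secret):
    header = _b64(json.dumps({'alg': 'HS256', 'typ': 'JWT'}).encode())
    body = _b64(json.dumps(payload).encode())
    signing_input = header + b'.' + body
    sig = hmac.new(secret.encode(), signing_input, hashlib.sha256).digest()
    return (signing_input + b'.' + _b64(sig)).decode()


def decode_token(token, secret, now=None):
    """Return the payload, or None if the token is invalid or expired."""
    try:
        header, body, sig = token.encode().split(b'.')
        expected = hmac.new(secret.encode(), header + b'.' + body,
                            hashlib.sha256).digest()
        # Constant-time comparison of the signatures
        if not hmac.compare_digest(_unb64(sig), expected):
            return None
        payload = json.loads(_unb64(body))
    except ValueError:
        return None
    current = time.time() if now is None else now
    if payload.get('exp', float('inf')) < current:
        return None
    return payload


token = encode_token({'reset_password': 7, 'exp': time.time() + 600}, 'secret')
print(decode_token(token, 'secret'))
print(decode_token(token, 'wrong-secret'))
```

The payload is only signed, not encrypted, so anyone can read it. That is fine for a user id but not for anything secret.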

UI

Before

The app looks pretty awful as it stands. Let's see what Bootstrap can do. Here it is before.

[Screenshot: Flas preboot.png]

Install

We need to install the wrapper. Not shown, but we need to set up Bootstrap in the app too.

pip install flask-bootstrap

Base Template

{% extends 'bootstrap/base.html' %}

{% block title %}
    {% if title %}{{ title }} - Microblog{% else %}Welcome to Microblog{% endif %}
{% endblock %}

{% block navbar %}
    <nav class="navbar navbar-default">
        ... navigation bar here (see complete code on GitHub) ...
    </nav>
{% endblock %}

{% block content %}
    <div class="container">
        {% with messages = get_flashed_messages() %}
        {% if messages %}
            {% for message in messages %}
            <div class="alert alert-info" role="alert">{{ message }}</div>
            {% endfor %}
        {% endif %}
        {% endwith %}

        {# application content needs to be provided in the app_content block #}
        {% block app_content %}{% endblock %}
    </div>
{% endblock %}

Quick Forms

This does look like a code saver, but I do wonder how many will be happy with the library rendering the controls rather than doing it themselves. Having said that, it did do a pretty good job for a simple render.

First the old way

{% extends "base.html" %}

{% block content %}
    <h1>Register</h1>
    <form action="" method="post">
        {{ form.hidden_tag() }}
        <p>
            {{ form.username.label }}<br>
            {{ form.username(size=32) }}<br>
            {% for error in form.username.errors %}
            <span style="color: red;">[{{ error }}]</span>
            {% endfor %}
        </p>
        <p>
            {{ form.email.label }}<br>
            {{ form.email(size=64) }}<br>
            {% for error in form.email.errors %}
            <span style="color: red;">[{{ error }}]</span>
            {% endfor %}
        </p>
        <p>
            {{ form.password.label }}<br>
            {{ form.password(size=32) }}<br>
            {% for error in form.password.errors %}
            <span style="color: red;">[{{ error }}]</span>
            {% endfor %}
        </p>
        <p>
            {{ form.password2.label }}<br>
            {{ form.password2(size=32) }}<br>
            {% for error in form.password2.errors %}
            <span style="color: red;">[{{ error }}]</span>
            {% endfor %}
        </p>
        <p>{{ form.submit() }}</p>
    </form>
{% endblock %}

Using quick forms

{% extends "base.html" %}
{% import 'bootstrap/wtf.html' as wtf %}

{% block app_content %}
    <h1>Register</h1>
    <div class="row">
        <div class="col-md-4">
            {{ wtf.quick_form(form) }}
        </div>
    </div>
{% endblock %}

Post Bootstrap

Hopes were not high but it is not too bad. Still wondering really what the use case is for Flask as a web frontend.

[Screenshot: FlaskPostBoot.png]

Dates and Times

Introduction

Someone somewhere needs to sort this out. There is just so much work involved in something which really has two parameters: the time in the world and where you are. For Flask they have gone for Flask-Moment, which wraps moment.js. So we install as ever with

pip install flask-moment

Add it to the app (not shown)

Usage

And here is some usage. The scripts block that I added here is another block exported by Flask-Bootstrap's base template. The super() call is required to preserve the content from the base template

{% block scripts %}
    {{ super() }}
    {{ moment.include_moment() }}
{% endblock %}

And this will show the date/time

    {% if user.last_seen %}
    <p>Last seen on: {{ moment(user.last_seen).format('LLL') }}</p>
    {% endif %}

And...

      <a href="{{ url_for('main.user', username=post.author.username) }}">
          {{ post.author.username }}
      </a>
      said {{ moment(post.timestamp).fromNow() }}:
      <br>
      {{ post.body }}

Multi Language

Configuration

For Flask we use Flask-Babel set up in the app. We specify in the Config the languages we support in an array.

LANGUAGES = ['en', 'es']

We need to load this into the app with a locale selector. The request.accept_languages object provides a high-level interface to work with the Accept-Language header that clients send with a request.

from flask import request
...
@babel.localeselector
def get_locale():
    return request.accept_languages.best_match(app.config['LANGUAGES'])
from app import models
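
To show roughly what best_match() has to do with the Accept-Language header, here is a simplified sketch in plain Python. It is not Werkzeug's implementation and the matching rules are my assumption: order by the q value, then accept an exact match or a match on the primary language tag.

```python
def best_match(accept_language, supported):
    """Simplified stand-in for request.accept_languages.best_match()."""
    candidates = []
    for position, part in enumerate(accept_language.split(',')):
        fields = part.strip().split(';')
        lang = fields[0].strip().lower()
        quality = 1.0                      # default when no q= is given
        for field in fields[1:]:
            key, _sep, value = field.strip().partition('=')
            if key == 'q':
                try:
                    quality = float(value)
                except ValueError:
                    quality = 0.0
        if lang and quality > 0:
            # Sort by highest quality first, then by position in the header
            candidates.append((-quality, position, lang))
    for _quality, _position, lang in sorted(candidates):
        primary = lang.split('-')[0]
        for choice in supported:
            if choice.lower() in (lang, primary):
                return choice
    return None


print(best_match('da, en-gb;q=0.8, en;q=0.7', ['en', 'es']))
print(best_match('es-MX,es;q=0.9,en;q=0.5', ['en', 'es']))
```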

Usage

From there we need to use it

from flask_babel import _
# ...
flash(_('Your post is now live!'))

Other approach for interpolation

flash(_('User %(username)s not found.', username=username))

Flask-Babel provides a lazy evaluation version of _() that is called lazy_gettext(). This is used when the language is not known at the time of usage, e.g. when building the labels for form fields.

from flask_babel import lazy_gettext as _l

class LoginForm(FlaskForm):
    username = StringField(_l('Username'), validators=[DataRequired()])
    # ...
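
The difference between the eager and the lazy version is just when the lookup happens. A toy sketch of the idea, where gettext, LazyString, set_language and the translations dictionary are all made up for illustration and have nothing to do with Flask-Babel's internals:

```python
current_language = 'en'
translations = {'es': {'Username': 'Nombre de usuario'}}


def set_language(code):
    global current_language
    current_language = code


def gettext(text):
    # Eager: looks up the translation for the language active *now*
    return translations.get(current_language, {}).get(text, text)


class LazyString:
    """Defers the lookup until the string is actually rendered."""

    def __init__(self, text):
        self.text = text

    def __str__(self):
        return gettext(self.text)


eager = gettext('Username')      # resolved while the language is 'en'
lazy = LazyString('Username')    # nothing resolved yet

set_language('es')               # e.g. a request arrives from a Spanish user
print(eager)
print(str(lazy))
```

Form classes are defined at import time, before any request exists, which is why their labels need the lazy version.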

Creating list of translations

Once you have wrapped all your strings in _() or _l(), we need a list of translations to provide the other languages. The instructions will be left to Google and https://blog.miguelgrinberg.com/post/the-flask-mega-tutorial-part-xiii-i18n-and-l10n
So here is the quick answer, because I needed it for other parts of the tutorial

Create a babel.cfg in the root

[python: app/**.py]
[jinja2: app/templates/**.html]
extensions=jinja2.ext.autoescape,jinja2.ext.with_

Do the extract and generate in the root (Gosh LC_MESSAGES back in time)

#Extract
pybabel extract -F babel.cfg -k _l -o messages.pot .
#Generate
pybabel init -i messages.pot -d app/translations -l es

Now we need to compile the .po files to .mo format

pybabel compile -d app/translations

To update the translation files we can do the following

pybabel extract -F babel.cfg -k _l -o messages.pot .
pybabel update -i messages.pot -d app/translations

So to change the submit button we

  • Change the form to have _l()
  • Extract and Compile

Change the form to have _l()

from flask_babel import _, lazy_gettext as _l
...

class PostForm(FlaskForm):
    post = TextAreaField('Say something', validators=[
        DataRequired(), Length(min=1, max=140)])
    submit = SubmitField(_l('Submit'))
...

Extract and Compile

pybabel extract -F babel.cfg -k _l -o messages.pot .
pybabel update -i messages.pot -d app/translations
pybabel compile -d app/translations

Axios

Well the demo is going to add translation for new posts. The steps were

  • Identify the source language of the text
  • Get the preferred language
  • Build a translate() Function
  • Add a Route to Call translate()
  • Add Placeholder for Translated Text
  • Add JS Function to Call Service
  • Trigger the call to translate()

Identify the source language of the text

There is a package for this

pip install guess_language-spirit

We add a new column onto the Post table.

class Post(db.Model):
    # ...
    language = db.Column(db.String(5))

Update the database

flask db migrate -m "add language to posts"
flask db upgrade

Now guess the language when a post is submitted and store it with the post

from guess_language import guess_language

def index():
    form = PostForm()
    if form.validate_on_submit():
        language = guess_language(form.post.data)
        if language == 'UNKNOWN' or len(language) > 5:
            language = ''
        post = Post(body=form.post.data, author=current_user,
                    language=language)
        db.session.add(post)
        db.session.commit()
        flash('Your post is now live!')
    # ...

We also add a g.locale to get the current language.

@bp.before_request
def before_request():
    if current_user.is_authenticated:
        current_user.last_seen = datetime.utcnow()
        db.session.commit()
    g.locale = str(get_locale())

Now add this to the post template to show the link if required

                {% if post.language and post.language != g.locale %}
                <br><br>
                <a href="#">{{ _('Translate') }}</a>
                {% endif %}

Build a translate() Function

Setup the Environment to use Azure Function

Signed up to Microsoft Azure to test this. Basically you set up an environment variable with the key for the service, e.g.

            "env": {
                "FLASK_APP": "microblog.py",
                "FLASK_ENV": "development",
                "FLASK_DEBUG": "0",
                "MAIL_SERVER": "localhost",
                "MAIL_PORT": "8025",
                "MS_TRANSLATOR_KEY": "xxxxxxxxxxx",
                "MS_TRANSLATOR_TEXT_REGION": "australiaeast",
            },

Create translate() function

Install requests

pip install requests

And create a function to do the translation

import requests
from flask import current_app
from flask_babel import _

def translate(text, source_language, dest_language):
    # Both the key and the region must be configured
    if 'MS_TRANSLATOR_KEY' not in current_app.config or \
            not current_app.config['MS_TRANSLATOR_KEY']:
        return _('Error: the translation key service is not configured.')
    if 'MS_TRANSLATOR_TEXT_REGION' not in current_app.config or \
            not current_app.config['MS_TRANSLATOR_TEXT_REGION']:
        return _('Error: the translation service region is not configured.')
    auth = {
        'Ocp-Apim-Subscription-Key': current_app.config['MS_TRANSLATOR_KEY'],
        'Ocp-Apim-Subscription-Region': current_app.config['MS_TRANSLATOR_TEXT_REGION']
        }
    r = requests.post(
        'https://api.cognitive.microsofttranslator.com/translate?api-version=3.0'
        '&from={}&to={}'.format(
            source_language, dest_language), headers=auth, json=[{'Text': text}])
    if r.status_code != 200:
        # Never print the key here, it would end up in the logs
        return _('Error: the translation service failed.')
    return r.json()[0]['translations'][0]['text']
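To see what translate() sends and what it reads back without calling Azure, the request building and the response parsing can be separated out. This is a sketch under assumptions: the helper names are hypothetical, the key is a dummy, and the sample reply is hand-written to contain only the keys that translate() indexes into.

```python
from urllib.parse import urlencode

BASE_URL = 'https://api.cognitive.microsofttranslator.com/translate'


def build_translate_request(text, source_language, dest_language, key, region):
    """Return (url, headers, payload) for one translation call."""
    query = urlencode({'api-version': '3.0',
                       'from': source_language,
                       'to': dest_language})
    headers = {
        'Ocp-Apim-Subscription-Key': key,
        'Ocp-Apim-Subscription-Region': region,
    }
    payload = [{'Text': text}]  # one list entry per text to translate
    return BASE_URL + '?' + query, headers, payload


def extract_translation(response_json):
    """Pull the first translation out of the decoded JSON reply."""
    return response_json[0]['translations'][0]['text']


url, headers, payload = build_translate_request(
    'Hello', 'en', 'es', 'dummy-key', 'australiaeast')
print(url)
# Hand-written sample, not a real reply from the service
sample = [{'translations': [{'text': 'Hola', 'to': 'es'}]}]
print(extract_translation(sample))
```

Using urlencode instead of str.format also escapes the language codes, which matters little here but is a safer habit.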

Add a Route to Call translate()

We just wrap the call in a post request

from flask import jsonify, request
from app.translate import translate

@bp.route('/translate', methods=['POST'])
@login_required
def translate_text():
    return jsonify({'text': translate(request.form['text'],
                                      request.form['source_language'],
                                      request.form['dest_language'])})

Add Placeholder for Translated Text

            <span id="post{{ post.id }}">{{ post.body }}</span>
            <span id="translation{{ post.id }}">
                <a href="#">{{ _('Translate') }}</a>
            </span>

Add JS Function to Call Service

This JavaScript is probably the interesting bit as it shows how to call a Flask route asynchronously from the page.

 {% block scripts %}
    ...
    <script>
        function translate(sourceElem, destElem, sourceLang, destLang) {
            $(destElem).html('<img src="{{ url_for('static', filename='loading.gif') }}">');
            $.post('/translate', {
                text: $(sourceElem).text(),
                source_language: sourceLang,
                dest_language: destLang
            }).done(function(response) {
                $(destElem).text(response['text'])
            }).fail(function() {
                $(destElem).text("{{ _('Error: Could not contact server.') }}");
            });
        }
    </script>
{% endblock %}

Trigger the call to translate()

                <span id="translation{{ post.id }}">
                    <a href="javascript:translate(
                                '#post{{ post.id }}',
                                '#translation{{ post.id }}',
                                '{{ post.language }}',
                                '{{ g.locale }}');">{{ _('Translate') }}</a>
                </span>

Stuff to Know

DotEnv

Python supports dotenv, similar to nodejs

pip install python-dotenv

We can now initialize this in config

import os
from dotenv import load_dotenv

basedir = os.path.abspath(os.path.dirname(__file__))
load_dotenv(os.path.join(basedir, '.env'))

class Config(object):
    # ...
    SECRET_KEY = os.environ.get('SECRET_KEY')
    MS_TRANSLATOR_KEY = os.environ.get('MS_TRANSLATOR_KEY')
    MS_TRANSLATOR_TEXT_REGION = os.environ.get('MS_TRANSLATOR_TEXT_REGION')
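Roughly what loading a .env file amounts to is shown below. This is a simplified sketch, not the parser python-dotenv actually uses: load_env_file is a hypothetical helper, and a plain dict stands in for os.environ so the example leaves the real environment alone.

```python
import os
import tempfile


def load_env_file(path, environ=os.environ):
    """Copy KEY=VALUE lines from path into environ without overwriting."""
    with open(path) as f:
        for line in f:
            line = line.strip()
            # Skip blank lines, comments, and anything without an '='
            if not line or line.startswith('#') or '=' not in line:
                continue
            key, value = line.split('=', 1)
            environ.setdefault(key.strip(), value.strip().strip('"\''))


# Usage with a throwaway file and a dict standing in for os.environ
with tempfile.NamedTemporaryFile('w', suffix='.env', delete=False) as f:
    f.write('# secrets\n'
            'SECRET_KEY=my-secret-key\n'
            'MS_TRANSLATOR_TEXT_REGION="australiaeast"\n')
env = {}
load_env_file(f.name, env)
os.remove(f.name)
print(env)
```

Values already present in the environment win, which is why setdefault is used rather than plain assignment.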

List Packages to Requirements.txt

Easy peasy lemon squeezy

pip freeze > requirements.txt

Elastic Search

Fixes

This was the hardest part of the tutorial. One step is easy to miss: you need to create the index in Elasticsearch yourself. It is in the tutorial text but not obvious, so here we go

flask shell
from app.models import Post
Post.reindex()

Introduction

The tutorial led me to using Elastic Search. I have used it a bit before, so it should be interesting to see how it integrates with Flask.

I used this to install on Ubuntu https://linuxize.com/post/how-to-install-elasticsearch-on-ubuntu-20-04

pip install elasticsearch

Don't forget to set the values in /etc/elasticsearch/elasticsearch.yml

network.host: 192.168.1.xx
discovery.seed_hosts: ["192.168.1.xx"]

Configuration

Let's add the URL to the config

class Config(object):
    # ...
    ELASTICSEARCH_URL = os.environ.get('ELASTICSEARCH_URL')

Use it in the app

from elasticsearch import Elasticsearch

def create_app(config_class=Config):
    # ...
    app.elasticsearch = Elasticsearch([app.config['ELASTICSEARCH_URL']]) \
        if app.config['ELASTICSEARCH_URL'] else None

Update Model

Let's change the model to list all the fields which can be searched. We only have one, but who knows next time.

class Post(db.Model):
    __searchable__ = ['body']
    # ...

Search Function

So here is the big moment. The search function.

  • Add an entry to the index
  • Remove an entry from the index
  • Query the index

from flask import current_app

def add_to_index(index, model):
    if not current_app.elasticsearch:
        return
    payload = {}
    for field in model.__searchable__:
        payload[field] = getattr(model, field)
    current_app.elasticsearch.index(index=index, id=model.id, body=payload)

def remove_from_index(index, model):
    if not current_app.elasticsearch:
        return
    current_app.elasticsearch.delete(index=index, id=model.id)

def query_index(index, query, page, per_page):
    if not current_app.elasticsearch:
        return [], 0
    search = current_app.elasticsearch.search(
        index=index,
        body={'query': {'multi_match': {'query': query, 'fields': ['*']}},
              'from': (page - 1) * per_page, 'size': per_page})
    ids = [int(hit['_id']) for hit in search['hits']['hits']]
    return ids, search['hits']['total']['value']
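The paging arithmetic and the response handling in query_index() can be tried without a running Elasticsearch. This is a sketch with hypothetical helper names, and the fake response is hand-written to contain only the keys that query_index() reads.

```python
def build_search_body(query, page, per_page):
    """Build the body query_index() sends for a 1-based page number."""
    return {
        'query': {'multi_match': {'query': query, 'fields': ['*']}},
        'from': (page - 1) * per_page,  # number of hits to skip
        'size': per_page,
    }


def extract_ids(search):
    """Return (ids, total) from a search response dictionary."""
    ids = [int(hit['_id']) for hit in search['hits']['hits']]
    return ids, search['hits']['total']['value']


print(build_search_body('portland', page=3, per_page=10)['from'])
# Hand-written response, not real Elasticsearch output
fake = {'hits': {'total': {'value': 2}, 'hits': [{'_id': '7'}, {'_id': '3'}]}}
print(extract_ids(fake))
```

The ids come back in relevance order, which is why the model code later has to preserve that order when it loads the rows from the database.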

Adding Search to the Post Model

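This took me a while to understand, so bear with me. We create a mixin, a class whose methods are inherited by any model that lists it as a base class. Not sure why they put it in the same file as the models, maybe convention rather than necessity. search() returns a query for the objects whose ids matched the expression, ordered as Elasticsearch ranked them, plus the total hit count. before_commit and after_commit are SQLAlchemy session event handlers rather than anything to do with locking: before_commit remembers which objects are being added, updated, and deleted, and after_commit applies those changes to the index once the commit has succeeded.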

from app.search import add_to_index, remove_from_index, query_index

class SearchableMixin(object):
    @classmethod
    def search(cls, expression, page, per_page):
        ids, total = query_index(cls.__tablename__, expression, page, per_page)
        if total == 0:
            return cls.query.filter_by(id=0), 0
        when = []
        for i in range(len(ids)):
            when.append((ids[i], i))
        return cls.query.filter(cls.id.in_(ids)).order_by(
            db.case(when, value=cls.id)), total

    @classmethod
    def before_commit(cls, session):
        session._changes = {
            'add': list(session.new),
            'update': list(session.dirty),
            'delete': list(session.deleted)
        }

    @classmethod
    def after_commit(cls, session):
        for obj in session._changes['add']:
            if isinstance(obj, SearchableMixin):
                add_to_index(obj.__tablename__, obj)
        for obj in session._changes['update']:
            if isinstance(obj, SearchableMixin):
                add_to_index(obj.__tablename__, obj)
        for obj in session._changes['delete']:
            if isinstance(obj, SearchableMixin):
                remove_from_index(obj.__tablename__, obj)
        session._changes = None

    @classmethod
    def reindex(cls):
        for obj in cls.query:
            add_to_index(cls.__tablename__, obj)

db.event.listen(db.session, 'before_commit', SearchableMixin.before_commit)
db.event.listen(db.session, 'after_commit', SearchableMixin.after_commit)
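The when list in search() exists only to keep the database rows in the order Elasticsearch ranked them. A plain Python sketch of the same idea, using dictionaries in place of model objects (order_by_rank is a hypothetical name):

```python
def order_by_rank(rows, ids):
    """Sort database rows to match the order the search returned the ids."""
    # Same pairs as the `when` list: id mapped to its position in the results
    rank = {id_: position for position, id_ in enumerate(ids)}
    matching = [row for row in rows if row['id'] in rank]     # like id.in_(ids)
    return sorted(matching, key=lambda row: rank[row['id']])  # like the CASE ordering


rows = [{'id': 1, 'body': 'a'}, {'id': 3, 'body': 'b'}, {'id': 7, 'body': 'c'}]
print(order_by_rank(rows, ids=[7, 3]))
```

Without the explicit ordering the database would be free to return the rows in any order, typically by id.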

Add a Search Form

class SearchForm(FlaskForm):
    q = StringField(_l('Search'), validators=[DataRequired()])

    def __init__(self, *args, **kwargs):
        if 'formdata' not in kwargs:
            kwargs['formdata'] = request.args
        if 'csrf_enabled' not in kwargs:
            kwargs['csrf_enabled'] = False
        super(SearchForm, self).__init__(*args, **kwargs)

Create Search Form

We can create the form for the whole application in a before_app_request handler in routes.

from flask import g
from app.main.forms import SearchForm

@bp.before_app_request
def before_request():
    if current_user.is_authenticated:
        current_user.last_seen = datetime.utcnow()
        db.session.commit()
        g.search_form = SearchForm()
    g.locale = str(get_locale())

And here is the html

            <div class="collapse navbar-collapse" id="bs-example-navbar-collapse-1">
                <ul class="nav navbar-nav">
                    ... home and explore links ...
                </ul>
                {% if g.search_form %}
                <form class="navbar-form navbar-left" method="get"
                        action="{{ url_for('main.search') }}">
                    <div class="form-group">
                        {{ g.search_form.q(size=20, class='form-control',
                            placeholder=g.search_form.q.label.text) }}
                    </div>
                </form>
                {% endif %}

Search Results Form

And the results of the search

{% extends "base.html" %}

{% block app_content %}
    <h1>{{ _('Search Results') }}</h1>
    {% for post in posts %}
        {% include '_post.html' %}
    {% endfor %}
    <nav aria-label="...">
        <ul class="pager">
            <li class="previous{% if not prev_url %} disabled{% endif %}">
                <a href="{{ prev_url or '#' }}">
                    <span aria-hidden="true">&larr;</span>
                    {{ _('Previous results') }}
                </a>
            </li>
            <li class="next{% if not next_url %} disabled{% endif %}">
                <a href="{{ next_url or '#' }}">
                    {{ _('Next results') }}
                    <span aria-hidden="true">&rarr;</span>
                </a>
            </li>
        </ul>
    </nav>
{% endblock %}

Deployment

Introduction

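We are going to install with Gunicorn and Nginx. Why take two servers into the shower? Well, from what I understand the answer is functionality. Nginx does the grunt work as the public-facing server (TLS, static files) and proxies application requests to Gunicorn, which speaks WSGI to the Flask app.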

WSGI

WSGI.png
WSGI stands for Web Server Gateway Interface. It is a specification (PEP 3333) for how a web server talks to a Python web application, so that any compliant server can run any compliant framework instead of the application being tied to one server such as Apache with mod_python. Advantages include

  • Flexibility (Gunicorn and uWSGI seem to be the favs)
  • Scalability (you can set the number of requests to be handled)
  • Speed
  • Used with Django, Flask, CherryPy etc
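At its core the interface is one callable that takes the request environ and a start_response function. A minimal sketch using only the standard library; call_app is a hypothetical helper that plays the server's role for a single request.

```python
from wsgiref.util import setup_testing_defaults


def application(environ, start_response):
    """A WSGI app: takes the request environ, returns an iterable of bytes."""
    body = 'Hello from {}'.format(environ['PATH_INFO']).encode('utf-8')
    start_response('200 OK', [('Content-Type', 'text/plain; charset=utf-8'),
                              ('Content-Length', str(len(body)))])
    return [body]


def call_app(app, path):
    """Play the server's role once: build an environ and collect the reply."""
    environ = {'PATH_INFO': path}
    setup_testing_defaults(environ)  # fills in the other required keys
    captured = {}

    def start_response(status, headers):
        captured['status'] = status
        captured['headers'] = headers

    body = b''.join(app(environ, start_response))
    return captured['status'], body


print(call_app(application, '/index'))
```

A Flask app object is such a callable too, which is why a WSGI server only needs to be told the module and the variable name, as in microblog:app.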

Gunicorn

Fairly simple to run: give it the address to bind to, the number of workers, and the app as module:variable

gunicorn -b localhost:8000 -w 4 microblog:app

Supervisor

The supervisor utility uses configuration files that tell it what programs to monitor and how to restart them when necessary. Configuration files must be stored in /etc/supervisor/conf.d.

[program:microblog]
command=/home/ubuntu/microblog/venv/bin/gunicorn -b localhost:8000 -w 4 microblog:app
directory=/home/ubuntu/microblog
user=ubuntu
autostart=true
autorestart=true
stopasgroup=true
killasgroup=true

And now we can run our app

sudo supervisorctl reload

Nginx (N-GIN-X)

I am going to move off Apache and onto Nginx so let's see how this works. We have had a great relationship but I cannot ignore how popular Nginx is. Just migrated and wow, the speed on Nginx is incredible.

server {
    # listen on port 80 (http)
    listen 80;
    server_name _;
    location / {
        # redirect any requests to the same URL but on https
        return 301 https://$host$request_uri;
    }
}
server {
    # listen on port 443 (https)
    listen 443 ssl;
    server_name _;

    # location of the self-signed SSL certificate
    ssl_certificate /home/ubuntu/microblog/certs/cert.pem;
    ssl_certificate_key /home/ubuntu/microblog/certs/key.pem;

    # write access and error logs to /var/log
    access_log /var/log/microblog_access.log;
    error_log /var/log/microblog_error.log;

    location / {
        # forward application requests to the gunicorn server
        proxy_pass http://localhost:8000;
        proxy_redirect off;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
    }

    location /static {
        # handle static files directly, without forwarding to the application
        alias /home/ubuntu/microblog/app/static;
        expires 30d;
    }
}

Docker

More on Docker, and this time with Flask. I needed to put the work to date into a container. The Dockerfile from the tutorial failed, probably because I was using Flask 2.0. It relies on SQLAlchemy, which in my version relied on greenlet, which in turn needs gcc to build. Putting it in an Alpine container meant I either installed gcc and g++ in the final image or went for a two-stage build. No prizes for guessing what I did

FROM python:3.9-alpine as base

RUN mkdir -p /home/microblog
WORKDIR /home/microblog

RUN apk add --update \
    postgresql-dev \
    gcc g++ \
    musl-dev \
    linux-headers

RUN python -m venv venv
RUN venv/bin/pip install greenlet

FROM python:3.9-alpine

RUN adduser -D microblog

COPY --from=base /home/microblog /home/microblog
WORKDIR /home/microblog

COPY requirements.txt requirements.txt

RUN venv/bin/pip install wheel
RUN venv/bin/pip install -r requirements.txt
RUN venv/bin/pip install gunicorn pymysql 

COPY app app
COPY migrations migrations
COPY microblog.py config.py boot.sh ./
RUN chmod +x boot.sh

ENV FLASK_APP microblog.py

RUN chown -R microblog:microblog ./
USER microblog

EXPOSE 5000
ENTRYPOINT ["./boot.sh"]

You can then run the container with the command below. The --net=host option lets the container reach services on the host's ports, e.g. localhost:3306. Note that Docker ignores -p when host networking is used.

docker run --name microblog --net=host -d -p 8000:5000 --rm \
    -e SECRET_KEY=my-secret-key \
    -e MAIL_SERVER=smtp.googlemail.com -e MAIL_PORT=587 -e MAIL_USE_TLS=true \
    -e REDIS_URL=redis://localhost:6379/0 \
    -e MAIL_USERNAME=emailusername -e MAIL_PASSWORD=emailpass \
    -e DATABASE_URL=mysql+pymysql://user:password@localhost:3306/databasename \
    containername:latest

Two Features with JavaScript

This is showing how to implement two features where client and server are involved.

Show User Profile

This will display the profile of a user when the current user hovers over their name. On the server side we simply create a route that renders a popup page. On the client side we manage the calling of this route using the mouse in and mouse out events.

Server Side

  • Create a route
  • Make a template

Create a route

@bp.route('/user/<username>/popup')
@login_required
def user_popup(username):
    user = User.query.filter_by(username=username).first_or_404()
    form = EmptyForm()
    return render_template('user_popup.html', user=user, form=form)

Create a template

<table class="table">
    <tr>
        <td width="64" style="border: 0px;"><img src="{{ user.avatar(64) }}"></td>
        <td style="border: 0px;">
            <p><a href="{{ url_for('main.user', username=user.username) }}">{{ user.username }}</a></p>
            <small>
                {% if user.about_me %}<p>{{ user.about_me }}</p>{% endif %}
                {% if user.last_seen %}
                <p>{{ _('Last seen on') }}: {{ moment(user.last_seen).format('lll') }}</p>
                {% endif %}
                <p>
                    {{ _('%(count)d followers', count=user.followers.count()) }},
                    {{ _('%(count)d following', count=user.followed.count()) }}
                </p>
                {% if user != current_user %}
                    {% if not current_user.is_following(user) %}
                    <p>
                        <form action="{{ url_for('main.follow', username=user.username) }}" method="post">
                            {{ form.hidden_tag() }}
                            {{ form.submit(value=_('Follow'), class_='btn btn-default btn-sm') }}
                        </form>
                    </p>
                    {% else %}
                    <p>
                        <form action="{{ url_for('main.unfollow', username=user.username) }}" method="post">
                            {{ form.hidden_tag() }}
                            {{ form.submit(value=_('Unfollow'), class_='btn btn-default btn-sm') }}
                        </form>
                    </p>
                    {% endif %}
                {% endif %}
            </small>
        </td>
    </tr>
</table>

Client Side

Show how to use mouseover and implement the call to the route.

  • Mouse Over
  • Expanded Mouse Over

Mouse Over

We create a mouse-over event for when the user hovers over a tag with the class .user_popup. This is the shell of what to do. We need to handle mouse in and mouse out.

    $(function() {
        $('.user_popup').hover(
            function(event) {
                // mouse in event handler
                var elem = $(event.currentTarget);
            },
            function(event) {
                // mouse out event handler
                var elem = $(event.currentTarget);
            }
        )
    });

Expanded Mouse Over

This is the expanded version which calls the server-side route above. The mouse-out handler cancels the pending timer or the in-flight request, whichever is active, and otherwise destroys the popup. Without this clean-up the timer would leak.

    $(function() {
        var timer = null;
        var xhr = null;
        $('.user_popup').hover(
            function(event) {
                // mouse in event handler
                var elem = $(event.currentTarget);
                timer = setTimeout(function() {
                    timer = null;
                    xhr = $.ajax(
                        '/user/' + elem.first().text().trim() + '/popup').done(
                             function(data) {
                                 xhr = null;
                                 elem.popover({
                                     trigger: 'manual',
                                     html: true,
                                     animation: false,
                                     container: elem,
                                     content: data
                                 }).popover('show');
                                 flask_moment_render_all();
                             }
                        );
                }, 1000);
            },
            function(event) {
                // mouse out event handler
                var elem = $(event.currentTarget);
                if (timer) {
                    clearTimeout(timer);
                    timer = null;
                }
                else if (xhr) {
                    xhr.abort();
                    xhr = null;
                }
                else {
                    elem.popover('destroy');
                }
            }
        )
    });

Show Notifications

This feature will poll a new table Notification and display the new messages when they arrive.

Server Side

On the server side we will store the data and initiate the insert into the new Notification table when a message is created

  • Create a notification Table
  • Create relationship to User
  • Create an Add notification function
  • Create Route which returns the message count
  • Set Notifications to 0 when Messages route is Rendered

Create a notification Table

Create a table to hold an entry when a new message is created.

import json
from time import time

class Notification(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(128), index=True)
    user_id = db.Column(db.Integer, db.ForeignKey('user.id'))
    timestamp = db.Column(db.Float, index=True, default=time)
    payload_json = db.Column(db.Text)

    def get_data(self):
        return json.loads(str(self.payload_json))

Create relationship to User

Now connect the table to the user.

class User(UserMixin, db.Model):
    # ...
    notifications = db.relationship('Notification', backref='user',
                                    lazy='dynamic')

Create an Add notification function

Add a function to insert a row when a user creates a private message. This is the function to call when a message is created.

    def add_notification(self, name, data):
        self.notifications.filter_by(name=name).delete()
        n = Notification(name=name, payload_json=json.dumps(data), user=self)
        db.session.add(n)
        return n

Create Route which returns the message count

This will be called by the client

@bp.route('/notifications')
@login_required
def notifications():
    since = request.args.get('since', 0.0, type=float)
    notifications = current_user.notifications.filter(
        Notification.timestamp > since).order_by(Notification.timestamp.asc())
    return jsonify([{
        'name': n.name,
        'data': n.get_data(),
        'timestamp': n.timestamp
    } for n in notifications])
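The two server-side rules, replacing any older notification of the same name and returning only entries newer than since, can be tried in memory. This is a sketch: NotificationStore is a hypothetical stand-in for one user's rows in the Notification table, and the timestamps are made up.

```python
import json
from time import time


class NotificationStore:
    """In-memory stand-in for one user's rows in the Notification table."""

    def __init__(self):
        self.rows = []

    def add_notification(self, name, data, timestamp=None):
        # Drop any older notification with the same name, then add the new one
        self.rows = [r for r in self.rows if r['name'] != name]
        row = {'name': name,
               'payload_json': json.dumps(data),
               'timestamp': time() if timestamp is None else timestamp}
        self.rows.append(row)
        return row

    def since(self, since=0.0):
        # Mirrors the route: only newer rows, oldest first
        newer = [r for r in self.rows if r['timestamp'] > since]
        newer.sort(key=lambda r: r['timestamp'])
        return [{'name': r['name'],
                 'data': json.loads(r['payload_json']),
                 'timestamp': r['timestamp']} for r in newer]


store = NotificationStore()
store.add_notification('unread_message_count', 1, timestamp=10.0)
store.add_notification('unread_message_count', 2, timestamp=20.0)
print(store.since(0.0))   # only the latest count survives
print(store.since(20.0))  # nothing newer than what the client has seen
```

Because the client remembers the last timestamp it received, each poll only transfers what changed.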

Set Notifications to 0 when Messages route is Rendered

If the user is on the messages page the assumption is they must have read them so we can set the unread message count to 0

@bp.route('/messages')
@login_required
def messages():
    current_user.last_message_read_time = datetime.utcnow()
    current_user.add_notification('unread_message_count', 0)
    db.session.commit()
    # ...

Client Side

The first two functions are relatively simple. The last one, the polling, is the important part to understand.

  • Hide Message if there are none
  • Provide a function to set Message Count
  • Poll for notifications

Hide Message if there are none

This is just an example of setting the CSS visibility with jinja

   <li>
     <a href="{{ url_for('main.messages') }}">
        {{ _('Messages') }}
        {% set new_messages = current_user.new_messages() %}
        <span id="message_count" class="badge"
            style="visibility: {% if new_messages %}visible
                 {% else %}hidden {% endif %};">
                                {{ new_messages }}
        </span>
      </a>
   </li>

Provide a function to set Message Count

This is just a function to set the message count used in the above

{% block scripts %}
    <script>
        // ...
        function set_message_count(n) {
            $('#message_count').text(n);
            $('#message_count').css('visibility', n ? 'visible' : 'hidden');
        }
    </script>
{% endblock %}

Poll for notifications

This essentially is an Ajax call every 10 seconds. If the user is authenticated then call the route on the server and record the timestamp.

{% block scripts %}
    <script>
        // ...
        {% if current_user.is_authenticated %}
        $(function() {
            var since = 0;
            setInterval(function() {
                $.ajax('{{ url_for('main.notifications') }}?since=' + since).done(
                    function(notifications) {
                        for (var i = 0; i < notifications.length; i++) {
                            if (notifications[i].name == 'unread_message_count')
                                set_message_count(notifications[i].data);
                            since = notifications[i].timestamp;
                        }
                    }
                );
            }, 10000);
        });
        {% endif %}
    </script>
{% endblock %}

Background Jobs (Message Queues)

Introduction

This is a simple example of using Redis to perform a background job. Redis needs to be installed first, in my case on Ubuntu.

Redis

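Redis uses the following ports (these are the Redis Enterprise ports; a default open source install listens on TCP 6379)

Protocol  Port                         Connection Source
TCP       8070, 8071                   Internal, External
TCP       8443                         External
TCP       9081                         Active-Active
TCP       9443 (Recommended), 8080     Internal, External, Active-Active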

you can install it with

sudo apt install redis

Demo

To run the demo you will need two terminals. One terminal where you submit the job and another where the job is executed.

Install the Redis Client

This is a demo of how to use Redis. First install RQ (Redis Queue), the Python job queue package used here

pip install rq

Start the Worker

On the worker terminal

Start the worker from a directory where the submitted task can be imported, in this case app.tasks.example (see below).

 rq worker microblog-tasks

Create a Task

Now let's make an example task which just runs for a given number of seconds. The example itself does not use the Flask app, but creating one and pushing its context is shown to illustrate a task within your app, where the worker needs an app instance.

from rq import get_current_job
from app import create_app

app = create_app()
app.app_context().push()

import time
def example(seconds):
    job = get_current_job()
    print('Starting task')
    for i in range(seconds):
        job.meta['progress'] = 100.0 * i / seconds
        job.save_meta()
        print(i)
        time.sleep(1)
    job.meta['progress'] = 100
    job.save_meta()
    print('Task completed')

Execute a Task

On the submission terminal

We need to

  • import the client
  • connect the queue to the Redis server
  • queue the example

from redis import Redis
import rq
queue = rq.Queue('microblog-tasks', connection=Redis.from_url('redis://'))
job = queue.enqueue('app.tasks.example', 23)
job.get_id()

Result

The task is executed on the worker terminal and we can monitor this in the submission terminal with

>>> job.meta
{}
>>> job.refresh()
>>> job.meta
{'progress': 13.043478260869565}
>>> job.refresh()
>>> job.meta
{'progress': 69.56521739130434}
>>> job.refresh()
>>> job.meta
{'progress': 100}
>>> job.is_finished
True
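The values seen in job.meta are simply 100.0 * i / seconds for whichever iteration the worker had reached when refresh() was called. A sketch that reproduces the numbers for a 23 second task without Redis (progress_steps is a hypothetical helper):

```python
def progress_steps(seconds):
    """Yield the progress values example() writes to job.meta['progress']."""
    for i in range(seconds):
        yield 100.0 * i / seconds
    yield 100  # final value written after the loop


steps = list(progress_steps(23))
print(steps[3])    # progress stored during iteration i = 3
print(steps[16])   # progress stored during iteration i = 16
print(steps[-1])   # value after the loop has finished
```

So the 13.04 above was read during the fourth second and the 69.57 during the seventeenth.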

Using Redis In the App

Configuration In the App

In the tutorial they created a Task table to store the ongoing tasks, so that they are still known after the user logs out. To initialize Redis, a config item is created and the connection and queue are set up in __init__.py

def create_app(config_class=Config):
    # ...
    app.redis = Redis.from_url(app.config['REDIS_URL'])
    app.task_queue = rq.Queue('microblog-tasks', connection=app.redis)

Set User State

We need to make sure we restore the state of the tasks when the user logs in. To do this we provide three functions.

class User(UserMixin, db.Model):
    # ...

    def launch_task(self, name, description, *args, **kwargs):
        rq_job = current_app.task_queue.enqueue('app.tasks.' + name, self.id,
                                                *args, **kwargs)
        task = Task(id=rq_job.get_id(), name=name, description=description,
                    user=self)
        db.session.add(task)
        return task

    def get_tasks_in_progress(self):
        return Task.query.filter_by(user=self, complete=False).all()

    def get_task_in_progress(self, name):
        return Task.query.filter_by(name=name, user=self,
                                    complete=False).first()

Create the Task

We then need to create a task which will be performed. In our case it is an export task (Not Shown). At the end of the task an email is sent with the data.

Updating the Progress

def _set_task_progress(progress):
    job = get_current_job()
    if job:
        job.meta['progress'] = progress
        job.save_meta()
        task = Task.query.get(job.get_id())
        task.user.add_notification('task_progress', {'task_id': job.get_id(),
                                                     'progress': progress})
        if progress >= 100:
            task.complete = True
        db.session.commit()

Perform Export and Invoking Updates

This is the core of the task, where it iterates over the posts and updates the progress. I wasn't going to list it, but it is a good example for the future.

        user = User.query.get(user_id)
        _set_task_progress(0)
        data = []
        i = 0
        total_posts = user.posts.count()
        for post in user.posts.order_by(Post.timestamp.asc()):
            data.append({'body': post.body,
                         'timestamp': post.timestamp.isoformat() + 'Z'})
            time.sleep(5)
            i += 1
            _set_task_progress(100 * i // total_posts)

Sending By Email

def export_posts(user_id):
    try:
        # ...

        send_email('[Microblog] Your blog posts',
                sender=app.config['ADMINS'][0], recipients=[user.email],
                text_body=render_template('email/export_posts.txt', user=user),
                html_body=render_template('email/export_posts.html', user=user),
                attachments=[('posts.json', 'application/json',
                              json.dumps({'posts': data}, indent=4))],
                sync=True)
    except:
        # ...
    finally:
        # ...

REST API with Flask

Introduction

We are going to create a REST API for the users table. The structure offered was

  • users.py
  • errors.py
  • tokens.py

The routes in users.py start out as placeholders

from app.api import bp

@bp.route('/users/<int:id>', methods=['GET'])
def get_user(id):
    pass

@bp.route('/users', methods=['GET'])
def get_users():
    pass

@bp.route('/users/<int:id>/followers', methods=['GET'])
def get_followers(id):
    pass

@bp.route('/users/<int:id>/followed', methods=['GET'])
def get_followed(id):
    pass

@bp.route('/users', methods=['POST'])
def create_user():
    pass

@bp.route('/users/<int:id>', methods=['PUT'])
def update_user(id):
    pass

Create Json from Python

Python has dictionaries, so building the JSON output is relatively trivial

class User(UserMixin, db.Model):
    # ...
    def to_dict(self, include_email=False):
        data = {
            'id': self.id,
            'username': self.username,
            'last_seen': self.last_seen.isoformat() + 'Z',
            'about_me': self.about_me,
            'post_count': self.posts.count(),
            'follower_count': self.followers.count(),
            'followed_count': self.followed.count(),
            '_links': {
                'self': url_for('api.get_user', id=self.id),
                'followers': url_for('api.get_followers', id=self.id),
                'followed': url_for('api.get_followed', id=self.id),
                'avatar': self.avatar(128)
            }
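        }
        # Only expose the email address when explicitly asked for
        if include_email:
            data['email'] = self.email
        return data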

Outputting lists of Users

This is a representation of how the users will look including pagination and hypermedia

{
    "items": [
        { ... user resource ... },
        { ... user resource ... },
        ...
    ],
    "_meta": {
        "page": 1,
        "per_page": 10,
        "total_pages": 20,
        "total_items": 195
    },
    "_links": {
        "self": "http://localhost:5000/api/users?page=1",
        "next": "http://localhost:5000/api/users?page=2",
        "prev": null
    }
}

Implementation in Python

We can use a mixin for this

class PaginatedAPIMixin(object):
    @staticmethod
    def to_collection_dict(query, page, per_page, endpoint, **kwargs):
        resources = query.paginate(page=page, per_page=per_page,
                                   error_out=False)
        data = {
            'items': [item.to_dict() for item in resources.items],
            '_meta': {
                'page': page,
                'per_page': per_page,
                'total_pages': resources.pages,
                'total_items': resources.total
            },
            '_links': {
                'self': url_for(endpoint, page=page, per_page=per_page,
                                **kwargs),
                'next': url_for(endpoint, page=page + 1, per_page=per_page,
                                **kwargs) if resources.has_next else None,
                'prev': url_for(endpoint, page=page - 1, per_page=per_page,
                                **kwargs) if resources.has_prev else None
            }
        }
        return data
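
To see what the _meta and _links values work out to without a database, here is a minimal sketch that paginates a plain Python list. paginate_list and the hard-coded /api/users URL are hypothetical stand-ins for query.paginate() and url_for(); they are not part of Flask or Flask-SQLAlchemy.

```python
import math


def paginate_list(items, page, per_page, base_url='/api/users'):
    """Build the same collection structure from a plain list (illustration only)."""
    total_items = len(items)
    total_pages = math.ceil(total_items / per_page)
    start = (page - 1) * per_page
    has_next = page < total_pages
    has_prev = page > 1

    def link(p):
        # Stand-in for url_for(endpoint, page=p, per_page=per_page)
        return f'{base_url}?page={p}&per_page={per_page}'

    return {
        'items': items[start:start + per_page],
        '_meta': {
            'page': page,
            'per_page': per_page,
            'total_pages': total_pages,
            'total_items': total_items,
        },
        '_links': {
            'self': link(page),
            'next': link(page + 1) if has_next else None,
            'prev': link(page - 1) if has_prev else None,
        },
    }


first = paginate_list(list(range(195)), page=1, per_page=10)
print(first['_meta'])
print(first['_links'])
```

With 195 items and 10 per page this gives 20 pages, matching the _meta block in the sample response above. The first page has no prev link and the last page has no next link.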

Error Handling

Not much to see here.

from flask import jsonify
from werkzeug.http import HTTP_STATUS_CODES

def error_response(status_code, message=None):
    payload = {'error': HTTP_STATUS_CODES.get(status_code, 'Unknown error')}
    if message:
        payload['message'] = message
    response = jsonify(payload)
    response.status_code = status_code
    return response

def bad_request(message):
    return error_response(400, message)
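
For reference, the payload this helper builds can be sketched with only the standard library. http.HTTPStatus is used here as a stand-in for Werkzeug's HTTP_STATUS_CODES dictionary, and error_payload is a hypothetical name; the real helper goes on to wrap the dictionary in a Flask response.

```python
from http import HTTPStatus


def error_payload(status_code, message=None):
    """Return the dictionary that would be serialised as the error body."""
    try:
        reason = HTTPStatus(status_code).phrase
    except ValueError:
        # Status code not known to the standard library
        reason = 'Unknown error'
    payload = {'error': reason}
    if message:
        payload['message'] = message
    return payload


print(error_payload(400, 'please use a different username'))
print(error_payload(404))
```

The message key only appears when a message is supplied, so a plain 404 carries just the standard reason phrase.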

The error handlers customize what is returned to the client. wants_json_response() compares how strongly the client's Accept header prefers JSON versus HTML, and each handler then responds in the preferred format with the matching status code.

from flask import render_template, request
from app import db
from app.errors import bp
from app.api.errors import error_response as api_error_response

def wants_json_response():
    return request.accept_mimetypes['application/json'] >= \
        request.accept_mimetypes['text/html']

@bp.app_errorhandler(404)
def not_found_error(error):
    if wants_json_response():
        return api_error_response(404)
    return render_template('errors/404.html'), 404

@bp.app_errorhandler(500)
def internal_error(error):
    db.session.rollback()
    if wants_json_response():
        return api_error_response(500)
    return render_template('errors/500.html'), 500
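
The comparison in wants_json_response() is easier to follow with concrete Accept headers. The sketch below is a simplified, standard-library model of quality values; parse_accept, quality and wants_json are hypothetical helpers and this is not how Werkzeug implements accept_mimetypes.

```python
def parse_accept(header):
    """Parse an Accept header into (mimetype, quality) pairs."""
    entries = []
    for part in header.split(','):
        pieces = [p.strip() for p in part.split(';')]
        mimetype = pieces[0].lower()
        if not mimetype:
            continue
        q = 1.0  # quality defaults to 1 when no q parameter is given
        for param in pieces[1:]:
            if param.startswith('q='):
                try:
                    q = float(param[2:])
                except ValueError:
                    pass  # ignore a malformed q value
        entries.append((mimetype, q))
    return entries


def quality(entries, mimetype):
    """Quality of the most specific matching entry, or 0 if nothing matches."""
    main_type = mimetype.split('/')[0]
    best_specificity, best_quality = -1, 0.0
    for pattern, q in entries:
        if pattern == mimetype:
            specificity = 2
        elif pattern == main_type + '/*':
            specificity = 1
        elif pattern == '*/*':
            specificity = 0
        else:
            continue
        if specificity > best_specificity:
            best_specificity, best_quality = specificity, q
    return best_quality


def wants_json(header):
    entries = parse_accept(header)
    # Same comparison as wants_json_response(): a tie goes to JSON
    return quality(entries, 'application/json') >= quality(entries, 'text/html')


browser = 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8'
print(wants_json(browser))
print(wants_json('application/json'))
print(wants_json('*/*'))
```

In this model a typical browser header prefers HTML, an API client asking for application/json gets JSON, and a bare */* also gets JSON because the comparison uses >=.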

Implementations

GET Many

@bp.route('/users', methods=['GET'])
def get_users():
    page = request.args.get('page', 1, type=int)
    per_page = min(request.args.get('per_page', 10, type=int), 100)
    data = User.to_collection_dict(User.query, page, per_page, 'api.get_users')
    return jsonify(data)

GET One

@bp.route('/users/<int:id>', methods=['GET'])
def get_user(id):
    return jsonify(User.query.get_or_404(id).to_dict())

PUT

@bp.route('/users/<int:id>', methods=['PUT'])
def update_user(id):
    user = User.query.get_or_404(id)
    data = request.get_json() or {}
    if 'username' in data and data['username'] != user.username and \
            User.query.filter_by(username=data['username']).first():
        return bad_request('please use a different username')
    if 'email' in data and data['email'] != user.email and \
            User.query.filter_by(email=data['email']).first():
        return bad_request('please use a different email address')
    user.from_dict(data, new_user=False)
    db.session.commit()
    return jsonify(user.to_dict())

POST

@bp.route('/users', methods=['POST'])
def create_user():
    data = request.get_json() or {}
    if 'username' not in data or 'email' not in data or 'password' not in data:
        return bad_request('must include username, email and password fields')
    if User.query.filter_by(username=data['username']).first():
        return bad_request('please use a different username')
    if User.query.filter_by(email=data['email']).first():
        return bad_request('please use a different email address')
    user = User()
    user.from_dict(data, new_user=True)
    db.session.add(user)
    db.session.commit()
    response = jsonify(user.to_dict())
    response.status_code = 201
    response.headers['Location'] = url_for('api.get_user', id=user.id)
    return response
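
Both the PUT and POST handlers call user.from_dict(), which these notes do not show. Below is a minimal sketch of what such a method could look like, using a plain class instead of the SQLAlchemy model so that it runs on its own. The SketchUser class, the field list and the set_password() helper (backed here by hashlib.pbkdf2_hmac) are assumptions for illustration, not the project's actual code.

```python
import hashlib
import os


class SketchUser:
    """Plain-Python stand-in for the User model (no database involved)."""

    def __init__(self):
        self.username = None
        self.email = None
        self.about_me = None
        self.password_hash = None

    def set_password(self, password):
        # Assumed helper: salted PBKDF2 hash stored as "salt$hash" in hex
        salt = os.urandom(16)
        digest = hashlib.pbkdf2_hmac('sha256', password.encode(), salt, 100_000)
        self.password_hash = salt.hex() + '$' + digest.hex()

    def from_dict(self, data, new_user=False):
        # Copy only whitelisted fields so clients cannot set arbitrary attributes
        for field in ('username', 'email', 'about_me'):
            if field in data:
                setattr(self, field, data[field])
        # The password is only accepted when creating a user
        if new_user and 'password' in data:
            self.set_password(data['password'])


user = SketchUser()
user.from_dict({'username': 'susan', 'email': 'susan@example.com',
                'password': 'cat', 'id': 99}, new_user=True)
print(user.username, user.email, hasattr(user, 'id'))
```

Unknown keys such as id are ignored, and a password sent in a later update with new_user=False leaves the stored hash untouched.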

Token Authentication

Introduction

Flask-HTTPAuth provides the mechanism for using tokens. It also appears to support roles, which did not seem to be a big thing at the time but are a must now. This example is taken from https://flask-httpauth.readthedocs.io/en/latest/

from flask import Flask, g
from flask_httpauth import HTTPTokenAuth

app = Flask(__name__)
auth = HTTPTokenAuth(scheme='Bearer')

tokens = {
    "secret-token-1": "john",
    "secret-token-2": "susan"
}

@auth.verify_token
def verify_token(token):
    if token in tokens:
        return tokens[token]

@app.route('/')
@auth.login_required
def index():
    return "Hello, {}!".format(auth.current_user())

if __name__ == '__main__':
    app.run()
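
With the Bearer scheme the client sends the token in the Authorization header, for example "Authorization: Bearer secret-token-1". The lookup this implies can be sketched roughly without Flask, as below. user_for_header is a hypothetical helper and is not how Flask-HTTPAuth is implemented internally.

```python
import hmac

TOKENS = {
    "secret-token-1": "john",
    "secret-token-2": "susan",
}


def user_for_header(authorization, tokens=TOKENS, scheme='Bearer'):
    """Return the user for an Authorization header value, or None."""
    if not authorization:
        return None
    parts = authorization.split(None, 1)
    # Expect exactly "<scheme> <token>"; the scheme name is case-insensitive
    if len(parts) != 2 or parts[0].lower() != scheme.lower():
        return None
    presented = parts[1].strip()
    for token, user in tokens.items():
        # compare_digest avoids leaking how many characters matched
        if hmac.compare_digest(token.encode(), presented.encode()):
            return user
    return None


print(user_for_header('Bearer secret-token-1'))
print(user_for_header('Bearer wrong-token'))
```

A None result corresponds to verify_token() returning nothing in the example above, in which case the protected route is not reached.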