Flask
Introduction
Quick tour of the python framework flask. Most of this has been taken from https://blog.miguelgrinberg.com/post/the-flask-mega-tutorial-part-i-hello-world
When I started the project flask we not found. I either started a new terminal to fix the probably or install flask with
python3 -m pip install flask --upgrade
Getting Started
Create Python Environment and Activate
I don't use python all of the time so this is a reminder
mkdir myProject
cd myProject
python3 -m venv venv
source venv/bin/activate
Create Flask App
Install Flask
pip3 install flask
mkdir app
touch app/__init__.py
touch app/routes.py
Now edit __init__.py to have
from flask import Flask
app = Flask(__name__)
from app import routes
And for Routes
from app import app
@app.route('/')
@app.route('/index')
def index():
return "Hello, World!"
Now you can run flask and go to 127.0.0.1:5000. Once this works you start the microblog stuff
from app import app
@app.route('/')
@app.route('/index')
def index():
user = {'username': 'Miguel'}
return '''
<html>
<head>
<title>Home Page - Microblog</title>
</head>
<body>
<h1>Hello, ''' + user['username'] + '''!</h1>
</body>
</html>'''
Routing
Templates
So flask like others such pug or egs has templates. Very similar indeed. It supports inheritance for navigation and footers
from flask import render_template
from app import app
@app.route('/')
@app.route('/index')
def index():
user = {'username': 'Miguel'}
posts = [
{
'author': {'username': 'John'},
'body': 'Beautiful day in Portland!'
},
{
'author': {'username': 'Susan'},
'body': 'The Avengers movie was so cool!'
}
]
return render_template('index.html', title='Home', user=user, posts=posts)
And the template
<html>
<head>
{% if title %}
<title>{{ title }} - Microblog</title>
{% else %}
<title>Welcome to Microblog</title>
{% endif %}
</head>
<body>
<h1>Hi, {{ user.username }}!</h1>
{% for post in posts %}
<div><p>{{ post.author.username }} says: <b>{{ post.body }}</b></p></div>
{% endfor %}
</body>
</html>
We can include subtemplates like this.
{% for post in posts %}
{% include '_post.html' %}
{% endfor %}
Forms
Introduction
Flask uses Flask-WTF for forms which is a wrapper for WTFForms. To configure this we need a secret key which is attached to requests to help prevent CSRF. This is stored in the root or the project e.g. config.py and loaded by the
import os
class Config(object):
SECRET_KEY = os.environ.get('SECRET_KEY') or 'you-will-never-guess'
And in __init__.py
from flask import Flask
from config import Config
app = Flask(__name__)
app.config.from_object(Config)
from app import routes
Form Example
Don't want to cut and paste too must from the example but here is the basics for forms
- Define the form in python
- Define the template
- Render the form
Python Code
from flask_wtf import FlaskForm
from wtforms import StringField, PasswordField, BooleanField, SubmitField
from wtforms.validators import DataRequired
class LoginForm(FlaskForm):
username = StringField('Username', validators=[DataRequired()])
password = PasswordField('Password', validators=[DataRequired()])
remember_me = BooleanField('Remember Me')
submit = SubmitField('Sign In')
Template
The novalidate attribute is used to tell the web browser to not apply validation to the fields in this form, which effectively leaves this task to the Flask application running in the server. Using novalidate is entirely optional.
The form.hidden_tag() template argument generates a hidden field that includes a token that is used to protect the form against CSRF attacks.
{% extends "base.html" %}
{% block content %}
<h1>Sign In</h1>
<form action="" method="post" novalidate>
{{ form.hidden_tag() }}
<p>
{{ form.username.label }}<br>
{{ form.username(size=32) }}<br>
{% for error in form.username.errors %}
<span style="color: red;">[{{ error }}]</span>
{% endfor %}
</p>
<p>
{{ form.password.label }}<br>
{{ form.password(size=32) }}<br>
{% for error in form.password.errors %}
<span style="color: red;">[{{ error }}]</span>
{% endfor %}
</p>
<p>{{ form.remember_me() }} {{ form.remember_me.label }}</p>
<p>{{ form.submit() }}</p>
</form>
{% endblock %}
Rendering
from flask import render_template
from app import app
from app.forms import LoginForm
# ...
@app.route('/login', methods=['GET', 'POST'])
def login():
form = LoginForm()
if form.validate_on_submit():
flash('Login requested for user {}, remember_me={}'.format(
form.username.data, form.remember_me.data))
return redirect('/index')
return render_template('login.html', title='Sign In', form=form)
Databases
Tools of the Trade
For Flask we use flask-sqlalchemy which is a wrapper for SQLAlchemy which is in turn a ORM (Object Relational Mapper). Like Room we can also use Flask-Migrate which is a wrapper for Alembic and database migration tool.
SQLite Config Example
First we add the connection string to the config
import os
basedir = os.path.abspath(os.path.dirname(__file__))
class Config(object):
# ...
SQLALCHEMY_DATABASE_URI = os.environ.get('DATABASE_URL') or \
'sqlite:///' + os.path.join(basedir, 'app.db')
SQLALCHEMY_TRACK_MODIFICATIONS = False
Then initialise in the __init__.py by adding the db and the migrate
from flask import Flask
from config import Config
from flask_sqlalchemy import SQLAlchemy
from flask_migrate import Migrate
app = Flask(__name__)
app.config.from_object(Config)
db = SQLAlchemy(app)
migrate = Migrate(app, db)
from app import routes, models
Table Definition Example
Nothing to see here except the __repr__ which is the python equivalent of toString(). It really could use some descent formatting. Almost unreadable.
from app import db
class User(db.Model):
id = db.Column(db.Integer, primary_key=True)
username = db.Column(db.String(64), index=True, unique=True)
email = db.Column(db.String(120), index=True, unique=True)
password_hash = db.Column(db.String(128))
def __repr__(self):
return '<User {}>'.format(self.username)
Initialize
The flask command relies on FLASK_APP being set to work so make sure this is set.
If flask not found look at comment at the top of this page. Make sure the comments are for your table and not users.
flask db init
flask db migrate
# flask db upgrade
Database Problems
It looks to me that the Flask 2.0 does not have concept of update/downgrade in it's current form as it looks for a application factory to work with. I have to change the tutorial to use the following
microblog.py
The shell_context_processor sets up the flask shell.
from app import create_app, db
from app.models import User, Post
app = create_app()
@app.shell_context_processor
def make_shell_context():
return {'db': db, 'User': User, 'Post': Post}
init.py
Had to change the __init__.py to below. This mean creating a blueprint and adding it to the app. The routes and forms were moved to app/main
from flask import Flask, current_app
from flask_sqlalchemy import SQLAlchemy
from flask_migrate import Migrate
from config import Config
db = SQLAlchemy()
migrate = Migrate()
def create_app(config_class=Config):
app = Flask(__name__)
app.config.from_object(config_class)
db.init_app(app)
migrate.init_app(app, db)
from app.main import bp as main_bp
app.register_blueprint(main_bp)
return app
from app import models
main package
As said above the blueprint is created in init.py. Good way to learn the approach with these problems.
init.py
from flask import Blueprint
bp = Blueprint('main', __name__)
from app.main import routes
Forms
These are unchanged
Routes
The routes now point to the blueprint rather than the app.
from flask import render_template, flash, redirect
from app.main.forms import LoginForm
from app.main import bp
@bp.route('/login', methods=['GET', 'POST'])
def login():
form = LoginForm()
if form.validate_on_submit():
flash('Login requested for user {}, remember_me={}'.format(
form.username.data, form.remember_me.data))
return redirect('/index')
return render_template('login.html', title='Sign In', form=form)
Adding Data
Once the database problems were resolve I could add data with the flask shell command. E.g.
flask shell
db
u1 = User(username='john', email='john@example.com')
u2 = User(username='sum', email='sum@example.com')
db.session.add(u1)
db.session.add(u2)
db.session.commit()
u1.follow(u2)
u2.followers
u2.followers.all()
Relationships
Introduction
The tutorial built a messaging system between the users. So I thought it would be useful to show how this was achieved. This just shows how to implement the database changes.
Database
This is the table to hold the message. Nothing fancy here.
id = db.Column(db.Integer, primary_key=True)
sender_id = db.Column(db.Integer, db.ForeignKey('user.id'))
recipient_id = db.Column(db.Integer, db.ForeignKey('user.id'))
body = db.Column(db.String(140))
timestamp = db.Column(db.DateTime, index=True, default=datetime.utcnow)
def __repr__(self):
return '<Message {}>'.format(self.body)
One-2-Many Relationship
We add this against the User and provide a function to get the number of new messages.
messages_sent = db.relationship('Message',
foreign_keys='Message.sender_id',
backref='author', lazy='dynamic')
messages_received = db.relationship('Message',
foreign_keys='Message.recipient_id',
backref='recipient', lazy='dynamic')
last_message_read_time = db.Column(db.DateTime)
def new_messages(self):
last_read_time = self.last_message_read_time or datetime(1900, 1, 1)
return Message.query.filter_by(recipient=self).filter(
Message.timestamp > last_read_time).count()
Update Database
flask db migrate -m "private messages"
flask db upgrade
Authentication
Log In
The Flask-Login manages state of logins so I guess it is a bit like passport.
In the user model we specify we want to use the default implementations. The load_user is required as Flask-Login does not know about databases. Good example is provided.
from flask_login import UserMixin
@login.user_loader
def load_user(id):
return User.query.get(int(id))
class User(UserMixin,db.Model):
Protecting Routes
We can protect routes by add the folllowing decorator.
from flask_login import login_required
@bp.route('/')
@bp.route('/index')
@login_required
Next
There is some explanation of the use of next on the tutorial so maybe worth logging here as passport does redirecting.
if form.validate_on_submit():
user = User.query.filter_by(username=form.username.data).first()
if user is None or not user.check_password(form.password.data):
flash('Invalid username or password')
return redirect(url_for('main.login'))
login_user(user, remember=form.remember_me.data)
next_page = request.args.get('next')
if not next_page or url_parse(next_page).netloc != '':
next_page = url_for('main.index')
return redirect(next_page)
- If the login URL does not have a next argument, then the user is redirected to the index page.
- If the login URL includes a next argument that is set to a relative path (or in other words, a URL without the domain portion), then the user is redirected to that URL.
- If the login URL includes a next argument that is set to a full URL that includes a domain name, then the user is redirected to the index page.
The third case is in place to make the application more secure. To determine if the URL is relative or absolute, I parse it with Werkzeug's url_parse() function and then check if the netloc component is set or not.
Sign Up
This consisted of
- making a form
- making a template
- adding the route
All of the code was straight forward.
Error Handling
First thing is probably
export FLASK_DEBUG=1
flask run myapp
Error Handler Page
Here is the custom error page. You will need to register the blueprint and make an __init__.py
from flask import render_template
from app.errors import bp
from app import db
@bp.app_errorhandler(404)
def not_found_error(error):
return render_template('404.html'), 404
@bp.app_errorhandler(500)
def internal_error(error):
db.session.rollback()
return render_template('500.html'), 500
Here is the 404
{% extends "base.html" %}
{% block content %}
<h1>File Not Found</h1>
<p><a href="{{ url_for('main.index') }}">Back</a></p>
{% endblock %}
And the 500
{% extends "base.html" %}
{% block content %}
<h1>An unexpected error has occurred</h1>
<p>The administrator has been notified. Sorry for the inconvenience!</p>
<p><a href="{{ url_for('main.index') }}">Back</a></p>
{% endblock %}
Emailing the Error
Quite liked the proactive approach from the beginning. Lets define the extract config
MAIL_SERVER = os.environ.get('MAIL_SERVER')
MAIL_PORT = int(os.environ.get('MAIL_PORT') or 25)
MAIL_USE_TLS = os.environ.get('MAIL_USE_TLS') is not None
MAIL_USERNAME = os.environ.get('MAIL_USERNAME')
MAIL_PASSWORD = os.environ.get('MAIL_PASSWORD')
ADMINS = ['your-email@example.com']
And configure it at startup.
import logging
from logging.handlers import SMTPHandler
...
if not app.debug:
if app.config['MAIL_SERVER']:
auth = None
if app.config['MAIL_USERNAME'] or app.config['MAIL_PASSWORD']:
auth = (app.config['MAIL_USERNAME'], app.config['MAIL_PASSWORD'])
secure = None
if app.config['MAIL_USE_TLS']:
secure = ()
mail_handler = SMTPHandler(
mailhost=(app.config['MAIL_SERVER'], app.config['MAIL_PORT']),
fromaddr='no-reply@' + app.config['MAIL_SERVER'],
toaddrs=app.config['ADMINS'], subject='Microblog Failure',
credentials=auth, secure=secure)
mail_handler.setLevel(logging.ERROR)
app.logger.addHandler(mail_handler)
Run a fake python SMTP server
python -m smtpd -n -c DebuggingServer localhost:8025
Logging to File
Here is some sample code for logging. This comes with rotate.
file_handler = RotatingFileHandler('logs/microblog.log', maxBytes=10240,
backupCount=10)
file_handler.setFormatter(logging.Formatter(
'%(asctime)s %(levelname)s: %(message)s [in %(pathname)s:%(lineno)d]'))
file_handler.setLevel(logging.INFO)
app.logger.addHandler(file_handler)
app.logger.setLevel(logging.INFO)
app.logger.info('Microblog startup')
And some sample output
2021-05-25 03:51:27,196 INFO: Microblog startup [in /workspaces/bcnu1701flask/microblog/app/__init__.py:44]
2021-05-25 03:51:29,723 INFO: Adding Mail Handler [in /workspaces/bcnu1701flask/microblog/app/__init__.py:60]
Unit Testing
Example
The tutorial has a database and the relationships are like twitter. A follows B, B follows A and C etc. Below is an example of the testing for the database.
import unittest
from datetime import datetime, timedelta
from app import create_app, db
from app.models import User, Post
from config import Config
class TestConfig(Config):
TESTING = True
SQLALCHEMY_DATABASE_URI = 'sqlite://'
class UserModelCase(unittest.TestCase):
def setUp(self):
self.app = create_app(TestConfig)
self.app_context = self.app.app_context()
self.app_context.push()
db.create_all()
def tearDown(self):
db.session.remove()
db.drop_all()
def test_password_hashing(self):
u = User(username='susan')
u.set_password('cat')
self.assertFalse(u.check_password('dog'))
self.assertTrue(u.check_password('cat'))
def test_avatar(self):
u = User(username='john', email='john@example.com')
self.assertEqual(u.avatar(128), ('https://www.gravatar.com/avatar/'
'd4c74594d841139328695756648b6bd6'
'?d=identicon&s=128'))
def test_follow(self):
u1 = User(username='john', email='john@example.com')
u2 = User(username='susan', email='susan@example.com')
db.session.add(u1)
db.session.add(u2)
db.session.commit()
self.assertEqual(u1.followed.all(), [])
self.assertEqual(u1.followers.all(), [])
u1.follow(u2)
db.session.commit()
self.assertTrue(u1.is_following(u2))
self.assertEqual(u1.followed.count(), 1)
self.assertEqual(u1.followed.first().username, 'susan')
self.assertEqual(u2.followers.count(), 1)
self.assertEqual(u2.followers.first().username, 'john')
u1.unfollow(u2)
db.session.commit()
self.assertFalse(u1.is_following(u2))
self.assertEqual(u1.followed.count(), 0)
self.assertEqual(u2.followers.count(), 0)
def test_follow_posts(self):
# create four users
u1 = User(username='john', email='john@example.com')
u2 = User(username='susan', email='susan@example.com')
u3 = User(username='mary', email='mary@example.com')
u4 = User(username='david', email='david@example.com')
db.session.add_all([u1, u2, u3, u4])
# create four posts
now = datetime.utcnow()
p1 = Post(body="post from john", author=u1,
timestamp=now + timedelta(seconds=1))
p2 = Post(body="post from susan", author=u2,
timestamp=now + timedelta(seconds=4))
p3 = Post(body="post from mary", author=u3,
timestamp=now + timedelta(seconds=3))
p4 = Post(body="post from david", author=u4,
timestamp=now + timedelta(seconds=2))
db.session.add_all([p1, p2, p3, p4])
db.session.commit()
if __name__ == '__main__':
unittest.main()
Outputting Results
Not written many python tests but without the call to unittest.main() there is no output.
Pagination In Flask
No alarms and no surprises here.
- Page 1, implicit: http://localhost:5000/index
- Page 1, explicit: http://localhost:5000/index?page=1
- Page 3: http://localhost:5000/index?page=3
This is managed with Flask-SQLAlchemy with the paginate command
user.followed_posts().paginate(1, 20, False).items
Introduction
Flask provides two packages for Flask. With my recent work on JWt will be interested in how this works.
pip install flask-mail
pip install pyjwt
With Flask extension, like express or other approaches you need to create and instance and then attach it to the app
# ...
from flask_mail import Mail
app = Flask(__name__)
# ...
mail = Mail()
...
def create_app(config_class=Config):
app = Flask(__name__)
...
mail.init_app(app)
from threading import Thread
from flask import render_template, current_app
from flask_mail import Message
from app import mail
def send_async_email(app, msg):
with app.app_context():
mail.send(msg)
def send_email(subject, sender, recipients, text_body, html_body,
attachments=None, sync=False):
msg = Message(subject, sender=sender, recipients=recipients)
msg.body = text_body
msg.html = html_body
if attachments:
for attachment in attachments:
msg.attach(*attachment)
if sync:
mail.send(msg)
else:
Thread(target=send_async_email,
args=(current_app._get_current_object(), msg)).start()
def send_password_reset_email(user):
token = user.get_reset_password_token()
send_email('[Microblog] Reset Your Password',
sender=current_app.config['ADMINS'][0],
recipients=[user.email],
text_body=render_template('email/reset_password.txt',
user=user, token=token),
html_body=render_template('email/reset_password.html',
user=user, token=token))
Let's plug this into password reset.
Creating the Boiler Plate
As ever we need a
- link
- form
- html
- route
This is here a reminder of how to go and put any page into a flask app
Link
<p>
Forgot Your Password?
<a href="{{ url_for('main.reset_password_request') }}">Click to Reset It</a>
</p>
Form
class ResetPasswordRequestForm(FlaskForm):
email = StringField('Email', validators=[DataRequired(), Email()])
submit = SubmitField('Request Password Reset')
class ResetPasswordForm(FlaskForm):
password = PasswordField('Password', validators=[DataRequired()])
password2 = PasswordField(
'Repeat Password', validators=[DataRequired(), EqualTo('password')])
submit = SubmitField('Request Password Reset')
Html
{% extends "base.html" %}
{% block content %}
<h1>Reset Password</h1>
<form action="" method="post">
{{ form.hidden_tag() }}
<p>
{{ form.email.label }}<br>
{{ form.email(size=64) }}<br>
{% for error in form.email.errors %}
<span style="color: red;">[{{ error }}]</span>
{% endfor %}
</p>
<p>{{ form.submit() }}</p>
</form>
{% endblock %}
{% extends "base.html" %}
{% block content %}
<h1>Reset Your Password</h1>
<form action="" method="post">
{{ form.hidden_tag() }}
<p>
{{ form.password.label }}<br>
{{ form.password(size=32) }}<br>
{% for error in form.password.errors %}
<span style="color: red;">[{{ error }}]</span>
{% endfor %}
</p>
<p>
{{ form.password2.label }}<br>
{{ form.password2(size=32) }}<br>
{% for error in form.password2.errors %}
<span style="color: red;">[{{ error }}]</span>
{% endfor %}
</p>
<p>{{ form.submit() }}</p>
</form>
{% endblock %}
Route
from app.forms import ResetPasswordRequestForm
from app.email import send_password_reset_email
@app.route('/reset_password_request', methods=['GET', 'POST'])
@bp.route('/reset_password_request', methods=['GET', 'POST'])
def reset_password_request():
if current_user.is_authenticated:
return redirect(url_for('main.index'))
form = ResetPasswordRequestForm()
if form.validate_on_submit():
user = User.query.filter_by(email=form.email.data).first()
if user:
send_password_reset_email(user)
flash('Check your email for the instructions to reset your password')
return redirect(url_for('main.login'))
return render_template('reset_password_request.html',
title='Reset Password', form=form)
@bp.route('/reset_password/<token>', methods=['GET', 'POST'])
def reset_password(token):
if current_user.is_authenticated:
return redirect(url_for('main.index'))
user = User.verify_reset_password_token(token)
if not user:
return redirect(url_for('index'))
form = ResetPasswordForm()
if form.validate_on_submit():
user.set_password(form.password.data)
db.session.commit()
flash('Your password has been reset.')
return redirect(url_for('main.login'))
return render_template('reset_password.html', form=form)
Managing Jwt
Here is an example of encode and decode for the Python implementation of Jwt. Few surprises here
import jwt
from app import app
class User(UserMixin, db.Model):
# ...
def get_reset_password_token(self, expires_in=600):
return jwt.encode(
{'reset_password': self.id, 'exp': time() + expires_in},
app.config['SECRET_KEY'], algorithm='HS256')
@staticmethod
def verify_reset_password_token(token):
try:
id = jwt.decode(token, app.config['SECRET_KEY'],
algorithms=['HS256'])['reset_password']
except:
return
return User.query.get(id)
UI
Before
It looks pretty awful the app as it stands. Let see what bootstrap can do. Here it is before.
Install
We need to install the wrapper. No shown but we need to set up the bootstrap in the app too.
pip install flask-bootstrap
Base Template
{% extends 'bootstrap/base.html' %}
{% block title %}
{% if title %}{{ title }} - Microblog{% else %}Welcome to Microblog{% endif %}
{% endblock %}
{% block navbar %}
<nav class="navbar navbar-default">
... navigation bar here (see complete code on GitHub) ...
</nav>
{% endblock %}
{% block content %}
<div class="container">
{% with messages = get_flashed_messages() %}
{% if messages %}
{% for message in messages %}
<div class="alert alert-info" role="alert">{{ message }}</div>
{% endfor %}
{% endif %}
{% endwith %}
{# application content needs to be provided in the app_content block #}
{% block app_content %}{% endblock %}
</div>
{% endblock %}
Quick Forms
This does look like a code saver but I do wonder how many will be happy with the software rendering the controls compared to you. Having said that it did do a pretty good job for a simple render.
First the old way
{% extends "base.html" %}
{% block content %}
<h1>Register</h1>
<form action="" method="post">
{{ form.hidden_tag() }}
<p>
{{ form.username.label }}<br>
{{ form.username(size=32) }}<br>
{% for error in form.username.errors %}
<span style="color: red;">[{{ error }}]</span>
{% endfor %}
</p>
<p>
{{ form.email.label }}<br>
{{ form.email(size=64) }}<br>
{% for error in form.email.errors %}
<span style="color: red;">[{{ error }}]</span>
{% endfor %}
</p>
<p>
{{ form.password.label }}<br>
{{ form.password(size=32) }}<br>
{% for error in form.password.errors %}
<span style="color: red;">[{{ error }}]</span>
{% endfor %}
</p>
<p>
{{ form.password2.label }}<br>
{{ form.password2(size=32) }}<br>
{% for error in form.password2.errors %}
<span style="color: red;">[{{ error }}]</span>
{% endfor %}
</p>
<p>{{ form.submit() }}</p>
</form>
{% endblock %}
Using quick forms
{% extends "base.html" %}
{% import 'bootstrap/wtf.html' as wtf %}
{% block app_content %}
<h1>Register</h1>
<div class="row">
<div class="col-md-4">
{{ wtf.quick_form(form) }}
</div>
</div>
{% endblock %}
Post Bootstrap
Hopes were not high but it is not too bad. Still wondering really what the use case is for flask as a web frontend.
Dates and Times
Introduction
Someone somewhere needs to sort this out. There is just so much work involved in something which really has two parameters. Time in the world and where you are. For flask they have gone for Moment form moment.js. So we install as ever with
pip install flask-moment
Add it to the app (not shown)
Usage
And here is some usage. The scripts block that I added here is another block exported by Flask-Bootstrap's base template. The super is required to preserves the content from the base template
{% block scripts %}
{{ super() }}
{{ moment.include_moment() }}
{% endblock %}
And this will show the date/time
{% if user.last_seen %}
<p>Last seen on: {{ moment(user.last_seen).format('LLL') }}</p>
{% endif %}
And,,,
<a href="{{ url_for('user', username=post.author.username) }}">
{{ post.author.username }}
</a>
said {{ moment(post.timestamp).fromNow() }}:
<br>
{{ post.body }}
Multi Lanaguage
Configuration
For Flask we use Flask-Babel set up in the app. We specify in the Config the languages we support in an array.
LANGUAGES = ['en', 'es']
We need to load this into the app. This object provides a high-level interface to work with the Accept-Language header that clients send with a request.
from flask import request
...
@babel.localeselector
def get_locale():
return request.accept_languages.best_match(app.config['LANGUAGES'])
from app import models
Usage
From there we need to use it
from flask_babel import _
# ...
flash(_('Your post is now live!'))
Other approach for interpolation
flash(_('User %(username)s not found.', username=username))
Flask-Babel provides a lazy evaluation version of _() that is called lazy_gettext() This is used when the language is not known at the time of usage. E.g. build the labels for form fields.
from flask_babel import lazy_gettext as _l
class LoginForm(FlaskForm):
username = StringField(_l('Username'), validators=[DataRequired()])
# ...
Creating list of translations
Once you have wrapped or your string in _ or _1 then we need a list to provide the other languages. The instructions will be left to google and https://blog.miguelgrinberg.com/post/the-flask-mega-tutorial-part-xiii-i18n-and-l10n
To here is the quick answer because I needed it for other parts to the tutorial
Create a babel.cfg in the root
[python: app/**.py]
[jinja2: app/templates/**.html]
extensions=jinja2.ext.autoescape,jinja2.ext.with_
Do the extract and generate in the root (Gosh LC_MESSAGES back in time)
#Extract
pybabel extract -F babel.cfg -k _l -o messages.pot .
#Generate
pybabel init -i messages.pot -d app/translations -l es
Now we need to compile to .po format
pybabel compile -d app/translations
Update the file we can do the following
pybabel extract -F babel.cfg -k _l -o messages.pot .
pybabel update -i messages.pot -d app/translations
So to change the submit button we
- Change the form to have _1()
- Extract and Compile
Change the form to have _1()
from flask_babel import _, lazy_gettext as _l
...
class PostForm(FlaskForm):
post = TextAreaField('Say something', validators=[
DataRequired(), Length(min=1, max=140)])
submit = SubmitField(_l('Submit'))
...
Extract and Compile
pybabel extract -F babel.cfg -k _l -o messages.pot .
pybabel update -i messages.pot -d app/translations
Axios
Well the demo is going to add translation for new posts. The steps were
- Identify the source language of the text
- Get the preferred language
- Build a translate() Function
- Add a Route to do Call translate()
- Add Placeholder for Translated Text
- Add JS Function to Call Service
- Trigger the call to translate()
Identify the source language of the text
There is a package for this
pip install guess_language-spirit
We add a new column onto the Post table.
class Post(db.Model):
# ...
language = db.Column(db.String(5))
Uodate the database
flask db migrate -m "add language to posts"
flask db upgrade
Now add a default to the post form
def index():
form = PostForm()
if form.validate_on_submit():
language = guess_language(form.post.data)
if language == 'UNKNOWN' or len(language) > 5:
language = ''
post = Post(body=form.post.data, author=current_user)
db.session.add(post)
db.session.commit()
flash('Your post is now live!')
....
We also add a g.local to get the current language.
@bp.before_request
def before_request():
if current_user.is_authenticated:
current_user.last_seen = datetime.utcnow()
db.session.commit()
g.locale = str(get_locale())
Now add the to the post template to show the link if required
{% if post.language and post.language != g.locale %}
<br><br>
<a href="#">{{ _('Translate') }}</a>
{% endif %}
Build a translate() Function
Setup the Environment to use Azure Function
Signed up to Microsoft Azure to test this. Basically you set up and environment variable with the key in the service e.g.
"env": {
"FLASK_APP": "microblog.py",
"FLASK_ENV": "development",
"FLASK_DEBUG": "0",
"MAIL_SERVER": "localhost",
"MAIL_PORT": "8025",
"MS_TRANSLATOR_KEY": "xxxxxxxxxxx",
"MS_TRANSLATOR_TEXT_REGION": "australiaeast",
},
Create translate() function
Import request
pip install requests
And create a function to do the translation
import requests
from flask_babel import _, current_app
from app import create_app
def translate(text, source_language, dest_language):
if 'MS_TRANSLATOR_KEY' not in current_app.config or \
not current_app.config['MS_TRANSLATOR_KEY']:
return _('Error: the translation key service is not configured.')
if 'MS_TRANSLATOR_TEXT_REGION' not in current_app.config or \
not current_app.config['MS_TRANSLATOR_TEXT_REGION']:
return _('Error: the translation service region is not configured.')
auth = {
'Ocp-Apim-Subscription-Key': current_app.config['MS_TRANSLATOR_KEY'],
'Ocp-Apim-Subscription-Region': current_app.config['MS_TRANSLATOR_TEXT_REGION']
}
r = requests.post(
'https://api.cognitive.microsofttranslator.com/translate?api-version=3.0'
'&from={}&to={}'.format(
source_language, dest_language), headers=auth, json=[{'Text': text}])
if r.status_code != 200:
print(current_app.config['MS_TRANSLATOR_KEY'])
print('Hekko')
print(r)
return _('Error: the translation service failed.')
return r.json()[0]['translations'][0]['text']
Add a Route to do Call translate()
We just wrap the call in a post request
from flask import jsonify
from app.translate import translate
@bp.route('/translate', methods=['POST'])
@login_required
def translate_text():
return jsonify({'text': translate(request.form['text'],
request.form['source_language'],
request.form['dest_language'])})
Add Placeholder for Translated Text
<span id="post{{ post.id }}">{{ post.body }}</span>
<span id="translation{{ post.id }}">
<a href="#">{{ _('Translate') }}</a>
</span>
Add JS Function to Call Service
This JavaScript is probably the interesting bit as it shows how to call a asynchronous function in Flask.
{% block scripts %}
...
<script>
function translate(sourceElem, destElem, sourceLang, destLang) {
$(destElem).html('<img src="{{ url_for('static', filename='loading.gif') }}">');
$.post('/translate', {
text: $(sourceElem).text(),
source_language: sourceLang,
dest_language: destLang
}).done(function(response) {
$(destElem).text(response['text'])
}).fail(function() {
$(destElem).text("{{ _('Error: Could not contact server.') }}");
});
}
</script>
{% endblock %}
Trigger the call to translate()
<span id="translation{{ post.id }}">
<a href="javascript:translate(
'#post{{ post.id }}',
'#translation{{ post.id }}',
'{{ post.language }}',
'{{ g.locale }}');">{{ _('Translate') }}</a>
</span>
Stuff to Know
DotEnv
Python support dotenv similar to nodejs
pip install python-dotenv
We can no initialize this in config
import os
from dotenv import load_dotenv
basedir = os.path.abspath(os.path.dirname(__file__))
load_dotenv(os.path.join(basedir, '.env'))
class Config(object):
...
SECRET_KEY = os.environ.get('SECRET_KEY')
MS_TRANSLATOR_KEY = os.environ.get('MS_TRANSLATOR_KEY')
MS_TRANSLATOR_TEXT_REGION = os.environ.get('MS_TRANSLATOR_TEXT_REGION')
List Packages to Requirements.txt
Easy Pezzy lemon squeezy
pip freeze > requirements.txt
Elastic Search
Fixes
This was the hardest part of the tutorial. There was one small error where you need to create the index in the elastic search. It is there in the text but not obvious to do so here we go
flask shell
from app.models import Post
Post.reindex()
==Introduction==
The tutorial led me to using Elastic Search. Used a bit before so should be interesting how to integrate with Flask.<br><br>
I used this to install on ubuntu https://linuxize.com/post/how-to-install-elasticsearch-on-ubuntu-20-04
<br>
<br>
<syntaxhighlight lang="bash">
pip install elasticsearch
Don't forget to set the values in /etc/elasticsearch/elasticsearch.yml
network.host: 192.168.1.xx
discovery.seed_hosts: ["192.168.1.xx"]
Configuration
Lets add the URL to the config
class Config(object):
# ...
ELASTICSEARCH_URL = os.environ.get('ELASTICSEARCH_URL')
Use it in the app
from elasticsearch import Elasticsearch
def create_app(config_class=Config):
...
app.elasticsearch = Elasticsearch([app.config['ELASTICSEARCH_URL']]) \
if app.config['ELASTICSEARCH_URL'] else None
Update Model
Let change the model to include all the fields which can be searched. We only have one but who knows next time.
class Post(db.Model):
__searchable__ = ['body']
# ...
Search Function
So here is the big moment. The search function.
- Add a search
- Remove a search
- Query
from flask import current_app
def add_to_index(index, model):
if not current_app.elasticsearch:
return
payload = {}
for field in model.__searchable__:
payload[field] = getattr(model, field)
current_app.elasticsearch.index(index=index, id=model.id, body=payload)
def remove_from_index(index, model):
if not current_app.elasticsearch:
return
current_app.elasticsearch.delete(index=index, id=model.id)
def query_index(index, query, page, per_page):
if not current_app.elasticsearch:
return [], 0
search = current_app.elasticsearch.search(
index=index,
body={'query': {'multi_match': {'query': query, 'fields': ['*']}},
'from': (page - 1) * per_page, 'size': per_page})
ids = [int(hit['_id']) for hit in search['hits']['hits']]
return ids, search['hits']['total']['value']
Adding Search to the Post Model
Struggling to understand this so bear with. We create Mixens, which are shared static functions I think to the model. Not sure why they put them in the same file. Maybe convention rather than style. The Search returns a list of objects based on the ids which match the expression. The before_commit, after commit look like low level functions for locking.
from app.search import add_to_index, remove_from_index, query_index
class SearchableMixin(object):
@classmethod
def search(cls, expression, page, per_page):
ids, total = query_index(cls.__tablename__, expression, page, per_page)
if total == 0:
return cls.query.filter_by(id=0), 0
when = []
for i in range(len(ids)):
when.append((ids[i], i))
return cls.query.filter(cls.id.in_(ids)).order_by(
db.case(when, value=cls.id)), total
@classmethod
def before_commit(cls, session):
session._changes = {
'add': list(session.new),
'update': list(session.dirty),
'delete': list(session.deleted)
}
@classmethod
def after_commit(cls, session):
for obj in session._changes['add']:
if isinstance(obj, SearchableMixin):
add_to_index(obj.__tablename__, obj)
for obj in session._changes['update']:
if isinstance(obj, SearchableMixin):
add_to_index(obj.__tablename__, obj)
for obj in session._changes['delete']:
if isinstance(obj, SearchableMixin):
remove_from_index(obj.__tablename__, obj)
session._changes = None
@classmethod
def reindex(cls):
for obj in cls.query:
add_to_index(cls.__tablename__, obj)
db.event.listen(db.session, 'before_commit', SearchableMixin.before_commit)
db.event.listen(db.session, 'after_commit', SearchableMixin.after_commit)
Add a Search Form
class SearchForm(FlaskForm):
q = StringField(_l('Search'), validators=[DataRequired()])
def __init__(self, *args, **kwargs):
if 'formdata' not in kwargs:
kwargs['formdata'] = request.args
if 'csrf_enabled' not in kwargs:
kwargs['csrf_enabled'] = False
super(SearchForm, self).__init__(*args, **kwargs)
Create Search Form
We can create the app wide function in routes before request.
from flask import g
from app.main.forms import SearchForm
@bp.before_app_request
def before_request():
if current_user.is_authenticated:
current_user.last_seen = datetime.utcnow()
db.session.commit()
g.search_form = SearchForm()
g.locale = str(get_locale())
And here is the html
<div class="collapse navbar-collapse" id="bs-example-navbar-collapse-1">
<ul class="nav navbar-nav">
... home and explore links ...
</ul>
{% if g.search_form %}
<form class="navbar-form navbar-left" method="get"
action="{{ url_for('main.search') }}">
<div class="form-group">
{{ g.search_form.q(size=20, class='form-control',
placeholder=g.search_form.q.label.text) }}
</div>
</form>
{% endif %}
Search Results Form
And the results of the search
{% extends "base.html" %}
{% block app_content %}
<h1>{{ _('Search Results') }}</h1>
{% for post in posts %}
{% include '_post.html' %}
{% endfor %}
<nav aria-label="...">
<ul class="pager">
<li class="previous{% if not prev_url %} disabled{% endif %}">
<a href="{{ prev_url or '#' }}">
<span aria-hidden="true">←</span>
{{ _('Previous results') }}
</a>
</li>
<li class="next{% if not next_url %} disabled{% endif %}">
<a href="{{ next_url or '#' }}">
{{ _('Next results') }}
<span aria-hidden="true">→</span>
</a>
</li>
</ul>
</nav>
{% endblock %}
Deployment
WSGI
This stands for Web Service Gateway Interface. This is a specification used to implement the WebServer which originally came out of mod_python from Apache. Advantages include
- Flexibility (Gunicorn and uWSGI seem to be the favs)
- Scalability You can set the amount of requests to be handled
- Speed
- Used with Django, Flask, CherryPi etc
Gunincorn
Fairly simple to implement, number of workers, and the name of the app
gunicorn -b localhost:8000 -w 4 microblog:app
Supervisor
The supervisor utility uses configuration files that tell it what programs to monitor and how to restart them when necessary. Configuration files must be stored in /etc/supervisor/conf.d.
[program:microblog]
command=/home/ubuntu/microblog/venv/bin/gunicorn -b localhost:8000 -w 4 microblog:app
directory=/home/ubuntu/microblog
user=ubuntu
autostart=true
autorestart=true
stopasgroup=true
killasgroup=true
And now we can run our app
sudo supervisorctl reload
Deployment
Introduction
We are going to install with Gunicorn and Nginx. Why take two servers into the shower? Well the answer is functionality from what I understand. Nginx is the grunt but gunicorn has the WSGI interface.
WSGI
This stands for Web Service Gateway Interface. This is a specification used to implement the WebServer which originally came out of mod_python from Apache. Advantages include
- Flexibility (Gunicorn and uWSGI seem to be the favs)
- Scalability You can set the amount of requests to be handled
- Speed
- Used with Django, Flask, CherryPi etc
Gunincorn
Fairly simple to implement, number of workers, and the name of the app
gunicorn -b localhost:8000 -w 4 microblog:app
Supervisor
The supervisor utility uses configuration files that tell it what programs to monitor and how to restart them when necessary. Configuration files must be stored in /etc/supervisor/conf.d.
[program:microblog]
command=/home/ubuntu/microblog/venv/bin/gunicorn -b localhost:8000 -w 4 microblog:app
directory=/home/ubuntu/microblog
user=ubuntu
autostart=true
autorestart=true
stopasgroup=true
killasgroup=true
And now we can run our app
sudo supervisorctl reload
Nginx (N-GIN-X)
I am going to move off Apache and onto Nginx so lets see how this works. We have had a great relationship but cannot ignore how popular this is. Just migrated and wow the speed on Nginx is incredible.
server {
# listen on port 80 (http)
listen 80;
server_name _;
location / {
# redirect any requests to the same URL but on https
return 301 https://$host$request_uri;
}
}
server {
# listen on port 443 (https)
listen 443 ssl;
server_name _;
# location of the self-signed SSL certificate
ssl_certificate /home/ubuntu/microblog/certs/cert.pem;
ssl_certificate_key /home/ubuntu/microblog/certs/key.pem;
# write access and error logs to /var/log
access_log /var/log/microblog_access.log;
error_log /var/log/microblog_error.log;
location / {
# forward application requests to the gunicorn server
proxy_pass http://localhost:8000;
proxy_redirect off;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
}
location /static {
# handle static files directly, without forwarding to the application
alias /home/ubuntu/microblog/app/static;
expires 30d;
}
}
Docker
Well more on Docker and this time flask. Needd to put the work to date into a container. Attempting the script on the tutorial failed probably because I was using Fask 2.0. It relies on SQLAlchemy which in my version relied on greenlet and then gcc. Putting it in an Alpine container meant I either install gcc and g++ or went for a two phase container. No prizes what I did
FROM python:3.9-alpine as base
RUN mkdir -p /home/microblog cd /home/microblog COPY . /home/microblog
WORKDIR /home/microblog
RUN apk add --update \
postgresql-dev \
gcc g++ \
musl-dev \
linux-headers
RUN python -m venv venv
RUN venv/bin/pip install greenlet
FROM python:3.9-alpine
RUN adduser -D microblog
COPY --from=base /home/microblog /home/microblog
WORKDIR /home/microblog
COPY requirements.txt requirements.txt
RUN venv/bin/pip install wheel
RUN venv/bin/pip install -r requirements.txt
RUN venv/bin/pip install gunicorn pymysql
COPY app app
COPY migrations migrations
COPY microblog.py config.py boot.sh ./
RUN chmod +x boot.sh
ENV FLASK_APP microblog.py
RUN chown -R microblog:microblog ./
USER microblog
EXPOSE 5000
ENTRYPOINT ["./boot.sh"]
You can then run the container with the command below. The --net=host allows the container to access the host ports. e.g. localhost:3306
docker run --name microblog --net=host -d -p 8000:5000 --rm -e SECRET_KEY=my-secret-key -e MAIL_SERVER=smtp.googlemail.com -e MAIL_PORT=587 -e MAIL_USE_TLS=true -e REDIS_URL=redis://localhost:6379/0 -e MAIL_USERNAME=emailusername -e MAIL_PASSWORD=emailpass -e DATABASE_URL=mysql+pymysql://user:password@localhost:3306/databasename containername:latest
Two Features with JavaScript
This is showing how to implement two features where client and server are involved.
Show User Profile
This will display the Profile of a user when the user hovers over another user. On the server side we simply create a route with a popup page to be rendered. On the client side we mange the calling of this route using the mouse in and mouse out event.
Server Side
- Create a route
- Make a template
Create a route
@bp.route('/user/<username>/popup')
@login_required
def user_popup(username):
user = User.query.filter_by(username=username).first_or_404()
form = EmptyForm()
return render_template('user_popup.html', user=user, form=form)
Create a template
<table class="table">
<tr>
<td width="64" style="border: 0px;"><img src="{{ user.avatar(64) }}"></td>
<td style="border: 0px;">
<p><a href="{{ url_for('main.user', username=user.username) }}">{{ user.username }}</a></p>
<small>
{% if user.about_me %}<p>{{ user.about_me }}</p>{% endif %}
{% if user.last_seen %}
<p>{{ _('Last seen on') }}: {{ moment(user.last_seen).format('lll') }}</p>
{% endif %}
<p>
{{ _('%(count)d followers', count=user.followers.count()) }},
{{ _('%(count)d following', count=user.followed.count()) }
}</p>
{% if user != current_user %}
{% if not current_user.is_following(user) %}
<p>
<form action="{{ url_for('main.follow', username=user.username) }}" method="post">
{{ form.hidden_tag() }}
{{ form.submit(value=_('Follow'), class_='btn btn-default btn-sm') }}
</form>
</p>
{% else %}
<p>
<form action="{{ url_for('main.unfollow', username=user.username) }}" method="post">
{{ form.hidden_tag() }}
{{ form.submit(value=_('Unfollow'), class_='btn btn-default btm-sm') }}
</form>
</p>
{% endif %}
{% endif %}
</small>
</td>
</tr>
</table>
Client Side
Show how to use mouseover and implement the call to the route.
- Mouse Over
- Expanded Mouse Over
Mouse Over
We create a mouse over event when the user hover over a tag with the the class .user_popup. This is the shell of what to do. We need to hanndle mouse iin and mouse out.
$(function() {
$('.user_popup').hover(
function(event) {
// mouse in event handler
var elem = $(event.currentTarget);
},
function(event) {
// mouse out event handler
var elem = $(event.currentTarget);
}
)
});
Expanded Mouse Over
This is the expanded version which calls the server-side route above. The mouse-out event cleans up the timer event which will leak without it.
$(function() {
var timer = null;
var xhr = null;
$('.user_popup').hover(
function(event) {
// mouse in event handler
var elem = $(event.currentTarget);
timer = setTimeout(function() {
timer = null;
xhr = $.ajax(
'/user/' + elem.first().text().trim() + '/popup').done(
function(data) {
xhr = null;
elem.popover({
trigger: 'manual',
html: true,
animation: false,
container: elem,
content: data
}).popover('show');
flask_moment_render_all();
}
);
}, 1000);
},
function(event) {
// mouse out event handler
var elem = $(event.currentTarget);
if (timer) {
clearTimeout(timer);
timer = null;
}
else if (xhr) {
xhr.abort();
xhr = null;
}
else {
elem.popover('destroy');
}
}
)
});
Show Notifications
This feature will poll a new table Notification and display the new messages when they arrive.
Server Side
On the server side we will store the data and initiate the insert into the new Notification table when a message is created
- Create a notification Table
- Create relationship to User
- Create an Add notification function
- Create Route which returns the message count
- Set Notifications to 0 when Messages route is Rendered
Create a notification Table
Create to hold entry when new message created.
class Notification(db.Model):
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(128), index=True)
user_id = db.Column(db.Integer, db.ForeignKey('user.id'))
timestamp = db.Column(db.Float, index=True, default=time)
payload_json = db.Column(db.Text)
def get_data(self):
return json.loads(str(self.payload_json))
Create relationship to User
Now connect the table to the user.
class User(UserMixin, db.Model):
# ...
notifications = db.relationship('Notification', backref='user',
lazy='dynamic')
Create an Add notification function
Add function to insert a row when user create a private message. On the creation of a message this is the function to be called.
def add_notification(self, name, data):
self.notifications.filter_by(name=name).delete()
n = Notification(name=name, payload_json=json.dumps(data), user=self)
db.session.add(n)
return n
Create Route which returns the message count
This will be called by the client
@bp.route('/notifications')
@login_required
def notifications():
since = request.args.get('since', 0.0, type=float)
notifications = current_user.notifications.filter(
Notification.timestamp > since).order_by(Notification.timestamp.asc())
return jsonify([{
'name': n.name,
'data': n.get_data(),
'timestamp': n.timestamp
} for n in notifications])
Set Notifications to 0 when Messages route is Rendered
If the user is on the messages page the assumption is they must have read them so we can set the unread message count to 0
@bp.route('/messages')
@login_required
def messages():
current_user.last_message_read_time = datetime.utcnow()
current_user.add_notification('unread_message_count', 0)
db.session.commit()
# ...
Client Side
The first two functions are relatively simple. It is the last function which is the part which is important to understand.
- Hide Message if there are none
- Provide a function to set Message Count
- Poll for notifications
Hide Message if there are none
This is just an example of using visible with jinja
<li>
<a href="{{ url_for('main.messages') }}">
{{ _('Messages') }}
{% set new_messages = current_user.new_messages() %}
<span id="message_count" class="badge"
style="visibility: {% if new_messages %}visible
{% else %}hidden {% endif %};">
{{ new_messages }}
</span>
</a>
</li>
Provide a function to set Message Count=
This is just a function to set the message count used in the above
{% block scripts %}
<script>
// ...
function set_message_count(n) {
$('#message_count').text(n);
$('#message_count').css('visibility', n ? 'visible' : 'hidden');
}
</script>
{% endblock %}
Poll for notifications
This essentially is an Ajax call every 10 seconds. If the user is authenticated then call the route on the server and record the timestamp.
{% block scripts %}
<script>
// ...
{% if current_user.is_authenticated %}
$(function() {
var since = 0;
setInterval(function() {
$.ajax('{{ url_for('main.notifications') }}?since=' + since).done(
function(notifications) {
for (var i = 0; i < notifications.length; i++) {
if (notifications[i].name == 'unread_message_count')
set_message_count(notifications[i].data);
since = notifications[i].timestamp;
}
}
);
}, 10000);
});
{% endif %}
</script>
Background Jobs (Message Queues)
Introduction
This is a simple example of use Redis to perform a background job. We are going to Redis to do this which have be installed on ubuntu.
Redis
Redis uses
Protocol | Port | Connection Source |
---|---|---|
TCP | 8070, 8071 | Internal, External |
TCP | 8443 | External |
TCP | 9081 | Active-Active |
TCP | 9443 (Recommended), 8080 | Internal, External, Active-Active |
you can install it with
sudo apt install redis
Demo
To run the demo you will need to terminals. One terminal where you submit the job and another where the job is executed.
Install the Redis Client
This is a demo of how to use redis. First install the python client
pip install rq
Start the Worker
On the worker terminal
For the job running the worker queue you will need to be in the directory where the task being submitted resides. In this case it is app.tasks.example. (See below)
rq worker microblog-tasks
Create a Task
Now lets make an example task which just runs a function for a given number of seconds. Whilst we are not using the Flask App this is shown to illustrate creating a task within your app where the Worker will need an instance.
from app import create_app
app = create_app()
app.app_context().push()
import time
def example(seconds):
job = get_current_job()
print('Starting task')
for i in range(seconds):
job.meta['progress'] = 100.0 * i / seconds
job.save_meta()
print(i)
time.sleep(1)
job.meta['progress'] = 100
job.save_meta()
print('Task completed')
Execute a Task
On the submission terminal
We need too
- import the client
- connect the worker to the server
- queue the example
from redis import Redis
import rq
queue = rq.Queue('microblog-tasks', connection=Redis.from_url('redis://'))
job = queue.enqueue('app.tasks.example', 23)
job.get_id()
Result
The task is executed on the worker terminal and we can monitor this in the submission terminal with
>>> job.meta
{}
>>> job.refresh()
>>> job.meta
{'progress': 13.043478260869565}
>>> job.refresh()
>>> job.meta
{'progress': 69.56521739130434}
>>> job.refresh()
>>> job.meta
{'progress': 100}
>>> job.is_finished
True
Using Redis In the App
Configuration In the App
In the tutorial they created a Task table to store the ongoing tasks which will allow for when the user logs out. To initialize the Redis a config item is created and initialized in the __init__.py
def create_app(config_class=Config):
...
app.redis = Redis.from_url(app.config['REDIS_URL'])
app.task_queue = rq.Queue('microblog-tasks', connection=app.redis)
Set User State
We need to make sure we restore the state of the tasks when the use logs in. To do this we provide three functions.
class User(UserMixin, db.Model):
# ...
def launch_task(self, name, description, *args, **kwargs):
rq_job = current_app.task_queue.enqueue('app.tasks.' + name, self.id,
*args, **kwargs)
task = Task(id=rq_job.get_id(), name=name, description=description,
user=self)
db.session.add(task)
return task
def get_tasks_in_progress(self):
return Task.query.filter_by(user=self, complete=False).all()
def get_task_in_progress(self, name):
return Task.query.filter_by(name=name, user=self,
complete=False).first()
Create the Task
We then need to create a task which will be performed. In our cases it is an export task (Not Shown). At the end of the task an email is sent with the data.
Updating the Progress
def _set_task_progress(progress):
job = get_current_job()
if job:
job.meta['progress'] = progress
job.save_meta()
task = Task.query.get(job.get_id())
task.user.add_notification('task_progress', {'task_id': job.get_id(),
'progress': progress})
if progress >= 100:
task.complete = True
db.session.commit()
Perform Export and Invoking Updates
This is the core of the task where it iterates over the posts and updates the progress. Wasn't going to list it but a good example for future.
user = User.query.get(user_id)
_set_task_progress(0)
data = []
i = 0
total_posts = user.posts.count()
for post in user.posts.order_by(Post.timestamp.asc()):
data.append({'body': post.body,
'timestamp': post.timestamp.isoformat() + 'Z'})
time.sleep(5)
i += 1
_set_task_progress(100 * i // total_posts)
Sending By Email
def export_posts(user_id):
try:
# ...
send_email('[Microblog] Your blog posts',
sender=app.config['ADMINS'][0], recipients=[user.email],
text_body=render_template('email/export_posts.txt', user=user),
html_body=render_template('email/export_posts.html', user=user),
attachments=[('posts.json', 'application/json',
json.dumps({'posts': data}, indent=4))],
sync=True)
except:
# ...
finally:
# ...
REST API with Flask
Introduction
We are going to create a RESET API for the users table. The structure offered was
- users.py
- errors.py
- tokens.py
from app.api import bp
@bp.route('/users/<int:id>', methods=['GET'])
def get_user(id):
pass
@bp.route('/users', methods=['GET'])
def get_users():
pass
@bp.route('/users/<int:id>/followers', methods=['GET'])
def get_followers(id):
pass
@bp.route('/users/<int:id>/followed', methods=['GET'])
def get_followed(id):
pass
@bp.route('/users', methods=['POST'])
def create_user():
pass
@bp.route('/users/<int:id>', methods=['PUT'])
def update_user(id):
pass
Create Json from Python
Python using the dictionary pattern so this is relatively trivial to output Json
class User(UserMixin, db.Model):
...
def to_dict(self, include_email=False):
data = {
'id': self.id,
'username': self.username,
'last_seen': self.last_seen.isoformat() + 'Z',
'about_me': self.about_me,
'post_count': self.posts.count(),
'follower_count': self.followers.count(),
'followed_count': self.followed.count(),
'_links': {
'self': url_for('api.get_user', id=self.id),
'followers': url_for('api.get_followers', id=self.id),
'followed': url_for('api.get_followed', id=self.id),
'avatar': self.avatar(128)
}
}
Outputting lists of Users
This is a representation of how the users will look including pagination and hypermedia
{
"items": [
{ ... user resource ... },
{ ... user resource ... },
...
],
"_meta": {
"page": 1,
"per_page": 10,
"total_pages": 20,
"total_items": 195
},
"_links": {
"self": "http://localhost:5000/api/users?page=1",
"next": "http://localhost:5000/api/users?page=2",
"prev": null
}
}
Implementation in Python
We can use a Mixen for this
class PaginatedAPIMixin(object):
@staticmethod
def to_collection_dict(query, page, per_page, endpoint, **kwargs):
resources = query.paginate(page, per_page, False)
data = {
'items': [item.to_dict() for item in resources.items],
'_meta': {
'page': page,
'per_page': per_page,
'total_pages': resources.pages,
'total_items': resources.total
},
'_links': {
'self': url_for(endpoint, page=page, per_page=per_page,
**kwargs),
'next': url_for(endpoint, page=page + 1, per_page=per_page,
**kwargs) if resources.has_next else None,
'prev': url_for(endpoint, page=page - 1, per_page=per_page,
**kwargs) if resources.has_prev else None
}
}
return data
Error Handling
No must to see here.
def error_response(status_code, message=None):
payload = {'error': HTTP_STATUS_CODES.get(status_code, 'Unknown error')}
if message:
payload['message'] = message
response = jsonify(payload)
response.status_code = status_code
return response
def bad_request(message):
return error_response(400, message)
Error Handlers to customize want is return to the client. The wants_json_response() looks at want was passed as a preference and responds appropriately for the status code.
from flask import render_template, request
from app import db
from app.errors import bp
from app.api.errors import error_response as api_error_response
def wants_json_response():
return request.accept_mimetypes['application/json'] >= \
request.accept_mimetypes['text/html']
@bp.app_errorhandler(404)
def not_found_error(error):
if wants_json_response():
return api_error_response(404)
return render_template('errors/404.html'), 404
@bp.app_errorhandler(500)
def internal_error(error):
db.session.rollback()
if wants_json_response():
return api_error_response(500)
return render_template('errors/500.html'), 500
Implementations
GET Many
@bp.route('/users', methods=['GET'])
def get_users():
page = request.args.get('page', 1, type=int)
per_page = min(request.args.get('per_page', 10, type=int), 100)
data = User.to_collection_dict(User.query, page, per_page, 'api.get_users')
return jsonify(data)
GET One
@bp.route('/users/<int:id>', methods=['GET'])
def get_user(id):
return jsonify(User.query.get_or_404(id).to_dict())
PUT
@bp.route('/users/<int:id>', methods=['PUT'])
def update_user(id):
user = User.query.get_or_404(id)
data = request.get_json() or {}
if 'username' in data and data['username'] != user.username and \
User.query.filter_by(username=data['username']).first():
return bad_request('please use a different username')
if 'email' in data and data['email'] != user.email and \
User.query.filter_by(email=data['email']).first():
return bad_request('please use a different email address')
user.from_dict(data, new_user=False)
db.session.commit()
return jsonify(user.to_dict())
POST
@bp.route('/users', methods=['POST'])
def create_user():
data = request.get_json() or {}
if 'username' not in data or 'email' not in data or 'password' not in data:
return bad_request('must include username, email and password fields')
if User.query.filter_by(username=data['username']).first():
return bad_request('please use a different username')
if User.query.filter_by(email=data['email']).first():
return bad_request('please use a different email address')
user = User()
user.from_dict(data, new_user=True)
db.session.add(user)
db.session.commit()
response = jsonify(user.to_dict())
response.status_code = 201
response.headers['Location'] = url_for('api.get_user', id=user.id)
return response
Token Authentication
Introduction
Flask-HttpAuth provide the mechanism for using tokens. It looks like it does support roles to but it seemed like this was not a big thing at the time but is a must now. This example is taken from https://flask-httpauth.readthedocs.io/en/latest/
from flask import Flask, g
from flask_httpauth import HTTPTokenAuth
app = Flask(__name__)
auth = HTTPTokenAuth(scheme='Bearer')
tokens = {
"secret-token-1": "john",
"secret-token-2": "susan"
}
@auth.verify_token
def verify_token(token):
if token in tokens:
return tokens[token]
@app.route('/')
@auth.login_required
def index():
return "Hello, {}!".format(auth.current_user())
if __name__ == '__main__':
app.run()