mirror of
				https://github.com/avatao-content/test-tutorial-framework
				synced 2025-11-04 15:52:55 +00:00 
			
		
		
		
	Perform mercilless separation of bussiness logic from request handling
This commit is contained in:
		
							
								
								
									
										11
									
								
								solvable/src/webservice/crypto.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										11
									
								
								solvable/src/webservice/crypto.py
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,11 @@
 | 
			
		||||
from passlib.hash import pbkdf2_sha256
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class PasswordHasher:
 | 
			
		||||
    @staticmethod
 | 
			
		||||
    def hash(password):
 | 
			
		||||
        return pbkdf2_sha256.hash(password)
 | 
			
		||||
 | 
			
		||||
    @staticmethod
 | 
			
		||||
    def verify(password, hash):
 | 
			
		||||
        return pbkdf2_sha256.verify(password, hash)
 | 
			
		||||
							
								
								
									
										6
									
								
								solvable/src/webservice/exceptions.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										6
									
								
								solvable/src/webservice/exceptions.py
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,6 @@
 | 
			
		||||
class InvalidCredentialsError(RuntimeError):
 | 
			
		||||
    pass
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class UserExistsError(RuntimeError):
 | 
			
		||||
    pass
 | 
			
		||||
@@ -1,12 +1,26 @@
 | 
			
		||||
from sqlalchemy import Column, Integer, String, create_engine
 | 
			
		||||
from sqlalchemy.ext.declarative import declarative_base
 | 
			
		||||
from sqlalchemy.orm import sessionmaker
 | 
			
		||||
from passlib.hash import pbkdf2_sha256
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
engine = create_engine('sqlite:///db.db', convert_unicode=True)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
Base = declarative_base()
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class User(Base):
 | 
			
		||||
    __tablename__ = 'users'
 | 
			
		||||
 | 
			
		||||
    id = Column(Integer, primary_key=True)
 | 
			
		||||
    username = Column(String, nullable=False, unique=True)
 | 
			
		||||
    passwordhash = Column(String, nullable=False)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def init_db():
 | 
			
		||||
    Base.metadata.create_all(bind=engine)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class Session:
 | 
			
		||||
    session = None
 | 
			
		||||
 | 
			
		||||
@@ -23,28 +37,3 @@ class Session:
 | 
			
		||||
 | 
			
		||||
    def __exit__(self, exc_type, exc_val, exc_tb):
 | 
			
		||||
        self.session.close()
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
Base = declarative_base()
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class User(Base):
 | 
			
		||||
    __tablename__ = 'users'
 | 
			
		||||
 | 
			
		||||
    id = Column(Integer, primary_key=True)
 | 
			
		||||
    username = Column(String, nullable=False, unique=True)
 | 
			
		||||
    passwordhash = Column(String, nullable=False)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def init_db():
 | 
			
		||||
    Base.metadata.create_all(bind=engine)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class PasswordHasher:
 | 
			
		||||
    @staticmethod
 | 
			
		||||
    def hash(password):
 | 
			
		||||
        return pbkdf2_sha256.hash(password)
 | 
			
		||||
 | 
			
		||||
    @staticmethod
 | 
			
		||||
    def verify(password, hash):
 | 
			
		||||
        return pbkdf2_sha256.verify(password, hash)
 | 
			
		||||
 
 | 
			
		||||
@@ -2,7 +2,9 @@ from os import urandom, getenv
 | 
			
		||||
 | 
			
		||||
from flask import Flask, render_template, request, session, url_for
 | 
			
		||||
 | 
			
		||||
from model import init_db, User, Session, PasswordHasher
 | 
			
		||||
from model import init_db
 | 
			
		||||
from user_ops import UserOps
 | 
			
		||||
from exceptions import InvalidCredentialsError, UserExistsError
 | 
			
		||||
 | 
			
		||||
BASEURL = getenv('BASEURL', '')
 | 
			
		||||
init_db()
 | 
			
		||||
@@ -18,15 +20,15 @@ app.jinja_env.globals.update(get_url=get_url)
 | 
			
		||||
@app.route('/', methods=['GET', 'POST'])
 | 
			
		||||
def index():
 | 
			
		||||
    if request.method == 'POST':
 | 
			
		||||
        with Session() as db:
 | 
			
		||||
            user = db.query(User).filter(User.username == request.form['username']).first()
 | 
			
		||||
        try:
 | 
			
		||||
            UserOps(request.form.get('username'),
 | 
			
		||||
                    request.form.get('password')).authenticate()
 | 
			
		||||
        except InvalidCredentialsError:
 | 
			
		||||
            return render_template('login.html', alert='Invalid credentials!')
 | 
			
		||||
 | 
			
		||||
            if not user or not PasswordHasher.verify(request.form['password'], user.passwordhash):
 | 
			
		||||
                return render_template('login.html', alert='Invalid credentials!')
 | 
			
		||||
 | 
			
		||||
            session['logged_in'] = True
 | 
			
		||||
            session['username'] = request.form['username']
 | 
			
		||||
            return render_template('internal.html')
 | 
			
		||||
        session['logged_in'] = True
 | 
			
		||||
        session['username'] = request.form['username']
 | 
			
		||||
        return render_template('internal.html')
 | 
			
		||||
    
 | 
			
		||||
    if session.get('logged_in'):
 | 
			
		||||
        return render_template('internal.html')
 | 
			
		||||
@@ -36,20 +38,21 @@ def index():
 | 
			
		||||
@app.route('/register', methods=['GET', 'POST'])
 | 
			
		||||
def register():
 | 
			
		||||
    if request.method == 'POST':
 | 
			
		||||
        if not all([request.form.get('username'), request.form.get('password'), request.form.get('passwordconfirm')]):
 | 
			
		||||
        if not all([request.form.get('username'),
 | 
			
		||||
                    request.form.get('password'),
 | 
			
		||||
                    request.form.get('passwordconfirm')]):
 | 
			
		||||
            return render_template('register.html', alert='You need to fill everything.')
 | 
			
		||||
        if request.form['password'] != request.form['passwordconfirm']:
 | 
			
		||||
            return render_template('register.html', alert='Passwords do not match! Please try again.')
 | 
			
		||||
 | 
			
		||||
        with Session() as db:
 | 
			
		||||
            if db.query(User).filter(User.username == request.form['username']).all():
 | 
			
		||||
                return render_template('register.html', alert='Username already in use.')
 | 
			
		||||
        try:
 | 
			
		||||
            UserOps(request.form.get('username'),
 | 
			
		||||
                    request.form.get('password')).register()
 | 
			
		||||
        except UserExistsError:
 | 
			
		||||
            return render_template('register.html', alert='Username already in use.')
 | 
			
		||||
 | 
			
		||||
            db.add(User(username=request.form['username'],
 | 
			
		||||
                        passwordhash=PasswordHasher.hash(request.form['password'])))
 | 
			
		||||
            db.commit()
 | 
			
		||||
 | 
			
		||||
        return render_template('login.html', success='Account "{}" successfully registered. You can log in now!'.format(request.form['username']))
 | 
			
		||||
        return render_template('login.html', success=('Account "{}" successfully registered. '
 | 
			
		||||
                                                      'You can log in now!'.format(request.form['username'])))
 | 
			
		||||
 | 
			
		||||
    return render_template('register.html')
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										28
									
								
								solvable/src/webservice/user_ops.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										28
									
								
								solvable/src/webservice/user_ops.py
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,28 @@
 | 
			
		||||
from crypto import PasswordHasher
 | 
			
		||||
from model import Session, User
 | 
			
		||||
from exceptions import InvalidCredentialsError, UserExistsError
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class UserOps:
 | 
			
		||||
    def __init__(self, username, password):
 | 
			
		||||
        self.username = username
 | 
			
		||||
        self.password = password
 | 
			
		||||
 | 
			
		||||
    def authenticate(self):
 | 
			
		||||
        with Session() as db:
 | 
			
		||||
 | 
			
		||||
            user = db.query(User).filter(User.username == self.username).first()
 | 
			
		||||
 | 
			
		||||
            if not user or not PasswordHasher.verify(self.password, user.passwordhash):
 | 
			
		||||
                raise InvalidCredentialsError
 | 
			
		||||
 | 
			
		||||
    def register(self):
 | 
			
		||||
        with Session() as db:
 | 
			
		||||
 | 
			
		||||
            if db.query(User).filter(User.username == self.username).all():
 | 
			
		||||
                raise UserExistsError
 | 
			
		||||
 | 
			
		||||
            user = User(username=self.username,
 | 
			
		||||
                        passwordhash=PasswordHasher.hash(self.password))
 | 
			
		||||
            db.add(user)
 | 
			
		||||
            db.commit()
 | 
			
		||||
		Reference in New Issue
	
	Block a user