项目结构
api_login/
├── app/
│   ├── controllers/
│   │   └── account_controller.py
│   ├── models/
│   │   ├── __init__.py
│   │   ├── user.py
│   │   └── user_device.py
│   ├── services/
│   │   └── authentication_service.py
│   ├── __init__.py
│   └── config.py
├── migrations/
├── tests/
├── requirements.txt
└── run.py
依赖安装
# Installs Flask and the SQLAlchemy, Migrate and HTTPAuth extensions that the
# listings import. Werkzeug and itsdangerous, which the listings also import,
# are installed as dependencies of Flask.
# NOTE(review): no versions are pinned here, so the installed releases depend
# on the install date; record the versions you tested in requirements.txt.
pip install flask flask_sqlalchemy flask_migrate flask_httpauth
配置文件(config.py)
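import os
import secrets


class Config:
    """Flask settings, taken from the environment where a variable is set."""

    # SECRET_KEY signs session cookies and itsdangerous tokens. A literal
    # fallback published with the source lets anyone who has read it forge
    # both, so when the variable is unset a random per-process key is used
    # instead. That key changes on every restart and differs between worker
    # processes; set SECRET_KEY explicitly for any shared or long-running
    # deployment.
    SECRET_KEY = os.environ.get('SECRET_KEY') or secrets.token_hex(32)

    # Database URL; falls back to a local SQLite file.
    SQLALCHEMY_DATABASE_URI = os.environ.get('DATABASE_URL') or 'sqlite:///site.db'

    # Disable Flask-SQLAlchemy's object modification tracking.
    SQLALCHEMY_TRACK_MODIFICATIONS = False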
初始化文件(run.py)
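import os

from app import create_app, db
from app.models import User, UserDevice

app = create_app()


@app.shell_context_processor
def make_shell_context():
    """Expose db and the two models as names inside `flask shell`."""
    return dict(db=db, User=User, UserDevice=UserDevice)


if __name__ == '__main__':
    # Debug mode turns on the Werkzeug interactive debugger, which executes
    # Python on behalf of anyone who can reach the listening port. It is
    # therefore off unless FLASK_DEBUG is set for a local development run.
    debug_requested = os.environ.get('FLASK_DEBUG', '').lower() in ('1', 'true')
    app.run(debug=debug_requested)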
5. 应用工厂函数(app/__init__.py)
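from flask import Flask
from flask_sqlalchemy import SQLAlchemy
from flask_migrate import Migrate

# config.py lives inside the app package in the project layout, so it is
# imported relative to the package rather than as a top-level module.
from .config import Config

db = SQLAlchemy()
migrate = Migrate()


def create_app():
    """Create the Flask application, bind the extensions and register routes.

    Returns:
        The configured Flask application.
    """
    app = Flask(__name__)
    app.config.from_object(Config)

    db.init_app(app)
    migrate.init_app(app, db)

    # Imported here, after `db` exists, because these modules import `db`
    # from this package.
    from app import models  # noqa: F401  (registers the model classes)

    # register_blueprint() needs the Blueprint object, not the module that
    # defines it.
    from .controllers.account_controller import account_blueprint

    app.register_blueprint(account_blueprint)
    return app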
6. 用户模型(app/models/user.py)
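from . import db
from werkzeug.security import generate_password_hash, check_password_hash
from flask_sqlalchemy import SQLAlchemy


class User(db.Model):
    """Account record. Only a salted password hash is stored, never the password."""

    id = db.Column(db.Integer, primary_key=True)
    login_id = db.Column(db.String(80), unique=True, nullable=False)
    # Holds the encoded string returned by generate_password_hash(): method,
    # parameters, salt and digest. Widened from 120 to 255 because the scrypt
    # format used by default in recent Werkzeug releases is longer than 120
    # characters, which a length-enforcing database rejects or truncates,
    # after which check_password() can never succeed. An existing database
    # needs a migration for this change.
    password = db.Column(db.String(255), nullable=False)
    is_active = db.Column(db.Boolean, default=True)

    def set_password(self, password):
        """Hash `password` with a fresh salt and store the encoded result."""
        self.password = generate_password_hash(password)

    def check_password(self, password):
        """Return True if `password` matches the stored hash."""
        return check_password_hash(self.password, password)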
7. 设备模型(app/models/user_device.py)
from datetime import datetime

from flask import current_app, request, abort, jsonify
from itsdangerous import URLSafeTimedSerializer, SignatureExpired

from . import db


class UserDevice(db.Model):
    """One row per device registration of a user, with its session key."""

    id = db.Column(db.Integer, primary_key=True)

    # Owning account.
    user_id = db.Column(
        db.Integer,
        db.ForeignKey('user.id'),
        nullable=False,
    )

    # Integer device category and the client-supplied identifier.
    device_type = db.Column(db.Integer, nullable=False)
    client_id = db.Column(db.String(80), nullable=False)

    # NOTE(review): the login handler on this page returns this value to the
    # client as its SessionKey, so it is a bearer credential stored as-is.
    # The model has no expiry column and no unique constraint on
    # (user_id, device_type); confirm both are intended.
    passkey = db.Column(db.String(255), nullable=False)

    # Filled in with the UTC time when the row is inserted.
    create_time = db.Column(db.DateTime, default=datetime.utcnow)
8. 认证服务(app/services/authentication_service.py)
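import datetime
import secrets
import uuid
from hashlib import md5

from flask import request, current_app, jsonify
from itsdangerous import BadData, SignatureExpired, URLSafeTimedSerializer

# `db` and the models are exported by the app package and app.models (run.py
# imports them from there); the services package itself does not define them.
from app import db
from app.models import User, UserDevice

# Lifetime of tokens signed by get_auth_serializer(): one day, in seconds.
AUTH_TOKEN_MAX_AGE = 60 * 60 * 24


def get_auth_serializer():
    """Return a timed serializer keyed with the application's SECRET_KEY.

    Built on demand: current_app is only usable inside an application
    context, so constructing the serializer at import time raises
    RuntimeError. TimedJSONWebSignatureSerializer was removed in
    itsdangerous 2.1, so URLSafeTimedSerializer is used; it takes the
    lifetime at verification time, i.e. loads(token, max_age=AUTH_TOKEN_MAX_AGE).
    """
    return URLSafeTimedSerializer(current_app.config['SECRET_KEY'])


def get_user_by_login_id(login_id):
    """Return the User with this login_id, or None if there is none."""
    return User.query.filter_by(login_id=login_id).first()


def get_user_device(user_id, device_type):
    """Return the first UserDevice for this user and device type, or None."""
    return UserDevice.query.filter_by(user_id=user_id, device_type=device_type).first()


def generate_sessionkey(user_id, login_name, device_type):
    """Return a new session key: 32 lowercase hex characters (128 random bits).

    The key is a bearer credential, so it comes straight from the operating
    system's CSPRNG via `secrets`. The previous version hashed identifiers
    and a timestamp with MD5 and failed at runtime, because the module
    `datetime` has no `utcnow` attribute. The parameters are kept for
    existing callers and are deliberately not mixed into the key.
    """
    return secrets.token_hex(16)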
9. 控制器(app/controllers/account_controller.py)
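from flask import Blueprint, request, jsonify, abort, current_app as app
from flask_httpauth import HTTPTokenAuth
from itsdangerous import SignatureExpired, BadData

# `db` is needed for the session calls below and was not imported before.
from app import db
from app.models import User, UserDevice
from app.services.authentication_service import (
    get_user_by_login_id,
    get_user_device,
    generate_sessionkey,
)

account_blueprint = Blueprint('account', __name__)
auth = HTTPTokenAuth(scheme='Bearer')


@account_blueprint.route('/account/login', methods=['POST'])
def login():
    """Authenticate a user and return the session key for the given device.

    Expects a JSON object with `loginIdorEmail` and `hashedPassword`, and
    optionally `deviceType` (integer, default 0) and `clientId` (string).

    Responses:
        200 with `SessionKey` and `UserInfo` on success.
        400 for missing or malformed parameters, or an inactive account.
        401 for an unknown account or a wrong password.
    """
    # silent=True turns a missing or unparsable body into None instead of an
    # exception; a body that is not a JSON object is treated as empty too.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}

    login_id = data.get('loginIdorEmail')
    hashedPassword = data.get('hashedPassword')
    deviceType = data.get('deviceType', 0)
    clientId = data.get('clientId', '')

    if not login_id or not hashedPassword:
        abort(400, description="Missing required parameters")

    # JSON values can be any type; reject anything that does not fit the
    # columns they are compared with or stored in (client_id is String(80)).
    if not isinstance(login_id, str) or not isinstance(hashedPassword, str):
        abort(400, description="Invalid parameters")
    if isinstance(deviceType, bool) or not isinstance(deviceType, int):
        abort(400, description="Invalid parameters")
    if not isinstance(clientId, str) or len(clientId) > 80:
        abort(400, description="Invalid parameters")

    # NOTE(review): the field is named hashedPassword; whatever the client
    # sends is what check_password() verifies, so a client-side hash is
    # itself the password as far as the server is concerned.
    nowUser = get_user_by_login_id(login_id)

    # One status and one message for "no such account" and "wrong password",
    # so the endpoint does not confirm which login ids exist.
    # NOTE(review): an unknown login still skips the hash computation and
    # answers faster; add a dummy check and rate limiting if that matters.
    if not nowUser or not nowUser.check_password(hashedPassword):
        abort(401, description="Invalid credentials")

    # Checked only after the password has been verified.
    if not nowUser.is_active:
        abort(400, description="The user is inactive")

    existsDevice = get_user_device(nowUser.id, deviceType)
    if not existsDevice:
        passkey = generate_sessionkey(nowUser.id, nowUser.login_id, deviceType)
        existsDevice = UserDevice(
            user_id=nowUser.id,
            device_type=deviceType,
            client_id=clientId,
            passkey=passkey,
        )
        db.session.add(existsDevice)
        db.session.commit()

    # NOTE(review): an existing device row keeps its key across logins, so
    # the key never rotates or expires and logging in again does not revoke
    # a leaked key. Two concurrent first logins can also insert two rows,
    # since the lookup and insert are not covered by a unique constraint.
    return jsonify({
        'SessionKey': existsDevice.passkey,
        'UserInfo': {'user_id': nowUser.id, 'login_id': nowUser.login_id},
    })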