這篇文章介紹了 flask-login 是如何實(shí)現(xiàn)一個(gè)不需要使用數(shù)據(jù)庫(kù)的用戶認(rèn)證組件的。
在介紹 flask-login 的工作原理之前先來簡(jiǎn)要回顧一下 flask-login 的使用方法。
首先要?jiǎng)?chuàng)建一個(gè) LoginManager
的實(shí)例并注冊(cè)在 Flask
實(shí)例上,然后提供一個(gè) user_loader
回調(diào)函數(shù)來根據(jù)會(huì)話中存儲(chǔ)的用戶 ID 來加載用戶對(duì)象。
login_manager = LoginManager()login_manager.init_app(app)@login_manager.user_loaderdef load_user(user_id): return User.get(user_id)
flask-login 還要求你對(duì)數(shù)據(jù)對(duì)象做一些改動(dòng),添加以下屬性和方法:
@propertydef is_active(self): return True@propertydef is_authenticated(self): return True@propertydef is_anonymous(self): return False#: 這個(gè)方法返回一個(gè)能夠識(shí)別唯一用戶的 IDdef get_id(self): try: return text_type(self.id) except AttributeError: raise NotImplementedError('No `id` attribute - override `get_id`')
完成這些設(shè)置工作之后,就可以使用 flask-login 了,一些典型的用法包括登錄和登出用戶:login_user(user)
及 logout_user()
,及使用 @login_required
保護(hù)一些視圖函數(shù),檢測(cè)當(dāng)前用戶是否有訪問的權(quán)限(根據(jù)是否認(rèn)證進(jìn)行區(qū)別):
@app.route("/settings")@login_requireddef settings(): pass
以及通過 current_user
對(duì)象來訪問當(dāng)前用戶。
我們按照使用過程中調(diào)用 flask-login 的順序來解析其源碼。
先來看 LoginManager
對(duì)象,它用于記錄所有的配置信息,其 __init__
方法中初始化了這些配置信息。一個(gè) LoginManager
對(duì)象通過 init_app 方法注冊(cè)到 Flask 實(shí)例上:
def init_app(self, app, add_context_processor=True): app.login_manager = self app.after_request(self._update_remember_cookie) self._login_disabled = app.config.get('LOGIN_DISABLED', False) if add_context_processor: app.context_processor(_user_context_processor)
這個(gè)方法的主要工作是在 Flask 實(shí)例的 after_request 鉤子上添加了一個(gè)用戶更新 remember_me cookie 的函數(shù),并在 Flask 的上下文處理器中添加了一個(gè)用戶上下文處理器。
def _user_context_processor(): return dict(current_user=_get_user())
這個(gè)上下文處理器設(shè)置了一個(gè)全局可訪問的變量 current_user
,這樣我們就可以在視圖函數(shù)或者模板文件中訪問這個(gè)變量了。
然后就到了這個(gè)方法,它是 LoginManager 的實(shí)例方法,把 user_callback 設(shè)置成我們傳入的函數(shù),在實(shí)際的使用過程中,我們是通過修飾器傳入這個(gè)函數(shù)的,就是 load_user(user_id)
函數(shù)。
def user_loader(self, callback): self.user_callback = callback return callback
該方法要求你的回調(diào)函數(shù)必須能夠接收一個(gè) unicode 編碼的 ID 并返回一個(gè)用戶對(duì)象,如果用戶不存在就返回 None。
我們跳過對(duì) User 類的修改,直接來看這個(gè)方法。
def login_user(user, remember=False, force=False, fresh=True): if not force and not user.is_active: return False user_id = getattr(user, current_app.login_manager.id_attribute)() session['user_id'] = user_id session['_fresh'] = fresh session['_id'] = current_app.login_manager._session_identifier_generator() if remember: session['remember'] = 'set' _request_ctx_stack.top.user = user user_logged_in.send(current_app._get_current_object(), user=_get_user()) return True
如果用戶不活躍 not.is_active
而且不要求強(qiáng)制登錄 force
,就返回失敗。否則,先得到 user_id
,它是通過 getattr
函數(shù)訪問 user
的 login_manager.id_attribute
屬性得到的。追根溯源,最終 getattr
訪問的是 user
的 get_id
方法,這就是為什么 flask-login 要求我們?cè)?User
類中添加該方法。
然后在 Flask 提供的 session 中添加以下三個(gè) session:user_id
_fresh
_id
,其中 _id
是通過 LoginManager
的 _session_identifier_generator
方法獲取到的,而這個(gè)方法默認(rèn)綁定在這個(gè)方法上:
def _create_identifier(): user_agent = request.headers.get('User-Agent') if user_agent is not None: user_agent = user_agent.encode('utf-8') base = '{0}|{1}'.format(_get_remote_addr(), user_agent) if str is bytes: base = text_type(base, 'utf-8', errors='replace') # pragma: no cover h = sha512() h.update(base.encode('utf8')) return h.hexdigest()
不用太深究,知道這個(gè)方法最終根據(jù)放著用戶代理和 IP 信息生成了一個(gè)加鹽的 ID 就行了,它的作用是防止有人偽造 cookie。
然后根據(jù)是否需要記住用戶添加 remember
session。最后,在 _request_ctx_stack.top
中添加該用戶,發(fā)出一個(gè)用戶登錄信號(hào)后返回成功。在這個(gè)登錄信號(hào)中,調(diào)用了 _get_user
方法,_get_user
方法的細(xì)節(jié)是先檢測(cè)在 _request_ctx_stack.top
中有沒有用戶信息,如果沒有,就通過 _load_user
方法在棧頂添加用戶信息,如果有就返回這個(gè)用戶對(duì)象。_load_user
方法很重要,但是在這里不會(huì)被調(diào)用,很明顯 _request_ctx_stack.top
中肯定有 user
值,我們待會(huì)再來看這個(gè)方法。
def _get_user(): if has_request_context() and not hasattr(_request_ctx_stack.top, 'user'): current_app.login_manager._load_user() return getattr(_request_ctx_stack.top, 'user', None)
這個(gè)修飾器常被用來保護(hù)只有登錄用戶才能訪問的視圖函數(shù),它會(huì)在實(shí)際調(diào)用視圖函數(shù)之前先檢查當(dāng)前用戶是否已經(jīng)登錄并認(rèn)證,如果沒有,就調(diào)用 LoginManager.unauthorized
這個(gè)回調(diào)函數(shù),它還對(duì)一些 HTTP 方法和測(cè)試情況提供了例外處理。
def login_required(func): @wraps(func) def decorated_view(*args, **kwargs): if request.method in EXEMPT_METHODS: return func(*args, **kwargs) elif current_app.login_manager._login_disabled: return func(*args, **kwargs) elif not current_user.is_authenticated: return current_app.login_manager.unauthorized() return func(*args, **kwargs) return decorated_view
在之前的分析中,可以看到這個(gè)變量經(jīng)常出現(xiàn)并大有用途,開發(fā)者可以通過訪問這個(gè)變量來獲取到當(dāng)前用戶,如果用戶未登錄,獲取到的就是一個(gè)匿名用戶,它的定義:
current_user = LocalProxy(lambda: _get_user())
_get_user()
方法之前已經(jīng)講過,我們直接跳到 _load_user
方法。顯然,如果用戶登錄后再次發(fā)出了請(qǐng)求,我們就要從 cookie,或者說,F(xiàn)lask 在此之上封裝的 session 中獲取用戶信息才能正確地進(jìn)行后續(xù)處理,_load_user
方法的作用就是這個(gè),該方法如下:
def _load_user(self): user_accessed.send(current_app._get_current_object()) config = current_app.config if config.get('SESSION_PROTECTION', self.session_protection): deleted = self._session_protection() if deleted: return self.reload_user() is_missing_user_id = 'user_id' not in session if is_missing_user_id: cookie_name = config.get('REMEMBER_COOKIE_NAME', COOKIE_NAME) header_name = config.get('AUTH_HEADER_NAME', AUTH_HEADER_NAME) has_cookie = (cookie_name in request.cookies and session.get('remember') != 'clear') if has_cookie: return self._load_from_cookie(request.cookies[cookie_name]) elif self.request_callback: return self._load_from_request(request) elif header_name in request.headers: return self._load_from_header(request.headers[header_name]) return self.reload_user()def _session_protection(self): sess = session._get_current_object() ident = self._session_identifier_generator() app = current_app._get_current_object() mode = app.config.get('SESSION_PROTECTION', self.session_protection) if sess and ident != sess.get('_id', None): if mode == 'basic' or sess.permanent: sess['_fresh'] = False session_protected.send(app) return False elif mode == 'strong': for k in SESSION_KEYS: sess.pop(k, None) sess['remember'] = 'clear' session_protected.send(app) return True return False
該方法首先保證 session 的安全,如果 session 通過了安全驗(yàn)證,就通過 reload_user
方法重載用戶,否則檢查 session 中是否沒有 user_id
來重載用戶,如果沒有,通過三種不同的方式重載用戶。
def reload_user(self, user=None): ctx = _request_ctx_stack.top if user is None: user_id = session.get('user_id') if user_id is None: ctx.user = self.anonymous_user() else: if self.user_callback is None: raise Exception( "No user_loader has been installed for this " "LoginManager. Add one with the " "'LoginManager.user_loader' decorator.") user = self.user_callback(user_id) if user is None: ctx.user = self.anonymous_user() else: ctx.user = user else: ctx.user = user
在這個(gè)重載方法中,如果 user_id
不存在,就把匿名用戶加載到 _request_ctx_stack.top
,否則根據(jù) user_id
加載用戶,若該用戶不存在,仍加載匿用戶。
之后,current_user
就能獲取到用戶對(duì)象,或者是一個(gè)匿名用戶對(duì)象了。
current_user = LocalProxy(lambda: _get_user())
這個(gè)方法先獲取當(dāng)前用戶,然后移除 user_id
_fresh
等 session,然后移除 remember
,最后重載當(dāng)前用戶,很明顯,重載之后會(huì)是一個(gè)匿名用戶。
def logout_user(): user = _get_user() if 'user_id' in session: session.pop('user_id') if '_fresh' in session: session.pop('_fresh') cookie_name = current_app.config.get('REMEMBER_COOKIE_NAME', COOKIE_NAME) if cookie_name in request.cookies: session['remember'] = 'clear' user_logged_out.send(current_app._get_current_object(), user=user) current_app.login_manager.reload_user() return True
記得我們之前提到 flask-login 在 Flask 實(shí)例的 after_request 鉤子上添加了一個(gè)用戶更新 remember_me cookie 的函數(shù)嗎,我們顯然需要在請(qǐng)求的最后對(duì) remember
進(jìn)行處理。
def _update_remember_cookie(self, response): # Don't modify the session unless there's something to do. if 'remember' in session: operation = session.pop('remember', None) if operation == 'set' and 'user_id' in session: self._set_cookie(response) elif operation == 'clear': self._clear_cookie(response) return response
這個(gè)函數(shù)根據(jù)是否要設(shè)置 remember
來調(diào)用不同的函數(shù)
def _set_cookie(self, response): config = current_app.config cookie_name = config.get('REMEMBER_COOKIE_NAME', COOKIE_NAME) duration = config.get('REMEMBER_COOKIE_DURATION', COOKIE_DURATION) domain = config.get('REMEMBER_COOKIE_DOMAIN') path = config.get('REMEMBER_COOKIE_PATH', '/') secure = config.get('REMEMBER_COOKIE_SECURE', COOKIE_SECURE) httponly = config.get('REMEMBER_COOKIE_HTTPONLY', COOKIE_HTTPONLY) data = encode_cookie(text_type(session['user_id'])) try: expires = datetime.utcnow() + duration except TypeError: raise Exception('REMEMBER_COOKIE_DURATION must be a ' + 'datetime.timedelta, instead got: {0}'.format( duration)) response.set_cookie(cookie_name, value=data, expires=expires, domain=domain, path=path, secure=secure, httponly=httponly) def _clear_cookie(self, response): config = current_app.config cookie_name = config.get('REMEMBER_COOKIE_NAME', COOKIE_NAME) domain = config.get('REMEMBER_COOKIE_DOMAIN') path = config.get('REMEMBER_COOKIE_PATH', '/') response.delete_cookie(cookie_name, domain=domain, path=path)
user_id
來記錄用戶身份,_id
來防止攻擊者對(duì) session 的偽造。_request_ctx_stack.top.user
,flask-login 實(shí)現(xiàn)了線程安全。其他功能如 fresh login 請(qǐng)自行查看源碼了解。
聯(lián)系客服