Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1"""LICENSE 

2Copyright 2020 Hermann Krumrey <hermann@krumreyh.com> 

3 

4This file is part of jerrycan. 

5 

6jerrycan is free software: you can redistribute it and/or modify 

7it under the terms of the GNU General Public License as published by 

8the Free Software Foundation, either version 3 of the License, or 

9(at your option) any later version. 

10 

11jerrycan is distributed in the hope that it will be useful, 

12but WITHOUT ANY WARRANTY; without even the implied warranty of 

13MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

14GNU General Public License for more details. 

15 

16You should have received a copy of the GNU General Public License 

17along with jerrycan. If not, see <http://www.gnu.org/licenses/>. 

18LICENSE""" 

19 

20import os 

21from typing import Union 

22from werkzeug import Response 

23from smtplib import SMTPAuthenticationError 

24from flask import Blueprint, redirect, url_for, request, render_template, flash 

25from flask_login import login_required, current_user, logout_user, login_user 

26from puffotter.crypto import generate_hash, generate_random 

27from puffotter.recaptcha import verify_recaptcha 

28from puffotter.smtp import send_email 

29from jerrycan.base import app, db 

30from jerrycan.Config import Config 

31from jerrycan.db.User import User 

32from jerrycan.db.TelegramChatId import TelegramChatId 

33 

34 

35def define_blueprint(blueprint_name: str) -> Blueprint: 

36 """ 

37 Defines the blueprint for this route 

38 :param blueprint_name: The name of the blueprint 

39 :return: The blueprint 

40 """ 

41 blueprint = Blueprint(blueprint_name, __name__) 

42 

43 @blueprint.route("/login", methods=["GET", "POST"]) 

44 def login() -> Union[Response, str]: 

45 """ 

46 Page that allows the user to log in 

47 :return: The response 

48 """ 

49 if request.method == "POST": 

50 username = request.form["username"].strip() 

51 password = request.form["password"] 

52 remember_me = request.form.get("remember_me") in ["on", True] 

53 

54 user: User = User.query.filter_by(email=username).first() 

55 if user is None: 

56 user = User.query.filter_by(username=username).first() 

57 

58 if user is None: 

59 flash(Config.STRINGS["user_does_not_exist"], "danger") 

60 elif current_user.is_authenticated: 

61 flash(Config.STRINGS["user_already_logged_in"], "info") 

62 elif not user.confirmed: 

63 flash(Config.STRINGS["user_is_not_confirmed"], "danger") 

64 elif not user.verify_password(password): 

65 flash(Config.STRINGS["invalid_password"], "danger") 

66 else: 

67 login_user(user, remember=remember_me) 

68 flash(Config.STRINGS["logged_in"], "success") 

69 app.logger.info(f"User {current_user.username} logged in.") 

70 return redirect(url_for("static.index")) 

71 return redirect(url_for("user_management.login")) 

72 else: 

73 return render_template( 

74 "user_management/login.html", 

75 **Config.TEMPLATE_EXTRAS["login"]() 

76 ) 

77 

78 @blueprint.route("/logout", methods=["GET"]) 

79 @login_required 

80 def logout() -> Union[Response, str]: 

81 """ 

82 Logs out the user 

83 :return: The response 

84 """ 

85 app.logger.info("User {} logged out.".format(current_user.username)) 

86 logout_user() 

87 flash(Config.STRINGS["logged_out"], "info") 

88 return redirect(url_for("static.index")) 

89 

90 @blueprint.route("/register", methods=["GET", "POST"]) 

91 def register() -> Union[Response, str]: 

92 """ 

93 Page that allows a new user to register 

94 :return: The response 

95 """ 

96 if request.method == "POST": 

97 username = request.form["username"] 

98 email = request.form["email"] 

99 password = request.form["password"] 

100 password_repeat = request.form["password-repeat"] 

101 recaptcha_result = verify_recaptcha( 

102 request.remote_addr, 

103 request.form.get("g-recaptcha-response", ""), 

104 Config.RECAPTCHA_SECRET_KEY 

105 ) 

106 

107 all_users = User.query.all() 

108 usernames = [user.username for user in all_users] 

109 emails = [user.email for user in all_users] 

110 

111 _min, _max = Config.MIN_USERNAME_LENGTH, Config.MAX_USERNAME_LENGTH 

112 

113 if len(username) < _min or len(username) > _max: 

114 flash(Config.STRINGS["username_length"] 

115 .format(_min, _max), "danger") 

116 elif password != password_repeat: 

117 flash(Config.STRINGS["passwords_do_not_match"], "danger") 

118 elif username in usernames: 

119 flash(Config.STRINGS["username_already_exists"], "danger") 

120 elif email in emails: 

121 flash(Config.STRINGS["email_already_in_use"], "danger") 

122 elif not recaptcha_result: 

123 flash(Config.STRINGS["recaptcha_incorrect"], "danger") 

124 else: 

125 confirmation_key = generate_random(32) 

126 confirmation_hash = generate_hash(confirmation_key) 

127 user = User( 

128 username=username, 

129 email=email, 

130 password_hash=generate_hash(password), 

131 confirmation_hash=confirmation_hash 

132 ) 

133 db.session.add(user) 

134 db.session.commit() 

135 email_msg = render_template( 

136 "email/registration.html", 

137 domain_name=Config.DOMAIN_NAME, 

138 host_url=Config.base_url(), 

139 target_url=os.path.join(Config.base_url(), "confirm"), 

140 username=username, 

141 user_id=user.id, 

142 confirm_key=confirmation_key, 

143 **Config.TEMPLATE_EXTRAS["registration_email"]() 

144 ) 

145 try: 

146 send_email( 

147 email, 

148 Config.STRINGS["registration_email_title"], 

149 email_msg, 

150 Config.SMTP_HOST, 

151 Config.SMTP_ADDRESS, 

152 Config.SMTP_PASSWORD, 

153 Config.SMTP_PORT 

154 ) 

155 except SMTPAuthenticationError: # pragma: no cover 

156 app.logger.error("Failed to authenticate SMTP, could not " 

157 "send confirmation email to user") 

158 flash("SMTP AUTHENTICATION ERROR", "danger") 

159 app.logger.info("User {} registered.".format(user.username)) 

160 flash(Config.STRINGS["registration_successful"], "info") 

161 return redirect(url_for("static.index")) 

162 return redirect(url_for("user_management.register")) 

163 else: 

164 return render_template( 

165 "user_management/register.html", 

166 **Config.TEMPLATE_EXTRAS["register"]() 

167 ) 

168 

169 @blueprint.route("/confirm", methods=["GET"]) 

170 def confirm() -> Union[Response, str]: 

171 """ 

172 Confirms a user 

173 :return: The response 

174 """ 

175 user_id = int(request.args["user_id"]) 

176 confirm_key = request.args["confirm_key"] 

177 user: User = User.query.get(user_id) 

178 

179 if user is None: 

180 flash(Config.STRINGS["user_does_not_exist"], "danger") 

181 elif user.confirmed: 

182 flash(Config.STRINGS["user_already_confirmed"], "warning") 

183 elif not user.verify_confirmation(confirm_key): 

184 flash(Config.STRINGS["confirmation_key_invalid"], "warning") 

185 else: 

186 user.confirmed = True 

187 db.session.commit() 

188 flash(Config.STRINGS["user_confirmed_successfully"], "success") 

189 return redirect(url_for("static.index")) 

190 

191 @blueprint.route("/forgot", methods=["POST", "GET"]) 

192 def forgot() -> Union[Response, str]: 

193 """ 

194 Allows a user to reset their password 

195 :return: The response 

196 """ 

197 if request.method == "POST": 

198 email = request.form["email"] 

199 recaptcha_result = verify_recaptcha( 

200 request.remote_addr, 

201 request.form.get("g-recaptcha-response", ""), 

202 Config.RECAPTCHA_SECRET_KEY 

203 ) 

204 user: User = User.query.filter_by(email=email).first() 

205 

206 if not recaptcha_result: 

207 flash(Config.STRINGS["recaptcha_incorrect"], "danger") 

208 return redirect(url_for("user_management.forgot")) 

209 else: 

210 if user is None: 

211 # Fail silently to ensure that a potential attacker can't 

212 # use the response to figure out information 

213 # on registered users 

214 pass 

215 else: 

216 new_pass = generate_random(20) 

217 user.password_hash = generate_hash(new_pass) 

218 db.session.commit() 

219 

220 email_msg = render_template( 

221 "email/forgot_password.html", 

222 domain_name=Config.DOMAIN_NAME, 

223 host_url=Config.base_url(), 

224 target_url=os.path.join(Config.base_url(), "login"), 

225 password=new_pass, 

226 username=user.username, 

227 **Config.TEMPLATE_EXTRAS["forgot_email"]() 

228 ) 

229 try: 

230 send_email( 

231 email, 

232 Config.STRINGS["password_reset_email_title"], 

233 email_msg, 

234 Config.SMTP_HOST, 

235 Config.SMTP_ADDRESS, 

236 Config.SMTP_PASSWORD, 

237 Config.SMTP_PORT 

238 ) 

239 except SMTPAuthenticationError: # pragma: no cover 

240 app.logger.error("SMTP Authentication failed") 

241 flash("SMTP AUTHENTICATION FAILED", "info") 

242 flash(Config.STRINGS["password_was_reset"], "success") 

243 return redirect(url_for("static.index")) 

244 

245 else: 

246 return render_template( 

247 "user_management/forgot.html", 

248 **Config.TEMPLATE_EXTRAS["forgot"]() 

249 ) 

250 

251 @blueprint.route("/profile", methods=["GET"]) 

252 @login_required 

253 def profile() -> Union[Response, str]: 

254 """ 

255 Allows a user to edit their profile details 

256 :return: The response 

257 """ 

258 chat_id = TelegramChatId.query.filter_by(user=current_user).first() 

259 return render_template( 

260 "user_management/profile.html", 

261 **Config.TEMPLATE_EXTRAS["profile"](), 

262 telegram_chat_id=chat_id 

263 ) 

264 

265 @blueprint.route("/change_password", methods=["POST"]) 

266 @login_required 

267 def change_password() -> Union[Response, str]: 

268 """ 

269 Allows the user to change their password 

270 :return: The response 

271 """ 

272 old_password = request.form["old_password"] 

273 new_password = request.form["new_password"] 

274 password_repeat = request.form["password_repeat"] 

275 user: User = current_user 

276 

277 if new_password != password_repeat: 

278 flash(Config.STRINGS["passwords_do_not_match"], "danger") 

279 elif not user.verify_password(old_password): 

280 flash(Config.STRINGS["invalid_password"], "danger") 

281 else: 

282 user.password_hash = generate_hash(new_password) 

283 db.session.commit() 

284 flash(Config.STRINGS["password_changed"], "success") 

285 return redirect(url_for("user_management.profile")) 

286 

287 @blueprint.route("/delete_user", methods=["POST"]) 

288 @login_required 

289 def delete_user() -> Union[Response, str]: 

290 """ 

291 Allows a user to delete their account 

292 :return: The response 

293 """ 

294 password = request.form["password"] 

295 user: User = current_user 

296 

297 if not user.verify_password(password): 

298 flash(Config.STRINGS["invalid_password"], "danger") 

299 else: 

300 app.logger.info("Deleting user {}".format(user)) 

301 db.session.delete(user) 

302 db.session.commit() 

303 logout_user() 

304 flash(Config.STRINGS["user_was_deleted"], "success") 

305 return redirect(url_for("static.index")) 

306 return redirect(url_for("user_management.profile")) 

307 

308 @blueprint.route("/register_telegram", methods=["POST"]) 

309 @login_required 

310 def register_telegram() -> Union[Response, str]: 

311 """ 

312 Allows the user to register a telegram chat ID 

313 :return: The response 

314 """ 

315 telegram_chat_id = request.form["telegram_chat_id"] 

316 user: User = current_user 

317 chat_id = TelegramChatId.query.filter_by(user=user).first() 

318 

319 if chat_id is None: 

320 chat_id = TelegramChatId(user=user, chat_id=telegram_chat_id) 

321 db.session.add(chat_id) 

322 else: 

323 chat_id.chat_id = telegram_chat_id 

324 db.session.commit() 

325 

326 flash(Config.STRINGS["telegram_chat_id_set"], "success") 

327 chat_id.send_message(Config.STRINGS["telegram_chat_id_set"]) 

328 return redirect(url_for("user_management.profile")) # pragma: no cover 

329 

330 return blueprint