Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1"""LICENSE 

2Copyright 2020 Hermann Krumrey <hermann@krumreyh.com> 

3 

4This file is part of otaku-info-web. 

5 

6otaku-info-web is free software: you can redistribute it and/or modify 

7it under the terms of the GNU General Public License as published by 

8the Free Software Foundation, either version 3 of the License, or 

9(at your option) any later version. 

10 

11otaku-info-web is distributed in the hope that it will be useful, 

12but WITHOUT ANY WARRANTY; without even the implied warranty of 

13MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

14GNU General Public License for more details. 

15 

16You should have received a copy of the GNU General Public License 

17along with otaku-info-web. If not, see <http://www.gnu.org/licenses/>. 

18LICENSE""" 

19 

import time
from typing import List, Dict, Optional, Set, Tuple

from puffotter.flask.base import db, app
from puffotter.flask.db.User import User
from otaku_info_web.db.MediaId import MediaId
from otaku_info_web.db.MediaItem import MediaItem
from otaku_info_web.db.MediaList import MediaList
from otaku_info_web.db.MediaListItem import MediaListItem
from otaku_info_web.db.MediaUserState import MediaUserState
from otaku_info_web.db.ServiceUsername import ServiceUsername
from otaku_info_web.utils.anilist.AnilistItem import AnilistUserItem
from otaku_info_web.utils.anilist.api import load_anilist
from otaku_info_web.utils.enums import ListService, MediaType, MediaSubType

33 

34 

def fetch_anilist_data():
    """
    Synchronises the database with the anilist data of every user that
    provided an anilist username.
    The anilist data is loaded first, afterwards the media entries, the
    media user states and the media lists are updated (in that order).
    :return: None
    """
    start = time.time()
    app.logger.debug("Starting Anilist Update")

    anilist_users: List[ServiceUsername] = ServiceUsername.query\
        .filter_by(service=ListService.ANILIST).all()

    # load_anilist is called once per (user, media type) combination
    anilist_data: Dict[
        ServiceUsername,
        Dict[MediaType, List[AnilistUserItem]]
    ] = {}
    for anilist_user in anilist_users:
        lists_by_type = {}
        for media_type in MediaType:
            lists_by_type[media_type] = \
                load_anilist(anilist_user.username, media_type)
        anilist_data[anilist_user] = lists_by_type

    # Current database contents, shared by all three update steps
    items, ids, user_states, lists, list_items = load_existing()

    app.logger.debug("Updating Media Entries")
    update_media_entries(anilist_data, items, ids)

    app.logger.debug("Updating Media User States")
    update_media_user_entries(anilist_data, items, ids, user_states)

    app.logger.debug("Updating Media Lists")
    update_media_lists(
        anilist_data, items, ids, user_states, lists, list_items
    )

    app.logger.info(f"Completed anilist update in {time.time() - start}")

73 

74 

def update_media_entries(
        anilist_data: Dict[
            ServiceUsername,
            Dict[MediaType, List[AnilistUserItem]]
        ],
        media_items: Dict[Tuple[str, MediaType, MediaSubType, str], MediaItem],
        media_ids: Dict[Tuple[ListService, int], MediaId]
):
    """
    Updates the media entries and anilist IDs.
    Every media item and every media ID is written at most once per call,
    even if it shows up in the anilist data of several users.
    The preloaded dictionaries are updated in place.
    :param anilist_data: The anilist data to store
    :param media_items: The preloaded media items
    :param media_ids: The preloaded media IDs
    :return: The media IDs dictionary that was passed in, after the update
    """
    # Sets, not lists: membership is tested once for every single entry
    updated_ids: Set[Tuple[ListService, int]] = set()
    updated_items: Set[Tuple[str, MediaType, MediaSubType, str]] = set()

    for media_type in MediaType:

        # Entries of this media type across all users
        anilist_entries: List[AnilistUserItem] = [
            entry
            for data in anilist_data.values()
            for entry in data[media_type]
        ]

        for anilist_entry in anilist_entries:
            item_tuple, media_item = fetch_media_item(
                anilist_entry, media_items
            )
            if item_tuple not in updated_items:
                media_item = update_media_item(anilist_entry, media_item)
                media_items[item_tuple] = media_item
                updated_items.add(item_tuple)

            id_tuple, media_id = fetch_media_id(
                anilist_entry, media_items, media_ids, media_item
            )
            # A media item is guaranteed to exist at this point, so an
            # identifier tuple can always be built
            assert id_tuple is not None

            if id_tuple not in updated_ids:
                media_id = update_media_id(
                    anilist_entry, media_items[item_tuple], media_id
                )
                media_ids[id_tuple] = media_id
                updated_ids.add(id_tuple)

    db.session.commit()
    return media_ids

121 

122 

def update_media_user_entries(
        anilist_data: Dict[
            ServiceUsername,
            Dict[MediaType, List[AnilistUserItem]]
        ],
        media_items: Dict[Tuple[str, MediaType, MediaSubType, str], MediaItem],
        media_ids: Dict[Tuple[ListService, int], MediaId],
        media_user_states: Dict[Tuple[int, int], MediaUserState]
):
    """
    Updates the individual users' current state for media items in
    their anilist account.
    Stored states of a user that are not backed by an entry in that user's
    anilist data are deleted.
    The preloaded media user states are updated in place.
    :param anilist_data: The anilist data to enter into the database
    :param media_items: Preloaded media items
    :param media_ids: Preloaded media IDs
    :param media_user_states: Preloaded media user states
    :return: None
    """
    # Set, not list: membership is tested once for every single entry
    updated: Set[Tuple[int, int]] = set()

    for service_user, anilist in anilist_data.items():
        # Snapshot of the states this user had before the update,
        # used below to detect states that are no longer needed
        user_states = {
            x: y for x, y in media_user_states.items()
            if y.user_id == service_user.user_id
        }

        for anilist_entries in anilist.values():
            for entry in anilist_entries:
                id_tuple, media_id = \
                    fetch_media_id(entry, media_items, media_ids)
                assert media_id is not None

                user_state_id = (media_id.id, service_user.user_id)

                if user_state_id in updated:
                    continue

                media_user_state = media_user_states.get(user_state_id)

                media_user_state = update_media_user_state(
                    entry, media_id, service_user.user, media_user_state
                )

                updated.add(user_state_id)
                media_user_states[user_state_id] = media_user_state

        for user_state_tuple, user_state in user_states.items():
            if user_state_tuple not in updated:
                db.session.delete(user_state)
                media_user_states.pop(user_state_tuple)

    db.session.commit()

175 

176 

def update_media_lists(
        anilist_data: Dict[
            ServiceUsername,
            Dict[MediaType, List[AnilistUserItem]]
        ],
        media_items: Dict[Tuple[str, MediaType, MediaSubType, str], MediaItem],
        media_ids: Dict[Tuple[ListService, int], MediaId],
        media_user_states: Dict[Tuple[int, int], MediaUserState],
        media_lists: Dict[Tuple[str, int, ListService, MediaType], MediaList],
        media_list_items: Dict[Tuple[int, int], MediaListItem]
):
    """
    Updates the database for anilist user lists.
    This includes custom anilist lists.
    Lists and list items that exist in the database but do not appear in
    the anilist data are deleted.
    The preloaded media lists and media list items are updated in place.
    :param anilist_data: The anilist data to enter into the database
    :param media_items: Preloaded media items
    :param media_ids: Preloaded media IDs
    :param media_user_states: The current media user states in the database
    :param media_lists: The media lists currently in the database
    :param media_list_items: The media list items currently in the database
    :return: None
    """
    # Everything that existed before the update is a deletion candidate
    # until it turns up in the anilist data. Dict keys are used as an
    # insertion-ordered set: O(1) lookup/removal instead of the O(n)
    # membership test + remove() of a list
    stale_lists = dict.fromkeys(media_lists)
    stale_list_items = dict.fromkeys(media_list_items)

    for service_user, anilist in anilist_data.items():
        for media_type, entries in anilist.items():
            for entry in entries:

                list_tuple = (
                    entry.list_name,
                    service_user.user_id,
                    ListService.ANILIST,
                    media_type
                )
                stale_lists.pop(list_tuple, None)
                media_list = media_lists.get(list_tuple)

                if media_list is None:
                    media_list = MediaList(
                        user_id=service_user.user_id,
                        name=entry.list_name,
                        service=ListService.ANILIST,
                        media_type=media_type
                    )
                    db.session.add(media_list)
                    # Committed immediately; media_list.id is read below
                    db.session.commit()
                    media_lists[list_tuple] = media_list

                _, media_id = fetch_media_id(entry, media_items, media_ids)
                assert media_id is not None

                state_tuple = (media_id.id, service_user.user_id)
                media_user_state = media_user_states[state_tuple]

                list_item_tuple = (media_list.id, media_user_state.id)
                stale_list_items.pop(list_item_tuple, None)

                if list_item_tuple not in media_list_items:
                    list_item = MediaListItem(
                        media_list=media_list,
                        media_user_state=media_user_state
                    )
                    db.session.add(list_item)
                    # Keep the preloaded data in sync, so that a repeated
                    # (list, state) pair is not inserted a second time
                    media_list_items[list_item_tuple] = list_item

    db.session.commit()

    # Whatever was never seen in the anilist data gets removed
    for list_tuple in stale_lists:
        if list_tuple in media_lists:
            db.session.delete(media_lists.pop(list_tuple))
    for list_item_tuple in stale_list_items:
        if list_item_tuple in media_list_items:
            db.session.delete(media_list_items.pop(list_item_tuple))

    db.session.commit()

254 

255 

def load_existing() -> Tuple[
    Dict[Tuple[str, MediaType, MediaSubType, str], MediaItem],
    Dict[Tuple[ListService, int], MediaId],
    Dict[Tuple[int, int], MediaUserState],
    Dict[Tuple[str, int, ListService, MediaType], MediaList],
    Dict[Tuple[int, int], MediaListItem]
]:
    """
    Queries all media items, media IDs, media user states, media lists and
    media list items from the database and maps each of them to a tuple
    that uniquely identifies it
    :return: The five resulting dictionaries, in the order listed above
    """
    app.logger.debug("Loading Existing data for anilist update")

    media_items: Dict[
        Tuple[str, MediaType, MediaSubType, str], MediaItem
    ] = {}
    for item in MediaItem.query.all():
        item_key = (
            item.romaji_title,
            item.media_type,
            item.media_subtype,
            item.cover_url
        )
        media_items[item_key] = item
    app.logger.debug("Finished loading MediaItems")

    media_ids: Dict[Tuple[ListService, int], MediaId] = {}
    for media_id in MediaId.query.all():
        media_ids[(media_id.service, media_id.media_item_id)] = media_id
    app.logger.debug("Finished loading MediaIds")

    media_user_states: Dict[Tuple[int, int], MediaUserState] = {}
    for state in MediaUserState.query.all():
        media_user_states[(state.media_id_id, state.user_id)] = state
    app.logger.debug("Finished loading MediaUserStates")

    media_lists: Dict[
        Tuple[str, int, ListService, MediaType], MediaList
    ] = {}
    for media_list in MediaList.query.all():
        list_key = (
            media_list.name,
            media_list.user_id,
            media_list.service,
            media_list.media_type
        )
        media_lists[list_key] = media_list
    app.logger.debug("Finished loading MediaLists")

    media_list_items: Dict[Tuple[int, int], MediaListItem] = {}
    for list_item in MediaListItem.query.all():
        list_item_key = (
            list_item.media_list_id, list_item.media_user_state_id
        )
        media_list_items[list_item_key] = list_item
    app.logger.debug("Finished loading MediaListItems")

    return (
        media_items,
        media_ids,
        media_user_states,
        media_lists,
        media_list_items
    )

295 

296 

def fetch_media_item(
        anilist_entry: AnilistUserItem,
        media_items: Dict[Tuple[str, MediaType, MediaSubType, str], MediaItem],
) -> Tuple[
    Tuple[str, MediaType, MediaSubType, str],
    Optional[MediaItem]
]:
    """
    Looks up the media item that belongs to an anilist entry
    :param anilist_entry: The anilist entry to look up
    :param media_items: The preloaded media items
    :return: A tuple consisting of the identifier tuple built from the
             anilist entry and the matching media item.
             The media item is None if none exists
    """
    # Same field order as the keys of the preloaded media items
    key = (
        anilist_entry.romaji_title,
        anilist_entry.media_type,
        anilist_entry.media_subtype,
        anilist_entry.cover_url
    )
    existing = media_items.get(key)
    return key, existing

317 

318 

def fetch_media_id(
        anilist_entry: AnilistUserItem,
        media_items: Dict[Tuple[str, MediaType, MediaSubType, str], MediaItem],
        media_ids: Dict[Tuple[ListService, int], MediaId],
        media_item: Optional[MediaItem] = None
) -> Tuple[Optional[Tuple[ListService, int]], Optional[MediaId]]:
    """
    Retrieves an existing media ID based on anilist data
    :param anilist_entry: The anilist entry to use
    :param media_items: The preloaded media items
    :param media_ids: The preloaded media IDs
    :param media_item: Optional media item associated with the ID.
                       If not provided, will figure out using anilist data
    :return: A tuple consisting of the identifier tuple of the media ID
             and the media ID itself.
             Both are None if no media item exists for the anilist entry,
             only the media ID is None if no media ID exists for the
             media item
    """
    if media_item is None:
        _, media_item = fetch_media_item(anilist_entry, media_items)
    if media_item is None:
        # Without a media item there is nothing to key the lookup on
        return None, None

    id_tuple = (ListService.ANILIST, media_item.id)
    return id_tuple, media_ids.get(id_tuple)

344 

345 

def update_media_item(
        new_data: AnilistUserItem,
        existing: Optional[MediaItem]
) -> MediaItem:
    """
    Copies the media information of an anilist entry into a MediaItem
    database entry. A new entry is created, added to the database session
    and committed if no existing one is provided
    :param new_data: The anilist data to copy the information from
    :param existing: The database entry to update, None to create a new one
    :return: The updated/created MediaItem object
    """
    if existing is None:
        target = MediaItem()
    else:
        target = existing

    # These attributes carry the same name on both objects
    copied_fields = (
        "media_type",
        "media_subtype",
        "english_title",
        "romaji_title",
        "cover_url",
        "latest_release",
        "releasing_state"
    )
    for field in copied_fields:
        setattr(target, field, getattr(new_data, field))

    if existing is None:
        db.session.add(target)
        db.session.commit()
    return target

369 

370 

def update_media_id(
        new_data: AnilistUserItem,
        media_item: MediaItem,
        existing: Optional[MediaId]
) -> MediaId:
    """
    Writes the anilist ID of an anilist entry into a MediaId database
    entry. A new entry is created, added to the database session and
    committed if no existing one is provided
    :param new_data: The anilist data that provides the anilist ID
    :param media_item: The media item the ID belongs to
    :param existing: The database entry to update, None to create a new one
    :return: The updated/created MediaId object
    """
    if existing is None:
        target = MediaId()
    else:
        target = existing

    target.media_item = media_item
    target.service = ListService.ANILIST
    # The service ID is stored in its string representation
    anilist_id = str(new_data.anilist_id)
    target.service_id = anilist_id

    if existing is None:
        db.session.add(target)
        db.session.commit()
    return target

392 

393 

def update_media_user_state(
        new_data: AnilistUserItem,
        media_id: MediaId,
        user: User,
        existing: Optional[MediaUserState]
) -> MediaUserState:
    """
    Writes a user's state for an anilist entry into a MediaUserState
    database entry. A new entry is created, added to the database session
    and committed if no existing one is provided
    :param new_data: The anilist data that provides the user's state
    :param media_id: The media ID of the anilist media item
    :param user: The user the state belongs to
    :param existing: The database entry to update, None to create a new one
    :return: The updated/created MediaUserState object
    """
    if existing is None:
        state = MediaUserState()
    else:
        state = existing

    state.media_id = media_id
    # These attributes carry the same name on both objects
    for field in ("consuming_state", "score", "progress"):
        setattr(state, field, getattr(new_data, field))
    state.user = user

    if existing is None:
        db.session.add(state)
        db.session.commit()
    return state