Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1"""LICENSE
2Copyright 2020 Hermann Krumrey <hermann@krumreyh.com>
4This file is part of otaku-info-web.
6otaku-info-web is free software: you can redistribute it and/or modify
7it under the terms of the GNU General Public License as published by
8the Free Software Foundation, either version 3 of the License, or
9(at your option) any later version.
11otaku-info-web is distributed in the hope that it will be useful,
12but WITHOUT ANY WARRANTY; without even the implied warranty of
13MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14GNU General Public License for more details.
16You should have received a copy of the GNU General Public License
17along with otaku-info-web. If not, see <http://www.gnu.org/licenses/>.
18LICENSE"""
20import time
21from typing import List, Dict, Optional, Tuple
22from puffotter.flask.base import db, app
23from puffotter.flask.db.User import User
24from otaku_info_web.db.MediaId import MediaId
25from otaku_info_web.db.MediaItem import MediaItem
26from otaku_info_web.db.MediaList import MediaList
27from otaku_info_web.db.MediaListItem import MediaListItem
28from otaku_info_web.db.MediaUserState import MediaUserState
29from otaku_info_web.db.ServiceUsername import ServiceUsername
30from otaku_info_web.utils.anilist.AnilistItem import AnilistUserItem
31from otaku_info_web.utils.anilist.api import load_anilist
32from otaku_info_web.utils.enums import ListService, MediaType, MediaSubType
35def fetch_anilist_data():
36 """
37 Retrieves all entries on the anilists of all users that provided
38 an anilist username
39 :return: None
40 """
41 start = time.time()
42 app.logger.debug("Starting Anilist Update")
43 usernames: List[ServiceUsername] = \
44 ServiceUsername.query.filter_by(service=ListService.ANILIST).all()
45 anilist_data = {
46 user: {
47 media_type: load_anilist(user.username, media_type)
48 for media_type in MediaType
49 }
50 for user in usernames
51 }
52 media_items, media_ids, media_user_states, media_lists, media_list_items\
53 = load_existing()
55 app.logger.debug("Updating Media Entries")
56 update_media_entries(
57 anilist_data, media_items, media_ids
58 )
59 app.logger.debug("Updating Media User States")
60 update_media_user_entries(
61 anilist_data, media_items, media_ids, media_user_states
62 )
63 app.logger.debug("Updating Media Lists")
64 update_media_lists(
65 anilist_data,
66 media_items,
67 media_ids,
68 media_user_states,
69 media_lists,
70 media_list_items
71 )
72 app.logger.info(f"Completed anilist update in {time.time() - start}")
75def update_media_entries(
76 anilist_data: Dict[
77 ServiceUsername,
78 Dict[MediaType, List[AnilistUserItem]]
79 ],
80 media_items: Dict[Tuple[str, MediaType, MediaSubType, str], MediaItem],
81 media_ids: Dict[Tuple[ListService, int], MediaId]
82):
83 """
84 Updates the media entries and anilist IDs
85 :param anilist_data: The anilist data to store
86 :param media_items: The preloaded media items
87 :param media_ids: The preloaded media IDs
88 :return: None
89 """
90 updated_ids: List[Tuple[ListService, int]] = []
91 updated_items: List[Tuple[str, MediaType, MediaSubType, str]] = []
93 for media_type in MediaType:
95 anilist_entries: List[AnilistUserItem] = []
96 for data in anilist_data.values():
97 anilist_entries += data[media_type]
99 for anilist_entry in anilist_entries:
100 item_tuple, media_item = fetch_media_item(
101 anilist_entry, media_items
102 )
103 if item_tuple not in updated_items:
104 media_item = update_media_item(anilist_entry, media_item)
105 media_items[item_tuple] = media_item
106 updated_items.append(item_tuple)
108 id_tuple, media_id = fetch_media_id(
109 anilist_entry, media_items, media_ids, media_item
110 )
111 assert id_tuple is not None
113 if id_tuple not in updated_ids:
114 media_item = media_items[item_tuple]
115 media_id = update_media_id(anilist_entry, media_item, media_id)
116 media_ids[id_tuple] = media_id
117 updated_ids.append(id_tuple)
119 db.session.commit()
120 return media_ids
123def update_media_user_entries(
124 anilist_data: Dict[
125 ServiceUsername,
126 Dict[MediaType, List[AnilistUserItem]]
127 ],
128 media_items: Dict[Tuple[str, MediaType, MediaSubType, str], MediaItem],
129 media_ids: Dict[Tuple[ListService, int], MediaId],
130 media_user_states: Dict[Tuple[int, int], MediaUserState]
131):
132 """
133 Updates the individual users' current state for media items in
134 thei ranilist account.
135 :param anilist_data: The anilist data to enter into the database
136 :param media_items: Preloaded media items
137 :param media_ids: Preloaded media IDs
138 :param media_user_states: Preloaded media user states
139 :return: None
140 """
141 updated: List[Tuple[int, int]] = []
143 for service_user, anilist in anilist_data.items():
144 user_states = {
145 x: y for x, y in media_user_states.items()
146 if y.user_id == service_user.user_id
147 }
149 for media_type, anilist_entries in anilist.items():
150 for entry in anilist_entries:
151 id_tuple, media_id = \
152 fetch_media_id(entry, media_items, media_ids)
153 assert media_id is not None
155 user_state_id = (media_id.id, service_user.user_id)
157 if user_state_id in updated:
158 continue
160 media_user_state = media_user_states.get(user_state_id)
162 media_user_state = update_media_user_state(
163 entry, media_id, service_user.user, media_user_state
164 )
166 updated.append(user_state_id)
167 media_user_states[user_state_id] = media_user_state
169 for user_state_tuple, user_state in user_states.items():
170 if user_state_tuple not in updated:
171 db.session.delete(user_state)
172 media_user_states.pop(user_state_tuple)
174 db.session.commit()
177def update_media_lists(
178 anilist_data: Dict[
179 ServiceUsername,
180 Dict[MediaType, List[AnilistUserItem]]
181 ],
182 media_items: Dict[Tuple[str, MediaType, MediaSubType, str], MediaItem],
183 media_ids: Dict[Tuple[ListService, int], MediaId],
184 media_user_states: Dict[Tuple[int, int], MediaUserState],
185 media_lists: Dict[Tuple[str, int, ListService, MediaType], MediaList],
186 media_list_items: Dict[Tuple[int, int], MediaListItem]
187):
188 """
189 Updates the database for anilist user lists.
190 This includes custom anilist lists.
191 :param anilist_data: The anilist data to enter into the database
192 :param media_items: Preloaded media items
193 :param media_ids: Preloaded media IDs
194 :param media_user_states: The current media user states in the database
195 :param media_lists: The media lists currently in the database
196 :param media_list_items: The media list items currently in the database
197 :return: None
198 """
199 list_tuples_to_remove = list(media_lists.keys())
200 list_item_tuples_to_remove = list(media_list_items.keys())
202 for service_user, anilist in anilist_data.items():
203 for media_type, entries in anilist.items():
204 for entry in entries:
206 list_tuple = (
207 entry.list_name,
208 service_user.user_id,
209 ListService.ANILIST,
210 media_type
211 )
212 if list_tuple in list_tuples_to_remove:
213 list_tuples_to_remove.remove(list_tuple)
214 media_list = media_lists.get(list_tuple)
216 if media_list is None:
217 media_list = MediaList(
218 user_id=service_user.user_id,
219 name=entry.list_name,
220 service=ListService.ANILIST,
221 media_type=media_type
222 )
223 db.session.add(media_list)
224 db.session.commit()
225 media_lists[list_tuple] = media_list
227 _, media_id = fetch_media_id(entry, media_items, media_ids)
228 assert media_id is not None
230 state_tuple = (media_id.id, service_user.user_id)
231 media_user_state = media_user_states[state_tuple]
233 list_item_tuple = (media_list.id, media_user_state.id)
234 if list_item_tuple in list_item_tuples_to_remove:
235 list_item_tuples_to_remove.remove(list_item_tuple)
237 if list_item_tuple not in media_list_items:
238 list_item = MediaListItem(
239 media_list=media_list,
240 media_user_state=media_user_state
241 )
242 db.session.add(list_item)
244 db.session.commit()
246 for list_tuple in list_tuples_to_remove:
247 if list_tuple in media_lists:
248 db.session.delete(media_lists.pop(list_tuple))
249 for list_item_tuple in list_item_tuples_to_remove:
250 if list_item_tuple in media_list_items:
251 db.session.delete(media_list_items.pop(list_item_tuple))
253 db.session.commit()
256def load_existing() -> Tuple[
257 Dict[Tuple[str, MediaType, MediaSubType, str], MediaItem],
258 Dict[Tuple[ListService, int], MediaId],
259 Dict[Tuple[int, int], MediaUserState],
260 Dict[Tuple[str, int, ListService, MediaType], MediaList],
261 Dict[Tuple[int, int], MediaListItem]
262]:
263 """
264 Loads current database contents, mapped to unique identifer tuples
265 :return: The database contents
266 """
267 app.logger.debug("Loading Existing data for anilist update")
268 media_items: Dict[Tuple[str, MediaType, MediaSubType, str], MediaItem] = {
269 (x.romaji_title, x.media_type, x.media_subtype, x.cover_url): x
270 for x in MediaItem.query.all()
271 }
272 app.logger.debug("Finished loading MediaItems")
273 media_ids: Dict[Tuple[ListService, int], MediaId] = {
274 (x.service, x.media_item_id): x
275 for x in MediaId.query.all()
276 }
277 app.logger.debug("Finished loading MediaIds")
278 media_user_states: Dict[Tuple[int, int], MediaUserState] = {
279 (x.media_id_id, x.user_id): x
280 for x in MediaUserState.query.all()
281 }
282 app.logger.debug("Finished loading MediaUserStates")
283 media_lists: Dict[Tuple[str, int, ListService, MediaType], MediaList] = {
284 (x.name, x.user_id, x.service, x.media_type): x
285 for x in MediaList.query.all()
286 }
287 app.logger.debug("Finished loading MediaLists")
288 media_list_items: Dict[Tuple[int, int], MediaListItem] = {
289 (x.media_list_id, x.media_user_state_id): x
290 for x in MediaListItem.query.all()
291 }
292 app.logger.debug("Finished loading MediaListItems")
293 return media_items, media_ids, media_user_states, \
294 media_lists, media_list_items
297def fetch_media_item(
298 anilist_entry: AnilistUserItem,
299 media_items: Dict[Tuple[str, MediaType, MediaSubType, str], MediaItem],
300) -> Tuple[
301 Tuple[str, MediaType, MediaSubType, str],
302 Optional[MediaItem]
303]:
304 """
305 Retrieves an existing media item based on anilist data
306 :param anilist_entry: The anilist entry to use
307 :param media_items: The preloaded media items
308 :return: The media item, or None if none exists
309 """
310 item_tuple = (
311 anilist_entry.romaji_title,
312 anilist_entry.media_type,
313 anilist_entry.media_subtype,
314 anilist_entry.cover_url
315 )
316 return item_tuple, media_items.get(item_tuple)
319def fetch_media_id(
320 anilist_entry: AnilistUserItem,
321 media_items: Dict[Tuple[str, MediaType, MediaSubType, str], MediaItem],
322 media_ids: Dict[Tuple[ListService, int], MediaId],
323 media_item: Optional[MediaId] = None
324) -> Tuple[Optional[Tuple[ListService, int]], Optional[MediaItem]]:
325 """
326 Retrieves an existing media ID based on anilist data
327 :param anilist_entry: The anilist entry to use
328 :param media_items: The preloaded media items
329 :param media_ids: The preloaded media IDs
330 :param media_item: Optional media item associated with the ID.
331 If not provided, will figure out using anilist data
332 :return: The media ID, or None if none exists
333 """
334 if media_item is None:
335 _, media_item = fetch_media_item(anilist_entry, media_items)
336 if media_item is None:
337 return None, None
338 else:
339 id_tuple = (
340 ListService.ANILIST,
341 media_item.id
342 )
343 return id_tuple, media_ids.get(id_tuple)
346def update_media_item(
347 new_data: AnilistUserItem,
348 existing: Optional[MediaItem]
349) -> MediaItem:
350 """
351 Updates or creates MediaItem database entries based on anilist data
352 :param new_data: The new anilist data
353 :param existing: The existing database entry. If None, will be created
354 :return: The updated/created MediaItem object
355 """
356 media_item = MediaItem() if existing is None else existing
357 media_item.media_type = new_data.media_type
358 media_item.media_subtype = new_data.media_subtype
359 media_item.english_title = new_data.english_title
360 media_item.romaji_title = new_data.romaji_title
361 media_item.cover_url = new_data.cover_url
362 media_item.latest_release = new_data.latest_release
363 media_item.releasing_state = new_data.releasing_state
365 if existing is None:
366 db.session.add(media_item)
367 db.session.commit()
368 return media_item
371def update_media_id(
372 new_data: AnilistUserItem,
373 media_item: MediaItem,
374 existing: Optional[MediaId]
375) -> MediaId:
376 """
377 Updates/Creates a MediaId database entry based on anilist data
378 :param new_data: The anilist data to use
379 :param media_item: The media item associated with the ID
380 :param existing: The existing database entry. If None, will be created
381 :return: The updated/created MediaId object
382 """
383 media_id = MediaId() if existing is None else existing
384 media_id.media_item = media_item
385 media_id.service = ListService.ANILIST
386 media_id.service_id = str(new_data.anilist_id)
388 if existing is None:
389 db.session.add(media_id)
390 db.session.commit()
391 return media_id
394def update_media_user_state(
395 new_data: AnilistUserItem,
396 media_id: MediaId,
397 user: User,
398 existing: Optional[MediaUserState]
399) -> MediaUserState:
400 """
401 Updates or creates a MediaUserState entry in the database
402 :param new_data: The new anilist data
403 :param media_id: The media ID of the anilist media item
404 :param user: The user associated with the data
405 :param existing: The existing database entry. If None, will be created
406 :return: The updated/created MediaUserState object
407 """
408 media_user_state = MediaUserState() if existing is None else existing
409 media_user_state.media_id = media_id
410 media_user_state.consuming_state = new_data.consuming_state
411 media_user_state.score = new_data.score
412 media_user_state.progress = new_data.progress
413 media_user_state.user = user
415 if existing is None:
416 db.session.add(media_user_state)
417 db.session.commit()
418 return media_user_state