"""LICENSE 

Copyright 2017 Hermann Krumrey <hermann@krumreyh.com> 

 

This file is part of malscraper. 

 

malscraper is free software: you can redistribute it and/or modify 

it under the terms of the GNU General Public License as published by 

the Free Software Foundation, either version 3 of the License, or 

(at your option) any later version. 

 

malscraper is distributed in the hope that it will be useful, 

but WITHOUT ANY WARRANTY; without even the implied warranty of 

MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

GNU General Public License for more details. 

 

You should have received a copy of the GNU General Public License 

along with malscraper. If not, see <http://www.gnu.org/licenses/>. 

LICENSE""" 

 

import os 

import sys 

import time 

import requests 

from bs4 import BeautifulSoup 

from malscraper.types.MediaType import MediaType 

 

 

class Cache:
    """
    Class that handles fetching and caching of myanimelist data
    """

    initialized = False
    """
    Global initialized variable set to true the first time the
    constructor is called
    """

    in_memory = {
        MediaType.ANIME.value: {},
        MediaType.MANGA.value: {},
        "users": {}
    }
    """
    In-Memory cache
    """

    flush_time = 86400  # Keep data for one day
    """
    Specifies the flush time for the cached data
    """

    def __init__(self, preload: bool = False):
        """
        Initializes the cache directories
        :param preload: Preloads the current cache into memory
        """
        self.cache_dir = os.path.join(os.path.expanduser("~"), ".malscraper")
        self.anime_cache_dir = os.path.join(self.cache_dir, "anime")
        self.manga_cache_dir = os.path.join(self.cache_dir, "manga")
        self.user_cache_dir = os.path.join(self.cache_dir, "users")

        for directory in [self.anime_cache_dir, self.manga_cache_dir,
                          self.user_cache_dir]:
            if not os.path.isdir(directory):
                os.makedirs(directory)

            if preload and not Cache.initialized:
                for element in os.listdir(directory):  # Load cached files
                    if directory == self.anime_cache_dir:
                        media_type = MediaType.ANIME
                    elif directory == self.manga_cache_dir:
                        media_type = MediaType.MANGA
                    else:
                        media_type = None

                    if media_type is not None:
                        element_id = int(element)
                        self.load_mal_page(element_id, media_type)
                    else:
                        self.load_user_xml(element)

        Cache.initialized = True
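
    # Minimal usage sketch: constructing the cache creates the
    # ~/.malscraper anime, manga and users directories if missing, and
    # preload=True additionally parses every previously cached file into
    # the in-memory cache up front:
    #
    #     cache = Cache(preload=True)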

 

    def load_mal_page(self, mal_id: int,
                      media_type: MediaType) -> BeautifulSoup:
        """
        Loads a myanimelist page
        :param mal_id: The ID of the anime/manga
        :param media_type: The type of media to load
        :return: The parsed page as a BeautifulSoup object
        """
        if mal_id in Cache.in_memory[media_type.value]:
            return Cache.in_memory[media_type.value][mal_id]

        else:
            if media_type == MediaType.ANIME:
                cache_dir = self.anime_cache_dir
            elif media_type == MediaType.MANGA:
                cache_dir = self.manga_cache_dir
            else:
                print("Invalid Media Type")
                sys.exit(1)

            cache_file = os.path.join(cache_dir, str(mal_id))

            if self._needs_refresh(cache_file):
                url = "https://myanimelist.net/" + media_type.value + "/"
                url += str(mal_id)
                data = self._get_url_data(url)
                with open(cache_file, "w") as f:
                    f.write(data)
            else:
                with open(cache_file, "r") as f:
                    data = f.read()

            generated = BeautifulSoup(data, "html.parser")
            Cache.in_memory[media_type.value][mal_id] = generated
            return generated
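
    # Usage sketch, assuming MAL ID 1 (the "Cowboy Bebop" anime entry)
    # is still reachable:
    #
    #     cache = Cache()
    #     page = cache.load_mal_page(1, MediaType.ANIME)
    #     print(page.find("title").text.strip())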

 

    def load_user_xml(self, username: str) -> BeautifulSoup:
        """
        Loads a user's XML data
        :param username: The username to fetch the data for
        :return: The parsed XML user data as a BeautifulSoup object
        """
        if username in Cache.in_memory["users"]:
            return Cache.in_memory["users"][username]

        else:
            user_cache_file = os.path.join(self.user_cache_dir, username)
            if self._needs_refresh(user_cache_file):
                url = "https://myanimelist.net/malappinfo.php?" \
                      "type=anime&status=all&u=" + username
                data = self._get_url_data(url)
                with open(user_cache_file, "w") as f:
                    f.write(data)
            else:
                with open(user_cache_file, "r") as f:
                    data = f.read()

            generated = BeautifulSoup(data, features="xml")
            Cache.in_memory["users"][username] = generated
            return generated
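
    # Usage sketch with a placeholder username; the <anime> and
    # <series_title> tags are assumptions about the legacy
    # malappinfo.php response format:
    #
    #     cache = Cache()
    #     user_xml = cache.load_user_xml("example_user")
    #     for entry in user_xml.find_all("anime"):
    #         print(entry.find("series_title").text)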

 

    @staticmethod
    def _needs_refresh(file_path: str) -> bool:
        """
        Checks if a cache file needs to be updated
        :param file_path: The path to the file to check
        :return: True if the file is missing or older than the flush
                 time, False otherwise
        """
        if os.path.isfile(file_path):
            age = os.stat(file_path).st_mtime
            return time.time() - age > Cache.flush_time
        else:
            return True
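
    # Example: a cache file written two days ago has a
    # time.time() - st_mtime difference of roughly 172800 seconds, which
    # exceeds flush_time (86400), so it gets re-fetched; a file written
    # an hour ago (3600 seconds old) is read from disk instead.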

 

    @staticmethod
    def _get_url_data(url: str) -> str:
        """
        Retrieves the data from the URL while circumventing rate limiting
        :param url: The URL from which to fetch data
        :return: The retrieved HTML text
        """
        sleeper = 1
        response = requests.get(url)

        while response.status_code != 200:  # Circumvent rate limiting
            print(response.status_code)
            time.sleep(sleeper)
            sleeper += 1
            response = requests.get(url)

            if sleeper > 30:
                print("Timeout: " + url)
                sys.exit(1)

        return response.text
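

# Demo sketch, assuming MAL ID 1 is a valid anime entry: the second call
# is answered from the in-memory cache, so at most one HTTP request is
# made per entry within a flush_time window.
if __name__ == "__main__":
    cache = Cache()
    first = cache.load_mal_page(1, MediaType.ANIME)
    second = cache.load_mal_page(1, MediaType.ANIME)  # in-memory cache hit
    print(first is second)  # True: both names refer to the same parsed object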