Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1"""LICENSE 

2Copyright 2019 Hermann Krumrey <hermann@krumreyh.com> 

3 

4This file is part of xkcd-bot. 

5 

6xkcd-bot is free software: you can redistribute it and/or modify 

7it under the terms of the GNU General Public License as published by 

8the Free Software Foundation, either version 3 of the License, or 

9(at your option) any later version. 

10 

11xkcd-bot is distributed in the hope that it will be useful, 

12but WITHOUT ANY WARRANTY; without even the implied warranty of 

13MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

14GNU General Public License for more details. 

15 

16You should have received a copy of the GNU General Public License 

17along with xkcd-bot. If not, see <http://www.gnu.org/licenses/>. 

18LICENSE""" 

19 

20import json 

21import requests 

22from typing import List, Dict, Any, Optional 

23from bokkichat.entities.message.MediaType import MediaType 

24from bokkichat.entities.message.MediaMessage import MediaMessage 

25from kudubot.Bot import Bot 

26from kudubot.db.Address import Address 

27from kudubot.parsing.CommandParser import CommandParser 

28from sqlalchemy import desc 

29from sqlalchemy.orm import Session 

30from xkcd_bot import version 

31from xkcd_bot.XkcdCommandParser import XkcdCommandParser 

32from xkcd_bot.db.Comic import Comic 

33from xkcd_bot.db.Subscription import Subscription 

34 

35 

36class XkcdBot(Bot): 

37 """ 

38 The Xkcd Bot class that defines the bot's functionality 

39 """ 

40 

41 @classmethod 

42 def name(cls) -> str: 

43 """ 

44 :return: The name of the bot 

45 """ 

46 return "xkcd-bot" 

47 

48 @classmethod 

49 def version(cls) -> str: 

50 """ 

51 :return: The current version of the bot 

52 """ 

53 return version 

54 

55 @classmethod 

56 def parsers(cls) -> List[CommandParser]: 

57 """ 

58 :return: A list of parser the bot supports for commands 

59 """ 

60 return [XkcdCommandParser()] 

61 

62 def on_subscribe( 

63 self, 

64 address: Address, 

65 _: Dict[str, Any], 

66 db_session: Session 

67 ): 

68 """ 

69 Creates a new subscription for the user 

70 :param address: The user's address 

71 :param _: The arguments for the command 

72 :param db_session: The database session to use 

73 :return: None 

74 """ 

75 existing = db_session.query(Subscription)\ 

76 .filter_by(address=address).first() 

77 if existing is not None: 

78 self.send_txt(address, "You are already subscribed") 

79 else: 

80 subscription = Subscription( 

81 address=address, 

82 last_comic_id=self.__get_latest_comic_id(db_session) 

83 ) 

84 db_session.add(subscription) 

85 db_session.commit() 

86 self.logger.info("{} subscribed".format(address.address)) 

87 self.send_txt(address, "Subscription successful") 

88 

89 def on_unsubscribe( 

90 self, 

91 address: Address, 

92 _: Dict[str, Any], 

93 db_session: Session 

94 ): 

95 """ 

96 Unsubscribes a user 

97 :param address: The user's address 

98 :param _: The arguments for the command 

99 :param db_session: The database session to use 

100 :return: None 

101 """ 

102 subscription = db_session.query(Subscription) \ 

103 .filter_by(address=address).first() 

104 if subscription is not None: 

105 db_session.delete(subscription) 

106 db_session.commit() 

107 self.logger.info("{} unsubscribed".format(address.address)) 

108 self.send_txt(address, "Successfully unregistered") 

109 

110 def on_new( 

111 self, 

112 address: Address, 

113 _: Dict[str, Any], 

114 db_session: Session 

115 ): 

116 """ 

117 Sends the newest XKCD comic 

118 :param address: The user's address 

119 :param _: The arguments for the command 

120 :param db_session: The database session to use 

121 :return: None 

122 """ 

123 latest_id = self.__get_latest_comic_id(db_session) 

124 self.on_show(address, {"id": latest_id}, db_session) 

125 

126 def on_show( 

127 self, 

128 address: Address, 

129 args: Dict[str, Any], 

130 db_session: Session 

131 ): 

132 """ 

133 Sends a specific XKCD comic 

134 :param address: The user's address 

135 :param args: The arguments for the command 

136 :param db_session: The database session to use 

137 :return: None 

138 """ 

139 comic_id = args["id"] 

140 comic = self.__get_comic(comic_id, db_session) 

141 

142 if comic is not None: 

143 self.__send_comic(comic, address) 

144 else: 

145 self.send_txt(address, "This comic does not exist") 

146 

147 def __get_comic(self, comic_id: int, db_session: Session) \ 

148 -> Optional[Comic]: 

149 """ 

150 Retrieves a specific XKCD comic from the database or the xkcd website 

151 :param comic_id: The comic's ID 

152 :param db_session: The database session to use 

153 :return: The comic, or None if the comic does not exist 

154 """ 

155 comic = db_session.query(Comic).filter_by(id=comic_id).first() 

156 if comic is None: 

157 self.logger.debug("Comic {} not in database, fetching" 

158 .format(comic_id)) 

159 url = "https://xkcd.com/{}/info.0.json".format(comic_id) 

160 resp = requests.get(url) 

161 if resp.status_code >= 300: 

162 comic = None 

163 self.logger.debug("Comic {} does not exist".format(comic_id)) 

164 else: 

165 data = json.loads(resp.text) 

166 comic = Comic( 

167 id=data["num"], 

168 title=data["title"], 

169 alt_text=data["alt"], 

170 image_url=data["img"], 

171 image_data=requests.get(data["img"]).content 

172 ) 

173 self.logger.debug("Successfully downloaded comic {}" 

174 .format(comic_id)) 

175 db_session.add(comic) 

176 db_session.commit() 

177 return comic 

178 

179 def __get_latest_comic_id( 

180 self, 

181 db_session: Session, 

182 refresh: bool = False 

183 ) -> int: 

184 """ 

185 Retrieves the latest comic ID 

186 :param db_session: The database session to use 

187 :param refresh: Whether or not to retrieve the most up-to-date info 

188 :return: The ID of the newest comic 

189 """ 

190 if not refresh: 

191 self.logger.debug("Loading newest comic ID") 

192 latest = db_session.query(Comic).order_by(desc(Comic.id)).first() 

193 if latest is None: 

194 return self.__get_latest_comic_id(db_session, True) 

195 else: 

196 return latest.id 

197 else: 

198 resp = requests.get("https://xkcd.com/info.0.json").text 

199 data = json.loads(resp) 

200 return data["num"] 

201 

202 def __send_comic(self, comic: Comic, address: Address): 

203 """ 

204 Sends a comic to a user 

205 :param comic: The comic to send 

206 :param address: The address of the user 

207 :return: None 

208 """ 

209 self.logger.info("Sending comic {} to {}" 

210 .format(comic.id, address.address)) 

211 self.send_txt(address, comic.title) 

212 message = MediaMessage( 

213 self.connection.address, 

214 address, MediaType.IMAGE, 

215 comic.image_data, 

216 comic.alt_text 

217 ) 

218 self.connection.send(message) 

219 

220 def bg_iteration(self, _: int, db_session: Session): 

221 """ 

222 Periodically checks for new notifications and sends them out 

223 :param _: The iteration count 

224 :param db_session: The database session to use 

225 :return: None 

226 """ 

227 self.logger.info("Looking for due subscriptions") 

228 

229 latest_id = self.__get_latest_comic_id(db_session, True) 

230 comic = self.__get_comic(latest_id, db_session) 

231 if comic is None: 

232 self.logger.error("Latest comic did not load") 

233 return 

234 

235 for subscription in db_session.query(Subscription).all(): 

236 if subscription.last_comic_id < latest_id: 

237 self.logger.info("Subscription for {} is due" 

238 .format(subscription.address.address)) 

239 self.__send_comic(comic, subscription.address) 

240 subscription.last_comic = comic 

241 

242 db_session.commit()