Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1"""LICENSE 

2Copyright 2018 Hermann Krumrey <hermann@krumreyh.com> 

3 

4This file is part of bokkichat. 

5 

6bokkichat is free software: you can redistribute it and/or modify 

7it under the terms of the GNU General Public License as published by 

8the Free Software Foundation, either version 3 of the License, or 

9(at your option) any later version. 

10 

11bokkichat is distributed in the hope that it will be useful, 

12but WITHOUT ANY WARRANTY; without even the implied warranty of 

13MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

14GNU General Public License for more details. 

15 

16You should have received a copy of the GNU General Public License 

17along with bokkichat. If not, see <http://www.gnu.org/licenses/>. 

18LICENSE""" 

19 

20import time 

21import socket 

22# noinspection PyPackageRequirements 

23import telegram 

24import requests 

25from typing import List, Dict, Any, Optional, Type, Callable 

26from bokkichat.entities.Address import Address 

27from bokkichat.entities.message.Message import Message 

28from bokkichat.entities.message.TextMessage import TextMessage 

29from bokkichat.entities.message.MediaType import MediaType 

30from bokkichat.entities.message.MediaMessage import MediaMessage 

31from bokkichat.connection.Connection import Connection 

32from bokkichat.settings.impl.TelegramBotSettings import TelegramBotSettings 

33from bokkichat.exceptions import InvalidMessageData, InvalidSettings 

34 

35 

36class TelegramBotConnection(Connection): 

37 """ 

38 Class that implements a Telegram bot connection 

39 """ 

40 

41 def __init__(self, settings: TelegramBotSettings): 

42 """ 

43 Initializes the connection, with credentials provided by a 

44 Settings object. 

45 :param settings: The settings for the connection 

46 """ 

47 super().__init__(settings) 

48 try: 

49 self.bot = telegram.Bot(settings.api_key) 

50 except telegram.error.InvalidToken: 

51 raise InvalidSettings() 

52 

53 try: 

54 self.update_id = self.bot.get_updates()[0].update_id 

55 except IndexError: 

56 self.update_id = 0 

57 

58 @classmethod 

59 def name(cls) -> str: 

60 """ 

61 The name of the connection class 

62 :return: The connection class name 

63 """ 

64 return "telegram-bot" 

65 

66 @property 

67 def address(self) -> Address: 

68 """ 

69 A connection must be able to specify its own entities 

70 :return: The entities of the connection 

71 """ 

72 return Address(str(self.bot.name)) 

73 

74 @classmethod 

75 def settings_cls(cls) -> Type[TelegramBotSettings]: 

76 """ 

77 The settings class used by this connection 

78 :return: The settings class 

79 """ 

80 return TelegramBotSettings 

81 

82 def send(self, message: Message): 

83 """ 

84 Sends a message. A message may be either a TextMessage 

85 or a MediaMessage. 

86 :param message: The message to send 

87 :return: None 

88 """ 

89 

90 self.logger.info("Sending message to " + message.receiver.address) 

91 

92 try: 

93 if isinstance(message, TextMessage): 

94 for chunk in message.split(4096): 

95 self.bot.send_message( 

96 chat_id=message.receiver.address, 

97 text=self._escape_invalid_characters(chunk), 

98 parse_mode=telegram.ParseMode.MARKDOWN 

99 ) 

100 elif isinstance(message, MediaMessage): 

101 media_map = { 

102 MediaType.AUDIO: ("audio", self.bot.send_audio), 

103 MediaType.VIDEO: ("video", self.bot.send_video), 

104 MediaType.IMAGE: ("photo", self.bot.send_photo) 

105 } 

106 

107 send_func = media_map[message.media_type][1] 

108 

109 # Write to file TODO: Check if this can be done with bytes 

110 with open("/tmp/bokkichat-telegram-temp", "wb") as f: 

111 f.write(message.data) 

112 

113 tempfile = open("/tmp/bokkichat-telegram-temp", "rb") 

114 params = { 

115 "chat_id": message.receiver.address, 

116 media_map[message.media_type][0]: tempfile, 

117 "parse_mode": telegram.ParseMode.MARKDOWN, 

118 "timeout": 30, 

119 "caption": "" 

120 } 

121 if message.caption is not None: 

122 params["caption"] = self._escape_invalid_characters( 

123 message.caption 

124 ) 

125 

126 if media_map[message.media_type][0] == "video": 

127 params["timeout"] = 60 # Increase timeout for videos 

128 

129 try: 

130 send_func(**params) 

131 except (socket.timeout, telegram.error.NetworkError): 

132 self.logger.error("Media Sending timed out") 

133 tempfile.close() 

134 

135 except (telegram.error.Unauthorized, telegram.error.BadRequest): 

136 self.logger.warning( 

137 "Failed to send message to {}".format(message.receiver) 

138 ) 

139 

140 def receive(self) -> List[Message]: 

141 """ 

142 Receives all pending messages. 

143 :return: A list of pending Message objects 

144 """ 

145 messages = [] 

146 

147 try: 

148 for update in self.bot.get_updates( 

149 offset=self.update_id, timeout=10 

150 ): 

151 self.update_id = update.update_id + 1 

152 

153 if update.message is None: 

154 continue 

155 

156 telegram_message = update.message.to_dict() 

157 

158 try: 

159 generated = self._parse_message(telegram_message) 

160 if generated is None: 

161 continue 

162 self.logger.info( 

163 "Received message from {}".format(generated.sender) 

164 ) 

165 self.logger.debug(str(generated)) 

166 messages.append(generated) 

167 except InvalidMessageData as e: 

168 self.logger.error(str(e)) 

169 

170 except telegram.error.Unauthorized: 

171 # The self.bot.get_update method may cause an 

172 # Unauthorized Error if the bot is blocked by the user 

173 self.update_id += 1 

174 

175 except telegram.error.TimedOut: 

176 pass 

177 

178 return messages 

179 

180 def _parse_message(self, message_data: Dict[str, Any]) -> \ 

181 Optional[Message]: 

182 """ 

183 Parses the message data of a Telegram message and generates a 

184 corresponding Message object. 

185 :param message_data: The telegram message data 

186 :return: The generated Message object. 

187 :raises: InvalidMessageData if the parsing failed 

188 """ 

189 address = Address(str(message_data["chat"]["id"])) 

190 

191 if "text" in message_data: 

192 body = message_data["text"] 

193 self.logger.debug("Message Body: {}".format(body)) 

194 return TextMessage(address, self.address, body) 

195 

196 else: 

197 

198 for media_key, media_type in { 

199 "photo": MediaType.IMAGE, 

200 "audio": MediaType.AUDIO, 

201 "video": MediaType.VIDEO, 

202 "voice": MediaType.AUDIO 

203 }.items(): 

204 

205 if media_key in message_data: 

206 

207 self.logger.debug("Media Type: {}".format(media_key)) 

208 media_info = message_data[media_key] 

209 

210 if len(media_info) == 0: 

211 continue 

212 

213 if isinstance(media_info, list): 

214 largest = media_info[len(media_info) - 1] 

215 file_id = largest["file_id"] 

216 elif isinstance(media_info, dict): 

217 file_id = media_info["file_id"] 

218 else: 

219 continue 

220 

221 file_info = self.bot.get_file(file_id) 

222 resp = requests.get(file_info["file_path"]) 

223 data = resp.content 

224 

225 return MediaMessage( 

226 address, 

227 self.address, 

228 media_type, 

229 data, 

230 message_data.get("caption", "") 

231 ) 

232 

233 raise InvalidMessageData(message_data) 

234 

235 def close(self): 

236 """ 

237 Disconnects the Connection. 

238 :return: None 

239 """ 

240 pass 

241 

242 def loop(self, callback: Callable, sleep_time: int = 1): 

243 """ 

244 Starts a loop that periodically checks for new messages, calling 

245 a provided callback function in the process. 

246 :param callback: The callback function to call for each 

247 received message. 

248 The callback should have the following format: 

249 lambda connection, message: do_stuff() 

250 :param sleep_time: The time to sleep between loops 

251 :return: None 

252 """ 

253 try: 

254 super().loop(callback, sleep_time) 

255 except telegram.error.NetworkError: 

256 self.logger.error("Encountered Network Error. Trying to reconnect") 

257 time.sleep(10) 

258 self.loop(callback, sleep_time) 

259 

260 @staticmethod 

261 def _escape_invalid_characters(text: str) -> str: 

262 """ 

263 Escapes invalid characters for telegram markdown in a text. 

264 If this is not done, it may cause sent text to fail. 

265 :param text: The text to escape 

266 :return: The text with the escaped characters 

267 """ 

268 for char in ["_", "*"]: 

269 text = text.replace("\\" + char, "@@@PLACEHOLDER@@@") 

270 text = text.replace(char, "\\" + char) 

271 text = text.replace("@@@PLACEHOLDER@@@", "\\" + char) 

272 

273 return text