Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1"""LICENSE
2Copyright 2018 Hermann Krumrey <hermann@krumreyh.com>
4This file is part of bokkichat.
6bokkichat is free software: you can redistribute it and/or modify
7it under the terms of the GNU General Public License as published by
8the Free Software Foundation, either version 3 of the License, or
9(at your option) any later version.
11bokkichat is distributed in the hope that it will be useful,
12but WITHOUT ANY WARRANTY; without even the implied warranty of
13MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14GNU General Public License for more details.
16You should have received a copy of the GNU General Public License
17along with bokkichat. If not, see <http://www.gnu.org/licenses/>.
18LICENSE"""
20import time
21import socket
22# noinspection PyPackageRequirements
23import telegram
24import requests
25from typing import List, Dict, Any, Optional, Type, Callable
26from bokkichat.entities.Address import Address
27from bokkichat.entities.message.Message import Message
28from bokkichat.entities.message.TextMessage import TextMessage
29from bokkichat.entities.message.MediaType import MediaType
30from bokkichat.entities.message.MediaMessage import MediaMessage
31from bokkichat.connection.Connection import Connection
32from bokkichat.settings.impl.TelegramBotSettings import TelegramBotSettings
33from bokkichat.exceptions import InvalidMessageData, InvalidSettings
36class TelegramBotConnection(Connection):
37 """
38 Class that implements a Telegram bot connection
39 """
41 def __init__(self, settings: TelegramBotSettings):
42 """
43 Initializes the connection, with credentials provided by a
44 Settings object.
45 :param settings: The settings for the connection
46 """
47 super().__init__(settings)
48 try:
49 self.bot = telegram.Bot(settings.api_key)
50 except telegram.error.InvalidToken:
51 raise InvalidSettings()
53 try:
54 self.update_id = self.bot.get_updates()[0].update_id
55 except IndexError:
56 self.update_id = 0
58 @classmethod
59 def name(cls) -> str:
60 """
61 The name of the connection class
62 :return: The connection class name
63 """
64 return "telegram-bot"
66 @property
67 def address(self) -> Address:
68 """
69 A connection must be able to specify its own entities
70 :return: The entities of the connection
71 """
72 return Address(str(self.bot.name))
74 @classmethod
75 def settings_cls(cls) -> Type[TelegramBotSettings]:
76 """
77 The settings class used by this connection
78 :return: The settings class
79 """
80 return TelegramBotSettings
82 def send(self, message: Message):
83 """
84 Sends a message. A message may be either a TextMessage
85 or a MediaMessage.
86 :param message: The message to send
87 :return: None
88 """
90 self.logger.info("Sending message to " + message.receiver.address)
92 try:
93 if isinstance(message, TextMessage):
94 for chunk in message.split(4096):
95 self.bot.send_message(
96 chat_id=message.receiver.address,
97 text=self._escape_invalid_characters(chunk),
98 parse_mode=telegram.ParseMode.MARKDOWN
99 )
100 elif isinstance(message, MediaMessage):
101 media_map = {
102 MediaType.AUDIO: ("audio", self.bot.send_audio),
103 MediaType.VIDEO: ("video", self.bot.send_video),
104 MediaType.IMAGE: ("photo", self.bot.send_photo)
105 }
107 send_func = media_map[message.media_type][1]
109 # Write to file TODO: Check if this can be done with bytes
110 with open("/tmp/bokkichat-telegram-temp", "wb") as f:
111 f.write(message.data)
113 tempfile = open("/tmp/bokkichat-telegram-temp", "rb")
114 params = {
115 "chat_id": message.receiver.address,
116 media_map[message.media_type][0]: tempfile,
117 "parse_mode": telegram.ParseMode.MARKDOWN,
118 "timeout": 30,
119 "caption": ""
120 }
121 if message.caption is not None:
122 params["caption"] = self._escape_invalid_characters(
123 message.caption
124 )
126 if media_map[message.media_type][0] == "video":
127 params["timeout"] = 60 # Increase timeout for videos
129 try:
130 send_func(**params)
131 except (socket.timeout, telegram.error.NetworkError):
132 self.logger.error("Media Sending timed out")
133 tempfile.close()
135 except (telegram.error.Unauthorized, telegram.error.BadRequest):
136 self.logger.warning(
137 "Failed to send message to {}".format(message.receiver)
138 )
140 def receive(self) -> List[Message]:
141 """
142 Receives all pending messages.
143 :return: A list of pending Message objects
144 """
145 messages = []
147 try:
148 for update in self.bot.get_updates(
149 offset=self.update_id, timeout=10
150 ):
151 self.update_id = update.update_id + 1
153 if update.message is None:
154 continue
156 telegram_message = update.message.to_dict()
158 try:
159 generated = self._parse_message(telegram_message)
160 if generated is None:
161 continue
162 self.logger.info(
163 "Received message from {}".format(generated.sender)
164 )
165 self.logger.debug(str(generated))
166 messages.append(generated)
167 except InvalidMessageData as e:
168 self.logger.error(str(e))
170 except telegram.error.Unauthorized:
171 # The self.bot.get_update method may cause an
172 # Unauthorized Error if the bot is blocked by the user
173 self.update_id += 1
175 except telegram.error.TimedOut:
176 pass
178 return messages
180 def _parse_message(self, message_data: Dict[str, Any]) -> \
181 Optional[Message]:
182 """
183 Parses the message data of a Telegram message and generates a
184 corresponding Message object.
185 :param message_data: The telegram message data
186 :return: The generated Message object.
187 :raises: InvalidMessageData if the parsing failed
188 """
189 address = Address(str(message_data["chat"]["id"]))
191 if "text" in message_data:
192 body = message_data["text"]
193 self.logger.debug("Message Body: {}".format(body))
194 return TextMessage(address, self.address, body)
196 else:
198 for media_key, media_type in {
199 "photo": MediaType.IMAGE,
200 "audio": MediaType.AUDIO,
201 "video": MediaType.VIDEO,
202 "voice": MediaType.AUDIO
203 }.items():
205 if media_key in message_data:
207 self.logger.debug("Media Type: {}".format(media_key))
208 media_info = message_data[media_key]
210 if len(media_info) == 0:
211 continue
213 if isinstance(media_info, list):
214 largest = media_info[len(media_info) - 1]
215 file_id = largest["file_id"]
216 elif isinstance(media_info, dict):
217 file_id = media_info["file_id"]
218 else:
219 continue
221 file_info = self.bot.get_file(file_id)
222 resp = requests.get(file_info["file_path"])
223 data = resp.content
225 return MediaMessage(
226 address,
227 self.address,
228 media_type,
229 data,
230 message_data.get("caption", "")
231 )
233 raise InvalidMessageData(message_data)
235 def close(self):
236 """
237 Disconnects the Connection.
238 :return: None
239 """
240 pass
242 def loop(self, callback: Callable, sleep_time: int = 1):
243 """
244 Starts a loop that periodically checks for new messages, calling
245 a provided callback function in the process.
246 :param callback: The callback function to call for each
247 received message.
248 The callback should have the following format:
249 lambda connection, message: do_stuff()
250 :param sleep_time: The time to sleep between loops
251 :return: None
252 """
253 try:
254 super().loop(callback, sleep_time)
255 except telegram.error.NetworkError:
256 self.logger.error("Encountered Network Error. Trying to reconnect")
257 time.sleep(10)
258 self.loop(callback, sleep_time)
260 @staticmethod
261 def _escape_invalid_characters(text: str) -> str:
262 """
263 Escapes invalid characters for telegram markdown in a text.
264 If this is not done, it may cause sent text to fail.
265 :param text: The text to escape
266 :return: The text with the escaped characters
267 """
268 for char in ["_", "*"]:
269 text = text.replace("\\" + char, "@@@PLACEHOLDER@@@")
270 text = text.replace(char, "\\" + char)
271 text = text.replace("@@@PLACEHOLDER@@@", "\\" + char)
273 return text