Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
"""LICENSE
Copyright 2016 Hermann Krumrey <hermann@krumreyh.com>

This file is part of xdcc-dl.

xdcc-dl is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.

xdcc-dl is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU General Public License for more details.

You should have received a copy of the GNU General Public License
along with xdcc-dl.  If not, see <http://www.gnu.org/licenses/>.
LICENSE"""

import os
import time
import struct
import shlex
import shutil
import socket
import random
import logging
from threading import Thread, Lock
from subprocess import check_output, CalledProcessError
from typing import Optional, IO, Any, List, Union

import irc.events
import irc.client
from colorama import Fore, Back
from irc.client import SimpleIRCClient, ServerConnection, Event, \
    ip_numstr_to_quad, DCCConnection
from puffotter.units import human_readable_bytes, byte_string_to_byte_count
from puffotter.print import pprint
from puffotter.logging import ColorLogger

from xdcc_dl.entities import User, XDCCPack
from xdcc_dl.xdcc.exceptions import InvalidCTCPException, \
    AlreadyDownloadedException, DownloadCompleted, DownloadIncomplete, \
    PackAlreadyRequested, UnrecoverableError, Timeout, BotDoesNotExist


class XDCCClient(SimpleIRCClient):
    """
    IRC Client that can download an XDCC pack
    """

    def __init__(
            self,
            pack: XDCCPack,
            retry: bool = False,
            timeout: int = 120,
            fallback_channel: Optional[str] = None,
            throttle: Union[int, str] = -1,
            wait_time: int = 0,
            username: str = "",
            channel_join_delay: Optional[int] = None
    ):
        """
        Initializes the XDCC IRC client
        :param pack: The pack to download
        :param retry: Set to true for retried downloads.
        :param timeout: Sets the timeout time for starting downloads
        :param fallback_channel: A fallback channel for when whois
                                 fails to find a valid channel
        :param throttle: Throttles the download to n bytes per second.
                         A string is converted to a byte count first.
                         If this value is <= 0, the download speed will be
                         unlimited
        :param wait_time: Waits for the specified amount of time before sending
                          a message
        :param username: If specified sets the username to log on with
        :param channel_join_delay: If specified sets the channel join delay,
                                   otherwise a random delay between 5 and 10
                                   seconds is used
        """
        self.logger = ColorLogger(
            logging.getLogger(self.__class__.__name__),
            warning_bg=Back.RED,
            warning_fg=Fore.BLACK
        )

        # Save us from decoding errors and excessive logging output!
        irc.client.ServerConnection.buffer_class.errors = "replace"
        irc.client.log.setLevel(logging.ERROR)

        if isinstance(throttle, str):
            self.download_limit = byte_string_to_byte_count(throttle)
        else:
            self.download_limit = throttle
        # Normalize every "unlimited" value to the -1 sentinel
        if self.download_limit <= 0:
            self.download_limit = -1

        self.user = User(username)
        self.pack = pack
        self.server = pack.server
        self.downloading = False
        self.xdcc_timestamp = 0.0
        self.channels: Optional[List[str]] = None
        self.message_sent = False
        self.connect_start_time = 0.0
        # The wait time is spent before the request message is sent,
        # so it is added on top of the timeout
        self.timeout = timeout + wait_time
        self.timed_out = False
        self.fallback_channel = fallback_channel
        self.wait_time = wait_time
        self.connected = True
        self.disconnected = False

        if channel_join_delay is None:
            self.channel_join_delay = random.randint(5, 10)
        else:
            self.channel_join_delay = channel_join_delay

        # XDCC state variables
        self.peer_address = ""
        self.peer_port = -1
        self.filesize = -1
        self.progress = 0
        self.xdcc_file: Optional[IO[Any]] = None
        self.xdcc_connection: Optional[DCCConnection] = None
        self.retry = retry
        self.struct_format = b"!I"
        self.ack_queue: List[bytes] = []
        self.ack_thread = Thread(target=self.send_acks)
        self.ack_lock = Lock()

        if not self.retry:
            if self.download_limit == -1:
                limit = "\"unlimited\""
            else:
                limit = str(self.download_limit)
            self.logger.info("Download Limit set to: " + limit)

        self.timeout_watcher_thread = Thread(target=self.timeout_watcher)
        self.progress_printer_thread = Thread(target=self.progress_printer)

        super().__init__()

        # Register a logging handler for every event type that has no
        # explicit on_<event> method, so that all events show up in debug
        # mode. Explicitly defined handlers always take precedence.
        # (Functions defined through exec() inside a method are discarded
        # with the temporary namespace, so the handlers are bound to the
        # instance directly instead.)
        for event in irc.events.all:
            handler_name = "on_" + event
            if not hasattr(self, handler_name):
                setattr(
                    self,
                    handler_name,
                    self._make_generic_handler(event)
                )

    def _make_generic_handler(self, event_type: str):
        """
        Creates an event handler that forwards to handle_generic_event
        :param event_type: The event type the handler is created for
        :return: A callable accepting the connection and the event
        """
        def handler(conn: ServerConnection, event: Event):
            self.handle_generic_event(event_type, conn, event)
        return handler

    def handle_generic_event(
            self,
            event_type: str,
            _: ServerConnection,
            event: Event
    ):
        """
        Handles a generic event that isn't handled explicitly
        by logging it on the debug level
        :param event_type: The event type to handle
        :param _: The connection to use
        :param event: The received event
        :return: None
        """
        self.logger.debug("{}:{} {}".format(
            event_type,
            event.source,
            event.arguments
        ))

    def download(self) -> str:
        """
        Downloads the pack.
        Incomplete downloads are retried with a new client.
        :return: The path to the downloaded file,
                 or "Failed" if an unrecoverable error occurred
        """
        error = False
        completed = False
        pause = 0

        message = ""

        try:
            self.logger.info(f"Connecting to "
                             f"{self.server.address}:{self.server.port} "
                             f"as user '{self.user.username}'")
            self.connect(
                self.server.address,
                self.server.port,
                self.user.username
            )

            self.logger.info(f"Delaying download initialization by "
                             f"{self.channel_join_delay}s")
            time.sleep(self.channel_join_delay)

            self.connected = True

            self.connect_start_time = time.time()

            self.timeout_watcher_thread.start()
            self.progress_printer_thread.start()

            # Blocks until one of the event handlers raises an exception
            self.start()
        except AlreadyDownloadedException:
            self.logger.warning("File already downloaded")
            completed = True
        except DownloadCompleted:
            message = "File {} downloaded successfully"\
                .format(self.pack.filename)
            completed = True
        except DownloadIncomplete:
            message = "File {} not downloaded successfully" \
                .format(self.pack.filename)
            completed = False
        except PackAlreadyRequested:
            message = "Pack already requested."
            completed = False
            pause = 60
        except UnrecoverableError:
            error = True
        finally:
            self.connected = False
            self.disconnected = True
            # The ACK thread loops for as long as this flag is set. It must
            # be cleared here since the transfer may have been aborted
            # without a DCC disconnect event.
            self.downloading = False

            self.logger.info("Joining threads")
            for thread in (
                    self.timeout_watcher_thread,
                    self.progress_printer_thread,
                    self.ack_thread
            ):
                # Joining a thread that was never started raises a
                # RuntimeError, which would mask the original exception
                if thread.ident is not None:
                    thread.join()

            # Don't leak the file handle if the transfer was aborted
            if self.xdcc_file is not None and not self.xdcc_file.closed:
                self.xdcc_file.close()

            print("\n" + message)

            self.logger.info("Disconnecting")
            try:
                self._disconnect()
            except (DownloadCompleted, ):
                pass

        if error:
            self.logger.info("Aborting because of unrecoverable error")
            return "Failed"

        self.logger.debug("Pausing for {}s".format(pause))
        time.sleep(pause)

        if not completed:
            self.logger.warning("Download Incomplete. Retrying.")
            retry_client = XDCCClient(
                self.pack,
                True,
                self.timeout,
                fallback_channel=self.fallback_channel
            )
            retry_client.download_limit = self.download_limit
            if retry_client.download() == "Failed":
                # Don't report a failed retry as a successful download
                return "Failed"

        if not self.retry:
            dl_time = str(int(abs(time.time() - self.connect_start_time)))
            self.logger.info("Download completed in " + dl_time + " seconds.")

        return self.pack.get_filepath()

    def on_ping(self, _: ServerConnection, __: Event):
        """
        Handles a ping event.
        Used for timeout checks
        :param _: The IRC connection
        :param __: The received event
        :return: None
        :raise Timeout: If the XDCC message was not sent in time
        """
        self.logger.debug("PING")
        if not self.message_sent \
                and self.timeout < (time.time() - self.connect_start_time) \
                and not self.timed_out:
            self.logger.warning("Timeout")
            self.timed_out = True
            raise Timeout()

    def on_nosuchnick(self, _: ServerConnection, __: Event):
        """
        When a bot does not exist or is not online right now, aborts.
        :param _: The IRC connection
        :param __: The received event
        :return: None
        :raise BotDoesNotExist: Always
        """
        self.logger.warning("This bot does not exist on this server")
        raise BotDoesNotExist()

    def on_welcome(self, conn: ServerConnection, _: Event):
        """
        The 'welcome' event indicates a successful connection to the server
        Sends a whois command to find the bot on the server
        :param conn: The connection
        :param _: The 'welcome' event
        :return: None
        """
        self.logger.info("Connected to server")
        conn.whois(self.pack.bot)

    def on_whoischannels(self, conn: ServerConnection, event: Event):
        """
        The 'whoischannels' event indicates that a whois request has found
        channels that the bot is a part of.
        Channels that the bot has joined will be joined as well.
        :param conn: The connection
        :param event: The 'whoischannels' event
        :return: None
        """
        self.logger.info("WHOIS: " + str(event.arguments))
        # Everything before the first '#' is not a channel name. A channel
        # name ends at the first space following it.
        channels = [
            "#" + part.split(" ")[0]
            for part in event.arguments[1].split("#")[1:]
        ]
        self.channels = channels

        for channel in channels:
            # Join all channels to avoid only joining a members-only channel
            time.sleep(random.randint(1, 3))
            self.logger.info(f"Joining channel {channel}")
            conn.join(channel)

    def on_endofwhois(self, conn: ServerConnection, _: Event):
        """
        The 'endofwhois' event indicates the end of a whois request.
        If the whois request did not find any channels, the fallback
        channel is joined. Without a fallback channel, on_join is called
        manually to send the XDCC request anyways.
        :param conn: The connection
        :param _: The 'endofwhois' event
        :return: None
        """
        self.logger.info("WHOIS End")
        if self.channels is None:
            if self.fallback_channel is not None:
                channel = self.fallback_channel
                if not channel.startswith("#"):
                    channel = "#" + channel
                conn.join(channel)
                return
            else:
                self.on_join(conn, _, True)

    def on_join(
            self,
            conn: ServerConnection,
            event: Event,
            force: bool = False
    ):
        """
        The 'join' event indicates that a channel was successfully joined.
        The first on_join call will send a message to the bot that requests
        the initialization of the XDCC file transfer.
        :param conn: The connection
        :param event: The 'join' event
        :param force: If set to True, will force sending an XDCC message
        :return: None
        """
        # Make sure we were the ones joining
        if not event.source.startswith(self.user.get_name()) and not force:
            return
        if force:
            self.logger.info(
                "Didn't find a channel using WHOIS, "
                "trying to send message anyways"
            )
        else:
            self.logger.info("Joined Channel: " + event.target)

        if not self.message_sent:
            self._send_xdcc_request_message(conn)

    def on_ctcp(self, conn: ServerConnection, event: Event):
        """
        The 'ctcp' event indicates that a CTCP message was received.
        The downloader receives a CTCP from the bot to initialize the
        XDCC file transfer.
        Handles DCC ACCEPT and SEND messages. Other DCC messages will result
        in a raised InvalidCTCP exception.
        DCC ACCEPT will only occur when a resume request was sent successfully.
        DCC SEND will occur when the bot offers a file.
        :param conn: The connection
        :param event: The 'ctcp' event
        :return: None
        :raise InvalidCTCPException: In case no valid DCC message was received
        :raise AlreadyDownloadedException: In case the local file is already
                                           at least as large as the offered
                                           file
        """

        def start_download(append: bool = False):
            """
            Helper method that starts the download of an XDCC pack
            :param append: If set to True, opens the file in append mode
            :return: None
            """
            self.xdcc_timestamp = time.time()
            mode = "ab" if append else "wb"
            self.logger.info("Starting Download (" + mode + ")")
            self.downloading = True

            # The file is closed again once the DCC connection disconnects
            self.xdcc_file = open(self.pack.get_filepath(), mode)
            self.xdcc_connection = self.dcc("raw")
            self.xdcc_connection.connect(self.peer_address, self.peer_port)

            self.ack_thread.start()

        self.logger.info("CTCP Message: " + str(event.arguments))
        if event.arguments[0] == "DCC":
            payload = shlex.split(event.arguments[1])

            if payload[0] == "SEND":

                filename = payload[1]
                self.peer_address = ip_numstr_to_quad(payload[2])
                self.peer_port = int(payload[3])
                self.filesize = int(payload[4])

                self.pack.set_filename(filename)

                if os.path.isfile(self.pack.get_filepath()):

                    position = os.path.getsize(self.pack.get_filepath())

                    if position >= self.filesize:
                        raise AlreadyDownloadedException(self.pack.filename)

                    # A partial file exists: ask the bot to resume at the
                    # current file size. The download starts once the bot
                    # answers with DCC ACCEPT.
                    self.logger.info("Requesting Resume")
                    self.progress = position
                    bot = event.source.split("!")[0]
                    resume_param = f"\"{filename}\" " \
                                   f"{self.peer_port} {position}"
                    conn.ctcp("DCC RESUME", bot, resume_param)

                else:
                    start_download()

            elif payload[0] == "ACCEPT":
                start_download(append=True)

            else:
                raise InvalidCTCPException(payload[0])

    def on_dccmsg(self, _: ServerConnection, event: Event):
        """
        The 'dccmsg' event contains the file data.
        The data is written to the file, after which the received amount
        of data is acknowledged.
        :param _: The connection
        :param event: The 'dccmsg' event
        :return: None
        """
        if self.xdcc_file is None:
            return

        data = event.arguments[0]
        chunk_size = len(data)

        self.xdcc_file.write(data)
        self.progress += chunk_size

        # Limit the download speed
        if self.download_limit != -1:
            # Sleep for however long this chunk arrived faster than
            # the download limit allows
            delta = abs(time.time() - self.xdcc_timestamp)
            chunk_time = chunk_size / self.download_limit
            sleep_time = chunk_time - delta

            if sleep_time > 0:
                self.logger.debug(
                    "{Throttling for %.2f seconds} " % sleep_time
                )
                time.sleep(sleep_time)

        self._ack()
        self.xdcc_timestamp = time.time()

    def on_dcc_disconnect(self, _: ServerConnection, __: Event):
        """
        The 'dcc_disconnect' event is handled by closing the file and
        checking if all of the announced data was received.
        :param _: The connection
        :param __: The 'dcc_disconnect' event
        :return: None
        :raise DownloadCompleted: If at least the announced amount of bytes
                                  was received
        :raise DownloadIncomplete: If fewer bytes than announced were received
        """
        self.downloading = False

        if self.xdcc_file is not None:
            self.xdcc_file.close()

        if self.progress >= self.filesize:
            raise DownloadCompleted()
        else:
            raise DownloadIncomplete()

    # noinspection PyMethodMayBeStatic
    def on_privnotice(self, _: ServerConnection, event: Event):
        """
        Handles privnotices. Bots sometimes send privnotices when
        a pack was already requested or the user is put into a queue.

        If the privnotice indicates that a pack was already requested,
        the downloader will pause for 60 seconds

        :param _: The connection
        :param event: The privnotice event
        :return: None
        :raise PackAlreadyRequested: If the bot states that the pack was
                                     already requested
        """
        if "you already requested" in event.arguments[0].lower():
            raise PackAlreadyRequested()
        else:
            self.logger.debug("privnotice: {}:{}".format(
                str(event.source), str(event.arguments))
            )
            # TODO Handle queues

    def on_error(self, _: ServerConnection, __: Event):
        """
        Sometimes, the connection gives an error which may prove fatal for
        the download process. A possible cause of error events is a banned
        IP address.
        :param _: The connection
        :param __: The error event
        :return: None
        :raise UnrecoverableError: Always
        """
        self.logger.warning("Unrecoverable Error: Is this IP banned?")
        raise UnrecoverableError()

    def _send_xdcc_request_message(self, conn: ServerConnection):
        """
        Sends an XDCC request message after waiting for the wait time
        :param conn: The connection to use
        :return: None
        """
        self.logger.info("Waiting for {}s before sending message"
                         .format(self.wait_time))
        time.sleep(self.wait_time)

        msg = self.pack.get_request_message()
        self.logger.info("Send XDCC Message: " + msg)
        self.message_sent = True
        conn.privmsg(self.pack.bot, msg)

    def _ack(self):
        """
        Queues the acknowledgement to the XDCC bot that a chunk
        has been received. The queued acknowledgements are sent
        by the send_acks method.
        Sending acknowledgements is sometimes responsible for the program
        hanging due to a stuck socket.send.
        :return: None
        """
        # It seems the DCC connection dies when downloading with
        # max speed and using Q structs. Why that is I do not know.
        # But because of this we'll use progressively larger struct types
        # Whenever the old one gets too small
        while True:
            try:
                payload = struct.pack(self.struct_format, self.progress)
            except struct.error:
                if self.struct_format == b"!I":
                    self.struct_format = b"!L"
                elif self.struct_format == b"!L":
                    self.struct_format = b"!Q"
                else:
                    self.logger.error("File too large for structs")
                    self._disconnect()
                    return
            else:
                self.ack_queue.append(payload)
                return

    def _disconnect(self):
        """
        Disconnects all connections of the XDCC Client
        :return: None
        """
        self.logger.info("Initializing Disconnect")
        self.connection.reactor.disconnect_all()

    def timeout_watcher(self):
        """
        Monitors when the XDCC message is sent. If it is not sent by the
        timeout time, a ping will be sent and handled by the on_ping method
        :return: None
        """
        # Wait for the connection and the wait time. The disconnected flag
        # must end this loop as well, otherwise this thread could never be
        # joined once the client was disconnected.
        while not self.disconnected and (
                not self.connected
                or self.connect_start_time + self.wait_time > time.time()
        ):
            time.sleep(0.5)

        self.logger.info("Timeout watcher started")
        while not self.message_sent and not self.disconnected:
            time.sleep(1)
            if self.disconnected:
                break
            if self.timeout < (time.time() - self.connect_start_time):
                self.logger.info("Timeout detected")
                self.connection.ping(self.server.address)
                break

        if self.message_sent:
            self.logger.info("Message sent without timeout")

    def send_acks(self):
        """
        Sends the queued acknowledgements to the XDCC bot for as long
        as the download is running.
        Should run in a separate thread, since sending may block.
        :return: None
        """
        while self.downloading:
            idle = False
            with self.ack_lock:
                try:
                    if len(self.ack_queue) > 0:
                        self.xdcc_connection.socket.send(
                            self.ack_queue.pop(0)
                        )
                    else:
                        idle = True
                except socket.timeout:
                    self.logger.debug("ACK timed out")
                except AttributeError:
                    self.logger.warning("Missing XDCC socket")
                    # This happens sometimes, don't ask me why though
                    idle = True
            if idle:
                # Nothing to send right now, avoid spinning
                time.sleep(0.5)

    def progress_printer(self):
        """
        Prints the download progress
        Should run in a separate thread to avoid blocking up the IO which
        could lead to reduced download speeds
        :return: None
        """
        speed_progress = []
        # Wait for the download to start without hogging the CPU
        while not self.downloading and not self.disconnected:
            time.sleep(0.1)
        self.logger.info("Progress Printer started")
        time.sleep(1)

        printing = self.downloading and not self.disconnected
        while printing:
            printing = self.downloading and not self.disconnected

            # The speed is averaged over the samples of the last 7 seconds
            speed_progress.append({
                "timestamp": time.time(),
                "progress": self.progress
            })
            while len(speed_progress) > 0 \
                    and time.time() - speed_progress[0]["timestamp"] > 7:
                speed_progress.pop(0)

            if len(speed_progress) > 0:
                bytes_delta = self.progress - speed_progress[0]["progress"]
                time_delta = time.time() - speed_progress[0]["timestamp"]
                # The clock resolution may be too low to measure a delta
                if time_delta > 0:
                    ratio = int(bytes_delta / time_delta)
                else:
                    ratio = 0
                speed = human_readable_bytes(ratio) + "/s"
            else:
                speed = "0B/s"

            if self.filesize > 0:
                percentage = "%.2f" % (100 * (self.progress / self.filesize))
            else:
                percentage = "0.00"

            log_message = "[{}]: ({}%) |{}/{}| ({})".format(
                self.pack.filename,
                percentage,
                human_readable_bytes(
                    self.progress, remove_trailing_zeroes=False
                ),
                human_readable_bytes(self.filesize),
                speed
            )

            # Truncate to the terminal width so that the carriage return
            # overwrites the previous progress line. Falls back to 80
            # columns if the terminal size can't be determined.
            columns = shutil.get_terminal_size((80, 24)).columns
            log_message = log_message[0:columns]

            pprint(log_message, end="\r", bg="lyellow", fg="black")
            time.sleep(0.1)
        self.logger.info("Progress Printer stopped")