Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1"""LICENSE 

2Copyright 2016 Hermann Krumrey <hermann@krumreyh.com> 

3 

4This file is part of xdcc-dl. 

5 

6xdcc-dl is free software: you can redistribute it and/or modify 

7it under the terms of the GNU General Public License as published by 

8the Free Software Foundation, either version 3 of the License, or 

9(at your option) any later version. 

10 

11xdcc-dl is distributed in the hope that it will be useful, 

12but WITHOUT ANY WARRANTY; without even the implied warranty of 

13MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

14GNU General Public License for more details. 

15 

16You should have received a copy of the GNU General Public License 

17along with xdcc-dl. If not, see <http://www.gnu.org/licenses/>. 

18LICENSE""" 

19 

20import os 

21import time 

22import struct 

23import shlex 

24import socket 

25import random 

26import logging 

27import irc.events 

28import irc.client 

29from colorama import Fore, Back 

30from threading import Thread, Lock 

31from subprocess import check_output, CalledProcessError 

32from typing import Optional, IO, Any, List, Union 

33from puffotter.units import human_readable_bytes, byte_string_to_byte_count 

34from puffotter.print import pprint 

35from puffotter.logging import ColorLogger 

36from xdcc_dl.entities import User, XDCCPack 

37from xdcc_dl.xdcc.exceptions import InvalidCTCPException, \ 

38 AlreadyDownloadedException, DownloadCompleted, DownloadIncomplete, \ 

39 PackAlreadyRequested, UnrecoverableError, Timeout, BotDoesNotExist 

40from irc.client import SimpleIRCClient, ServerConnection, Event, \ 

41 ip_numstr_to_quad, DCCConnection 

42 

43 

44class XDCCClient(SimpleIRCClient): 

45 """ 

46 IRC Client that can download an XDCC pack 

47 """ 

48 

49 def __init__( 

50 self, 

51 pack: XDCCPack, 

52 retry: bool = False, 

53 timeout: int = 120, 

54 fallback_channel: Optional[str] = None, 

55 throttle: Union[int, str] = -1, 

56 wait_time: int = 0, 

57 username: str = "", 

58 channel_join_delay: Optional[int] = None 

59 ): 

60 """ 

61 Initializes the XDCC IRC client 

62 :param pack: The pack to downloadX 

63 :param retry: Set to true for retried downloads. 

64 :param timeout: Sets the timeout time for starting downloads 

65 :param fallback_channel: A fallback channel for when whois 

66 fails to find a valid channel 

67 :param throttle: Throttles the download to n bytes per second. 

68 If this value is <= 0, the download speed will be 

69 unlimited 

70 :param wait_time: Waits for the specified amount of time before sending 

71 a message 

72 :param username: If specified sets the username to log on with 

73 :param channel_join_delay: If specifies sets the channel join delay 

74 """ 

75 self.logger = ColorLogger( 

76 logging.getLogger(self.__class__.__name__), 

77 warning_bg=Back.RED, 

78 warning_fg=Fore.BLACK 

79 ) 

80 

81 # Save us from decoding errors and excessive logging output! 

82 irc.client.ServerConnection.buffer_class.errors = "replace" 

83 irc.client.log.setLevel(logging.ERROR) 

84 

85 if isinstance(throttle, str): 

86 self.download_limit = byte_string_to_byte_count(throttle) 

87 else: 

88 self.download_limit = throttle 

89 if self.download_limit <= 0: 

90 self.download_limit = -1 

91 

92 self.user = User(username) 

93 self.pack = pack 

94 self.server = pack.server 

95 self.downloading = False 

96 self.xdcc_timestamp = 0.0 

97 self.channels = None # type: Optional[List[str]] 

98 self.message_sent = False 

99 self.connect_start_time = 0.0 

100 self.timeout = timeout + wait_time 

101 self.timed_out = False 

102 self.fallback_channel = fallback_channel 

103 self.wait_time = wait_time 

104 self.connected = True 

105 self.disconnected = False 

106 

107 if channel_join_delay is None: 

108 self.channel_join_delay = random.randint(5, 10) 

109 else: 

110 self.channel_join_delay = channel_join_delay 

111 

112 # XDCC state variables 

113 self.peer_address = "" 

114 self.peer_port = -1 

115 self.filesize = -1 

116 self.progress = 0 

117 self.xdcc_file = None # type: Optional[IO[Any]] 

118 self.xdcc_connection = None # type: Optional[DCCConnection] 

119 self.retry = retry 

120 self.struct_format = b"!I" 

121 self.ack_lock = Lock() 

122 

123 if not self.retry: 

124 if self.download_limit == -1: 

125 limit = "\"unlimited\"" 

126 else: 

127 limit = str(self.download_limit) 

128 self.logger.info("Download Limit set to: " + limit) 

129 

130 self.timeout_watcher_thread = Thread(target=self.timeout_watcher) 

131 self.progress_printer_thread = Thread(target=self.progress_printer) 

132 

133 super().__init__() 

134 

135 # Create a log command for all events that are printed in debug mode. 

136 # If methods are overriden manually, these generated methods won't take 

137 # effect. 

138 for event in irc.events.all: 

139 exec( 

140 "def on_{}(self, c, e):\n" 

141 " self.handle_generic_event(\"{}\", c, e)" 

142 "".format(event, event) 

143 ) 

144 

145 def handle_generic_event( 

146 self, 

147 event_type: str, 

148 _: ServerConnection, 

149 event: Event 

150 ): 

151 """ 

152 Handles a generic event that isn't handled explicitly 

153 :param event_type: The event type to handle 

154 :param _: The connection to use 

155 :param event: The received event 

156 :return: None 

157 """ 

158 self.logger.debug("{}:{} {}".format( 

159 event_type, 

160 event.source, 

161 event.arguments 

162 )) 

163 

164 def download(self) -> str: 

165 """ 

166 Downloads the pack 

167 :return: The path to the downloaded file 

168 """ 

169 error = False 

170 completed = False 

171 pause = 0 

172 

173 message = "" 

174 

175 try: 

176 self.logger.info(f"Connecting to " 

177 f"{self.server.address}:{self.server.port} " 

178 f"as user '{self.user.username}'") 

179 self.connect( 

180 self.server.address, 

181 self.server.port, 

182 self.user.username 

183 ) 

184 

185 self.logger.info(f"Delaying download initialization by " 

186 f"{self.channel_join_delay}s") 

187 time.sleep(self.channel_join_delay) 

188 

189 self.connected = True 

190 

191 self.connect_start_time = time.time() 

192 

193 self.timeout_watcher_thread.start() 

194 self.progress_printer_thread.start() 

195 

196 self.start() 

197 except AlreadyDownloadedException: 

198 self.logger.warning("File already downloaded") 

199 completed = True 

200 except DownloadCompleted: 

201 message = "File {} downloaded successfully"\ 

202 .format(self.pack.filename) 

203 completed = True 

204 except DownloadIncomplete: 

205 message = "File {} not downloaded successfully" \ 

206 .format(self.pack.filename) 

207 completed = False 

208 except PackAlreadyRequested: 

209 message = "Pack already requested." 

210 completed = False 

211 pause = 60 

212 except UnrecoverableError: 

213 error = True 

214 finally: 

215 self.connected = False 

216 self.disconnected = True 

217 self.timeout_watcher_thread.join() 

218 self.progress_printer_thread.join() 

219 print("\n" + message) 

220 

221 self.logger.info("Disconnecting") 

222 try: 

223 self._disconnect() 

224 except (DownloadCompleted, ): 

225 pass 

226 

227 if error: 

228 self.logger.info("Aborting because of unrecoverable error") 

229 return "Failed" 

230 

231 self.logger.debug("Pausing for {}s".format(pause)) 

232 time.sleep(pause) 

233 

234 if not completed: 

235 self.logger.warning("Download Incomplete. Retrying.") 

236 retry_client = XDCCClient(self.pack, True, self.timeout) 

237 retry_client.download_limit = self.download_limit 

238 retry_client.download() 

239 

240 if not self.retry: 

241 dl_time = str(int(abs(time.time() - self.connect_start_time))) 

242 self.logger.info("Download completed in " + dl_time + " seconds.") 

243 

244 return self.pack.get_filepath() 

245 

246 def on_ping(self, _: ServerConnection, __: Event): 

247 """ 

248 Handles a ping event. 

249 Used for timeout checks 

250 :param _: The IRC connection 

251 :param __: The received event 

252 :return: None 

253 """ 

254 self.logger.debug("PING") 

255 if not self.message_sent \ 

256 and self.timeout < (time.time() - self.connect_start_time) \ 

257 and not self.timed_out: 

258 self.logger.warning("Timeout") 

259 self.timed_out = True 

260 raise Timeout() 

261 

262 def on_nosuchnick(self, _: ServerConnection, __: Event): 

263 """ 

264 When a bot does not exist or is not online right now, aborts. 

265 :param _: The IRC connection 

266 :param __: The received event 

267 :return: None 

268 """ 

269 self.logger.warning("This bot does not exist on this server") 

270 raise BotDoesNotExist() 

271 

272 def on_welcome(self, conn: ServerConnection, _: Event): 

273 """ 

274 The 'welcome' event indicates a successful connection to the server 

275 Sends a whois command to find the bot on the server 

276 :param conn: The connection 

277 :param _: The 'welcome' event 

278 :return: None 

279 """ 

280 self.logger.info("Connected to server") 

281 conn.whois(self.pack.bot) 

282 

283 def on_whoischannels(self, conn: ServerConnection, event: Event): 

284 """ 

285 The 'whoischannels' event indicates that a whois request has found 

286 channels that the bot is a part of. 

287 Channels that the bot has joined will be joined as well. 

288 :param conn: The connection 

289 :param event: The 'whoischannels' event 

290 :return: None 

291 """ 

292 self.logger.info("WHOIS: " + str(event.arguments)) 

293 channels = event.arguments[1].split("#") 

294 channels.pop(0) 

295 channels = list(map(lambda x: "#" + x.split(" ")[0], channels)) 

296 self.channels = channels 

297 

298 for channel in channels: 

299 # Join all channels to avoid only joining a members-only channel 

300 time.sleep(random.randint(1, 3)) 

301 self.logger.info(f"Joining channel {channel}") 

302 conn.join(channel) 

303 

304 def on_endofwhois(self, conn: ServerConnection, _: Event): 

305 """ 

306 The 'endofwhois' event indicates the end of a whois request. 

307 This manually calls on_join in case the bot 

308 has not joined any channels. 

309 :param conn: The connection 

310 :param _: The 'endofwhois' event 

311 :return: None 

312 """ 

313 self.logger.info("WHOIS End") 

314 if self.channels is None: 

315 if self.fallback_channel is not None: 

316 channel = self.fallback_channel 

317 if not channel.startswith("#"): 

318 channel = "#" + channel 

319 conn.join(channel) 

320 return 

321 else: 

322 self.on_join(conn, _, True) 

323 

324 def on_join( 

325 self, 

326 conn: ServerConnection, 

327 event: Event, 

328 force: bool = False 

329 ): 

330 """ 

331 The 'join' event indicates that a channel was successfully joined. 

332 The first on_join call will send a message to the bot that requests 

333 the initialization of the XDCC file transfer. 

334 :param conn: The connection 

335 :param event: The 'join' event 

336 :param force: If set to True, will force sending an XDCC message 

337 :return: None 

338 """ 

339 # Make sure we were the ones joining 

340 if not event.source.startswith(self.user.get_name()) and not force: 

341 return 

342 if force: 

343 self.logger.info( 

344 "Didn't find a channel using WHOIS, " 

345 "trying to send message anyways" 

346 ) 

347 else: 

348 self.logger.info("Joined Channel: " + event.target) 

349 

350 if not self.message_sent: 

351 self._send_xdcc_request_message(conn) 

352 

353 def on_ctcp(self, conn: ServerConnection, event: Event): 

354 """ 

355 The 'ctcp' event indicates that a CTCP message was received. 

356 The downloader receives a CTCP from the bot to initialize the 

357 XDCC file transfer. 

358 Handles DCC ACCEPT and SEND messages. Other DCC messages will result 

359 in a raised InvalidCTCP exception. 

360 DCC ACCEPT will only occur when a resume request was sent successfully. 

361 DCC SEND will occur when the bot offers a file. 

362 :param conn: The connection 

363 :param event: The 'ctcp' event 

364 :return: None 

365 :raise InvalidCTCPException: In case no valid DCC message was received 

366 """ 

367 

368 def start_download(append: bool = False): 

369 """ 

370 Helper method that starts the download of an XDCC pack 

371 :param append: If set to True, opens the file in append mode 

372 :return: None 

373 """ 

374 self.xdcc_timestamp = time.time() 

375 mode = "ab" if append else "wb" 

376 self.logger.info("Starting Download (" + mode + ")") 

377 self.downloading = True 

378 

379 self.xdcc_file = open(self.pack.get_filepath(), mode) 

380 self.xdcc_connection = self.dcc("raw") 

381 self.xdcc_connection.connect(self.peer_address, self.peer_port) 

382 self.xdcc_connection.socket.settimeout(5) 

383 

384 self.logger.info("CTCP Message: " + str(event.arguments)) 

385 if event.arguments[0] == "DCC": 

386 payload = shlex.split(event.arguments[1]) 

387 

388 if payload[0] == "SEND": 

389 

390 filename = payload[1] 

391 self.peer_address = ip_numstr_to_quad(payload[2]) 

392 self.peer_port = int(payload[3]) 

393 self.filesize = int(payload[4]) 

394 

395 self.pack.set_filename(filename) 

396 

397 if os.path.isfile(self.pack.get_filepath()): 

398 

399 position = os.path.getsize(self.pack.get_filepath()) 

400 

401 if position >= self.filesize: 

402 raise AlreadyDownloadedException(self.pack.filename) 

403 

404 self.logger.info("Requesting Resume") 

405 self.progress = position 

406 bot = event.source.split("!")[0] 

407 resume_param = "\"" + filename + "\" " + \ 

408 str(self.peer_port) + " " + str(position) 

409 conn.ctcp("DCC RESUME", bot, resume_param) 

410 

411 else: 

412 start_download() 

413 

414 elif payload[0] == "ACCEPT": 

415 start_download(append=True) 

416 

417 else: 

418 raise InvalidCTCPException(payload[0]) 

419 

420 def on_dccmsg(self, _: ServerConnection, event: Event): 

421 """ 

422 The 'dccmsg' event contains the file data. 

423 :param _: The connection 

424 :param event: The 'dccmsg' event 

425 :return: None 

426 """ 

427 if self.xdcc_file is None: 

428 return 

429 

430 data = event.arguments[0] 

431 chunk_size = len(data) 

432 

433 self.xdcc_file.write(data) 

434 self.progress += chunk_size 

435 

436 # Limit the download speed 

437 if self.download_limit != -1: 

438 delta = abs(time.time() - self.xdcc_timestamp) 

439 chunk_time = chunk_size / self.download_limit 

440 sleep_time = chunk_time - delta 

441 

442 if sleep_time > 0: 

443 self.logger.debug( 

444 "{Throttling for %.2f seconds} " % sleep_time 

445 ) 

446 time.sleep(sleep_time) 

447 

448 self._ack() 

449 self.xdcc_timestamp = time.time() 

450 

451 def on_dcc_disconnect(self, _: ServerConnection, __: Event): 

452 """ 

453 The 'dccmsg' event contains the file data. 

454 :param _: The connection 

455 :param __: The 'dccmsg' event 

456 :return: None 

457 """ 

458 self.downloading = False 

459 

460 if self.xdcc_file is not None: 

461 self.xdcc_file.close() 

462 

463 if self.progress >= self.filesize: 

464 raise DownloadCompleted() 

465 else: 

466 raise DownloadIncomplete() 

467 

468 # noinspection PyMethodMayBeStatic 

469 def on_privnotice(self, _: ServerConnection, event: Event): 

470 """ 

471 Handles privnotices. Bots sometimes send privnotices when 

472 a pack was already requested or the user is put into a queue. 

473 

474 If the privnotice indicates that a pack was already requested, 

475 the downloader will pause for 60 seconds 

476 

477 :param _: The connection 

478 :param event: The privnotice event 

479 :return: None 

480 """ 

481 if "you already requested" in event.arguments[0].lower(): 

482 raise PackAlreadyRequested() 

483 else: 

484 self.logger.debug("privnotice: {}:{}".format( 

485 str(event.source), str(event.arguments)) 

486 ) 

487 # TODO Handle queues 

488 

489 def on_error(self, _: ServerConnection, __: Event): 

490 """ 

491 Sometimes, the connection gives an error which may prove fatal for 

492 the download process. A possible cause of error events is a banned 

493 IP address. 

494 :param _: The connection 

495 :param __: The error event 

496 :return: None 

497 """ 

498 self.logger.warning("Unrecoverable Error: Is this IP banned?") 

499 raise UnrecoverableError() 

500 

501 def _send_xdcc_request_message(self, conn: ServerConnection): 

502 """ 

503 Sends an XDCC request message 

504 :param conn: The connection to use 

505 :return: None 

506 """ 

507 self.logger.info("Waiting for {}s before sending message" 

508 .format(self.wait_time)) 

509 time.sleep(self.wait_time) 

510 

511 msg = self.pack.get_request_message() 

512 self.logger.info("Send XDCC Message: " + msg) 

513 self.message_sent = True 

514 conn.privmsg(self.pack.bot, msg) 

515 

516 def _ack(self): 

517 """ 

518 Sends the acknowledgement to the XDCC bot that a chunk 

519 has been received. 

520 This process is sometimes responsible for the program hanging due to a 

521 stuck socket.send. 

522 This is mitigated by completely disconnecting the client and restarting 

523 the download process with a new XDCC CLient 

524 :return: None 

525 """ 

526 # It seems the DCC connection dies when downloading with 

527 # max speed and using Q structs. Why that is I do not know. 

528 # But because of this we'll use progressively larger struct types 

529 # Whenever the old one gets too small 

530 try: 

531 payload = struct.pack(self.struct_format, self.progress) 

532 except struct.error: 

533 

534 if self.struct_format == b"!I": 

535 self.struct_format = b"!L" 

536 elif self.struct_format == b"!L": 

537 self.struct_format = b"!Q" 

538 else: 

539 self.logger.error("File too large for structs") 

540 self._disconnect() 

541 return 

542 

543 self._ack() 

544 return 

545 

546 def acker(): 

547 """ 

548 The actual ack will be sent using a different thread since that 

549 somehow avoids the socket timing out for some reason. 

550 :return: None 

551 """ 

552 

553 self.ack_lock.acquire() 

554 try: 

555 self.xdcc_connection.socket.send(payload) 

556 except socket.timeout: 

557 self.logger.debug("ACK timed out") 

558 self._disconnect() 

559 finally: 

560 self.ack_lock.release() 

561 Thread(target=acker).start() 

562 

563 def _disconnect(self): 

564 """ 

565 Disconnects all connections of the XDCC Client 

566 :return: None 

567 """ 

568 self.connection.reactor.disconnect_all() 

569 

570 def timeout_watcher(self): 

571 """ 

572 Monitors when the XDCC message is sent. If it is not sent by the 

573 timeout time, a ping will be sent and handled by the on_ping method 

574 :return: None 

575 """ 

576 while not self.connected \ 

577 or self.connect_start_time + self.wait_time > time.time(): 

578 pass 

579 self.logger.info("Timeout watcher started") 

580 while not self.message_sent and not self.disconnected: 

581 time.sleep(1) 

582 self.logger.debug("Iterating timeout thread") 

583 if self.timeout < (time.time() - self.connect_start_time): 

584 self.logger.info("Timeout detected") 

585 self.connection.ping(self.server.address) 

586 break 

587 self.logger.info("Message sent without timeout") 

588 

589 def progress_printer(self): 

590 """ 

591 Prints the download progress 

592 Should run in a separate thread to avoid blocking up the IO which 

593 could lead to reduced download speeds 

594 :return: None 

595 """ 

596 speed_progress = [] 

597 while not self.downloading and not self.disconnected: 

598 pass 

599 self.logger.info("Progress Printer started") 

600 time.sleep(1) 

601 

602 printing = self.downloading and not self.disconnected 

603 while printing: 

604 printing = self.downloading and not self.disconnected 

605 

606 speed_progress.append({ 

607 "timestamp": time.time(), 

608 "progress": self.progress 

609 }) 

610 while len(speed_progress) > 0 \ 

611 and time.time() - speed_progress[0]["timestamp"] > 7: 

612 speed_progress.pop(0) 

613 

614 if len(speed_progress) > 0: 

615 bytes_delta = self.progress - speed_progress[0]["progress"] 

616 time_delta = time.time() - speed_progress[0]["timestamp"] 

617 ratio = int(bytes_delta / time_delta) 

618 speed = human_readable_bytes(ratio) + "/s" 

619 else: 

620 speed = "0B/s" 

621 

622 percentage = "%.2f" % (100 * (self.progress / self.filesize)) 

623 

624 log_message = "[{}]: ({}%) |{}/{}| ({})".format( 

625 self.pack.filename, 

626 percentage, 

627 human_readable_bytes( 

628 self.progress, remove_trailing_zeroes=False 

629 ), 

630 human_readable_bytes(self.filesize), 

631 speed 

632 ) 

633 

634 try: 

635 rows, _columns = check_output(['stty', 'size']).split() 

636 columns = int(_columns) 

637 except (ValueError, CalledProcessError): 

638 columns = 80 

639 log_message = log_message[0:columns] 

640 

641 pprint(log_message, end="\r", bg="lyellow", fg="black") 

642 time.sleep(0.1) 

643 self.logger.info("Progress Printer stopped")