Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1"""LICENSE 

2Copyright 2016 Hermann Krumrey <hermann@krumreyh.com> 

3 

4This file is part of xdcc-dl. 

5 

6xdcc-dl is free software: you can redistribute it and/or modify 

7it under the terms of the GNU General Public License as published by 

8the Free Software Foundation, either version 3 of the License, or 

9(at your option) any later version. 

10 

11xdcc-dl is distributed in the hope that it will be useful, 

12but WITHOUT ANY WARRANTY; without even the implied warranty of 

13MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

14GNU General Public License for more details. 

15 

16You should have received a copy of the GNU General Public License 

17along with xdcc-dl. If not, see <http://www.gnu.org/licenses/>. 

18LICENSE""" 

19 

20import os 

21import time 

22import struct 

23import shlex 

24import socket 

25import random 

26import logging 

27import irc.events 

28import irc.client 

29from colorama import Fore, Back 

30from threading import Thread, Lock 

31from subprocess import check_output, CalledProcessError 

32from typing import Optional, IO, Any, List, Union 

33from puffotter.units import human_readable_bytes, byte_string_to_byte_count 

34from puffotter.print import pprint 

35from puffotter.logging import ColorLogger 

36from xdcc_dl.entities import User, XDCCPack 

37from xdcc_dl.xdcc.exceptions import InvalidCTCPException, \ 

38 AlreadyDownloadedException, DownloadCompleted, DownloadIncomplete, \ 

39 PackAlreadyRequested, UnrecoverableError, Timeout, BotDoesNotExist 

40from irc.client import SimpleIRCClient, ServerConnection, Event, \ 

41 ip_numstr_to_quad, DCCConnection 

42 

43 

44class XDCCClient(SimpleIRCClient): 

45 """ 

46 IRC Client that can download an XDCC pack 

47 """ 

48 

49 def __init__( 

50 self, 

51 pack: XDCCPack, 

52 retry: bool = False, 

53 timeout: int = 120, 

54 fallback_channel: Optional[str] = None, 

55 throttle: Union[int, str] = -1, 

56 wait_time: int = 0, 

57 username: str = "", 

58 channel_join_delay: Optional[int] = None 

59 ): 

60 """ 

61 Initializes the XDCC IRC client 

62 :param pack: The pack to downloadX 

63 :param retry: Set to true for retried downloads. 

64 :param timeout: Sets the timeout time for starting downloads 

65 :param fallback_channel: A fallback channel for when whois 

66 fails to find a valid channel 

67 :param throttle: Throttles the download to n bytes per second. 

68 If this value is <= 0, the download speed will be 

69 unlimited 

70 :param wait_time: Waits for the specified amount of time before sending 

71 a message 

72 :param username: If specified sets the username to log on with 

73 :param channel_join_delay: If specifies sets the channel join delay 

74 """ 

75 self.logger = ColorLogger( 

76 logging.getLogger(self.__class__.__name__), 

77 warning_bg=Back.RED, 

78 warning_fg=Fore.BLACK 

79 ) 

80 

81 # Save us from decoding errors and excessive logging output! 

82 irc.client.ServerConnection.buffer_class.errors = "replace" 

83 irc.client.log.setLevel(logging.ERROR) 

84 

85 if isinstance(throttle, str): 

86 self.download_limit = byte_string_to_byte_count(throttle) 

87 else: 

88 self.download_limit = throttle 

89 if self.download_limit <= 0: 

90 self.download_limit = -1 

91 

92 self.user = User(username) 

93 self.pack = pack 

94 self.server = pack.server 

95 self.downloading = False 

96 self.xdcc_timestamp = 0.0 

97 self.channels = None # type: Optional[List[str]] 

98 self.message_sent = False 

99 self.connect_start_time = 0.0 

100 self.timeout = timeout + wait_time 

101 self.timed_out = False 

102 self.fallback_channel = fallback_channel 

103 self.wait_time = wait_time 

104 self.connected = True 

105 self.disconnected = False 

106 

107 if channel_join_delay is None: 

108 self.channel_join_delay = random.randint(5, 10) 

109 else: 

110 self.channel_join_delay = channel_join_delay 

111 

112 # XDCC state variables 

113 self.peer_address = "" 

114 self.peer_port = -1 

115 self.filesize = -1 

116 self.progress = 0 

117 self.xdcc_file = None # type: Optional[IO[Any]] 

118 self.xdcc_connection = None # type: Optional[DCCConnection] 

119 self.retry = retry 

120 self.struct_format = b"!I" 

121 self.ack_queue: List[bytes] = [] 

122 self.ack_thread = Thread(target=self.send_acks) 

123 self.ack_lock = Lock() 

124 

125 if not self.retry: 

126 if self.download_limit == -1: 

127 limit = "\"unlimited\"" 

128 else: 

129 limit = str(self.download_limit) 

130 self.logger.info("Download Limit set to: " + limit) 

131 

132 self.timeout_watcher_thread = Thread(target=self.timeout_watcher) 

133 self.progress_printer_thread = Thread(target=self.progress_printer) 

134 

135 super().__init__() 

136 

137 # Create a log command for all events that are printed in debug mode. 

138 # If methods are overriden manually, these generated methods won't take 

139 # effect. 

140 for event in irc.events.all: 

141 exec( 

142 "def on_{}(self, c, e):\n" 

143 " self.handle_generic_event(\"{}\", c, e)" 

144 "".format(event, event) 

145 ) 

146 

147 def handle_generic_event( 

148 self, 

149 event_type: str, 

150 _: ServerConnection, 

151 event: Event 

152 ): 

153 """ 

154 Handles a generic event that isn't handled explicitly 

155 :param event_type: The event type to handle 

156 :param _: The connection to use 

157 :param event: The received event 

158 :return: None 

159 """ 

160 self.logger.debug("{}:{} {}".format( 

161 event_type, 

162 event.source, 

163 event.arguments 

164 )) 

165 

166 def download(self) -> str: 

167 """ 

168 Downloads the pack 

169 :return: The path to the downloaded file 

170 """ 

171 error = False 

172 completed = False 

173 pause = 0 

174 

175 message = "" 

176 

177 try: 

178 self.logger.info(f"Connecting to " 

179 f"{self.server.address}:{self.server.port} " 

180 f"as user '{self.user.username}'") 

181 self.connect( 

182 self.server.address, 

183 self.server.port, 

184 self.user.username 

185 ) 

186 

187 self.logger.info(f"Delaying download initialization by " 

188 f"{self.channel_join_delay}s") 

189 time.sleep(self.channel_join_delay) 

190 

191 self.connected = True 

192 

193 self.connect_start_time = time.time() 

194 

195 self.timeout_watcher_thread.start() 

196 self.progress_printer_thread.start() 

197 

198 self.start() 

199 except AlreadyDownloadedException: 

200 self.logger.warning("File already downloaded") 

201 completed = True 

202 except DownloadCompleted: 

203 message = "File {} downloaded successfully"\ 

204 .format(self.pack.filename) 

205 completed = True 

206 except DownloadIncomplete: 

207 message = "File {} not downloaded successfully" \ 

208 .format(self.pack.filename) 

209 completed = False 

210 except PackAlreadyRequested: 

211 message = "Pack already requested." 

212 completed = False 

213 pause = 60 

214 except UnrecoverableError: 

215 error = True 

216 finally: 

217 self.connected = False 

218 self.disconnected = True 

219 self.logger.info("Joining threads") 

220 self.timeout_watcher_thread.join() 

221 self.progress_printer_thread.join() 

222 self.ack_thread.join() 

223 print("\n" + message) 

224 

225 self.logger.info("Disconnecting") 

226 try: 

227 self._disconnect() 

228 except (DownloadCompleted, ): 

229 pass 

230 

231 if error: 

232 self.logger.info("Aborting because of unrecoverable error") 

233 return "Failed" 

234 

235 self.logger.debug("Pausing for {}s".format(pause)) 

236 time.sleep(pause) 

237 

238 if not completed: 

239 self.logger.warning("Download Incomplete. Retrying.") 

240 retry_client = XDCCClient(self.pack, True, self.timeout) 

241 retry_client.download_limit = self.download_limit 

242 retry_client.download() 

243 

244 if not self.retry: 

245 dl_time = str(int(abs(time.time() - self.connect_start_time))) 

246 self.logger.info("Download completed in " + dl_time + " seconds.") 

247 

248 return self.pack.get_filepath() 

249 

250 def on_ping(self, _: ServerConnection, __: Event): 

251 """ 

252 Handles a ping event. 

253 Used for timeout checks 

254 :param _: The IRC connection 

255 :param __: The received event 

256 :return: None 

257 """ 

258 self.logger.debug("PING") 

259 if not self.message_sent \ 

260 and self.timeout < (time.time() - self.connect_start_time) \ 

261 and not self.timed_out: 

262 self.logger.warning("Timeout") 

263 self.timed_out = True 

264 raise Timeout() 

265 

266 def on_nosuchnick(self, _: ServerConnection, __: Event): 

267 """ 

268 When a bot does not exist or is not online right now, aborts. 

269 :param _: The IRC connection 

270 :param __: The received event 

271 :return: None 

272 """ 

273 self.logger.warning("This bot does not exist on this server") 

274 raise BotDoesNotExist() 

275 

276 def on_welcome(self, conn: ServerConnection, _: Event): 

277 """ 

278 The 'welcome' event indicates a successful connection to the server 

279 Sends a whois command to find the bot on the server 

280 :param conn: The connection 

281 :param _: The 'welcome' event 

282 :return: None 

283 """ 

284 self.logger.info("Connected to server") 

285 conn.whois(self.pack.bot) 

286 

287 def on_whoischannels(self, conn: ServerConnection, event: Event): 

288 """ 

289 The 'whoischannels' event indicates that a whois request has found 

290 channels that the bot is a part of. 

291 Channels that the bot has joined will be joined as well. 

292 :param conn: The connection 

293 :param event: The 'whoischannels' event 

294 :return: None 

295 """ 

296 self.logger.info("WHOIS: " + str(event.arguments)) 

297 channels = event.arguments[1].split("#") 

298 channels.pop(0) 

299 channels = list(map(lambda x: "#" + x.split(" ")[0], channels)) 

300 self.channels = channels 

301 

302 for channel in channels: 

303 # Join all channels to avoid only joining a members-only channel 

304 time.sleep(random.randint(1, 3)) 

305 self.logger.info(f"Joining channel {channel}") 

306 conn.join(channel) 

307 

308 def on_endofwhois(self, conn: ServerConnection, _: Event): 

309 """ 

310 The 'endofwhois' event indicates the end of a whois request. 

311 This manually calls on_join in case the bot 

312 has not joined any channels. 

313 :param conn: The connection 

314 :param _: The 'endofwhois' event 

315 :return: None 

316 """ 

317 self.logger.info("WHOIS End") 

318 if self.channels is None: 

319 if self.fallback_channel is not None: 

320 channel = self.fallback_channel 

321 if not channel.startswith("#"): 

322 channel = "#" + channel 

323 conn.join(channel) 

324 return 

325 else: 

326 self.on_join(conn, _, True) 

327 

328 def on_join( 

329 self, 

330 conn: ServerConnection, 

331 event: Event, 

332 force: bool = False 

333 ): 

334 """ 

335 The 'join' event indicates that a channel was successfully joined. 

336 The first on_join call will send a message to the bot that requests 

337 the initialization of the XDCC file transfer. 

338 :param conn: The connection 

339 :param event: The 'join' event 

340 :param force: If set to True, will force sending an XDCC message 

341 :return: None 

342 """ 

343 # Make sure we were the ones joining 

344 if not event.source.startswith(self.user.get_name()) and not force: 

345 return 

346 if force: 

347 self.logger.info( 

348 "Didn't find a channel using WHOIS, " 

349 "trying to send message anyways" 

350 ) 

351 else: 

352 self.logger.info("Joined Channel: " + event.target) 

353 

354 if not self.message_sent: 

355 self._send_xdcc_request_message(conn) 

356 

357 def on_ctcp(self, conn: ServerConnection, event: Event): 

358 """ 

359 The 'ctcp' event indicates that a CTCP message was received. 

360 The downloader receives a CTCP from the bot to initialize the 

361 XDCC file transfer. 

362 Handles DCC ACCEPT and SEND messages. Other DCC messages will result 

363 in a raised InvalidCTCP exception. 

364 DCC ACCEPT will only occur when a resume request was sent successfully. 

365 DCC SEND will occur when the bot offers a file. 

366 :param conn: The connection 

367 :param event: The 'ctcp' event 

368 :return: None 

369 :raise InvalidCTCPException: In case no valid DCC message was received 

370 """ 

371 

372 def start_download(append: bool = False): 

373 """ 

374 Helper method that starts the download of an XDCC pack 

375 :param append: If set to True, opens the file in append mode 

376 :return: None 

377 """ 

378 self.xdcc_timestamp = time.time() 

379 mode = "ab" if append else "wb" 

380 self.logger.info("Starting Download (" + mode + ")") 

381 self.downloading = True 

382 

383 self.xdcc_file = open(self.pack.get_filepath(), mode) 

384 self.xdcc_connection = self.dcc("raw") 

385 self.xdcc_connection.connect(self.peer_address, self.peer_port) 

386 # self.xdcc_connection.socket.settimeout(5) 

387 

388 self.ack_thread.start() 

389 

390 self.logger.info("CTCP Message: " + str(event.arguments)) 

391 if event.arguments[0] == "DCC": 

392 payload = shlex.split(event.arguments[1]) 

393 

394 if payload[0] == "SEND": 

395 

396 filename = payload[1] 

397 self.peer_address = ip_numstr_to_quad(payload[2]) 

398 self.peer_port = int(payload[3]) 

399 self.filesize = int(payload[4]) 

400 

401 self.pack.set_filename(filename) 

402 

403 if os.path.isfile(self.pack.get_filepath()): 

404 

405 position = os.path.getsize(self.pack.get_filepath()) 

406 

407 if position >= self.filesize: 

408 raise AlreadyDownloadedException(self.pack.filename) 

409 

410 self.logger.info("Requesting Resume") 

411 self.progress = position 

412 bot = event.source.split("!")[0] 

413 resume_param = "\"" + filename + "\" " + \ 

414 str(self.peer_port) + " " + str(position) 

415 conn.ctcp("DCC RESUME", bot, resume_param) 

416 

417 else: 

418 start_download() 

419 

420 elif payload[0] == "ACCEPT": 

421 start_download(append=True) 

422 

423 else: 

424 raise InvalidCTCPException(payload[0]) 

425 

426 def on_dccmsg(self, _: ServerConnection, event: Event): 

427 """ 

428 The 'dccmsg' event contains the file data. 

429 :param _: The connection 

430 :param event: The 'dccmsg' event 

431 :return: None 

432 """ 

433 if self.xdcc_file is None: 

434 return 

435 

436 data = event.arguments[0] 

437 chunk_size = len(data) 

438 

439 self.xdcc_file.write(data) 

440 self.progress += chunk_size 

441 

442 # Limit the download speed 

443 if self.download_limit != -1: 

444 delta = abs(time.time() - self.xdcc_timestamp) 

445 chunk_time = chunk_size / self.download_limit 

446 sleep_time = chunk_time - delta 

447 

448 if sleep_time > 0: 

449 self.logger.debug( 

450 "{Throttling for %.2f seconds} " % sleep_time 

451 ) 

452 time.sleep(sleep_time) 

453 

454 self._ack() 

455 self.xdcc_timestamp = time.time() 

456 

457 def on_dcc_disconnect(self, _: ServerConnection, __: Event): 

458 """ 

459 The 'dccmsg' event contains the file data. 

460 :param _: The connection 

461 :param __: The 'dccmsg' event 

462 :return: None 

463 """ 

464 self.downloading = False 

465 

466 if self.xdcc_file is not None: 

467 self.xdcc_file.close() 

468 

469 if self.progress >= self.filesize: 

470 raise DownloadCompleted() 

471 else: 

472 raise DownloadIncomplete() 

473 

474 # noinspection PyMethodMayBeStatic 

475 def on_privnotice(self, _: ServerConnection, event: Event): 

476 """ 

477 Handles privnotices. Bots sometimes send privnotices when 

478 a pack was already requested or the user is put into a queue. 

479 

480 If the privnotice indicates that a pack was already requested, 

481 the downloader will pause for 60 seconds 

482 

483 :param _: The connection 

484 :param event: The privnotice event 

485 :return: None 

486 """ 

487 if "you already requested" in event.arguments[0].lower(): 

488 raise PackAlreadyRequested() 

489 else: 

490 self.logger.debug("privnotice: {}:{}".format( 

491 str(event.source), str(event.arguments)) 

492 ) 

493 # TODO Handle queues 

494 

495 def on_error(self, _: ServerConnection, __: Event): 

496 """ 

497 Sometimes, the connection gives an error which may prove fatal for 

498 the download process. A possible cause of error events is a banned 

499 IP address. 

500 :param _: The connection 

501 :param __: The error event 

502 :return: None 

503 """ 

504 self.logger.warning("Unrecoverable Error: Is this IP banned?") 

505 raise UnrecoverableError() 

506 

507 def _send_xdcc_request_message(self, conn: ServerConnection): 

508 """ 

509 Sends an XDCC request message 

510 :param conn: The connection to use 

511 :return: None 

512 """ 

513 self.logger.info("Waiting for {}s before sending message" 

514 .format(self.wait_time)) 

515 time.sleep(self.wait_time) 

516 

517 msg = self.pack.get_request_message() 

518 self.logger.info("Send XDCC Message: " + msg) 

519 self.message_sent = True 

520 conn.privmsg(self.pack.bot, msg) 

521 

522 def _ack(self): 

523 """ 

524 Sends the acknowledgement to the XDCC bot that a chunk 

525 has been received. 

526 This process is sometimes responsible for the program hanging due to a 

527 stuck socket.send. 

528 This is mitigated by completely disconnecting the client and restarting 

529 the download process with a new XDCC CLient 

530 :return: None 

531 """ 

532 # It seems the DCC connection dies when downloading with 

533 # max speed and using Q structs. Why that is I do not know. 

534 # But because of this we'll use progressively larger struct types 

535 # Whenever the old one gets too small 

536 try: 

537 payload = struct.pack(self.struct_format, self.progress) 

538 self.ack_queue.append(payload) 

539 except struct.error: 

540 

541 if self.struct_format == b"!I": 

542 self.struct_format = b"!L" 

543 elif self.struct_format == b"!L": 

544 self.struct_format = b"!Q" 

545 else: 

546 self.logger.error("File too large for structs") 

547 self._disconnect() 

548 return 

549 

550 self._ack() 

551 return 

552 

553 def _disconnect(self): 

554 """ 

555 Disconnects all connections of the XDCC Client 

556 :return: None 

557 """ 

558 self.logger.info("Initializing Disconnect") 

559 self.connection.reactor.disconnect_all() 

560 

561 def timeout_watcher(self): 

562 """ 

563 Monitors when the XDCC message is sent. If it is not sent by the 

564 timeout time, a ping will be sent and handled by the on_ping method 

565 :return: None 

566 """ 

567 while not self.connected \ 

568 or self.connect_start_time + self.wait_time > time.time(): 

569 time.sleep(0.5) 

570 

571 self.logger.info("Timeout watcher started") 

572 while not self.message_sent and not self.disconnected: 

573 time.sleep(1) 

574 if self.timeout < (time.time() - self.connect_start_time): 

575 self.logger.info("Timeout detected") 

576 self.connection.ping(self.server.address) 

577 break 

578 self.logger.info("Message sent without timeout") 

579 

580 def send_acks(self): 

581 while self.downloading: 

582 self.ack_lock.acquire() 

583 try: 

584 if len(self.ack_queue) > 0: 

585 self.xdcc_connection.socket.send(self.ack_queue.pop(0)) 

586 else: 

587 time.sleep(0.5) 

588 except socket.timeout: 

589 self.logger.debug("ACK timed out") 

590 continue 

591 except AttributeError: 

592 self.logger.warning("Missing XDCC socket") 

593 # This happens sometimes, don't ask me why though 

594 continue 

595 finally: 

596 self.ack_lock.release() 

597 

598 def progress_printer(self): 

599 """ 

600 Prints the download progress 

601 Should run in a separate thread to avoid blocking up the IO which 

602 could lead to reduced download speeds 

603 :return: None 

604 """ 

605 speed_progress = [] 

606 while not self.downloading and not self.disconnected: 

607 pass 

608 self.logger.info("Progress Printer started") 

609 time.sleep(1) 

610 

611 printing = self.downloading and not self.disconnected 

612 while printing: 

613 printing = self.downloading and not self.disconnected 

614 

615 speed_progress.append({ 

616 "timestamp": time.time(), 

617 "progress": self.progress 

618 }) 

619 while len(speed_progress) > 0 \ 

620 and time.time() - speed_progress[0]["timestamp"] > 7: 

621 speed_progress.pop(0) 

622 

623 if len(speed_progress) > 0: 

624 bytes_delta = self.progress - speed_progress[0]["progress"] 

625 time_delta = time.time() - speed_progress[0]["timestamp"] 

626 ratio = int(bytes_delta / time_delta) 

627 speed = human_readable_bytes(ratio) + "/s" 

628 else: 

629 speed = "0B/s" 

630 

631 percentage = "%.2f" % (100 * (self.progress / self.filesize)) 

632 

633 log_message = "[{}]: ({}%) |{}/{}| ({})".format( 

634 self.pack.filename, 

635 percentage, 

636 human_readable_bytes( 

637 self.progress, remove_trailing_zeroes=False 

638 ), 

639 human_readable_bytes(self.filesize), 

640 speed 

641 ) 

642 

643 try: 

644 rows, _columns = check_output(['stty', 'size']).split() 

645 columns = int(_columns) 

646 except (ValueError, CalledProcessError): 

647 columns = 80 

648 log_message = log_message[0:columns] 

649 

650 pprint(log_message, end="\r", bg="lyellow", fg="black") 

651 time.sleep(0.1) 

652 self.logger.info("Progress Printer stopped")