#!/usr/bin/env python3 """ Simple client for transfer.pw: reads from stdin or from files given as parameters Depends on following packages (tested with following version numbers): - requests v2.28.1 (https://pypi.org/project/requests/) - pycryptodome v3.15.0 (https://pypi.org/project/pycryptodome/) Install like: pip install 'requests>=2.28.1,<3.0.0' 'pycryptodome>=3.15.0,<4.0.0' Published under ISC License Copyright 2022 Arno Hollosi Permission to use, copy, modify, and/or distribute this software for any purpose with or without fee is hereby granted, provided that the above copyright notice and this permission notice appear in all copies. THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. """ import argparse import json import sys from base64 import b64encode from json import JSONDecodeError from typing import Tuple, BinaryIO, TypedDict import requests from Crypto.Cipher import AES from Crypto.Hash import SHA256 from Crypto.Protocol.KDF import PBKDF2 from Crypto.Random import get_random_bytes from requests import RequestException TRANSFER_PW = "https://transfer.pw" # server accepts message of 3750 chars, but overhead of base64 and metadata limits it to ~2660 MAX_TEXT_LENGTH = 2660 PBKDF_ITER = 2500 class TransferObject(TypedDict): """Parameters used by SJCL's encryption function""" v: int # version iter: int # PBKDF iterations salt: str # salt for PBKDF (bsae64) mode: str # block mode (GCM) cipher: str # cipher (AES) ks: int # key length ts: int # tag length (MAC) adata: str # authenticated date (not used) iv: str # IV for block mode (base64) ct: str # ciphertext + tag (base64) def generate_key() -> Tuple[str, bytes, bytes]: # generate random key of 25 alphanumeric chars pwd_input = get_random_bytes(64) pwd = b64encode(pwd_input).decode('UTF-8').translate({ord(char): None for char in "+=/"})[0:25] # SJCL's convenience function runs PBKDF2 on supplied password # so we need to do so as well (doesn't add security in our case) salt = get_random_bytes(16) key = PBKDF2(pwd, salt, count=PBKDF_ITER, dkLen=16, hmac_hash_module=SHA256) return pwd, salt, key def encrypt(key: bytes, data: bytes) -> Tuple[bytes, bytes]: iv = get_random_bytes(16) cipher = AES.new(key, AES.MODE_GCM, nonce=iv) ciphertext, tag = cipher.encrypt_and_digest(data) return iv, ciphertext + tag def create_transfer_object(salt: bytes, iv: bytes, ciphertext: bytes) -> TransferObject: return { "v": 1, "iter": PBKDF_ITER, "ks": 128, "ts": 128, "mode": "gcm", "cipher": "aes", "adata": "", "salt": b64encode(salt).decode('UTF-8'), "iv": b64encode(iv).decode('UTF-8'), "ct": b64encode(ciphertext).decode('UTF-8') } def send_object(obj: TransferObject) -> str: try: response = requests.post(TRANSFER_PW + "/save", data={"message": json.dumps(obj), "format": "json"}) answer = json.loads(response.text) except RequestException as ex: raise RuntimeError(f"Problem posting to server: {ex}") except JSONDecodeError as ex: raise RuntimeError(f"Unexpected response from server: {ex}") if "msgIsValid" not in answer or not answer["msgIsValid"]: raise RuntimeError("Server didn't like our request!?") if "msgIsStored" not in answer or not answer["msgIsStored"] or "id" not in answer: raise RuntimeError("Server didn't store our message") return answer["id"] def encrypt_and_send(file: BinaryIO, path: str, table_mode: bool): pwd, salt, key = generate_key() try: data = file.read() except IOError as ex: raise RuntimeError(f"Problem reading file: {ex}") if len(data) > MAX_TEXT_LENGTH: raise RuntimeError("plaintext too long") iv, ciphertext = encrypt(key, data) obj = create_transfer_object(salt, iv, ciphertext) msg_id = send_object(obj) if table_mode: print(path, end="\t") print(f"{TRANSFER_PW}/{msg_id}#{pwd}") parser = argparse.ArgumentParser() parser.add_argument('filepath', nargs='*', help="One or more paths to files, if omitted reads from stdin") parser.add_argument('-t', '--table', action='store_true', help="outputs filepath followed by URL [tab separated]") args = parser.parse_args() if len(args.filepath) == 0: # read from stdin try: encrypt_and_send(sys.stdin.buffer, "stdin", args.table) except RuntimeError as ex: print(f"Error while processing stdin: {ex}", file=sys.stderr) exit(1) else: exit_code = 0 for path in args.filepath: try: with open(path, "rb") as file: encrypt_and_send(file, path, args.table) except IOError as ex: print(f"Error opening {path}: {ex}", file=sys.stderr) exit_code = 1 except RuntimeError as ex: print(f"Error while processing {path}: {ex}", file=sys.stderr) exit_code = 1 exit(exit_code)