commit 5d9f4d5e0afbf3568b71ffff1425cf62ed82f80a Author: uttarayan21 Date: Fri Dec 5 16:14:06 2025 +0530 feat: initial commit diff --git a/.envrc b/.envrc new file mode 100644 index 0000000..3550a30 --- /dev/null +++ b/.envrc @@ -0,0 +1 @@ +use flake diff --git a/.github/workflows/build.yaml b/.github/workflows/build.yaml new file mode 100644 index 0000000..42334da --- /dev/null +++ b/.github/workflows/build.yaml @@ -0,0 +1,62 @@ +name: build + +on: + push: + branches: [ master ] + pull_request: + branches: [ master ] + +env: + CARGO_TERM_COLOR: always + +jobs: + checks-matrix: + runs-on: ubuntu-latest + outputs: + matrix: ${{ steps.set-matrix.outputs.matrix }} + steps: + - uses: actions/checkout@v4 + - uses: DeterminateSystems/nix-installer-action@main + - uses: DeterminateSystems/magic-nix-cache-action@main + - id: set-matrix + name: Generate Nix Matrix + run: | + set -Eeu + matrix="$(nix eval --json '.#githubActions.matrix')" + echo "matrix=$matrix" >> "$GITHUB_OUTPUT" + + checks-build: + needs: checks-matrix + runs-on: ${{ matrix.os }} + strategy: + matrix: ${{fromJSON(needs.checks-matrix.outputs.matrix)}} + steps: + - uses: actions/checkout@v4 + - uses: DeterminateSystems/nix-installer-action@main + - uses: DeterminateSystems/magic-nix-cache-action@main + - run: nix build -L '.#${{ matrix.attr }}' + + codecov: + runs-on: ubuntu-latest + permissions: + id-token: "write" + contents: "read" + + steps: + - uses: actions/checkout@v4 + - uses: DeterminateSystems/nix-installer-action@main + - uses: DeterminateSystems/magic-nix-cache-action@main + + - name: Run codecov + run: nix build .#checks.x86_64-linux.teewriter-llvm-cov + + - name: Upload coverage reports to Codecov + uses: codecov/codecov-action@v4.0.1 + with: + flags: unittests + name: codecov-hello + fail_ci_if_error: true + token: ${{ secrets.CODECOV_TOKEN }} + files: ./result + verbose: true + diff --git a/.github/workflows/docs.yaml b/.github/workflows/docs.yaml new file mode 100644 index 0000000..c7048c4 --- /dev/null +++ b/.github/workflows/docs.yaml @@ -0,0 +1,38 @@ +name: docs + +on: + push: + branches: [ master ] + +env: + CARGO_TERM_COLOR: always + +jobs: + docs: + runs-on: ubuntu-latest + permissions: + id-token: "write" + contents: "read" + pages: "write" + + steps: + - uses: actions/checkout@v4 + - uses: DeterminateSystems/nix-installer-action@main + - uses: DeterminateSystems/magic-nix-cache-action@main + - uses: DeterminateSystems/flake-checker-action@main + + - name: Generate docs + run: nix build .#checks.x86_64-linux.hello-docs + + - name: Setup Pages + uses: actions/configure-pages@v5 + + - name: Upload artifact + uses: actions/upload-pages-artifact@v3 + with: + path: result/share/doc + + - name: Deploy to gh-pages + id: deployment + uses: actions/deploy-pages@v4 + diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..6210698 --- /dev/null +++ b/.gitignore @@ -0,0 +1,3 @@ +/result +/target +.direnv diff --git a/Cargo.lock b/Cargo.lock new file mode 100644 index 0000000..4a3a458 --- /dev/null +++ b/Cargo.lock @@ -0,0 +1,187 @@ +# This file is automatically @generated by Cargo. +# It is not intended for manual editing. +version = 4 + +[[package]] +name = "bytes" +version = "1.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b35204fbdc0b3f4446b89fc1ac2cf84a8a68971995d0bf2e925ec7cd960f9cb3" + +[[package]] +name = "futures" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "65bc07b1a8bc7c85c5f2e110c476c7389b4554ba72af57d8445ea63a576b0876" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-channel" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2dff15bf788c671c1934e366d07e30c1814a8ef514e1af724a602e8a2fbe1b10" +dependencies = [ + "futures-core", + "futures-sink", +] + +[[package]] +name = "futures-core" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e" + +[[package]] +name = "futures-executor" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e28d1d997f585e54aebc3f97d39e72338912123a67330d723fdbb564d646c9f" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-io" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6" + +[[package]] +name = "futures-macro" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "futures-sink" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e575fab7d1e0dcb8d0c7bcf9a63ee213816ab51902e6d244a95819acacf1d4f7" + +[[package]] +name = "futures-task" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f90f7dce0722e95104fcb095585910c0977252f286e354b5e3bd38902cd99988" + +[[package]] +name = "futures-util" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81" +dependencies = [ + "futures-channel", + "futures-core", + "futures-io", + "futures-macro", + "futures-sink", + "futures-task", + "memchr", + "pin-project-lite", + "pin-utils", + "slab", +] + +[[package]] +name = "memchr" +version = "2.7.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f52b00d39961fc5b2736ea853c9cc86238e165017a493d1d5c8eac6bdc4cc273" + +[[package]] +name = "pin-project-lite" +version = "0.2.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3b3cff922bd51709b605d9ead9aa71031d81447142d828eb4a6eba76fe619f9b" + +[[package]] +name = "pin-utils" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" + +[[package]] +name = "proc-macro2" +version = "1.0.93" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "60946a68e5f9d28b0dc1c21bb8a97ee7d018a8b322fa57838ba31cc878e22d99" +dependencies = [ + "unicode-ident", +] + +[[package]] +name = "quote" +version = "1.0.38" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0e4dccaaaf89514f546c693ddc140f729f958c247918a13380cccc6078391acc" +dependencies = [ + "proc-macro2", +] + +[[package]] +name = "slab" +version = "0.4.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a2ae44ef20feb57a68b23d846850f861394c2e02dc425a50098ae8c90267589" + +[[package]] +name = "syn" +version = "2.0.98" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "36147f1a48ae0ec2b5b3bc5b537d267457555a10dc06f3dbc8cb11ba3006d3b1" +dependencies = [ + "proc-macro2", + "quote", + "unicode-ident", +] + +[[package]] +name = "teewriter" +version = "0.1.0" +dependencies = [ + "futures", + "tokio", +] + +[[package]] +name = "tokio" +version = "1.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ff360e02eab121e0bc37a2d3b4d4dc622e6eda3a8e5253d5435ecf5bd4c68408" +dependencies = [ + "bytes", + "pin-project-lite", + "tokio-macros", +] + +[[package]] +name = "tokio-macros" +version = "2.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "af407857209536a95c8e56f8231ef2c2e2aff839b22e07a1ffcbc617e9db9fa5" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "unicode-ident" +version = "1.0.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a210d160f08b701c8721ba1c726c11662f877ea6b7094007e1ca9a1041945034" diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..4ff8b06 --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,31 @@ +[package] +name = "teewriter" +version = "0.1.0" +edition = "2024" + +[package.metadata.docs.rs] +all-features = true +rustdoc-args = ["--cfg", "docsrs"] + +[dependencies] +futures = { version = "0.3.31", optional = true } +tokio = { version = "1.48.0", features = [ + "io-std", + "io-util", +], optional = true } + +[dev-dependencies] +futures = "0.3.31" +tokio = { version = "1.48.0", features = [ + "rt", + "macros", + "io-util", + "io-std", + "rt-multi-thread", +] } + +[features] +tokio = ["dep:tokio"] # Enables tokio::io AsyncRead / AsyncWrite support +futures = ["dep:futures"] # Enables futures::io AsyncRead / AsyncWrite support +io-std = [] # Enables std::io Read / Write support +default = ["tokio", "io-std"] diff --git a/flake.lock b/flake.lock new file mode 100644 index 0000000..1b367c5 --- /dev/null +++ b/flake.lock @@ -0,0 +1,136 @@ +{ + "nodes": { + "advisory-db": { + "flake": false, + "locked": { + "lastModified": 1764858199, + "narHash": "sha256-oxTIH77Kc2PVJKHQDFFImBiDzRkKZOg1q/E45yfYPFI=", + "owner": "rustsec", + "repo": "advisory-db", + "rev": "f414b4d1ff5df405ea74240bc8fc2e4ce5f0d6c3", + "type": "github" + }, + "original": { + "owner": "rustsec", + "repo": "advisory-db", + "type": "github" + } + }, + "crane": { + "locked": { + "lastModified": 1764903584, + "narHash": "sha256-RSkJtNtx0SEaQiYqsoFoRynwfZLo2OZ9z6rUq1DJR6g=", + "owner": "ipetkov", + "repo": "crane", + "rev": "2b3a5a88d852575758e1eb6ac9ee677fcd633fc1", + "type": "github" + }, + "original": { + "owner": "ipetkov", + "repo": "crane", + "type": "github" + } + }, + "flake-utils": { + "inputs": { + "systems": "systems" + }, + "locked": { + "lastModified": 1731533236, + "narHash": "sha256-l0KFg5HjrsfsO/JpG+r7fRrqm12kzFHyUHqHCVpMMbI=", + "owner": "numtide", + "repo": "flake-utils", + "rev": "11707dc2f618dd54ca8739b309ec4fc024de578b", + "type": "github" + }, + "original": { + "owner": "numtide", + "repo": "flake-utils", + "type": "github" + } + }, + "nix-github-actions": { + "inputs": { + "nixpkgs": [ + "nixpkgs" + ] + }, + "locked": { + "lastModified": 1737420293, + "narHash": "sha256-F1G5ifvqTpJq7fdkT34e/Jy9VCyzd5XfJ9TO8fHhJWE=", + "owner": "nix-community", + "repo": "nix-github-actions", + "rev": "f4158fa080ef4503c8f4c820967d946c2af31ec9", + "type": "github" + }, + "original": { + "owner": "nix-community", + "repo": "nix-github-actions", + "type": "github" + } + }, + "nixpkgs": { + "locked": { + "lastModified": 1764667669, + "narHash": "sha256-7WUCZfmqLAssbDqwg9cUDAXrSoXN79eEEq17qhTNM/Y=", + "owner": "nixos", + "repo": "nixpkgs", + "rev": "418468ac9527e799809c900eda37cbff999199b6", + "type": "github" + }, + "original": { + "owner": "nixos", + "ref": "nixos-unstable", + "repo": "nixpkgs", + "type": "github" + } + }, + "root": { + "inputs": { + "advisory-db": "advisory-db", + "crane": "crane", + "flake-utils": "flake-utils", + "nix-github-actions": "nix-github-actions", + "nixpkgs": "nixpkgs", + "rust-overlay": "rust-overlay" + } + }, + "rust-overlay": { + "inputs": { + "nixpkgs": [ + "nixpkgs" + ] + }, + "locked": { + "lastModified": 1764902447, + "narHash": "sha256-wNqkDBj+tjK619sTHPEA7uhjr7DHHEY8OsFou31dxy0=", + "owner": "oxalica", + "repo": "rust-overlay", + "rev": "d914a744a83098eeb28125d2848ad383b209223f", + "type": "github" + }, + "original": { + "owner": "oxalica", + "repo": "rust-overlay", + "type": "github" + } + }, + "systems": { + "locked": { + "lastModified": 1681028828, + "narHash": "sha256-Vy1rq5AaRuLzOxct8nz4T6wlgyUR7zLU309k9mBC768=", + "owner": "nix-systems", + "repo": "default", + "rev": "da67096a3b9bf56a91d16901293e51ba5b49a27e", + "type": "github" + }, + "original": { + "owner": "nix-systems", + "repo": "default", + "type": "github" + } + } + }, + "root": "root", + "version": 7 +} diff --git a/flake.nix b/flake.nix new file mode 100644 index 0000000..55a9146 --- /dev/null +++ b/flake.nix @@ -0,0 +1,157 @@ +{ + description = "A simple rust flake using rust-overlay and craneLib"; + + inputs = { + nixpkgs.url = "github:nixos/nixpkgs/nixos-unstable"; + flake-utils.url = "github:numtide/flake-utils"; + crane.url = "github:ipetkov/crane"; + nix-github-actions = { + url = "github:nix-community/nix-github-actions"; + inputs.nixpkgs.follows = "nixpkgs"; + }; + rust-overlay = { + url = "github:oxalica/rust-overlay"; + inputs.nixpkgs.follows = "nixpkgs"; + }; + advisory-db = { + url = "github:rustsec/advisory-db"; + flake = false; + }; + }; + + outputs = { + self, + crane, + flake-utils, + nixpkgs, + rust-overlay, + advisory-db, + nix-github-actions, + ... + }: + flake-utils.lib.eachDefaultSystem ( + system: let + pkgs = import nixpkgs { + inherit system; + overlays = [ + rust-overlay.overlays.default + ]; + }; + inherit (pkgs) lib; + cargoToml = builtins.fromTOML (builtins.readFile ./Cargo.toml); + name = cargoToml.package.name; + + stableToolchain = pkgs.rust-bin.stable.latest.default; + stableToolchainWithLLvmTools = stableToolchain.override { + extensions = ["rust-src" "llvm-tools"]; + }; + stableToolchainWithRustAnalyzer = stableToolchain.override { + extensions = ["rust-src" "rust-analyzer"]; + }; + craneLib = (crane.mkLib pkgs).overrideToolchain stableToolchain; + craneLibLLvmTools = (crane.mkLib pkgs).overrideToolchain stableToolchainWithLLvmTools; + + src = let + filterBySuffix = path: exts: lib.any (ext: lib.hasSuffix ext path) exts; + sourceFilters = path: type: (craneLib.filterCargoSources path type) || filterBySuffix path [".c" ".h" ".hpp" ".cpp" ".cc"]; + in + lib.cleanSourceWith { + filter = sourceFilters; + src = ./.; + }; + commonArgs = + { + inherit src; + pname = name; + stdenv = p: p.clangStdenv; + doCheck = false; + # LIBCLANG_PATH = "${pkgs.llvmPackages.libclang.lib}/lib"; + # nativeBuildInputs = with pkgs; [ + # cmake + # llvmPackages.libclang.lib + # ]; + buildInputs = with pkgs; + [] + ++ (lib.optionals pkgs.stdenv.isDarwin [ + libiconv + apple-sdk_26 + ]); + } + // (lib.optionalAttrs pkgs.stdenv.isLinux { + # BINDGEN_EXTRA_CLANG_ARGS = "-I${pkgs.llvmPackages.libclang.lib}/lib/clang/18/include"; + }); + cargoArtifacts = craneLib.buildPackage commonArgs; + in { + checks = + { + "${name}-clippy" = craneLib.cargoClippy (commonArgs + // { + inherit cargoArtifacts; + cargoClippyExtraArgs = "--all-targets -- --deny warnings"; + }); + "${name}-docs" = craneLib.cargoDoc (commonArgs // {inherit cargoArtifacts;}); + "${name}-fmt" = craneLib.cargoFmt {inherit src;}; + "${name}-toml-fmt" = craneLib.taploFmt { + src = pkgs.lib.sources.sourceFilesBySuffices src [".toml"]; + }; + # Audit dependencies + "${name}-audit" = craneLib.cargoAudit { + inherit src advisory-db; + }; + + # Audit licenses + "${name}-deny" = craneLib.cargoDeny { + inherit src; + }; + "${name}-nextest" = craneLib.cargoNextest (commonArgs + // { + inherit cargoArtifacts; + partitions = 1; + partitionType = "count"; + }); + } + // lib.optionalAttrs (!pkgs.stdenv.isDarwin) { + "${name}-llvm-cov" = craneLibLLvmTools.cargoLlvmCov (commonArgs // {inherit cargoArtifacts;}); + }; + + packages = let + pkg = craneLib.buildPackage (commonArgs + // {inherit cargoArtifacts;} + // { + # postInstall = '' + # mkdir -p $out/bin + # mkdir -p $out/share/bash-completions + # mkdir -p $out/share/fish/vendor_completions.d + # mkdir -p $out/share/zsh/site-functions + # $out/bin/${name} completions bash > $out/share/bash-completions/${name}.bash + # $out/bin/${name} completions fish > $out/share/fish/vendor_completions.d/${name}.fish + # $out/bin/${name} completions zsh > $out/share/zsh/site-functions/_${name} + # ''; + }); + in { + "${name}" = pkg; + default = pkg; + }; + + devShells = { + default = pkgs.mkShell.override {stdenv = pkgs.clangStdenv;} (commonArgs + // { + packages = with pkgs; + [ + stableToolchainWithRustAnalyzer + cargo-nextest + cargo-deny + ] + ++ (lib.optionals pkgs.stdenv.isDarwin [ + apple-sdk_26 + ]); + }); + }; + } + ) + // { + githubActions = nix-github-actions.lib.mkGithubMatrix { + checks = nixpkgs.lib.getAttrs ["x86_64-linux"] self.checks; + }; + }; +} diff --git a/src/either.rs b/src/either.rs new file mode 100644 index 0000000..3122f95 --- /dev/null +++ b/src/either.rs @@ -0,0 +1,117 @@ +#[derive(Debug)] +pub enum Either { + Left(L), + Right(R), +} + +impl Either { + pub fn is_left(&self) -> bool { + matches!(self, Either::Left(_)) + } + + pub fn is_right(&self) -> bool { + matches!(self, Either::Right(_)) + } +} + +pub trait EitherExt: Sized { + fn either(self) -> Either; + fn or(self) -> Either; +} + +impl EitherExt for L { + fn either(self) -> Either { + Either::Left(self) + } + fn or(self) -> Either { + Either::Right(self) + } +} + +#[cfg(feature = "futures")] +impl futures::io::AsyncWrite for Either +where + L: futures::io::AsyncWrite + Unpin, + R: futures::io::AsyncWrite + Unpin, +{ + fn poll_write( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &[u8], + ) -> std::task::Poll> { + match &mut *self { + Either::Left(l) => std::pin::Pin::new(l).poll_write(cx, buf), + Either::Right(r) => std::pin::Pin::new(r).poll_write(cx, buf), + } + } + + fn poll_flush( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + match &mut *self { + Either::Left(l) => std::pin::Pin::new(l).poll_flush(cx), + Either::Right(r) => std::pin::Pin::new(r).poll_flush(cx), + } + } + + fn poll_close( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + match &mut *self { + Either::Left(l) => std::pin::Pin::new(l).poll_close(cx), + Either::Right(r) => std::pin::Pin::new(r).poll_close(cx), + } + } +} + +#[cfg(feature = "tokio")] +impl tokio::io::AsyncWrite for Either +where + L: tokio::io::AsyncWrite + Unpin, + R: tokio::io::AsyncWrite + Unpin, +{ + fn poll_write( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &[u8], + ) -> std::task::Poll> { + match &mut *self { + Either::Left(l) => std::pin::Pin::new(l).poll_write(cx, buf), + Either::Right(r) => std::pin::Pin::new(r).poll_write(cx, buf), + } + } + + fn poll_flush( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + match &mut *self { + Either::Left(l) => std::pin::Pin::new(l).poll_flush(cx), + Either::Right(r) => std::pin::Pin::new(r).poll_flush(cx), + } + } + + fn poll_shutdown( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + match &mut *self { + Either::Left(l) => std::pin::Pin::new(l).poll_shutdown(cx), + Either::Right(r) => std::pin::Pin::new(r).poll_shutdown(cx), + } + } +} + +#[test] +fn test_either_is_left_right() { + let left: Either = Either::Left(42); + let right: Either = Either::Right("hello"); + + assert!(left.is_left()); + assert!(!left.is_right()); + + assert!(right.is_right()); + assert!(!right.is_left()); +} diff --git a/src/errors.rs b/src/errors.rs new file mode 100644 index 0000000..9b1f578 --- /dev/null +++ b/src/errors.rs @@ -0,0 +1,7 @@ +use error_stack::{Report, ResultExt}; + +#[derive(Debug, thiserror::Error)] +#[error("An error occurred")] +pub struct Error; + +pub type Result> = core::result::Result; diff --git a/src/lib.rs b/src/lib.rs new file mode 100644 index 0000000..b286c31 --- /dev/null +++ b/src/lib.rs @@ -0,0 +1,62 @@ +//! Utilities for working with sync and async tee writers +//! This crate provides implementations for creating tee writers that can write to two or more underlying writers simultaneously. +//! +//! # Example with std::io::Write +//! #[cfg(feature = "io-std")] +//! ```rust +//! use teewriter::TeeWriterExt; +//! use std::io::Write; +//! let mut writer1 = Vec::new(); +//! let mut writer2 = Vec::new(); +//! let mut tee = writer1.tee(&mut writer2); +//! tee.write_all(b"Hello, world!").unwrap(); +//! let (writer1, writer2, offset) = tee.into_inner(); +//! if offset != 0 { +//! panic!("Writers are out of sync by {}", offset); +//! } +//! assert_eq!(writer1, b"Hello, world!"); +//! assert_eq!(writer2, b"Hello, world!"); +//! ``` +//! # Example with tokio::io::AsyncWrite +//! #[cfg(feature = "tokio")] +//! ```rust +//! use teewriter::TeeWriterExt; +//! use tokio::io::AsyncWriteExt; +//! #[tokio::main] +//! async fn main() { +//! let mut writer1 = tokio::io::sink(); +//! let mut writer2 = tokio::io::sink(); +//! let mut tee = writer1.tee(&mut writer2); +//! tee.write_all(b"Hello, async world!").await.unwrap(); +//! let (writer1, writer2, offset) = tee.into_inner(); +//! if offset != 0 { +//! panic!("Writers are out of sync by {}", offset); +//! } +//! } +//! ``` +//! +//! # Example with futures::io::AsyncWrite +//! #[cfg(feature = "futures")] +//! ```rust +//! use teewriter::TeeWriterExt; +//! use futures::io::AsyncWriteExt; +//! use futures::executor::block_on; +//! fn main() { +//! let mut writer1 = futures::io::sink(); +//! let mut writer2 = futures::io::sink(); +//! let mut tee = writer1.tee(&mut writer2); +//! block_on(tee.write_all(b"Hello, futures world!")).unwrap(); +//! let (writer1, writer2, offset) = tee.into_inner(); +//! if offset != 0 { +//! panic!("Writers are out of sync by {}", offset); +//! } +//! assert_eq!(offset, 0); +//! } +//! +pub mod either; +pub mod tee; + +#[doc(inline)] +pub use either::*; +#[doc(inline)] +pub use tee::*; diff --git a/src/tee.rs b/src/tee.rs new file mode 100644 index 0000000..c25fff8 --- /dev/null +++ b/src/tee.rs @@ -0,0 +1,54 @@ +#[cfg(feature = "futures")] +mod futures_impl; +#[cfg(feature = "io-std")] +mod std_impl; +#[cfg(feature = "tokio")] +mod tokio_impl; + +/// A writer that writes to two underlying writers, in sync +#[derive(Debug)] +pub struct TeeWriter { + pub(crate) this: This, + pub(crate) other: Other, + pub(crate) offset: isize, +} + +/// Extension trait to add `tee` method to any writer +/// +/// # Example +/// ```rust +/// use teewriter::TeeWriterExt; +/// use std::io::Write; +/// let mut writer1 = Vec::new(); +/// let mut writer2 = Vec::new(); +/// let mut tee = writer1.tee(&mut writer2); +/// tee.write_all(b"Hello, world!").unwrap(); +/// let (writer1, writer2, offset) = tee.into_inner(); +/// if offset != 0 { +/// panic!("Writers are out of sync by {}", offset); +/// } +/// assert_eq!(writer1, b"Hello, world!"); +/// assert_eq!(writer2, b"Hello, world!"); +/// ``` +pub trait TeeWriterExt: Sized { + fn tee(self, other: Other) -> TeeWriter { + TeeWriter { + this: self, + other, + offset: 0, + } + } +} + +impl TeeWriterExt for T {} + +impl TeeWriter { + /// Consumes the TeeWriter and returns the inner writers and the current offset. + /// Offset indicates how many more bytes have been written to `this` than `other`. + /// If offset is positive, `this` has written more bytes. + /// If offset is negative, `other` has written more bytes. + /// If offset is zero, both writers are in sync. + pub fn into_inner(self) -> (This, Other, isize) { + (self.this, self.other, self.offset) + } +} diff --git a/src/tee/futures_impl.rs b/src/tee/futures_impl.rs new file mode 100644 index 0000000..f936dbc --- /dev/null +++ b/src/tee/futures_impl.rs @@ -0,0 +1,106 @@ +use super::TeeWriter; + +#[cfg(feature = "futures")] +impl futures::io::AsyncWrite for TeeWriter +where + This: futures::io::AsyncWrite + Unpin, + Other: futures::io::AsyncWrite + Unpin, +{ + fn poll_write( + mut self: core::pin::Pin<&mut Self>, + cx: &mut core::task::Context<'_>, + buf: &[u8], + ) -> core::task::Poll> { + use core::task::Poll::*; + if self.offset > 0 { + let offset = self.offset as usize; + let offset = core::cmp::min(offset, buf.len()); + let buf = &buf[offset..]; + let written = core::pin::Pin::new(&mut self.this).poll_write(cx, buf)?; + if let Ready(written) = written { + if written == 0 { + return Pending; + } + self.offset -= written as isize; + } + return written.map(Ok); + } else if self.offset < 0 { + let offset = (-self.offset) as usize; + let offset = core::cmp::min(offset, buf.len()); + let buf = &buf[offset..]; + let written = core::pin::Pin::new(&mut self.other).poll_write(cx, buf)?; + if let Ready(written) = written { + if written == 0 { + return Pending; + } + self.offset += written as isize; + } + return written.map(Ok); + } + let this_written = core::pin::Pin::new(&mut self.this).poll_write(cx, buf)?; + let other_written = core::pin::Pin::new(&mut self.other).poll_write(cx, buf)?; + + match (this_written, other_written) { + (Ready(this), Ready(other)) => { + if this == other { + Ready(Ok(this)) + } else { + let offset = this as isize - other as isize; + self.offset += offset; + let min = core::cmp::min(this, other); + // next call should write to the one that wrote less + Ready(Ok(min)) + } + } + (Ready(this), Pending) => { + // next call should write to other + self.offset = this as isize; + Pending + } + (Pending, Ready(other)) => { + // next call should write to this + self.offset = -(other as isize); + Pending + } + (Pending, Pending) => Pending, + } + } + + fn poll_flush( + mut self: core::pin::Pin<&mut Self>, + cx: &mut core::task::Context<'_>, + ) -> core::task::Poll> { + let this_flushed = core::pin::Pin::new(&mut self.this).poll_flush(cx)?; + let other_flushed = core::pin::Pin::new(&mut self.other).poll_flush(cx)?; + + if this_flushed.is_ready() && other_flushed.is_ready() { + core::task::Poll::Ready(Ok(())) + } else { + core::task::Poll::Pending + } + } + + fn poll_close( + mut self: core::pin::Pin<&mut Self>, + cx: &mut core::task::Context<'_>, + ) -> core::task::Poll> { + let this_close = core::pin::Pin::new(&mut self.this).poll_close(cx)?; + let other_close = core::pin::Pin::new(&mut self.other).poll_close(cx)?; + + if this_close.is_ready() && other_close.is_ready() { + core::task::Poll::Ready(Ok(())) + } else { + core::task::Poll::Pending + } + } +} + +// #[cfg(feature = "futures")] +// #[cfg(test)] +// #[futures::test] +// async fn test_poll_uneven() { +// let sink = futures::io::sink(); +// use futures::io::AsyncWriteExt; +// let mut stream = futures::io::SimplexStream::new_unsplit(5).tee(sink); +// stream.write_all(b"Hello, world!").await.unwrap(); +// } diff --git a/src/tee/std_impl.rs b/src/tee/std_impl.rs new file mode 100644 index 0000000..05ea727 --- /dev/null +++ b/src/tee/std_impl.rs @@ -0,0 +1,41 @@ +use super::TeeWriter; +#[cfg(feature = "io-std")] +impl std::io::Write for TeeWriter +where + This: std::io::Write, + Other: std::io::Write, +{ + fn write(&mut self, buf: &[u8]) -> std::io::Result { + if self.offset > 0 { + let offset = self.offset as usize; + let offset = std::cmp::min(offset, buf.len()); + let buf = &buf[offset..]; + let written = self.this.write(buf)?; + self.offset -= written as isize; + return Ok(written); + } else if self.offset < 0 { + let offset = (-self.offset) as usize; + let offset = std::cmp::min(offset, buf.len()); + let buf = &buf[offset..]; + let written = self.other.write(buf)?; + self.offset += written as isize; + return Ok(written); + } + let this_written = self.this.write(buf)?; + let other_written = self.other.write(buf)?; + + if this_written == other_written { + Ok(this_written) + } else { + let offset = this_written as isize - other_written as isize; + self.offset += offset; + let min = std::cmp::min(this_written, other_written); + // next call should write to the one that wrote less + Ok(min) + } + } + fn flush(&mut self) -> std::io::Result<()> { + self.this.flush()?; + self.other.flush() + } +} diff --git a/src/tee/tokio_impl.rs b/src/tee/tokio_impl.rs new file mode 100644 index 0000000..4c77714 --- /dev/null +++ b/src/tee/tokio_impl.rs @@ -0,0 +1,108 @@ +use super::TeeWriter; + +#[cfg(feature = "tokio")] +impl tokio::io::AsyncWrite for TeeWriter +where + This: tokio::io::AsyncWrite + Unpin, + Other: tokio::io::AsyncWrite + Unpin, +{ + fn poll_write( + mut self: core::pin::Pin<&mut Self>, + cx: &mut core::task::Context<'_>, + buf: &[u8], + ) -> core::task::Poll> { + use core::task::Poll::*; + if self.offset > 0 { + let offset = self.offset as usize; + let offset = core::cmp::min(offset, buf.len()); + let buf = &buf[offset..]; + let written = core::pin::Pin::new(&mut self.this).poll_write(cx, buf)?; + if let Ready(written) = written { + if written == 0 { + return Pending; + } + self.offset -= written as isize; + } + return written.map(Ok); + } else if self.offset < 0 { + let offset = (-self.offset) as usize; + let offset = core::cmp::min(offset, buf.len()); + let buf = &buf[offset..]; + let written = core::pin::Pin::new(&mut self.other).poll_write(cx, buf)?; + if let Ready(written) = written { + if written == 0 { + return Pending; + } + self.offset += written as isize; + } + return written.map(Ok); + } + let this_written = core::pin::Pin::new(&mut self.this).poll_write(cx, buf)?; + let other_written = core::pin::Pin::new(&mut self.other).poll_write(cx, buf)?; + + match (this_written, other_written) { + (Ready(this), Ready(other)) => { + if this == other { + Ready(Ok(this)) + } else { + let offset = this as isize - other as isize; + self.offset += offset; + let min = core::cmp::min(this, other); + // next call should write to the one that wrote less + Ready(Ok(min)) + } + } + (Ready(this), Pending) => { + // next call should write to other + self.offset = this as isize; + Pending + } + (Pending, Ready(other)) => { + // next call should write to this + self.offset = -(other as isize); + Pending + } + (Pending, Pending) => Pending, + } + } + + fn poll_flush( + mut self: core::pin::Pin<&mut Self>, + cx: &mut core::task::Context<'_>, + ) -> core::task::Poll> { + let this_flushed = core::pin::Pin::new(&mut self.this).poll_flush(cx)?; + let other_flushed = core::pin::Pin::new(&mut self.other).poll_flush(cx)?; + + if this_flushed.is_ready() && other_flushed.is_ready() { + core::task::Poll::Ready(Ok(())) + } else { + core::task::Poll::Pending + } + } + + fn poll_shutdown( + mut self: core::pin::Pin<&mut Self>, + cx: &mut core::task::Context<'_>, + ) -> core::task::Poll> { + let this_shutdown = core::pin::Pin::new(&mut self.this).poll_shutdown(cx)?; + let other_shutdown = core::pin::Pin::new(&mut self.other).poll_shutdown(cx)?; + + if this_shutdown.is_ready() && other_shutdown.is_ready() { + core::task::Poll::Ready(Ok(())) + } else { + core::task::Poll::Pending + } + } +} + +#[cfg(feature = "tokio")] +#[cfg(test)] +#[tokio::test] +async fn test_poll_uneven() { + let sink = tokio::io::sink(); + use crate::tee::TeeWriterExt; + use tokio::io::AsyncWriteExt; + let mut stream = tokio::io::SimplexStream::new_unsplit(20).tee(sink); + stream.write_all(b"Hello, world!").await.unwrap(); + // let out = stream.read_until_end().await.unwrap(); +}