feat: initial commit
Some checks failed
build / checks-matrix (push) Has been cancelled
build / checks-build (push) Has been cancelled
build / codecov (push) Has been cancelled
docs / docs (push) Has been cancelled

This commit is contained in:
uttarayan21
2025-12-05 16:14:06 +05:30
commit 5d9f4d5e0a
15 changed files with 1110 additions and 0 deletions

1
.envrc Normal file
View File

@@ -0,0 +1 @@
use flake

62
.github/workflows/build.yaml vendored Normal file
View File

@@ -0,0 +1,62 @@
name: build
on:
push:
branches: [ master ]
pull_request:
branches: [ master ]
env:
CARGO_TERM_COLOR: always
jobs:
checks-matrix:
runs-on: ubuntu-latest
outputs:
matrix: ${{ steps.set-matrix.outputs.matrix }}
steps:
- uses: actions/checkout@v4
- uses: DeterminateSystems/nix-installer-action@main
- uses: DeterminateSystems/magic-nix-cache-action@main
- id: set-matrix
name: Generate Nix Matrix
run: |
set -Eeu
matrix="$(nix eval --json '.#githubActions.matrix')"
echo "matrix=$matrix" >> "$GITHUB_OUTPUT"
checks-build:
needs: checks-matrix
runs-on: ${{ matrix.os }}
strategy:
matrix: ${{fromJSON(needs.checks-matrix.outputs.matrix)}}
steps:
- uses: actions/checkout@v4
- uses: DeterminateSystems/nix-installer-action@main
- uses: DeterminateSystems/magic-nix-cache-action@main
- run: nix build -L '.#${{ matrix.attr }}'
codecov:
runs-on: ubuntu-latest
permissions:
id-token: "write"
contents: "read"
steps:
- uses: actions/checkout@v4
- uses: DeterminateSystems/nix-installer-action@main
- uses: DeterminateSystems/magic-nix-cache-action@main
- name: Run codecov
run: nix build .#checks.x86_64-linux.teewriter-llvm-cov
- name: Upload coverage reports to Codecov
uses: codecov/codecov-action@v4.0.1
with:
flags: unittests
name: codecov-hello
fail_ci_if_error: true
token: ${{ secrets.CODECOV_TOKEN }}
files: ./result
verbose: true

38
.github/workflows/docs.yaml vendored Normal file
View File

@@ -0,0 +1,38 @@
name: docs
on:
push:
branches: [ master ]
env:
CARGO_TERM_COLOR: always
jobs:
docs:
runs-on: ubuntu-latest
permissions:
id-token: "write"
contents: "read"
pages: "write"
steps:
- uses: actions/checkout@v4
- uses: DeterminateSystems/nix-installer-action@main
- uses: DeterminateSystems/magic-nix-cache-action@main
- uses: DeterminateSystems/flake-checker-action@main
- name: Generate docs
run: nix build .#checks.x86_64-linux.hello-docs
- name: Setup Pages
uses: actions/configure-pages@v5
- name: Upload artifact
uses: actions/upload-pages-artifact@v3
with:
path: result/share/doc
- name: Deploy to gh-pages
id: deployment
uses: actions/deploy-pages@v4

3
.gitignore vendored Normal file
View File

@@ -0,0 +1,3 @@
/result
/target
.direnv

187
Cargo.lock generated Normal file
View File

@@ -0,0 +1,187 @@
# This file is automatically @generated by Cargo.
# It is not intended for manual editing.
version = 4
[[package]]
name = "bytes"
version = "1.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b35204fbdc0b3f4446b89fc1ac2cf84a8a68971995d0bf2e925ec7cd960f9cb3"
[[package]]
name = "futures"
version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "65bc07b1a8bc7c85c5f2e110c476c7389b4554ba72af57d8445ea63a576b0876"
dependencies = [
"futures-channel",
"futures-core",
"futures-executor",
"futures-io",
"futures-sink",
"futures-task",
"futures-util",
]
[[package]]
name = "futures-channel"
version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2dff15bf788c671c1934e366d07e30c1814a8ef514e1af724a602e8a2fbe1b10"
dependencies = [
"futures-core",
"futures-sink",
]
[[package]]
name = "futures-core"
version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e"
[[package]]
name = "futures-executor"
version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e28d1d997f585e54aebc3f97d39e72338912123a67330d723fdbb564d646c9f"
dependencies = [
"futures-core",
"futures-task",
"futures-util",
]
[[package]]
name = "futures-io"
version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6"
[[package]]
name = "futures-macro"
version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "futures-sink"
version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e575fab7d1e0dcb8d0c7bcf9a63ee213816ab51902e6d244a95819acacf1d4f7"
[[package]]
name = "futures-task"
version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f90f7dce0722e95104fcb095585910c0977252f286e354b5e3bd38902cd99988"
[[package]]
name = "futures-util"
version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81"
dependencies = [
"futures-channel",
"futures-core",
"futures-io",
"futures-macro",
"futures-sink",
"futures-task",
"memchr",
"pin-project-lite",
"pin-utils",
"slab",
]
[[package]]
name = "memchr"
version = "2.7.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f52b00d39961fc5b2736ea853c9cc86238e165017a493d1d5c8eac6bdc4cc273"
[[package]]
name = "pin-project-lite"
version = "0.2.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3b3cff922bd51709b605d9ead9aa71031d81447142d828eb4a6eba76fe619f9b"
[[package]]
name = "pin-utils"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
[[package]]
name = "proc-macro2"
version = "1.0.93"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "60946a68e5f9d28b0dc1c21bb8a97ee7d018a8b322fa57838ba31cc878e22d99"
dependencies = [
"unicode-ident",
]
[[package]]
name = "quote"
version = "1.0.38"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0e4dccaaaf89514f546c693ddc140f729f958c247918a13380cccc6078391acc"
dependencies = [
"proc-macro2",
]
[[package]]
name = "slab"
version = "0.4.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7a2ae44ef20feb57a68b23d846850f861394c2e02dc425a50098ae8c90267589"
[[package]]
name = "syn"
version = "2.0.98"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "36147f1a48ae0ec2b5b3bc5b537d267457555a10dc06f3dbc8cb11ba3006d3b1"
dependencies = [
"proc-macro2",
"quote",
"unicode-ident",
]
[[package]]
name = "teewriter"
version = "0.1.0"
dependencies = [
"futures",
"tokio",
]
[[package]]
name = "tokio"
version = "1.48.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ff360e02eab121e0bc37a2d3b4d4dc622e6eda3a8e5253d5435ecf5bd4c68408"
dependencies = [
"bytes",
"pin-project-lite",
"tokio-macros",
]
[[package]]
name = "tokio-macros"
version = "2.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "af407857209536a95c8e56f8231ef2c2e2aff839b22e07a1ffcbc617e9db9fa5"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "unicode-ident"
version = "1.0.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a210d160f08b701c8721ba1c726c11662f877ea6b7094007e1ca9a1041945034"

31
Cargo.toml Normal file
View File

@@ -0,0 +1,31 @@
[package]
name = "teewriter"
version = "0.1.0"
edition = "2024"
[package.metadata.docs.rs]
all-features = true
rustdoc-args = ["--cfg", "docsrs"]
[dependencies]
futures = { version = "0.3.31", optional = true }
tokio = { version = "1.48.0", features = [
"io-std",
"io-util",
], optional = true }
[dev-dependencies]
futures = "0.3.31"
tokio = { version = "1.48.0", features = [
"rt",
"macros",
"io-util",
"io-std",
"rt-multi-thread",
] }
[features]
tokio = ["dep:tokio"] # Enables tokio::io AsyncRead / AsyncWrite support
futures = ["dep:futures"] # Enables futures::io AsyncRead / AsyncWrite support
io-std = [] # Enables std::io Read / Write support
default = ["tokio", "io-std"]

136
flake.lock generated Normal file
View File

@@ -0,0 +1,136 @@
{
"nodes": {
"advisory-db": {
"flake": false,
"locked": {
"lastModified": 1764858199,
"narHash": "sha256-oxTIH77Kc2PVJKHQDFFImBiDzRkKZOg1q/E45yfYPFI=",
"owner": "rustsec",
"repo": "advisory-db",
"rev": "f414b4d1ff5df405ea74240bc8fc2e4ce5f0d6c3",
"type": "github"
},
"original": {
"owner": "rustsec",
"repo": "advisory-db",
"type": "github"
}
},
"crane": {
"locked": {
"lastModified": 1764903584,
"narHash": "sha256-RSkJtNtx0SEaQiYqsoFoRynwfZLo2OZ9z6rUq1DJR6g=",
"owner": "ipetkov",
"repo": "crane",
"rev": "2b3a5a88d852575758e1eb6ac9ee677fcd633fc1",
"type": "github"
},
"original": {
"owner": "ipetkov",
"repo": "crane",
"type": "github"
}
},
"flake-utils": {
"inputs": {
"systems": "systems"
},
"locked": {
"lastModified": 1731533236,
"narHash": "sha256-l0KFg5HjrsfsO/JpG+r7fRrqm12kzFHyUHqHCVpMMbI=",
"owner": "numtide",
"repo": "flake-utils",
"rev": "11707dc2f618dd54ca8739b309ec4fc024de578b",
"type": "github"
},
"original": {
"owner": "numtide",
"repo": "flake-utils",
"type": "github"
}
},
"nix-github-actions": {
"inputs": {
"nixpkgs": [
"nixpkgs"
]
},
"locked": {
"lastModified": 1737420293,
"narHash": "sha256-F1G5ifvqTpJq7fdkT34e/Jy9VCyzd5XfJ9TO8fHhJWE=",
"owner": "nix-community",
"repo": "nix-github-actions",
"rev": "f4158fa080ef4503c8f4c820967d946c2af31ec9",
"type": "github"
},
"original": {
"owner": "nix-community",
"repo": "nix-github-actions",
"type": "github"
}
},
"nixpkgs": {
"locked": {
"lastModified": 1764667669,
"narHash": "sha256-7WUCZfmqLAssbDqwg9cUDAXrSoXN79eEEq17qhTNM/Y=",
"owner": "nixos",
"repo": "nixpkgs",
"rev": "418468ac9527e799809c900eda37cbff999199b6",
"type": "github"
},
"original": {
"owner": "nixos",
"ref": "nixos-unstable",
"repo": "nixpkgs",
"type": "github"
}
},
"root": {
"inputs": {
"advisory-db": "advisory-db",
"crane": "crane",
"flake-utils": "flake-utils",
"nix-github-actions": "nix-github-actions",
"nixpkgs": "nixpkgs",
"rust-overlay": "rust-overlay"
}
},
"rust-overlay": {
"inputs": {
"nixpkgs": [
"nixpkgs"
]
},
"locked": {
"lastModified": 1764902447,
"narHash": "sha256-wNqkDBj+tjK619sTHPEA7uhjr7DHHEY8OsFou31dxy0=",
"owner": "oxalica",
"repo": "rust-overlay",
"rev": "d914a744a83098eeb28125d2848ad383b209223f",
"type": "github"
},
"original": {
"owner": "oxalica",
"repo": "rust-overlay",
"type": "github"
}
},
"systems": {
"locked": {
"lastModified": 1681028828,
"narHash": "sha256-Vy1rq5AaRuLzOxct8nz4T6wlgyUR7zLU309k9mBC768=",
"owner": "nix-systems",
"repo": "default",
"rev": "da67096a3b9bf56a91d16901293e51ba5b49a27e",
"type": "github"
},
"original": {
"owner": "nix-systems",
"repo": "default",
"type": "github"
}
}
},
"root": "root",
"version": 7
}

157
flake.nix Normal file
View File

@@ -0,0 +1,157 @@
{
description = "A simple rust flake using rust-overlay and craneLib";
inputs = {
nixpkgs.url = "github:nixos/nixpkgs/nixos-unstable";
flake-utils.url = "github:numtide/flake-utils";
crane.url = "github:ipetkov/crane";
nix-github-actions = {
url = "github:nix-community/nix-github-actions";
inputs.nixpkgs.follows = "nixpkgs";
};
rust-overlay = {
url = "github:oxalica/rust-overlay";
inputs.nixpkgs.follows = "nixpkgs";
};
advisory-db = {
url = "github:rustsec/advisory-db";
flake = false;
};
};
outputs = {
self,
crane,
flake-utils,
nixpkgs,
rust-overlay,
advisory-db,
nix-github-actions,
...
}:
flake-utils.lib.eachDefaultSystem (
system: let
pkgs = import nixpkgs {
inherit system;
overlays = [
rust-overlay.overlays.default
];
};
inherit (pkgs) lib;
cargoToml = builtins.fromTOML (builtins.readFile ./Cargo.toml);
name = cargoToml.package.name;
stableToolchain = pkgs.rust-bin.stable.latest.default;
stableToolchainWithLLvmTools = stableToolchain.override {
extensions = ["rust-src" "llvm-tools"];
};
stableToolchainWithRustAnalyzer = stableToolchain.override {
extensions = ["rust-src" "rust-analyzer"];
};
craneLib = (crane.mkLib pkgs).overrideToolchain stableToolchain;
craneLibLLvmTools = (crane.mkLib pkgs).overrideToolchain stableToolchainWithLLvmTools;
src = let
filterBySuffix = path: exts: lib.any (ext: lib.hasSuffix ext path) exts;
sourceFilters = path: type: (craneLib.filterCargoSources path type) || filterBySuffix path [".c" ".h" ".hpp" ".cpp" ".cc"];
in
lib.cleanSourceWith {
filter = sourceFilters;
src = ./.;
};
commonArgs =
{
inherit src;
pname = name;
stdenv = p: p.clangStdenv;
doCheck = false;
# LIBCLANG_PATH = "${pkgs.llvmPackages.libclang.lib}/lib";
# nativeBuildInputs = with pkgs; [
# cmake
# llvmPackages.libclang.lib
# ];
buildInputs = with pkgs;
[]
++ (lib.optionals pkgs.stdenv.isDarwin [
libiconv
apple-sdk_26
]);
}
// (lib.optionalAttrs pkgs.stdenv.isLinux {
# BINDGEN_EXTRA_CLANG_ARGS = "-I${pkgs.llvmPackages.libclang.lib}/lib/clang/18/include";
});
cargoArtifacts = craneLib.buildPackage commonArgs;
in {
checks =
{
"${name}-clippy" = craneLib.cargoClippy (commonArgs
// {
inherit cargoArtifacts;
cargoClippyExtraArgs = "--all-targets -- --deny warnings";
});
"${name}-docs" = craneLib.cargoDoc (commonArgs // {inherit cargoArtifacts;});
"${name}-fmt" = craneLib.cargoFmt {inherit src;};
"${name}-toml-fmt" = craneLib.taploFmt {
src = pkgs.lib.sources.sourceFilesBySuffices src [".toml"];
};
# Audit dependencies
"${name}-audit" = craneLib.cargoAudit {
inherit src advisory-db;
};
# Audit licenses
"${name}-deny" = craneLib.cargoDeny {
inherit src;
};
"${name}-nextest" = craneLib.cargoNextest (commonArgs
// {
inherit cargoArtifacts;
partitions = 1;
partitionType = "count";
});
}
// lib.optionalAttrs (!pkgs.stdenv.isDarwin) {
"${name}-llvm-cov" = craneLibLLvmTools.cargoLlvmCov (commonArgs // {inherit cargoArtifacts;});
};
packages = let
pkg = craneLib.buildPackage (commonArgs
// {inherit cargoArtifacts;}
// {
# postInstall = ''
# mkdir -p $out/bin
# mkdir -p $out/share/bash-completions
# mkdir -p $out/share/fish/vendor_completions.d
# mkdir -p $out/share/zsh/site-functions
# $out/bin/${name} completions bash > $out/share/bash-completions/${name}.bash
# $out/bin/${name} completions fish > $out/share/fish/vendor_completions.d/${name}.fish
# $out/bin/${name} completions zsh > $out/share/zsh/site-functions/_${name}
# '';
});
in {
"${name}" = pkg;
default = pkg;
};
devShells = {
default = pkgs.mkShell.override {stdenv = pkgs.clangStdenv;} (commonArgs
// {
packages = with pkgs;
[
stableToolchainWithRustAnalyzer
cargo-nextest
cargo-deny
]
++ (lib.optionals pkgs.stdenv.isDarwin [
apple-sdk_26
]);
});
};
}
)
// {
githubActions = nix-github-actions.lib.mkGithubMatrix {
checks = nixpkgs.lib.getAttrs ["x86_64-linux"] self.checks;
};
};
}

117
src/either.rs Normal file
View File

@@ -0,0 +1,117 @@
#[derive(Debug)]
pub enum Either<L, R> {
Left(L),
Right(R),
}
impl<L, R> Either<L, R> {
pub fn is_left(&self) -> bool {
matches!(self, Either::Left(_))
}
pub fn is_right(&self) -> bool {
matches!(self, Either::Right(_))
}
}
pub trait EitherExt<L, R>: Sized {
fn either(self) -> Either<L, R>;
fn or(self) -> Either<R, L>;
}
impl<L, R> EitherExt<L, R> for L {
fn either(self) -> Either<L, R> {
Either::Left(self)
}
fn or(self) -> Either<R, L> {
Either::Right(self)
}
}
#[cfg(feature = "futures")]
impl<L, R> futures::io::AsyncWrite for Either<L, R>
where
L: futures::io::AsyncWrite + Unpin,
R: futures::io::AsyncWrite + Unpin,
{
fn poll_write(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
buf: &[u8],
) -> std::task::Poll<std::io::Result<usize>> {
match &mut *self {
Either::Left(l) => std::pin::Pin::new(l).poll_write(cx, buf),
Either::Right(r) => std::pin::Pin::new(r).poll_write(cx, buf),
}
}
fn poll_flush(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<std::io::Result<()>> {
match &mut *self {
Either::Left(l) => std::pin::Pin::new(l).poll_flush(cx),
Either::Right(r) => std::pin::Pin::new(r).poll_flush(cx),
}
}
fn poll_close(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<std::io::Result<()>> {
match &mut *self {
Either::Left(l) => std::pin::Pin::new(l).poll_close(cx),
Either::Right(r) => std::pin::Pin::new(r).poll_close(cx),
}
}
}
#[cfg(feature = "tokio")]
impl<L, R> tokio::io::AsyncWrite for Either<L, R>
where
L: tokio::io::AsyncWrite + Unpin,
R: tokio::io::AsyncWrite + Unpin,
{
fn poll_write(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
buf: &[u8],
) -> std::task::Poll<std::io::Result<usize>> {
match &mut *self {
Either::Left(l) => std::pin::Pin::new(l).poll_write(cx, buf),
Either::Right(r) => std::pin::Pin::new(r).poll_write(cx, buf),
}
}
fn poll_flush(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<std::io::Result<()>> {
match &mut *self {
Either::Left(l) => std::pin::Pin::new(l).poll_flush(cx),
Either::Right(r) => std::pin::Pin::new(r).poll_flush(cx),
}
}
fn poll_shutdown(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<std::io::Result<()>> {
match &mut *self {
Either::Left(l) => std::pin::Pin::new(l).poll_shutdown(cx),
Either::Right(r) => std::pin::Pin::new(r).poll_shutdown(cx),
}
}
}
#[test]
fn test_either_is_left_right() {
let left: Either<i32, &str> = Either::Left(42);
let right: Either<i32, &str> = Either::Right("hello");
assert!(left.is_left());
assert!(!left.is_right());
assert!(right.is_right());
assert!(!right.is_left());
}

7
src/errors.rs Normal file
View File

@@ -0,0 +1,7 @@
use error_stack::{Report, ResultExt};
#[derive(Debug, thiserror::Error)]
#[error("An error occurred")]
pub struct Error;
pub type Result<T, E = error_stack::Report<Error>> = core::result::Result<T, E>;

62
src/lib.rs Normal file
View File

@@ -0,0 +1,62 @@
//! Utilities for working with sync and async tee writers
//! This crate provides implementations for creating tee writers that can write to two or more underlying writers simultaneously.
//!
//! # Example with std::io::Write
//! #[cfg(feature = "io-std")]
//! ```rust
//! use teewriter::TeeWriterExt;
//! use std::io::Write;
//! let mut writer1 = Vec::new();
//! let mut writer2 = Vec::new();
//! let mut tee = writer1.tee(&mut writer2);
//! tee.write_all(b"Hello, world!").unwrap();
//! let (writer1, writer2, offset) = tee.into_inner();
//! if offset != 0 {
//! panic!("Writers are out of sync by {}", offset);
//! }
//! assert_eq!(writer1, b"Hello, world!");
//! assert_eq!(writer2, b"Hello, world!");
//! ```
//! # Example with tokio::io::AsyncWrite
//! #[cfg(feature = "tokio")]
//! ```rust
//! use teewriter::TeeWriterExt;
//! use tokio::io::AsyncWriteExt;
//! #[tokio::main]
//! async fn main() {
//! let mut writer1 = tokio::io::sink();
//! let mut writer2 = tokio::io::sink();
//! let mut tee = writer1.tee(&mut writer2);
//! tee.write_all(b"Hello, async world!").await.unwrap();
//! let (writer1, writer2, offset) = tee.into_inner();
//! if offset != 0 {
//! panic!("Writers are out of sync by {}", offset);
//! }
//! }
//! ```
//!
//! # Example with futures::io::AsyncWrite
//! #[cfg(feature = "futures")]
//! ```rust
//! use teewriter::TeeWriterExt;
//! use futures::io::AsyncWriteExt;
//! use futures::executor::block_on;
//! fn main() {
//! let mut writer1 = futures::io::sink();
//! let mut writer2 = futures::io::sink();
//! let mut tee = writer1.tee(&mut writer2);
//! block_on(tee.write_all(b"Hello, futures world!")).unwrap();
//! let (writer1, writer2, offset) = tee.into_inner();
//! if offset != 0 {
//! panic!("Writers are out of sync by {}", offset);
//! }
//! assert_eq!(offset, 0);
//! }
//!
pub mod either;
pub mod tee;
#[doc(inline)]
pub use either::*;
#[doc(inline)]
pub use tee::*;

54
src/tee.rs Normal file
View File

@@ -0,0 +1,54 @@
#[cfg(feature = "futures")]
mod futures_impl;
#[cfg(feature = "io-std")]
mod std_impl;
#[cfg(feature = "tokio")]
mod tokio_impl;
/// A writer that writes to two underlying writers, in sync
#[derive(Debug)]
pub struct TeeWriter<This, Other> {
pub(crate) this: This,
pub(crate) other: Other,
pub(crate) offset: isize,
}
/// Extension trait to add `tee` method to any writer
///
/// # Example
/// ```rust
/// use teewriter::TeeWriterExt;
/// use std::io::Write;
/// let mut writer1 = Vec::new();
/// let mut writer2 = Vec::new();
/// let mut tee = writer1.tee(&mut writer2);
/// tee.write_all(b"Hello, world!").unwrap();
/// let (writer1, writer2, offset) = tee.into_inner();
/// if offset != 0 {
/// panic!("Writers are out of sync by {}", offset);
/// }
/// assert_eq!(writer1, b"Hello, world!");
/// assert_eq!(writer2, b"Hello, world!");
/// ```
pub trait TeeWriterExt: Sized {
fn tee<Other>(self, other: Other) -> TeeWriter<Self, Other> {
TeeWriter {
this: self,
other,
offset: 0,
}
}
}
impl<T> TeeWriterExt for T {}
impl<This, Other> TeeWriter<This, Other> {
/// Consumes the TeeWriter and returns the inner writers and the current offset.
/// Offset indicates how many more bytes have been written to `this` than `other`.
/// If offset is positive, `this` has written more bytes.
/// If offset is negative, `other` has written more bytes.
/// If offset is zero, both writers are in sync.
pub fn into_inner(self) -> (This, Other, isize) {
(self.this, self.other, self.offset)
}
}

106
src/tee/futures_impl.rs Normal file
View File

@@ -0,0 +1,106 @@
use super::TeeWriter;
#[cfg(feature = "futures")]
impl<This, Other> futures::io::AsyncWrite for TeeWriter<This, Other>
where
This: futures::io::AsyncWrite + Unpin,
Other: futures::io::AsyncWrite + Unpin,
{
fn poll_write(
mut self: core::pin::Pin<&mut Self>,
cx: &mut core::task::Context<'_>,
buf: &[u8],
) -> core::task::Poll<futures::io::Result<usize>> {
use core::task::Poll::*;
if self.offset > 0 {
let offset = self.offset as usize;
let offset = core::cmp::min(offset, buf.len());
let buf = &buf[offset..];
let written = core::pin::Pin::new(&mut self.this).poll_write(cx, buf)?;
if let Ready(written) = written {
if written == 0 {
return Pending;
}
self.offset -= written as isize;
}
return written.map(Ok);
} else if self.offset < 0 {
let offset = (-self.offset) as usize;
let offset = core::cmp::min(offset, buf.len());
let buf = &buf[offset..];
let written = core::pin::Pin::new(&mut self.other).poll_write(cx, buf)?;
if let Ready(written) = written {
if written == 0 {
return Pending;
}
self.offset += written as isize;
}
return written.map(Ok);
}
let this_written = core::pin::Pin::new(&mut self.this).poll_write(cx, buf)?;
let other_written = core::pin::Pin::new(&mut self.other).poll_write(cx, buf)?;
match (this_written, other_written) {
(Ready(this), Ready(other)) => {
if this == other {
Ready(Ok(this))
} else {
let offset = this as isize - other as isize;
self.offset += offset;
let min = core::cmp::min(this, other);
// next call should write to the one that wrote less
Ready(Ok(min))
}
}
(Ready(this), Pending) => {
// next call should write to other
self.offset = this as isize;
Pending
}
(Pending, Ready(other)) => {
// next call should write to this
self.offset = -(other as isize);
Pending
}
(Pending, Pending) => Pending,
}
}
fn poll_flush(
mut self: core::pin::Pin<&mut Self>,
cx: &mut core::task::Context<'_>,
) -> core::task::Poll<futures::io::Result<()>> {
let this_flushed = core::pin::Pin::new(&mut self.this).poll_flush(cx)?;
let other_flushed = core::pin::Pin::new(&mut self.other).poll_flush(cx)?;
if this_flushed.is_ready() && other_flushed.is_ready() {
core::task::Poll::Ready(Ok(()))
} else {
core::task::Poll::Pending
}
}
fn poll_close(
mut self: core::pin::Pin<&mut Self>,
cx: &mut core::task::Context<'_>,
) -> core::task::Poll<futures::io::Result<()>> {
let this_close = core::pin::Pin::new(&mut self.this).poll_close(cx)?;
let other_close = core::pin::Pin::new(&mut self.other).poll_close(cx)?;
if this_close.is_ready() && other_close.is_ready() {
core::task::Poll::Ready(Ok(()))
} else {
core::task::Poll::Pending
}
}
}
// #[cfg(feature = "futures")]
// #[cfg(test)]
// #[futures::test]
// async fn test_poll_uneven() {
// let sink = futures::io::sink();
// use futures::io::AsyncWriteExt;
// let mut stream = futures::io::SimplexStream::new_unsplit(5).tee(sink);
// stream.write_all(b"Hello, world!").await.unwrap();
// }

41
src/tee/std_impl.rs Normal file
View File

@@ -0,0 +1,41 @@
use super::TeeWriter;
#[cfg(feature = "io-std")]
impl<This, Other> std::io::Write for TeeWriter<This, Other>
where
This: std::io::Write,
Other: std::io::Write,
{
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
if self.offset > 0 {
let offset = self.offset as usize;
let offset = std::cmp::min(offset, buf.len());
let buf = &buf[offset..];
let written = self.this.write(buf)?;
self.offset -= written as isize;
return Ok(written);
} else if self.offset < 0 {
let offset = (-self.offset) as usize;
let offset = std::cmp::min(offset, buf.len());
let buf = &buf[offset..];
let written = self.other.write(buf)?;
self.offset += written as isize;
return Ok(written);
}
let this_written = self.this.write(buf)?;
let other_written = self.other.write(buf)?;
if this_written == other_written {
Ok(this_written)
} else {
let offset = this_written as isize - other_written as isize;
self.offset += offset;
let min = std::cmp::min(this_written, other_written);
// next call should write to the one that wrote less
Ok(min)
}
}
fn flush(&mut self) -> std::io::Result<()> {
self.this.flush()?;
self.other.flush()
}
}

108
src/tee/tokio_impl.rs Normal file
View File

@@ -0,0 +1,108 @@
use super::TeeWriter;
#[cfg(feature = "tokio")]
impl<This, Other> tokio::io::AsyncWrite for TeeWriter<This, Other>
where
This: tokio::io::AsyncWrite + Unpin,
Other: tokio::io::AsyncWrite + Unpin,
{
fn poll_write(
mut self: core::pin::Pin<&mut Self>,
cx: &mut core::task::Context<'_>,
buf: &[u8],
) -> core::task::Poll<std::io::Result<usize>> {
use core::task::Poll::*;
if self.offset > 0 {
let offset = self.offset as usize;
let offset = core::cmp::min(offset, buf.len());
let buf = &buf[offset..];
let written = core::pin::Pin::new(&mut self.this).poll_write(cx, buf)?;
if let Ready(written) = written {
if written == 0 {
return Pending;
}
self.offset -= written as isize;
}
return written.map(Ok);
} else if self.offset < 0 {
let offset = (-self.offset) as usize;
let offset = core::cmp::min(offset, buf.len());
let buf = &buf[offset..];
let written = core::pin::Pin::new(&mut self.other).poll_write(cx, buf)?;
if let Ready(written) = written {
if written == 0 {
return Pending;
}
self.offset += written as isize;
}
return written.map(Ok);
}
let this_written = core::pin::Pin::new(&mut self.this).poll_write(cx, buf)?;
let other_written = core::pin::Pin::new(&mut self.other).poll_write(cx, buf)?;
match (this_written, other_written) {
(Ready(this), Ready(other)) => {
if this == other {
Ready(Ok(this))
} else {
let offset = this as isize - other as isize;
self.offset += offset;
let min = core::cmp::min(this, other);
// next call should write to the one that wrote less
Ready(Ok(min))
}
}
(Ready(this), Pending) => {
// next call should write to other
self.offset = this as isize;
Pending
}
(Pending, Ready(other)) => {
// next call should write to this
self.offset = -(other as isize);
Pending
}
(Pending, Pending) => Pending,
}
}
fn poll_flush(
mut self: core::pin::Pin<&mut Self>,
cx: &mut core::task::Context<'_>,
) -> core::task::Poll<std::io::Result<()>> {
let this_flushed = core::pin::Pin::new(&mut self.this).poll_flush(cx)?;
let other_flushed = core::pin::Pin::new(&mut self.other).poll_flush(cx)?;
if this_flushed.is_ready() && other_flushed.is_ready() {
core::task::Poll::Ready(Ok(()))
} else {
core::task::Poll::Pending
}
}
fn poll_shutdown(
mut self: core::pin::Pin<&mut Self>,
cx: &mut core::task::Context<'_>,
) -> core::task::Poll<std::io::Result<()>> {
let this_shutdown = core::pin::Pin::new(&mut self.this).poll_shutdown(cx)?;
let other_shutdown = core::pin::Pin::new(&mut self.other).poll_shutdown(cx)?;
if this_shutdown.is_ready() && other_shutdown.is_ready() {
core::task::Poll::Ready(Ok(()))
} else {
core::task::Poll::Pending
}
}
}
#[cfg(feature = "tokio")]
#[cfg(test)]
#[tokio::test]
async fn test_poll_uneven() {
let sink = tokio::io::sink();
use crate::tee::TeeWriterExt;
use tokio::io::AsyncWriteExt;
let mut stream = tokio::io::SimplexStream::new_unsplit(20).tee(sink);
stream.write_all(b"Hello, world!").await.unwrap();
// let out = stream.read_until_end().await.unwrap();
}