diff --git a/cmd/ww/cat/cat.go b/cmd/ww/cat/cat.go index 8bd9b4c..0b0a94e 100644 --- a/cmd/ww/cat/cat.go +++ b/cmd/ww/cat/cat.go @@ -22,7 +22,7 @@ var env util.IPFSEnv func Command() *cli.Command { return &cli.Command{ Name: "cat", - ArgsUsage: " ", + ArgsUsage: " [method]", Usage: "Connect to a peer and execute a procedure over a stream", Description: `Connect to a specified peer and execute a procedure over a custom protocol stream. The command will: @@ -33,7 +33,8 @@ The command will: Examples: ww cat QmPeer123 /echo - ww cat 12D3KooW... /myproc`, + ww cat 12D3KooW... /myproc echo + ww cat 12D3KooW... /myproc poll`, Flags: append([]cli.Flag{ &cli.StringFlag{ Name: "ipfs", @@ -58,12 +59,13 @@ func Main(c *cli.Context) error { ctx, cancel := context.WithTimeout(c.Context, c.Duration("timeout")) defer cancel() - if c.NArg() != 2 { - return cli.Exit("cat requires exactly two arguments: ", 1) + if c.NArg() < 3 { + return cli.Exit("cat requires 2-3 arguments: [method]", 1) } peerIDStr := c.Args().Get(0) procName := c.Args().Get(1) + method := c.Args().Get(2) // Parse peer ID peerID, err := peer.Decode(peerIDStr) @@ -73,6 +75,9 @@ func Main(c *cli.Context) error { // Construct protocol ID protocolID := protocol.ID("/ww/0.1.0/" + procName) + if method != "" && method != "poll" { + protocolID = protocol.ID("/ww/0.1.0/" + procName + "/" + method) + } // Create libp2p host in client mode h, err := util.NewClient() diff --git a/cmd/ww/run/run.go b/cmd/ww/run/run.go index 2852c1e..760529f 100644 --- a/cmd/ww/run/run.go +++ b/cmd/ww/run/run.go @@ -9,11 +9,13 @@ import ( "os" "os/exec" "path/filepath" + "strings" "github.com/ipfs/boxo/path" "github.com/libp2p/go-libp2p/core/event" "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" + "github.com/libp2p/go-libp2p/core/protocol" "github.com/tetratelabs/wazero" "github.com/tetratelabs/wazero/experimental/sys" "github.com/urfave/cli/v2" @@ -134,20 +136,39 @@ func Main(c *cli.Context) error { "peer", env.Host.ID(), "endpoint", p.Endpoint.Name) - env.Host.SetStreamHandler(p.Endpoint.Protocol(), func(s network.Stream) { + // Set up stream handler that matches both exact protocol and with method suffix + baseProto := p.Endpoint.Protocol() + env.Host.SetStreamHandlerMatch(baseProto, func(protocol protocol.ID) bool { + // Match exact base protocol (/ww/0.1.0/) or with method suffix (/ww/0.1.0//) + return protocol == baseProto || strings.HasPrefix(string(protocol), string(baseProto)+"/") + }, func(s network.Stream) { defer s.CloseRead() + + // Extract method from protocol string + method := "poll" // default + protocolStr := string(s.Protocol()) + if strings.HasPrefix(protocolStr, string(baseProto)+"/") { + // Extract method from /ww/0.1.0// + parts := strings.Split(protocolStr, "/") + if len(parts) > 0 { + method = parts[len(parts)-1] + } + } + slog.InfoContext(ctx, "stream connected", "peer", s.Conn().RemotePeer(), "stream-id", s.ID(), - "endpoint", p.Endpoint.Name) - if err := p.Poll(ctx, s, nil); err != nil { + "endpoint", p.Endpoint.Name, + "method", method) + if err := p.ProcessMessage(ctx, s, method); err != nil { slog.ErrorContext(ctx, "failed to poll process", "id", p.ID(), "stream", s.ID(), + "method", method, "reason", err) } }) - defer env.Host.RemoveStreamHandler(p.Endpoint.Protocol()) + defer env.Host.RemoveStreamHandler(baseProto) for { select { diff --git a/examples/echo/main.go b/examples/echo/main.go index cbe8515..622d424 100644 --- a/examples/echo/main.go +++ b/examples/echo/main.go @@ -10,26 +10,19 @@ import ( // main is the entry point for synchronous mode. // It processes one complete message from stdin and exits. func main() { - // Echo: copy stdin to stdout using io.Copy - // io.Copy uses an internal 32KB buffer by default - if _, err := io.Copy(os.Stdout, os.Stdin); err != nil { - os.Stderr.WriteString("Error copying stdin to stdout: " + err.Error() + "\n") - os.Exit(1) - } - defer os.Stdout.Sync() - // implicitly returns 0 to indicate successful completion + echo() } -// poll is the async entry point for stream-based processing. +// echo is the async entry point for stream-based processing. // This function is called by the wetware runtime when a new stream // is established for this process. // -//export poll -func poll() { +//export echo +func echo() { // In async mode, we process each incoming stream // This is the same logic as main() but for individual streams if _, err := io.Copy(os.Stdout, os.Stdin); err != nil { - os.Stderr.WriteString("Error in poll: " + err.Error() + "\n") + os.Stderr.WriteString("Error in echo: " + err.Error() + "\n") os.Exit(1) } defer os.Stdout.Sync() diff --git a/examples/echo/main.wasm b/examples/echo/main.wasm index ba70093..e07bf35 100644 Binary files a/examples/echo/main.wasm and b/examples/echo/main.wasm differ diff --git a/go.mod b/go.mod index 990a55b..23d465d 100644 --- a/go.mod +++ b/go.mod @@ -11,6 +11,7 @@ require ( github.com/lthibault/go-libp2p-inproc-transport v0.4.1 github.com/mr-tron/base58 v1.2.0 github.com/multiformats/go-multiaddr v0.16.0 + github.com/pkg/errors v0.9.1 github.com/stretchr/testify v1.11.1 github.com/tetratelabs/wazero v1.9.0 github.com/urfave/cli/v2 v2.27.5 diff --git a/system/ipfs.go b/system/ipfs.go new file mode 100644 index 0000000..fbfbe3b --- /dev/null +++ b/system/ipfs.go @@ -0,0 +1,252 @@ +package system + +import ( + "context" + "io" + "io/fs" + "log/slog" + "runtime" + "time" + + "github.com/ipfs/boxo/files" + "github.com/ipfs/boxo/path" + iface "github.com/ipfs/kubo/core/coreiface" + "github.com/pkg/errors" +) + +var _ fs.FS = (*IPFS)(nil) + +// An IPFS provides access to a hierarchical file system. +// +// The IPFS interface is the minimum implementation required of the file system. +// A file system may implement additional interfaces, +// such as [ReadFileFS], to provide additional or optimized functionality. +// +// [testing/fstest.TestFS] may be used to test implementations of an IPFS for +// correctness. +type IPFS struct { + Ctx context.Context + Root path.Path + Unix iface.UnixfsAPI +} + +// Open opens the named file. +// +// When Open returns an error, it should be of type *PathError +// with the Op field set to "open", the Path field set to name, +// and the Err field describing the problem. +// +// Open should reject attempts to open names that do not satisfy +// fs.ValidPath(name), returning a *fs.PathError with Err set to +// fs.ErrInvalid or fs.ErrNotExist. +func (f IPFS) Open(name string) (fs.File, error) { + path, node, err := f.Resolve(f.Ctx, name) + if err != nil { + return nil, &fs.PathError{ + Op: "open", + Path: name, + Err: err, + } + } + + return &ipfsNode{ + Path: path, + Node: node, + }, nil +} + +func (f IPFS) Resolve(ctx context.Context, name string) (path.Path, files.Node, error) { + if pathInvalid(name) { + return nil, nil, fs.ErrInvalid + } + + p, err := path.Join(f.Root, name) + if err != nil { + return nil, nil, err + } + + node, err := f.Unix.Get(ctx, p) + return p, node, err +} + +func pathInvalid(name string) bool { + return !fs.ValidPath(name) +} + +func (f IPFS) Sub(dir string) (fs.FS, error) { + var root path.Path + var err error + if (f == IPFS{}) { + root, err = path.NewPath(dir) + } else { + root, err = path.Join(f.Root, dir) + } + + return &IPFS{ + Ctx: f.Ctx, + Root: root, + Unix: f.Unix, + }, err +} + +var ( + _ fs.FileInfo = (*ipfsNode)(nil) + _ fs.ReadDirFile = (*ipfsNode)(nil) + _ fs.DirEntry = (*ipfsNode)(nil) +) + +// ipfsNode provides access to a single file. The fs.File interface is the minimum +// implementation required of the file. Directory files should also implement [ReadDirFile]. +// A file may implement io.ReaderAt or io.Seeker as optimizations. +type ipfsNode struct { + Path path.Path + files.Node +} + +// base name of the file +func (n ipfsNode) Name() string { + segs := n.Path.Segments() + return segs[len(segs)-1] // last segment is name +} + +func (n *ipfsNode) Stat() (fs.FileInfo, error) { + return n, nil +} + +// length in bytes for regular files; system-dependent for others +func (n ipfsNode) Size() int64 { + size, err := n.Node.Size() + if err != nil { + slog.Error("failed to obtain file size", + "path", n.Path, + "reason", err) + } + + return size +} + +// file mode bits +func (n ipfsNode) Mode() fs.FileMode { + switch n.Node.(type) { + case files.Directory: + return fs.ModeDir + default: + return 0 // regular read-only file + } +} + +// modification time +func (n ipfsNode) ModTime() time.Time { + return time.Time{} // zero-value time +} + +// abbreviation for Mode().IsDir() +func (n ipfsNode) IsDir() bool { + return n.Mode().IsDir() +} + +// underlying data source (never returns nil) +func (n ipfsNode) Sys() any { + return n.Node +} + +func (n ipfsNode) Read(b []byte) (int, error) { + switch node := n.Node.(type) { + case io.Reader: + return node.Read(b) + default: + return 0, errors.New("unreadable node") + } +} + +// ReadDir reads the contents of the directory and returns +// a slice of up to max DirEntry values in directory order. +// Subsequent calls on the same file will yield further DirEntry values. +// +// If max > 0, ReadDir returns at most max DirEntry structures. +// In this case, if ReadDir returns an empty slice, it will return +// a non-nil error explaining why. +// At the end of a directory, the error is io.EOF. +// (ReadDir must return io.EOF itself, not an error wrapping io.EOF.) +// +// If max <= 0, ReadDir returns all the DirEntry values from the directory +// in a single slice. In this case, if ReadDir succeeds (reads all the way +// to the end of the directory), it returns the slice and a nil error. +// If it encounters an error before the end of the directory, +// ReadDir returns the DirEntry list read until that point and a non-nil error. +func (n ipfsNode) ReadDir(max int) (entries []fs.DirEntry, err error) { + root, ok := n.Node.(files.Directory) + if !ok { + return nil, errors.New("not a directory") + } + + iter := root.Entries() + for iter.Next() { + name := iter.Name() + node := iter.Node() + + // Callers will typically discard entries if they get a non-nill + // error, so we make sure nodes are eventually closed. + runtime.SetFinalizer(node, func(c io.Closer) { + if err := c.Close(); err != nil { + slog.Warn("unable to close node", + "name", name, + "reason", err) + } + }) + + var subpath path.Path + if subpath, err = path.Join(n.Path, name); err != nil { + return + } + + entries = append(entries, &ipfsNode{ + Path: subpath, + Node: node}) + + // got max items? + if max--; max == 0 { + return + } + } + + // If we get here, it's because the iterator stopped. It either + // failed or is exhausted. Any other error has already caused us + // to return. + if iter.Err() != nil { + err = iter.Err() // failed + } else if max >= 0 { + err = io.EOF // exhausted + } + + return +} + +// Info returns the FileInfo for the file or subdirectory described by the entry. +// The returned FileInfo may be from the time of the original directory read +// or from the time of the call to Info. If the file has been removed or renamed +// since the directory read, Info may return an error satisfying errors.Is(err, ErrNotExist). +// If the entry denotes a symbolic link, Info reports the information about the link itself, +// not the link's target. +func (n *ipfsNode) Info() (fs.FileInfo, error) { + return n, nil +} + +// Type returns the type bits for the entry. +// The type bits are a subset of the usual FileMode bits, those returned by the FileMode.Type method. +func (n ipfsNode) Type() fs.FileMode { + if n.Mode().IsDir() { + return fs.ModeDir + } + + return 0 +} + +func (n ipfsNode) Write(b []byte) (int, error) { + dst, ok := n.Node.(io.Writer) + if ok { + return dst.Write(b) + } + + return 0, errors.New("not writeable") +} diff --git a/system/ipfs_test.go b/system/ipfs_test.go new file mode 100644 index 0000000..f5a378c --- /dev/null +++ b/system/ipfs_test.go @@ -0,0 +1,88 @@ +package system_test + +import ( + "context" + "io/fs" + "testing" + "testing/fstest" + + "github.com/ipfs/boxo/path" + "github.com/ipfs/kubo/client/rpc" + "github.com/stretchr/testify/require" + "github.com/wetware/go/system" +) + +const IPFS_ROOT = "/ipfs/QmRecDLNaESeNY3oUFYZKK9ftdANBB8kuLaMdAXMD43yon" // go/system/testdata/fs + +// TestIPFS_Env verifies that an IPFS node is available in the +// host environment, and that it exports IPFS_ROOT. It then +// checks that IPFS_ROOT contains the expected directory structure. +// +// Other tests in this file will likely fail if TestIPFS_Env fails. +func TestIPFS_Env(t *testing.T) { + t.Parallel() + + root, err := path.NewPath(IPFS_ROOT) + require.NoError(t, err) + + ipfs, err := rpc.NewLocalApi() + if err != nil { + t.Skipf("IPFS not available: %v", err) + } + + dir, err := ipfs.Unixfs().Ls(context.Background(), root) + require.NoError(t, err) + + var names []string + for entry := range dir { + names = append(names, entry.Name) + } + + expect := []string{"testdata", "main.go", "main.wasm"} + require.ElementsMatch(t, names, expect, + "unexpected file path") +} + +func TestIPFS_FS(t *testing.T) { + t.Parallel() + + root, err := path.NewPath(IPFS_ROOT) + require.NoError(t, err) + + ipfs, err := rpc.NewLocalApi() + if err != nil { + t.Skipf("IPFS not available: %v", err) + } + + fs := system.IPFS{Ctx: context.Background(), Unix: ipfs.Unixfs(), Root: root} + err = fstest.TestFS(fs, + "main.go", + "main.wasm", + "testdata") + require.NoError(t, err) +} + +// TestIPFS_SubFS ensures that the filesystem retunred by fs.Sub correctly +// handles the '.' path. The returned filesystem MUST ensure that '.' +// resolves to the root IPFS path. +func TestIPFS_SubFS(t *testing.T) { + t.Parallel() + + root, err := path.NewPath("/ipfs/QmSAyttKvYkSCBTghuMxAJaBZC3jD2XLRCQ5FB3CTrb9rE") // go/system/testdata + require.NoError(t, err) + + ipfs, err := rpc.NewLocalApi() + if err != nil { + t.Skipf("IPFS not available: %v", err) + } + + fs, err := fs.Sub(system.IPFS{Ctx: context.Background(), Unix: ipfs.Unixfs(), Root: root}, "fs") + require.NoError(t, err) + require.NotNil(t, fs) + + err = fstest.TestFS(fs, + "main.go", + "main.wasm", + "testdata") + require.NoError(t, err) +} diff --git a/system/proc.go b/system/proc.go index c8a4185..3f89886 100644 --- a/system/proc.go +++ b/system/proc.go @@ -21,6 +21,7 @@ import ( ) type ProcConfig struct { + IPFS IPFS Host host.Host Runtime wazero.Runtime Src io.ReadCloser @@ -76,7 +77,7 @@ func (c ProcConfig) New(ctx context.Context) (*Proc, error) { } // Configure module instantiation based on async mode - config := c.NewModuleConfig(e) + config := c.NewModuleConfig(ctx, e) mod, err := c.Runtime.InstantiateModule(ctx, cm, config) if err != nil { @@ -107,7 +108,8 @@ func (c ProcConfig) New(ctx context.Context) (*Proc, error) { Config: c, Module: mod, Endpoint: e, - Closer: cs} + Closer: cs, + sem: semaphore.NewWeighted(1)} return proc, nil } @@ -116,13 +118,14 @@ type ReadWriteStringer interface { io.ReadWriter } -func (c ProcConfig) NewModuleConfig(sock ReadWriteStringer) wazero.ModuleConfig { +func (c ProcConfig) NewModuleConfig(ctx context.Context, sock ReadWriteStringer) wazero.ModuleConfig { config := wazero.NewModuleConfig(). WithName(sock.String()). WithArgs(c.Args...). WithStdin(sock). WithStdout(sock). - WithStderr(c.ErrWriter) + WithStderr(c.ErrWriter). + WithFSConfig(c.NewFSConfig(ctx)) // async mode? if c.Async { @@ -140,16 +143,25 @@ func (c ProcConfig) NewModuleConfig(sock ReadWriteStringer) wazero.ModuleConfig return config } +func (c ProcConfig) NewFSConfig(ctx context.Context) wazero.FSConfig { + ipfs := IPFS{ + Ctx: ctx, + Unix: c.IPFS.Unix, + Root: c.IPFS.Root} + + return wazero.NewFSConfig(). + WithFSMount(ipfs, "/ipfs"). + WithFSMount(ipfs, "/ipns"). + WithFSMount(ipfs, "/ipld") +} + func (p ProcConfig) NewEndpoint() *Endpoint { var buf [8]byte if _, err := io.ReadFull(rand.Reader, buf[:]); err != nil { panic(err) } - return &Endpoint{ - Name: base58.FastBase58Encoding(buf[:]), - sem: semaphore.NewWeighted(1), - } + return &Endpoint{Name: base58.FastBase58Encoding(buf[:])} } type Proc struct { @@ -157,6 +169,8 @@ type Proc struct { Endpoint *Endpoint Module api.Module api.Closer + + sem *semaphore.Weighted } // ID returns the process identifier (endpoint name) without the protocol prefix. @@ -166,8 +180,8 @@ func (p Proc) ID() string { // ProcessMessage processes one complete message synchronously. // In sync mode: lets _start run automatically and process one message -// In async mode: calls the poll export function -func (p Proc) ProcessMessage(ctx context.Context, s network.Stream) error { +// In async mode: calls the specified export function +func (p Proc) ProcessMessage(ctx context.Context, s network.Stream, method string) error { if deadline, ok := ctx.Deadline(); ok { if err := s.SetReadDeadline(deadline); err != nil { return fmt.Errorf("set read deadline: %w", err) @@ -187,14 +201,28 @@ func (p Proc) ProcessMessage(ctx context.Context, s network.Stream) error { p.Endpoint.ReadWriteCloser = nil }() - // In async mode, call the poll export function + // In async mode, call the specified export function if p.Config.Async { - if poll := p.Module.ExportedFunction("poll"); poll == nil { - return fmt.Errorf("%s::poll: not found", p.ID()) - } else if err := poll.CallWithStack(ctx, nil); err != nil { + // Normalize method: if empty string, use "poll" + if method == "" { + method = "poll" + } + + exp := p.Module.ExportedFunction(method) + if exp == nil { + _ = s.Reset() + return fmt.Errorf("unknown method: %s", method) + } + + if err := p.sem.Acquire(ctx, 1); err != nil { + return fmt.Errorf("acquire semaphore: %w", err) + } + defer p.sem.Release(1) + + if err := exp.CallWithStack(ctx, nil); err != nil { var exitErr *sys.ExitError if errors.As(err, &exitErr) && exitErr.ExitCode() != 0 { - return fmt.Errorf("%s::poll: %w", p.ID(), err) + return fmt.Errorf("%s::%s: %w", p.ID(), method, err) } // If it's ExitError with code 0, treat as success } @@ -204,11 +232,6 @@ func (p Proc) ProcessMessage(ctx context.Context, s network.Stream) error { return nil } -// Poll is an alias for ProcessMessage for backward compatibility -func (p Proc) Poll(ctx context.Context, s network.Stream, stack []uint64) error { - return p.ProcessMessage(ctx, s) -} - type CloserSlice []api.Closer func (cs CloserSlice) Close(ctx context.Context) error { diff --git a/system/proc_test.go b/system/proc_test.go index e2b68a0..f1a8fca 100644 --- a/system/proc_test.go +++ b/system/proc_test.go @@ -218,16 +218,17 @@ func TestProc_Poll_WithGomock(t *testing.T) { defer proc.Close(ctx) defer runtime.Close(ctx) - // Test that poll function exists - pollFunc := proc.Module.ExportedFunction("poll") - assert.NotNil(t, pollFunc, "poll function should exist") + // Test that echo function exists + echoFunc := proc.Module.ExportedFunction("echo") + assert.NotNil(t, echoFunc, "echo function should exist") // The echo WASM module will try to read from stdin, so we need to expect Read calls - // The poll function reads up to 512 bytes from stdin + // The echo function reads up to 512 bytes from stdin mockStream.EXPECT().Read(gomock.Any()).Return(0, io.EOF).AnyTimes() + mockStream.EXPECT().Reset().Return(nil).AnyTimes() - // Actually call the Poll method - this should succeed since we have a valid WASM function - err = proc.ProcessMessage(ctx, mockStream) + // Actually call the ProcessMessage method - this should succeed since we have a valid WASM function + err = proc.ProcessMessage(ctx, mockStream, "echo") // The WASM function should execute successfully assert.NoError(t, err) }) @@ -272,13 +273,14 @@ func TestProc_Poll_WithGomock(t *testing.T) { mockStream.EXPECT().SetReadDeadline(deadline).Return(nil) // The echo WASM module will try to read from stdin mockStream.EXPECT().Read(gomock.Any()).Return(0, io.EOF).AnyTimes() + mockStream.EXPECT().Reset().Return(nil).AnyTimes() - // Test that poll function exists (for async mode) - pollFunc := proc.Module.ExportedFunction("poll") - assert.NotNil(t, pollFunc, "poll function should exist") + // Test that echo function exists (for async mode) + echoFunc := proc.Module.ExportedFunction("echo") + assert.NotNil(t, echoFunc, "echo function should exist") // Actually call the ProcessMessage method to trigger the mock expectations - err = proc.ProcessMessage(ctxWithDeadline, mockStream) + err = proc.ProcessMessage(ctxWithDeadline, mockStream, "echo") // The WASM function should execute successfully assert.NoError(t, err) }) @@ -303,7 +305,7 @@ func TestProc_Poll_WithGomock(t *testing.T) { deadlineErr := errors.New("deadline error") mockStream.EXPECT().SetReadDeadline(deadline).Return(deadlineErr) - err := proc.ProcessMessage(ctxWithDeadline, mockStream) + err := proc.ProcessMessage(ctxWithDeadline, mockStream, "echo") assert.Error(t, err) assert.Contains(t, err.Error(), "set read deadline") assert.Contains(t, err.Error(), "deadline error") @@ -547,9 +549,9 @@ func TestProc_Integration_WithRealWasm(t *testing.T) { assert.NotEmpty(t, proc.Endpoint.Name, "Endpoint should have a name") assert.NotEmpty(t, proc.ID(), "String should not be empty") - // Test that the poll function is exported - pollFunc := proc.Module.ExportedFunction("poll") - assert.NotNil(t, pollFunc, "poll function should be exported") + // Test that the echo function is exported + echoFunc := proc.Module.ExportedFunction("echo") + assert.NotNil(t, echoFunc, "echo function should be exported") } // TestEcho_Synchronous tests the echo example in synchronous mode @@ -689,9 +691,14 @@ func TestEcho_Asynchronous(t *testing.T) { Return(nil). AnyTimes() + mockStream.EXPECT(). + Reset(). + Return(nil). + AnyTimes() + // Process message with the mock stream // This should process one complete message (until EOF) - err = proc.ProcessMessage(ctx, mockStream) + err = proc.ProcessMessage(ctx, mockStream, "echo") require.NoError(t, err, "ProcessMessage should succeed") // Verify the output matches the input @@ -780,8 +787,13 @@ func TestEcho_RepeatedAsync(t *testing.T) { Return(nil). AnyTimes() + mockStream.EXPECT(). + Reset(). + Return(nil). + AnyTimes() + // Process message with the mock stream - err = proc.ProcessMessage(ctx, mockStream) + err = proc.ProcessMessage(ctx, mockStream, "echo") require.NoError(t, err, "ProcessMessage should succeed for message %d", i+1) // Verify the output matches the input diff --git a/system/system.go b/system/system.go index 1ce2804..b9201f0 100644 --- a/system/system.go +++ b/system/system.go @@ -5,13 +5,11 @@ import ( "io" "github.com/libp2p/go-libp2p/core/protocol" - "golang.org/x/sync/semaphore" ) type Endpoint struct { Name string io.ReadWriteCloser - sem *semaphore.Weighted } // Read implements io.Reader for Endpoint