Compare commits

...

1 Commits

Author SHA1 Message Date
Eve
73a9583693 feat(composio): add standalone Go SDK client (MVP)
Adds server/pkg/composio — a self-contained Go SDK for the Composio v3.1
REST API. Built on go-resty/resty v2; zero coupling to other Multica
packages so it can be vendored or extracted later without surgery.

MVP surface (just the endpoints Stage 2 needs):

- POST /connected_accounts/link        Client.CreateLink
- POST /tool_router/session            Client.CreateSession
- GET  /connected_accounts             Client.ListConnectedAccounts
- POST /connected_accounts/{id}/revoke Client.RevokeConnection
- DELETE /connected_accounts/{id}      Client.DeleteConnectedAccount
                                       (404 -> nil, idempotent)
- GET  /toolkits                       Client.ListToolkits
- GET  /toolkits/{slug}                Client.GetToolkit
- POST /tools/execute/{slug}           Client.ExecuteTool
- Webhook HMAC-SHA256 verification     composio.VerifyWebhook /
                                       VerifyHTTPRequest + ParseEvent

Other notes:

- Auth via x-api-key header (Composio v3.1 contract).
- Typed *APIError envelope with IsNotFound / IsUnauthorized /
  IsRateLimited helpers; falls back to raw body when upstream returns
  non-JSON.
- Webhook signature accepts the official "v1,<sig>" format and any
  comma-separated multi-version list; 300s replay tolerance by default,
  honors an injectable clock for tests; RFC3339 timestamps tolerated.
- README.md documents all public APIs and design choices.

Tests:

- All exercise httptest.NewServer - no real Composio calls.
- 36 tests covering happy paths, validation, 404 idempotence, error
  decoding, signature verify (good / tampered / stale / multi-version /
  bare / RFC3339 / missing headers / empty secret).
- go test ./pkg/composio/... -cover -> 82.2%, exceeds the >=80% bar.

Follow-ups (separate PRs):

- server/internal/integrations/composio - DB schema, REST handlers,
  registration_service (CSRF), dispatch hook (MUL-3720 remainder).
- Pagination iterators, retry middleware, proxy execute, triggers.

Refs: MUL-3720, MUL-3715
Co-authored-by: multica-agent <github@multica.ai>
2026-06-26 14:46:48 +08:00
13 changed files with 1754 additions and 0 deletions

View File

@@ -10,6 +10,7 @@ require (
github.com/aws/aws-sdk-go-v2/service/secretsmanager v1.41.5
github.com/go-chi/chi/v5 v5.2.5
github.com/go-chi/cors v1.2.2
github.com/go-resty/resty/v2 v2.17.2
github.com/golang-jwt/jwt/v5 v5.3.1
github.com/google/uuid v1.6.0
github.com/gorilla/websocket v1.5.3
@@ -61,6 +62,7 @@ require (
github.com/prometheus/procfs v0.16.1 // indirect
go.uber.org/atomic v1.11.0 // indirect
go.yaml.in/yaml/v2 v2.4.2 // indirect
golang.org/x/net v0.43.0 // indirect
golang.org/x/sys v0.35.0 // indirect
golang.org/x/text v0.35.0 // indirect
)

View File

@@ -57,6 +57,8 @@ github.com/go-chi/chi/v5 v5.2.5 h1:Eg4myHZBjyvJmAFjFvWgrqDTXFyOzjj7YIm3L3mu6Ug=
github.com/go-chi/chi/v5 v5.2.5/go.mod h1:X7Gx4mteadT3eDOMTsXzmI4/rwUpOwBHLpAfupzFJP0=
github.com/go-chi/cors v1.2.2 h1:Jmey33TE+b+rB7fT8MUy1u0I4L+NARQlK6LhzKPSyQE=
github.com/go-chi/cors v1.2.2/go.mod h1:sSbTewc+6wYHBBCW7ytsFSn836hqM7JxpglAy2Vzc58=
github.com/go-resty/resty/v2 v2.17.2 h1:FQW5oHYcIlkCNrMD2lloGScxcHJ0gkjshV3qcQAyHQk=
github.com/go-resty/resty/v2 v2.17.2/go.mod h1:kCKZ3wWmwJaNc7S29BRtUhJwy7iqmn+2mLtQrOyQlVA=
github.com/go-test/deep v1.1.1 h1:0r/53hagsehfO4bzD2Pgr/+RgHqhmf+k1Bpse2cTu1U=
github.com/go-test/deep v1.1.1/go.mod h1:5C2ZWiW0ErCdrYzpqxLbTX7MG14M9iiw8DgHncVwcsE=
github.com/golang-jwt/jwt/v5 v5.3.1 h1:kYf81DTWFe7t+1VvL7eS+jKFVWaUnK9cB1qbwn63YCY=
@@ -137,12 +139,16 @@ go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
go.yaml.in/yaml/v2 v2.4.2 h1:DzmwEr2rDGHl7lsFgAHxmNz/1NlQ7xLIrlN2h5d1eGI=
go.yaml.in/yaml/v2 v2.4.2/go.mod h1:081UH+NErpNdqlCXm3TtEran0rJZGxAYx9hb/ELlsPU=
go.yaml.in/yaml/v3 v3.0.4/go.mod h1:DhzuOOF2ATzADvBadXxruRBLzYTpT36CKvDb3+aBEFg=
golang.org/x/net v0.43.0 h1:lat02VYK2j4aLzMzecihNvTlJNQUq316m2Mr9rnM6YE=
golang.org/x/net v0.43.0/go.mod h1:vhO1fvI4dGsIjh73sWfUVjj3N7CA9WkKJNQm2svM6Jg=
golang.org/x/sync v0.20.0 h1:e0PTpb7pjO8GAtTs2dQ6jYa5BWYlMuX047Dco/pItO4=
golang.org/x/sync v0.20.0/go.mod h1:9xrNwdLfx4jkKbNva9FpL6vEN7evnE43NNNJQ2LF3+0=
golang.org/x/sys v0.35.0 h1:vz1N37gP5bs89s7He8XuIYXpyY0+QlsKmzipCbUtyxI=
golang.org/x/sys v0.35.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
golang.org/x/text v0.35.0 h1:JOVx6vVDFokkpaq1AEptVzLTpDe9KGpj5tR4/X+ybL8=
golang.org/x/text v0.35.0/go.mod h1:khi/HExzZJ2pGnjenulevKNX1W67CUy0AsXcNubPGCA=
golang.org/x/time v0.12.0 h1:ScB/8o8olJvc+CQPWrK3fPZNfh7qgwCrY0zJmoEQLSE=
golang.org/x/time v0.12.0/go.mod h1:CDIdPxbZBQxdj6cxyCIdrNogrJKMJ7pr37NYpMcMDSg=
google.golang.org/protobuf v1.36.8 h1:xHScyCOEuuwZEc6UtSOvPbAT4zRh0xcNRYekJwfqyMc=
google.golang.org/protobuf v1.36.8/go.mod h1:fuxRtAxBytpl4zzqUh6/eyUujkJdNiuEkXntxiD/uRU=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=

View File

@@ -0,0 +1,159 @@
# composio
A small, standalone Go SDK for the [Composio v3.1 REST API](https://docs.composio.dev/api-reference).
This package is intentionally self-contained — its only third-party dependency
is [`github.com/go-resty/resty/v2`](https://github.com/go-resty/resty). It does
not import any other Multica package, so it can be reused by other services or
extracted into its own module unchanged.
## Scope (MVP)
Only the endpoints required by the first-stage Composio integration are wired
up. More surface (auth configs, triggers, proxy execute, etc.) can be added
later without changing existing types.
| Capability | Method | REST endpoint |
| --- | --- | --- |
| Create Connect Link (hosted auth flow) | `Client.CreateLink` | `POST /connected_accounts/link` |
| Create MCP / tool-router session | `Client.CreateSession` | `POST /tool_router/session` |
| List connected accounts (per user) | `Client.ListConnectedAccounts` | `GET /connected_accounts` |
| Revoke a connection at the provider | `Client.RevokeConnection` | `POST /connected_accounts/{id}/revoke` |
| Delete a connection record (idempotent) | `Client.DeleteConnectedAccount` | `DELETE /connected_accounts/{id}` |
| List toolkits | `Client.ListToolkits` | `GET /toolkits` |
| Get a toolkit by slug | `Client.GetToolkit` | `GET /toolkits/{slug}` |
| Execute a tool deterministically | `Client.ExecuteTool` | `POST /tools/execute/{slug}` |
| Verify a webhook delivery | `VerifyWebhook` / `VerifyHTTPRequest` | (offline) |
## Quick start
```go
import (
"context"
"os"
"github.com/multica-ai/multica/server/pkg/composio"
)
client, err := composio.NewClient(composio.Options{
APIKey: os.Getenv("COMPOSIO_API_KEY"),
})
if err != nil { /* ... */ }
// 1. Send a user to the hosted Connect Link
link, err := client.CreateLink(ctx, composio.CreateLinkRequest{
AuthConfigID: "ac_xxxxxxxx", // configured in the Composio dashboard
UserID: multicaUserID.String(), // your own user id
CallbackURL: "https://app.multica.ai/api/integrations/composio/callback",
})
// → http.Redirect(w, r, link.RedirectURL, http.StatusFound)
// 2. After Composio creates the account, fetch what the user has connected
accounts, err := client.ListConnectedAccounts(ctx, composio.ListConnectedAccountsRequest{
UserID: multicaUserID.String(),
Statuses: []string{"ACTIVE"},
})
// 3. Open an MCP session for the agent runtime
session, err := client.CreateSession(ctx, composio.CreateSessionRequest{
UserID: multicaUserID.String(),
ManageConnections: &composio.ManageConnections{
CallbackURL: "https://app.multica.ai/settings/integrations",
},
})
mcpURL := session.MCP.URL
mcpHdr := client.MCPAuthHeaders() // {"x-api-key": "..."} attach to MCP client
// 4. Disconnect (idempotent — 404 returns nil)
_ = client.RevokeConnection(ctx, "ca_xxxxxxxx")
_ = client.DeleteConnectedAccount(ctx, "ca_xxxxxxxx")
```
## Webhook verification
```go
secret := os.Getenv("COMPOSIO_WEBHOOK_SECRET")
http.HandleFunc("/api/integrations/composio/webhook", func(w http.ResponseWriter, r *http.Request) {
body, err := composio.VerifyHTTPRequest(secret, r, composio.VerifyOptions{})
if err != nil {
http.Error(w, "invalid signature", http.StatusUnauthorized)
return
}
event, err := composio.ParseEvent(body)
if err != nil {
http.Error(w, "bad payload", http.StatusBadRequest)
return
}
switch event.Type {
case "composio.connected_account.expired":
// mark row as expired, notify the user, ...
}
w.WriteHeader(http.StatusOK)
})
```
`VerifyWebhook` enforces a 300-second replay tolerance by default (matching
Composio's official SDKs). Pass `VerifyOptions{Tolerance: ...}` to tune it, or
`-1` to disable the check entirely (only useful when replaying historical
deliveries in tests).
The `webhook-signature` header is parsed as a list of `<version>,<sig>` pairs
so future signing versions don't break verification.
## Errors
All non-2xx responses are returned as a `*composio.APIError` carrying the
upstream status, slug, and message:
```go
_, err := client.CreateLink(ctx, req)
var apiErr *composio.APIError
if errors.As(err, &apiErr) {
if apiErr.IsRateLimited() { /* back off */ }
log.Printf("composio: %d %s (%s) req=%s", apiErr.HTTPStatus, apiErr.Message, apiErr.Slug, apiErr.RequestID)
}
```
`DeleteConnectedAccount` deliberately swallows 404 so the operation is
idempotent — every other error is propagated unchanged.
## Testing
The SDK is exercised entirely against `httptest.NewServer` so unit tests run
offline. Run them with:
```
go test ./server/pkg/composio/...
```
Current coverage: **82.2 %**.
## Design notes
- **Standalone.** Zero coupling to Multica internals — depend on this package
from `server/internal/integrations/composio` (Stage 2 integration glue) or
anywhere else without circular-import risk.
- **`x-api-key`, not Bearer.** Composio's v3.1 REST API authenticates with an
`x-api-key` header. The SDK sets it on every request and exposes
`Client.APIKeyHeader()` / `Client.MCPAuthHeaders()` so callers know
which header to attach when they're reaching Composio outside the SDK
(e.g. the MCP streaming client in the agent runtime).
- **Loose typing for evolving fields.** Session request blocks (`toolkits`,
`auth_configs`, `tools`, `multi_account`, …) and tool execution arguments
use `map[string]any` because their nested schemas are large and likely to
evolve. The frequently-used `manage_connections` block has a typed
wrapper — extend the typed surface as more shapes stabilise.
- **Webhook signing matches the official SDKs.** HMAC-SHA256 over
`{id}.{timestamp}.{rawBody}`, base64-encoded, with a 300-second replay
window. See
[Composio webhook verification](https://docs.composio.dev/docs/setting-up-triggers/subscribing-to-events#verifying-signatures).
## Roadmap (out of scope for v1)
- Auth-config CRUD (`/auth_configs`)
- Triggers (`/triggers`)
- Proxy execute (`/tools/execute/proxy`)
- Session meta-tool / `attach` / `search` endpoints
- Pagination iterators
- Built-in retry middleware on 429 / 5xx

View File

@@ -0,0 +1,156 @@
package composio
import (
"context"
"errors"
"fmt"
"net/http"
"net/url"
"strings"
"time"
"github.com/go-resty/resty/v2"
)
// DefaultBaseURL is the canonical Composio v3.1 REST root.
const DefaultBaseURL = "https://backend.composio.dev/api/v3.1"
// DefaultUserAgent is sent on every request unless overridden via [Options.UserAgent].
const DefaultUserAgent = "multica-composio-go/0.1"
// DefaultTimeout is the per-request timeout applied to the underlying
// resty client when [Options.Timeout] is zero.
const DefaultTimeout = 30 * time.Second
// Options configures a [Client]. Only APIKey is required.
type Options struct {
// APIKey is the Composio project API key, sent as the `x-api-key` header.
APIKey string
// BaseURL overrides the API root. Mostly useful for tests against a
// httptest.Server. Defaults to [DefaultBaseURL].
BaseURL string
// UserAgent overrides the User-Agent header. Defaults to [DefaultUserAgent].
UserAgent string
// Timeout is the per-request timeout. Zero means [DefaultTimeout].
// A negative value disables the timeout entirely.
Timeout time.Duration
// HTTPClient lets callers inject a custom *http.Client (for example with
// a corporate transport or instrumentation). If nil the default is used.
HTTPClient *http.Client
// RetryCount is the number of retries resty performs on transient
// failures. Zero means no retries (callers can layer their own).
RetryCount int
// RetryWaitTime is the base delay between retries when RetryCount > 0.
RetryWaitTime time.Duration
}
// Client is the Composio REST client.
//
// It is safe for concurrent use by multiple goroutines.
type Client struct {
rc *resty.Client
baseURL string
apiKey string
userAgent string
}
// NewClient constructs a Client from [Options]. It returns an error when the
// options are obviously broken (empty API key, malformed base URL).
func NewClient(opts Options) (*Client, error) {
if strings.TrimSpace(opts.APIKey) == "" {
return nil, errors.New("composio: APIKey is required")
}
baseURL := opts.BaseURL
if baseURL == "" {
baseURL = DefaultBaseURL
}
if _, err := url.Parse(baseURL); err != nil {
return nil, fmt.Errorf("composio: invalid BaseURL %q: %w", baseURL, err)
}
baseURL = strings.TrimRight(baseURL, "/")
ua := opts.UserAgent
if ua == "" {
ua = DefaultUserAgent
}
timeout := opts.Timeout
switch {
case timeout == 0:
timeout = DefaultTimeout
case timeout < 0:
timeout = 0 // resty treats 0 as "no timeout"
}
rc := resty.New().
SetBaseURL(baseURL).
SetHeader("Content-Type", "application/json").
SetHeader("Accept", "application/json").
SetHeader("User-Agent", ua).
SetHeader("x-api-key", opts.APIKey).
SetTimeout(timeout)
if opts.HTTPClient != nil {
rc = rc.SetTransport(opts.HTTPClient.Transport)
if opts.HTTPClient.Timeout > 0 {
rc = rc.SetTimeout(opts.HTTPClient.Timeout)
}
}
if opts.RetryCount > 0 {
rc = rc.SetRetryCount(opts.RetryCount)
if opts.RetryWaitTime > 0 {
rc = rc.SetRetryWaitTime(opts.RetryWaitTime)
}
}
return &Client{
rc: rc,
baseURL: baseURL,
apiKey: opts.APIKey,
userAgent: ua,
}, nil
}
// BaseURL returns the resolved API root after defaulting.
func (c *Client) BaseURL() string { return c.baseURL }
// APIKeyHeader returns the header pair callers should attach to MCP
// streaming clients or any other Composio request made outside the SDK.
//
// Returning a copy keeps the internal map immutable.
func (c *Client) APIKeyHeader() map[string]string {
return map[string]string{"x-api-key": c.apiKey}
}
// newRequest returns a resty.Request bound to the given context.
// All endpoint methods funnel through this helper.
func (c *Client) newRequest(ctx context.Context) *resty.Request {
return c.rc.R().SetContext(ctx)
}
// do executes a request and unmarshals a successful body into out.
// On non-2xx it returns a *APIError populated from the response body.
//
// out may be nil if the caller does not care about the body
// (e.g. DELETE / 204).
func (c *Client) do(req *resty.Request, method, path string, out any) error {
if out != nil {
req = req.SetResult(out)
}
resp, err := req.Execute(method, path)
if err != nil {
return fmt.Errorf("composio: %s %s: %w", method, path, err)
}
if resp.IsError() {
return parseAPIError(resp.StatusCode(), resp.Body())
}
return nil
}

View File

@@ -0,0 +1,430 @@
package composio_test
import (
"context"
"encoding/json"
"errors"
"io"
"net/http"
"net/http/httptest"
"strings"
"testing"
"time"
"github.com/multica-ai/multica/server/pkg/composio"
)
// newTestServer wires up a httptest.Server with the provided handler and
// returns a composio.Client pointed at it.
func newTestServer(t *testing.T, h http.HandlerFunc) (*composio.Client, *httptest.Server) {
t.Helper()
srv := httptest.NewServer(h)
t.Cleanup(srv.Close)
c, err := composio.NewClient(composio.Options{
APIKey: "test-key",
BaseURL: srv.URL,
Timeout: 5 * time.Second,
})
if err != nil {
t.Fatalf("NewClient: %v", err)
}
return c, srv
}
func readJSON(t *testing.T, r *http.Request, out any) {
t.Helper()
body, err := io.ReadAll(r.Body)
if err != nil {
t.Fatalf("read body: %v", err)
}
if err := json.Unmarshal(body, out); err != nil {
t.Fatalf("unmarshal body %q: %v", string(body), err)
}
}
func writeJSON(t *testing.T, w http.ResponseWriter, status int, v any) {
t.Helper()
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(status)
if err := json.NewEncoder(w).Encode(v); err != nil {
t.Fatalf("write json: %v", err)
}
}
// ---------------------------------------------------------------------------
// Client construction
// ---------------------------------------------------------------------------
func TestNewClient_Defaults(t *testing.T) {
c, err := composio.NewClient(composio.Options{APIKey: "k"})
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if got := c.BaseURL(); got != composio.DefaultBaseURL {
t.Errorf("BaseURL = %q, want %q", got, composio.DefaultBaseURL)
}
}
func TestNewClient_RequiresAPIKey(t *testing.T) {
_, err := composio.NewClient(composio.Options{})
if err == nil {
t.Fatal("expected error when APIKey is empty")
}
}
func TestNewClient_TrimsTrailingSlash(t *testing.T) {
c, err := composio.NewClient(composio.Options{APIKey: "k", BaseURL: "https://x.example.com/"})
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if got, want := c.BaseURL(), "https://x.example.com"; got != want {
t.Errorf("BaseURL = %q, want %q", got, want)
}
}
// ---------------------------------------------------------------------------
// Connect Link
// ---------------------------------------------------------------------------
func TestCreateLink_Success(t *testing.T) {
c, _ := newTestServer(t, func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost || r.URL.Path != "/connected_accounts/link" {
t.Errorf("unexpected request: %s %s", r.Method, r.URL.Path)
}
if got := r.Header.Get("x-api-key"); got != "test-key" {
t.Errorf("missing api key header, got %q", got)
}
var body composio.CreateLinkRequest
readJSON(t, r, &body)
if body.AuthConfigID != "ac_abc" || body.UserID != "u_1" {
t.Errorf("unexpected body: %+v", body)
}
writeJSON(t, w, http.StatusCreated, map[string]any{
"link_token": "ltok_xyz",
"redirect_url": "https://connect.composio.dev/ln_xyz",
"expires_at": "2026-12-31T00:00:00Z",
"connected_account_id": "ca_pending",
})
})
resp, err := c.CreateLink(context.Background(), composio.CreateLinkRequest{
AuthConfigID: "ac_abc",
UserID: "u_1",
CallbackURL: "https://example.com/cb",
})
if err != nil {
t.Fatalf("CreateLink: %v", err)
}
if resp.RedirectURL == "" || resp.LinkToken != "ltok_xyz" || resp.ConnectedAccountID != "ca_pending" {
t.Errorf("unexpected response: %+v", resp)
}
}
func TestCreateLink_ValidatesInputs(t *testing.T) {
c, _ := newTestServer(t, func(w http.ResponseWriter, r *http.Request) {
t.Error("server should not be hit when inputs are invalid")
})
if _, err := c.CreateLink(context.Background(), composio.CreateLinkRequest{UserID: "u"}); err == nil {
t.Error("expected error when AuthConfigID is empty")
}
if _, err := c.CreateLink(context.Background(), composio.CreateLinkRequest{AuthConfigID: "ac"}); err == nil {
t.Error("expected error when UserID is empty")
}
}
func TestCreateLink_APIError(t *testing.T) {
c, _ := newTestServer(t, func(w http.ResponseWriter, r *http.Request) {
writeJSON(t, w, http.StatusBadRequest, map[string]any{
"error": map[string]any{
"message": "bad input",
"code": 400,
"slug": "INVALID_INPUT",
"request_id": "req_1",
},
})
})
_, err := c.CreateLink(context.Background(), composio.CreateLinkRequest{
AuthConfigID: "ac", UserID: "u",
})
if err == nil {
t.Fatal("expected error")
}
var apiErr *composio.APIError
if !errors.As(err, &apiErr) {
t.Fatalf("expected *APIError, got %T: %v", err, err)
}
if apiErr.HTTPStatus != http.StatusBadRequest || apiErr.Slug != "INVALID_INPUT" || apiErr.Message != "bad input" {
t.Errorf("unexpected APIError: %+v", apiErr)
}
}
// ---------------------------------------------------------------------------
// Connected accounts list / revoke / delete
// ---------------------------------------------------------------------------
func TestListConnectedAccounts_QueryString(t *testing.T) {
var seen *http.Request
c, _ := newTestServer(t, func(w http.ResponseWriter, r *http.Request) {
seen = r
writeJSON(t, w, http.StatusOK, map[string]any{
"items": []map[string]any{
{"id": "ca_1", "user_id": "u_1", "status": "ACTIVE",
"toolkit": map[string]any{"slug": "notion"}},
},
"next_cursor": "cur_2",
})
})
resp, err := c.ListConnectedAccounts(context.Background(), composio.ListConnectedAccountsRequest{
UserID: "u_1",
ToolkitSlugs: []string{"notion", "slack"},
Statuses: []string{"ACTIVE"},
Limit: 25,
})
if err != nil {
t.Fatalf("ListConnectedAccounts: %v", err)
}
if len(resp.Items) != 1 || resp.Items[0].Toolkit.Slug != "notion" || resp.NextCursor != "cur_2" {
t.Errorf("unexpected response: %+v", resp)
}
q := seen.URL.Query()
if q.Get("user_id") != "u_1" {
t.Errorf("user_id = %q", q.Get("user_id"))
}
if got := q["toolkit_slugs"]; len(got) != 2 || got[0] != "notion" || got[1] != "slack" {
t.Errorf("toolkit_slugs = %v", got)
}
if q.Get("limit") != "25" {
t.Errorf("limit = %q", q.Get("limit"))
}
}
func TestRevokeConnection_Success(t *testing.T) {
c, _ := newTestServer(t, func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost || r.URL.Path != "/connected_accounts/ca_42/revoke" {
t.Errorf("unexpected request: %s %s", r.Method, r.URL.Path)
}
w.WriteHeader(http.StatusNoContent)
})
if err := c.RevokeConnection(context.Background(), "ca_42"); err != nil {
t.Errorf("RevokeConnection: %v", err)
}
}
func TestRevokeConnection_RequiresID(t *testing.T) {
c, _ := newTestServer(t, func(w http.ResponseWriter, r *http.Request) {
t.Error("server should not be hit")
})
if err := c.RevokeConnection(context.Background(), ""); err == nil {
t.Error("expected error for empty id")
}
}
func TestDeleteConnectedAccount_IdempotentOn404(t *testing.T) {
c, _ := newTestServer(t, func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodDelete {
t.Errorf("method = %s", r.Method)
}
writeJSON(t, w, http.StatusNotFound, map[string]any{
"error": map[string]any{"message": "not found", "status": 404, "slug": "NOT_FOUND"},
})
})
if err := c.DeleteConnectedAccount(context.Background(), "ca_gone"); err != nil {
t.Errorf("expected nil on 404, got %v", err)
}
}
func TestDeleteConnectedAccount_PropagatesOtherErrors(t *testing.T) {
c, _ := newTestServer(t, func(w http.ResponseWriter, r *http.Request) {
writeJSON(t, w, http.StatusInternalServerError, map[string]any{
"error": map[string]any{"message": "boom", "status": 500, "slug": "INTERNAL"},
})
})
err := c.DeleteConnectedAccount(context.Background(), "ca_1")
if err == nil {
t.Fatal("expected error")
}
var apiErr *composio.APIError
if !errors.As(err, &apiErr) || apiErr.HTTPStatus != http.StatusInternalServerError {
t.Errorf("unexpected error: %v", err)
}
}
// ---------------------------------------------------------------------------
// Sessions
// ---------------------------------------------------------------------------
func TestCreateSession_Success(t *testing.T) {
c, _ := newTestServer(t, func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost || r.URL.Path != "/tool_router/session" {
t.Errorf("unexpected request: %s %s", r.Method, r.URL.Path)
}
var body composio.CreateSessionRequest
readJSON(t, r, &body)
if body.UserID != "u_1" {
t.Errorf("user_id = %q", body.UserID)
}
if body.ManageConnections == nil || body.ManageConnections.CallbackURL != "https://cb" {
t.Errorf("manage_connections = %+v", body.ManageConnections)
}
writeJSON(t, w, http.StatusCreated, map[string]any{
"session_id": "trs_1",
"mcp": map[string]any{"type": "http", "url": "https://mcp.example/trs_1"},
})
})
enable := true
resp, err := c.CreateSession(context.Background(), composio.CreateSessionRequest{
UserID: "u_1",
ManageConnections: &composio.ManageConnections{
Enable: &enable,
CallbackURL: "https://cb",
},
})
if err != nil {
t.Fatalf("CreateSession: %v", err)
}
if resp.MCP.URL == "" || resp.SessionID != "trs_1" {
t.Errorf("unexpected response: %+v", resp)
}
hdr := c.MCPAuthHeaders()
if hdr["x-api-key"] != "test-key" {
t.Errorf("MCPAuthHeaders = %v", hdr)
}
}
func TestCreateSession_RequiresUserID(t *testing.T) {
c, _ := newTestServer(t, func(w http.ResponseWriter, r *http.Request) {
t.Error("server should not be hit")
})
if _, err := c.CreateSession(context.Background(), composio.CreateSessionRequest{}); err == nil {
t.Error("expected error for empty UserID")
}
}
// ---------------------------------------------------------------------------
// Toolkits / Tools
// ---------------------------------------------------------------------------
func TestListToolkits_Success(t *testing.T) {
c, _ := newTestServer(t, func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path != "/toolkits" || r.URL.Query().Get("category") != "productivity" {
t.Errorf("unexpected request: %s ?%s", r.URL.Path, r.URL.RawQuery)
}
writeJSON(t, w, http.StatusOK, map[string]any{
"items": []map[string]any{
{"slug": "notion", "name": "Notion"},
{"slug": "slack", "name": "Slack"},
},
})
})
resp, err := c.ListToolkits(context.Background(), composio.ListToolkitsRequest{Category: "productivity"})
if err != nil {
t.Fatalf("ListToolkits: %v", err)
}
if len(resp.Items) != 2 || resp.Items[0].Slug != "notion" {
t.Errorf("unexpected response: %+v", resp)
}
}
func TestGetToolkit_Success(t *testing.T) {
c, _ := newTestServer(t, func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path != "/toolkits/notion" {
t.Errorf("path = %s", r.URL.Path)
}
writeJSON(t, w, http.StatusOK, map[string]any{"slug": "notion", "name": "Notion"})
})
tk, err := c.GetToolkit(context.Background(), "notion")
if err != nil {
t.Fatalf("GetToolkit: %v", err)
}
if tk.Slug != "notion" {
t.Errorf("slug = %q", tk.Slug)
}
}
func TestGetToolkit_RequiresSlug(t *testing.T) {
c, _ := newTestServer(t, func(w http.ResponseWriter, r *http.Request) {
t.Error("server should not be hit")
})
if _, err := c.GetToolkit(context.Background(), ""); err == nil {
t.Error("expected error for empty slug")
}
}
func TestExecuteTool_Success(t *testing.T) {
c, _ := newTestServer(t, func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path != "/tools/execute/GITHUB_CREATE_ISSUE" {
t.Errorf("path = %s", r.URL.Path)
}
var body composio.ExecuteToolRequest
readJSON(t, r, &body)
if body.UserID != "u_1" || body.Arguments["title"] != "hi" {
t.Errorf("body = %+v", body)
}
writeJSON(t, w, http.StatusOK, map[string]any{
"successful": true,
"data": map[string]any{"issue_number": float64(42)},
"log_id": "log_1",
})
})
resp, err := c.ExecuteTool(context.Background(), "GITHUB_CREATE_ISSUE", composio.ExecuteToolRequest{
UserID: "u_1",
Arguments: map[string]any{"title": "hi"},
})
if err != nil {
t.Fatalf("ExecuteTool: %v", err)
}
if !resp.Successful || resp.Data["issue_number"].(float64) != 42 || resp.LogID != "log_1" {
t.Errorf("unexpected response: %+v", resp)
}
}
func TestExecuteTool_ValidatesInputs(t *testing.T) {
c, _ := newTestServer(t, func(w http.ResponseWriter, r *http.Request) {
t.Error("server should not be hit")
})
if _, err := c.ExecuteTool(context.Background(), "", composio.ExecuteToolRequest{UserID: "u"}); err == nil {
t.Error("expected error for empty tool slug")
}
if _, err := c.ExecuteTool(context.Background(), "X", composio.ExecuteToolRequest{}); err == nil {
t.Error("expected error when neither UserID nor ConnectedAccountID is set")
}
}
// ---------------------------------------------------------------------------
// Error parsing
// ---------------------------------------------------------------------------
func TestAPIError_FallbackOnNonJSONBody(t *testing.T) {
c, _ := newTestServer(t, func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusBadGateway)
_, _ = w.Write([]byte("<html>upstream down</html>"))
})
_, err := c.ListToolkits(context.Background(), composio.ListToolkitsRequest{})
if err == nil {
t.Fatal("expected error")
}
var apiErr *composio.APIError
if !errors.As(err, &apiErr) {
t.Fatalf("expected *APIError, got %T: %v", err, err)
}
if apiErr.HTTPStatus != http.StatusBadGateway {
t.Errorf("status = %d", apiErr.HTTPStatus)
}
if !strings.Contains(string(apiErr.RawBody), "upstream down") {
t.Errorf("raw body lost: %q", apiErr.RawBody)
}
}
func TestAPIError_HelperPredicates(t *testing.T) {
e := &composio.APIError{HTTPStatus: http.StatusNotFound}
if !e.IsNotFound() {
t.Error("IsNotFound() = false")
}
e2 := &composio.APIError{HTTPStatus: http.StatusUnauthorized}
if !e2.IsUnauthorized() {
t.Error("IsUnauthorized() = false")
}
e3 := &composio.APIError{HTTPStatus: http.StatusTooManyRequests}
if !e3.IsRateLimited() {
t.Error("IsRateLimited() = false")
}
}

View File

@@ -0,0 +1,170 @@
package composio
import (
"context"
"errors"
"net/http"
"net/url"
"strconv"
)
// --- Create link --------------------------------------------------------
// CreateLinkRequest is the body of POST /connected_accounts/link.
//
// Spec: https://docs.composio.dev/reference/api-reference/connected-accounts/postConnectedAccountsLink
type CreateLinkRequest struct {
// AuthConfigID is the `ac_…` id of an auth config registered in your
// Composio project (one per toolkit / OAuth client variant).
AuthConfigID string `json:"auth_config_id"`
// UserID is your own user identifier — Composio scopes the resulting
// connected account by it.
UserID string `json:"user_id"`
// CallbackURL is where Composio sends the user after they finish the
// hosted auth flow. Optional; Composio has a default landing page.
CallbackURL string `json:"callback_url,omitempty"`
// Alias is a human-readable label for the connection. Optional but useful
// when the same user connects multiple accounts of the same toolkit.
Alias string `json:"alias,omitempty"`
// ConnectionData lets the caller pre-fill connection fields with default
// values (per the Composio docs). Free-form to avoid coupling to the
// scheme-specific child schemas.
ConnectionData map[string]any `json:"connection_data,omitempty"`
}
// CreateLinkResponse is the body returned by POST /connected_accounts/link.
type CreateLinkResponse struct {
LinkToken string `json:"link_token"`
RedirectURL string `json:"redirect_url"`
ExpiresAt string `json:"expires_at"`
ConnectedAccountID string `json:"connected_account_id"`
}
// CreateLink starts a hosted Composio Connect Link session. The redirect URL
// is what the caller should send the user to (popup, redirect, or
// SFSafariViewController).
func (c *Client) CreateLink(ctx context.Context, req CreateLinkRequest) (*CreateLinkResponse, error) {
if req.AuthConfigID == "" {
return nil, errors.New("composio: CreateLink: AuthConfigID is required")
}
if req.UserID == "" {
return nil, errors.New("composio: CreateLink: UserID is required")
}
var out CreateLinkResponse
if err := c.do(c.newRequest(ctx).SetBody(req), http.MethodPost, "/connected_accounts/link", &out); err != nil {
return nil, err
}
return &out, nil
}
// --- List ---------------------------------------------------------------
// ListConnectedAccountsRequest collects the optional filters supported by
// GET /connected_accounts. Zero values are omitted from the query string.
type ListConnectedAccountsRequest struct {
UserID string
ToolkitSlugs []string // OR'd by the upstream
AuthConfigID string
Statuses []string // ACTIVE, EXPIRED, INACTIVE, …
Limit int // 0 = use upstream default
Cursor string
}
// ConnectedAccount mirrors a subset of the Composio response shape. Only the
// fields actually consumed by the MVP are typed; extras live in Extra so
// callers can read them without an SDK update.
type ConnectedAccount struct {
ID string `json:"id"`
UserID string `json:"user_id"`
AuthConfigID string `json:"auth_config_id"`
Toolkit Toolkit `json:"toolkit"`
Status string `json:"status"`
StatusReason string `json:"status_reason,omitempty"`
CreatedAt string `json:"created_at,omitempty"`
UpdatedAt string `json:"updated_at,omitempty"`
LastUsedAt string `json:"last_used_at,omitempty"`
Extra map[string]any `json:"-"`
}
// ListConnectedAccountsResponse is the typed paginated response.
type ListConnectedAccountsResponse struct {
Items []ConnectedAccount `json:"items"`
NextCursor string `json:"next_cursor,omitempty"`
TotalItems int `json:"total_items,omitempty"`
}
// ListConnectedAccounts returns the connections matching the supplied filters.
func (c *Client) ListConnectedAccounts(ctx context.Context, req ListConnectedAccountsRequest) (*ListConnectedAccountsResponse, error) {
q := url.Values{}
if req.UserID != "" {
q.Set("user_id", req.UserID)
}
for _, slug := range req.ToolkitSlugs {
if slug != "" {
q.Add("toolkit_slugs", slug)
}
}
if req.AuthConfigID != "" {
q.Set("auth_config_id", req.AuthConfigID)
}
for _, s := range req.Statuses {
if s != "" {
q.Add("statuses", s)
}
}
if req.Limit > 0 {
q.Set("limit", strconv.Itoa(req.Limit))
}
if req.Cursor != "" {
q.Set("cursor", req.Cursor)
}
path := "/connected_accounts"
if encoded := q.Encode(); encoded != "" {
path += "?" + encoded
}
var out ListConnectedAccountsResponse
if err := c.do(c.newRequest(ctx), http.MethodGet, path, &out); err != nil {
return nil, err
}
return &out, nil
}
// --- Revoke / Delete ----------------------------------------------------
// RevokeConnection revokes the OAuth grant at the upstream provider but
// keeps the Composio record. Use this when the user disconnects and you
// want the provider-side tokens invalidated immediately.
func (c *Client) RevokeConnection(ctx context.Context, connectedAccountID string) error {
if connectedAccountID == "" {
return errors.New("composio: RevokeConnection: connectedAccountID is required")
}
return c.do(c.newRequest(ctx),
http.MethodPost, "/connected_accounts/"+url.PathEscape(connectedAccountID)+"/revoke", nil)
}
// DeleteConnectedAccount removes the connection record from Composio. The
// provider tokens are NOT revoked by this call — call [Client.RevokeConnection]
// first if you need them invalidated upstream.
//
// Returns nil for 404 so callers can treat the operation as idempotent.
func (c *Client) DeleteConnectedAccount(ctx context.Context, connectedAccountID string) error {
if connectedAccountID == "" {
return errors.New("composio: DeleteConnectedAccount: connectedAccountID is required")
}
err := c.do(c.newRequest(ctx),
http.MethodDelete, "/connected_accounts/"+url.PathEscape(connectedAccountID), nil)
if err == nil {
return nil
}
var apiErr *APIError
if errors.As(err, &apiErr) && apiErr.IsNotFound() {
return nil
}
return err
}

View File

@@ -0,0 +1,56 @@
// Package composio is a small, standalone Go SDK for the Composio v3.1 REST API.
//
// It is intentionally self-contained: the only third-party dependency is
// [github.com/go-resty/resty/v2]. It does not import any Multica-specific
// package, so it can be reused by other Go services or extracted into its
// own module unchanged.
//
// # MVP surface
//
// The SDK targets the surface required by the Composio integration MVP
// (see MUL-3715 / MUL-3720). It is deliberately minimal — only the
// endpoints actually used by the first-stage product are wired up:
//
// - Connect Link — POST /connected_accounts/link
// - MCP Session — POST /tool_router/session
// - Connected Accounts — GET /connected_accounts,
// POST /connected_accounts/{id}/revoke,
// DELETE /connected_accounts/{id}
// - Toolkits — GET /toolkits, GET /toolkits/{slug}
// - Tool Execute — POST /tools/execute/{tool_slug}
// - Webhook — HMAC-SHA256 signature verification
//
// More surface (auth configs, triggers, proxy execute, etc.) can be
// added later without changing the existing types.
//
// # Quick start
//
// client, err := composio.NewClient(composio.Options{
// APIKey: os.Getenv("COMPOSIO_API_KEY"),
// })
// if err != nil { return err }
//
// link, err := client.CreateLink(ctx, composio.CreateLinkRequest{
// AuthConfigID: "ac_abc",
// UserID: "u_123",
// CallbackURL: "https://app.example.com/composio/callback",
// })
// // redirect user to link.RedirectURL
//
// session, err := client.CreateSession(ctx, composio.CreateSessionRequest{
// UserID: "u_123",
// })
// // agent runtime now consumes session.MCP.URL + composio.MCPAuthHeaders(...)
//
// # Errors
//
// All non-2xx responses come back as a *APIError carrying the upstream
// status, slug, and message. Transport errors come back unwrapped from
// resty so callers can errors.Is/As as usual.
//
// # Webhook verification
//
// [VerifyWebhook] verifies the HMAC-SHA256 signature Composio attaches
// to every webhook delivery, with a configurable replay tolerance.
// See https://docs.composio.dev/docs/setting-up-triggers/subscribing-to-events#verifying-signatures
package composio

View File

@@ -0,0 +1,88 @@
package composio
import (
"encoding/json"
"fmt"
"net/http"
)
// APIError is the canonical error returned by the SDK when Composio responds
// with a non-2xx HTTP status.
//
// The Composio error envelope as of v3.1 looks like:
//
// {
// "error": {
// "message": "...",
// "code": 400,
// "slug": "INVALID_INPUT",
// "status": 400,
// "request_id": "req_...",
// "suggested_fix":"...",
// "errors": ["..."]
// }
// }
//
// HTTPStatus is the transport status as observed locally; the rest mirrors
// the body if Composio returned one. RawBody is preserved verbatim so
// callers can log the full upstream response for debugging.
type APIError struct {
HTTPStatus int `json:"-"`
Message string `json:"message,omitempty"`
Code int `json:"code,omitempty"`
Slug string `json:"slug,omitempty"`
Status int `json:"status,omitempty"`
RequestID string `json:"request_id,omitempty"`
SuggestedFix string `json:"suggested_fix,omitempty"`
Errors []string `json:"errors,omitempty"`
RawBody []byte `json:"-"`
}
// Error implements error. It surfaces the upstream status, slug, and message.
func (e *APIError) Error() string {
if e == nil {
return ""
}
msg := e.Message
if msg == "" {
msg = http.StatusText(e.HTTPStatus)
}
if e.Slug != "" {
return fmt.Sprintf("composio: %d %s (%s)", e.HTTPStatus, msg, e.Slug)
}
return fmt.Sprintf("composio: %d %s", e.HTTPStatus, msg)
}
// IsNotFound reports whether the error is an HTTP 404 — useful for idempotent
// delete/revoke flows.
func (e *APIError) IsNotFound() bool { return e != nil && e.HTTPStatus == http.StatusNotFound }
// IsUnauthorized reports whether the error is an HTTP 401.
func (e *APIError) IsUnauthorized() bool {
return e != nil && e.HTTPStatus == http.StatusUnauthorized
}
// IsRateLimited reports whether the error is an HTTP 429.
func (e *APIError) IsRateLimited() bool {
return e != nil && e.HTTPStatus == http.StatusTooManyRequests
}
// parseAPIError decodes Composio's `{"error": {...}}` envelope. If the body
// is not the expected shape it returns an APIError carrying just HTTPStatus
// and RawBody so callers still see something useful.
func parseAPIError(status int, body []byte) *APIError {
out := &APIError{HTTPStatus: status, RawBody: body}
if len(body) == 0 {
return out
}
var wire struct {
Error APIError `json:"error"`
}
if err := json.Unmarshal(body, &wire); err != nil {
// Body is not the expected envelope — leave RawBody set, message empty.
return out
}
wire.Error.HTTPStatus = status
wire.Error.RawBody = body
return &wire.Error
}

View File

@@ -0,0 +1,99 @@
package composio
import (
"context"
"errors"
"net/http"
)
// --- Session creation ---------------------------------------------------
// CreateSessionRequest is the body of POST /tool_router/session.
//
// The minimum required field is [UserID]. Everything else is optional and
// maps directly to the v3.1 wire schema:
// https://docs.composio.dev/reference/api-reference/tool-router/postToolRouterSession
//
// The schema is intentionally typed loosely (map-based) for the nested
// `toolkits`, `auth_configs`, `tools`, `tags`, `multi_account`, etc. fields
// because they carry many child attributes and are expected to evolve.
// Callers can still construct strongly typed wrappers on top.
type CreateSessionRequest struct {
UserID string `json:"user_id"`
Toolkits map[string]any `json:"toolkits,omitempty"`
AuthConfigs map[string]any `json:"auth_configs,omitempty"`
ConnectedAccounts map[string]any `json:"connected_accounts,omitempty"`
ManageConnections *ManageConnections `json:"manage_connections,omitempty"`
Tools map[string]any `json:"tools,omitempty"`
Tags any `json:"tags,omitempty"`
Workbench map[string]any `json:"workbench,omitempty"`
MultiAccount map[string]any `json:"multi_account,omitempty"`
Preload map[string]any `json:"preload,omitempty"`
Search map[string]any `json:"search,omitempty"`
Execute map[string]any `json:"execute,omitempty"`
Experimental map[string]any `json:"experimental,omitempty"`
}
// ManageConnections is the typed flavor of the `manage_connections` object —
// the field used most often by integrations.
type ManageConnections struct {
Enable *bool `json:"enable,omitempty"`
CallbackURL string `json:"callback_url,omitempty"`
EnableWaitForConnections *bool `json:"enable_wait_for_connections,omitempty"`
EnableConnectionRemoval *bool `json:"enable_connection_removal,omitempty"`
}
// MCPDescriptor is the streamable HTTP entrypoint for the session's MCP.
type MCPDescriptor struct {
Type string `json:"type"`
URL string `json:"url"`
}
// CreateSessionResponse mirrors the subset of the upstream response the SDK
// currently exposes typed. Additional fields can be added without breaking
// callers.
type CreateSessionResponse struct {
SessionID string `json:"session_id"`
MCP MCPDescriptor `json:"mcp"`
ToolRouterTools []string `json:"tool_router_tools,omitempty"`
Config map[string]any `json:"config,omitempty"`
ConfigVersion int `json:"config_version,omitempty"`
Experimental map[string]any `json:"experimental,omitempty"`
Warnings []SessionWarning `json:"warnings,omitempty"`
}
// SessionWarning is a non-fatal warning emitted at session creation time.
type SessionWarning struct {
Code string `json:"code"`
Message string `json:"message"`
}
// CreateSession opens a new tool-router (a.k.a. MCP) session for the given
// user. The returned [CreateSessionResponse.MCP.URL] is the URL an
// MCP-compatible client connects to.
//
// Use [Client.MCPAuthHeaders] to obtain the matching headers — the SDK
// returns these separately rather than baking them into the response so
// that callers don't accidentally leak the secret API key through logs.
func (c *Client) CreateSession(ctx context.Context, req CreateSessionRequest) (*CreateSessionResponse, error) {
if req.UserID == "" {
return nil, errors.New("composio: CreateSession: UserID is required")
}
var out CreateSessionResponse
if err := c.do(c.newRequest(ctx).SetBody(req), http.MethodPost, "/tool_router/session", &out); err != nil {
return nil, err
}
return &out, nil
}
// MCPAuthHeaders returns the headers an MCP client must send when connecting
// to a session URL produced by [Client.CreateSession].
//
// Composio authenticates MCP streaming the same way it authenticates the
// REST API — with the project's `x-api-key` header. Keeping this as a
// dedicated helper makes it explicit at the call site that bearer
// material is leaving the SDK boundary, so callers can route it through
// their secret-redact pipeline (see server/pkg/redact).
func (c *Client) MCPAuthHeaders() map[string]string {
return c.APIKeyHeader()
}

View File

@@ -0,0 +1,76 @@
package composio
import (
"context"
"errors"
"net/http"
"net/url"
"strconv"
)
// Toolkit is the minimal toolkit descriptor used as a nested field inside
// connected accounts, sessions, and the toolkit list endpoint. Only fields
// useful for UI / dispatch decisions are typed.
type Toolkit struct {
Slug string `json:"slug"`
Name string `json:"name,omitempty"`
LogoURL string `json:"logo,omitempty"`
Description string `json:"description,omitempty"`
Categories []string `json:"categories,omitempty"`
AuthSchemes []string `json:"auth_schemes,omitempty"`
Meta map[string]any `json:"meta,omitempty"`
}
// ListToolkitsRequest carries the optional filters of GET /toolkits.
type ListToolkitsRequest struct {
Category string
Limit int
Cursor string
SortBy string // "popular" (default) | "alphabetical"
}
// ListToolkitsResponse is the typed paginated response.
type ListToolkitsResponse struct {
Items []Toolkit `json:"items"`
NextCursor string `json:"next_cursor,omitempty"`
TotalItems int `json:"total_items,omitempty"`
}
// ListToolkits returns toolkits available to the project.
func (c *Client) ListToolkits(ctx context.Context, req ListToolkitsRequest) (*ListToolkitsResponse, error) {
q := url.Values{}
if req.Category != "" {
q.Set("category", req.Category)
}
if req.Limit > 0 {
q.Set("limit", strconv.Itoa(req.Limit))
}
if req.Cursor != "" {
q.Set("cursor", req.Cursor)
}
if req.SortBy != "" {
q.Set("sort_by", req.SortBy)
}
path := "/toolkits"
if encoded := q.Encode(); encoded != "" {
path += "?" + encoded
}
var out ListToolkitsResponse
if err := c.do(c.newRequest(ctx), http.MethodGet, path, &out); err != nil {
return nil, err
}
return &out, nil
}
// GetToolkit fetches a single toolkit by its slug (e.g. "notion", "github").
func (c *Client) GetToolkit(ctx context.Context, slug string) (*Toolkit, error) {
if slug == "" {
return nil, errors.New("composio: GetToolkit: slug is required")
}
var out Toolkit
if err := c.do(c.newRequest(ctx),
http.MethodGet, "/toolkits/"+url.PathEscape(slug), &out); err != nil {
return nil, err
}
return &out, nil
}

View File

@@ -0,0 +1,67 @@
package composio
import (
"context"
"errors"
"net/http"
"net/url"
)
// ExecuteToolRequest is the body for POST /tools/execute/{tool_slug}.
//
// Spec: https://docs.composio.dev/reference/api-reference/tools/postToolsExecuteByToolSlug
//
// Either ConnectedAccountID or (UserID + the tool's toolkit) is required so
// Composio knows which credential set to use. The SDK does not enforce that
// invariant up front; the upstream returns a 422 with a clear message when
// missing.
type ExecuteToolRequest struct {
// Arguments is the structured input to the tool. Shape varies per tool.
Arguments map[string]any `json:"arguments,omitempty"`
// ConnectedAccountID pins execution to a specific connected account.
ConnectedAccountID string `json:"connected_account_id,omitempty"`
// UserID lets Composio resolve the connected account by user when
// the caller does not have the explicit `ca_` id handy.
UserID string `json:"user_id,omitempty"`
// ToolkitVersions controls how Composio resolves the tool definition.
// Pass "latest" or a dated version like "20251027_00". The Composio
// docs note that manual execution requires an explicit version.
ToolkitVersions string `json:"toolkit_versions,omitempty"`
// AllowTracing flips Composio-side tracing on for this call.
AllowTracing bool `json:"allow_tracing,omitempty"`
}
// ExecuteToolResponse is the typed result. The upstream wire shape varies by
// tool, so [Data] is intentionally generic; callers cast to whatever the
// tool's documented output schema looks like.
type ExecuteToolResponse struct {
Successful bool `json:"successful"`
Data map[string]any `json:"data,omitempty"`
Error string `json:"error,omitempty"`
LogID string `json:"log_id,omitempty"`
SessionInfo map[string]any `json:"session_info,omitempty"`
}
// ExecuteTool calls a Composio tool by its slug
// (SCREAMING_SNAKE_CASE, e.g. GITHUB_CREATE_ISSUE).
//
// This is the deterministic backend path — it skips MCP/session orchestration
// and is the right call for fixed flows like autopilots or built-in skills.
func (c *Client) ExecuteTool(ctx context.Context, toolSlug string, req ExecuteToolRequest) (*ExecuteToolResponse, error) {
if toolSlug == "" {
return nil, errors.New("composio: ExecuteTool: toolSlug is required")
}
if req.ConnectedAccountID == "" && req.UserID == "" {
return nil, errors.New("composio: ExecuteTool: either ConnectedAccountID or UserID must be set")
}
var out ExecuteToolResponse
if err := c.do(c.newRequest(ctx).SetBody(req),
http.MethodPost, "/tools/execute/"+url.PathEscape(toolSlug), &out); err != nil {
return nil, err
}
return &out, nil
}

View File

@@ -0,0 +1,191 @@
package composio
import (
"crypto/hmac"
"crypto/sha256"
"encoding/base64"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"strconv"
"strings"
"time"
)
// Webhook header names Composio sets on every delivery.
const (
HeaderWebhookID = "webhook-id"
HeaderWebhookTimestamp = "webhook-timestamp"
HeaderWebhookSignature = "webhook-signature"
)
// DefaultWebhookTolerance is the default replay window — matches the
// official Composio SDKs (300 s, see Composio webhook docs).
const DefaultWebhookTolerance = 300 * time.Second
// Sentinel errors returned by [VerifyWebhook] so callers can distinguish
// the failure mode with errors.Is.
var (
ErrMissingWebhookHeaders = errors.New("composio: missing webhook headers")
ErrInvalidWebhookSignature = errors.New("composio: invalid webhook signature")
ErrWebhookTimestampStale = errors.New("composio: webhook timestamp outside tolerance")
ErrWebhookSecretMissing = errors.New("composio: webhook secret is empty")
)
// WebhookHeaders carries the three headers that participate in the signature
// computation. Pass these straight from the inbound HTTP request.
type WebhookHeaders struct {
ID string
Timestamp string
Signature string
}
// HeadersFromHTTP pulls the three webhook headers off an http.Header.
// It is case-insensitive (http.Header normalizes its keys).
func HeadersFromHTTP(h http.Header) WebhookHeaders {
return WebhookHeaders{
ID: h.Get(HeaderWebhookID),
Timestamp: h.Get(HeaderWebhookTimestamp),
Signature: h.Get(HeaderWebhookSignature),
}
}
// VerifyOptions tweaks [VerifyWebhook]. Zero values mean defaults.
type VerifyOptions struct {
// Tolerance is how far the webhook-timestamp may drift from `now`.
// Zero means [DefaultWebhookTolerance]; a negative value disables the
// check entirely (useful only for replaying historical deliveries in
// tests).
Tolerance time.Duration
// Now overrides the wall clock used for the tolerance check.
// Tests use this; production should leave it nil.
Now func() time.Time
}
// VerifyWebhook checks the HMAC-SHA256 signature attached by Composio to
// every webhook delivery and enforces a replay-window tolerance.
//
// The signing string is constructed as
//
// "<webhook-id>.<webhook-timestamp>.<rawBody>"
//
// and HMAC-SHA256'd with secret. The result is base64 encoded.
//
// Composio's `webhook-signature` header is a comma-separated list of
// `<version>,<signature>` pairs (e.g. `v1,abc123…`); this function accepts
// any of them whose version starts with "v" so future-proofs work.
//
// secret must be the value from the matching webhook subscription —
// fetch via the Composio dashboard or the
// `GET /webhook_subscriptions/{id}` endpoint.
func VerifyWebhook(secret string, headers WebhookHeaders, rawBody []byte, opts VerifyOptions) error {
if secret == "" {
return ErrWebhookSecretMissing
}
if headers.ID == "" || headers.Timestamp == "" || headers.Signature == "" {
return ErrMissingWebhookHeaders
}
tolerance := opts.Tolerance
if tolerance == 0 {
tolerance = DefaultWebhookTolerance
}
if tolerance > 0 {
ts, err := strconv.ParseInt(headers.Timestamp, 10, 64)
if err != nil {
// Composio's docs show timestamps as Unix seconds, but allow a
// fallback in case future deliveries use RFC3339.
t, terr := time.Parse(time.RFC3339, headers.Timestamp)
if terr != nil {
return fmt.Errorf("composio: invalid webhook-timestamp %q: %w", headers.Timestamp, err)
}
ts = t.Unix()
}
now := time.Now().UTC()
if opts.Now != nil {
now = opts.Now().UTC()
}
delta := now.Sub(time.Unix(ts, 0))
if delta < 0 {
delta = -delta
}
if delta > tolerance {
return fmt.Errorf("%w: drift=%s tolerance=%s", ErrWebhookTimestampStale, delta, tolerance)
}
}
signingString := headers.ID + "." + headers.Timestamp + "." + string(rawBody)
mac := hmac.New(sha256.New, []byte(secret))
_, _ = mac.Write([]byte(signingString))
expected := base64.StdEncoding.EncodeToString(mac.Sum(nil))
// Composio's header takes the form "v1,<sig>[ v2,<sig> ...]" — accept
// any version-tagged signature plus the bare-base64 form for forward-
// compat.
candidates := strings.Fields(strings.ReplaceAll(headers.Signature, ",", " "))
if len(candidates) == 0 {
return ErrInvalidWebhookSignature
}
want := []byte(expected)
for _, cand := range candidates {
// Skip version tags like "v1" / "v2".
if len(cand) <= 3 && strings.HasPrefix(cand, "v") {
continue
}
if hmac.Equal([]byte(cand), want) {
return nil
}
}
return ErrInvalidWebhookSignature
}
// VerifyHTTPRequest is a convenience wrapper that reads & verifies an
// inbound *http.Request in one call. It consumes the body and returns it
// to the caller so the handler can json-decode after a successful verify.
//
// On error the returned body slice is still populated (when read succeeded)
// so handlers can choose to log it.
func VerifyHTTPRequest(secret string, r *http.Request, opts VerifyOptions) ([]byte, error) {
if r == nil || r.Body == nil {
return nil, errors.New("composio: VerifyHTTPRequest: request body is nil")
}
body, err := io.ReadAll(r.Body)
if err != nil {
return nil, fmt.Errorf("composio: read webhook body: %w", err)
}
_ = r.Body.Close()
if verr := VerifyWebhook(secret, HeadersFromHTTP(r.Header), body, opts); verr != nil {
return body, verr
}
return body, nil
}
// --- Event envelope -----------------------------------------------------
// EventEnvelope is the V3 webhook payload as documented by Composio.
//
// Spec: https://docs.composio.dev/docs/setting-up-triggers/subscribing-to-events#webhook-payload-versions
//
// The `data` and `metadata` blocks vary per event; they stay as
// json.RawMessage so callers can decode into a strongly-typed struct
// matching whatever Type they care about.
type EventEnvelope struct {
ID string `json:"id"`
Type string `json:"type"`
Metadata json.RawMessage `json:"metadata,omitempty"`
Data json.RawMessage `json:"data,omitempty"`
Timestamp string `json:"timestamp,omitempty"`
}
// ParseEvent decodes a V3 envelope. It does NOT verify the signature —
// always call [VerifyWebhook] / [VerifyHTTPRequest] first.
func ParseEvent(rawBody []byte) (*EventEnvelope, error) {
var out EventEnvelope
if err := json.Unmarshal(rawBody, &out); err != nil {
return nil, fmt.Errorf("composio: parse webhook envelope: %w", err)
}
return &out, nil
}

View File

@@ -0,0 +1,254 @@
package composio_test
import (
"bytes"
"crypto/hmac"
"crypto/sha256"
"encoding/base64"
"errors"
"io"
"net/http"
"net/http/httptest"
"strconv"
"testing"
"time"
"github.com/multica-ai/multica/server/pkg/composio"
)
// helper: produce a valid signature for the given inputs.
func sign(secret, id, ts, body string) string {
mac := hmac.New(sha256.New, []byte(secret))
mac.Write([]byte(id + "." + ts + "." + body))
return base64.StdEncoding.EncodeToString(mac.Sum(nil))
}
func TestVerifyWebhook_Success(t *testing.T) {
secret := "shh"
body := `{"id":"evt_1","type":"composio.connected_account.expired"}`
id := "msg_abc"
ts := strconv.FormatInt(time.Now().Unix(), 10)
sig := sign(secret, id, ts, body)
err := composio.VerifyWebhook(secret, composio.WebhookHeaders{
ID: id, Timestamp: ts, Signature: "v1," + sig,
}, []byte(body), composio.VerifyOptions{})
if err != nil {
t.Fatalf("VerifyWebhook: %v", err)
}
}
func TestVerifyWebhook_AcceptsBareSignature(t *testing.T) {
secret := "shh"
body := `{}`
id := "msg_b"
ts := strconv.FormatInt(time.Now().Unix(), 10)
sig := sign(secret, id, ts, body)
// No version prefix: just the raw base64
err := composio.VerifyWebhook(secret, composio.WebhookHeaders{
ID: id, Timestamp: ts, Signature: sig,
}, []byte(body), composio.VerifyOptions{})
if err != nil {
t.Fatalf("VerifyWebhook bare: %v", err)
}
}
func TestVerifyWebhook_AcceptsMultipleVersions(t *testing.T) {
secret := "shh"
body := `{}`
id := "msg_c"
ts := strconv.FormatInt(time.Now().Unix(), 10)
good := sign(secret, id, ts, body)
bad := "AAAA" + good[4:]
// One bad sig, one good sig — verify should still pass.
hdr := "v2," + bad + " v1," + good
err := composio.VerifyWebhook(secret, composio.WebhookHeaders{
ID: id, Timestamp: ts, Signature: hdr,
}, []byte(body), composio.VerifyOptions{})
if err != nil {
t.Fatalf("VerifyWebhook multi: %v", err)
}
}
func TestVerifyWebhook_RejectsTamperedBody(t *testing.T) {
secret := "shh"
body := `{"data":"original"}`
id := "msg_d"
ts := strconv.FormatInt(time.Now().Unix(), 10)
sig := sign(secret, id, ts, body)
err := composio.VerifyWebhook(secret, composio.WebhookHeaders{
ID: id, Timestamp: ts, Signature: "v1," + sig,
}, []byte(`{"data":"tampered"}`), composio.VerifyOptions{})
if !errors.Is(err, composio.ErrInvalidWebhookSignature) {
t.Fatalf("expected ErrInvalidWebhookSignature, got %v", err)
}
}
func TestVerifyWebhook_RejectsStaleTimestamp(t *testing.T) {
secret := "shh"
body := `{}`
id := "msg_e"
old := time.Now().Add(-10 * time.Minute).Unix()
ts := strconv.FormatInt(old, 10)
sig := sign(secret, id, ts, body)
err := composio.VerifyWebhook(secret, composio.WebhookHeaders{
ID: id, Timestamp: ts, Signature: "v1," + sig,
}, []byte(body), composio.VerifyOptions{Tolerance: 5 * time.Minute})
if !errors.Is(err, composio.ErrWebhookTimestampStale) {
t.Fatalf("expected ErrWebhookTimestampStale, got %v", err)
}
}
func TestVerifyWebhook_NegativeToleranceDisablesCheck(t *testing.T) {
secret := "shh"
body := `{}`
id := "msg_f"
ts := "1" // ancient
sig := sign(secret, id, ts, body)
err := composio.VerifyWebhook(secret, composio.WebhookHeaders{
ID: id, Timestamp: ts, Signature: "v1," + sig,
}, []byte(body), composio.VerifyOptions{Tolerance: -1})
if err != nil {
t.Fatalf("VerifyWebhook negative tolerance: %v", err)
}
}
func TestVerifyWebhook_HonorsCustomNow(t *testing.T) {
secret := "shh"
body := `{}`
id := "msg_g"
ts := "1700000000"
sig := sign(secret, id, ts, body)
err := composio.VerifyWebhook(secret, composio.WebhookHeaders{
ID: id, Timestamp: ts, Signature: "v1," + sig,
}, []byte(body), composio.VerifyOptions{
Tolerance: 5 * time.Second,
Now: func() time.Time { return time.Unix(1700000003, 0) },
})
if err != nil {
t.Fatalf("expected fresh timestamp, got %v", err)
}
}
func TestVerifyWebhook_MissingHeaders(t *testing.T) {
err := composio.VerifyWebhook("shh", composio.WebhookHeaders{}, []byte(`{}`), composio.VerifyOptions{})
if !errors.Is(err, composio.ErrMissingWebhookHeaders) {
t.Fatalf("expected ErrMissingWebhookHeaders, got %v", err)
}
}
func TestVerifyWebhook_EmptySecret(t *testing.T) {
err := composio.VerifyWebhook("", composio.WebhookHeaders{
ID: "x", Timestamp: "1", Signature: "v1,xyz",
}, []byte(`{}`), composio.VerifyOptions{})
if !errors.Is(err, composio.ErrWebhookSecretMissing) {
t.Fatalf("expected ErrWebhookSecretMissing, got %v", err)
}
}
func TestVerifyWebhook_AcceptsRFC3339Timestamp(t *testing.T) {
secret := "shh"
body := `{}`
id := "msg_h"
now := time.Now().UTC().Format(time.RFC3339)
sig := sign(secret, id, now, body)
err := composio.VerifyWebhook(secret, composio.WebhookHeaders{
ID: id, Timestamp: now, Signature: "v1," + sig,
}, []byte(body), composio.VerifyOptions{})
if err != nil {
t.Fatalf("VerifyWebhook rfc3339: %v", err)
}
}
func TestVerifyHTTPRequest_HappyPath(t *testing.T) {
secret := "shh"
body := `{"x":1}`
id := "msg_req"
ts := strconv.FormatInt(time.Now().Unix(), 10)
sig := sign(secret, id, ts, body)
r := httptest.NewRequest(http.MethodPost, "/webhook", bytes.NewReader([]byte(body)))
r.Header.Set(composio.HeaderWebhookID, id)
r.Header.Set(composio.HeaderWebhookTimestamp, ts)
r.Header.Set(composio.HeaderWebhookSignature, "v1,"+sig)
got, err := composio.VerifyHTTPRequest(secret, r, composio.VerifyOptions{})
if err != nil {
t.Fatalf("VerifyHTTPRequest: %v", err)
}
if string(got) != body {
t.Errorf("body roundtrip mismatch: %q vs %q", got, body)
}
}
func TestVerifyHTTPRequest_ReturnsBodyOnFailure(t *testing.T) {
body := `{"x":1}`
r := httptest.NewRequest(http.MethodPost, "/webhook", bytes.NewReader([]byte(body)))
r.Header.Set(composio.HeaderWebhookID, "id")
r.Header.Set(composio.HeaderWebhookTimestamp, strconv.FormatInt(time.Now().Unix(), 10))
r.Header.Set(composio.HeaderWebhookSignature, "v1,deadbeef")
got, err := composio.VerifyHTTPRequest("shh", r, composio.VerifyOptions{})
if err == nil {
t.Fatal("expected error")
}
if string(got) != body {
t.Errorf("expected body returned for logging, got %q", got)
}
}
func TestVerifyHTTPRequest_NilBody(t *testing.T) {
r := &http.Request{}
_, err := composio.VerifyHTTPRequest("shh", r, composio.VerifyOptions{})
if err == nil {
t.Fatal("expected error for nil body")
}
}
// Sanity check: io.ReadAll still gets the same body bytes via our helper.
func TestVerifyHTTPRequest_BodyReadFully(t *testing.T) {
body := "{}"
r := httptest.NewRequest(http.MethodPost, "/", io.NopCloser(bytes.NewReader([]byte(body))))
ts := strconv.FormatInt(time.Now().Unix(), 10)
r.Header.Set(composio.HeaderWebhookID, "id")
r.Header.Set(composio.HeaderWebhookTimestamp, ts)
r.Header.Set(composio.HeaderWebhookSignature, "v1,"+sign("shh", "id", ts, body))
got, err := composio.VerifyHTTPRequest("shh", r, composio.VerifyOptions{})
if err != nil {
t.Fatalf("VerifyHTTPRequest: %v", err)
}
if string(got) != body {
t.Errorf("body = %q, want %q", got, body)
}
}
// ---------------------------------------------------------------------------
// Event envelope
// ---------------------------------------------------------------------------
func TestParseEvent_V3Envelope(t *testing.T) {
raw := []byte(`{
"id": "evt_1",
"type": "composio.connected_account.expired",
"metadata": {"project_id":"pr_a","user_id":"u_1"},
"data": {"id":"ca_1","status":"EXPIRED"},
"timestamp": "2026-02-06T12:00:00Z"
}`)
ev, err := composio.ParseEvent(raw)
if err != nil {
t.Fatalf("ParseEvent: %v", err)
}
if ev.ID != "evt_1" || ev.Type != "composio.connected_account.expired" {
t.Errorf("unexpected envelope: %+v", ev)
}
if !bytes.Contains(ev.Data, []byte(`"EXPIRED"`)) {
t.Errorf("data lost: %s", ev.Data)
}
}
func TestParseEvent_RejectsGarbage(t *testing.T) {
if _, err := composio.ParseEvent([]byte(`not-json`)); err == nil {
t.Error("expected error for non-JSON body")
}
}