Skip to main content

Caller Interfaces

All AI provider implementations are registered in api/integration-api/internal/caller/callers.go. There are four interfaces a provider can implement:
// api/integration-api/internal/caller/callers.go

// LargeLanguageCaller is required for all LLM providers.
type LargeLanguageCaller interface {
    // Non-streaming: returns the full response as a single message.
    // Per-call metrics (e.g. token counts) are returned alongside it.
    GetChatCompletion(
        ctx         context.Context,
        allMessages []*protos.Message,
        options     *ChatCompletionOptions,
    ) (*protos.Message, []*protos.Metric, error)

    // Streaming: delivers tokens incrementally via onStream callback.
    // onMetrics is called once when the stream completes.
    // onError is called on failure.
    // Every callback receives the request ID so deliveries can be
    // correlated with the originating request.
    StreamChatCompletion(
        ctx         context.Context,
        allMessages []*protos.Message,
        options     *ChatCompletionOptions,
        onStream    func(requestID string, msg *protos.Message) error,
        onMetrics   func(requestID string, msg *protos.Message, metrics []*protos.Metric) error,
        onError     func(requestID string, err error),
    ) error
}

// EmbeddingCaller is implemented by providers that support vector embeddings.
type EmbeddingCaller interface {
    // GetEmbedding embeds each entry of content. The map is keyed by an
    // int32 index — presumably so results can be matched back to their
    // inputs; confirm ordering guarantees against a concrete implementation.
    GetEmbedding(
        ctx     context.Context,
        content map[int32]string,
        options *EmbeddingOptions,
    ) ([]*protos.Embedding, []*protos.Metric, error)
}

// RerankingCaller is implemented by providers that support result reranking.
type RerankingCaller interface {
    // GetReranking scores the content entries against query.
    // content is keyed by an int32 index, mirroring EmbeddingCaller.
    GetReranking(
        ctx     context.Context,
        query   string,
        content map[int32]string,
        options *RerankerOptions,
    ) ([]*protos.Reranking, []*protos.Metric, error)
}

// Verifier validates a credential before it is saved to the vault.
type Verifier interface {
    // CredentialVerifier makes a minimal provider call to prove the
    // credential works. On success it returns a provider-defined string
    // (the OpenAI reference implementation returns a model ID).
    CredentialVerifier(
        ctx     context.Context,
        options *CredentialVerifierOptions,
    ) (*string, error)
}

ChatCompletionOptions

ChatCompletionOptions is passed to both GetChatCompletion and StreamChatCompletion. It carries the request context, model parameters, tool definitions, and observability hooks.
// api/integration-api/internal/caller/chat_options.go

// ChatCompletionOptions bundles everything needed for one chat call:
// the shared AIOptions (embedded), the original proto request, and any
// function-tool definitions exposed to the model.
type ChatCompletionOptions struct {
    AIOptions
    Request         *protos.ChatRequest // the incoming chat request proto
    ToolDefinitions []*ToolDefinition   // tools the model may call (may be empty)
}

// AIOptions carries fields common to all AI call types.
type AIOptions struct {
    RequestId      uint64                  // numeric ID of this request
    ModelParameter map[string]interface{}  // keyed by "model.*" prefix
    PreHook        func(map[string]interface{})                    // observability hook invoked before the provider call
    PostHook       func(map[string]interface{}, []*protos.Metric)  // observability hook invoked after the call, with metrics
}

Model Parameters

Model parameters are passed in ChatCompletionOptions.ModelParameter as a map[string]interface{} keyed by "model.*" strings. The OpenAI caller maps these keys as follows:
| Key | Type | Description |
| --- | --- | --- |
| `model.name` | string | Model ID (e.g. `gpt-4o`, `gpt-3.5-turbo`) |
| `model.temperature` | float64 | Sampling temperature (0.0–2.0) |
| `model.top_p` | float64 | Nucleus sampling |
| `model.max_completion_tokens` | int64 | Max tokens to generate |
| `model.frequency_penalty` | float64 | Frequency penalty (-2.0–2.0) |
| `model.presence_penalty` | float64 | Presence penalty (-2.0–2.0) |
| `model.seed` | int64 | Deterministic sampling seed |
| `model.stop` | string | Comma-separated stop sequences |
| `model.tool_choice` | string | `auto` · `required` · `none` |
| `model.response_format` | JSON | `{"type": "json_object"}` · `{"type": "json_schema", ...}` |
| `model.reasoning_effort` | string | OpenAI reasoning effort (o-series models) |
| `model.service_tier` | string | OpenAI service tier |
| `model.user` | string | User identifier for abuse detection |

Tool Definitions

Function tool definitions are passed via ChatCompletionOptions.ToolDefinitions:
// ToolDefinition describes one tool made available to the model.
type ToolDefinition struct {
    Type     string              // "function"
    Function *FunctionDefinition // schema of the callable function
}

// FunctionDefinition is the JSON-schema-style description of a callable function.
type FunctionDefinition struct {
    Name        string             // function name reported to the model
    Description string             // natural-language description of what the function does
    Parameters  *FunctionParameter // parameter schema ("object" type)
}

// FunctionParameter is the top-level parameter schema for a function tool.
type FunctionParameter struct {
    Type       string                            // "object"
    Properties map[string]FunctionParameterProperty // property name -> schema
    Required   []string                          // names of required properties
}

// FunctionParameterProperty describes a single property in a function's
// parameter schema.
type FunctionParameterProperty struct {
    Type        string                  // JSON-schema type of the property
    Description string                  // description shown to the model
    Enum        []*string               // allowed values, if constrained
    Items       map[string]interface{}  // for array types
}

Reference Implementation — OpenAI

The OpenAI caller is the canonical reference. All other providers follow the same structure.
// api/integration-api/internal/caller/openai/openai.go

// OpenAI is the base client shared by all OpenAI caller implementations.
// credential lazily resolves the vault credential to a plain map.
type OpenAI struct {
    logger     commons.Logger
    credential CredentialResolver  // func() map[string]interface{}
}

// openAI constructs the base OpenAI caller. The vault credential proto is
// flattened to a plain map once, up front, and captured by the resolver
// closure so subsequent lookups never touch the proto again.
func openAI(logger commons.Logger, credential *protos.Credential) OpenAI {
    resolved := credential.GetValue().AsMap()
    resolver := func() map[string]interface{} {
        return resolved
    }
    return OpenAI{logger: logger, credential: resolver}
}

// GetClient builds an OpenAI SDK client from the resolved vault credential.
// It returns an error when the credential map has no "key" entry, or when
// that entry is not a string — the original unchecked cx.(string) assertion
// would panic on a malformed credential instead of returning an error.
func (openAI *OpenAI) GetClient() (*openai.Client, error) {
    credentials := openAI.credential()
    cx, ok := credentials["key"]  // "key" is the vault credential map key
    if !ok {
        return nil, errors.New("unable to resolve the credential")
    }
    apiKey, ok := cx.(string)
    if !ok {
        return nil, errors.New("credential key is not a string")
    }
    clt := openai.NewClient(option.WithAPIKey(apiKey))
    return &clt, nil
}
The credential map is populated from the vault credential stored in PostgreSQL. The key "key" maps to the API key field in the credential form.
// api/integration-api/internal/caller/openai/llm.go

// StreamChatCompletion streams an OpenAI chat completion, forwarding each
// content delta through onStream and emitting metrics once at the end.
// NOTE(doc): this is a documentation excerpt — assistantMsg, hasToolCalls,
// protoMsg and metrics are declared in code elided by the "// ..." markers.
func (llc *largeLanguageCaller) StreamChatCompletion(
    ctx         context.Context,
    allMessages []*protos.Message,
    options     *internal_callers.ChatCompletionOptions,
    onStream    func(string, *protos.Message) error,
    onMetrics   func(string, *protos.Message, []*protos.Metric) error,
    onError     func(string, error),
) error {
    client, err := llc.GetClient()
    // ...

    // Build OpenAI request from model parameters
    completionsOptions := llc.getChatCompletionOptions(options)
    completionsOptions.Messages = llc.BuildHistory(allMessages)

    // Get streaming response
    resp := client.Chat.Completions.NewStreaming(ctx, completionsOptions)
    defer resp.Close()

    // Accumulate chunks so completed tool calls can be detected mid-stream.
    accumulate := openai.ChatCompletionAccumulator{}
    for resp.Next() {
        chatCompletions := resp.Current()
        accumulate.AddChunk(chatCompletions)

        // Tool call detection
        if tool, ok := accumulate.JustFinishedToolCall(); ok {
            assistantMsg.ToolCalls = append(assistantMsg.ToolCalls, &protos.ToolCall{...})
        }

        // Stream content tokens
        for _, choice := range chatCompletions.Choices {
            content := choice.Delta.Content
            // Suppress plain-content deltas once a tool call is underway.
            if content != "" && !hasToolCalls {
                tokenMsg := &protos.Message{Role: "assistant", ...}
                onStream(options.Request.GetRequestId(), tokenMsg)
            }
        }
    }

    // Emit final metrics (input tokens, output tokens, TTFT)
    onMetrics(options.Request.GetRequestId(), protoMsg, metrics.Build())
    return nil
}
Key implementation points:
  • Tokens are streamed via onStream as they arrive — each call delivers one delta
  • Tool calls are accumulated and emitted when complete via accumulate.JustFinishedToolCall()
  • Metrics (input/output tokens, first-token time) are emitted once via onMetrics when streaming is complete

Adding a New LLM Provider

### Step 1 — Create the provider directory

mkdir api/integration-api/internal/caller/<provider>
Minimum required files:
| File | Purpose |
| --- | --- |
| `<provider>.go` | Client struct, credential extraction, `GetClient()` |
| `llm.go` | `LargeLanguageCaller` implementation |
| `verify-credential.go` | `Verifier` implementation |
Optional:
| File | Purpose |
| --- | --- |
| `embedding.go` | `EmbeddingCaller` implementation |
### Step 2 — Implement the base client struct

// <provider>.go
package internal_myprovider_callers

import (
    internal_callers "github.com/rapidaai/api/integration-api/internal/caller"
    "github.com/rapidaai/pkg/commons"
    "github.com/rapidaai/protos"
)

// MyProvider is the base client shared by this provider's caller
// implementations, mirroring the OpenAI reference struct.
type MyProvider struct {
    logger     commons.Logger
    credential internal_callers.CredentialResolver // resolves the vault credential to a map
}

// myProvider constructs the base provider client. The credential proto is
// flattened to a plain map once and captured by the resolver closure.
func myProvider(logger commons.Logger, credential *protos.Credential) MyProvider {
    resolved := credential.GetValue().AsMap()
    resolver := func() map[string]interface{} {
        return resolved
    }
    return MyProvider{logger: logger, credential: resolver}
}

// GetClient builds the provider SDK client from the resolved credential.
// It returns an error when the credential map has no "key" entry, or when
// that entry is not a string — the original unchecked apiKey.(string)
// assertion would panic on a malformed credential instead of returning
// an error the caller can report.
func (p *MyProvider) GetClient() (*myprovider.Client, error) {
    credentials := p.credential()
    apiKey, ok := credentials["key"]
    if !ok {
        return nil, errors.New("unable to resolve credential")
    }
    key, ok := apiKey.(string)
    if !ok {
        return nil, errors.New("credential key is not a string")
    }
    return myprovider.NewClient(key), nil
}
### Step 3 — Implement StreamChatCompletion

// llm.go
// largeLanguageCaller implements internal_callers.LargeLanguageCaller by
// embedding the MyProvider base client.
type largeLanguageCaller struct {
    MyProvider
}

// NewLargeLanguageCaller wires a MyProvider base client into the
// LargeLanguageCaller interface expected by callers.go.
func NewLargeLanguageCaller(
    logger     commons.Logger,
    credential *protos.Credential,
) internal_callers.LargeLanguageCaller {
    base := myProvider(logger, credential)
    return &largeLanguageCaller{MyProvider: base}
}

// StreamChatCompletion streams tokens from the provider, forwarding each
// one through onStream and emitting metrics once at the end.
// NOTE(doc): this is a documentation template — finalMsg, metrics, and
// buildMessages are defined in code the snippet elides.
func (llc *largeLanguageCaller) StreamChatCompletion(
    ctx      context.Context,
    messages []*protos.Message,
    options  *internal_callers.ChatCompletionOptions,
    onStream  func(string, *protos.Message) error,
    onMetrics func(string, *protos.Message, []*protos.Metric) error,
    onError   func(string, error),
) error {
    client, err := llc.GetClient()
    if err != nil {
        // Report the failure to the caller's error callback and return it.
        onError(options.Request.GetRequestId(), err)
        return err
    }

    // Build provider-specific request from options.ModelParameter
    modelName, _ := options.ModelParameter["model.name"].(string)

    // Stream from provider
    stream, err := client.ChatStream(ctx, modelName, llc.buildMessages(messages))
    if err != nil {
        onError(options.Request.GetRequestId(), err)
        return err
    }

    // Forward each token as a single-content assistant message.
    for token := range stream.Tokens() {
        msg := &protos.Message{
            Role: "assistant",
            Message: &protos.Message_Assistant{
                Assistant: &protos.AssistantMessage{
                    Contents: []string{token},
                },
            },
        }
        // A send failure is logged but does not abort the stream —
        // remaining tokens are still consumed.
        if err := onStream(options.Request.GetRequestId(), msg); err != nil {
            llc.logger.Warnf("stream send error: %v", err)
        }
    }

    // Emit final metrics
    onMetrics(options.Request.GetRequestId(), finalMsg, metrics)
    return nil
}
### Step 4 — Register in callers.go

Open api/integration-api/internal/caller/callers.go and add a case to the router that creates the caller:
// In the provider registration switch (exact location varies by version):
case "my-provider":
    return internal_myprovider.NewLargeLanguageCaller(logger, credential), nil
No changes to any other service are required.
### Step 5 — Add credential verification (optional)

To support credential validation before save, implement Verifier:
// verify-credential.go
// credentialVerifier implements internal_callers.Verifier by embedding
// the MyProvider base client.
type credentialVerifier struct {
    MyProvider
}

// NewVerifier wires a MyProvider base client into the Verifier interface.
func NewVerifier(logger commons.Logger, credential *protos.Credential) internal_callers.Verifier {
    base := myProvider(logger, credential)
    return &credentialVerifier{MyProvider: base}
}

// CredentialVerifier validates the stored credential by making a minimal
// provider API call (ListModels). On success it returns the first model ID
// from the response; any API failure is reported as an invalid credential.
// The empty-list guard fixes a panic in the original: modelList[0].ID would
// index out of range when the provider returns zero models.
func (v *credentialVerifier) CredentialVerifier(
    ctx     context.Context,
    options *internal_callers.CredentialVerifierOptions,
) (*string, error) {
    client, err := v.GetClient()
    if err != nil {
        return nil, err
    }
    // Make a minimal API call to verify the key is valid
    modelList, err := client.ListModels(ctx)
    if err != nil {
        return nil, fmt.Errorf("credential invalid: %w", err)
    }
    if len(modelList) == 0 {
        return nil, fmt.Errorf("credential verified but provider returned no models")
    }
    result := modelList[0].ID
    return &result, nil
}

Metrics

Every caller emits structured metrics via onMetrics. These are stored in PostgreSQL and visible in the dashboard activity logs.
| Metric Name | Description |
| --- | --- |
| `INPUT_TOKEN` | Prompt tokens consumed |
| `OUTPUT_TOKEN` | Completion tokens generated |
| `TOTAL_TOKEN` | Total tokens (input + output) |
| `FIRST_TOKEN_RECIEVED_TIME` | Time from request start to first token (TTFT) — name spelling matches the code constant |

Next Steps