ringchain

package
v1.0.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 20, 2025 License: MIT Imports: 8 Imported by: 0

README

Ringchain

A powerful graph-based framework for building and executing agent workflows concurrently in Go. Ringchain makes it easy to create complex, multi-step workflows that can be run efficiently.

Features

  • Directed Acyclic Graphs: Build workflows as DAGs with nodes and edges
  • Concurrent Execution: Automatically parallelize independent nodes
  • Data Flow Management: Pass data seamlessly between workflow steps
  • Cycle Detection: Prevent infinite loops with built-in cycle detection
  • Configurable Workers: Control concurrency with customizable worker pools

Installation

go get github.com/dskart/gollum/ringchain

Quick Start

package main

import (
	"context"
	"fmt"
	"os"

	"github.com/dskart/gollum/ringchain"
	"go.uber.org/zap"
)

// Define custom nodes
type GreetingNode struct{}

func (n *GreetingNode) Name() string {
	return "greeting"
}

func (n *GreetingNode) Run(ctx context.Context, logger *zap.Logger, args map[string]any) (map[string]any, error) {
	name := args["name"].(string)
	return map[string]any{
		"greeting": fmt.Sprintf("Hello, %s!", name),
	}, nil
}

type EnhancerNode struct{}

func (n *EnhancerNode) Name() string {
	return "enhancer"
}

func (n *EnhancerNode) Run(ctx context.Context, logger *zap.Logger, args map[string]any) (map[string]any, error) {
	greeting := args["greeting"].(string)
	return map[string]any{
		"enhanced": fmt.Sprintf("%s How are you today?", greeting),
	}, nil
}

func main() {
	// Create a new graph
	graph := ringchain.NewGraph()

	// Add nodes
	graph.AddNode("greeting", &GreetingNode{})
	graph.AddNode("enhancer", &EnhancerNode{})

	// Connect nodes
	graph.AddEdge("greeting", "enhancer")

	// Create a logger
	logger, _ := zap.NewProduction()
	defer logger.Sync()

	// Execute the graph
	results, err := graph.Execute(
		context.Background(),
		logger,
		map[string]any{"name": "World"},
		ringchain.WithNumWorkers(2),
	)

	if err != nil {
		fmt.Printf("Error executing graph: %v\n", err)
		os.Exit(1)
	}

	// Print the results from each node
	for nodeHash, result := range results {
		fmt.Printf("Node %s output: %v\n", nodeHash, result)
	}
}

Building Complex Workflows

You can build more complex workflows by adding multiple nodes and connections:

// Create a new graph
graph := ringchain.NewGraph()

// Add nodes
graph.AddNode("data_retrieval", &DataRetrievalNode{})
graph.AddNode("data_processing", &DataProcessingNode{})
graph.AddNode("data_analysis", &DataAnalysisNode{})
graph.AddNode("report_generation", &ReportGenerationNode{})

// Connect nodes
graph.AddEdge("data_retrieval", "data_processing")
graph.AddEdge("data_processing", "data_analysis")
graph.AddEdge("data_analysis", "report_generation")

// Execute the graph
results, err := graph.Execute(
    context.Background(),
    logger,
    initialArgs,
)

Parallel Execution

Ringchain automatically executes independent nodes in parallel:

// Create a new graph with parallel branches
graph := ringchain.NewGraph()

// Add nodes
graph.AddNode("start", &StartNode{})
graph.AddNode("branch_a", &BranchANode{})
graph.AddNode("branch_b", &BranchBNode{})
graph.AddNode("combine", &CombineNode{})

// Connect nodes
graph.AddEdge("start", "branch_a")
graph.AddEdge("start", "branch_b")
graph.AddEdge("branch_a", "combine")
graph.AddEdge("branch_b", "combine")

// Execute with multiple workers
results, err := graph.Execute(
    context.Background(),
    logger,
    initialArgs,
    ringchain.WithNumWorkers(4),
)

Integration with GoLLuM

This package works seamlessly with other GoLLuM modules:

  • Use with OpenAI for API communication
  • Use with Scrolls for prompt templating and management

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrNodeNotFound      = errors.New("vertex not found")
	ErrNodeAlreadyExists = errors.New("vertex already exists")
	ErrEdgeNotFound      = errors.New("edge not found")
	ErrEdgeAlreadyExists = errors.New("edge already exists")
	ErrEdgeCreatesCycle  = errors.New("edge would create a cycle")
	ErrNodeHasEdges      = errors.New("vertex has edges")
	ErrGraphNotInit      = errors.New("graph not Init()")
)

Functions

func OpenAiFunctions

func OpenAiFunctions(tools []Tool) []openai.Tool

func WithNumWorkers

func WithNumWorkers(n int) func(*GraphExecuteOptions)

Types

type Edge

type Edge struct {
	SourceHash string
	TargetHash string
}

type Graph

type Graph struct {
	// contains filtered or unexported fields
}

func NewGraph

func NewGraph() *Graph

func (*Graph) AddEdge

func (g *Graph) AddEdge(sourceHash, targetHash string) error

func (*Graph) AddNode

func (g *Graph) AddNode(name string, node Node) error

func (*Graph) Edge

func (g *Graph) Edge(sourceHash string, targetHash string) (Edge, error)

func (*Graph) Execute

func (g *Graph) Execute(ctx context.Context, logger *zap.Logger, args map[string]any, opts ...func(*GraphExecuteOptions)) (map[string]map[string]any, error)

Execute executes the graph starting from the given node. You need to call Init before calling Execute. This method is safe to call concurrently but might break if the graph is modified while executing.

func (*Graph) Node

func (g *Graph) Node(hash string) (Node, error)

func (*Graph) PredecessorMap

func (g *Graph) PredecessorMap() (map[string]map[string]Edge, error)

func (*Graph) SuccessorMap

func (g *Graph) SuccessorMap() (map[string]map[string]Edge, error)

type GraphExecuteOptions

type GraphExecuteOptions struct {
	NumWorkers int
}

type Node

type Node interface {
	Name() string

	// Run executes the node with the given arguments and returns the result.
	// If you want to copy the args map into the result, use maps.Clone(args).
	Run(ctx context.Context, logger *zap.Logger, args map[string]any) (map[string]any, error)
}

type Store

type Store interface {
	// AddNode should add the given node with the given hash value and node properties to the
	// graph. If the node already exists, it is up to you whether ErrNodeAlreadyExists or no
	// error should be returned.
	AddNode(hash string, value Node) error

	// Node should return the node with the given hash value. If the
	// node doesn't exist, ErrNodeNotFound should be returned.
	Node(hash string) (Node, error)

	// RemoveNode should remove the node with the given hash value. If the node doesn't
	// exist, ErrNodeNotFound should be returned. If the node has edges to other vertices,
	// ErrNodeHasEdges should be returned.
	RemoveNode(hash string) error

	// ListNodes should return all vertices in the graph in a slice.
	ListNodes() ([]string, error)

	// NodeCount should return the number of vertices in the graph. This should be equal to the
	// length of the slice returned by ListNodes.
	NodeCount() (int, error)

	// AddEdge should add an edge between the vertices with the given source and target hashes.
	//
	// If either node doesn't exit, ErrNodeNotFound should be returned for the respective
	// node. If the edge already exists, ErrEdgeAlreadyExists should be returned.
	AddEdge(sourceHash, targetHash string, edge Edge) error

	// UpdateEdge should update the edge between the given vertices with the data of the given
	// Edge instance. If the edge doesn't exist, ErrEdgeNotFound should be returned.
	UpdateEdge(sourceHash string, targetHash string, edge Edge) error

	// RemoveEdge should remove the edge between the vertices with the given source and target
	// hashes.
	//
	// If either node doesn't exist, it is up to you whether ErrNodeNotFound or no error should
	// be returned. If the edge doesn't exist, it is up to you whether ErrEdgeNotFound or no error
	// should be returned.
	RemoveEdge(sourceHash string, targetHash string) error

	// Edge should return the edge joining the vertices with the given hash values. It should
	// exclusively look for an edge between the source and the target node, not vice versa. The
	// graph implementation does this for undirected graphs itself.
	//
	// Note that unlike Graph.Edge, this function is supposed to return an Edge[string], i.e. an edge
	// that only contains the node hashes instead of the vertices themselves.
	//
	// If the edge doesn't exist, ErrEdgeNotFound should be returned.
	Edge(sourceHash string, targetHash string) (Edge, error)

	// ListEdges should return all edges in the graph in a slice.
	ListEdges() ([]Edge, error)

	CreatesCycle(source, target string) (bool, error)
}

type Tool

type Tool interface {
	OpenAiTool() openai.Tool
	FunctionName() string
	Description() string
	ToolName() string
	Run(ctx context.Context, logger *zap.Logger, args map[string]any) (map[string]any, error)
}

func SelectTool

func SelectTool(tools []Tool, functionName string) (Tool, bool)

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL