6. Commands
So far, we've been calling aggregate methods directly. Commands add a layer of indirection — instead of calling product.Create(...), you dispatch a CreateProduct command. A handler receives the command, loads the aggregate, and executes the business logic.
This matters when you have multiple services, when you want to decouple your API layer from your domain, or when you need to dispatch work asynchronously. Commands also let you coordinate operations across multiple aggregates — for example, placing an order and adjusting product stock in one handler (we'll see this in the Order chapter).
Define Commands
Add command definitions to product.go:
// Command names.
const (
CreateProductCmd = "shop.product.create"
RenameProductCmd = "shop.product.rename"
ChangePriceCmd = "shop.product.change_price"
AdjustStockCmd = "shop.product.adjust_stock"
)
// Command payloads.
type CreateProductPayload struct {
Name string
Price int
Stock int
}
type AdjustStockPayload struct {
Quantity int
Reason string
}Register Commands
Add a registration function, just like we did for events:
func RegisterProductCommands(r codec.Registerer) {
codec.Register[CreateProductPayload](r, CreateProductCmd)
codec.Register[string](r, RenameProductCmd)
codec.Register[int](r, ChangePriceCmd)
codec.Register[AdjustStockPayload](r, AdjustStockCmd)
}Handle Commands
Add a function that sets up all command handlers for products:
import (
"github.com/modernice/goes/aggregate"
"github.com/modernice/goes/command"
"github.com/modernice/goes/helper/streams"
)
// ProductRepository is the typed repository for products.
type ProductRepository = aggregate.TypedRepository[*Product]
func HandleProductCommands(
ctx context.Context,
bus command.Bus,
products ProductRepository,
) <-chan error {
createErrs := command.MustHandle(ctx, bus, CreateProductCmd, func(ctx command.Ctx[CreateProductPayload]) error {
return products.Use(ctx, ctx.AggregateID(), func(p *Product) error {
pl := ctx.Payload()
return p.Create(pl.Name, pl.Price, pl.Stock)
})
})
renameErrs := command.MustHandle(ctx, bus, RenameProductCmd, func(ctx command.Ctx[string]) error {
return products.Use(ctx, ctx.AggregateID(), func(p *Product) error {
return p.Rename(ctx.Payload())
})
})
priceErrs := command.MustHandle(ctx, bus, ChangePriceCmd, func(ctx command.Ctx[int]) error {
return products.Use(ctx, ctx.AggregateID(), func(p *Product) error {
return p.ChangePrice(ctx.Payload())
})
})
stockErrs := command.MustHandle(ctx, bus, AdjustStockCmd, func(ctx command.Ctx[AdjustStockPayload]) error {
return products.Use(ctx, ctx.AggregateID(), func(p *Product) error {
pl := ctx.Payload()
return p.AdjustStock(pl.Quantity, pl.Reason)
})
})
return streams.FanInAll(createErrs, renameErrs, priceErrs, stockErrs)
}Wire It Up
Commands use their own codec registry (cmdReg) — separate from the event registry (eventReg). The command bus needs the command codec to serialize payloads.
Update cmd/main.go:
import (
// ... existing imports ...
"github.com/modernice/goes/aggregate/repository"
"github.com/modernice/goes/command/cmdbus"
"github.com/yourname/shop"
)
func main() {
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt)
defer stop()
eventReg := codec.New()
shop.RegisterProductEvents(eventReg)
cmdReg := codec.New()
shop.RegisterProductCommands(cmdReg)
store := eventstore.New()
bus := eventbus.New()
store = eventstore.WithBus(store, bus)
repo := repository.New(store)
products := repository.Typed(repo, shop.NewProduct)
// Create the command bus.
cbus := cmdbus.New[int](cmdReg, bus)
// Start handling product commands.
productErrs := shop.HandleProductCommands(ctx, cbus, products)
go logErrors(productErrs)
log.Println("Shop is running. Press Ctrl+C to stop.")
<-ctx.Done()
}
func logErrors(errs <-chan error) {
for err := range errs {
log.Printf("Error: %v", err)
}
}Dispatch a Command
Now you can create products by dispatching commands:
id := uuid.New()
cmd := command.New(shop.CreateProductCmd, shop.CreateProductPayload{
Name: "Wireless Mouse",
Price: 2999,
Stock: 50,
}, command.Aggregate(shop.ProductAggregate, id))
if err := cbus.Dispatch(ctx, cmd.Any()); err != nil {
log.Fatal(err)
}Understanding the Command Flow
Dispatch(CreateProductCmd)
│
▼
Command Bus (routes to handler)
│
▼
command.MustHandle callback
│
▼
products.Use(ctx, id, func(p *Product) error {
return p.Create(...)
})
│
▼
1. Fetch product (replay events)
2. Call p.Create() (validates + raises event)
3. Save product (persist new events)command.Ctx[P]
The command context provides:
ctx.Payload()— the typed command payloadctx.AggregateID()— the UUID of the target aggregatectx.AggregateName()— the name of the target aggregate- It also embeds
context.Context, so you can pass it to any function that takes a context
Error Channels
Command handlers return error channels (<-chan error). Errors are sent asynchronously as commands are handled. You're responsible for draining these channels — typically by logging or forwarding errors.
streams.FanInAll merges multiple error channels into one.
Next
Let's build our second aggregate. In the next chapter, we'll create the Order.