janus/rama

Clojure utilities on top of Rama: managed objects with dependency injection, batch segmacros, symbol helpers, temp-state utilities, and depot utilities. Everything is accessible from a single namespace — janus.rama.api.

This namespace should be required with :refer :all because it exports Rama segmacros that are resolved by name at compile time.

Setup

Add the dependency:

;; deps.edn
{:deps {janus/rama {:mvn/version "..."}}}

Require the API namespace:

(require '[janus.rama.api :refer :all])

Managed Objects

The Problem

Rama's com.rpl.rama.integration package provides the means to define ManagedResources, together with the semantics of how they are shared (per task thread, per worker, etc.).

This package provides a way to specify such managed resources in Clojure. In addition, managed resources can themselves depend on and share other managed resources (e.g. several API clients sharing a common http-client).

per-worker

Creates a ManagedResourceSpec — a serializable description of how to build a resource.

(per-worker key ctor & args)
  • key — a keyword uniquely identifying the resource (e.g. ::my-http-client).

  • ctor — a symbol or var pointing to the constructor function.

  • args — any arguments forwarded to the constructor via (apply ctor args).

The spec itself is Nippy-serializable and safe to embed in topology code and/or configuration.
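
To make the role of each argument concrete, here is a minimal sketch in plain Clojure of what such a spec could look like as data, and how a constructor might later be invoked from it. The names SpecSketch, per-worker-sketch, build, and make-client are hypothetical and exist only for this illustration; the library's actual representation may differ, and the sketch handles only a var as ctor, not a symbol.

```clojure
;; Hypothetical stand-in for a ManagedResourceSpec: plain data only.
(defrecord SpecSketch [key ctor args])

(defn per-worker-sketch
  "Builds a data-only description; nothing is constructed yet."
  [key ctor & args]
  (->SpecSketch key ctor (vec args)))

(defn build
  "Calls the constructor with the stored args, as in (apply ctor args)."
  [{:keys [ctor args]}]
  (apply ctor args))

;; Hypothetical constructor used only for this example.
(defn make-client [opts]
  {:client-for (:base-uri opts)})

(def spec
  (per-worker-sketch ::my-http-client #'make-client
                     {:base-uri "https://example.com"}))

(build spec) ;; => {:client-for "https://example.com"}
```

Because the spec holds only a keyword, a var, and plain arguments, it stays a description that can be passed around freely; the resource itself exists only after the constructor runs.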

as-object

Wraps a ManagedResourceSpec into a ManagedResourceWrapper that implements both TaskGlobalObject and IDeref. Declare it with declare-object and deref it at runtime to obtain the live resource.

(declare-object *my-client (as-object (per-worker ::my-client #'make-client opts)))

;; Inside topology code, deref to get the live resource:
@*my-client

resource

Extracts the live resource from a ManagedResource instance directly. Primarily used internally by the DI resolution machinery.

(resource managed-resource) ;; => the live resource value

secret

Convenience constructor for secrets as per-worker managed resources. Uses janus.secrets.api/get under the hood.

(secret id) ;; id is a qualified keyword

Example:

(declare-object *token (as-object (secret :cloudflare/api-token)))

;; In topology code:
@*token ;; => "sk-..."

Since secret returns a ManagedResourceSpec, it can be nested inside another per-worker spec — the DI machinery resolves it automatically:

(def ld-client
  (per-worker ::ld-client
    #'my.livingdocs/client
    {:base-uri "https://api.livingdocs.io"
     :token    (secret :livingdocs/api-token)}))

(declare-object *ld-client (as-object ld-client))

The secret is fetched once per worker and injected into the constructor map before my.livingdocs/client is called.

See janus/secrets for the full secrets documentation (lookup chain, naming conventions, CLI tool).

Shared Dependency Injection

The key feature of managed objects is automatic dependency resolution with identity sharing. If a spec’s args contain other ManagedResourceSpec instances, they are resolved before the outer constructor is called. Crucially, specs with the same key resolve to the same instance — enabling shared dependencies.

Example: Shared Base Resource

;; Spec A: a base resource
(def spec-a (per-worker ::a #'make-base-resource :config))

;; Specs B and C both depend on A
(def spec-b (per-worker ::b #'make-service-b spec-a))
(def spec-c (per-worker ::c #'make-service-c spec-a))

When B and C are materialised:

  1. A is resolved first, because nested specs are resolved before the outer constructor is called.

  2. B receives the live instance of A as its first argument.

  3. C receives the same identical instance of A — not a copy.

(identical? (get-a-from-b) (get-a-from-c)) ;; => true

This means multiple managed objects can safely share a single connection pool, cache, or any other stateful resource.
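
The resolution order and identity sharing described above can be imitated in plain Clojure. The following is a minimal sketch, not the library's implementation: spec, spec?, resolve-spec, and the make-* constructors are hypothetical names, only specs that appear directly in the argument list are resolved (not specs nested inside maps), and the cache is not guarded against concurrent use.

```clojure
;; Hypothetical stand-in for a spec: a map with :key, :ctor and :args.
(defn spec [key ctor & args]
  {::spec? true :key key :ctor ctor :args (vec args)})

(defn spec? [x]
  (and (map? x) (true? (::spec? x))))

(defn resolve-spec
  "Resolves a spec against a cache atom (key -> live instance).
  Nested specs in :args are resolved first; a key that is already
  in the cache is reused instead of being constructed again."
  [cache {:keys [key ctor args]}]
  (if (contains? @cache key)
    (get @cache key)
    (let [live-args (mapv #(if (spec? %) (resolve-spec cache %) %) args)
          instance  (apply ctor live-args)]
      (swap! cache assoc key instance)
      instance)))

;; Hypothetical constructors for the example.
(defn make-base [_config] (Object.))
(defn make-service-b [a] {:name :b :base a})
(defn make-service-c [a] {:name :c :base a})

(def spec-a (spec ::a make-base :config))
(def spec-b (spec ::b make-service-b spec-a))
(def spec-c (spec ::c make-service-c spec-a))

(def cache (atom {}))
(def b (resolve-spec cache spec-b))
(def c (resolve-spec cache spec-c))

(identical? (:base b) (:base c)) ;; => true
```

Keying the cache by the spec's key, rather than by the spec value, is what makes two separately written specs with the same key resolve to one instance.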

Automatic Cleanup of Dependencies

When a resource has nested dependencies (i.e. its args contained other specs that were resolved into ManagedResource instances), closing the outer resource automatically closes all of its sub-resources. You do not need to manage the lifecycle of inner dependencies manually — closing the top-level object is sufficient.

;; When *service-b is closed (e.g. at end of task), the inner
;; resource created from spec-a is also closed automatically.
(declare-object *service-b
  (as-object (per-worker ::b #'make-service-b
               (per-worker ::a #'make-connection-pool))))
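
The cascading close can be pictured with java.io.Closeable. This is a sketch under stated assumptions: managed is a hypothetical helper, and the order shown (outer resource first, then its sub-resources) is an assumption, since the text above does not specify an order.

```clojure
(import '(java.io Closeable))

(defn managed
  "Hypothetical wrapper: a Closeable that records its own close in
  `log` and then closes each of its sub-resources."
  [log name sub-resources]
  (reify Closeable
    (close [_]
      (swap! log conj name)
      (doseq [^Closeable sub sub-resources]
        (.close sub)))))

(def log (atom []))
(def pool    (managed log :connection-pool []))
(def service (managed log :service-b [pool]))

;; Closing only the top-level object also closes the pool.
(.close ^Closeable service)

@log ;; => [:service-b :connection-pool]
```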

Real-World Pattern

In practice, specs are carried in config maps and turned into declared objects by the topology setup code:

;; Config map defines how to build the API client
(def config {:api-client (per-worker ::api-client #'make-api-client opts)})

;; Topology setup declares the object
(let [api-client (derive-rama-var* '* name 'external-api-client)]
  (declare-object* setup api-client (as-object (:api-client config))))

;; In topology code, deref the object to call the API
(call-api-client api-client instruction)
;; where call-api-client derefs: @api-client

See io.forward-publishing.janus.topologies.polling and io.forward-publishing.janus.topologies.cache-purging for full examples.

Cyclic dependencies are not handled. If spec A depends on spec B and spec B depends on spec A, the resolution will loop indefinitely.

defbatch

A macro for writing topology-level batch segmacros with familiar Clojure syntax.

(defbatch name doc-string? meta? [inputs... :> outputs...] body...)
  • inputs — standard Clojure binding forms (symbols, destructuring).

  • :> — separates inputs from outputs (optional).

  • outputs — simple symbols for generated temp pstates.

  • body — Rama topology expressions.

The macro automatically sanitizes ramavar symbols (*foo, %bar, $$baz) so they don’t collide across invocations. Under the hood it expands into defbasicsegmacro.

Input-only (aggregation)

When there are no :> outputs, the batch typically aggregates into an existing pstate:

(defbatch <<sync-graph [%new-ids $$pstate]
  (%new-ids :> *new-id)
  (aggs/+set-agg $$pstate *new-id))

With outputs (temp pstate generation)

When :> is present, the batch produces temporary pstates:

(defbatch execution-plan [%new-ids :> $$plan]
  (%new-ids :> *id)
  (materialize> *id :> $$plan))

Other Utilities

Temp State

temp-state — a segmacro that creates a temporary nil pstate. Call at the top level of a microbatch topology.

(temp-state :> $$my-temp)

Ramaop Generators

These segmacros produce anonymous ramaops from materialised or temporary pstates:

  • materialized->ramaop: [n materialized :> op]. Emits the contents of a materialized pstate across all tasks; n is the tuple size.

  • this-task-materialized->ramaop: same as materialized->ramaop, but only on the current task.

  • temp-pstate->ramaop: [temp-pstate path :> op]. Emits values from a temp pstate path across all tasks.

  • this-task-temp-pstate->ramaop: same as temp-pstate->ramaop, but only on the current task.

srange-dynamic Helpers

  • second>: (second> coll) returns (min 1 (count coll)), i.e. index 1, or 0 for an empty collection. Useful as the start function (first argument) to srange-dynamic.

  • count>: (count> coll _) returns (count coll) and ignores its second argument. Useful as the end function (second argument) to srange-dynamic.
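
The effect of combining the two helpers can be shown without Rama. The sketch below restates both functions from their descriptions and uses a hypothetical dynamic-subvec to imitate a dynamic range. It assumes srange-dynamic follows the Specter convention, where the start function receives the collection and the end function receives the collection and the computed start index.

```clojure
;; Restatements of the two helpers, taken from the descriptions above.
(defn second> [coll] (min 1 (count coll)))
(defn count>  [coll _start] (count coll))

(defn dynamic-subvec
  "Plain-Clojure imitation of a dynamic range: start-fn sees the
  vector, end-fn sees the vector and the computed start index."
  [v start-fn end-fn]
  (let [start (start-fn v)]
    (subvec v start (end-fn v start))))

(dynamic-subvec [:a :b :c] second> count>) ;; => [:b :c]
(dynamic-subvec []         second> count>) ;; => []
```

With second> as the start and count> as the end, the range covers everything after the first element, and the min in second> keeps the start index valid for an empty collection.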

Symbol Helpers

Utilities for manipulating Rama variable symbols (*foo, %bar, $$baz):

  • sans-rama-prefix: strips the Rama prefix (*, %, $$) from a symbol name.

  • derive-rama-var*: (derive-rama-var* prefix base) or (derive-rama-var* prefix base suffix) derives a new Rama variable symbol. E.g. (derive-rama-var* '$$ '*foo) -> '$$foo.

  • derive-rama-var: macro version of derive-rama-var*; symbols need not be quoted.
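
The prefix handling can be approximated in a few lines of plain Clojure. This is a sketch only: the -sketch functions are hypothetical re-implementations based on the descriptions above, and the three-argument form with a suffix is left out because the way the suffix is joined is not stated here.

```clojure
(require '[clojure.string :as str])

(defn sans-rama-prefix-sketch
  "Drops a leading *, % or $$ from the symbol's name."
  [sym]
  (symbol (str/replace (name sym) #"^(\$\$|\*|%)" "")))

(defn derive-rama-var-sketch
  "Swaps the Rama prefix of `base` for `prefix`."
  [prefix base]
  (symbol (str (name prefix) (name (sans-rama-prefix-sketch base)))))

(sans-rama-prefix-sketch '*foo)    ;; => foo
(sans-rama-prefix-sketch '$$baz)   ;; => baz
(derive-rama-var-sketch '% '*foo)  ;; => %foo
(derive-rama-var-sketch '$$ '%bar) ;; => $$bar
```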

Depot Utilities

Functions for inspecting and sampling depot contents (useful for debugging and REPL exploration):

  • depot-count: (depot-count depot) or (depot-count depot offset-back). Counts items in a depot, optionally limited to the last offset-back items per partition.

  • depot-seq: (depot-seq depot pkey) or (depot-seq depot pkey offset-back). Lazy sequence of depot entries for the partition matching pkey.

  • depot-sample: (depot-sample depot offset-back). Returns a sample descriptor (map with offsets and counts) for a fixed range of depot entries.

  • depot-sample-seq: (depot-sample-seq sample) or (depot-sample-seq sample :pkey k). Lazy sequence from a sample, optionally filtered by partition key.

  • serialize-sample: (serialize-sample sample). Strips the depot reference for serialization.

  • restore-sample: (restore-sample depot serialized). Re-attaches a depot to a serialized sample.