Worker Protocol
The Appose worker protocol defines how services communicate with worker processes. This document describes the protocol in detail, enabling you to create custom worker implementations.
Protocol Overview
Workers are separate processes that communicate with the Appose service via:
Standard Input (stdin): Receives requests from the service
Standard Output (stdout): Sends responses to the service
Standard Error (stderr): Carries logs and error messages (optional)
All communication uses JSON-formatted messages, with one message per line.
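The one-message-per-line framing can be sketched as follows. The helper names `encode_message` and `decode_message` are hypothetical and not part of Appose; the sketch relies only on the fact that `json.dumps` without an `indent` argument escapes newlines inside strings, so each message stays on a single line.

```python
import json


def encode_message(message: dict) -> str:
    """Serialize a message as exactly one line of JSON, newline-terminated."""
    # Without indent, json.dumps never emits a raw newline: newlines inside
    # string values are escaped as \n, so the framing stays intact.
    return json.dumps(message) + "\n"


def decode_message(line: str) -> dict:
    """Parse one line read from stdin or stdout back into a message."""
    return json.loads(line)


# The script below contains a newline, yet the encoded form is a single line.
request = {
    "task": "87427f91-d193-4b25-8d35-e1292a34b5c4",
    "requestType": "EXECUTE",
    "script": 'task.outputs["result"] = computeResult(gamma)\n',
    "inputs": {"gamma": 2.2},
}
wire_line = encode_message(request)
```

Because the only raw newline is the terminator, a reader can split the stream with `readline()` and parse each line independently.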
Worker Contract
A worker process must:
Accept requests in Appose’s request format on stdin
Issue responses in Appose’s response format on stdout
Handle each request appropriately and in a timely manner
Use UUIDs to track tasks across the request/response lifecycle
Request Format
A request is a single line of JSON with the following structure:
Common Fields
All requests include:
task: A UUID string identifying the task
requestType: The type of request (EXECUTE or CANCEL)
Request Types
EXECUTE
Asynchronously execute a script within the worker process.
Structure:
{
  "task": "87427f91-d193-4b25-8d35-e1292a34b5c4",
  "requestType": "EXECUTE",
  "script": "task.outputs[\"result\"] = computeResult(gamma)\n",
  "inputs": {"gamma": 2.2}
}
Fields:
task (string): UUID of the task
requestType (string): Must be "EXECUTE"
script (string): The script code to execute
inputs (object, optional): Key-value pairs of input data
The worker should:
Parse the request
Send a LAUNCH response immediately to confirm receipt
Execute the script asynchronously
Make inputs available to the script in a task.inputs map/dictionary
Provide a task.outputs map/dictionary for the script to populate
Send UPDATE responses as the task progresses (optional)
Send a COMPLETION response with outputs when done
Send a FAILURE response if an error occurs
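The EXECUTE steps above can be sketched in Python as follows. This is an illustration under stated assumptions, not the reference worker: `emit`, `ScriptTask`, and `handle_execute` are hypothetical names, one thread is started per task, and the full traceback is used as the error text.

```python
import json
import sys
import threading
import traceback

_write_lock = threading.Lock()  # keeps lines from concurrent tasks from interleaving


def emit(response, out=sys.stdout):
    """Write one response as a single JSON line and flush immediately."""
    with _write_lock:
        out.write(json.dumps(response) + "\n")
        out.flush()


class ScriptTask:
    """Hypothetical minimal task context exposing inputs and outputs."""

    def __init__(self, task_id, inputs):
        self.id = task_id
        self.inputs = inputs
        self.outputs = {}


def handle_execute(request, out=sys.stdout):
    """Confirm receipt with LAUNCH, then run the script on its own thread."""
    task = ScriptTask(request["task"], request.get("inputs") or {})
    emit({"task": task.id, "responseType": "LAUNCH"}, out)

    def run():
        try:
            # The script reaches its data through the `task` variable.
            exec(request["script"], {"task": task})
        except Exception:
            emit({"task": task.id, "responseType": "FAILURE",
                  "error": traceback.format_exc()}, out)
        else:
            emit({"task": task.id, "responseType": "COMPLETION",
                  "outputs": task.outputs}, out)

    thread = threading.Thread(target=run, daemon=True)
    thread.start()
    return thread
```

LAUNCH is written before the thread starts, so it always precedes the task's other responses. The lock matters once several tasks run at once, since a partially written line would break the one-message-per-line framing.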
CANCEL
Cancel a running task.
Structure:
{
  "task": "87427f91-d193-4b25-8d35-e1292a34b5c4",
  "requestType": "CANCEL"
}
Fields:
task (string): UUID of the task to cancel
requestType (string): Must be "CANCEL"
The worker should:
Mark the task for cancelation
Make task.cancel_requested / task.cancelRequested true in the script context
Allow the script to check this flag and terminate gracefully
Send a CANCELATION response when the task is canceled
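Cooperative cancelation along these lines might look like the sketch below. The registry `running`, the class `RunningTask`, and the function `count_steps` are hypothetical; `count_steps` stands in for a long script that polls the flag between units of work. How a worker should treat a CANCEL for an unknown task is not specified here, so the sketch simply reports it.

```python
import json
import sys


class RunningTask:
    """Hypothetical record for a task that is currently executing."""

    def __init__(self, task_id):
        self.id = task_id
        self.cancel_requested = False


running = {}  # task UUID -> RunningTask


def handle_cancel(request):
    """Flag the task named in a CANCEL request; return False if it is unknown."""
    task = running.get(request["task"])
    if task is None:
        return False
    task.cancel_requested = True
    return True


def count_steps(task, steps, out=sys.stdout):
    """Stand-in for a long script: do `steps` units of work unless canceled."""
    done = 0
    for _ in range(steps):
        if task.cancel_requested:
            # Stop gracefully and confirm the cancelation to the service.
            out.write(json.dumps({"task": task.id,
                                  "responseType": "CANCELATION"}) + "\n")
            out.flush()
            break
        done += 1
    return done
```

The worker only sets the flag; the script decides when it is safe to stop, which is why a script that never checks the flag cannot be canceled this way.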
Response Format
A response is a single line of JSON with the following structure:
Common Fields
All responses include:
task: A UUID string identifying the task (must match the request)
responseType: The type of response
Response Types
LAUNCH
Confirms successful receipt of an EXECUTE request.
Structure:
{
  "task": "87427f91-d193-4b25-8d35-e1292a34b5c4",
  "responseType": "LAUNCH"
}
Fields:
task (string): UUID of the task
responseType (string): Must be "LAUNCH"
When to send: Immediately after receiving an EXECUTE request.
UPDATE
Indicates progress during task execution.
Structure:
{
  "task": "87427f91-d193-4b25-8d35-e1292a34b5c4",
  "responseType": "UPDATE",
  "message": "Processing step 0 of 91",
  "current": 0,
  "maximum": 91
}
Fields:
task (string): UUID of the task
responseType (string): Must be "UPDATE"
message (string, optional): Human-readable progress message
current (number, optional): Current progress value
maximum (number, optional): Maximum progress value
When to send: Periodically during long-running tasks to report progress.
COMPLETION
Indicates successful task completion and returns outputs.
Structure:
{
  "task": "87427f91-d193-4b25-8d35-e1292a34b5c4",
  "responseType": "COMPLETION",
  "outputs": {"result": 91}
}
Fields:
task (string): UUID of the task
responseType (string): Must be "COMPLETION"
outputs (object): Key-value pairs of output data
When to send: After the task script completes successfully.
CANCELATION
Confirms successful cancelation of a task.
Structure:
{
  "task": "87427f91-d193-4b25-8d35-e1292a34b5c4",
  "responseType": "CANCELATION"
}
Fields:
task (string): UUID of the task
responseType (string): Must be "CANCELATION"
When to send: After a task has been successfully canceled.
FAILURE
Indicates that a task failed to complete.
Structure:
{
  "task": "87427f91-d193-4b25-8d35-e1292a34b5c4",
  "responseType": "FAILURE",
  "error": "Invalid gamma value"
}
Fields:
task (string): UUID of the task
responseType (string): Must be "FAILURE"
error (string): Error message or stack trace
When to send: When a task encounters an error or exception.
Task Context
Within the executing script, workers must provide a task object with:
Required Properties
task.inputs (map/dict): Input values from the request
task.outputs (map/dict): Output values to return in the COMPLETION response
task.cancel_requested / task.cancelRequested (boolean): True if cancelation was requested
Required Methods
task.update(current, maximum, message) or task.update(message, current, maximum): Send an UPDATE response
task.cancel(): Mark the task as canceled and send a CANCELATION response
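A task object with these properties and methods could be sketched as below. This is not the reference implementation: the `canceled` attribute and the `out` parameter are assumptions added for illustration, `update` takes keyword arguments to sidestep the two argument orders listed above, and optional UPDATE fields that were not supplied are left out of the message.

```python
import json
import sys


class Task:
    """Illustrative task context handed to a script as the `task` variable."""

    def __init__(self, task_id, inputs=None, out=sys.stdout):
        self.id = task_id
        self.inputs = dict(inputs or {})
        self.outputs = {}
        self.cancel_requested = False
        self.canceled = False  # hypothetical bookkeeping flag
        self._out = out

    def _respond(self, response_type, **fields):
        # Every response carries the task UUID and a responseType.
        message = {"task": self.id, "responseType": response_type, **fields}
        self._out.write(json.dumps(message) + "\n")
        self._out.flush()

    def update(self, message=None, current=None, maximum=None):
        """Send an UPDATE response containing only the fields that were given."""
        fields = {"message": message, "current": current, "maximum": maximum}
        # Compare against None so that a legitimate value of 0 is still sent.
        self._respond("UPDATE", **{k: v for k, v in fields.items() if v is not None})

    def cancel(self):
        """Mark the task as canceled and send a CANCELATION response."""
        self.canceled = True
        self._respond("CANCELATION")
```

A script would then call, for example, `task.update(message="Computing...", current=50, maximum=100)` and check `task.cancel_requested` between steps.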
Example Implementation Flow
Here’s a complete example of the request/response flow:
Service sends EXECUTE request:
{ "task": "abc-123", "requestType": "EXECUTE", "script": "x * 2", "inputs": {"x": 5} }
Worker sends LAUNCH response:
{ "task": "abc-123", "responseType": "LAUNCH" }
Worker executes script and sends UPDATE responses (optional):
{ "task": "abc-123", "responseType": "UPDATE", "message": "Computing...", "current": 50, "maximum": 100 }
Worker sends COMPLETION response:
{ "task": "abc-123", "responseType": "COMPLETION", "outputs": {"result": 10} }
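When testing a worker, a flow like this one can be checked mechanically. The sketch below assumes that every task's responses consist of one LAUNCH, any number of UPDATEs, and exactly one final COMPLETION, CANCELATION, or FAILURE; that ordering is inferred from the descriptions in this document rather than stated as a formal rule, and `check_flow` is a hypothetical helper.

```python
TERMINAL_TYPES = {"COMPLETION", "CANCELATION", "FAILURE"}


def check_flow(responses):
    """Return True if the responses for one task follow the assumed ordering."""
    types = [r["responseType"] for r in responses]
    if len(types) < 2:
        return False
    # All responses must refer to the same task UUID.
    if len({r["task"] for r in responses}) != 1:
        return False
    # LAUNCH first, a terminal response last, only UPDATEs in between.
    if types[0] != "LAUNCH" or types[-1] not in TERMINAL_TYPES:
        return False
    return all(t == "UPDATE" for t in types[1:-1])


flow = [
    {"task": "abc-123", "responseType": "LAUNCH"},
    {"task": "abc-123", "responseType": "UPDATE", "message": "Computing...",
     "current": 50, "maximum": 100},
    {"task": "abc-123", "responseType": "COMPLETION", "outputs": {"result": 10}},
]
flow_ok = check_flow(flow)
```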
Reference Implementations
Appose provides two reference worker implementations:
Python Worker
The python_worker module implements the protocol in Python. Key features:
Executes Python scripts using exec()
Provides a task object to scripts with inputs, outputs, cancel_requested
Handles multiple concurrent tasks via threading
Groovy Worker
The GroovyWorker class implements the protocol in Groovy/Java. Key features:
Executes Groovy scripts using GroovyShell
Provides a task object with inputs, outputs, cancelRequested
Handles multiple concurrent tasks via threading
Creating Custom Workers
To create a custom worker:
Choose your language/platform for the worker
Read requests from stdin one line at a time
Parse JSON to extract task UUID, request type, and parameters
Send LAUNCH response immediately for EXECUTE requests
Execute scripts with access to task context
Send responses to stdout as JSON lines
Handle CANCEL requests by setting a flag scripts can check
Minimal Worker Example
Here’s a minimal (single-threaded) worker in Python:
import json
import sys

while True:
    # Read one request line; an empty string means stdin was closed.
    line = sys.stdin.readline()
    if not line:
        break

    request = json.loads(line)
    task_id = request["task"]
    request_type = request["requestType"]

    if request_type == "EXECUTE":
        # Send LAUNCH
        print(json.dumps({"task": task_id, "responseType": "LAUNCH"}))
        sys.stdout.flush()

        # Execute script
        try:
            script = request["script"]
            inputs = request.get("inputs", {})

            # Minimal task context: the script sees task_inputs and
            # task_outputs rather than a full task object.
            outputs = {}

            # Execute the script
            exec(script, {"task_inputs": inputs, "task_outputs": outputs})

            # Send COMPLETION
            print(json.dumps({
                "task": task_id,
                "responseType": "COMPLETION",
                "outputs": outputs
            }))
            sys.stdout.flush()
        except Exception as e:
            # Send FAILURE
            print(json.dumps({
                "task": task_id,
                "responseType": "FAILURE",
                "error": str(e)
            }))
            sys.stdout.flush()

    elif request_type == "CANCEL":
        # Send CANCELATION
        # Note: Single-threaded worker cannot actually honor task cancelations.
        print(json.dumps({"task": task_id, "responseType": "CANCELATION"}))
        sys.stdout.flush()
Best Practices
Always flush stdout after writing responses
Validate JSON before processing requests
Handle errors gracefully and send FAILURE responses
Support cancelation by checking flags periodically in long scripts
Use UUIDs correctly to match responses to requests
Keep responses line-delimited (one JSON object per line, no pretty-printing)
Log to stderr to avoid interfering with the protocol on stdout
Test with multiple concurrent tasks if your worker supports them
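Two of these practices, validating requests and logging to stderr, can be combined in a small parsing helper. The sketch below is an assumption about reasonable behavior rather than documented Appose behavior: `parse_request` is a hypothetical name, and malformed lines are logged and skipped because without a task UUID there is no task to address a FAILURE response to.

```python
import json
import sys

REQUEST_TYPES = {"EXECUTE", "CANCEL"}


def parse_request(line):
    """Return a validated request dict, or None if the line is unusable."""
    try:
        request = json.loads(line)
    except json.JSONDecodeError as exc:
        # Diagnostics go to stderr so that stdout carries only protocol messages.
        print(f"Ignoring malformed request: {exc}", file=sys.stderr)
        return None
    if not isinstance(request, dict):
        print("Ignoring request that is not a JSON object", file=sys.stderr)
        return None
    if not isinstance(request.get("task"), str):
        print("Ignoring request without a task UUID", file=sys.stderr)
        return None
    if request.get("requestType") not in REQUEST_TYPES:
        print(f"Ignoring unknown request type for task {request['task']}",
              file=sys.stderr)
        return None
    if request["requestType"] == "EXECUTE" and not isinstance(request.get("script"), str):
        print(f"Ignoring EXECUTE without a script for task {request['task']}",
              file=sys.stderr)
        return None
    return request
```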
Data Type Considerations
Appose uses JSON for serialization, which natively supports:
Numbers (integers and floats)
Strings
Booleans
Arrays/lists
Objects/dictionaries
null
Beyond JSON-Native Types
For data types that JSON cannot represent natively, Appose uses a special encoding scheme: complex objects are wrapped in a dictionary with an appose_type key that identifies the object type. This allows seamless serialization of domain-specific types like shared memory blocks and multi-dimensional arrays.
The worker implementations automatically handle encoding and decoding of these types. When your script produces a non-serializable object (e.g., a Python datetime instance), it is automatically exported and returned as a reference that you can interact with.
Supported Extended Types
NDArray
Represents a multi-dimensional array backed by shared memory, enabling efficient tensor sharing.
Structure:
{
  "appose_type": "ndarray",
  "dtype": "float32",
  "shape": [2, 3, 4],
  "shm": {
    "appose_type": "shm",
    "name": "psm_4812f794",
    "rsize": 16384
  }
}
Fields:
appose_type: Must be "ndarray"
dtype: Data type of array elements (e.g., "float32", "int64")
shape: Array dimensions as a list of integers (in C-order)
shm: A SharedMemory object containing the actual data
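As a sanity check on such a descriptor, the number of bytes the array needs can be computed from `dtype` and `shape` and compared with the shared memory block. The sketch below rests on two assumptions that this document does not confirm: that dtype names follow NumPy's naming (so "float32" is 4 bytes per element), and that `rsize` is the block's size in bytes. `ITEM_SIZES`, `ndarray_nbytes`, and `fits_in_shm` are hypothetical names.

```python
import math

# Bytes per element for a few common dtype names (assumed NumPy-style naming).
ITEM_SIZES = {
    "int8": 1, "uint8": 1,
    "int16": 2, "uint16": 2,
    "int32": 4, "uint32": 4, "float32": 4,
    "int64": 8, "uint64": 8, "float64": 8,
}


def ndarray_nbytes(descriptor):
    """Bytes needed to hold every element of the described array."""
    return math.prod(descriptor["shape"]) * ITEM_SIZES[descriptor["dtype"]]


def fits_in_shm(descriptor):
    """True if the described array fits in its shared memory block."""
    return ndarray_nbytes(descriptor) <= descriptor["shm"]["rsize"]


descriptor = {
    "appose_type": "ndarray",
    "dtype": "float32",
    "shape": [2, 3, 4],
    "shm": {"appose_type": "shm", "name": "psm_4812f794", "rsize": 16384},
}
needed = ndarray_nbytes(descriptor)  # 2 * 3 * 4 elements of 4 bytes each = 96
```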
WorkerObject (Remote Object Proxies)
When a worker script returns a non-JSON-serializable object (such as a Python datetime, Java object, or custom class instance), the worker automatically exports it with a generated variable name and returns a reference to it. This reference is converted into a proxy object on the client side, allowing you to interact with the remote object naturally.
Structure:
{
  "appose_type": "worker_object",
  "var_name": "_appose_auto_0"
}
Fields:
appose_type: Must be "worker_object"
var_name: The exported variable name in the worker process
Usage:
When you receive a WorkerObject, the client libraries automatically convert it to a proxy object. You can access attributes directly or create a strongly-typed proxy for method calls:
Python:
# Worker returns a datetime object
now = service.task("import datetime; datetime.datetime.now()").wait_for().result()
# now is a ProxyObject wrapping the remote datetime instance
year = now.year  # Accesses the year attribute remotely
weekday = now.weekday()  # Calls a method remotely
Java:
// Worker returns a datetime object
WorkerObject now = (WorkerObject) service.task(
    "import datetime; datetime.datetime.now()"
).waitFor().result();
// Access attributes directly
int year = (Integer) now.getAttribute("year");
// Or create a strongly-typed proxy for method calls
interface DateTime {
    int weekday();
    float timestamp();
}
DateTime dt = now.proxy(DateTime.class);
int weekday = dt.weekday();
float timestamp = dt.timestamp();
For an introduction to using proxies with task outputs, see the “Non-Serializable Objects and Proxies” section under “Task” in the Core Concepts documentation.
Encoding Rules
When serializing data to JSON:
Natively serializable types (strings, numbers, booleans, lists, maps) are encoded as-is
SharedMemory objects are wrapped with appose_type: "shm"
NDArray objects are wrapped with appose_type: "ndarray" (includes their SharedMemory)
Non-serializable objects (in worker mode only) are auto-exported and wrapped with appose_type: "worker_object"
Auto-export enables transparent handling of objects that cannot be serialized: the worker automatically persists them for future access, and the client receives a proxy to interact with them.
Decoding Rules
When deserializing JSON:
Check for an appose_type key in dictionary objects
If "shm": Reconstruct a SharedMemory object from the name and size
If "ndarray": Reconstruct an NDArray, recursively decoding the embedded SharedMemory
If "worker_object": (Client-side only) Convert to a proxy object for remote method/attribute access
Otherwise: Return the dictionary or value as-is
Testing Your Worker
To test a custom worker:
Run the worker manually and send it JSON requests via stdin
Verify responses match the expected format
Test error cases (invalid script, cancelation, etc.)
Integrate with Appose using env.service("my-worker", ...)
Example manual test:
# Start your worker
./my-worker
# Send an EXECUTE request (paste this as one line)
{"task":"test-123","requestType":"EXECUTE","script":"5 + 6","inputs":{}}
# Expected responses:
{"task":"test-123","responseType":"LAUNCH"}
{"task":"test-123","responseType":"COMPLETION","outputs":{"result":11}}
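The same manual test can be scripted. The sketch below is one possible harness built on the standard `subprocess` module; `run_requests` is a hypothetical helper, and `./my-worker` is the placeholder command from the example above. It sends all requests at once and then closes stdin, so it suits workers that finish their tasks before exiting on end of input.

```python
import json
import subprocess


def run_requests(command, requests, timeout=10):
    """Start a worker, feed it requests as JSON lines, and return its responses."""
    payload = "".join(json.dumps(request) + "\n" for request in requests)
    completed = subprocess.run(
        command,
        input=payload,        # written to the worker's stdin, which is then closed
        capture_output=True,  # stdout holds responses, stderr holds logs
        text=True,
        timeout=timeout,
    )
    # Each non-empty stdout line should be exactly one JSON response.
    return [json.loads(line) for line in completed.stdout.splitlines() if line.strip()]
```

A call such as `run_requests(["./my-worker"], [{"task": "test-123", "requestType": "EXECUTE", "script": "5 + 6", "inputs": {}}])` would return the parsed responses for comparison against the expected LAUNCH and COMPLETION messages. A line that is not valid JSON makes `json.loads` raise, which itself signals a framing problem in the worker.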