Examples

This page provides comprehensive examples of using Appose in various scenarios.

Basic Examples

Simple Calculation

The most basic example: execute a simple calculation.

import appose

env = appose.system()
with env.groovy() as groovy:
    # Submit the Groovy expression, wait for the task, then read its result.
    task = groovy.task("5 + 6")
    task.wait_for()
    print(f"Result: {task.result()}")  # 11

With Inputs and Outputs

Passing inputs to tasks and retrieving outputs.

import appose

# Groovy source: the task inputs `a` and `b` are referenced as plain variables.
script = """
// Calculate the sum
result = a + b
task.outputs["sum"] = result
task.outputs["product"] = a * b
"""

env = appose.system()
with env.groovy() as groovy:
    task = groovy.task(script)
    # Inputs must be set before waiting on the task.
    task.inputs["a"] = 10
    task.inputs["b"] = 5
    task.wait_for()

    # Each key the script stored in task.outputs is readable here.
    total = task.outputs["sum"]
    product = task.outputs["product"]
    print(f"Sum: {total}")  # 15
    print(f"Product: {product}")  # 50

Progress Tracking

Golden Ratio Approximation

A more complex example showing progress tracking and cancelation.

import sys
from time import sleep

import appose
from appose.service import ResponseType

# Groovy source for the task. On every iteration it checks
# task.cancelRequested and reports progress with task.update(null, i, iterations);
# the final Fibonacci pair is stored in task.outputs["numer"] / ["denom"].
script = """
// Approximate the golden ratio using the Fibonacci sequence.
previous = 0
current = 1
iterations = 50
for (i=0; i<iterations; i++) {
    if (task.cancelRequested) {
        task.cancel()
        break
    }
    task.update(null, i, iterations)
    v = current
    current += previous
    previous = v
}
task.outputs["numer"] = current
task.outputs["denom"] = previous
"""

def task_listener(event):
    """Print one status line for each event received from the task.

    Reads the module-level ``task`` that this example assigns before
    registering the listener, so it is only usable once ``task`` exists.

    Args:
        event: The task event; its ``response_type`` selects the branch, and
            ``current`` / ``maximum`` are read for progress updates.
    """
    if event.response_type == ResponseType.UPDATE:
        print(f"Progress: {event.current}/{event.maximum}")
    elif event.response_type == ResponseType.COMPLETION:
        numer = task.outputs["numer"]
        denom = task.outputs["denom"]
        ratio = numer / denom
        # Separate the fraction from its decimal value; without a separator
        # the denominator and the ratio run together into one number.
        print(f"Result: {numer}/{denom} =~ {ratio}")
    elif event.response_type == ResponseType.CANCELATION:
        print("Task canceled")
    elif event.response_type == ResponseType.FAILURE:
        # Errors go to stderr (requires `import sys` at the top of the script).
        print(f"Task failed: {task.error}", file=sys.stderr)

env = appose.system()
with env.groovy() as groovy:
    task = groovy.task(script)
    # Register the listener before starting the task.
    task.listen(task_listener)
    task.start()

    # Wait one second, then check whether the task is still going.
    sleep(1)
    still_running = not task.status.is_finished()
    if still_running:
        # Task is taking too long; request cancelation
        task.cancel()

    task.wait_for()

Environment Building

Conda Environment

Building an environment with conda dependencies.

import appose

# Python source for the task. It is written at column 0 on purpose: Python is
# indentation-sensitive, and a script whose top-level statements are indented
# fails to parse with "unexpected indent".
script = """
import numpy as np
import pandas as pd

# Create a simple array
arr = np.array([1, 2, 3, 4, 5])
task.outputs["mean"] = float(np.mean(arr))
task.outputs["std"] = float(np.std(arr))
"""

# Parentheses allow the builder chain to span lines without backslashes.
env = (
    appose.mamba()
    .conda("python=3.11", "numpy", "pandas")
    .channels("conda-forge")
    .name("my-data-env")
    .log_debug()
    .build()
)

with env.python() as python:
    task = python.task(script)
    task.wait_for()

    print(f"Mean: {task.outputs['mean']}")
    print(f"Std: {task.outputs['std']}")

Pixi Environment

Using Pixi as a modern, faster alternative to conda.

import appose

# Python source for the task. It is written at column 0 on purpose: Python is
# indentation-sensitive, and a script whose top-level statements are indented
# fails to parse with "unexpected indent".
script = """
import numpy as np
from sklearn.linear_model import LinearRegression

# Simple linear regression
X = np.array([[1], [2], [3], [4], [5]])
y = np.array([2, 4, 6, 8, 10])

model = LinearRegression()
model.fit(X, y)

task.outputs["slope"] = float(model.coef_[0])
task.outputs["intercept"] = float(model.intercept_)
"""

# Parentheses allow the builder chain to span lines without backslashes.
env = (
    appose.pixi()
    .conda("python>=3.10", "numpy")
    .pypi("scikit-learn")
    .channels("conda-forge")
    .name("my-ml-env")
    .build()
)

with env.python() as python:
    task = python.task(script)
    task.wait_for()

    print(f"Slope: {task.outputs['slope']}")
    print(f"Intercept: {task.outputs['intercept']}")

uv Environment

Using uv for fast Python virtual environments.

import appose

# Python source for the task. It is written at column 0 on purpose: Python is
# indentation-sensitive, and a script whose top-level statements are indented
# fails to parse with "unexpected indent".
script = """
import requests
from bs4 import BeautifulSoup

# Simple web scraping example
html = "<html><body><h1>Hello World</h1></body></html>"
soup = BeautifulSoup(html, 'html.parser')

task.outputs["title"] = soup.h1.text
"""

# Parentheses allow the builder chain to span lines without backslashes.
env = (
    appose.uv()
    .python("3.11")
    .include("requests", "beautifulsoup4")
    .name("my-web-env")
    .build()
)

with env.python() as python:
    task = python.task(script)
    task.wait_for()

    print(f"Title: {task.outputs['title']}")

From Environment Files

Loading environments from configuration files.

import appose

# Auto-detect builder from file extension
builder = appose.file("environment.yml").log_debug()
env = builder.build()

with env.python() as python:
    # The script ends with the expression `sys.version`; the example reads
    # it back through task.result().
    task = python.task("import sys; sys.version")
    task.wait_for()
    print(f"Python version: {task.result()}")

# Or explicitly specify Mamba
env = appose.mamba("environment.yml").build()

# Or use Pixi with pixi.toml
env = appose.pixi("pixi.toml").build()

Wrapping Existing Environments

Wrap and use existing conda/pixi environments.

import appose

# Wrap an existing environment (auto-detects type)
env_dir = "/path/to/existing/env"
env = appose.wrap(env_dir)

with env.python() as python:
    greeting = python.task("print('Hello from wrapped environment!')")
    greeting.wait_for()

Advanced Examples

Multiple Tasks in Sequence

Running multiple tasks one after another.

Todo

Running tasks in sequence is not very “Advanced”. More appropriate would be things like: 1) open-ended tasks that send updates for communication back to the service process; or 2) use of the queue=main feature to run sensitive tasks on the main thread to avoid threading issues.

import appose

# Script for task 1: publishes a list under the "data" output key.
init_script = (
    "data = [1, 2, 3, 4, 5]\n"
    "task.outputs['data'] = data"
)

# Script for task 2: averages the `data` input it is given.
average_script = (
    "result = sum(data) / len(data)\n"
    "task.outputs['average'] = result"
)

env = appose.system()
with env.python() as python:
    # Task 1: Initialize data
    producer = python.task(init_script)
    producer.wait_for()

    # Task 2: Process data, fed with the output of task 1
    consumer = python.task(average_script)
    consumer.inputs["data"] = producer.outputs["data"]
    consumer.wait_for()

    average = consumer.outputs["average"]
    print(f"Average: {average}")

Error Handling

Properly handling task failures.

Todo

The following example will throw an exception at the task.waitFor() step. This example needs to be changed to either: A) try/catch at that step and handle that way; or B) do not call waitFor and instead handle errors asynchronously by checking task.status at intervals; or C) maybe one example of each?

import org.apposed.appose.*;

/**
 * Runs a Python script that divides by zero and reports the failure, first
 * from an event listener and then by inspecting the task status.
 *
 * NOTE(review): this snippet is Java, whereas the other examples on this page
 * use the Python API -- confirm whether a Python version is intended here.
 */
public class ErrorHandling {
    public static void main(String[] args) throws Exception {
        Environment env = Appose.system();
        // try-with-resources closes the Python service when the block exits.
        try (Service python = env.python()) {
            Task task = python.task("""
                # This will raise an exception
                result = 1 / 0
                """);

            // Register the listener before waiting so that the FAILURE or
            // COMPLETION event is reported as it arrives.
            task.listen(event -> {
                switch (event.responseType) {
                    case FAILURE:
                        System.err.println("Task failed with error:");
                        System.err.println(task.error);
                        break;
                    case COMPLETION:
                        System.out.println("Task completed successfully");
                        break;
                }
            });

            // NOTE(review): per the Todo accompanying this example, waitFor()
            // is expected to throw when the task fails, in which case the
            // status check below is never reached -- confirm and restructure
            // (try/catch here, or poll task.status instead of waiting).
            task.waitFor();

            if (task.status == TaskStatus.FAILED) {
                System.err.println("Handling the failure...");
                // Recover or retry
            }
        }
    }
}

Real-World Use Cases

Deep Learning Inference

Todo

This example needs to be finished. We can use https://github.com/ctrueden/starfun3d

Running deep learning models from a different language.

import org.apposed.appose.*;

/**
 * Builds a Pixi environment with PyTorch and torchvision, then runs a Python
 * task that instantiates a ResNet-18 model and reports a status string.
 *
 * NOTE(review): this snippet is Java, whereas the other examples on this page
 * use the Python API -- confirm whether a Python version is intended here.
 */
public class DeepLearning {
    public static void main(String[] args) throws Exception {
        // Build environment with PyTorch
        Environment env = Appose.pixi()
            .conda("python>=3.10")
            .pypi("torch", "torchvision")
            .name("pytorch-env")
            .build();

        // try-with-resources closes the Python service when the block exits.
        try (Service python = env.python()) {
            // Instantiate a ResNet-18 model inside the Python worker.
            // NOTE(review): the script's own comment says "pre-trained", but it
            // passes pretrained=False, so no pre-trained weights are requested.
            // Confirm which is intended before finishing this example.
            Task loadModel = python.task("""
                import torch
                import torchvision.models as models

                # Load pre-trained ResNet
                model = models.resnet18(pretrained=False)
                model.eval()

                task.outputs["status"] = "Model loaded"
                """);
            loadModel.waitFor();

            // Print the status string the script stored in task.outputs.
            System.out.println(loadModel.outputs.get("status"));

            // In a real scenario, you would:
            // 1. Share image tensors via shared memory
            // 2. Run inference
            // 3. Get results back via shared memory
        }
    }
}

Data Science Pipeline

Todo

This example needs testing and refinement.

Complex data processing workflow.

import appose

env = (
    appose.pixi()
    .conda("python>=3.10", "pandas", "numpy")
    .pypi("scikit-learn")
    .name("data-pipeline")
    .build()
)

# Step 1 script: build a random two-column frame, normalize it, and report
# the row count and a status string.
preprocess_script = "\n".join([
    "import pandas as pd",
    "import numpy as np",
    "",
    "data = pd.DataFrame({",
    "    'x': np.random.randn(100),",
    "    'y': np.random.randn(100)",
    "})",
    "data_normalized = (data - data.mean()) / data.std()",
    "task.outputs['rows'] = len(data_normalized)",
    "task.outputs['status'] = 'Preprocessed'",
])

# Step 2 script: imports the model class and reports a status string; it does
# not fit a model yet.
train_script = "\n".join([
    "from sklearn.linear_model import LinearRegression",
    "task.outputs['status'] = 'Model trained'",
])

with env.python() as python:
    # Step 1: Load and preprocess data
    preprocess = python.task(preprocess_script)
    preprocess.wait_for()

    row_count = preprocess.outputs["rows"]
    print(f"Preprocessed {row_count} rows")

    # Step 2: Train model
    train = python.task(train_script)
    train.wait_for()

    print(train.outputs["status"])