Aggregate data automatically

Data pipelines automatically transform raw sensor readings into summaries and insights at a schedule that you choose. Viam stores the output of these pipelines in a separate, queryable database.

For example, you may often query the average temperature across multiple sensors for each hour of the day. To make these queries faster, you can use a data pipeline to pre-calculate the results, saving significant computational resources.

Data pipelines work with incomplete data as well. If a machine goes offline, data collection continues but sync pauses. viam-server stores the data locally and syncs later, when your machine reconnects to Viam. Once the machine reconnects and syncs this stored data, Viam automatically re-runs affected pipelines to include the new data.

Prerequisites

While not a requirement, it is easier to test data pipelines if you have already enabled data capture from at least one component and begun syncing data with Viam before setting up a pipeline.

Only users with organization owner permissions can create a data pipeline.

Pipeline management

Create a pipeline

To define a data pipeline, specify a name, organization, schedule, data source type, and query:

Use datapipelines create to create your pipeline:

viam datapipelines create \
  --org-id=<org-id> \
  --name=sensor-counts \
  --schedule="0 * * * *" \
  --data-source-type="standard" \
  --mql='[{"$match": {"component_name": "sensor"}}, {"$group": {"_id": "$location_id", "avg_temp": {"$avg": "$data.readings.temperature"}, "count": {"$sum": 1}}}, {"$project": {"location": "$_id", "avg_temp": 1, "count": 1}}]' \
  --enable-backfill=True

To pass your query as a file instead of specifying it as inline MQL, pass the --mql-path flag instead of --mql. To create a pipeline that reads data from the hot data store, specify --data-source-type hotstorage.

To define a new pipeline, call DataClient.CreateDataPipeline:

import asyncio

from viam.rpc.dial import DialOptions, Credentials
from viam.app.viam_client import ViamClient
from viam.gen.app.data.v1.data_pb2 import TabularDataSourceType


# Configuration constants – replace with your actual values
API_KEY = ""  # API key, find or create in your organization settings
API_KEY_ID = ""  # API key ID, find or create in your organization settings
ORG_ID = ""  # Organization ID, find or create in your organization settings


async def connect() -> ViamClient:
    """Open a ViamClient authenticated with the API key constants defined above."""
    api_key_credentials = Credentials(type="api-key", payload=API_KEY)
    options = DialOptions(
        credentials=api_key_credentials,
        auth_entity=API_KEY_ID,
    )
    client = await ViamClient.create_from_dial_options(options)
    return client


async def main() -> int:
    """Create the "test-pipeline" data pipeline and print its ID.

    Returns:
        0 after the pipeline has been created and the client closed.
    """
    viam_client = await connect()
    try:
        data_client = viam_client.data_client

        pipeline_id = await data_client.create_data_pipeline(
            name="test-pipeline",
            organization_id=ORG_ID,
            mql_binary=[
                {"$match": {"component_name": "temperature-sensor"}},
                {
                    "$group": {
                        "_id": "$location_id",
                        "avg_temp": {"$avg": "$data.readings.temperature"},
                        "count": {"$sum": 1}
                    }
                },
                {
                    "$project": {
                        "location": "$_id",
                        "avg_temp": 1,
                        "count": 1
                    }
                }
            ],
            schedule="0 * * * *",
            data_source_type=TabularDataSourceType.TABULAR_DATA_SOURCE_TYPE_STANDARD,
            enable_backfill=False,
        )
        print(f"Pipeline created with ID: {pipeline_id}")
    finally:
        # Close the client even when pipeline creation raises.
        viam_client.close()
    return 0

if __name__ == "__main__":
    asyncio.run(main())

To create a pipeline that reads data from the hot data store, set the data_source_type argument to TabularDataSourceType.TABULAR_DATA_SOURCE_TYPE_HOT_STORAGE.

To define a new pipeline, call DataClient.CreateDataPipeline:

package main

import (
	"context"
	"fmt"

	"go.viam.com/rdk/app"
	"go.viam.com/rdk/logging"
)

// main connects to Viam with an API key, creates the "test-pipeline" data
// pipeline, and prints the ID of the new pipeline.
func main() {
	apiKey := ""
	apiKeyID := ""
	orgID := ""

	logger := logging.NewDebugLogger("client")
	ctx := context.Background()
	viamClient, err := app.CreateViamClientWithAPIKey(
		ctx, app.Options{}, apiKey, apiKeyID, logger)
	if err != nil {
		logger.Fatal(err)
	}
	defer viamClient.Close()

	dataClient := viamClient.DataClient()

	// Create MQL stages as map slices
	mqlStages := []map[string]interface{}{
		{"$match": map[string]interface{}{"component_name": "temperature-sensor"}},
		{
			"$group": map[string]interface{}{
				"_id": "$location_id",
				"avg_temp": map[string]interface{}{"$avg": "$data.readings.temperature"},
				"count": map[string]interface{}{"$sum": 1},
			},
		},
		{
			"$project": map[string]interface{}{
				"location": "$_id",
				"avg_temp": 1,
				"count": 1,
			},
		},
	}

	pipelineID, err := dataClient.CreateDataPipeline(
		ctx,
		orgID,
		"test-pipeline",
		mqlStages,
		"0 * * * *",
		// NOTE(review): presumably the enable-backfill flag — confirm against the SDK signature.
		false,
		&app.CreateDataPipelineOptions{
			// Use the named constant rather than a bare integer so the
			// intended data source is explicit.
			TabularDataSourceType: app.TabularDataSourceTypeStandard,
		},
	)
	if err != nil {
		logger.Fatal(err)
	}

	fmt.Printf("Pipeline created with ID: %s\n", pipelineID)
}

To create a pipeline that reads data from the hot data store, set the TabularDataSourceType field of app.CreateDataPipelineOptions to the hot storage type, TABULAR_DATA_SOURCE_TYPE_HOT_STORAGE.

To define a new pipeline, call dataClient.createDataPipeline:

import { createViamClient } from "@viamrobotics/sdk";

// Configuration constants – replace with your actual values
const API_KEY = "";  // API key, find or create in your organization settings
const API_KEY_ID = "";  // API key ID, find or create in your organization settings
const ORG_ID = "";  // Organization ID, find or create in your organization settings

/**
 * Creates the "test-pipeline" data pipeline and logs the ID of the new pipeline.
 */
async function main(): Promise<void> {
    // Create Viam client
    const client = await createViamClient({
        credentials: {
            type: "api-key",
            authEntity: API_KEY_ID,
            payload: API_KEY,
        },
    });

    const pipelineId = await client.dataClient.createDataPipeline(
        ORG_ID,
        "test-pipeline",
        [
            { "$match": { "component_name": "temperature-sensor" } },
            { "$group": { "_id": "$location_id", "avg_temp": { "$avg": "$data.readings.temperature" }, "count": { "$sum": 1 } } },
            { "$project": { "location": "$_id", "avg_temp": 1, "count": 1 } }
        ],
        "0 * * * *",
        // NOTE(review): 0 and false are presumably the data source type and the
        // enable-backfill flag — confirm the order against the SDK signature.
        0,
        false,
    );

    console.log(`Pipeline created with ID: ${pipelineId}`);
}

// Run the script; report any failure and exit with a non-zero status.
main().catch((error) => {
    console.error("Script failed:", error);
    process.exit(1);
});

To create a pipeline that reads data from the hot data store, pass the hot storage data source type, TabularDataSourceType.TABULAR_DATA_SOURCE_TYPE_HOT_STORAGE, as the data source type argument.

Schedule format

To create a schedule for your pipeline, specify a cron expression in UTC. The schedule determines both execution frequency and the range of time queried by each execution. The following table contains some common schedules, which correspond to the listed execution frequencies and query time range:

| Schedule | Frequency | Query Time Range |
| --- | --- | --- |
| `0 * * * *` | Hourly | Previous hour |
| `0 0 * * *` | Daily | Previous day |
| `*/15 * * * *` | Every 15 minutes | Previous 15 minutes |

Query limitations

Data pipeline queries only support a subset of MQL aggregation operators. For more information, see Supported aggregation operators.

Query pipeline results

To query the results of your data pipeline, call DataClient.TabularDataByMQL and pass the following arguments:

  • tabular_data_source_type: TabularDataSourceType.TABULAR_DATA_SOURCE_TYPE_PIPELINE_SINK
  • pipeline_id: your pipeline’s ID
import asyncio

from viam.rpc.dial import DialOptions, Credentials
from viam.app.viam_client import ViamClient
from viam.gen.app.data.v1.data_pb2 import TabularDataSourceType


# Configuration constants – replace with your actual values
API_KEY = ""  # API key, find or create in your organization settings
API_KEY_ID = ""  # API key ID, find or create in your organization settings
ORG_ID = ""  # Organization ID, find or create in your organization settings
PIPELINE_ID = ""


async def connect() -> ViamClient:
    """Open a ViamClient authenticated with the API key constants defined above."""
    api_key_credentials = Credentials(type="api-key", payload=API_KEY)
    options = DialOptions(
        credentials=api_key_credentials,
        auth_entity=API_KEY_ID,
    )
    client = await ViamClient.create_from_dial_options(options)
    return client


async def main() -> int:
    """Run an MQL query against a pipeline's stored output and print the result.

    Returns:
        0 after the query result has been printed and the client closed.
    """
    viam_client = await connect()
    try:
        data_client = viam_client.data_client

        tabular_data = await data_client.tabular_data_by_mql(
            organization_id=ORG_ID,
            query=[
                {"$match": {"component_name": "sensor-1"}},
                {
                    "$group": {
                        "_id": "$location_id",
                        "avg_val": {"$avg": "$data.readings.a"},
                        "count": {"$sum": 1}
                    }
                },
                {
                    "$project": {
                        "location": "$_id",
                        "avg_val": 1,
                        "count": 1
                    }
                }
            ],
            # Read from the pipeline's output rather than from raw captured data.
            tabular_data_source_type=TabularDataSourceType.TABULAR_DATA_SOURCE_TYPE_PIPELINE_SINK,
            pipeline_id=PIPELINE_ID
        )
        print(f"Tabular Data: {tabular_data}")
    finally:
        # Close the client even when the query raises.
        viam_client.close()
    return 0

if __name__ == "__main__":
    asyncio.run(main())

To query the results of your data pipeline, call DataClient.TabularDataByMQL. Configure the app.TabularDataByMQLOptions argument with the following fields:

  • TabularDataSourceType: the pipeline sink type, TABULAR_DATA_SOURCE_TYPE_PIPELINE_SINK (value 3)
  • PipelineID: your pipeline’s ID
package main

import (
	"context"
	"fmt"

	"go.viam.com/rdk/app"
	"go.viam.com/rdk/logging"
)

// main connects to Viam with an API key, runs an MQL query against the stored
// output of the pipeline identified by pipelineID, and prints the result.
func main() {
	apiKey := ""
	apiKeyID := ""
	orgID := ""
	pipelineID := ""

	logger := logging.NewDebugLogger("client")
	ctx := context.Background()
	viamClient, err := app.CreateViamClientWithAPIKey(
		ctx, app.Options{}, apiKey, apiKeyID, logger)
	if err != nil {
		logger.Fatal(err)
	}
	defer viamClient.Close()

	dataClient := viamClient.DataClient()

	// Create MQL stages as map slices
	mqlStages := []map[string]interface{}{
		{"$match": map[string]interface{}{"component_name": "sensor-1"}},
		{
			"$group": map[string]interface{}{
				"_id": "$location_id",
				"avg_val": map[string]interface{}{"$avg": "$data.readings.a"},
				"count": map[string]interface{}{"$sum": 1},
			},
		},
		{
			"$project": map[string]interface{}{
				"location": "$_id",
				"avg_val": 1,
				"count": 1,
			},
		},
	}

	tabularData, err := dataClient.TabularDataByMQL(ctx, orgID, mqlStages, &app.TabularDataByMQLOptions{
		// Named constant instead of the bare value 3: read from the
		// pipeline's output (pipeline sink) rather than raw captured data.
		TabularDataSourceType: app.TabularDataSourceTypePipelineSink,
		PipelineID:            pipelineID,
	})
	if err != nil {
		logger.Fatal(err)
	}

	fmt.Printf("Tabular Data: %v\n", tabularData)
}

To query the results of your data pipeline, call dataClient.tabularDataByMQL. Pass an options object with the following fields:

  • tabularDataSourceType: TABULAR_DATA_SOURCE_TYPE_PIPELINE_SINK (value 3)
  • pipelineId: your pipeline’s ID
import { createViamClient } from "@viamrobotics/sdk";

// Configuration constants – replace with your actual values
const API_KEY = "";  // API key, find or create in your organization settings
const API_KEY_ID = "";  // API key ID, find or create in your organization settings
const ORG_ID = "";  // Organization ID, find or create in your organization settings
const PIPELINE_ID = "";

/**
 * Runs an MQL query against the output of the pipeline identified by
 * PIPELINE_ID and logs the result.
 */
async function main(): Promise<void> {
    // Create Viam client
    const client = await createViamClient({
        credentials: {
            type: "api-key",
            authEntity: API_KEY_ID,
            payload: API_KEY,
        },
    });

    const tabularData = await client.dataClient.tabularDataByMQL(
        ORG_ID,
        [
            { "$match": { "component_name": "sensor-1" } },
            { "$group": { "_id": "$location_id", "avg_val": { "$avg": "$data.readings.a" }, "count": { "$sum": 1 } } },
            { "$project": { "location": "$_id", "avg_val": 1, "count": 1 } }
        ],
        {
            // NOTE(review): 3 is presumably the numeric value of
            // TABULAR_DATA_SOURCE_TYPE_PIPELINE_SINK — confirm against the SDK enum.
            tabularDataSourceType: 3,
            pipelineId: PIPELINE_ID,
        }
    );
    console.log(tabularData);
}

// Run the script; report any failure and exit with a non-zero status.
main().catch((error) => {
    console.error("Script failed:", error);
    process.exit(1);
});

List pipelines

Use datapipelines list to fetch a list of pipeline configurations in an organization:

viam datapipelines list --org-id=<org-id>

Use DataClient.ListDataPipelines to fetch a list of pipeline configurations in an organization:

import asyncio

from viam.rpc.dial import DialOptions, Credentials
from viam.app.viam_client import ViamClient
from viam.gen.app.data.v1.data_pb2 import TabularDataSourceType


# Configuration constants – replace with your actual values
API_KEY = ""  # API key, find or create in your organization settings
API_KEY_ID = ""  # API key ID, find or create in your organization settings
ORG_ID = ""  # Organization ID, find or create in your organization settings


async def connect() -> ViamClient:
    """Open a ViamClient authenticated with the API key constants defined above."""
    api_key_credentials = Credentials(type="api-key", payload=API_KEY)
    options = DialOptions(
        credentials=api_key_credentials,
        auth_entity=API_KEY_ID,
    )
    client = await ViamClient.create_from_dial_options(options)
    return client


async def main() -> int:
    """Print the name, ID, schedule and data source type of each pipeline in ORG_ID.

    Returns:
        0 after all pipelines have been printed and the client closed.
    """
    viam_client = await connect()
    try:
        data_client = viam_client.data_client

        pipelines = await data_client.list_data_pipelines(ORG_ID)
        for pipeline in pipelines:
            print(f"Pipeline: {pipeline.name}, ID: {pipeline.id}, schedule: {pipeline.schedule}, data_source_type: {pipeline.data_source_type}")
    finally:
        # Close the client even when listing raises.
        viam_client.close()
    return 0

if __name__ == "__main__":
    asyncio.run(main())

Use DataClient.ListDataPipelines to fetch a list of pipeline configurations in an organization:

package main

import (
	"context"
	"fmt"

	"go.viam.com/rdk/app"
	"go.viam.com/rdk/logging"
)

// main connects to Viam with an API key and prints the name, ID, schedule and
// data source type of every data pipeline in the organization.
func main() {
	apiKey := ""
	apiKeyID := ""
	orgID := ""

	logger := logging.NewDebugLogger("client")
	ctx := context.Background()
	viamClient, err := app.CreateViamClientWithAPIKey(
		ctx, app.Options{}, apiKey, apiKeyID, logger)
	if err != nil {
		logger.Fatal(err)
	}
	defer viamClient.Close()

	dataClient := viamClient.DataClient()

	pipelines, err := dataClient.ListDataPipelines(ctx, orgID)
	if err != nil {
		logger.Fatal(err)
	}
	for _, pipeline := range pipelines {
		// One verb per argument: the previous format string had a trailing
		// "enable_backfill: %t" with no matching operand. %v is used for the
		// data source type so it prints correctly whether or not the type
		// implements fmt.Stringer.
		fmt.Printf("Pipeline: %s, ID: %s, schedule: %s, data_source_type: %v\n", pipeline.Name, pipeline.ID, pipeline.Schedule, pipeline.DataSourceType)
	}
}

Use dataClient.listDataPipelines to fetch a list of pipeline configurations in an organization:

import { createViamClient } from "@viamrobotics/sdk";

// Configuration constants – replace with your actual values
const API_KEY = "";  // API key, find or create in your organization settings
const API_KEY_ID = "";  // API key ID, find or create in your organization settings
const ORG_ID = "";  // Organization ID, find or create in your organization settings

/**
 * Logs the name, ID, schedule and data source type of each data pipeline
 * in the organization identified by ORG_ID.
 */
async function main(): Promise<void> {
    // Create Viam client
    const client = await createViamClient({
        credentials: {
            type: "api-key",
            authEntity: API_KEY_ID,
            payload: API_KEY,
        },
    });

    const pipelines = await client.dataClient.listDataPipelines(ORG_ID);
    for (const pipeline of pipelines) {
        console.log(`Pipeline: ${pipeline.name}, ID: ${pipeline.id}, schedule: ${pipeline.schedule}, data_source_type: ${pipeline.dataSourceType}`);
    }
}

// Run the script; report any failure and exit with a non-zero status.
main().catch((error) => {
    console.error("Script failed:", error);
    process.exit(1);
});

Update a pipeline

Use datapipelines update to alter an existing data pipeline:

# Update the pipeline's name, schedule, and query.
# The query counts matching readings per part and outputs that total.
viam datapipelines update \
  --org-id=<org-id> \
  --id=<pipeline-id> \
  --schedule="0 * * * *" \
  --name="updated-name" \
  --mql='[{"$match": {"component_name": "sensor"}}, {"$group": {"_id": "$part_id", "total": {"$sum": 1}}}, {"$project": {"part": "$_id", "total": 1}}]'

To pass your query from a file instead of from inline MQL, pass the --mql-path flag instead of --mql.

Use DataClient.UpdateDataPipeline to alter an existing data pipeline:

package main

import (
	"context"
	"fmt"

	"go.viam.com/rdk/app"
	"go.viam.com/rdk/logging"
)

// main connects to Viam with an API key, updates the name, query and schedule
// of the pipeline identified by pipelineID, and prints that ID.
func main() {
	const (
		apiKey     = ""
		apiKeyID   = ""
		pipelineID = ""
	)

	// stage is shorthand for one MQL document.
	type stage = map[string]interface{}

	ctx := context.Background()
	logger := logging.NewDebugLogger("client")

	viamClient, connectErr := app.CreateViamClientWithAPIKey(ctx, app.Options{}, apiKey, apiKeyID, logger)
	if connectErr != nil {
		logger.Fatal(connectErr)
	}
	defer viamClient.Close()

	// Aggregation stages: match, group by location, then project the output fields.
	matchStage := stage{"$match": stage{"component_name": "temperature-sensor"}}
	groupStage := stage{
		"$group": stage{
			"_id":      "$location_id",
			"avg_temp": stage{"$avg": "$data.readings.temperature"},
			"count":    stage{"$sum": 1},
		},
	}
	projectStage := stage{
		"$project": stage{
			"location": "$_id",
			"avg_temp": 1,
			"count":    1,
		},
	}
	query := []stage{matchStage, groupStage, projectStage}

	updateErr := viamClient.DataClient().UpdateDataPipeline(
		ctx,
		pipelineID,
		"test-pipeline-updated",
		query,
		"0 * * * *",
		app.TabularDataSourceTypeStandard,
	)
	if updateErr != nil {
		logger.Fatal(updateErr)
	}
	fmt.Printf("Pipeline updated with ID: %s\n", pipelineID)
}

Enable a pipeline

Use datapipelines enable to enable a disabled data pipeline:

viam datapipelines enable --id=<pipeline-id>

Use DataClient.EnableDataPipeline to enable a disabled data pipeline:

package main

import (
	"context"
	"fmt"

	"go.viam.com/rdk/app"
	"go.viam.com/rdk/logging"
)

// main connects to Viam with an API key, enables the data pipeline identified
// by pipelineID, and prints that ID.
func main() {
	const (
		apiKey     = ""
		apiKeyID   = ""
		pipelineID = ""
	)

	ctx := context.Background()
	logger := logging.NewDebugLogger("client")

	viamClient, connectErr := app.CreateViamClientWithAPIKey(ctx, app.Options{}, apiKey, apiKeyID, logger)
	if connectErr != nil {
		logger.Fatal(connectErr)
	}
	defer viamClient.Close()

	if enableErr := viamClient.DataClient().EnableDataPipeline(ctx, pipelineID); enableErr != nil {
		logger.Fatal(enableErr)
	}
	fmt.Printf("Pipeline enabled with ID: %s\n", pipelineID)
}

Disable a pipeline

Disabling a data pipeline lets you pause data pipeline execution without fully deleting the pipeline configuration from your organization. The pipeline immediately stops aggregating data. You can re-enable the pipeline at any time to resume execution. When a pipeline is re-enabled, Viam will not backfill missed time windows from the period of time when a pipeline was disabled.

Use datapipelines disable to disable a data pipeline:

viam datapipelines disable --id=<pipeline-id>

Use DataClient.DisableDataPipeline to disable a data pipeline:

package main

import (
	"context"
	"fmt"

	"go.viam.com/rdk/app"
	"go.viam.com/rdk/logging"
)

// main connects to Viam with an API key, disables the data pipeline identified
// by pipelineID, and prints that ID.
func main() {
	const (
		apiKey     = ""
		apiKeyID   = ""
		pipelineID = ""
	)

	ctx := context.Background()
	logger := logging.NewDebugLogger("client")

	viamClient, connectErr := app.CreateViamClientWithAPIKey(ctx, app.Options{}, apiKey, apiKeyID, logger)
	if connectErr != nil {
		logger.Fatal(connectErr)
	}
	defer viamClient.Close()

	if disableErr := viamClient.DataClient().DisableDataPipeline(ctx, pipelineID); disableErr != nil {
		logger.Fatal(disableErr)
	}
	fmt.Printf("Pipeline disabled with ID: %s\n", pipelineID)
}

Delete a pipeline

Use datapipelines delete to delete a data pipeline, its execution history, and all output generated by that pipeline:

viam datapipelines delete --id=<pipeline-id>

Use DataClient.DeleteDataPipeline to delete a data pipeline:

import asyncio

from viam.rpc.dial import DialOptions, Credentials
from viam.app.viam_client import ViamClient
from viam.gen.app.data.v1.data_pb2 import TabularDataSourceType


# Configuration constants – replace with your actual values
API_KEY = ""  # API key, find or create in your organization settings
API_KEY_ID = ""  # API key ID, find or create in your organization settings
PIPELINE_ID = ""


async def connect() -> ViamClient:
    """Open a ViamClient authenticated with the API key constants defined above."""
    api_key_credentials = Credentials(type="api-key", payload=API_KEY)
    options = DialOptions(
        credentials=api_key_credentials,
        auth_entity=API_KEY_ID,
    )
    client = await ViamClient.create_from_dial_options(options)
    return client


async def main() -> int:
    """Delete the data pipeline identified by PIPELINE_ID and print its ID.

    Returns:
        0 after the pipeline has been deleted and the client closed.
    """
    viam_client = await connect()
    try:
        data_client = viam_client.data_client

        await data_client.delete_data_pipeline(PIPELINE_ID)
        print(f"Pipeline deleted with ID: {PIPELINE_ID}")
    finally:
        # Close the client even when deletion raises.
        viam_client.close()
    return 0

if __name__ == "__main__":
    asyncio.run(main())

Use DataClient.DeleteDataPipeline to delete a data pipeline:

package main

import (
	"context"
	"fmt"

	"go.viam.com/rdk/app"
	"go.viam.com/rdk/logging"
)

// main connects to Viam with an API key, deletes the data pipeline identified
// by pipelineID, and prints that ID.
func main() {
	const (
		apiKey     = ""
		apiKeyID   = ""
		pipelineID = ""
	)

	ctx := context.Background()
	logger := logging.NewDebugLogger("client")

	viamClient, connectErr := app.CreateViamClientWithAPIKey(ctx, app.Options{}, apiKey, apiKeyID, logger)
	if connectErr != nil {
		logger.Fatal(connectErr)
	}
	defer viamClient.Close()

	if deleteErr := viamClient.DataClient().DeleteDataPipeline(ctx, pipelineID); deleteErr != nil {
		logger.Fatal(deleteErr)
	}
	fmt.Printf("Pipeline deleted with ID: %s\n", pipelineID)
}

Use dataClient.deleteDataPipeline to delete a data pipeline:

import { createViamClient } from "@viamrobotics/sdk";

// Configuration constants – replace with your actual values
const API_KEY = "";  // API key, find or create in your organization settings
const API_KEY_ID = "";  // API key ID, find or create in your organization settings
const PIPELINE_ID = "";

/**
 * Deletes the data pipeline identified by PIPELINE_ID and logs its ID.
 */
async function main(): Promise<void> {
    // Create Viam client
    const client = await createViamClient({
        credentials: {
            type: "api-key",
            authEntity: API_KEY_ID,
            payload: API_KEY,
        },
    });

    await client.dataClient.deleteDataPipeline(PIPELINE_ID);
    console.log(`Pipeline deleted with ID: ${PIPELINE_ID}`);
}

// Run the script; report any failure and exit with a non-zero status.
main().catch((error) => {
    console.error("Script failed:", error);
    process.exit(1);
});

Check pipeline execution history

Data pipeline executions may have any of the following statuses:

  • SCHEDULED: pending execution
  • STARTED: currently processing
  • COMPLETED: successfully finished
  • FAILED: execution error

Use DataClient.ListDataPipelineRuns to view information about past and in-progress executions of a pipeline:

import asyncio

from viam.rpc.dial import DialOptions, Credentials
from viam.app.viam_client import ViamClient
from viam.gen.app.data.v1.data_pb2 import TabularDataSourceType


# Configuration constants – replace with your actual values
API_KEY = ""  # API key, find or create in your organization settings
API_KEY_ID = ""  # API key ID, find or create in your organization settings
PIPELINE_ID = ""


async def connect() -> ViamClient:
    """Open a ViamClient authenticated with the API key constants defined above."""
    api_key_credentials = Credentials(type="api-key", payload=API_KEY)
    options = DialOptions(
        credentials=api_key_credentials,
        auth_entity=API_KEY_ID,
    )
    client = await ViamClient.create_from_dial_options(options)
    return client


async def main() -> int:
    """Print the status and time ranges of runs of the pipeline PIPELINE_ID.

    Returns:
        0 after the runs have been printed and the client closed.
    """
    viam_client = await connect()
    try:
        data_client = viam_client.data_client

        # NOTE(review): 10 is presumably the page size — confirm against the SDK.
        pipeline_runs = await data_client.list_data_pipeline_runs(PIPELINE_ID, 10)
        for run in pipeline_runs.runs:
            print(f"Run: ID: {run.id}, status: {run.status}, start_time: {run.start_time}, end_time: {run.end_time}, data_start_time: {run.data_start_time}, data_end_time: {run.data_end_time}")
    finally:
        # Close the client even when listing runs raises.
        viam_client.close()
    return 0

if __name__ == "__main__":
    asyncio.run(main())

Use DataClient.ListDataPipelineRuns to view information about past and in-progress executions of a pipeline:

package main

import (
	"context"
	"fmt"

	"go.viam.com/rdk/app"
	"go.viam.com/rdk/logging"
)

// main connects to Viam with an API key, fetches runs of the pipeline
// identified by pipelineID, and prints the status and time ranges of each run.
func main() {
	const (
		apiKey     = ""
		apiKeyID   = ""
		pipelineID = ""
	)

	ctx := context.Background()
	logger := logging.NewDebugLogger("client")

	viamClient, connectErr := app.CreateViamClientWithAPIKey(ctx, app.Options{}, apiKey, apiKeyID, logger)
	if connectErr != nil {
		logger.Fatal(connectErr)
	}
	defer viamClient.Close()

	runsPage, listErr := viamClient.DataClient().ListDataPipelineRuns(ctx, pipelineID, 10)
	if listErr != nil {
		logger.Fatal(listErr)
	}

	for _, r := range runsPage.Runs {
		fmt.Printf(
			"Run: ID: %s, status: %s, start_time: %s, end_time: %s, data_start_time: %s, data_end_time: %s\n",
			r.ID, r.Status, r.StartTime, r.EndTime, r.DataStartTime, r.DataEndTime,
		)
	}
}

Use dataClient.listDataPipelineRuns to view information about past and in-progress executions of a pipeline:

import { createViamClient } from "@viamrobotics/sdk";

// Configuration constants – replace with your actual values
const API_KEY = "";  // API key, find or create in your organization settings
const API_KEY_ID = "";  // API key ID, find or create in your organization settings
const PIPELINE_ID = "";

/**
 * Logs the ID, status and time ranges of runs of the pipeline identified
 * by PIPELINE_ID.
 */
async function main(): Promise<void> {
    // Create Viam client
    const client = await createViamClient({
        credentials: {
            type: "api-key",
            authEntity: API_KEY_ID,
            payload: API_KEY,
        },
    });

    // NOTE(review): 10 is presumably the page size — confirm against the SDK.
    const pipelineRuns = await client.dataClient.listDataPipelineRuns(PIPELINE_ID, 10);
    for (const run of pipelineRuns.runs) {
        console.log(
            `Run: ID: ${run.id}, status: ${run.status}, start_time: ${run.startTime}, ` +
            `end_time: ${run.endTime}, data_start_time: ${run.dataStartTime}, data_end_time: ${run.dataEndTime}`
        );
    }
}

// Run the script; report any failure and exit with a non-zero status.
main().catch((error) => {
    console.error("Script failed:", error);
    process.exit(1);
});