Configure Worker Groups
Preview Feature
The Cribl SDKs are Preview features that are still being developed. We do not recommend using them in a production environment, because the features might not be fully tested or optimized for performance, and related documentation could be incomplete.
Please continue to submit feedback through normal Cribl support channels, but assistance might be limited while the features remain in Preview.
These code examples demonstrate how to use the Cribl Python SDK for the control plane or the Cribl API to configure, scale, and replicate Worker Groups in Cribl Stream.
About the Code Examples
The code examples use Bearer token authentication. Read the authentication documentation for the API or SDKs to learn how to configure authentication. The Permissions granted to your Bearer token must include creating and managing Worker Groups.
Replace the variables in the examples with the corresponding information for your Cribl deployment.
For on-prem deployments, to use https in the URLs, you must configure Transport Layer Security (TLS).
The configurations in the examples do not include all available body parameters. For a complete list of body parameters for each endpoint, refer to the documentation in the API Reference.
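The URL patterns used throughout the examples on this page can be sketched as small helpers. The function names are hypothetical and are not part of the Cribl SDK. The Cribl.Cloud host pattern, the /api/v1 suffix, and the /m/ Worker Group path are taken from the examples on this page. The use_tls flag only switches the URL scheme and assumes that TLS is already configured on the on-prem server.

```python
def cloud_base_url(workspace_name: str, org_id: str) -> str:
    """Base API URL for a Cribl.Cloud Workspace (pattern used in the examples)."""
    return f"https://{workspace_name}-{org_id}.cribl.cloud/api/v1"


def onprem_base_url(hostname: str, port: int, use_tls: bool = False) -> str:
    """Base API URL for an on-prem Leader. Use https only if TLS is configured."""
    scheme = "https" if use_tls else "http"
    return f"{scheme}://{hostname}:{port}/api/v1"


def group_url(base_url: str, group_id: str) -> str:
    """URL scoped to one Worker Group, as used for commit requests in the examples."""
    return f"{base_url}/m/{group_id}"


cloud = cloud_base_url("your-workspace-name", "your-org-id")
onprem = onprem_base_url("localhost", 9000)
print(cloud)
print(group_url(onprem, "your-worker-group-id"))
```

Keeping the URL construction in one place makes it easier to switch between Cribl.Cloud and on-prem targets when you adapt the examples.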
Configure Worker Groups with the Python SDK
The examples in this section demonstrate how to create, scale, replicate, and deploy Worker Groups using the Cribl Python SDK for the control plane.
The examples for creating and scaling a Worker Group in Cribl.Cloud include the estimatedIngestRate property, which allows you to configure Worker Groups for optimal performance. For each supported estimatedIngestRate value, the following table maps the corresponding throughput and number of Worker Processes:
| estimatedIngestRate | Throughput | Worker Processes |
|---|---|---|
| 1024 | 12 MB/s | 6 |
| 2048 | 24 MB/s | 9 |
| 3072 | 36 MB/s | 14 |
| 4096 | 48 MB/s | 21 |
| 5120 | 60 MB/s | 30 |
| 7168 | 84 MB/s | 45 |
| 10240 | 120 MB/s | 62 |
| 13312 | 156 MB/s | 93 |
| 15360 | 180 MB/s | 186 |
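As a rough aid for choosing a value, the table can be encoded as a lookup. This is a minimal sketch: the dictionary copies the values from the table above, and the helper names (describe_ingest_rate, smallest_rate_for) are hypothetical and not part of the Cribl SDK or API.

```python
# estimatedIngestRate -> (throughput in MB/s, Worker Processes), copied from the table
INGEST_RATE_TABLE = {
    1024: (12, 6),
    2048: (24, 9),
    3072: (36, 14),
    4096: (48, 21),
    5120: (60, 30),
    7168: (84, 45),
    10240: (120, 62),
    13312: (156, 93),
    15360: (180, 186),
}


def describe_ingest_rate(rate: int) -> str:
    """Return a readable summary for a supported estimatedIngestRate value."""
    if rate not in INGEST_RATE_TABLE:
        raise ValueError(f"Unsupported estimatedIngestRate: {rate}")
    throughput, workers = INGEST_RATE_TABLE[rate]
    return f"estimatedIngestRate {rate}: up to {throughput} MB/s with {workers} Worker Processes"


def smallest_rate_for(throughput_mb_s: float) -> int:
    """Return the smallest supported value whose throughput covers the requirement."""
    for rate in sorted(INGEST_RATE_TABLE):
        if INGEST_RATE_TABLE[rate][0] >= throughput_mb_s:
            return rate
    raise ValueError(f"No supported estimatedIngestRate covers {throughput_mb_s} MB/s")


print(describe_ingest_rate(2048))
print(smallest_rate_for(50))
```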
1. Create a Worker Group
This example creates a new Worker Group in Cribl Stream.
In the Cribl.Cloud example, the estimatedIngestRate is set to 2048, which is equivalent to a maximum of 24 MB/s with nine Worker Processes.
"""
Replace the placeholder values for ORG_ID, CLIENT_ID, CLIENT_SECRET,
and WORKSPACE_NAME with your Organization ID, Client ID and Secret, and
Workspace name. To get your CLIENT_ID and CLIENT_SECRET values, follow the steps
at https://docs.cribl.io/cribl-as-code/sdks-auth/#sdks-auth-cloud.
Your Client ID and Secret are sensitive information and should be kept private.
NOTE: This example is for Cribl.Cloud deployments only.
"""
import asyncio
from cribl_control_plane import CriblControlPlane
from cribl_control_plane.models import (
    ConfigGroup,
    ConfigGroupCloud,
    CloudProvider,
    Security,
    SchemeClientOauth,
    ProductsCore,
    EstimatedIngestRateOptionsConfigGroup,
)

ORG_ID = "your-org-id"
CLIENT_ID = "your-client-id"
CLIENT_SECRET = "your-client-secret"
WORKSPACE_NAME = "your-workspace-name"
WORKER_GROUP_ID = "your-cloud-worker-group-id"

base_url = f"https://{WORKSPACE_NAME}-{ORG_ID}.cribl.cloud/api/v1"

# Equivalent to 24 MB/s maximum estimated ingest rate with 9 Worker Processes
ESTIMATED_INGEST_RATE = EstimatedIngestRateOptionsConfigGroup.RATE24_MB_PER_SEC

group = ConfigGroup(
    id=WORKER_GROUP_ID,
    name="my-worker-group",
    description="Cribl.Cloud Worker Group",
    cloud=ConfigGroupCloud(provider=CloudProvider.AWS, region="us-west-2"),
    worker_remote_access=True,
    is_fleet=False,
    is_search=False,
    on_prem=False,
    provisioned=False,
)


async def main():
    # Create authenticated SDK client
    client_oauth = SchemeClientOauth(
        client_id=CLIENT_ID,
        client_secret=CLIENT_SECRET,
        token_url="https://login.cribl.cloud/oauth/token",
        audience="https://api.cribl.cloud",
    )
    security = Security(client_oauth=client_oauth)
    cribl = CriblControlPlane(server_url=base_url, security=security)

    # Verify that Worker Group doesn't already exist
    worker_group_response = cribl.groups.get(id=group.id, product=ProductsCore.STREAM)
    if worker_group_response.items and len(worker_group_response.items) > 0:
        print(f"❌ Worker Group already exists: {group.id}. Try a different group ID.")
        return

    # Create the Worker Group
    cribl.groups.create(
        product=ProductsCore.STREAM,
        id=group.id,
        name=group.name,
        description=group.description,
        cloud=group.cloud,
        worker_remote_access=group.worker_remote_access,
        is_fleet=group.is_fleet,
        is_search=group.is_search,
        on_prem=group.on_prem,
        estimated_ingest_rate=ESTIMATED_INGEST_RATE,
        provisioned=group.provisioned,
    )
    print(f"✅ Worker Group created: {group.id}")


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except Exception as error:
        print(f"❌ Something went wrong: {error}")
"""
On-Prem Create Worker Group Example
This example demonstrates how to create an on-prem Worker Group in Cribl Stream.
This example performs the following operations:
1. Connects to Cribl Stream and authenticates.
2. Verifies that a Worker Group with the specified ID does not already exist.
3. Creates the Worker Group.
4. Commits the configuration changes to the Worker Group.
5. Deploys the configuration changes to the Worker Group.
Prerequisites:
- Replace the placeholder values for ONPREM_SERVER_URL, ONPREM_USERNAME, and
ONPREM_PASSWORD with your server URL and credentials. Your credentials are
sensitive information and should be kept private.
- Replace the WORKER_GROUP_ID placeholder value with the Worker Group ID that
you want to use.
"""
import asyncio
from cribl_control_plane import CriblControlPlane
from cribl_control_plane.models import ProductsCore, Security
ONPREM_SERVER_URL = "http://localhost:9000" # Replace with your server URL
ONPREM_USERNAME = "admin" # Replace with your username
ONPREM_PASSWORD = "admin" # Replace with your password
WORKER_GROUP_ID = "your-worker-group-id"
base_url = f"{ONPREM_SERVER_URL}/api/v1"
async def main():
    # Initialize Cribl client
    cribl = CriblControlPlane(server_url=base_url)
    response = await cribl.auth.tokens.get_async(
        username=ONPREM_USERNAME, password=ONPREM_PASSWORD
    )
    token = response.result.token
    security = Security(bearer_auth=token)
    cribl = CriblControlPlane(server_url=base_url, security=security)

    # Construct the base URL for the Worker Group
    group_url = f"{base_url}/m/{WORKER_GROUP_ID}"

    # Verify that Worker Group doesn't already exist
    worker_group_response = cribl.groups.get(id=WORKER_GROUP_ID, product=ProductsCore.STREAM)
    if worker_group_response.items and len(worker_group_response.items) > 0:
        print(f"❌ Worker Group already exists: {WORKER_GROUP_ID}. Try a different group ID.")
        return

    # Create the Worker Group
    cribl.groups.create(
        product=ProductsCore.STREAM,
        id=WORKER_GROUP_ID,
        name="my-worker-group",
        description="My Worker Group description",
        worker_remote_access=True,
        is_fleet=False,
        is_search=False,
        on_prem=True,
    )
    print(f"✅ Worker Group created: {WORKER_GROUP_ID}")

    # Commit configuration changes
    commit_response = cribl.versions.commits.create(
        message="Commit for Cribl Stream example for creating a Worker Group",
        server_url=group_url,
        effective=True,
        files=["."],
    )
    if not commit_response.items or len(commit_response.items) == 0:
        raise Exception("Failed to commit configuration changes")
    version = commit_response.items[0].commit
    print(f"✅ Committed configuration changes to the group: {WORKER_GROUP_ID}, commit ID: {version}")

    # Deploy configuration changes to the Worker Group
    cribl.groups.deploy(
        product=ProductsCore.STREAM,
        id=WORKER_GROUP_ID,
        version=version,
    )
    print(f"✅ Worker Group changes deployed: {WORKER_GROUP_ID}")


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except Exception as error:
        print(f"❌ Something went wrong: {error}")
2. Scale the Worker Group
The Cribl.Cloud example scales the Worker Group to an estimatedIngestRate of 4096, which is equivalent to a maximum of 48 MB/s with 21 Worker Processes. The example also sets provisioned to True to activate Cribl.Cloud resources.
The on-prem example assumes the Syslog Source load balancer (LB) is enabled on a Cribl Stream system with 6 physical cores hyperthreaded (12 vCPUs). Because the Syslog Source LB reserves an additional core, the example scales the Worker process count from the default to -3: two cores for system/API overhead plus one for the LB process, so the system spawns nine Worker Processes. For more information, see Optimize a Distributed Deployment or Hybrid Group and Choose a Process Count. The on-prem example also updates the Worker Group system settings, commits and deploys the changes, and restarts the Worker Group to apply the changes.
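The arithmetic behind the on-prem process count can be sketched as follows. The helper effective_worker_processes is hypothetical and is not part of the Cribl SDK. It assumes that a positive count is an absolute number of Worker Processes, that a zero or negative count is added to the number of available vCPUs, and that the result never drops below the configured minimum. Confirm the exact rules in Choose a Process Count before relying on this.

```python
def effective_worker_processes(vcpus: int, count: int, minimum: int = 1) -> int:
    """Estimate how many Worker Processes a host spawns for a given process count.

    Assumption: count > 0 is an absolute number; count <= 0 reserves that many
    vCPUs for other work (system/API overhead, load balancer process).
    """
    requested = count if count > 0 else vcpus + count
    return max(requested, minimum)


# 6 physical cores hyperthreaded = 12 vCPUs; reserve 2 for system/API overhead
# plus 1 for the Syslog Source load balancer process.
print(effective_worker_processes(vcpus=12, count=-3, minimum=2))
```

With 12 vCPUs and a count of -3, the sketch yields nine Worker Processes, which matches the scenario described above. On a small host where the subtraction would fall below the minimum, the minimum applies instead.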
The request bodies for the groups.update (Cribl.Cloud) and system.settings.cribl.update (on-prem) methods require a complete representation of the Worker Group or settings, respectively, that you want to update. These methods do not support partial updates. Cribl removes any omitted fields when updating the Worker Group or settings.
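Because partial updates are not supported, a safe pattern is to read the current representation, change only the fields you need, and send the whole object back. The sketch below illustrates that pattern with plain dictionaries. The helper build_full_update is hypothetical, and the dictionary keys are only illustrative stand-ins for a Worker Group representation.

```python
from copy import deepcopy


def build_full_update(current: dict, changes: dict) -> dict:
    """Return a complete update body: the current representation plus the changes."""
    updated = deepcopy(current)  # Leave the retrieved object untouched
    updated.update(changes)
    return updated


# Illustrative representation, as it might be retrieved before an update
current_group = {
    "id": "my-worker-group",
    "name": "my-worker-group",
    "description": "Cribl.Cloud Worker Group",
    "estimatedIngestRate": 2048,
    "provisioned": False,
}
changes = {"estimatedIngestRate": 4096, "provisioned": True}

full_body = build_full_update(current_group, changes)
print(full_body)

# Sending only `changes` would omit id, name, and description from the request,
# and omitted fields are removed by the update.
missing = set(current_group) - set(changes)
print(sorted(missing))
```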
"""
Replace the placeholder values for ORG_ID, CLIENT_ID, CLIENT_SECRET,
and WORKSPACE_NAME with your Organization ID, Client ID and Secret, and
Workspace name. To get your CLIENT_ID and CLIENT_SECRET values, follow the steps
at https://docs.cribl.io/cribl-as-code/sdks-auth/#sdks-auth-cloud.
Your Client ID and Secret are sensitive information and should be kept private.
NOTE: This example is for Cribl.Cloud deployments only.
"""
import asyncio
from typing import Optional
from cribl_control_plane import CriblControlPlane
from cribl_control_plane.models import (
    ConfigGroup,
    ProductsCore,
    SchemeClientOauth,
    Security,
    EstimatedIngestRateOptionsConfigGroup,
)

ORG_ID = "your-org-id"
CLIENT_ID = "your-client-id"
CLIENT_SECRET = "your-client-secret"
WORKSPACE_NAME = "your-workspace-name"
WORKER_GROUP_ID = "your-cloud-worker-group-id"  # Use the same Worker Group ID as in the previous example

base_url = f"https://{WORKSPACE_NAME}-{ORG_ID}.cribl.cloud/api/v1"


def resolve_group(cribl, group_id: str, product: ProductsCore) -> Optional[ConfigGroup]:
    resp = cribl.groups.get(id=group_id, product=product)

    # Case 1: List-style wrapper with items
    items = getattr(resp, "items", None)
    if items:
        return items[0]

    # Case 2: Direct ConfigGroup object
    if isinstance(resp, ConfigGroup):
        return resp

    # Nothing found or unexpected shape
    return None


async def main():
    # Create authenticated SDK client
    client_oauth = SchemeClientOauth(
        client_id=CLIENT_ID,
        client_secret=CLIENT_SECRET,
        token_url="https://login.cribl.cloud/oauth/token",
        audience="https://api.cribl.cloud",
    )
    security = Security(client_oauth=client_oauth)
    cribl = CriblControlPlane(server_url=base_url, security=security)

    # Verify that Worker Group already exists
    group = resolve_group(cribl, WORKER_GROUP_ID, ProductsCore.STREAM)
    if group is None:
        print(f"Worker Group {WORKER_GROUP_ID} not found.")
    else:
        # Scale and provision the Worker Group
        # Equivalent to 48 MB/s maximum estimated ingest rate with 21 Worker Processes
        group.estimated_ingest_rate = EstimatedIngestRateOptionsConfigGroup.RATE48_MB_PER_SEC
        group.provisioned = True
        cribl.groups.update(
            product=ProductsCore.STREAM,
            id=group.id,
            id_param=group.id,
            name=group.name,
            description=group.description,
            cloud=group.cloud,
            worker_remote_access=group.worker_remote_access,
            is_fleet=group.is_fleet,
            is_search=group.is_search,
            on_prem=group.on_prem,
            estimated_ingest_rate=group.estimated_ingest_rate,
            provisioned=group.provisioned,
        )
        print(f"✅ Worker Group scaled and provisioned: {group.id}")


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except Exception as error:
        print(f"❌ Something went wrong: {error}")
"""
On-Prem Worker Group Process Optimization Example
This example demonstrates how to optimize Worker Process settings for an
existing on-prem Worker Group in Cribl Stream using the Control Plane SDK,
following the scaling guidelines from the Cribl documentation.
This example performs the following operations:
1. Connects to an existing on-prem Worker Group.
2. Retrieves the current system settings for the Worker Group.
3. Optimizes Worker Process settings following the scaling documentation,
assuming the Syslog Source load balancer is enabled on a Cribl Stream
system with 6 physical cores hyperthreaded (12 vCPUs):
- Process count: -3 (to reserve two CPU cores for system/API overhead plus
one for the load balancer process)
- Memory: 2048 MB (2 GB per Worker Process)
- Minimum Worker Process count: 2 (to spawn at least two Worker Processes)
4. Updates the Worker Group's system settings with the optimized configuration.
5. Commits the configuration changes to the Worker Group.
6. Deploys the configuration changes to the Worker Group.
7. Restarts the Worker Group to apply the changes.
The Cribl documentation provides more information about optimizing Worker Processes:
https://docs.cribl.io/stream/scaling/#optimize-a-distributed-deployment-or-hybrid-group
Prerequisites:
- Replace the placeholder values for ONPREM_SERVER_URL, ONPREM_USERNAME, and
ONPREM_PASSWORD with your server URL and credentials. Your credentials are
sensitive information and should be kept private.
- Replace WORKER_GROUP_ID with your actual Worker Group ID.
"""
import asyncio
from cribl_control_plane import CriblControlPlane
from cribl_control_plane.models import (
    ProductsCore,
    Security,
    WorkersTypeSystemSettingsConf,
)

ONPREM_SERVER_URL = "http://localhost:9000"  # Replace with your server URL
ONPREM_USERNAME = "admin"  # Replace with your username
ONPREM_PASSWORD = "admin"  # Replace with your password
WORKER_GROUP_ID = "your-worker-group-id"

base_url = f"{ONPREM_SERVER_URL}/api/v1"


async def main():
    # Initialize Cribl client
    cribl = CriblControlPlane(server_url=base_url)
    response = await cribl.auth.tokens.get_async(
        username=ONPREM_USERNAME, password=ONPREM_PASSWORD
    )
    token = response.result.token
    security = Security(bearer_auth=token)
    cribl = CriblControlPlane(server_url=base_url, security=security)

    # Construct the base URL for the Worker Group
    group_url = f"{base_url}/m/{WORKER_GROUP_ID}"

    # Verify that Worker Group exists
    worker_group_response = cribl.groups.get(id=WORKER_GROUP_ID, product=ProductsCore.STREAM)
    if not worker_group_response.items or len(worker_group_response.items) == 0:
        print(f"⚠️ Worker Group not found: {WORKER_GROUP_ID}. Please create it first.")
        return
    print(f"✅ Found Worker Group: {WORKER_GROUP_ID}")

    # Get current system settings to preserve existing configuration
    current_settings = cribl.system.settings.cribl.list(server_url=group_url)
    current_conf = current_settings.items[0] if current_settings.items else None
    if not current_conf:
        raise Exception("Failed to retrieve current system settings")
    print(f"📊 Current Worker Process settings:")
    print(f" Worker Process count: {current_conf.workers.count}")
    print(f" Memory: {current_conf.workers.memory} MB")
    print(f" Minimum Worker Processes: {current_conf.workers.minimum}")

    # Configure Worker Process settings following scaling documentation
    # Process count = -3 (six physical cores hyperthreaded for 12 vCPUs; reserves 2 cores for
    # system/API overhead plus 1 for the load balancer process)
    # Memory: 2048 MB (default; 2 GB per Worker Process)
    # Minimum: 2 (spawn at least two Worker Processes)
    workers_config = WorkersTypeSystemSettingsConf(
        count=-3,  # Negative number specifies the number of CPU cores to reserve for system/API overhead
        memory=2048,  # Amount of heap memory available to each Worker Process, in MB
        minimum=2,  # Minimum number of Worker Processes to spawn
    )

    # Update system settings with the optimized configuration for Worker Processes
    # Preserve other settings from the current configuration
    updated_conf = current_conf.model_copy(update={"workers": workers_config})
    cribl.system.settings.cribl.update(
        **updated_conf.model_dump(exclude_none=False),
        server_url=group_url,
    )
    print(f"\n✅ Worker Process settings optimized:")
    print(f" Worker Process count: {workers_config.count}")
    print(f" Memory: {workers_config.memory} MB per Worker Process")
    print(f" Minimum Worker Processes: {workers_config.minimum}")

    # Commit configuration changes
    commit_response = cribl.versions.commits.create(
        message="Optimize Worker Process settings",
        effective=True,
        files=["."],
        server_url=group_url,
    )
    if not commit_response.items or len(commit_response.items) == 0:
        raise Exception("Failed to commit configuration changes")
    version = commit_response.items[0].commit
    print(f"✅ Committed configuration changes to the Worker Group: {WORKER_GROUP_ID}, commit ID: {version}")

    # Deploy configuration changes to the Worker Group
    cribl.groups.deploy(
        product=ProductsCore.STREAM,
        id=WORKER_GROUP_ID,
        version=version,
    )
    print(f"✅ Worker Group changes deployed: {WORKER_GROUP_ID}")

    # Restart the Worker Group to apply the changes
    print(f"\n🔄 Restarting Worker Group to apply changes...")
    cribl.system.settings.restart(server_url=group_url)
    print(f"✅ Worker Group restarted successfully")


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except Exception as error:
        print(f"❌ Something went wrong: {error}")
3. Replicate a Worker Group
This example creates a replica Worker Group in Cribl Stream by cloning an existing Worker Group configuration. The request body uses the source_group_id parameter to specify the Worker Group to clone.
The replica Worker Group inherits the configuration from the source Worker Group, including settings and resources like Sources, Destinations, and Pipelines.
To run this example, you must have at least one existing Worker Group named my-worker-group in Cribl Stream to use as the source Worker Group.
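The preconditions for replication can be checked up front. The sketch below works on a plain list of existing Worker Group IDs (for example, collected from a list request) and reports every problem at once. The helper check_replication_ids is hypothetical and is not part of the Cribl SDK.

```python
from typing import Iterable, List


def check_replication_ids(
    existing_ids: Iterable[str], source_id: str, replica_id: str
) -> List[str]:
    """Return a list of problems that would prevent replication (empty if none)."""
    existing = set(existing_ids)
    problems = []
    if source_id == replica_id:
        problems.append("The replica ID must differ from the source ID.")
    if source_id not in existing:
        problems.append(f"Source Worker Group not found: {source_id}")
    if replica_id in existing and replica_id != source_id:
        problems.append(f"Replica Worker Group already exists: {replica_id}")
    return problems


existing = ["default", "my-worker-group"]
print(check_replication_ids(existing, "my-worker-group", "my-replica-worker-group"))
print(check_replication_ids(existing, "missing-group", "default"))
```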
"""
Replace the placeholder values for ORG_ID, CLIENT_ID, CLIENT_SECRET,
and WORKSPACE_NAME with your Organization ID, Client ID and Secret, and
Workspace name. To get your CLIENT_ID and CLIENT_SECRET values, follow
the steps at https://docs.cribl.io/cribl-as-code/authentication/#cloud-auth.
Your Client ID and Secret are sensitive information and should be kept private.
NOTE: This example is for Cribl.Cloud deployments only.
"""
import asyncio
from typing import Optional
from cribl_control_plane import CriblControlPlane
from cribl_control_plane.models import (
    CloudProvider,
    ConfigGroup,
    ConfigGroupCloud,
    EstimatedIngestRateOptionsConfigGroup,
    ProductsCore,
    SchemeClientOauth,
    Security,
)

ORG_ID = "your-org-id"
CLIENT_ID = "your-client-id"
CLIENT_SECRET = "your-client-secret"
WORKSPACE_NAME = "your-workspace-name"
SOURCE_WORKER_GROUP_ID = "my-worker-group"  # The id of the Worker Group to clone
REPLICA_WORKER_GROUP_ID = "my-replica-worker-group"  # The id to use for the replica Worker Group

base_url = f"https://{WORKSPACE_NAME}-{ORG_ID}.cribl.cloud/api/v1"


async def main() -> None:
    """Main function that demonstrates Worker Group replication"""
    client_oauth = SchemeClientOauth(
        client_id=CLIENT_ID,
        client_secret=CLIENT_SECRET,
        token_url="https://login.cribl.cloud/oauth/token",
        audience="https://api.cribl.cloud",
    )
    security = Security(client_oauth=client_oauth)
    cribl = CriblControlPlane(server_url=base_url, security=security)

    try:
        # Verify that the source Worker Group exists
        source_group_response = cribl.groups.get(id=SOURCE_WORKER_GROUP_ID, product=ProductsCore.STREAM)
        if not source_group_response.items or len(source_group_response.items) == 0:
            print(f"❌ Source Worker Group not found: {SOURCE_WORKER_GROUP_ID}. Create the source Worker Group first.")
            exit(1)

        # Replicate the Worker Group
        replicate_worker_group(cribl, SOURCE_WORKER_GROUP_ID)
    except Exception as error:
        message = str(error)
        print(f"Error: {message}")
        exit(1)


def replicate_worker_group(
    cribl: CriblControlPlane, source_id: str
) -> Optional[ConfigGroup]:
    """
    Replicates a Worker Group with a unique ID
    """
    # Verify that the replica Worker Group does not exist
    replica_group_response = cribl.groups.get(id=REPLICA_WORKER_GROUP_ID, product=ProductsCore.STREAM)
    if replica_group_response.items and len(replica_group_response.items) > 0:
        print(f"❌ Replica Worker Group already exists: {REPLICA_WORKER_GROUP_ID}. Try a different group ID.")
        return

    cribl.groups.create(
        product=ProductsCore.STREAM,
        id=REPLICA_WORKER_GROUP_ID,
        name="my-replica-worker-group",
        description=f"Worker Group cloned from {source_id} with identical configuration",
        cloud=ConfigGroupCloud(provider=CloudProvider.AWS, region="us-east-1"),
        worker_remote_access=True,
        is_fleet=False,
        is_search=False,
        on_prem=False,
        # Equivalent to 24 MB/s maximum estimated ingest rate with 9 Worker Processes
        estimated_ingest_rate=EstimatedIngestRateOptionsConfigGroup.RATE24_MB_PER_SEC,
        source_group_id=source_id,
    )
    print(f"✅ Worker Group replicated: {REPLICA_WORKER_GROUP_ID} (cloned from {source_id})")


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except Exception as error:
        message = str(error)
        print(f"❌ Something went wrong: {message}")
"""
On-Prem Replicate Worker Group Example
This example demonstrates how to replicate an on-prem Worker Group in
Cribl Stream.
This example performs the following operations:
1. Connects to Cribl Stream and authenticates.
2. Verifies that the source Worker Group (SOURCE_WORKER_GROUP_ID) exists.
3. Verifies that a Worker Group with the specified REPLICA_WORKER_GROUP_ID does
not already exist.
4. Creates a new Worker Group that replicates the specified source Worker Group.
5. Commits the configuration changes to the new Worker Group.
6. Deploys the configuration changes to the new Worker Group.
Prerequisites:
- Replace the placeholder values for ONPREM_SERVER_URL, ONPREM_USERNAME, and
ONPREM_PASSWORD with your server URL and credentials. Your credentials are
sensitive information and should be kept private.
- Replace the SOURCE_WORKER_GROUP_ID and REPLICA_WORKER_GROUP_ID placeholder values
with the Worker Group IDs that you want to use.
"""
import asyncio
from cribl_control_plane import CriblControlPlane
from cribl_control_plane.models import ProductsCore, Security
ONPREM_SERVER_URL = "http://localhost:9000" # Replace with your server URL
ONPREM_USERNAME = "admin" # Replace with your username
ONPREM_PASSWORD = "admin" # Replace with your password
SOURCE_WORKER_GROUP_ID = "my-worker-group" # The id of the Worker Group to clone
REPLICA_WORKER_GROUP_ID = "my-replica-worker-group" # The id to use for the replica Worker Group
base_url = f"{ONPREM_SERVER_URL}/api/v1"
async def main() -> None:
    # Initialize Cribl client
    cribl = CriblControlPlane(server_url=base_url)
    response = await cribl.auth.tokens.get_async(
        username=ONPREM_USERNAME, password=ONPREM_PASSWORD
    )
    token = response.result.token
    security = Security(bearer_auth=token)
    cribl = CriblControlPlane(server_url=base_url, security=security)

    # Construct the base URL for the replica Worker Group
    replica_group_url = f"{base_url}/m/{REPLICA_WORKER_GROUP_ID}"

    # Verify that the source Worker Group exists
    source_group_response = cribl.groups.get(id=SOURCE_WORKER_GROUP_ID, product=ProductsCore.STREAM)
    if not source_group_response.items or len(source_group_response.items) == 0:
        print(f"❌ Source Worker Group not found: {SOURCE_WORKER_GROUP_ID}. Create the source Worker Group first.")
        return

    # Verify that the replica Worker Group does not exist
    replica_group_response = cribl.groups.get(id=REPLICA_WORKER_GROUP_ID, product=ProductsCore.STREAM)
    if replica_group_response.items and len(replica_group_response.items) > 0:
        print(f"❌ Replica Worker Group already exists: {REPLICA_WORKER_GROUP_ID}. Try a different group ID.")
        return

    cribl.groups.create(
        product=ProductsCore.STREAM,
        id=REPLICA_WORKER_GROUP_ID,
        name="my-replica-worker-group",
        description=f"Worker Group cloned from {SOURCE_WORKER_GROUP_ID} with identical configuration",
        worker_remote_access=True,
        is_fleet=False,
        is_search=False,
        on_prem=True,
        source_group_id=SOURCE_WORKER_GROUP_ID,
    )
    print(f"✅ Worker Group replicated: {REPLICA_WORKER_GROUP_ID} (cloned from {SOURCE_WORKER_GROUP_ID})")

    # Commit configuration changes
    commit_response = cribl.versions.commits.create(
        message="Commit for Cribl Stream example for replicating a Worker Group",
        server_url=replica_group_url,
        effective=True,
        files=["."],
    )
    if not commit_response.items or len(commit_response.items) == 0:
        raise Exception("Failed to commit configuration changes")
    version = commit_response.items[0].commit
    print(f"✅ Committed configuration changes to the group: {REPLICA_WORKER_GROUP_ID}, commit ID: {version}")

    # Deploy configuration changes to the Worker Group
    cribl.groups.deploy(
        product=ProductsCore.STREAM,
        id=REPLICA_WORKER_GROUP_ID,
        version=version,
    )
    print(f"✅ Worker Group changes deployed: {REPLICA_WORKER_GROUP_ID}")


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except Exception as error:
        message = str(error)
        print(f"❌ Something went wrong: {message}")
4. Confirm the Worker Group Configuration
Use this example to retrieve a list of all Worker Groups in Cribl Stream so that you can review and confirm their configurations.
"""
Replace the placeholder values for ORG_ID, CLIENT_ID, CLIENT_SECRET,
and WORKSPACE_NAME with your Organization ID, Client ID and Secret, and
Workspace name. To get your CLIENT_ID and CLIENT_SECRET values, follow the steps
at https://docs.cribl.io/cribl-as-code/sdks-auth/#sdks-auth-cloud.
Your Client ID and Secret are sensitive information and should be kept private.
NOTE: This example is for Cribl.Cloud deployments only.
"""
import asyncio
from cribl_control_plane import CriblControlPlane
from cribl_control_plane.models import (ProductsCore, SchemeClientOauth,
Security)
ORG_ID = "your-org-id"
CLIENT_ID = "your-client-id"
CLIENT_SECRET = "your-client-secret"
WORKSPACE_NAME = "your-workspace-name"
base_url = f"https://{WORKSPACE_NAME}-{ORG_ID}.cribl.cloud/api/v1"
async def main():
    # Create authenticated SDK client
    client_oauth = SchemeClientOauth(
        client_id=CLIENT_ID,
        client_secret=CLIENT_SECRET,
        token_url="https://login.cribl.cloud/oauth/token",
        audience="https://api.cribl.cloud",
    )
    security = Security(client_oauth=client_oauth)
    cribl = CriblControlPlane(server_url=base_url, security=security)

    # Retrieve and list all Worker Groups
    worker_groups_response = cribl.groups.list(product=ProductsCore.STREAM)
    if worker_groups_response.items:
        print(f"✅ List of Worker Group Configurations:")
        for group in worker_groups_response.items:
            print(group)  # Print the entire configuration for each Worker Group
    else:
        print(f"❌ No Worker Groups found.")


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except Exception as error:
        print(f"❌ Something went wrong: {error}")
"""
Replace the placeholder values for ONPREM_SERVER_URL, ONPREM_USERNAME, and
ONPREM_PASSWORD with your server URL and credentials. Your credentials are
sensitive information and should be kept private.
NOTE: This example is for on-prem deployments only.
"""
import asyncio
from cribl_control_plane import CriblControlPlane
from cribl_control_plane.models import ProductsCore, Security
ONPREM_SERVER_URL = "http://localhost:9000" # Replace with your server URL
ONPREM_USERNAME = "admin" # Replace with your username
ONPREM_PASSWORD = "admin" # Replace with your password
base_url = f"{ONPREM_SERVER_URL}/api/v1"
async def main():
    # Initialize Cribl client
    cribl = CriblControlPlane(server_url=base_url)
    response = await cribl.auth.tokens.get_async(
        username=ONPREM_USERNAME, password=ONPREM_PASSWORD
    )
    token = response.result.token
    security = Security(bearer_auth=token)
    cribl = CriblControlPlane(server_url=base_url, security=security)

    # Retrieve and list all Worker Groups
    worker_groups_response = cribl.groups.list(product=ProductsCore.STREAM)
    if worker_groups_response.items:
        print(f"✅ List of Worker Group Configurations:")
        for group in worker_groups_response.items:
            print(group)  # Print the entire configuration for each Worker Group
    else:
        print(f"❌ No Worker Groups found.")


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except Exception as error:
        print(f"❌ Something went wrong: {error}")
5. Commit and Deploy the Worker Group
This example demonstrates how to commit and deploy the Worker Group configuration, then commit to the Leader to keep it in sync with the Worker Group. You can commit and deploy immediately after a single create or update request or after multiple requests.
Committing and deploying the Worker Group configuration requires three requests, which the Python SDK example chains together:
- Commit pending changes to the Worker Group. This request commits only the configuration changes for Worker Groups by specifying the file local/cribl/groups.yml.
- Deploy the committed changes to the Worker Group. This request includes the version body parameter, which uses the value of commit from the response body for the commit request.
- Commit the changes to the Leader to keep the Leader in sync with the Worker Group.
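The hand-off between the commit and deploy requests can be sketched with plain dictionaries. The helpers below are hypothetical and are not part of the Cribl SDK. They assume that the commit response body has the shape {"items": [{"commit": ...}]}, which mirrors the commit_response.items[0].commit access in the SDK examples, and that the deploy body carries that value in its version parameter as described in the list above.

```python
def commit_version(commit_response: dict) -> str:
    """Extract the commit ID from a commit response body (assumed shape)."""
    items = commit_response.get("items") or []
    if not items:
        raise ValueError("Failed to commit configuration changes: no items in response")
    return items[0]["commit"]


def deploy_body(commit_response: dict) -> dict:
    """Build the deploy request body from the commit response."""
    return {"version": commit_version(commit_response)}


# Illustrative commit response; the commit ID is a made-up placeholder
sample_commit_response = {"items": [{"commit": "abc1234"}]}
print(deploy_body(sample_commit_response))
```

Failing fast on an empty items list avoids sending a deploy request without a valid version.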
"""
Replace the placeholder values for ORG_ID, CLIENT_ID, CLIENT_SECRET,
and WORKSPACE_NAME with your Organization ID, Client ID and Secret, and
Workspace name. To get your CLIENT_ID and CLIENT_SECRET values, follow the steps
at https://docs.cribl.io/cribl-as-code/sdks-auth/#sdks-auth-cloud.
Your Client ID and Secret are sensitive information and should be kept private.
NOTE: This example is for Cribl.Cloud deployments only.
"""
import asyncio
from cribl_control_plane import CriblControlPlane
from cribl_control_plane.models import ProductsCore, Security, SchemeClientOauth
ORG_ID = "your-org-id"
CLIENT_ID = "your-client-id"
CLIENT_SECRET = "your-client-secret"
WORKSPACE_NAME = "your-workspace-name"
WORKER_GROUP_ID = "your-cloud-worker-group-id" # Use the same Worker Group ID as in previous examples
base_url = f"https://{WORKSPACE_NAME}-{ORG_ID}.cribl.cloud/api/v1"
group_url = f"{base_url}/m/{WORKER_GROUP_ID}"
async def main():
    # Create authenticated SDK client
    client_oauth = SchemeClientOauth(
        client_id=CLIENT_ID,
        client_secret=CLIENT_SECRET,
        token_url="https://login.cribl.cloud/oauth/token",
        audience="https://api.cribl.cloud",
    )
    security = Security(client_oauth=client_oauth)
    cribl = CriblControlPlane(server_url=base_url, security=security)

    # Commit configuration changes
    commit_response = cribl.versions.commits.create(
        message="Commit for Cribl Stream example",
        effective=True,
        files=["."],
        server_url=group_url,
    )
    if not commit_response.items or len(commit_response.items) == 0:
        raise Exception("Failed to commit configuration changes")
    version = commit_response.items[0].commit
    print(f"✅ Committed configuration changes to the group: {WORKER_GROUP_ID}, commit ID: {version}")

    # Deploy configuration changes
    cribl.groups.deploy(
        product=ProductsCore.STREAM,
        id=WORKER_GROUP_ID,
        version=version,
    )
    print(f"✅ Worker Group changes deployed: {WORKER_GROUP_ID}")


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except Exception as error:
        print(f"❌ Something went wrong: {error}")
"""
Replace the placeholder values for ONPREM_SERVER_URL, ONPREM_USERNAME, and
ONPREM_PASSWORD with your server URL and credentials. Your credentials are
sensitive information and should be kept private.
NOTE: This example is for on-prem deployments only.
"""
import asyncio
from cribl_control_plane import CriblControlPlane
from cribl_control_plane.models import ProductsCore, Security
ONPREM_SERVER_URL = "http://localhost:9000" # Replace with your server URL
ONPREM_USERNAME = "admin" # Replace with your username
ONPREM_PASSWORD = "admin" # Replace with your password
WORKER_GROUP_ID = "your-worker-group-id" # Use the same Worker Group ID as in previous examples
base_url = f"{ONPREM_SERVER_URL}/api/v1"
group_url = f"{base_url}/m/{WORKER_GROUP_ID}"
async def main():
    # Initialize Cribl client
    cribl = CriblControlPlane(server_url=base_url)
    response = await cribl.auth.tokens.get_async(
        username=ONPREM_USERNAME, password=ONPREM_PASSWORD
    )
    token = response.result.token
    security = Security(bearer_auth=token)
    cribl = CriblControlPlane(server_url=base_url, security=security)

    # Commit configuration changes
    commit_response = cribl.versions.commits.create(
        message="Commit for Cribl Stream example",
        effective=True,
        files=["."],
        server_url=group_url,
    )
    if not commit_response.items or len(commit_response.items) == 0:
        raise Exception("Failed to commit configuration changes")
    version = commit_response.items[0].commit
    print(f"✅ Committed configuration changes to the group: {WORKER_GROUP_ID}, commit ID: {version}")

    # Deploy configuration changes
    cribl.groups.deploy(
        product=ProductsCore.STREAM,
        id=WORKER_GROUP_ID,
        version=version,
    )
    print(f"✅ Worker Group changes deployed: {WORKER_GROUP_ID}")


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except Exception as error:
        print(f"❌ Something went wrong: {error}")
Configure Worker Groups with the Cribl API
The examples in this section demonstrate how to create, scale, replicate, and deploy Worker Groups using the Cribl API.
The examples for creating and scaling a Worker Group on Cribl.Cloud include the estimatedIngestRate property, which allows you to configure Worker Groups for optimal performance. For each supported estimatedIngestRate value, the following table maps the corresponding throughput and number of Worker Processes:
| estimatedIngestRate | Throughput | Worker Processes |
|---|---|---|
| 1024 | 12 MB/s | 6 |
| 2048 | 24 MB/s | 9 |
| 3072 | 36 MB/s | 14 |
| 4096 | 48 MB/s | 21 |
| 5120 | 60 MB/s | 30 |
| 7168 | 84 MB/s | 45 |
| 10240 | 120 MB/s | 62 |
| 13312 | 156 MB/s | 93 |
| 15360 | 180 MB/s | 186 |
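The table above can also be expressed as a lookup, which may help when choosing a value programmatically. The following is a minimal sketch: the values are copied from the table, while the names `INGEST_RATE_TABLE` and `smallest_rate_for` are hypothetical helpers for illustration and are not part of the Cribl SDK or API.

```python
# Values copied from the estimatedIngestRate table above:
# estimatedIngestRate -> (throughput in MB/s, Worker Processes)
INGEST_RATE_TABLE = {
    1024: (12, 6),
    2048: (24, 9),
    3072: (36, 14),
    4096: (48, 21),
    5120: (60, 30),
    7168: (84, 45),
    10240: (120, 62),
    13312: (156, 93),
    15360: (180, 186),
}


def smallest_rate_for(required_mb_per_s: float) -> int:
    """Return the smallest supported estimatedIngestRate whose throughput
    covers the required MB/s (hypothetical helper, not part of the SDK)."""
    for rate in sorted(INGEST_RATE_TABLE):
        throughput, _ = INGEST_RATE_TABLE[rate]
        if throughput >= required_mb_per_s:
            return rate
    raise ValueError(
        f"No supported estimatedIngestRate covers {required_mb_per_s} MB/s"
    )


rate = smallest_rate_for(40)
throughput, processes = INGEST_RATE_TABLE[rate]
print(rate, throughput, processes)  # 4096 48 21
```

For a requirement of 40 MB/s, the sketch skips the 36 MB/s tier and selects 4096, the first value whose throughput is at least 40 MB/s.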
1. Create a Worker Group
This example creates a new Worker Group in Cribl Stream.
In the Cribl.Cloud example, the estimatedIngestRate is set to 2048, which is equivalent to a maximum of 24 MB/s with nine Worker Processes. In the on-prem deployment example, the new Worker Group is created with eight Workers.
curl --request POST \
--url "https://${workspaceName}-${organizationId}.cribl.cloud/api/v1/products/stream/groups" \
--header "Authorization: Bearer ${token}" \
--header "Content-Type: application/json" \
--data '{
"id": "my-worker-group",
"name": "my-worker-group",
"description": "Cribl.Cloud Worker Group",
"cloud": {
"provider": "aws",
"region": "us-west-2"
},
"workerRemoteAccess": true,
"isFleet": false,
"isSearch": false,
"onPrem": false,
"estimatedIngestRate": 2048,
"provisioned": false
}'
curl --request POST \
--url "https://${hostname}:${port}/api/v1/products/stream/groups" \
--header "Authorization: Bearer ${token}" \
--header "Content-Type: application/json" \
--data '{
"id": "my-worker-group",
"name": "my-worker-group",
"description": "My Worker Group description",
"workerRemoteAccess": true,
"isFleet": false,
"isSearch": false,
"onPrem": true,
"workerCount": 8
}'
2. Scale the Worker Group
The Cribl.Cloud example scales the Worker Group to an estimatedIngestRate of 4096, which is equivalent to a maximum of 48 MB/s with 21 Worker Processes. The example also sets provisioned to true to activate Cribl.Cloud resources.
The on-prem example assumes the Syslog Source load balancer (LB) is enabled on a Cribl Stream system with six physical hyperthreaded cores (12 vCPUs). Because the Syslog Source LB reserves an additional core, the example changes the process count from the default to -3: two cores for system/API overhead plus one for the LB process. With 12 vCPUs, the system therefore spawns nine Worker Processes (12 - 3 = 9). For more information, see Optimize a Distributed Deployment or Hybrid Group and Choose a Process Count. The on-prem example also updates the Worker Group system settings, commits and deploys the changes, and restarts the Worker Group to apply the changes.
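The process-count arithmetic for the on-prem example can be checked with a short sketch. It assumes that a positive count is an absolute number of Worker Processes, that a zero or negative count is relative to the available vCPUs, and that the result never drops below the configured minimum. The function name `effective_worker_processes` is hypothetical and not part of the Cribl SDK or API.

```python
def effective_worker_processes(available_cpus: int, count: int, minimum: int = 2) -> int:
    """Resolve a process-count setting into a number of Worker Processes.

    Assumed semantics (not taken from the SDK): a positive count is absolute,
    a zero or negative count is added to the available vCPUs, and the result
    is never lower than the minimum.
    """
    resolved = count if count > 0 else available_cpus + count
    return max(resolved, minimum)


# 12 vCPUs with a count of -3, as in the on-prem example
print(effective_worker_processes(12, -3))  # 9

# On a small host, the minimum takes over
print(effective_worker_processes(2, -3))  # 2
```

The second call shows why the `minimum` setting matters: on a two-vCPU host, a count of -3 would otherwise resolve to a negative number.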
The request bodies for the PATCH /products/{product}/groups/{id} (Cribl.Cloud) and PATCH /system/settings/conf (on-prem) endpoints must include a complete representation of the Worker Group or settings, respectively, that you want to update. These endpoints do not support partial updates. Cribl removes any omitted fields when updating the Worker Group or settings.
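Because omitted fields are removed, one way to build a safe request body is to start from the complete current representation and override only the fields you want to change. The sketch below illustrates that pattern offline. It assumes you have already retrieved the current representation (the retrieval step is not shown), and `build_full_update` is a hypothetical helper, not part of the Cribl SDK or API.

```python
import copy
import json


def build_full_update(current: dict, changes: dict) -> dict:
    """Return a complete request body: every field from the current
    representation, with only the requested top-level fields overridden."""
    body = copy.deepcopy(current)  # leave the caller's copy untouched
    body.update(changes)
    return body


# Illustrative current representation, using the fields from the create example
current_group = {
    "id": "my-worker-group",
    "name": "my-worker-group",
    "description": "Cribl.Cloud Worker Group",
    "cloud": {"provider": "aws", "region": "us-west-2"},
    "workerRemoteAccess": True,
    "isFleet": False,
    "isSearch": False,
    "onPrem": False,
    "estimatedIngestRate": 2048,
    "provisioned": False,
}

patch_body = build_full_update(
    current_group, {"estimatedIngestRate": 4096, "provisioned": True}
)
print(json.dumps(patch_body, indent=2))
```

Note that `dict.update` replaces top-level keys only, so changing a nested object such as `cloud` means supplying that whole object in `changes`.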
curl --request PATCH \
--url "https://${workspaceName}-${organizationId}.cribl.cloud/api/v1/products/stream/groups/my-worker-group" \
--header "Authorization: Bearer ${token}" \
--header "Content-Type: application/json" \
--data '{
"id": "my-worker-group",
"name": "my-worker-group",
"description": "Cribl.Cloud Worker Group",
"cloud": {
"provider": "aws",
"region": "us-west-2"
},
"workerRemoteAccess": true,
"isFleet": false,
"isSearch": false,
"onPrem": false,
"estimatedIngestRate": 4096,
"provisioned": true
}'
curl --request PATCH \
--url "https://${hostname}:${port}/api/v1/m/my-worker-group/system/settings/conf" \
--header "Authorization: Bearer ${token}" \
--header "Content-Type: application/json" \
--data '{
"api": {
"host": "0.0.0.0",
"port": 1234,
"ssl": {
"disabled": false,
"certificateName": "myCertificate",
"certPath": "$CRIBL_HOME/local/cribl/auth/certs/myCertificate.crt",
"privKeyPath": "$CRIBL_HOME/local/cribl/auth/certs/myCertificate.key"
}
},
"system": {
"upgrade": "api",
"intercom": true
},
"workers": {
"count": -3,
"memory": 2048,
"minimum": 2
},
"proxy": {
"useEnvVars": true
},
"upgradeSettings": {
"disableAutomaticUpgrade": true,
"enableLegacyEdgeUpgrade": false
},
"shutdown": {
"drainTimeout": 10
}
}'
3. Replicate the Worker Group
This example creates a replica Worker Group in Cribl Stream by cloning an existing Worker Group configuration. The new Worker Group uses the sourceGroupId body parameter to specify the Worker Group to clone. The replica Worker Group inherits the configuration from the source Worker Group, including settings and resources like Sources, Destinations, and Pipelines.
To run this example, you must have at least one existing Worker Group named my-worker-group in Cribl Stream to use as the source Worker Group.
curl --request POST \
--url "https://${workspaceName}-${organizationId}.cribl.cloud/api/v1/products/stream/groups" \
--header "Authorization: Bearer ${token}" \
--header "Content-Type: application/json" \
--data '{
"id": "my-replica-worker-group",
"name": "my-replica-worker-group",
"description": "Worker Group cloned from my-worker-group with identical configuration",
"cloud": {
"provider": "aws",
"region": "us-west-2"
},
"workerRemoteAccess": true,
"isFleet": false,
"isSearch": false,
"onPrem": false,
"sourceGroupId": "my-worker-group"
}'
curl --request POST \
--url "https://${hostname}:${port}/api/v1/products/stream/groups" \
--header "Authorization: Bearer ${token}" \
--header "Content-Type: application/json" \
--data '{
"id": "my-replica-worker-group",
"name": "my-replica-worker-group",
"description": "Worker Group cloned from my-worker-group with identical configuration",
"workerRemoteAccess": true,
"isFleet": false,
"isSearch": false,
"onPrem": true,
"sourceGroupId": "my-worker-group"
}'
4. Confirm the Worker Group Configuration
Use this example to retrieve a list of all Worker Groups in Cribl Stream so that you can review and confirm their configurations.
curl --request GET \
--url "https://${workspaceName}-${organizationId}.cribl.cloud/api/v1/products/stream/groups" \
--header "Authorization: Bearer ${token}" \
--header "Content-Type: application/json"
curl --request GET \
--url "https://${hostname}:${port}/api/v1/products/stream/groups" \
--header "Authorization: Bearer ${token}" \
--header "Content-Type: application/json"
5. Commit and Deploy the Worker Group
This example demonstrates how to commit and deploy the Worker Group configurations, then commit to the Leader to keep it in sync with the Worker Groups. You can commit and deploy immediately after a single create or update request or after multiple requests.
On-prem deployments also require restarting the Worker Groups if you want their new configurations to take effect.
First, commit pending changes to the Worker Group. This request commits only the configuration changes for the Worker Group by specifying the file local/cribl/groups.yml. Repeat this request for each Worker Group whose changes you want to commit.
curl --request POST \
--url "https://${workspaceName}-${organizationId}.cribl.cloud/api/v1/version/commit?groupId=my-worker-group" \
--header "Authorization: Bearer ${token}" \
--header "Content-Type: application/json" \
--data '{
"files": [
"local/cribl/groups.yml"
],
"message": "Commit Worker Group configuration"
}'
curl --request POST \
--url "https://${hostname}:${port}/api/v1/version/commit?groupId=my-worker-group" \
--header "Authorization: Bearer ${token}" \
--header "Content-Type: application/json" \
--data '{
"files": [
"local/cribl/groups.yml"
],
"message": "Commit Worker Group configuration"
}'
Second, deploy the committed changes to the Worker Group. This request includes the version body parameter, which uses the value of commit from the response body for the commit request. Repeat this request for each Worker Group whose committed changes you want to deploy.
curl --request PATCH \
--url "https://${workspaceName}-${organizationId}.cribl.cloud/api/v1/products/stream/groups/my-worker-group/deploy" \
--header "Authorization: Bearer ${token}" \
--header "Content-Type: application/json" \
--data '{
"version": "1234abcd5678efgh9012ijkl3456mnop7EXAMPLE"
}'
curl --request PATCH \
--url "https://${hostname}:${port}/api/v1/products/stream/groups/my-worker-group/deploy" \
--header "Authorization: Bearer ${token}" \
--header "Content-Type: application/json" \
--data '{
"version": "1234abcd5678efgh9012ijkl3456mnop7EXAMPLE"
}'
Third, commit the changes to the Leader to keep the Leader in sync with the Worker Group.
curl --request POST \
--url "https://${workspaceName}-${organizationId}.cribl.cloud/api/v1/version/commit" \
--header "Authorization: Bearer ${token}" \
--header "Content-Type: application/json" \
--data '{
"message": "Sync my-worker-group with Leader"
}'
curl --request POST \
--url "https://${hostname}:${port}/api/v1/version/commit" \
--header "Authorization: Bearer ${token}" \
--header "Content-Type: application/json" \
--data '{
"message": "Sync my-worker-group with Leader"
}'
Finally, for on-prem deployments only, restart the Worker Group. Repeat this request for each Worker Group whose configuration you want to take effect.
curl --request POST \
--url "https://${hostname}:${port}/api/v1/m/my-worker-group/system/settings/restart" \
--header "Authorization: Bearer ${token}" \
--header "Content-Type: application/json"
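The order of the requests in this section can be summarized in a short sketch that builds, but does not send, each request. The endpoint paths and bodies are taken from the curl examples above. The helper names `extract_commit` and `commit_and_deploy_plan` are hypothetical, and the sketch assumes the commit response body contains an `items` list whose first entry has a `commit` field, mirroring the `commit_response.items[0].commit` access in the Python SDK examples.

```python
import json


def extract_commit(commit_response: dict) -> str:
    """Read the commit ID from a commit response body.

    Assumes the body has an "items" list whose first entry carries "commit".
    """
    items = commit_response.get("items") or []
    if not items:
        raise ValueError("Commit response contains no items")
    return items[0]["commit"]


def commit_and_deploy_plan(base_url: str, group_id: str, commit_id: str, on_prem: bool) -> list:
    """Return the ordered (method, url, body) requests from this section."""
    plan = [
        # 1. Commit pending changes to the Worker Group
        ("POST", f"{base_url}/version/commit?groupId={group_id}",
         {"files": ["local/cribl/groups.yml"],
          "message": "Commit Worker Group configuration"}),
        # 2. Deploy the committed version to the Worker Group
        ("PATCH", f"{base_url}/products/stream/groups/{group_id}/deploy",
         {"version": commit_id}),
        # 3. Commit to the Leader to keep it in sync
        ("POST", f"{base_url}/version/commit",
         {"message": f"Sync {group_id} with Leader"}),
    ]
    if on_prem:
        # 4. On-prem only: restart the Worker Group
        plan.append(("POST", f"{base_url}/m/{group_id}/system/settings/restart", None))
    return plan


# Placeholder response standing in for the result of the first request
sample_response = {"items": [{"commit": "1234abcd5678efgh9012ijkl3456mnop7EXAMPLE"}]}
commit_id = extract_commit(sample_response)

for method, url, body in commit_and_deploy_plan(
    "https://localhost:9000/api/v1", "my-worker-group", commit_id, on_prem=True
):
    print(method, url, json.dumps(body) if body is not None else "")
```

In a real run, the commit ID is only known after the first request returns, so you would read it from that response before sending the deploy request. The sketch passes it in up front only to keep the example offline.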