Elements API Cookbook
Introduction
The Elements API Cookbook contains recipes for integrating applications using the Elements API. Each recipe describes a common use case you need to solve when implementing an integration. Each recipe also includes an example solution implemented as a Python procedure.
Recipes
A sample Python procedure is an integral part of each recipe.
Except for the requests library, the examples use only modules from the Python Standard Library. The requests library is used
to communicate with the Elements API.
- Authentication
- Listing devices
- Polling EPP Security Events
- Count Security Events by engine
- Polling EDR detections
Authentication
Prior to making requests to the Elements API, you must obtain an authentication token by sending credentials that were
generated within Elements Security Center, defining the desired scope for the requests. If you require read-only access
to the API, you need to set the scope to connect.api.read. Otherwise, you can obtain an authentication token with the following
scopes: connect.api.read and connect.api.write.
The description of each endpoint in the Elements API documentation includes information about the scopes that are required for specific operations.
When you send an authentication request to an endpoint via https://api.connect.withsecure.com/as/token.oauth2,
the request body must include the following properties:
grant_type=client_credentialsscope=<scopes separated with space>
When you send your credentials in the Authorization HTTP header, the header’s value comprises your identifier and a secret
value that was generated during the creation of your credentials in the Elements Security Center. To create this value, you must
combine your identifier and secret value using a colon, and then encode the resulting string using the base64 function.
auth_token := base64($CLIENT_ID + ":" + $SECRET_VALUE)
Place the encoded credentials in the Authorization header with the Basic prefix.
You must send the request body using application/x-www-form-urlencoded. Successful authentication requires valid credentials
in the Authorization header and a client that is allowed to authenticate with the requested scopes. Otherwise, the request will fail.
Regardless of the authentication outcome, Elements API responds in JSON format:
- If authentication is successful, the received JSON object will include the
access_tokenproperty. You can use this token for all subsequent API requests. - If authentication fails, the received JSON object will include an error explanation.
from json import *
from base64 import b64encode
import requests
API_URL = 'https://api.connect.withsecure.com'
def authenticate(client_id, client_secret):
auth_header = b64encode(bytes(client_id + ':' + client_secret, 'utf-8')).decode('utf-8')
headers = {
'Content-type': 'application/x-www-form-urlencoded;charset=UTF-8',
'Accept': 'application/json',
'Authorization': 'Basic ' + auth_header,
'User-Agent': 'my-script' # each request must contain User-Agent header
}
scopes = ['connect.api.read', 'connect.api.write'] # authenticated client can read
# and write data
data = {'grant_type': 'client_credentials', 'scope': ' '.join(scopes)}
response = requests.post(API_URL + '/as/token.oauth2', data=data, headers=headers)
if response.ok:
res_body = response.json()
return res_body['access_token']
else:
print('Response=' + response.text)
print('Headers=' + str(response.headers))
# Each response contains an `X-Transaction` header. It can be sent to the Elements API
# support team to help the investigation of possible errors.
print('Transaction-id=' + response.headers.get('X-Transaction'))
raise Exception('Authentication failed')
Managing API Keys
The Elements API provides endpoints to programmatically manage API keys (credentials). This allows you to create, list, and delete API keys without using the Elements Security Center web interface. These operations require the connect.api.write scope.
Important Notes:
- API keys are created as read-only by default (least privilege principle)
- The
clientSecretis only returned once during creation and cannot be retrieved again - You need an existing API key with write permissions to manage other API keys
- API keys can be created at the organization or service organization partner (SOP) level
Creating API Keys
The following example demonstrates how to create a new API key. You can optionally specify the organization ID and whether the key should have read-only or write access.
import requests
from authentication import authenticate
API_URL = "https://api.connect.withsecure.com"
API_KEYS_CREATE_PATH = "/api-keys/v1/create"
def create_api_key(auth_token, description, organization_id=None, read_only=True):
"""
Create a new API key.
Args:
auth_token: Bearer token with connect.api.write scope
description: Human-readable description for the API key (required)
organization_id: Optional organization ID. If not specified, uses current organization
read_only: Whether the key should be read-only (default: True)
Returns:
Dictionary containing the new API key details including clientSecret (only shown once)
"""
headers = {
"Content-Type": "application/json",
"Accept": "application/json",
"Authorization": "Bearer " + auth_token,
"User-Agent": "my-script"
}
body = {
"description": description
}
if organization_id:
body["organizationId"] = organization_id
if not read_only:
body["readOnly"] = False
response = requests.post(API_URL + API_KEYS_CREATE_PATH, json=body, headers=headers)
if response.ok:
api_key = response.json()
print(f"Created API key with ID: {api_key['clientId']}")
print(f"Client ID: {api_key['clientId']}")
print(f"Client Secret: {api_key['clientSecret']}")
print("WARNING: Store the clientSecret securely - it won't be shown again!")
return api_key
else:
print("Error:", response.text)
raise Exception("Failed to create API key")
# Example usage:
# auth_token = authenticate(client_id, client_secret)
# new_key = create_api_key(auth_token, description="API key for monitoring", read_only=True)
Listing API Keys
You can list all API keys for an organization to see what credentials exist:
import requests
from authentication import authenticate
API_URL = "https://api.connect.withsecure.com"
API_KEYS_LIST_PATH = "/api-keys/v1/api-keys"
def list_api_keys(auth_token, organization_id=None):
"""
List all API keys for an organization.
Args:
auth_token: Bearer token with connect.api.write scope
organization_id: Optional organization ID. If not specified, uses current organization
Returns:
List of API keys (without secrets)
"""
headers = {
"Accept": "application/json",
"Authorization": "Bearer " + auth_token,
"User-Agent": "my-script"
}
params = {}
if organization_id:
params["organizationId"] = organization_id
response = requests.get(API_URL + API_KEYS_LIST_PATH, params=params, headers=headers)
if response.ok:
result = response.json()
api_keys = result.get("items", [])
print(f"Found {len(api_keys)} API key(s):")
for key in api_keys:
print(f" - ID: {key['clientId']}")
print(f" Client ID: {key['clientId']}")
print(f" Read-only: {key['readOnly']}")
print(f" Created: {key['createdAt']}")
return api_keys
else:
print("Error:", response.text)
raise Exception("Failed to list API keys")
# Example usage:
# auth_token = authenticate(client_id, client_secret)
# keys = list_api_keys(auth_token)
Deleting API Keys
When an API key is no longer needed, you can delete it. This operation is permanent and the deleted key can no longer be used for authentication.
import requests
from authentication import authenticate
API_URL = "https://api.connect.withsecure.com"
def delete_api_key(auth_token, credential_id):
"""
Delete an API key by its credential ID.
Args:
auth_token: Bearer token with connect.api.write scope
credential_id: The UUID of the API key to delete
Returns:
True if deletion was successful
"""
headers = {
"Authorization": "Bearer " + auth_token,
"User-Agent": "my-script"
}
url = f"{API_URL}/api-keys/v1/api-keys/{credential_id}"
response = requests.delete(url, headers=headers)
if response.status_code == 204:
print(f"Successfully deleted API key: {credential_id}")
return True
else:
print("Error:", response.text)
raise Exception("Failed to delete API key")
# Example usage:
# auth_token = authenticate(client_id, client_secret)
# delete_api_key(auth_token, "ec8a0100-d313-4896-b3cb-02188e060bf3")
Complete Example: API Key Lifecycle Management
Here’s a complete example that demonstrates creating, listing, and deleting API keys:
import requests
from authentication import authenticate
from time import sleep
API_URL = "https://api.connect.withsecure.com"
def manage_api_keys_example(client_id, client_secret, organization_id=None):
"""
Complete example of API key lifecycle management.
"""
# Step 1: Authenticate with write scope
print("Step 1: Authenticating...")
auth_token = authenticate(client_id, client_secret)
# Step 2: List existing API keys
print("\nStep 2: Listing existing API keys...")
existing_keys = list_api_keys(auth_token, organization_id)
# Step 3: Create a new read-only API key
print("\nStep 3: Creating new read-only API key...")
new_key = create_api_key(auth_token, description="Test API key for lifecycle demo", organization_id=organization_id, read_only=True)
# Step 4: List API keys again to see the new one
print("\nStep 4: Listing API keys after creation...")
sleep(1) # Brief pause to ensure consistency
all_keys = list_api_keys(auth_token, organization_id)
# Step 5: Delete the newly created API key
print("\nStep 5: Deleting the newly created API key...")
delete_api_key(auth_token, new_key['clientId'])
# Step 6: Verify deletion
print("\nStep 6: Listing API keys after deletion...")
sleep(1)
final_keys = list_api_keys(auth_token, organization_id)
print("\nAPI key management completed successfully!")
# Run the example:
# manage_api_keys_example("your-client-id", "your-client-secret")
Listing all devices
A paging functionality is available in all Elements API endpoints that return extensive lists of items, such as device or incident listings.
This allows you to specify the maximum number of items to be sent in the response. If the total number of items matching the
query criteria exceeds the specified limit, Elements API includes a property called nextAnchor in the response. This nextAnchor
contains a link to the next batch of items, referred to as the “next page”. If you include this value in the request parameters,
Elements API will return the next page of items. You can repeat this process until nextAnchor is no longer present in the response.
In this recipe, the function list_all_devices() reads all devices of a specified organization that have a critical protection status.
The function sets the limit to 10, which means that the response can contain up to 10 items in the list from the property response.items.
The listing function ends its iteration when the value for the property response.nextAnchor is not found.
import requests
from authentication import authenticate
API_URL = "https://api.connect.withsecure.com"
DEVICES_PATH = "/devices/v1/devices"
def get_devices(auth_token,
organization_id=None, next_page=None):
headers = {
"Accept": "application/json", # always use that header if you expect JSON response
"Authorization": "Bearer " + auth_token,
"User-Agent": "my-script"
}
params = {"limit": 10} # use `limit` to define size of single page
params["protectionStatusOverview"] = "critical"
# with parameter `protectionStatusOverview` we can read select devices with given
# protection status.
if organization_id:
# `organization_id` parameter is optional. If authenticated client belongs to
# partner organization, it can use `organization_id` to select devices, that
# belong to given organization
params["organizationId"] = organization_id
if next_page:
params["anchor"] = next_page # add `anchor` to query parameters to read next page
# as value use property `nextAnchor` from last response
# if `nextAnchor` is not present in response it means
# that query returned last page
response = requests.get(API_URL + DEVICES_PATH, params=params, headers=headers)
if not response.ok:
print("Error", response.text)
sys.exit(0)
body = response.json()
items = body["items"] # if no devices match the query, the `items` list is empty
for d in items:
print("Device name={}, status={}, company={}".format(d["name"], d["protectionStatusOverview"],
d["company"]["name"]))
if "nextAnchor" in body:
# if `nextAnchor` is present in response, client can use it to read next page
# of query result. For example by calling function
# get_devices_with_status(client_id, client_secret, next_page=response["nextAnchor"])
print("Link to next page=", body["nextAnchor"])
return body["nextAnchor"]
else:
# if `nextAnchor` is missing in response it means, that client received last page
# of query results
print("Last page")
return None
def list_all_devices(client_id, client_secret, organization_id=None):
auth_token = authenticate(client_id, client_secret)
next_page = True
anchor = None
while next_page:
anchor = get_devices(auth_token, organization_id, anchor)
next_page = anchor is not None
Polling Security Events
To maintain an up-to-date list of continuously generated resources, such as
EPP Security Events or
EDR Incidents,
you can use a technique called polling. Using request parameters to define the time range, you can periodically send a request to Elements API to retrieve all items that were created since the
previous check. In response, Elements API returns only those items that
were created or updated within the specified range.
The function poll_security_events() periodically retrieves all the events that were generated since the timestamp was saved in the
variable last_date. When the function is invoked, this variable is initialized with the current time in the UTC time zone.
If the function get_events_after() finds any EPP Security Event, the last_date variable is updated with the timestamp from the last event
and is used in the next iteration.
The get_events_after() function converts last_date to a string in the following format: YYYY-MM-DDThh:mm:ss.SSS.
This value is then included in the request as a parameter called persistenceTimestampStart. Furthermore, the function sets the
exclusiveStart parameter to true, signifying that Elements API will return all events with a
persistenceTimestamp greater than the value specified in persistenceTimestampStart.
You must always include the exclusiveStart parameter during polling to prevent duplicate items. If this parameter is omitted,
Elements API will return all items with a persistenceTimestamp greater than or equal to the value specified in
persistenceTimestampStart. As a result, you may receive the same event as in the previous iteration.
from datetime import datetime, timezone
from time import sleep
import requests
from authentication import authenticate
API_URL = 'https://api.connect.withsecure.com'
EVENTS_PATH = "/security-events/v1/security-events"
def get_events_page(auth_token, min_date, org_id=None, next_page=None):
headers = {
"Accept": "application/json", # always use that header if you expect JSON response
"Authorization": "Bearer " + auth_token,
"User-Agent": "my-script",
"Content-type": "application/x-www-form-urlencoded"
}
params = {"limit": 200,
"engineGroup": ["epp", "edr"],
"persistenceTimestampStart": min_date, # read all events created AFTER `min_date`
"order": "asc", # ascending order
}
# `min_date` represents timestamp of last event in previous page
# That event should not be included in next response. When `exclusiveStart=true` is set
# then `persistenceTimestampStart` is used as exclusive start of time range.
params["exclusiveStart"] = "true"
if next_page:
params['anchor'] = next_page
if org_id:
params["organizationId"] = org_id
response = requests.post(API_URL + EVENTS_PATH, data=params, headers=headers)
if not response.ok:
print("Error", response.text)
raise Exception("Request error")
return response.json()
def get_events_after(auth_token, last_date, org_id):
# Elements API accepts timestamps in [RFC 3339](https://datatracker.ietf.org/doc/html/rfc3339#section-5.6)
# format: YYYY-MM-DDThh:mm:ss.SSS
last_date_str = last_date.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
next_page = None
fetch_page = True
print("Reading events created after {}".format(last_date_str))
while fetch_page:
page = get_events_page(auth_token, last_date_str, org_id, next_page)
next_page = page.get("nextAnchor")
# If `nextAnchor` is present we should fetch next page in next iteration
fetch_page = next_page is not None
for event in page["items"]:
print("EventId={}, EventTs={}".format(event["id"],
event["persistenceTimestamp"]))
# updating `last_date` with time when event was persisted
# when iteration ends, `last_date` will be equal to `persistenceTimestamp`
# of last received event
last_date_str = event["persistenceTimestamp"]
# Parse timestamp of last received event after reading all
# events, that were created after `min_dt`
return datetime.strptime(last_date_str, "%Y-%m-%dT%H:%M:%S.%fZ")
def poll_security_events(client_id, client_secret, poll_interval, org_id=None):
# initialize start date with current timestamp
last_date = datetime.now(timezone.utc)
# start infinite loop
while True:
# obtain authentication token
auth_token = authenticate(client_id, client_secret)
# read all events created after `last_date`. After reading all events
# that variable is updated with timestamp of last events. `last_date` will be
# used in next iteration
last_date = get_events_after(auth_token, last_date, org_id)
print("Last date", last_date)
# execute next iteration every `poll_interval`
sleep(poll_interval)
Count Security Events by engine
In addition to listing Security Events, you can request Elements API to produce a data summary. The API selects items matching the parameters, groups them by the chosen property, and returns the number of items belonging to each group. In the response, the API returns a list of rows where each item represents one aggregation group.
from datetime import datetime, timezone, timedelta
import requests
from authentication import authenticate
API_URL = "https://api.connect.withsecure.com"
PATH = "/security-events/v1/security-events"
def count_by_engine_last_week(client_id, client_secret, org_id):
end = datetime.now(tz=timezone.utc)
start = end - timedelta(days=7)
stats = count_by_engine(client_id, client_secret, start, end, org_id)
fmt = "|{}|{}|"
print(fmt.format("Engine", "Events count"))
for row in stats["items"]:
print(fmt.format(row["engine"], row["count"]))
def count_by_engine(client_id, client_secret, start_dt, end_dt, org_id=None):
auth_token = authenticate(client_id, client_secret)
headers = {
# when `Accept` header has value `application/vnd.withsecure.aggr+json`
# then API calculates statistics for events matching query parameters
"Accept": "application/vnd.withsecure.aggr+json",
"Authorization": "Bearer " + auth_token,
"User-Agent": "my-script",
"Content-Type": "application/x-www-form-urlencoded",
}
param_start_dt = start_dt.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
param_end_dt = end_dt.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
# API selects all security events from `epp` engine group that were persisted
# between `start_dt` and `end_dt`, group items by value of `engine` property
# and return number of items in each group
params = {
"engineGroup": ["epp"],
"persistenceTimestampStart": param_start_dt,
"persistenceTimestampEnd": param_end_dt,
"count": "engine", # property `engine` is used to group
}
if org_id:
params["organizationId"] = org_id
response = requests.post(API_URL + PATH, data=params, headers=headers)
if not response.ok:
print("Error", response.text)
raise Exception("Request error")
return response.json()
In the recipe above, the count_by_engine function reads security events in a given time range, specified with the
persistenceTimestampStart and persistenceTimestampEnd query parameters, for the selected engine group. The request must include the
Accept: "application/vnd.withsecure.aggr+json" header to indicate that it expects a data summary, and it uses the count: "engine" parameter to instruct the API to group security events by the
engine.
The count_by_engine_last_week function gets the response from the API and iterates over the list of items to produce a table. Each row
represents a different engine: the engine name is shown in the first column and the number of events from that engine is printed in the
second column.
Polling EDR detections
It is only possible to read a list of EDR detections, for a single EDR incident. To keep the list up-to-date, you have to iterate over the list of open incidents and for each item, read the list of related detections. A long list of incidents will lead to a high number of requests that are sent to Elements API. It may lead to request throttling when the frequency is too high.
To limit the risk of request throttling, we recommend that you query detection only for those incidents that were updated since the previous check. The incidents are mutable entities, which means that their state may change over time, for example, when the system receives a new detection or the status of an incident is updated. Elements API allows you to find incidents that were updated within a specified time range. After getting a list of recent updates, you can use it to fetch last detections.
The function poll_edr_detections() periodically calls the get_incidents_updated_after function to retrieve EDR incidents that were
created after last_date. The last_date is initialized when the poll_detections() function is called and it gets updated whenever
the get_incidents_updated_after function finds a new incident. Whenever a new incident is found, the function print_all_detections is triggered
to read all the related detections using a paging technique.
from datetime import datetime, timezone
from time import sleep
import requests
from authentication import authenticate
API_URL = 'https://api.connect.withsecure.com'
INCIDENTS_PATH = "/incidents/v1/incidents"
DETECTIONS_PATH = "/incidents/v1/detections"
def get_updated_incidents(auth_token, min_date, org_id=None, next_page=None):
headers = {
"Accept": "application/json", # always use that header if you expect JSON response
"Authorization": "Bearer " + auth_token,
"User-Agent": "my-script"
}
params = {
"limit": 50,
"archived": False, # read only not archived incidents
"updatedTimestampStart": min_date, # read all incidents updated AFTER `min_date`
"order": "asc", # ascending order by `updateTimestampStart`
}
# `min_date` represents timestamp of last event in previous page
# That event should not be included in next response. When `exclusiveStart=true` is set
# then `updatedTimestampStart` is used as exclusive start of time range.
params["exclusiveStart"] = "true"
if next_page:
params['anchor'] = next_page
if org_id:
params["organizationId"] = org_id
response = requests.get(API_URL + INCIDENTS_PATH, params=params, headers=headers)
if not response.ok:
print("Error", response.text)
raise Exception("Request error")
return response.json()
def get_new_detections_page(auth_token, incident_id, min_date, next_page=None):
headers = {
"Accept": "application/json",
"Authorization": "Bearer " + auth_token,
"User-Agent": "my-script"
}
params = {"limit": 100,
"incidentId": incident_id
}
if next_page:
params['anchor'] = next_page
response = requests.get(API_URL + DETECTIONS_PATH, params=params, headers=headers)
if not response.ok:
print("Error", response.text)
raise Exception("Request error")
return response.json()
def print_all_detections(auth_token, incident_id, min_date):
next_page = None
fetch_page = True
while fetch_page:
page = get_new_detections_page(auth_token, incident_id, min_date, next_page)
next_page = page.get("nextAnchor")
fetch_page = next_page is not None
for det in page["items"]:
print("DetectionId={}, IncidentId={}, Created={}".format(det["detectionId"],
det["incidentId"], det["createdTimestamp"]))
def get_incidents_updated_after(auth_token, min_dt, org_id):
# Elements API accepts timestamps in [RFC 3339](https://datatracker.ietf.org/doc/html/rfc3339#section-5.6)
# format: YYYY-MM-DDThh:mm:ss.fff
last_date = min_dt.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
next_page = None
fetch_page = True
print("Reading incidents updated after {}".format(last_date))
updated_incidents = []
while fetch_page:
page = get_updated_incidents(auth_token, last_date, org_id, next_page)
next_page = page.get("nextAnchor")
# If `nextAnchor` is present we should fetch next page in next iteration
fetch_page = next_page is not None
for incident in page["items"]:
print("IncidentId={}, Updated Ts={}".format(incident["incidentId"],
incident["updatedTimestamp"]))
# updating `last_date` with time when incident was updated
# when iteration ends, `last_date` will be equal to `updatedTimestamp`
# of last received incident
last_date = incident["updatedTimestamp"]
updated_incidents.append((incident["incidentId"], last_date))
# Parse timestamp of last received incident after reading all
# events, that were created after `min_dt`
return {
"last_date": datetime.strptime(last_date, "%Y-%m-%dT%H:%M:%S.%fZ"),
"updated_incidents": updated_incidents
}
def poll_edr_detections(client_id, client_secret, poll_interval, org_id=None):
# initialize start date with current timestamp
last_date = datetime.now(timezone.utc)
# start infinite loop
while True:
# obtain authentication token
auth_token = authenticate(client_id, client_secret)
# read all incidents created after `last_date`. After reading all items
# that variable is updated with timestamp of last incident. `last_date` will be
# used in next iteration
updates = get_incidents_updated_after(auth_token, last_date, org_id)
last_date = updates["last_date"]
for incident in updates["updated_incidents"]:
print_all_detections(auth_token, incident[0], incident[1])
print("Last date", last_date)
# execute next iteration every `poll_interval`
sleep(poll_interval)
Next Steps
- Review the API Reference for more endpoints
- Check out the Getting Started Guide for basic concepts
- Join the Community for support