How to use Aquarium webhooks to communicate with your own services, including automating labeling service integrations.
Overview
This page provides two guides:
Integrating with Webhooks (Generically)
Integrating with Labeling Using Webhooks
Integrating With Webhooks
As a data operations platform, we encourage you to integrate with your other systems and tooling in your practice with Aquarium. To support use cases where you'd prefer for these events to be pushed to you via an HTTP request, we've implemented webhooks.
These webhooks can be triggered from these events:
dataset-complete
dataset-failed
issues-created
issue-update
issue-exported
It is up to you to provide an endpoint that the webhooks can communicate with. The webhook will send a POST request with a standard payload. The payload schemas can be seen here.
Configuration
Webhooks are configured for events per project, and can be edited under the Webhooks tab on a project page:
One endpoint can service multiple events, or you can configure a unique endpoint for every event you want to be alerted on. You can also deactivate the entire webhook if you need to quickly disable sending to it.
Testing the Webhook
You can also test the endpoint by sending a minimal test payload using the send icon; we will generate a test event and send it through the same processing pipelines that other events go through.
Payloads will always be POSTed to the endpoint, with the headers Content-Type: application/json
An authenticating header will also be included if one has been configured for the organization (see below).
Specific schemas can be viewed here, but all payloads will follow the same basic schema:
{ event: str,// the event-type that triggered the call project: str; // the name of the project that the event originated in [entity]: Dict; // a json representation of the entity that the event refers to, if any}
Authentication
When you configure a webhook that will be called by an Aquarium service, it will also be available for the public to call. To ensure that any payload you receive on that endpoint originated with Aquarium, we've provided a method to generate a secret key in your organization settings page. This key applies across all configured webhooks in all projects in your organization. We'll generate and show it only once in the UI, so please remember to write it down!
Once a secret key is generated, it will be set in all request headers as X-Aquarium-Secret which you can verify against the secret key you've been shown.
# example of how to compare secret key for verificationaq_secret = request.headers.get("x-aquarium-secret")if aq_secret != AQ_WEBHOOK_SECRET:returnf"Bad Request: {msg}",400
A new secret key will immediately replace the previous one.
Event Schemas
All event payloads will follow the same basic schema:
{ event: str,// the event-type that triggered the call project: str; // the name of the project that the event originated in [entity]: Dict; // a json representation of the entity that the event refers to, if any}
Below is an exhaustive list of schema definitions for every event that Aquarium supports and the shape of the [entity] in the payload.
Dataset Completed Processing
The dataset-complete event fires when a labeled dataset or an inference set for a labeled dataset has been uploaded; the shape will differ slightly depending on what was uploaded.
The dataset-failed event fires when a labeled dataset or an inference set for a labeled dataset has been uploaded; the shape will differ slightly depending on what was uploaded.
{
event: "issues-created",
project: str,
issues: [{
id: str,
issue_name: str,
creation_type: "auto" | "manual",
element_type: "frame" | "crop",
num_elements: int,
originating_inference_set_name?: str // the inference set that was used to generate this issue's element, usually applicable to auto-created issues
}]
}
Issue Updated
{
event: "issue-updated",
project: str,
issue_update: {
id: str,
update_type: "add" | "remove" | "move" | "elt_status" | "issue_status",
num_elements_added?: int, // if update_type == "add"
num_elements_removed?: int, // if update_type == "remove"
num_elements_moved?: int, // if update_type == "move"
num_elements_already_present?: int, // if update_type == "move"
original_issue_name?: int, // if update_type == "move", the issue the elements were originally in
new_elements_status?: str, // if update_type == "elt_status"
num_elements_updated?: int, // if update_type == "elt_status"
previous_issue_status?: str, // if update_type == "issue_status"
new_issue_status?: str // if update_type == "issue_status"
}
}
A common use case for Aquarium datasets is to identify problem labels and group them using segments, downloading a JSON or CSV representation of a segment's elements, and then using scripting or a manual process to reformat the data to submit to a labeling service, whether an external vendor or an internal tool. By leveraging webhook configurations, the reformatting and submitting steps can be automated by a single handler. This allows an Aquarium user to directly send data to a labeling service directly from the UI without writing code each time.
The issue-exported event will POST the issue's elements to a webhook. A full schema can be found here:
You can then use the elements in segment to create a new payload that is accepted by a labeling service; we've provided some sample code that serves a webhook endpoint and transforms an Aquarium payload to some common formats.
server.py
from flask import Flask, requestimport osfrom python_graphql_client import GraphqlClientAQ_WEBHOOK_SECRET = os.getenv("AQ_WEBHOOK_SECRET")LABELING_API_KEY = os.getenv("LABELING_API_KEY")LABELING_API_ENDPOINT = os.getenv("LABELING_API_ENDPOINT")labeling_api_headers ={# Replace with the proper API key header if any for your service"Authorization":f"Bearer {LABELING_API_KEY}"}client =GraphqlClient(endpoint=LABELING_API_ENDPOINT, headers=labeling_api_headers)# Replace with appropriate graphql mutation.# In this example, the way to requeue a label is to remove it and mark as a templaterelabel_mutation_fragment =""" mutation BulkDeleteLabels ( $projectId: ID!, $makeTemplates: Boolean = true, $labelIds: [ID!]) { project (where: {id: $projectId}) { bulkDeleteLabels ( where: { id_in: $labelIds }, makeTemplates: $makeTemplates, waitForQueue: true ) { count } } }"""app =Flask(__name__)@app.route("/webhook", methods=["POST"])defhandle_webhook_payload():# Optionally verify that the payload came from Aquarium aq_secret = request.headers.get("x-aquarium-secret")if aq_secret != AQ_WEBHOOK_SECRET:returnf"Bad Request: {msg}",400 payload_envelope = request.get_json()ifnot payload_envelope: msg ="no payload body received"print(f"error: {msg}")returnf"Bad Request: {msg}",400ifnotisinstance(payload_envelope, dict)ornot payload_envelope.get("event"): msg ="invalid webhook payload format"print(f"error: {msg}")returnf"Bad Request: {msg}",400 event_type = payload_envelope["event"]if event_type =="issue-exported":ifnot payload_envelope.get("issue"): msg ="webhook payload did not contain expected key: issue"print(f"error: {msg}")returnf"Bad Request: {msg}",400_format_and_export_to_graphql_api(payload_envelope["project_name"], payload_envelope["issue"])else: msg =f"endpoint not setup to handle {event_type} events yet"print(f"error: {msg}")returnf"Bad Request: {msg}",400return ("",204)def_format_and_export_to_graphql_api(project_name,issue): label_ids =set()for element in issue["elements"]:if element["element_type"]=="crop": label_ids.add(element["crop_data"]["uuid"])else:for label in element["frame_data"]["label_data"]: label_ids.add(label["uuid"]) variables ={"projectId": project_name,"labelIds":list(label_ids)} client.execute( query=relabel_mutation_fragment, variables=variables, )if__name__=="__main__": PORT =int(os.getenv("PORT"))if os.getenv("PORT")else8080 app.run(host="127.0.0.1", port=PORT, debug=True)
server.py
from flask import Flask, requestimport osimport requestsAQ_WEBHOOK_SECRET = os.getenv("AQ_WEBHOOK_SECRET")LABELING_API_KEY = os.getenv("LABELING_API_KEY")LABELING_API_ENDPOINT = os.getenv("LABELING_API_ENDPOINT")labeling_api_headers ={# Substitute with the proper API key header if any for your service"Authorization":f"Bearer {LABELING_API_KEY}"}app =Flask(__name__)@app.route("/webhook", methods=["POST"])defhandle_webhook_payload():# Optionally verify that the payload came from Aquarium aq_secret = request.headers.get("x-aquarium-secret")if aq_secret != AQ_WEBHOOK_SECRET:returnf"Bad Request: {msg}",400 payload_envelope = request.get_json()ifnot payload_envelope: msg ="no payload body received"print(f"error: {msg}")returnf"Bad Request: {msg}",400ifnotisinstance(payload_envelope, dict)ornot payload_envelope.get("event"): msg ="invalid webhook payload format"print(f"error: {msg}")returnf"Bad Request: {msg}",400 event_type = payload_envelope["event"]if event_type =="issue-exported":ifnot payload_envelope.get("issue"): msg ="webhook payload did not contain expected key: issue"print(f"error: {msg}")returnf"Bad Request: {msg}",400_format_and_export_to_rest_api(payload_envelope["project_name"], payload_envelope["issue"])else: msg =f"endpoint not setup to handle {event_type} events yet"print(f"error: {msg}")returnf"Bad Request: {msg}",400return ("",204)def_format_and_export_to_rest_api(project_name,issue): relabel_frames = []for element in issue["elements"]: relabel_frames.append({"id": element["frame_id"], "url": element["frame_data"]["sensor_data"]["data_urls"]["image_url"] # select the right key for your media type
})# Replace with appropriate post body.# In this example, we assume the way to requeue a label is to resubmit the frame(s) as a new batch new_dataset_payload ={"name":f"{issue['dataset']}_relabel_{issue['issue_name']}","project": project_name,"frames": relabel_frames} requests.post(LABELING_API_ENDPOINT, json=new_dataset_payload, headers=labeling_api_headers)if__name__=="__main__": PORT =int(os.getenv("PORT"))if os.getenv("PORT")else8080 app.run(host="127.0.0.1", port=PORT, debug=True)
server.py
import datetime
from nis import cat
from flask import Flask, request
import os
import labelbox
from labelbox import MediaType
import labelbox
from labelbox.schema.ontology import OntologyBuilder, Tool, Classification,Option
import aquariumlearning as al
labeling_api_headers = {
# Substitute with the proper API key header if any for your service
"Authorization": f"Bearer {LABELBOX_API_KEY}"
}
app = Flask(__name__)
AQ_WEBHOOK_SECRET = os.getenv("AQ_WEBHOOK_SECRET")
LABELBOX_API_KEY = os.getenv("LABELBOX_API_KEY")
AQUARIUM_API_KEY = os.getenv("AQUARIUM_API_KEY")
print(LABELBOX_API_KEY)
# set up LabelBox client
# depending on your webhook setup, how you access
# your labelbox API key may be different
lb_client = labelbox.Client(api_key=LABELBOX_API_KEY)
# set up Aquarium client
# depending on your webhook setup, how you access
# your labelbox API key may be different
al_client = al.Client()
al_client.set_credentials(api_key=AQUARIUM_API_KEY)
@app.route("/labelbox_webhook", methods=["POST"])
def handle_labelbox_webhook():
# Optionally verify that the payload came from Aquarium
aq_secret = request.headers.get("x-aquarium-secret")
if aq_secret != AQ_WEBHOOK_SECRET:
return f"Bad Request: {msg}", 400
payload = request.get_json(force=True)
if not payload:
msg = "no payload body received"
print(f"error: {msg}")
return f"Bad Request: {msg}", 400
if not isinstance(payload, dict) or not payload.get("event"):
msg = "invalid webhook payload format"
print(f"error: {msg}")
return f"Bad Request: {msg}", 400
event_type = payload["event"]
if event_type == "issue-exported":
if not payload.get("issue"):
msg = "webhook payload did not contain expected key: issue"
print(f"error: {msg}")
return f"Bad Request: {msg}", 400
_format_from_aquarium_to_labelbox(payload["project"], payload["issue"])
else:
msg = f"endpoint not setup to handle {event_type} events yet"
print(f"error: {msg}")
return f"Bad Request: {msg}", 400
return ("", 204)
def _format_from_aquarium_to_labelbox(project_name, issue):
#####################################################
# Step 1: Setup and configure our project
#####################################################
# Create a new project
# Pay attention to what option is selected for Media Type
lb_project = lb_client.create_project(
name=project_name+"_aquarium_export",
description=project_name+"_aquarium_export",
media_type=MediaType.Image
)
# make it easier for later get classmap from Aquarium
# can get the classmap a variety of ways or hard code it
# these are the classes you'll use to label
aq_project = al_client.get_project(project_name)
class_map_dict = aq_project['label_class_map']
categories = []
for class_entry in class_map_dict:
categories.append(class_entry['category'])
# Configure the Labelbox editor
# Rules and layout to your specific format of
# Data you are working with
#editor = next(lb_client.get_labeling_frontends(where = LabelingFrontend.name == 'editor'))
# define the object features tooling within Labelbox
# one for each class to label atm with
object_features=[]
for category in categories:
newTool = Tool(
tool=Tool.Type.BBOX,
name='label_' + category,
classifications=[
Classification(
class_type=Classification.Type.RADIO,
instructions="Please label",
options=[
Option(value=category, label=category),
]
)
]
)
object_features.append(newTool)
# example to add classification markings
options_list=[]
for category in categories:
options_list.append(Option(value=category, label=category))
classification_features = [
Classification(
class_type=Classification.Type.CHECKLIST,
instructions="label these things",
options=options_list
)
]
ontology_builder = OntologyBuilder(
tools=object_features,
classifications=classification_features
)
ontology = lb_client.create_ontology("Ontology_"+project_name, ontology_builder.asdict())
lb_project.setup_editor(ontology)
#####################################################
# Step 2: add data to our project
#####################################################
# create a dataset
# get dataset name from export results
exported_elements = issue['elements']
dataset_name = exported_elements[0]['dataset']
dataset = lb_client.create_dataset(name=dataset_name)
# now to add data
metadata_ontology = lb_client.get_data_row_metadata_ontology()
# we create assets to represent each row of data
assets = []
# then you add asset to a data_row
for exported_element in exported_elements:
image_url = exported_element['frame_data']['sensor_data'][0]['data_urls']['image_url']
external_id = exported_element['element_id']
asset_obj = {
'row_data': image_url,
'external_id': external_id
}
assets.append(asset_obj)
# now we define our metadata
# these are just example fields from labelbox's guide
asset_metadata_fields = [
{"schema_id": metadata_ontology.reserved_by_name["captureDateTime"].uid, "value": datetime.datetime.utcnow()},
{"schema_id": metadata_ontology.reserved_by_name["tag"].uid, "value": "tag_string"},
{"schema_id": metadata_ontology.reserved_by_name["split"]["train"].parent, "value": metadata_ontology.reserved_by_name["split"]["train"].uid}
]
# add the metadata to each asset
for item in assets:
item["metadata_fields"] = asset_metadata_fields
# create the data to be labeled in our project
task = dataset.create_data_rows(assets)
task.wait_till_done()
# you have to connect a dataset to a project in order to label
lb_project.datasets.connect(dataset)
if __name__ == "__main__":
PORT = int(os.getenv("PORT")) if os.getenv("PORT") else 9000
app.run(host="127.0.0.1", port=PORT, debug=True)
server.py
from weakref import finalize
from flask import Flask, request
import os
import requests
import aquariumlearning as al
# initialize flask server
app = Flask(__name__)
# pulling values from env variables
AQ_WEBHOOK_SECRET = os.getenv("AQ_WEBHOOK_SECRET")
SCALE_API_KEY = os.getenv("SCALE_API_KEY")
AQUARIUM_API_KEY = os.getenv("AQUARIUM_API_KEY")
# customizing any headers needed by the labeling provider
labeling_api_headers = {
# Substitute with the proper API key header if any for your service
"Authorization": f"Bearer {SCALE_API_KEY}"
}
# set up Aquarium client
# depending on your webhook setup, how you access
# your labelbox API key may be different
al_client = al.Client()
al_client.set_credentials(api_key=AQUARIUM_API_KEY)
# the endpoint Aquarium will hit for you to test your webhook
@app.route("/scale_webhook", methods=["POST"])
def handle_scale_webhook():
# Optionally verify that the payload came from Aquarium
aq_secret = request.headers.get("x-aquarium-secret")
if aq_secret != AQ_WEBHOOK_SECRET:
return f"Bad Request: {msg}", 400
# confirm payload is as expected
payload = request.get_json(force=True)
if not payload:
msg = "no payload body received"
print(f"error: {msg}")
return f"Bad Request: {msg}", 400
if not isinstance(payload, dict) or not payload.get("event"):
msg = "invalid webhook payload format"
print(f"error: {msg}")
return f"Bad Request: {msg}", 400
event_type = payload["event"]
# confirm it's an issue-exported type webhook
if event_type == "issue-exported":
# issue == segment
if not payload.get("issue"):
msg = "webhook payload did not contain expected key: issue"
print(f"error: {msg}")
return f"Bad Request: {msg}", 400
# if we see the data we expect, format it to be sent to labeling provider
_format_and_export_to_rest_api(payload["project"], payload["issue"])
else:
msg = f"endpoint not setup to handle {event_type} events yet"
print(f"error: {msg}")
return f"Bad Request: {msg}", 400
return ("", 204)
def _format_and_export_to_rest_api(project_name, issue):
#####################################################
# Step 1: Setup and configure our project
#####################################################
# define the urls to send requests to
project_url = "https://api.scale.com/v1/projects"
batch_url = "https://api.scale.com/v1/batches"
annotation_url = "https://api.scale.com/v1/task/imageannotation"
# configure project payload
# set issue name as part of label project
issue_name = issue['elements'][0]['issue_name']
project_payload = {
"type": "imageannotation",
"name": project_name+"_"+issue_name,
"rapid": False,
"studio": True,
"params": {"instruction": "Please label"}
}
project_headers = {
"Content-Type": "application/json"
}
requests.post(project_url, json=project_payload, headers=project_headers, auth=(SCALE_API_KEY, ''))
#####################################################
# Step 2: Create the batch
#####################################################
batch_payload = {
"project": project_name+"_"+issue_name,
"name": issue_name+"_batch",
"calibration_batch": False,
"self_label_batch": False,
"studio": True
}
batch_headers = {
"Accept": "application/json",
"Content-Type": "application/json"
}
requests.post(batch_url, json=batch_payload, headers=batch_headers, auth=(SCALE_API_KEY, ''))
#####################################################
# Step 3: Create the annotations
#####################################################
# get the categories
# make it easier for later get classmap from Aquarium
aq_project = al_client.get_project(project_name)
class_map_dict = aq_project['label_class_map']
categories = []
for class_entry in class_map_dict:
categories.append(class_entry['category'])
annotation_headers = {
"Accept": "application/json",
"Content-Type": "application/json",
}
# this is just an example, scale provides lots of customization
# tailor the creation of the payload to what makes sense for your team
for exported_element in issue['elements']:
image_url = exported_element['frame_data']['sensor_data'][0]['data_urls']['image_url']
annotation_payload = {
"instruction": "**Instructions:** Please label all the things",
"attachment": image_url,
"geometries": {
"box": {
"min_height": None,
"min_width": None,
"can_rotate": None,
"integer_pixels": None,
"objects_to_annotate": categories
}
},
"padding": None,
"paddingX": None,
"paddingY": None,
"priority": 10,
"project": project_name+"_"+issue_name,
"batch": issue_name+"_batch",
"studio": True
}
requests.post(annotation_url, json=annotation_payload, headers=annotation_headers, auth=(SCALE_API_KEY, ''))
#####################################################
# Step 4: Finalize the batch
#####################################################
finalize_url = 'https://api.scale.com/v1/batches/'+issue_name+'_batch'+'/finalize'
finalize_headers = {
"Accept": "application/json",
"Content-Type": "application/json"
}
response = requests.post(finalize_url, json={}, headers=finalize_headers, auth=(SCALE_API_KEY, ''))
# finalized
return response
# kicks off flask server and sets address for webhook to hit
if __name__ == "__main__":
PORT = int(os.getenv("PORT")) if os.getenv("PORT") else 9000
app.run(host="127.0.0.1", port=PORT, debug=True)
Triggering a Segment Export
To trigger an export, click the Export To Labeling button on an issue's page. If there aren't any webhooks configured to handle the issue-exported event on the issue's project, it will prompt you to create one.
If the payload was sent to the webhook successfully the modal will close. If not, an error will display in the modal window.