Sink Subscription API
Concepts
The Sink Subscription API is a REST API for creating subscriptions to Kafka topics and entities/dossiers, so that relevant Kafka event messages are delivered to a destination (the sink).
The subscription defines the list of topics of interest, the key filters (corresponding to entity unique identifiers) to sink, and the sink configuration.
The key filters of a subscription are the values used to filter the key of the Kafka events: only events whose key is included in the key filters list are processed for sinking; all other events are ignored/discarded.
Context
In CAE Kafka events the message key corresponds to the entite_id, the unique identifier of an entity. As such, the key filters of the subscription effectively filter Kafka events by entity/dossier. An entity itself corresponds to a dossier.
Key filters may need to be initialised: the Sink Service and the Subscription API do not take care of initialisation.
When a new key is added to the key filters list it is marked as "new" to signify that it needs to be initialised before matching Kafka events can be processed and pushed to the sink.
The Sink Service only processes Kafka events whose key is present in the key filters and marked as "ready"; all other values are ignored.
The cae-initialisation service is responsible for initialising the data for the entities/dossiers whose keys in the subscription key filters have a status of "new".
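The status rule described above can be sketched as follows. This is a minimal illustration, not the Sink Service's actual code: the function name `should_sink` and the in-memory list are assumptions made for the example; only the `key` and `status` fields and the "ready" rule come from the text.

```python
def should_sink(event_key, key_filters):
    """Return True only if event_key matches a key filter marked "ready"."""
    return any(
        kf.get("key") == event_key and kf.get("status") == "ready"
        for kf in key_filters
    )


# Hypothetical key filters: one initialised dossier, one still waiting.
key_filters = [
    {"key": "dossier-a", "status": "ready"},
    {"key": "dossier-b", "status": "new"},
]
print(should_sink("dossier-a", key_filters))  # True: key present and ready
print(should_sink("dossier-b", key_filters))  # False: key not yet initialised
print(should_sink("dossier-c", key_filters))  # False: key not in the filters
```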
The REST API for managing subscriptions is described in OpenAPI v3 format in the subscription-api.json file, which can be consumed by Swagger.
Structure
The format of the documents in the sink subscriptions collection of Mongo DB is as follows:
domain - String - domain of the subscription;
key - String - key of the subscription, corresponding to the entity/dossier of the domain;
keyFilters - Object[] - list of values of the key of Kafka events accepted for sinking, with the following JSON format:
    key - String - the value to accept in the key of the Kafka event, which corresponds to an entity/dossier ID;
    status - String - status of the entity/dossier's initialisation:
        "new" for a newly added value, not yet initialised;
        "pending" for an entity/dossier being initialised;
        "ready" for a fully initialised entity/dossier that is ready to sink.
applicationId - String - unique identifier of the application associated with this subscription (not enforced for now);
suspended - Boolean - whether the subscription is suspended: only active (suspended: false) subscriptions sink messages;
sink - Object - configuration specific to the sink destination (refer to Sink Configuration for details);
topics - String[] - list of topics that the subscription is interested in;
filters - Object - filters to apply to the message as property-value pairs: each property in this object must be present in the Kafka message under the same name and have the same value.
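The property-value matching described for `filters` could look like the sketch below. The function name `matches_filters`, the sample property `type`, and the choice to accept every message when `filters` is empty are assumptions for illustration; the document only states that each filter property must be present in the message with the same value.

```python
def matches_filters(message, filters):
    """True when every property in filters exists in message with an equal value."""
    return all(
        name in message and message[name] == value
        for name, value in filters.items()
    )


# Hypothetical message and filter values, used only to show the rule.
message = {"entite_id": "b42ba973-07ce-4c6a-873a-5509ab7a20e2", "type": "ecriture"}
print(matches_filters(message, {"type": "ecriture"}))  # True
print(matches_filters(message, {"type": "facture"}))   # False: value differs
print(matches_filters(message, {"journal": "VT"}))     # False: property missing
```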
NOTE
The key of the subscription is populated by performing a lookup on the loop_kafka collection with the domain of the subscription. Providing a key when creating a subscription is therefore redundant: the value of the provided key is ignored.
Sink Configuration
The general format of the sink configuration is the following:
type - String - name of the sink configuration, used by the sink service to determine which sinking implementation to use;
credentials - Object - the format of this object is specific to each sink type;
retryPolicy - Object - retry policy to apply if the initial attempt to push to the sink destination fails:
    maxRetries - Number - how many times to retry, after the initial (failed) attempt;
    backoffSeconds - Number - how many seconds to wait before each subsequent retry attempt;
    backoffMultiplier - Number - by how much to scale the wait between retries after each failed retry attempt;
options - Object - optional configuration to apply in the sinking logic, specific to the sink type.
The credentials and options objects are specific to the sink type.
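To make the retry policy fields concrete, here is a small sketch of how the waits could be derived from them. It assumes the first retry waits `backoffSeconds` and each later wait is the previous one multiplied by `backoffMultiplier`; the exact schedule used by the sink service is not specified in this document, and `retry_delays` is a hypothetical helper name.

```python
def retry_delays(retry_policy):
    """Return the wait (in seconds) before each retry attempt, in order."""
    delay = retry_policy["backoffSeconds"]
    delays = []
    for _ in range(retry_policy["maxRetries"]):
        delays.append(delay)
        # Assumed rule: scale the wait after every failed retry.
        delay *= retry_policy["backoffMultiplier"]
    return delays


print(retry_delays({"maxRetries": 3, "backoffSeconds": 2, "backoffMultiplier": 2}))
# [2, 4, 8]
```

With a multiplier of 1 the wait stays constant, which gives a fixed-interval retry.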
Sink type Azure Blob Storage
These are the expected formats of the credentials and options objects for the sink configuration of type Azure Blob Storage:
credentials:
    accountName - String - name of the Azure Blob Storage account;
    accountKey - String - key to use in accessing the Azure Blob Storage account.
options:
    containerName - String - name of the container in which to store the JSON of the Kafka event (created if it does not exist);
    folderName - String - name of the property in the Kafka event message whose value is used to name a virtual folder in the Azure Blob Storage container (this is read from the Kafka event payload, equivalent to the value property of the Kafka event message).
NOTE
Unlike AWS S3 buckets, containers can be created automatically by the sink-service if they don't exist.
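The role of `folderName` can be illustrated with a short sketch: the option names a property of the event payload, and that property's value becomes the virtual folder. The helper `folder_for_event` is hypothetical, and how the sink service handles a payload that lacks the property is not documented, so this sketch simply raises `KeyError`.

```python
def folder_for_event(event_value, folder_name):
    """Return the virtual folder name for one event.

    event_value is the decoded Kafka event payload (the message `value`);
    folder_name is the sink's options.folderName.
    """
    # Assumption: a missing property is an error (raises KeyError here).
    return str(event_value[folder_name])


payload = {"entite_id": "b42ba973-07ce-4c6a-873a-5509ab7a20e2"}
print(folder_for_event(payload, "entite_id"))
# b42ba973-07ce-4c6a-873a-5509ab7a20e2
```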
Sink type AWS S3
These are the expected formats of the credentials and options objects for the sink configuration of type AWS S3:
credentials:
    accessKeyId - String - the ID of the access key to use in authentication to AWS;
    secretAccessKey - String - the secret access key generated for authentication;
    region - String - identifier of the AWS region (e.g., "eu-west-3" for Paris).
options:
    bucketName - String - name of the AWS S3 bucket in which to store the Kafka event message (the bucket must exist);
    folderName - String - name of the property in the Kafka event message whose value is used to name a virtual folder in the AWS S3 bucket (this is read from the Kafka event payload, equivalent to the value property of the Kafka event message).
NOTE
The AWS S3 buckets used in subscriptions must be created beforehand and exist when the sinker operates. S3 bucket names must be globally unique, not only within the account, so it is less error-prone to create them beforehand.
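A client could check a sink configuration against the two formats above before sending it. The sketch below is one way to do that; it assumes every listed field is required, which this document does not state explicitly, and `missing_sink_fields` is a hypothetical helper, not part of the API.

```python
# Field names are taken from the two sink type descriptions above.
EXPECTED_FIELDS = {
    "Azure Blob Storage": {
        "credentials": {"accountName", "accountKey"},
        "options": {"containerName", "folderName"},
    },
    "AWS S3": {
        "credentials": {"accessKeyId", "secretAccessKey", "region"},
        "options": {"bucketName", "folderName"},
    },
}


def missing_sink_fields(sink):
    """Return sorted 'section.field' names absent from the sink configuration."""
    expected = EXPECTED_FIELDS.get(sink.get("type"))
    if expected is None:
        raise ValueError(f"unknown sink type: {sink.get('type')!r}")
    missing = []
    for section, fields in expected.items():
        present = set(sink.get(section) or {})
        missing.extend(f"{section}.{name}" for name in fields - present)
    return sorted(missing)


sink = {
    "type": "Azure Blob Storage",
    "credentials": {"accountName": "cegidpartenairesloop"},
    "options": {"containerName": "subscription", "folderName": "entite_id"},
}
print(missing_sink_fields(sink))  # ['credentials.accountKey']
```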
Subscription
POST /subscriptions
Create a sink subscription, defining the sink configuration, format, retry and filtering options.
Documentation in French : https://assistanceloop.blob.core.windows.net/documentation/API%20Publiques/CegidEvents.pdf
POST https://api.cegid.com/loop-api-publiques/subscriptions
Attention: before creating a subscription, it is necessary to call
POST https://api.cegid.com/loop-api-publiques/loopHub/getDossiersByApiKey
in order to obtain the objectId of the accounting file (dossier).
Example response
[{
"objectId": "ad559758-7bf9-4c21-bd9a-6870236e17a2",
"codeIbs": "TEST001",
"nom": "",
"prenom": "",
"nomUsuel": "Dossier TEST",
"raisonSociale": "TEST",
"maitreDossier": "profildev@cegidpartenaires.onmicrosoft.com",
"maitreDossierId": "68e49540-d6bb-456f-9ba5-3924f548b12e",
"engagementManager": "",
"engagementManagerId": ""
}]
Put the value of objectId in the "key" field of the key filter:
{
"keyFilters": [
{
"key": "bd139d35-1faf-40b9-91d7-b3d1ca6e2c00",
"status": "new"
}
]
}
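The mapping from the getDossiersByApiKey response to the keyFilters list can be sketched as below. The helper name `key_filters_from_dossiers` is hypothetical; the only facts taken from the text are that each dossier's objectId goes into "key" and that a newly added key has the status "new".

```python
def key_filters_from_dossiers(dossiers):
    """Build a keyFilters list from a getDossiersByApiKey response."""
    return [{"key": dossier["objectId"], "status": "new"} for dossier in dossiers]


# Trimmed copy of the example response shown above.
dossiers = [
    {"objectId": "ad559758-7bf9-4c21-bd9a-6870236e17a2", "codeIbs": "TEST001"},
]
print({"keyFilters": key_filters_from_dossiers(dossiers)})
# {'keyFilters': [{'key': 'ad559758-7bf9-4c21-bd9a-6870236e17a2', 'status': 'new'}]}
```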
Base URL
The base URL has the same structure for all public API requests. It consists of:
 | Domain | Mode | Role |
---|---|---|---|
Example | https://loop.loopsoftware.fr | /YPN | /cabinet |
On Cegid Developers: https://developers.cegid.com/api-details#api=loop-api-publiques&operation=createsubscription
For the purpose of this document, the base URL will be referred to as <baseUrl>.
Request Types
The base URL is followed by one of the following request types:
query - for CRUD operations on models
transaction - for transactional requests
serverless - for requests to server functions
service - for requests to services
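The composition of the base URL and request type described above can be sketched as follows. It assumes the parts are joined as plain path segments; `build_request_url` is a hypothetical helper and the document does not show what follows the request type.

```python
REQUEST_TYPES = {"query", "transaction", "serverless", "service"}


def build_request_url(domain, mode, role, request_type):
    """Join domain, mode and role into the base URL, then append the request type."""
    if request_type not in REQUEST_TYPES:
        raise ValueError(f"unknown request type: {request_type!r}")
    parts = [domain.rstrip("/"), mode.strip("/"), role.strip("/"), request_type]
    return "/".join(parts)


print(build_request_url("https://loop.loopsoftware.fr", "/YPN", "/cabinet", "query"))
# https://loop.loopsoftware.fr/YPN/cabinet/query
```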
Parameter
CRUD – Retrieve resources.
Endpoint : https://api.cegid.com/loop-api-publiques/subscriptions
Request body:
{
"domain": "CEGIDPARTENAIRES",
"sink": {
"type": "Azure Blob Storage",
"credentials": {
"accountName": "cegidpartenairesloop",
"accountKey": "xxxxxxxxxxxx"
},
"options": {
"containerName": "subscription",
"folderName": "entite_id"
}
},
"keyFilters": [
{
"key": "bd139d35-1faf-40b9-91d7-b3d1ca6e2c00",
"status": "new",
"initializer": "sink-initialisation-qz7hr-pcpml"
}
],
"applicationId": "APIM"
}
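As an illustration, the request above could be prepared with Python's standard library as sketched below. The request is only built, not sent. Authentication headers are not described in this document, so none are set here and the caller would have to add whatever the API gateway requires; `build_create_request` is a hypothetical helper name.

```python
import json
import urllib.request

SUBSCRIPTIONS_URL = "https://api.cegid.com/loop-api-publiques/subscriptions"


def build_create_request(subscription):
    """Prepare (without sending) the POST request that creates a subscription."""
    body = json.dumps(subscription).encode("utf-8")
    return urllib.request.Request(
        SUBSCRIPTIONS_URL,
        data=body,
        method="POST",
        headers={"Content-Type": "application/json"},
    )


subscription = {
    "domain": "CEGIDPARTENAIRES",
    "sink": {
        "type": "Azure Blob Storage",
        "credentials": {"accountName": "cegidpartenairesloop", "accountKey": "xxxxxxxxxxxx"},
        "options": {"containerName": "subscription", "folderName": "entite_id"},
    },
    "keyFilters": [{"key": "bd139d35-1faf-40b9-91d7-b3d1ca6e2c00", "status": "new"}],
    "applicationId": "APIM",
}
request = build_create_request(subscription)
print(request.get_method(), request.full_url)
# To send it: urllib.request.urlopen(request), once authentication is added.
```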
Class Response
Example:
Code | Description | Links |
---|---|---|
HTTP/1.1 201 | Created | |
Media type | application/json |
All responses from the API are in JSON format and all payloads sent to the API must be in JSON format.
The request headers must contain Content-Type: application/json.
Example
Response
{
"domain": "CEGIDPARTENAIRES",
"key": "8c152a9f-cb98-4c2e-a27e-834aff497ba0",
"keyFilters": [{
"key": "bd139d35-1faf-40b9-91d7-b3d1ca6e2c00",
"status": "new"
}],
"applicationId": "APIM",
"suspended": false,
"sink": {
"type": "Azure Blob Storage",
"credentials": {
"accountName": "cegidpartenairesloop",
"accountKey": "xxxxxxxxxx"
},
"options": {
"containerName": "loop",
"folderName": "entite_id"
}
},
"topics": ["queuing.cpa.comptabilite"],
"id": "62df94e10e66db8d8d71fcf5"
}
GET /subscriptions
Read a sink subscription, including the topic(s) to listen to as well as the sink configuration, format, retry and filtering options.
GET https://developers.cegid.com/api-details#api=loop-api-publiques&operation=readsubscriptionquery
Parameter
Endpoint : https://api.cegid.com/loop-api-publiques/subscriptions/{id}
Name | Description | example | mandatory |
---|---|---|---|
ID | SUBSCRIPTION ID | 620a2d961922340bf04e30aa | NO |
domain | DOMAIN | CEGIDPARTENAIRES | NO |
topic | TOPIC | queuing.cpa.comptabilite | NO |
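A read URL could be assembled from these parameters as sketched below. How the optional parameters are passed is an assumption: the sketch puts the id in the path, as in the endpoint shown above, and sends domain and topic as query-string parameters, which this document does not confirm. `read_subscription_url` is a hypothetical helper.

```python
from urllib.parse import urlencode

SUBSCRIPTIONS_URL = "https://api.cegid.com/loop-api-publiques/subscriptions"


def read_subscription_url(subscription_id=None, domain=None, topic=None):
    """Build the GET URL; every parameter is optional, as in the table above."""
    url = SUBSCRIPTIONS_URL
    if subscription_id is not None:
        url = f"{url}/{subscription_id}"
    # Assumed: domain and topic travel in the query string.
    params = {
        name: value
        for name, value in (("domain", domain), ("topic", topic))
        if value is not None
    }
    return f"{url}?{urlencode(params)}" if params else url


print(read_subscription_url("620a2d961922340bf04e30aa"))
print(read_subscription_url(domain="CEGIDPARTENAIRES", topic="queuing.cpa.comptabilite"))
```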
Class Response
Example:
Code | Description | Links |
---|---|---|
HTTP/1.1 200 | OK | |
Media type | application/json |
All responses from the API are in JSON format and all payloads sent to the API must be in JSON format.
The request headers must contain Content-Type: application/json.
Example
Response
{
"domain": "TESTRDDCABINET",
"suspended": false,
"sink": {
"retryPolicy": {
"maxRetries": 2,
"backoffSeconds": 1,
"backoffMultiplier": 1
},
"type": "Azure Blob Storage",
"credentials": {
"accountName": "cegidpartenairesloop",
"accountKey": "XXXXXXXX"
},
"options": {
"containerName": "cegid",
"folderName": "entite_id"
}
},
"topics": ["queuing.cpa.comptabilite"],
"keyFilters": [{
"entite_id": "b42ba973-07ce-4c6a-873a-5509ab7a20e2",
"status": "new"
}],
"id": "620a2d961922340bf04e30aa"
}
DEL /subscriptions
Deletes the subscription corresponding to the unique identifier given in the URL. Before deleting the subscription, call GET /subscriptions in order to obtain the id of the subscription.
DEL https://developers.cegid.com/api-details#api=loop-api-publiques&operation=deletesubscription
Parameter
Endpoint : https://api.cegid.com/loop-api-publiques/subscriptions?id={id}
Name | Description | example | mandatory |
---|---|---|---|
ID | SUBSCRIPTION ID | 633d5f41ec13b91a20039d99 | YES |
Class Response
Example:
Code | Description | Links |
---|---|---|
HTTP/1.1 200 | OK | |
Media type | application/json |
All responses from the API are in JSON format and all payloads sent to the API must be in JSON format.
The request headers must contain Content-Type: application/json.
Example
Response
{
"domain": "CEGIDPARTENAIRES",
"key": "AAAAAAAAAa",
"keyFilters": [{
"key": "6a450436-f813-4526-b8ff-2385a13f5fd7",
"status": "ready",
"initializer": "sink-initialisation-6hhmh-69lnd"
}],
"applicationId": "SESHA",
"suspended": false,
"sink": {
"type": "AWS S3",
"credentials": {
"accessKeyId": "YYYYYYYYYYY",
"secretAccessKey": "XXXXXXXXXXXX",
"region": "eu-west-3"
},
"options": {
"bucketName": "cegid-loop",
"folderName": "entite_id"
}
},
"topics": ["queuing.cpa.comptabilite"],
"id": "633d5f41ec13b91a20039d99"
}
POST /key-filters
Creates new key filters in an existing subscription, by adding the ids of dossiers that are not yet present.
POST https://developers.cegid.com/api-details#api=loop-api-publiques&operation=createkeyfilter
Parameter
Endpoint : https://api.cegid.com/loop-api-publiques/key-filters?id={id}
Name | Description | example | mandatory |
---|---|---|---|
ID | SUBSCRIPTION ID | 620a2d961922340bf04e30aa | NO |
Class Response
Example:
Code | Description | Links |
---|---|---|
HTTP/1.1 200 | OK | |
Media type | application/json |
All responses from the API are in JSON format and all payloads sent to the API must be in JSON format.
The request headers must contain Content-Type: application/json.
Example
Response
[
{"key": "620a2d961922340bf04e30aa", "status": "new/ready"}
]
or
[
{"key": "620a2d961922340bf04e30aa"}
]
Once the subscription has been created for a customer domain, use PUT /key-filters?id={id} to modify the status of an accounting file (dossier).
PUT /key-filters
Updates the key filters in the subscription, by adding the provided keys that do not exist yet and replacing the fields of key filters whose keys already exist.
For example: to reset the sinker on an accounting file, set its status back to new.
PUT https://developers.cegid.com/api-details#api=loop-api-publiques&operation=createfilters
Parameter
EndPoint: https://api.cegid.com/loop-api-publiques/key-filters?id={id}
Name | Description | example | mandatory |
---|---|---|---|
ID | SUBSCRIPTION ID | 620a2d961922340bf04e30aa | NO |
Class Response
Example:
Code | Description | Links |
---|---|---|
HTTP/1.1 200 | OK | |
Media type | application/json |
All responses from the API are in JSON format and all payloads sent to the API must be in JSON format.
The request headers must contain Content-Type: application/json.
Example
Response
[
{
"key": "620a2d961922340bf04e30aa",
"status": "new"
}
]
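The add-or-replace behaviour of PUT /key-filters can be modelled locally as in the sketch below. It assumes that fields not present in the update (such as initializer) are left untouched on existing key filters; the document only says that fields are replaced, so this detail and the helper name `upsert_key_filters` are assumptions.

```python
def upsert_key_filters(existing, updates):
    """Add key filters with unknown keys; overwrite fields of those with known keys."""
    merged = {kf["key"]: dict(kf) for kf in existing}
    for update in updates:
        merged.setdefault(update["key"], {}).update(update)
    return list(merged.values())


existing = [
    {"key": "620a2d961922340bf04e30aa", "status": "ready", "initializer": "sink-initialisation-6hhmh-69lnd"},
]
updates = [
    {"key": "620a2d961922340bf04e30aa", "status": "new"},   # reset an existing key
    {"key": "bd139d35-1faf-40b9-91d7-b3d1ca6e2c00", "status": "new"},  # add a new key
]
for key_filter in upsert_key_filters(existing, updates):
    print(key_filter)
```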
POST /suspended
Create/replace the suspended state for the subscription with the specified id. The id of the subscription can be obtained through GET /subscriptions.
https://developers.cegid.com/api-details#api=loop-api-publiques&operation=createsuspended
Parameters
Endpoint : https://api.cegid.com/loop-api-publiques/suspended?id={id}&value={value}
Name | Description | example | mandatory |
---|---|---|---|
ID | SUBSCRIPTION ID | 634419e592b0137ae69cff81 | YES |
value | SUSPENDED STATE | false/true | YES |
Class Response
Example:
Code | Description | Links |
---|---|---|
HTTP/1.1 200 | OK | |
Media type | application/json |
All responses from the API are in JSON format and all payloads sent to the API must be in JSON format.
The request headers must contain Content-Type: application/json.
Default is true.
Example, if value is true:
Response
{
....
"suspended": true,
.....
"id": "634419e592b0137ae69cff81"
}
Example, if value is false
Response
{
....
"suspended": false,
.....
"id": "634419e592b0137ae69cff81"
}
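Building the URL for this call is mostly a matter of encoding the two query parameters. The sketch below assumes the boolean is sent as the lowercase text "true" or "false", matching the example values in the parameter table; `suspended_url` is a hypothetical helper.

```python
from urllib.parse import urlencode

SUSPENDED_URL = "https://api.cegid.com/loop-api-publiques/suspended"


def suspended_url(subscription_id, suspended):
    """Build the POST /suspended URL for the given subscription and state."""
    query = urlencode({"id": subscription_id, "value": "true" if suspended else "false"})
    return f"{SUSPENDED_URL}?{query}"


print(suspended_url("634419e592b0137ae69cff81", True))
# https://api.cegid.com/loop-api-publiques/suspended?id=634419e592b0137ae69cff81&value=true
```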
POST /topics
Create/replace topics for the subscription with the specified id
https://developers.cegid.com/api-details#api=loop-api-publiques&operation=createtopics
Parameters
Endpoint : https://api.cegid.com/loop-api-publiques/topics?id={id}
Name | Description | example | mandatory |
---|---|---|---|
ID | SUBSCRIPTION ID | 633d5f41ec13b91a20039d99 | YES |
Currently, queuing.cpa.comptabilite is the only topic supported in this version.
Class Response
Example:
Code | Description | Links |
---|---|---|
HTTP/1.1 200 | OK | |
Media type | application/json |
All responses from the API are in JSON format and all payloads sent to the API must be in JSON format.
The request headers must contain Content-Type: application/json.
Example
Response
{
"topics": ["queuing.cpa.comptabilite"]
}
POST /sink
Replace sink configuration in the subscription.
https://developers.cegid.com/api-details#api=loop-api-publiques&operation=replacesink
Parameters
Endpoint : https://api.cegid.com/loop-api-publiques/sink?id={id}
Name | Description | example | mandatory |
---|---|---|---|
ID | SUBSCRIPTION ID | 633d5f41ec13b91a20039d99 | YES |
Class Response
Example:
Code | Description | Links |
---|---|---|
HTTP/1.1 200 | OK | |
Media type | application/json |
All responses from the API are in JSON format and all payloads sent to the API must be in JSON format.
The request headers must contain Content-Type: application/json.
For example, to replace the "AWS S3" sink with "Azure Blob Storage". The value before:
{
"type": "AWS S3",
"credentials": {
"accountName": "cegidpartenairesloop",
"accountKey": "xxxxx"
},
"options": {
"bucketName": "LOOP",
"folderName": "entite_id"
}
}
The request body must contain:
{
"type": "Azure Blob Storage",
"credentials": {
"accountName": "cegidpartenairesloop",
"accountKey": "xxxxx"
},
"options": {
"containerName": "CEGID",
"folderName": "entite_id"
}
}
Example
Response
{
"type": "Azure Blob Storage",
"credentials": {
"accountName": "cegidpartenairesloop",
"accountKey": "xxxxx"
},
"options": {
"containerName": "CEGID",
"folderName": "entite_id"
}
}
PUT /sink
Partially update the sink configuration in the subscription.
https://developers.cegid.com/api-details#api=loop-api-publiques&operation=updatesink
Parameters
Endpoint : https://api.cegid.com/loop-api-publiques/sink?id={id}
Name | Description | example | mandatory |
---|---|---|---|
ID | SUBSCRIPTION ID | 633d5f41ec13b91a20039d99 | YES |
Currently, queuing.cpa.comptabilite is the only topic supported in this version.
Class Response
Example
Code | Description | Links |
---|---|---|
HTTP/1.1 200 | OK | |
Media type | application/json |
All responses from the API are in JSON format and all payloads sent to the API must be in JSON format.
The request headers must contain Content-Type: application/json.
For example, to change the containerName. The value before:
{
"type": "Azure Blob Storage",
"credentials": {
"accountName": "cegidpartenairesloop",
"accountKey": "xxxxx"
},
"options": {
"containerName": "LOOP",
"folderName": "entite_id"
}
}
The request body must contain:
{
"type": "Azure Blob Storage",
"credentials": {
"accountName": "cegidpartenairesloop",
"accountKey": "xxxxx"
},
"options": {
"containerName": "CEGID",
"folderName": "entite_id"
}
}
Example
Response
{
"type": "Azure Blob Storage",
"credentials": {
"accountName": "cegidpartenairesloop",
"accountKey": "xxxxx"
},
"options": {
"containerName": "CEGID",
"folderName": "entite_id"
}
}
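The difference between replacing the sink (POST /sink) and partially updating it (PUT /sink) can be modelled locally as sketched below. The merge rule for the partial update is an assumption: this sketch merges nested objects field by field, while the document only says the update is partial and its own example sends the full object. Both function names are hypothetical.

```python
import copy


def replace_sink(current, new_sink):
    """POST /sink model: the new configuration fully replaces the current one."""
    return copy.deepcopy(new_sink)


def update_sink(current, partial):
    """PUT /sink model (assumed): merge the provided fields into the current sink."""
    merged = copy.deepcopy(current)
    for name, value in partial.items():
        if isinstance(value, dict) and isinstance(merged.get(name), dict):
            merged[name] = update_sink(merged[name], value)
        else:
            merged[name] = copy.deepcopy(value)
    return merged


current = {
    "type": "Azure Blob Storage",
    "credentials": {"accountName": "cegidpartenairesloop", "accountKey": "xxxxx"},
    "options": {"containerName": "LOOP", "folderName": "entite_id"},
}
updated = update_sink(current, {"options": {"containerName": "CEGID"}})
print(updated["options"])
# {'containerName': 'CEGID', 'folderName': 'entite_id'}
```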
Schema of json files
DEL /key-filters
Deletes key filters from the subscription, by removing the provided keys that exist.
For example: if you want to delete the key filter of an accounting file.
DEL https://developers.cegid.com/api-details#api=loop-api-publiques&operation=deletekeyfilters
Parameter
EndPoint: https://api.cegid.com/loop-api-publiques/key-filters?id={id}
Name | Description | example | mandatory |
---|---|---|---|
ID | SUBSCRIPTION ID | 620a2d961922340bf04e30aa | YES |
You must first read the sink subscription in order to have the key filters information. For example, on this subscription we want to delete c094a7ac-0b60-4783-affb-17d5529a3299:
{
"domain": "CEGIDPARTENAIRES",
"keyFilters": [{
"key": "b6335a14-3391-4fba-b397-32835fe15e2b",
"status": "ready",
"initializer": "sink-initialisation-rk5ns-vvnt8",
"message": "success",
"report": {
"processedDate": "2024-08-05T01:12:22.093Z",
"status": "processed",
"locked": false
}
}, {
"key": "c094a7ac-0b60-4783-affb-17d5529a3299",
"status": "ready",
"initializer": "sink-initialisation-dw7zt-zgn8t",
"message": "success",
"report": {
"processedDate": "2024-08-05T01:12:23.517Z",
"status": "processed",
"locked": false
}
}],
"applicationId": "APIM001-Ready",
"suspended": false,
"sink": {
"type": "Azure Blob Storage",
"credentials": {
"accountName": "dat************",
"accountKey": "4Mw*************************************************************************************"
},
"options": {
"containerName": "cegid",
"folderName": "entite_id"
}
},
"topics": ["queuing.cpa.comptabilite"],
"id": "649e9bc3168c057b1f08e52a"
}
Request body:
[{
"key": "c094a7ac-0b60-4783-affb-17d5529a3299",
"status": "ready",
"initializer": "sink-initialisation-dw7zt-zgn8t"
}]
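The effect of this request on the subscription can be modelled locally as in the sketch below: the key filters whose keys appear in the body are removed and the rest of the subscription is unchanged. This only mirrors the request and response examples in this section; `remove_key_filters` is a hypothetical helper, not the service's implementation.

```python
def remove_key_filters(subscription, to_delete):
    """Return a copy of the subscription without the key filters listed in to_delete."""
    doomed = {kf["key"] for kf in to_delete}
    result = dict(subscription)
    result["keyFilters"] = [
        kf for kf in subscription["keyFilters"] if kf["key"] not in doomed
    ]
    return result


subscription = {
    "domain": "CEGIDPARTENAIRES",
    "keyFilters": [
        {"key": "b6335a14-3391-4fba-b397-32835fe15e2b", "status": "ready"},
        {"key": "c094a7ac-0b60-4783-affb-17d5529a3299", "status": "ready"},
    ],
}
body = [{"key": "c094a7ac-0b60-4783-affb-17d5529a3299", "status": "ready"}]
print([kf["key"] for kf in remove_key_filters(subscription, body)["keyFilters"]])
# ['b6335a14-3391-4fba-b397-32835fe15e2b']
```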
Class Response
Example:
Code | Description | Links |
---|---|---|
HTTP/1.1 200 | OK | |
Media type | application/json |
All responses from the API are in JSON format and all payloads sent to the API must be in JSON format.
The request headers must contain Content-Type: application/json.
Example
Response
{
"domain": "CEGIDPARTENAIRES",
"keyFilters": [{
"key": "b6335a14-3391-4fba-b397-32835fe15e2b",
"status": "ready",
"initializer": "sink-initialisation-rk5ns-vvnt8",
"message": "success",
"report": {
"processedDate": "2024-08-05T01:12:22.093Z",
"status": "processed",
"locked": false
}
}],
"applicationId": "APIM001-Ready",
"suspended": false,
"sink": {
"type": "Azure Blob Storage",
"credentials": {
"accountName": "dat************",
"accountKey": "4Mw*************************************************************************************"
},
"options": {
"containerName": "cegid",
"folderName": "entite_id"
}
},
"topics": ["queuing.cpa.comptabilite"],
"id": "649e9bc3168c057b1f08e52a"
}