Table of Contents

Sink Subscription API

Concepts

The Sink Subscription API offers a REST API to create subscriptions to Kafka topics and entities/dossiers to deliver relevant Kafka event messages to a destination.

The subscription defines the list of topics of interest, the key filters (corresponding to entité unique identifiers) to sink and the sink configuration.

The key filters of a subscription are the values to use in filtering the key of the Kafka events, such that only events that have a key included in the key filters list are processed for sinking, all other events are ignored/discarded.

Context

In CAE Kafka events the message key corresponds to the entite_id, the unique identifier of an entité. As such, the key filters of the subscription are, effectively, filtering Kafka events by entity/dossier. An entity/dossier itself corresponds to a dossier.

Key filters may need to be initialised: the Sink Service and Subscription API does not take care of initialisation.

When a new key is added to the key filters list it is marked as "new" to signify that it needs to be initialised before Kafka events matching can be processed and pushed to the sink. The Sink Service only processes Kafka events that have in the key filters keys marked as "ready"; all other values are ignored.

The cae-initialisation service is responsible for initialising the data for the entities/dossiers from the keys in the subscription key filters with status of "new".

Structure

The format of the documents in the sink subscriptions collection of Mongo DB is as follows:

  • domainString — domain of the subscription;
  • keyString — key of the subscription, corresponding to the entity/dossier of the domain;
  • keyFiltersObject[] — list of values of the key of Kafka events accepted for sinking, with the following JSON format:
    • keyString — the value to accept in the key of the Kafka event, which corresponds to a dossier entity/dossier ID;
    • statusString — status of the entity/dossier's initialisation:
      • "new" for newly added value, not yet initialised;
      • "pending" for entity/dossier being initialised;
      • "ready" for fully initialised and ready to sink entity/dossier.
  • applicationIdString — unique identifier of the application associated with this subscription (not enforced for now);
  • suspendedBoolean — whether the subscription is suspended: only active (suspended: false) subscriptions sink messages;
  • sinkObject — configuration specific to the sink destination (refer to Sink Configuration for details);
  • topicsString[] — list of topics that the subscription is interested on;
  • filtersObject — filters to apply to the message in the form of property-value, such that the property with the same name as the property in this object must be present in the Kafka message and have the same value.
  • initWithHistory - Boolean - allows to specify whether the subscription should initialize with historical data for journal, compte and ecritures.

NOTE

The key of the subscription is populated by performing a lookup on the loop_kafka collection with the domain of the subscription.

Creating a subscription and providing a key is redundant and the value of the provided key is ignored.

Sink Configuration

The general format of the sink configuration is the following:

  • typeString — name of the sink configuration, used by the sink service to determine which sinking implementation to use;
  • credentialsObject — the format of this object is specific to each sink type;
  • retryPolicyObject — retry policy to apply if initial attempt of push to the sink destination fails...
    • maxRetriesNumber — how many times to retry, after the initial (failed) attempt;
    • backoffSecondsNumber — how many seconds to wait before each subsequent retry attempt;
    • backoffMultiplierNumber — by how much to scale the wait between retries after each failed retry attempt;
  • optionsString — optional configuration to apply in the sinking logic, specific to the sink type.

The credentials, and options object are specific to the sink type.

Sink type Azure Blob Storage

These are the expected formats of the credentials and options objects for the sink configuration of type Azure Blob Storage:

  • credentials:
    • accountName - String — name of the Azure Blob Storage account;
    • accountKeyString — key to use in accessing the Azure Blob Storage account.
  • options:
    • containerNameString — name of the container to create (if not existent) and store the JSON of the Kafka event;
    • folderNameString — name of the property in the Kafka event message to extract the value and use to name a virtual folder in Azure Blob Storage's container (this is read from the Kafka even payload, equivalent to the value property of the Kafka event message).

NOTE

Unlike AWS S3 buckets, the containers can be automatically created by the sink-service in case they don't exist.

Sink type AWS S3

This is the expected format of the credentials and options objects for the sink configuration of type AWS S3:

  • credentials:
    • accessKeyIdString — the ID of the access key to use in authentication to AWS;
    • secretAccessKeyString — the secret access key generated for authentication;
    • regionString — identifier of the AWS region (e.g., "eu-west-3" for Paris).
  • options:
    • bucketNameString — name of the AWS S3 bucket to store the Kafka event message (the bucket must exist);
    • folderNameString — name of the property in the Kafka event message to extract the value and use to name a virtual folder in AWS S3's bucket (this is read from the Kafka even payload, equivalent to the value property of the Kafka event message).

NOTE

The AWS S3 buckets used in subscriptions must be created beforehand and exist when the sinker operates. S3 bucket names must be globally unique, not only within the account, as such it's less error prone to create them beforehand.

Subscription

POST/ subscriptions

Create a sink subscription, defining the sink configuration, format, retry and filtering options.

Documentation in French : https://assistanceloop.blob.core.windows.net/documentation/API%20Publiques/CegidEvents.pdf

POST https://api.cegid.com/loop-api-publiques/subscriptions

Attention, before adding it is necessary to do the

GET https://api.cegid.com/loop-api-publiques/dossierClient?codeDossier={codeDossier}[&filter][&sort][&skip][&take]

or

POST https://api.cegid.com/loop-api-publiques/loopHub/getDossiersByApiKey

in order to have objectId of the accounting file.

Example response


 [{
    "objectId": "ad559758-7bf9-4c21-bd9a-6870236e17a2",
    "codeIbs": "TEST001",
    "nom": "",
    "prenom": "",
    "nomUsuel": "Dossier TEST",
    "raisonSociale": "TEST",
    "maitreDossier": "profildev@cegidpartenaires.onmicrosoft.com",
    "maitreDossierId": "68e49540-d6bb-456f-9ba5-3924f548b12e",
    "engagementManager": "",
    "engagementManagerId": ""
}]

You have to put the value of objectid in the "key"

{
    "keyFilters": [
        {
            "key": "bd139d35-1faf-40b9-91d7-b3d1ca6e2c00",
            "status": "new"
        }
    ]
}

Parameter

The C R UD – Retrieve resources.

Endpoint : https://api.cegid.com/loop-api-publiques/subscriptions

On body

{
    "domain": "CEFIDPARTENAIRES",
    "sink": {
        "type": "Azure Blob Storage",
        "credentials": {
            "accountName": "cegidpartenairesloop",
            "accountKey": "xxxxxxxxxxxx"
        },
        "options": {
            "containerName": "subscription",
            "folderName": "entite_id"
        }
    },
    "keyFilters": [
        {
            "key": "bd139d35-1faf-40b9-91d7-b3d1ca6e2c00",
            "status": "new",
            "initializer": "sink-initialisation-qz7hr-pcpml"
        }
    ],
    "applicationId": "APIM"
}

Class Response

Example:

Code Description Links
HTTP/1.1 201 Created
Media type application/json

All responses from the API are in the JSON format and all payloads to the API must be in the JSON format. The request headers must contain Content-Type: application-json.

Example

Response

{
    "domain": "CEGIDPARTENAIRES",
    "key": "8c152a9f-cb98-4c2e-a27e-834aff497ba0",
    "keyFilters": [{
        "key": "bd139d35-1faf-40b9-91d7-b3d1ca6e2c00",
        "status": "new"
    }],
    "applicationId": "APIM",
    "suspended": false,
    "sink": {
        "type": "Azure Blob Storage",
        "credentials": {
            "accountName": "cegidpartenairesloop",
            "accountKey": "xxxxxxxxxx"
        },
        "options": {
            "containerName": "loop",
            "folderName": "entite_id"
        }
    },
    "topics": ["queuing.cpa.comptabilite"],
    "id": "62df94e10e66db8d8d71fcf5",
    "initWithHistory": false
}

GET/ subscriptions

Read a sink subscription, defining the topic(s) to listen to as wel as the sink configuration, format, retry and filtering options

GET https://developers.cegid.com/api-details#api=loop-api-publiques&operation=readsubscriptionquery

Parameter

Endpoint : https://api.cegid.com/loop-api-publiques/subscriptions/{id}

Name Description example mandatory
ID SUBSCRIPTION ID 620a2d961922340bf04e30aa NO
domain DOMAIN CEGIDPARTENAIRES NO
topic TOPIC queuing.cpa.comptabilite NO

Class Response

Example:

Code Description Links
HTTP/1.1 200 OK
Media type application/json

All responses from the API are in the JSON format and all payloads to the API must be in the JSON format. The request headers must contain Content-Type: application-json.

Example

Response

{
    "domain": "TESTRDDCABINET",
    "suspended": false,
    "sink": {
        "retryPolicy": {
            "maxRetries": 2,
            "backoffSeconds": 1,
            "backoffMultiplier": 1
        },
        "type": "Azure Blob Storage",
        "credentials": {
            "accountName": "cegidpartenairesloop",
            "accountKey": "XXXXXXXX"
        },
        "options": {
            "containerName": "cegid",
            "folderName": "entite_id"
        }
    },
    "topics": ["queuing.cpa.comptabilite"],
    "keyFilters": [{
        "entite_id": "b42ba973-07ce-4c6a-873a-5509ab7a20e2",
        "status": "new"
    }],
    "id": "620a2d961922340bf04e30aa",
    "initWithHistory": false
}

DEL/ subscriptions

Deletes the subscription corresponding to the unique identifier in the URL path Before deleting the subscription, it is necessary to do the GET subscription in order to obtain the id of the subscription

DEL https://developers.cegid.com/api-details#api=loop-api-publiques&operation=deletesubscription

Parameter

Endpoint : https://api.cegid.com/loop-api-publiques/subscriptions?id={id}

Name Description example mandatory
ID SUBSCRIPTION ID 633d5f41ec13b91a20039d99 YES

Class Response

Example:

Code Description Links
HTTP/1.1 200 OK
Media type application/json

Therefore, all responses from the API are in the JSON format and all payloads to the API must be in the JSON format. The request headers must contain Content-Type: application-json.

Example

Response

{
    "domain": "CEGIDPARTENAIRES",
    "key": "AAAAAAAAAa",
    "keyFilters": [{
        "key": "6a450436-f813-4526-b8ff-2385a13f5fd7",
        "status": "ready",
        "initializer": "sink-initialisation-6hhmh-69lnd"
    }],
    "applicationId": "SESHA",
    "suspended": false,
    "sink": {
        "type": "AWS S3",
        "credentials": {
            "accessKeyId": "YYYYYYYYYYY",
            "secretAccessKey": "XXXXXXXXXXXX",
            "region": "eu-west-3"
        },
        "options": {
            "bucketName": "cegid-loop",
            "folderName": "entite_id"
        }
    },
    "topics": ["queuing.cpa.comptabilite"],
    "id": "633d5f41ec13b91a20039d99",
    "initWithHistory": false
}

POST/ /key-filters

Creates the new key filters in existing the subscription, by adding the id of non-existent dossiers

POST https://developers.cegid.com/api-details#api=loop-api-publiques&operation=createkeyfilter

Parameter

Endpoint : https://api.cegid.com/loop-api-publiques/key-filters?id={id}

Name Description example mandatory
ID SUBSCRIPTION ID 620a2d961922340bf04e30aa NO

Class Response

Example:

Code Description Links
HTTP/1.1 200 OK
Media type application/json

Therefore, all responses from the API are in the JSON format and all payloads to the API must be in the JSON format. The request headers must contain Content-Type: application-json.

Example

Response

[
    {"key": "620a2d961922340bf04e30aa", "status": "new/ready"}
]

or

[
    {"key": "620a2d961922340bf04e30aa"}
]

Once the subscription has been created for a customer domain, you have to go through the API PUT /key-filters?id={id} to modify the status of accounting file (dossier).

PUT /key-filters

Updates the keyfilters in the subscription, by adding non-existent keys provided and replacing the fields if keyfilters with existing keys.

By example : if you want to reset the sinker on a accounting file and set the status back to new

PUT https://developers.cegid.com/api-details#api=loop-api-publiques&operation=createfilters

Parameter

EndPoint: https://api.cegid.com/loop-api-publiques/key-filters?id={id}

Name Description example mandatory
ID SUBSCRIPTION ID 620a2d961922340bf04e30aa NO

Class Response

Example:

Code Description Links
HTTP/1.1 200 OK
Media type application/json

Therefore, all responses from the API are in the JSON format and all payloads to the API must be in the JSON format. The request headers must contain Content-Type: application-json.

Example

Response

[
    {
        "key": "620a2d961922340bf04e30aa",
        "status": "new"
    }
]

POST /suspended

Create/replace suspended state for the subscription with the specified id. It's necessary to know the id of the subscription by the api GET/ subscriptions.

https://developers.cegid.com/api-details#api=loop-api-publiques&operation=createsuspended

Parameters

Endpoint : https://api.cegid.com/loop-api-publiques/suspended?id={id}&value={value}

Name Description example mandatory
ID SUBSCRIPTION ID 634419e592b0137ae69cff81 YES
value SUBSCRIPTION ID false/true YES

Class Response

Example:

Code Description Links
HTTP/1.1 200 OK
Media type application/json

Therefore, all responses from the API are in the JSON format and all payloads to the API must be in the JSON format. The request headers must contain Content-Type: application-json.

Default is true.

Example, if value is true.

Response

{
    ....
    "suspended": true,
    .....
    "id": "634419e592b0137ae69cff81"
}

Example, if value is false

Response

{
    ....
    "suspended": false,
    .....
    "id": "634419e592b0137ae69cff81"
}

POST /topic

Create/replace topics for the subscription with the specified id

https://developers.cegid.com/api-details#api=loop-api-publiques&operation=createtopics

Parameters

Endpoint : https://api.cegid.com/loop-api-publiques/topics?id={id}

Name Description example mandatory
ID SUBSCRIPTION ID 633d5f41ec13b91a20039d99 YES

There is currently only one topic queuing.cpa.comptabilite that is working into the this version.

Class Response

Example:

Code Description Links
HTTP/1.1 200 OK
Media type application/json

Therefore, all responses from the API are in the JSON format and all payloads to the API must be in the JSON format. The request headers must contain Content-Type: application-json.

Default is true.

Example

Response

{
   "topics": ["queuing.cpa.comptabilite"]
}

POST /sink

Replace sink configuration in the subscription.

https://developers.cegid.com/api-details#api=loop-api-publiques&operation=replacesink

Parameters

Endpoint : https://api.cegid.com/loop-api-publiques/sink?id={id}

Name Description example mandatory
ID SUBSCRIPTION ID 633d5f41ec13b91a20039d99 YES

Class Response

Example:

Code Description Links
HTTP/1.1 200 OK
Media type application/json

Therefore, all responses from the API are in the JSON format and all payloads to the API must be in the JSON format. The request headers must contain Content-Type: application-json.

For example we want to replace the sinker "AWS S3" by "Azure Blob Storage". The value before :

{
          "type": "AWS S3",
        "credentials": {
            "accountName": "cegidpartenairesloop",
            "accountKey": "xxxxx"
        },
        "options": {
            "bucketName": "LOOP",
            "folderName": "entite_id"
  
}

It is necessary to put on body :

{
          "type": "Azure Blob Storage",
        "credentials": {
            "accountName": "cegidpartenairesloop",
            "accountKey": "xxxxx"
        },
        "options": {
            "containerName": "CEGID",
            "folderName": "entite_id"
  
}

Example

Response

{
          "type": "Azure Blob Storage",
        "credentials": {
            "accountName": "cegidpartenairesloop",
            "accountKey": "xxxxx"
        },
        "options": {
            "containerName": "CEGID",
            "folderName": "entite_id"
  
}

PUT /sink

Partially update the sink configuration in the subscription.

https://developers.cegid.com/api-details#api=loop-api-publiques&operation=updatesink

Parameters

Endpoint : https://api.cegid.com/loop-api-publiques/sink?id={id}

Name Description example mandatory
ID SUBSCRIPTION ID 633d5f41ec13b91a20039d99 YES

There is currently only one topic queuing.cpa.comptabilite that is working into the this version.

Class Response

Example

Code Description Links
HTTP/1.1 200 OK
Media type application/json

Therefore, all responses from the API are in the JSON format and all payloads to the API must be in the JSON format. The request headers must contain Content-Type: application-json.

For example we want to change the containerName. The value before :

{
          "type": "Azure Blob Storage",
        "credentials": {
            "accountName": "cegidpartenairesloop",
            "accountKey": "xxxxx"
        },
        "options": {
            "containerName": "LOOP",
            "folderName": "entite_id"
  
}

It is necessary to put on body :

{
          "type": "Azure Blob Storage",
        "credentials": {
            "accountName": "cegidpartenairesloop",
            "accountKey": "xxxxx"
        },
        "options": {
            "containerName": "CEGID",
            "folderName": "entite_id"
  
}

Example

Response

{
          "type": "Azure Blob Storage",
        "credentials": {
            "accountName": "cegidpartenairesloop",
            "accountKey": "xxxxx"
        },
        "options": {
            "containerName": "CEGID",
            "folderName": "entite_id"
  
}

Schema of json files

DEL /key-filters

Deletes the keyfilters in the subscription, by deleting existent keys provided

By example : if you want to delete the keyfilters on a accounting file and set the status back to new

DEL hhttps://developers.cegid.com/api-details#api=loop-api-publiques&operation=deletekeyfilters

Parameter

EndPoint: https://api.cegid.com/loop-api-publiques/key-filters?id={id}

Name Description example mandatory
ID SUBSCRIPTION ID 620a2d961922340bf04e30aa YES

You must first read a sink subscription, request in order to have the keyfilters information. example on this subscription we want to delete c094a7ac-0b60-4783-affb-17d5529a3299

{
    "domain": "CEGIDPARTENAIRES",
    "keyFilters": [{
        "key": "b6335a14-3391-4fba-b397-32835fe15e2b",
        "status": "ready",
        "initializer": "sink-initialisation-rk5ns-vvnt8",
        "message": "success",
        "report": {
            "processedDate": "2024-08-05T01:12:22.093Z",
            "status": "processed",
            "locked": false
        }
    }, {
        "key": "c094a7ac-0b60-4783-affb-17d5529a3299",
        "status": "ready",
        "initializer": "sink-initialisation-dw7zt-zgn8t",
        "message": "success",
        "report": {
            "processedDate": "2024-08-05T01:12:23.517Z",
            "status": "processed",
            "locked": false
        }
    }],
    "applicationId": "APIM001-Ready",
    "suspended": false,
    "sink": {
        "type": "Azure Blob Storage",
        "credentials": {
            "accountName": "dat************",
            "accountKey": "4Mw*************************************************************************************"
        },
        "options": {
            "containerName": "cegid",
            "folderName": "entite_id"
        }
    },
    "topics": ["queuing.cpa.comptabilite"],
    "id": "649e9bc3168c057b1f08e52a",
    "initWithHistory": false
}

On body

[{
    "key": "c094a7ac-0b60-4783-affb-17d5529a3299",
    "status": "ready",
    "initializer": "sink-initialisation-dw7zt-zgn8t"
}]

Class Response

Example:

Code Description Links
HTTP/1.1 200 OK
Media type application/json

Therefore, all responses from the API are in the JSON format and all payloads to the API must be in the JSON format. The request headers must contain Content-Type: application-json.

Example

Response

{
    "domain": "CEGIDPARTENAIRES",
    "keyFilters": [{
        "key": "b6335a14-3391-4fba-b397-32835fe15e2b",
        "status": "ready",
        "initializer": "sink-initialisation-rk5ns-vvnt8",
        "message": "success",
        "report": {
            "processedDate": "2024-08-05T01:12:22.093Z",
            "status": "processed",
            "locked": false
        }
    }],
    "applicationId": "APIM001-Ready",
    "suspended": false,
    "sink": {
        "type": "Azure Blob Storage",
        "credentials": {
            "accountName": "dat************",
            "accountKey": "4Mw*************************************************************************************"
        },
        "options": {
            "containerName": "cegid",
            "folderName": "entite_id"
        }
    },
    "topics": ["queuing.cpa.comptabilite"],
    "id": "649e9bc3168c057b1f08e52a",
    "initWithHistory": false
}

Initialize Synchronization with Historical Data

When the initwithhistory feature is enabled at the subscription level, additional files will be sent along with the initialization files. These extra files contain historical data for journals, accounts, and entries specific to each situation period.

This option will not be applied to company files that have already been initialized. Therefore, company files should be re-initialized if we wish to include their historical data files.

Once activated, this option will be applied by default to all new company files added to the subscription.

Note that when this feature is enabled, multiple files with the same object ID may be present, each with different validFrom and validTo values. In the example below, the first file represents a snapshot of the current state of a journal, while the second one captures a snapshot from the journal history table. The same applies to "écriture" and accounts.

{
  "application_id": "1999353e-52d1-4598-b76e-6fcbd15620fe",
  "timestamp": 1734684480000,
  "id": "1a8793c7-02a1-4e83-aa49-074d0021bf4a",
  "code": "OD3",
  "libelle": "Opérations diverses",
  "compteContrepartie_id": null,
  "typeContrepartie": 1,
  "type": 2,
  "ferme": false,
  "validFrom": 1734684480000,
  "validTo": 0
}
{
  "application_id": "1999353e-52d1-4598-b76e-6fcbd15620fe",
  "timestamp": 1734511169000,
  "id": "1a8793c7-02a1-4e83-aa49-074d0021bf4a",
  "code": "OD1",
  "libelle": "Opérations diverses",
  "compteContrepartie_id": null,
  "typeContrepartie": 1,
  "type": 2,
  "ferme": false,
  "validFrom": 1734511169000,
  "validTo": 1734591643000
}

Each situation period has a property called reference date, which is automatically set to the date when the period was created. This reference date plays a crucial role in restoring the status of the situation period as of that specific point in time.

The following data will be restored for each situation period:

•Accounting entries

•Journals

•Accounts

For each situation period, the latest version of the relevant accounting entries, journals, and accounts up to the reference date will be retrieved, ensuring an accurate representation of the situation period’s historical state.

Example: Subscription for Company File "dossier_test"

Consider the following details for the company file "dossier_test":

-The dossier has a situation period with a reference date of 01/10/2024.

-The dossier contains a total of 10 "écritures."

During initialization, we should send:

•A snapshot of the 10 "écritures" as they are.

•The most recent value for each "écriture" up to 01/10/2024.

If an "écriture" has been updated multiple times, only its latest value as of 01/10/2024 should be sent during initialization from history. If the last updated value is the same as the one in the snapshot, 10 files should be sent; otherwise, 10 plus the file from history.

This ensures that the system accurately reflects the state of the situation period based on the reference date, allowing for consistent and reliable data restoration.