🚀 Quick Start with Postman
Download our Postman collection above, import it into Postman, and start testing the API in minutes! The collection includes pre-configured requests, sample responses, and auto-saves your instance ID.
Before you start: Contact support@collabrix.co to get your API key.
Real-time REST API for receiving Purchase Order events from Sponsors.
This API enables CMO partners to receive real-time Purchase Order (PO) events when Sponsors create new POs. The REST API provides a simple, HTTP-based interface to consume events from Kafka without needing to set up Kafka client libraries.
Simple 3-Step Process:
Kafka automatically handles:
Our REST API uses persistent consumers that never timeout:
How it works: Create a consumer once with your consumer group name. Poll anytime (daily, hourly, or as needed). Your progress is automatically saved - even if the server restarts, your consumer continues from where it left off!
All API endpoints use your provided base URL:
Format: <BASE_URL>/api/v1/...
Example: https://api.your-company.collabrix.co/api/v1/consumers/{groupId}
📧 Need API Access?
Contact support@collabrix.co to receive:
<BASE_URL>)
<API_KEY>)
<CONSUMER_GROUP>)
Topic Name: sh6.PO01.NEW
Event Type: Purchase Order Creation Events (Sponsor → CMO)
All API requests require authentication using an
API Key provided by Collabrix. The
API key must be included in the
x-api-key header of every request.
To: support@collabrix.co
Subject: API Key Request for
Purchase Order Events
Include in your email:
Organization Name: [Your CMO company name]
Contact Name: [Your name]
Contact Email: [Your email]
Topics Needed: sh6.PO01.NEW (Purchase Order events)
Expected Volume: [estimated messages per hour]
Collabrix will provide:
API Key: <API_KEY>
Consumer Group ID: <CONSUMER_GROUP>
Topics: <TOPIC_NAMES> (e.g., sh6.PO01.NEW)
Base URL: <BASE_URL>
Include the API key in the
x-api-key header of all requests:
# Example: cURL request
curl -X GET "<BASE_URL>/api/v1/..." \
-H "x-api-key: <API_KEY>"
Creates a new consumer instance in the specified consumer group.
| Parameter | Type | Required | Description |
|---|---|---|---|
consumerGroupId |
string | Required | Your consumer group ID (provided by Collabrix) |
| Header | Value | Required |
|---|---|---|
x-api-key |
Your API key | Required |
Content-Type |
application/json | Required |
✅ Save the instanceId! You'll need it for all subsequent API calls (subscribing and fetching messages).
POST <BASE_URL>/api/v1/consumers/your-company-prod
Headers:
x-api-key: <API_KEY>
Content-Type: application/json
Body: (empty or optional)
{
"status": true,
"message": "Consumer instance created successfully",
"data": {
"success": true,
"instanceId": "consumer-1738318200000",
"baseUri": "<BASE_URL>/consumers/..."
},
"timestamp": "2026-02-03T10:30:00.000Z"
}
instanceId
value. You'll use it in the next step.
Subscribes the consumer instance to one or more topics.
| Field | Type | Required | Description |
|---|---|---|---|
topics |
array[string] | Required | Array of topic names to subscribe to |
✅ You're now subscribed! Kafka will start delivering PO events created from this moment forward.
POST <BASE_URL>/api/v1/consumers/your-company-prod/instances/consumer-1738318200000/subscription
Headers:
x-api-key: <API_KEY>
Content-Type: application/json
Body:
{
"topics": ["sh6.PO01.NEW"]
}
{
"status": true,
"message": "Subscribed to topics successfully",
"data": {
"success": true,
"subscribedTopics": ["sh6.PO01.NEW"]
},
"timestamp": "2026-02-03T10:30:00.000Z"
}
Fetches new messages from the subscribed topics.
| Parameter | Type | Default | Description |
|---|---|---|---|
timeout |
integer | 5000 | Max time to wait for messages in milliseconds (1000-30000) |
max_bytes |
integer | 1048576 | Maximum bytes to return (default 1MB) |
💡 Recommended: Poll at your
preferred schedule (e.g., daily or as needed) with
timeout=30000.
GET <BASE_URL>/api/v1/consumers/your-company-prod/instances/consumer-1738318200000/records?timeout=30000
Headers:
x-api-key: <API_KEY>
{
"status": true,
"message": "Records fetched successfully",
"data": {
"success": true,
"records": [],
"count": 0
}
}
{
"status": true,
"message": "Records fetched successfully",
"data": {
"success": true,
"records": [
{
"topic": "sh6.PO01.NEW",
"partition": 2,
"offset": "24",
"key": "poCreation",
"value": "PO Data IDOC here...",
"timestamp": "2026-02-06T06:20:45.281Z"
}
],
"count": 1
},
"timestamp": "2026-02-06T06:22:27.700Z"
}
Deletes the consumer instance and releases resources. Always delete the consumer when you're done to free up resources.
| Parameter | Type | Required | Description |
|---|---|---|---|
consumerGroupId |
string | Required | Your consumer group ID |
instanceId |
string | Required | Consumer instance ID (from create consumer) |
⚠️ Important: Good practice to call this in your cleanup/shutdown logic. Deleting a consumer frees up resources on the server.
DELETE /api/v1/consumers/your-company-prod/instances/consumer-1738318200000
Headers:
x-api-key: YOUR_API_KEY
{
"status": true,
"message": "Consumer deleted successfully",
"data": {
"success": true,
"message": "Consumer deleted successfully"
},
"timestamp": "2026-02-03T10:35:00.000Z"
}
Publish PO Confirmation (ORDRSP05 IDOC) with sales order number.
| Header | Value | Required |
|---|---|---|
x-api-key |
Your API key | Required |
Content-Type |
application/json | Required |
| Field | Type | Required |
|---|---|---|
topic |
string | Required |
key |
string | Optional |
value |
string | Required |
⭐ IDOC Fields:
• E1EDK01.ACTION = 29
• E1EDK02 QUALF=001 (PO Number)
• E1EDK02 QUALF=017 (Sales Order Number)
POST /api/v1/messages
Headers:
x-api-key: YOUR_API_KEY
Content-Type: application/json
Body:
{
"topic": "sh6.PO05.CONFIRMATION",
"key": "4500005037",
"value": "<?xml version=\"1.0\"?>
<IDOC BEGIN=\"1\">
<EDI_DC40 SEGMENT=\"1\">
<IDOCTYP>ORDRSP05</IDOCTYP>
</EDI_DC40>
<E1EDK01 SEGMENT=\"1\">
<ACTION>29</ACTION>
</E1EDK01>
<E1EDK02 SEGMENT=\"1\">
<QUALF>001</QUALF>
<BELNR>4500005037</BELNR>
</E1EDK02>
<E1EDK02 SEGMENT=\"1\">
<QUALF>017</QUALF>
<BELNR>SO-2026-001</BELNR>
</E1EDK02>
</IDOC>",
"headers": {
"content-type": "application/xml"
}
}
{
"status": true,
"message": "Message published successfully",
"data": {
"topic": "sh6.PO05.CONFIRMATION",
"partition": 0,
"offset": 12345
}
}
Publish multiple messages to the same Kafka topic in a single request. More efficient than multiple individual requests.
| Header | Value | Required |
|---|---|---|
x-api-key |
Your API key | Required |
Content-Type |
application/json | Required |
| Field | Type | Required | Description |
|---|---|---|---|
topic |
string | Required | Kafka topic name |
messages |
array | Required | Array of message objects (each with key, value, headers) |
POST /api/v1/messages/batch
Headers:
x-api-key: YOUR_API_KEY
Content-Type: application/json
Body:
{
"topic": "sh6.PO05.CONFIRMATION",
"messages": [
{
"key": "4500005037",
"value": "<?xml version=\"1.0\"?><IDOC>...</IDOC>",
"headers": { "content-type": "application/xml" }
},
{
"key": "4500005038",
"value": "<?xml version=\"1.0\"?><IDOC>...</IDOC>",
"headers": { "content-type": "application/xml" }
}
]
}
{
"status": true,
"message": "Batch published successfully",
"data": {
"topic": "sh6.PO05.CONFIRMATION",
"count": 2,
"results": [
{ "partition": 0, "offset": 12345 },
{ "partition": 0, "offset": 12346 }
]
}
}
const axios = require('axios');
const API_KEY = '<API_KEY>';
const BASE_URL = '<BASE_URL>'; // Your domain
const CONSUMER_GROUP = 'your-company-prod';
const INSTANCE_ID = 'consumer-1738318200000';
// Poll for messages at your preferred schedule
async function pollMessages() {
try {
const response = await axios.get(
`${BASE_URL}/api/v1/consumers/${CONSUMER_GROUP}/instances/${INSTANCE_ID}/records?timeout=30000`,
{ headers: { 'x-api-key': API_KEY } }
);
const records = response.data.data.records;
if (records.length > 0) {
for (const record of records) {
await processPO(record.value);
}
}
} catch (error) {
console.error('Error:', error.message);
}
}
// Process PO
async function processPO(record) {
const poNumber = record.key; // Use key (PO number) for deduplication
const offset = record.offset;
// Check for duplicate using offset or PO number
if (await database.checkIfProcessed(poNumber, offset)) return;
// Parse XML and save PO
const poData = parseXML(record.value); // Parse IDOC XML
await database.savePO(poData);
await database.markProcessed(cordaTxId);
}
// Start polling (example: run daily using cron or scheduler)
// For continuous polling: setInterval(pollMessages, 86400000); // 24 hours
pollMessages();
import requests
import time
API_KEY = '<API_KEY>'
BASE_URL = '<BASE_URL>'
CONSUMER_GROUP = '<CONSUMER_GROUP>'
INSTANCE_ID = 'consumer-1738318200000'
def poll_messages():
try:
response = requests.get(
f'{BASE_URL}/api/v1/consumers/{CONSUMER_GROUP}/instances/{INSTANCE_ID}/records?timeout=30000',
headers={'x-api-key': API_KEY}
)
records = response.json()['data']['records']
if records:
for record in records:
process_po(record['value'])
except Exception as e:
print(f'Error: {e}')
def process_po(record):
po_number = record['key'] # Use key (PO number) for deduplication
offset = record['offset']
if database.check_if_processed(po_number, offset):
return
# Parse XML and save PO
po_data = parse_xml(record['value']) # Parse IDOC XML
database.save_po(po_data)
database.mark_processed(po_number, offset)
# Start polling at your preferred schedule
# Example: Run daily using cron or scheduler
# For continuous polling: time.sleep(86400) # 24 hours
poll_messages()
⚠️ Deduplication Required: Always
check offset and key (PO number) against your
database to prevent processing the same PO twice.
Choose a stable consumer group name (e.g.,
acme-pharma-prod) and use it every time. This ensures:
Example:
const CONSUMER_GROUP = 'acme-pharma-prod'; // ←
Never change this!
Use
key
(PO number) and
offset
to check if you've already processed a PO:
async function processPO(record) {
const poNumber = record.key; // PO number
const offset = record.offset;
// Check if already processed
if (await database.alreadyProcessed(poNumber, offset)) {
console.log('Already processed, skipping');
return;
}
// Process PO
await database.savePO(po.payload.json.SADetails);
// Mark as processed
await database.markProcessed(txId);
}
Poll at your preferred schedule - daily, hourly, or as needed:
| Schedule | Cron Expression | Use Case |
|---|---|---|
| Daily (recommended) |
0 0 * * *
|
Batch processing overnight |
| Hourly |
0 * * * *
|
More frequent updates |
| Every 6 hours |
0 */6 * * *
|
Balanced approach |
| Manual | Run when needed | On-demand processing |
💡 No timeout worries - Your consumer stays active even if you don't poll for days!
// config.js
const CONSUMER_GROUP = 'acme-pharma-prod'; // Stable name
const INSTANCE_ID = 'my-consumer-1';
const BASE_URL = '<BASE_URL>'; // Your domain
const API_KEY = '<API_KEY>';
// setup.js - Run once to create consumer
async function setupConsumer() {
// Create consumer
await fetch(`${BASE_URL}/api/v1/consumers/${CONSUMER_GROUP}`, {
method: 'POST',
headers: {
'x-api-key': API_KEY,
'Content-Type': 'application/json'
},
body: JSON.stringify({
name: INSTANCE_ID,
format: 'binary',
'auto.offset.reset': 'latest'
})
});
// Subscribe to topic
await fetch(
`${BASE_URL}/api/v1/consumers/${CONSUMER_GROUP}/instances/${INSTANCE_ID}/subscription`,
{
method: 'POST',
headers: {
'x-api-key': API_KEY,
'Content-Type': 'application/json'
},
body: JSON.stringify({ topics: ['sh6.PO01.NEW'] })
}
);
console.log('✅ Consumer setup complete');
}
// poll.js - Run daily via cron
async function pollMessages() {
const response = await fetch(
`${BASE_URL}/api/v1/consumers/${CONSUMER_GROUP}/instances/${INSTANCE_ID}/records?timeout=30000`,
{ headers: { 'x-api-key': API_KEY } }
);
const data = await response.json();
if (data.data.records.length > 0) {
console.log(`Received ${data.data.count} new POs`);
for (const record of data.data.records) {
await processPO(record);
}
} else {
console.log('No new POs');
}
}
async function processPO(record) {
const poNumber = record.key; // PO number
const offset = record.offset;
// Check for duplicates using offset or PO number
if (await db.alreadyProcessed(poNumber, offset)) {
return;
}
// Save PO to your database
await db.savePO({
poNumber: po.payload.json.SADetails.scheduleAgreementNumber,
vendor: po.payload.json.SADetails.vendorName,
amount: po.payload.json.SLDetails[0].netPrice,
// ... other fields
});
// Mark as processed
await db.markProcessed(txId);
}
pollMessages();
# 1. Create Consumer (once at startup)
POST /api/v1/consumers/{consumerGroupId}
→ Returns: instanceId
# 2. Subscribe to Topic (once at startup)
POST /api/v1/consumers/{consumerGroupId}/instances/{instanceId}/subscription
Body: { "topics": ["sh6.PO01.NEW"] }
# 3. Get POs (repeat at your preferred schedule, e.g., daily)
GET /api/v1/consumers/{consumerGroupId}/instances/{instanceId}/records?timeout=30000
→ Returns: Array of new POs
x-api-key header in every
request
instanceId from Step 1
timeout=30000
Email: support@collabrix.co
Response Time: Within 4 business
hours
Hours: Monday-Friday, 9AM-6PM EST