Collabrix REST API 1.0.0

🚀 Quick Start with Postman

Download our Postman collection above, import it into Postman, and start testing the API in minutes! The collection includes pre-configured requests, sample responses, and auto-saves your instance ID.

Before you start: Contact support@collabrix.co to get your API key.

Collabrix REST API Documentation

Real-time REST API for receiving Purchase Order events from Sponsors.

Overview

This API enables CMO partners to receive real-time Purchase Order (PO) events when Sponsors create new POs. The REST API provides a simple, HTTP-based interface to consume events from Kafka without needing to set up Kafka client libraries.

What You'll Receive

  • Real-time notifications when Sponsors create new Purchase Orders
  • Complete PO details including header information, line items, pricing, and delivery dates
  • Automatic tracking - you'll never receive the same PO twice
  • Persistent message storage - all messages are kept permanently

How It Works

Simple 3-Step Process:

  1. Create a consumer instance
  2. Subscribe to the Purchase Order topic
  3. Poll for new messages (e.g., daily at a scheduled time or as needed)

Kafka automatically handles:

  • Tracking which POs you've already received (no duplicates!)
  • Resuming from where you left off if your application restarts
  • Delivering only NEW POs (created after you subscribed)
  • Keeping all messages permanently available

✅ Persistent Consumers - No Timeouts!

Our REST API uses persistent consumers that never time out:

  • No expiration - Consumers run 24/7 and never time out
  • Auto-restore - Survives server restarts automatically
  • High performance - Messages buffered and ready when you poll
  • Simple HTTP API - No Kafka knowledge needed
  • Guaranteed delivery - Kafka preserves your progress, no duplicate POs

How it works: Create a consumer once with your consumer group name. Poll anytime (daily, hourly, or as needed). Your progress is automatically saved - even if the server restarts, your consumer continues from where it left off!

Base URL

All API endpoints use your provided base URL:

Format: <BASE_URL>/api/v1/...

Example: https://api.your-company.collabrix.co/api/v1/consumers/{groupId}
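
A small helper can keep URL construction in one place. The sketch below assumes only the <BASE_URL>/api/v1/... format described above; the function name api_url and the host in the example are illustrative and not part of the API.

```python
from urllib.parse import quote

API_PREFIX = "/api/v1"  # path prefix from the documented URL format


def api_url(base_url: str, *segments: str) -> str:
    """Join the base URL, the /api/v1 prefix, and URL-encoded path segments."""
    path = "/".join(quote(segment, safe="") for segment in segments)
    return f"{base_url.rstrip('/')}{API_PREFIX}/{path}"


# Hypothetical base URL and group ID; use the values Collabrix provides.
print(api_url("https://api.your-company.collabrix.co", "consumers", "your-company-prod"))
```

Encoding each segment separately means a group ID containing spaces or slashes cannot change the shape of the path.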

📧 Need API Access?

Contact support@collabrix.co to receive:

  • Base URL - Your API endpoint (<BASE_URL>)
  • API Key - Authentication header value (<API_KEY>)
  • Consumer Group ID - Your group identifier (<CONSUMER_GROUP>)
  • Topic Names - Allowed Kafka topics you can access

Event Topic

Topic Name: sh6.PO01.NEW

Event Type: Purchase Order Creation Events (Sponsor → CMO)

Authentication

How Authentication Works

All API requests require authentication using an API Key provided by Collabrix. The API key must be included in the x-api-key header of every request.

Requesting an API Key

Step 1: Email Collabrix Support

To: support@collabrix.co
Subject: API Key Request for Purchase Order Events

Include in your email:

Organization Name: [Your CMO company name]
Contact Name: [Your name]
Contact Email: [Your email]
Topics Needed: sh6.PO01.NEW (Purchase Order events)
Expected Volume: [estimated messages per hour]

Step 2: Receive Your Credentials

Collabrix will provide:

API Key: <API_KEY>
Consumer Group ID: <CONSUMER_GROUP>
Topics: <TOPIC_NAMES> (e.g., sh6.PO01.NEW)
Base URL: <BASE_URL>

Using Your API Key

Include the API key in the x-api-key header of all requests:

# Example: cURL request
curl -X GET "<BASE_URL>/api/v1/..." \
  -H "x-api-key: <API_KEY>"

Security Best Practices

  • Store API keys in environment variables or secrets manager
  • Never commit API keys to version control (Git)
  • Rotate keys quarterly
  • Use HTTPS in production

Subscription Process

POST /api/v1/consumers/{consumerGroupId}

Creates a new consumer instance in the specified consumer group.

Path Parameters

Parameter | Type | Required | Description
consumerGroupId | string | Required | Your consumer group ID (provided by Collabrix)

Request Headers

Header | Value | Required
x-api-key | Your API key | Required
Content-Type | application/json | Required

✅ Save the instanceId! You'll need it for all subsequent API calls (subscribing and fetching messages).

Example Request
POST <BASE_URL>/api/v1/consumers/your-company-prod

Headers:
  x-api-key: <API_KEY>
  Content-Type: application/json

Body: (empty or optional)

Example Response (200 OK)
{
  "status": true,
  "message": "Consumer instance created successfully",
  "data": {
    "success": true,
    "instanceId": "consumer-1738318200000",
    "baseUri": "<BASE_URL>/consumers/..."
  },
  "timestamp": "2026-02-03T10:30:00.000Z"
}
💡 What to do: Save the instanceId value. You'll use it in the next step.
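
A minimal sketch of pulling the instanceId out of the response shown above. The function name extract_instance_id is illustrative, and treating a false status as a failure is an assumption, since error responses are not described here.

```python
import json


def extract_instance_id(body: str) -> str:
    """Return data.instanceId from a create-consumer response body."""
    payload = json.loads(body)
    if not payload.get("status"):
        # Assumption: a false or missing "status" signals a failed request.
        raise ValueError(f"Consumer creation failed: {payload.get('message')}")
    return payload["data"]["instanceId"]


sample = """{
  "status": true,
  "message": "Consumer instance created successfully",
  "data": {"success": true, "instanceId": "consumer-1738318200000"},
  "timestamp": "2026-02-03T10:30:00.000Z"
}"""
instance_id = extract_instance_id(sample)
print(instance_id)
```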

POST /api/v1/consumers/{consumerGroupId}/instances/{instanceId}/subscription

Subscribes the consumer instance to one or more topics.

Request Body

Field | Type | Required | Description
topics | array[string] | Required | Array of topic names to subscribe to

✅ You're now subscribed! Kafka will start delivering PO events created from this moment forward.

Example Request
POST <BASE_URL>/api/v1/consumers/your-company-prod/instances/consumer-1738318200000/subscription

Headers:
  x-api-key: <API_KEY>
  Content-Type: application/json

Body:
{
  "topics": ["sh6.PO01.NEW"]
}

Example Response (200 OK)
{
  "status": true,
  "message": "Subscribed to topics successfully",
  "data": {
    "success": true,
    "subscribedTopics": ["sh6.PO01.NEW"]
  },
  "timestamp": "2026-02-03T10:30:00.000Z"
}
✅ Success! You'll now receive all NEW PO events (created after this subscription).
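
The subscription call can be assembled with the Python standard library. This sketch builds the request without sending it, so it runs offline; the base URL and API key are placeholders, and the function name subscription_request is an assumption of this example.

```python
import json
import urllib.request


def subscription_request(base_url, group_id, instance_id, api_key, topics):
    """Build (but do not send) the POST request that subscribes an instance."""
    url = f"{base_url}/api/v1/consumers/{group_id}/instances/{instance_id}/subscription"
    body = json.dumps({"topics": list(topics)}).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        method="POST",
        headers={"x-api-key": api_key, "Content-Type": "application/json"},
    )


req = subscription_request(
    "https://api.example.test",   # placeholder for <BASE_URL>
    "your-company-prod",
    "consumer-1738318200000",
    "test-key",                   # placeholder for <API_KEY>
    ["sh6.PO01.NEW"],
)
# To send it: urllib.request.urlopen(req, timeout=30)
```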

Getting Purchase Order Events

GET /api/v1/consumers/{consumerGroupId}/instances/{instanceId}/records

Fetches new messages from the subscribed topics.

Query Parameters

Parameter | Type | Default | Description
timeout | integer | 5000 | Max time to wait for messages in milliseconds (1000-30000)
max_bytes | integer | 1048576 | Maximum bytes to return (default 1 MB)

💡 Recommended: Poll at your preferred schedule (e.g., daily or as needed) with timeout=30000.
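
The query parameters can be checked on the client before the request goes out. The sketch below enforces the documented 1000-30000 ms range for timeout. How the server treats out-of-range values is not stated here, so the client-side check is a precaution, and the name records_url is illustrative.

```python
from urllib.parse import urlencode

MIN_TIMEOUT_MS, MAX_TIMEOUT_MS = 1000, 30000  # documented range for `timeout`


def records_url(base_url, group_id, instance_id, timeout_ms=5000, max_bytes=None):
    """Build the records URL, rejecting timeouts outside the documented range."""
    if not MIN_TIMEOUT_MS <= timeout_ms <= MAX_TIMEOUT_MS:
        raise ValueError(f"timeout must be {MIN_TIMEOUT_MS}-{MAX_TIMEOUT_MS} ms")
    params = {"timeout": timeout_ms}
    if max_bytes is not None:
        params["max_bytes"] = max_bytes  # omitted by default so the server default applies
    path = f"/api/v1/consumers/{group_id}/instances/{instance_id}/records"
    return f"{base_url}{path}?{urlencode(params)}"
```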

Example Request
GET <BASE_URL>/api/v1/consumers/your-company-prod/instances/consumer-1738318200000/records?timeout=30000

Headers:
  x-api-key: <API_KEY>

Example Response - No New Messages
{
  "status": true,
  "message": "Records fetched successfully",
  "data": {
    "success": true,
    "records": [],
    "count": 0
  }
}

Example Response - New PO Received
{
    "status": true,
    "message": "Records fetched successfully",
    "data": {
        "success": true,
        "records": [
            {
                "topic": "sh6.PO01.NEW",
                "partition": 2,
                "offset": "24",
                "key": "poCreation",
                "value": "PO Data IDOC here...",
                "timestamp": "2026-02-06T06:20:45.281Z"
            }
        ],
        "count": 1
    },
    "timestamp": "2026-02-06T06:22:27.700Z"
}
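
A sketch of turning the response above into typed objects. Note that offset arrives as a string in the example, so it is converted to an integer here. The class name PORecord and the function parse_records are assumptions of this example, not part of the API.

```python
import json
from dataclasses import dataclass


@dataclass(frozen=True)
class PORecord:
    topic: str
    partition: int
    offset: int      # returned as a string by the API; converted on parse
    key: str
    value: str       # raw IDOC payload
    timestamp: str


def parse_records(body: str) -> list:
    """Convert a records response body into a list of PORecord objects."""
    data = json.loads(body)["data"]
    return [
        PORecord(
            topic=r["topic"],
            partition=int(r["partition"]),
            offset=int(r["offset"]),
            key=r["key"],
            value=r["value"],
            timestamp=r["timestamp"],
        )
        for r in data["records"]
    ]


sample = """{
  "status": true,
  "message": "Records fetched successfully",
  "data": {
    "success": true,
    "records": [{
      "topic": "sh6.PO01.NEW", "partition": 2, "offset": "24",
      "key": "poCreation", "value": "PO Data IDOC here...",
      "timestamp": "2026-02-06T06:20:45.281Z"
    }],
    "count": 1
  }
}"""
records = parse_records(sample)
```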

DELETE /api/v1/consumers/{consumerGroupId}/instances/{instanceId}

Deletes the consumer instance and releases resources. Always delete the consumer when you're done to free up resources.

Path Parameters

Parameter | Type | Required | Description
consumerGroupId | string | Required | Your consumer group ID
instanceId | string | Required | Consumer instance ID (from create consumer)

⚠️ Important: Call this endpoint in your cleanup/shutdown logic. Deleting a consumer frees up resources on the server.

Example Request
DELETE <BASE_URL>/api/v1/consumers/your-company-prod/instances/consumer-1738318200000

Headers:
  x-api-key: <API_KEY>

Example Response (200 OK)
{
  "status": true,
  "message": "Consumer deleted successfully",
  "data": {
    "success": true,
    "message": "Consumer deleted successfully"
  },
  "timestamp": "2026-02-03T10:35:00.000Z"
}
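
If your application creates a consumer for a bounded piece of work, a context manager is one way to make sure the delete call runs even when polling raises an error. The sketch below takes the create and delete operations as callables so it runs without network access. The name consumer_session and the stand-in functions are assumptions of this example.

```python
from contextlib import contextmanager


@contextmanager
def consumer_session(create, delete):
    """Create a consumer, yield its instance ID, and always delete it on exit."""
    instance_id = create()
    try:
        yield instance_id
    finally:
        delete(instance_id)  # runs on normal exit and on exceptions


# Stand-ins for the real HTTP calls (POST to create, DELETE to remove).
calls = []


def fake_create():
    calls.append("create")
    return "consumer-1738318200000"


def fake_delete(instance_id):
    calls.append(f"delete:{instance_id}")


with consumer_session(fake_create, fake_delete) as instance_id:
    calls.append(f"poll:{instance_id}")
```

For a long-lived consumer that is polled on a schedule, you would skip this pattern and delete the instance only when retiring it.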

Producer API

POST /api/v1/messages

Publish PO Confirmation (ORDRSP05 IDOC) with sales order number.

Request Headers

Header | Value | Required
x-api-key | Your API key | Required
Content-Type | application/json | Required

Request Body

Field | Type | Required
topic | string | Required
key | string | Optional
value | string | Required

⭐ IDOC Fields:
• E1EDK01.ACTION = 29
• E1EDK02 QUALF=001 (PO Number)
• E1EDK02 QUALF=017 (Sales Order Number)

Example Request
POST <BASE_URL>/api/v1/messages

Headers:
  x-api-key: <API_KEY>
  Content-Type: application/json

Body:
{
  "topic": "sh6.PO05.CONFIRMATION",
  "key": "4500005037",
  "value": "<?xml version=\"1.0\"?><IDOC BEGIN=\"1\"><EDI_DC40 SEGMENT=\"1\"><IDOCTYP>ORDRSP05</IDOCTYP></EDI_DC40><E1EDK01 SEGMENT=\"1\"><ACTION>29</ACTION></E1EDK01><E1EDK02 SEGMENT=\"1\"><QUALF>001</QUALF><BELNR>4500005037</BELNR></E1EDK02><E1EDK02 SEGMENT=\"1\"><QUALF>017</QUALF><BELNR>SO-2026-001</BELNR></E1EDK02></IDOC>",
  "headers": {
    "content-type": "application/xml"
  }
}

Example Response (200 OK)
{
  "status": true,
  "message": "Message published successfully",
  "data": {
    "topic": "sh6.PO05.CONFIRMATION",
    "partition": 0,
    "offset": 12345
  }
}
💡 Use Case: After accepting a PO, send your internal sales order number to Sponsor.
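
Hand-escaping XML inside a JSON string is error-prone, so it can help to generate both with library calls. The sketch below builds the minimal ORDRSP05 structure from the example above and lets json.dumps handle the escaping. It covers only the segments shown in this section; a real confirmation may need more fields, and the function name build_confirmation is an assumption of this example.

```python
import json
import xml.etree.ElementTree as ET


def build_confirmation(po_number: str, sales_order: str) -> str:
    """Return a JSON request body carrying a minimal ORDRSP05 IDOC."""
    idoc = ET.Element("IDOC", BEGIN="1")
    control = ET.SubElement(idoc, "EDI_DC40", SEGMENT="1")
    ET.SubElement(control, "IDOCTYP").text = "ORDRSP05"
    header = ET.SubElement(idoc, "E1EDK01", SEGMENT="1")
    ET.SubElement(header, "ACTION").text = "29"
    # QUALF 001 carries the PO number, QUALF 017 the sales order number.
    for qualifier, number in (("001", po_number), ("017", sales_order)):
        ref = ET.SubElement(idoc, "E1EDK02", SEGMENT="1")
        ET.SubElement(ref, "QUALF").text = qualifier
        ET.SubElement(ref, "BELNR").text = number
    xml_text = '<?xml version="1.0"?>' + ET.tostring(idoc, encoding="unicode")
    return json.dumps({
        "topic": "sh6.PO05.CONFIRMATION",
        "key": po_number,
        "value": xml_text,
        "headers": {"content-type": "application/xml"},
    })


body = build_confirmation("4500005037", "SO-2026-001")
```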

POST /api/v1/messages/batch

Publish multiple messages to the same Kafka topic in a single request. More efficient than multiple individual requests.

Request Headers

Header | Value | Required
x-api-key | Your API key | Required
Content-Type | application/json | Required

Request Body

Field | Type | Required | Description
topic | string | Required | Kafka topic name
messages | array | Required | Array of message objects (each with key, value, headers)

Example Request
POST <BASE_URL>/api/v1/messages/batch

Headers:
  x-api-key: <API_KEY>
  Content-Type: application/json

Body:
{
  "topic": "sh6.PO05.CONFIRMATION",
  "messages": [
    {
      "key": "4500005037",
      "value": "<?xml version=\"1.0\"?><IDOC>...</IDOC>",
      "headers": { "content-type": "application/xml" }
    },
    {
      "key": "4500005038",
      "value": "<?xml version=\"1.0\"?><IDOC>...</IDOC>",
      "headers": { "content-type": "application/xml" }
    }
  ]
}

Example Response (200 OK)
{
  "status": true,
  "message": "Batch published successfully",
  "data": {
    "topic": "sh6.PO05.CONFIRMATION",
    "count": 2,
    "results": [
      { "partition": 0, "offset": 12345 },
      { "partition": 0, "offset": 12346 }
    ]
  }
}
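
A sketch of assembling the batch body from a list of confirmations, using the request shape shown above. The function name build_batch and the (po_number, idoc_xml) input format are assumptions of this example. Rejecting an empty batch is a client-side choice, since the API's behaviour for empty batches is not described here.

```python
import json


def build_batch(topic, confirmations):
    """Build a batch request body from (po_number, idoc_xml) pairs."""
    messages = [
        {
            "key": po_number,
            "value": idoc_xml,
            "headers": {"content-type": "application/xml"},
        }
        for po_number, idoc_xml in confirmations
    ]
    if not messages:
        raise ValueError("A batch needs at least one message")
    return json.dumps({"topic": topic, "messages": messages})


batch_body = build_batch(
    "sh6.PO05.CONFIRMATION",
    [
        ("4500005037", '<?xml version="1.0"?><IDOC>...</IDOC>'),
        ("4500005038", '<?xml version="1.0"?><IDOC>...</IDOC>'),
    ],
)
```

After sending, comparing data.count in the response with the number of messages submitted is a quick check that the whole batch was accepted.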

Code Examples

Polling Best Practices

Complete Node.js Example

const axios = require('axios');

const API_KEY = '<API_KEY>';
const BASE_URL = '<BASE_URL>';  // Your domain
const CONSUMER_GROUP = 'your-company-prod';
const INSTANCE_ID = 'consumer-1738318200000';

// NOTE: `database` and `parseXML` are placeholders for your own persistence
// layer and IDOC XML parser; they are not provided by this API.

// Poll for messages at your preferred schedule
async function pollMessages() {
  try {
    const response = await axios.get(
      `${BASE_URL}/api/v1/consumers/${CONSUMER_GROUP}/instances/${INSTANCE_ID}/records?timeout=30000`,
      { headers: { 'x-api-key': API_KEY } }
    );

    const records = response.data.data.records;

    // Pass the whole record: processPO needs key and offset, not just value
    for (const record of records) {
      await processPO(record);
    }
  } catch (error) {
    console.error('Error:', error.message);
  }
}

// Process PO
async function processPO(record) {
  const poNumber = record.key;  // Use key (PO number) for deduplication
  const offset = record.offset;

  // Check for duplicate using offset or PO number
  if (await database.checkIfProcessed(poNumber, offset)) return;

  // Parse XML and save PO
  const poData = parseXML(record.value);  // Parse IDOC XML
  await database.savePO(poData);
  // Mark as processed with the same identifiers used in the check above
  await database.markProcessed(poNumber, offset);
}

// Start polling (example: run daily using cron or scheduler)
// For continuous polling: setInterval(pollMessages, 86400000); // 24 hours
pollMessages();

Complete Python Example

import requests
import time

API_KEY = '<API_KEY>'
BASE_URL = '<BASE_URL>'
CONSUMER_GROUP = '<CONSUMER_GROUP>'
INSTANCE_ID = 'consumer-1738318200000'

# NOTE: `database` and `parse_xml` are placeholders for your own persistence
# layer and IDOC XML parser; they are not provided by this API.

def poll_messages():
    try:
        response = requests.get(
            f'{BASE_URL}/api/v1/consumers/{CONSUMER_GROUP}/instances/{INSTANCE_ID}/records?timeout=30000',
            headers={'x-api-key': API_KEY},
            timeout=35  # client-side timeout, slightly above the 30 s server wait
        )
        response.raise_for_status()  # surface HTTP errors instead of parsing them

        records = response.json()['data']['records']

        # Pass the whole record: process_po needs key and offset, not just value
        for record in records:
            process_po(record)
    except Exception as e:
        print(f'Error: {e}')

def process_po(record):
    po_number = record['key']  # Use key (PO number) for deduplication
    offset = record['offset']

    if database.check_if_processed(po_number, offset):
        return

    # Parse XML and save PO
    po_data = parse_xml(record['value'])  # Parse IDOC XML
    database.save_po(po_data)
    database.mark_processed(po_number, offset)

# Start polling at your preferred schedule
# Example: Run daily using cron or scheduler
# For continuous polling: time.sleep(86400) # 24 hours
poll_messages()

⚠️ Deduplication Required: Always check offset and key (PO number) against your database to prevent processing the same PO twice.
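
The examples above leave the database calls abstract. As one possible shape for that check, the sketch below uses SQLite with a primary key on (key, offset), so a repeated record is rejected by the database itself. The class name ProcessedStore, the table layout, and the claim method are assumptions of this example.

```python
import sqlite3


class ProcessedStore:
    """Remembers which (key, offset) pairs have already been handled."""

    def __init__(self, path=":memory:"):
        self.conn = sqlite3.connect(path)
        self.conn.execute(
            "CREATE TABLE IF NOT EXISTS processed ("
            " record_key TEXT NOT NULL,"
            " record_offset TEXT NOT NULL,"
            " PRIMARY KEY (record_key, record_offset))"
        )

    def claim(self, key, offset) -> bool:
        """Record (key, offset); return True only the first time it is seen."""
        with self.conn:  # commits on success, rolls back on error
            cursor = self.conn.execute(
                "INSERT OR IGNORE INTO processed VALUES (?, ?)",
                (key, str(offset)),
            )
        return cursor.rowcount == 1


store = ProcessedStore()
first = store.claim("4500005037", "24")
second = store.claim("4500005037", "24")
```

Kafka offsets are unique only within a partition, so if keys can repeat it may be safer to store the partition as well. Claiming a record before saving it means a crash in between would skip that PO; writing the claim and the PO in the same database transaction avoids this.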

Best Practices

🔑 Use the Same Consumer Group Name

Choose a stable consumer group name (e.g., acme-pharma-prod) and use it every time. This ensures:

  • ✅ No duplicate POs
  • ✅ Automatic progress tracking
  • ✅ Resume from where you left off

Example:

const CONSUMER_GROUP = 'acme-pharma-prod'; // ← Never change this!

🛡️ Prevent Duplicate Processing

Use key (PO number) and offset to check if you've already processed a PO:

async function processPO(record) {
  const poNumber = record.key;  // PO number
  const offset = record.offset;

  // Check if already processed
  if (await database.alreadyProcessed(poNumber, offset)) {
    console.log('Already processed, skipping');
    return;
  }

  // Process PO (parseXML is a placeholder for your IDOC XML parser)
  const poData = parseXML(record.value);
  await database.savePO(poData);

  // Mark as processed, using the same identifiers as the check above
  await database.markProcessed(poNumber, offset);
}

📅 Polling Schedule

Poll at your preferred schedule - daily, hourly, or as needed:

Schedule | Cron Expression | Use Case
Daily (recommended) | 0 0 * * * | Batch processing overnight
Hourly | 0 * * * * | More frequent updates
Every 6 hours | 0 */6 * * * | Balanced approach
Manual | Run when needed | On-demand processing

💡 No timeout worries - Your consumer stays active even if you don't poll for days!

📝 Complete Example

// config.js
const CONSUMER_GROUP = 'acme-pharma-prod';  // Stable name
const INSTANCE_ID = 'my-consumer-1';
const BASE_URL = '<BASE_URL>';  // Your domain
const API_KEY = '<API_KEY>';

// setup.js - Run once to create consumer
async function setupConsumer() {
  // Create consumer
  const createResponse = await fetch(`${BASE_URL}/api/v1/consumers/${CONSUMER_GROUP}`, {
    method: 'POST',
    headers: {
      'x-api-key': API_KEY,
      'Content-Type': 'application/json'
    },
    body: JSON.stringify({
      name: INSTANCE_ID,
      format: 'binary',
      'auto.offset.reset': 'latest'
    })
  });

  // The response carries the instanceId to use in later calls. Confirm that it
  // matches INSTANCE_ID and update config.js if it does not.
  const created = await createResponse.json();
  console.log('Instance ID:', created.data.instanceId);

  // Subscribe to topic
  await fetch(
    `${BASE_URL}/api/v1/consumers/${CONSUMER_GROUP}/instances/${INSTANCE_ID}/subscription`,
    {
      method: 'POST',
      headers: {
        'x-api-key': API_KEY,
        'Content-Type': 'application/json'
      },
      body: JSON.stringify({ topics: ['sh6.PO01.NEW'] })
    }
  );

  console.log('✅ Consumer setup complete');
}

// poll.js - Run daily via cron
async function pollMessages() {
  const response = await fetch(
    `${BASE_URL}/api/v1/consumers/${CONSUMER_GROUP}/instances/${INSTANCE_ID}/records?timeout=30000`,
    { headers: { 'x-api-key': API_KEY } }
  );

  const data = await response.json();

  if (data.data.records.length > 0) {
    console.log(`Received ${data.data.count} new POs`);

    for (const record of data.data.records) {
      await processPO(record);
    }
  } else {
    console.log('No new POs');
  }
}

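async function processPO(record) {
  const poNumber = record.key;  // PO number
  const offset = record.offset;

  // Check for duplicates using offset or PO number
  if (await db.alreadyProcessed(poNumber, offset)) {
    return;
  }

  // Parse the payload. parsePayload is a placeholder for your own parser, and
  // the field paths below are illustrative: adjust them to the parsed structure.
  const po = parsePayload(record.value);

  // Save PO to your database
  await db.savePO({
    poNumber: po.payload.json.SADetails.scheduleAgreementNumber,
    vendor: po.payload.json.SADetails.vendorName,
    amount: po.payload.json.SLDetails[0].netPrice,
    // ... other fields
  });

  // Mark as processed, using the same identifiers as the duplicate check
  await db.markProcessed(poNumber, offset);
}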

pollMessages();

✅ Quick Checklist

  • ☑ Use a stable consumer group name (never change it)
  • ☑ Run setup once to create and subscribe consumer
  • ☑ Poll at your preferred schedule (daily recommended)
  • ☑ Handle errors gracefully (log and retry)
  • ☑ Monitor your consumer (check logs, track message counts)
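
For the "log and retry" item, a small wrapper with exponential backoff is one common approach. The sketch below is generic rather than specific to this API. The name with_retries, the default of three attempts, and the one-second base delay are assumptions to tune for your environment. The sleep function is injectable so the behaviour can be checked without waiting.

```python
import logging
import time


def with_retries(operation, attempts=3, base_delay=1.0, sleep=time.sleep):
    """Call operation(); on failure, log the error and retry with backoff."""
    for attempt in range(1, attempts + 1):
        try:
            return operation()
        except Exception as exc:
            if attempt == attempts:
                raise  # out of attempts: let the caller see the last error
            delay = base_delay * 2 ** (attempt - 1)  # 1s, 2s, 4s, ...
            logging.warning("Attempt %d failed (%s); retrying in %.1fs",
                            attempt, exc, delay)
            sleep(delay)


# Usage: result = with_retries(poll_messages), where poll_messages is your
# own polling function and raises on failure.
```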

Quick Reference Summary

Complete API Flow

# 1. Create Consumer (once at startup)
POST /api/v1/consumers/{consumerGroupId}
→ Returns: instanceId

# 2. Subscribe to Topic (once at startup)
POST /api/v1/consumers/{consumerGroupId}/instances/{instanceId}/subscription
Body: { "topics": ["sh6.PO01.NEW"] }

# 3. Get POs (repeat at your preferred schedule, e.g., daily)
GET /api/v1/consumers/{consumerGroupId}/instances/{instanceId}/records?timeout=30000
→ Returns: Array of new POs
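
The three steps above can be sketched as a thin client. To keep the example runnable offline, the HTTP layer is passed in as a send callable, and fake_send replays trimmed versions of the example responses from this page. The class name POClient, the send signature, and the stand-in are all assumptions of this example; in practice send would wrap your HTTP library of choice.

```python
import json


class POClient:
    """Thin wrapper over create, subscribe, and poll."""

    def __init__(self, base_url, group_id, api_key, send):
        self.root = f"{base_url}/api/v1/consumers/{group_id}"
        self.headers = {"x-api-key": api_key, "Content-Type": "application/json"}
        self.send = send  # send(method, url, headers, body) -> parsed JSON dict
        self.instance_id = None

    def create(self):
        reply = self.send("POST", self.root, self.headers, None)
        self.instance_id = reply["data"]["instanceId"]  # needed for later calls
        return self.instance_id

    def subscribe(self, topics):
        url = f"{self.root}/instances/{self.instance_id}/subscription"
        return self.send("POST", url, self.headers, json.dumps({"topics": topics}))

    def poll(self, timeout_ms=30000):
        url = f"{self.root}/instances/{self.instance_id}/records?timeout={timeout_ms}"
        return self.send("GET", url, self.headers, None)["data"]["records"]


def fake_send(method, url, headers, body):
    """Offline stand-in that mimics the documented response shapes."""
    if url.endswith("/subscription"):
        topics = json.loads(body)["topics"]
        return {"status": True, "data": {"success": True, "subscribedTopics": topics}}
    if "/records" in url:
        return {"status": True, "data": {"success": True, "records": [], "count": 0}}
    return {"status": True,
            "data": {"success": True, "instanceId": "consumer-1738318200000"}}


client = POClient("https://api.example.test", "your-company-prod", "test-key", fake_send)
client.create()
subscribed = client.subscribe(["sh6.PO01.NEW"])
records = client.poll()
```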

Key Points to Remember

  • ✅ Include x-api-key header in every request
  • ✅ Save the instanceId from Step 1
  • ✅ Poll at your preferred schedule (e.g., daily) with timeout=30000
  • ✅ You only receive NEW POs (created after subscription)
  • ✅ Kafka automatically tracks what you've received

Need Help?

Email: support@collabrix.co
Response Time: Within 4 business hours
Hours: Monday-Friday, 9AM-6PM EST