
Arc Output Plugin for Redpanda Connect: Stream Any Source into Arc

Tags: Arc, Redpanda Connect, Redpanda, output plugin, stream processing, data pipeline, ingestion, MessagePack, zstd, Bloblang

If you've worked with data pipelines long enough, you know the pattern. You have data in system A. You need it in system B. So you write a script. The script works. Then it crashes at 3am because someone changed a field name upstream. You add error handling. Then retry logic. Then batching. Then compression. Then you realize you've written a stream processor from scratch, and it's Tuesday.

Redpanda Connect exists so you don't have to do that.

It's a stream processor that connects sources to sinks with a YAML config file. Kafka to S3. MQTT to PostgreSQL. HTTP webhooks to Elasticsearch. Over 200 connectors, a built-in transformation language called Bloblang, and a single binary you can drop anywhere. No JVM, no cluster, no operator certification required.

We use it internally at Basekick. When we needed to pipe data from different systems into Arc for testing, Redpanda Connect was the obvious choice. The problem: there was no Arc output plugin. So we built one.

Why This Matters

Arc already has a Telegraf output plugin and native MQTT ingestion. Why add another integration?

Because Telegraf and MQTT solve different problems than Redpanda Connect.

Telegraf is a metric collector. It pulls data from systems at fixed intervals. MQTT is a publish-subscribe protocol for IoT devices. Redpanda Connect is a stream processor. It handles event-driven data, transformations, fan-out, fan-in, retries, dead letter queues, and backpressure. It's for when your data needs to be reshaped, filtered, or enriched before it lands in Arc.

Some examples:

  • Kafka to Arc: Consume events from a Kafka topic, extract the fields you need with Bloblang, and write them to Arc as time-series data
  • Webhooks to Arc: Receive HTTP webhooks from a third-party API, normalize the payload, and store it for analysis
  • CDC to Arc: Capture database change events and stream them into Arc for historical tracking
  • Multi-destination: Send the same data to Arc and another system simultaneously, with different transformations for each (see the sketch after this list)
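
The multi-destination case uses Redpanda Connect's standard broker output; the Arc plugin simply slots in as one of the destinations. Here's a minimal sketch that fans the same stream out to Arc and an S3 archive, with a different shape for each. The bucket name, field names, and per-output mapping are illustrative, not something the plugin requires:

output:
  broker:
    pattern: fan_out
    outputs:
      - arc:
          url: http://localhost:8000
          database: analytics
          measurement: app_events
        processors:
          - mapping: |
              # Keep only the fields Arc needs (field names are illustrative)
              root.user_id = this.user_id
              root.duration_ms = this.duration_ms
      - aws_s3:
          bucket: raw-events-archive
          path: events/${!uuid_v4()}.json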

The plugin speaks Arc's native MessagePack protocol. No translation layer, no line protocol conversion. Binary MessagePack with zstd compression, straight to Arc's ingestion endpoint.

What We Built

The plugin is a batch output that sends data to POST /api/v1/write/msgpack. It supports two payload formats:

Columnar format (default): Transposes batched messages into column arrays. This is Arc's fastest ingestion path, significantly faster than row format because it avoids per-row overhead and maps directly to Arc's internal Arrow buffers. The plugin collects messages in a batch, groups them by measurement name, and builds columnar payloads where each field becomes an array of values.

Row format: Sends each message as an individual record with fields and optional tags. Useful when messages have varying schemas or when you need per-message tags.

Both formats are compressed with zstd by default (gzip and no compression are also available). The compression encoders are pooled to avoid re-allocation under high throughput.

A Minimal Config

input:
  kafka:
    addresses: ["localhost:9092"]
    topics: ["vehicle-telemetry"]
    consumer_group: "arc-ingest"
 
output:
  arc:
    url: http://localhost:8000
    database: logistics
    measurement: fleet_tracking
    format: columnar
    compression: zstd
    batching:
      count: 1000
      period: 1s

That's a Kafka-to-Arc pipeline. Vehicle telemetry is consumed from a topic, batched in groups of 1000 (or every second, whichever comes first), encoded as columnar MessagePack, compressed with zstd, and sent to Arc.

Configuration Reference

Option           | Description                                          | Default
-----------------|------------------------------------------------------|----------------------
url              | Base URL of the Arc instance                         | Required
token            | Bearer token for authentication                      | Optional
database         | Target database in Arc                               | "default"
measurement      | Measurement (table) name; supports interpolation     | Required
format           | Payload format: columnar or row                      | "columnar"
compression      | Compression: zstd, gzip, or none                     | "zstd"
timestamp_field  | Field containing the timestamp                       | Current time if empty
timestamp_unit   | Unit of numeric timestamps: us, ms, s, ns, or auto   | "auto"
tags_mapping     | Bloblang mapping to extract tags (row format only)   | Optional
tls              | TLS configuration                                    | Optional
batching         | Standard batch policy (count, period, byte_size)     | Optional
max_in_flight    | Maximum parallel batches                             | 64
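
Pulling several of these options together, here's a sketch of a more locked-down output. The host, token, and CA path are placeholders, and the tls block follows Redpanda Connect's standard TLS fields:

output:
  arc:
    url: https://arc.internal.example:8000
    token: "${ARC_TOKEN}"          # placeholder, injected from an environment variable
    database: logistics
    measurement: fleet_tracking
    format: columnar
    compression: zstd
    timestamp_field: event_time    # use the message's own timestamp field...
    timestamp_unit: ms             # ...interpreted as milliseconds
    tls:
      enabled: true
      root_cas_file: /etc/ssl/arc-ca.pem
    max_in_flight: 64
    batching:
      count: 1000
      period: 1s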

Dynamic Measurement Names

The measurement field supports Redpanda Connect's interpolation functions. This means you can route messages to different tables based on their content:

output:
  arc:
    url: http://localhost:8000
    database: logistics
    measurement: ${!json("asset_type")}
    # Messages with {"asset_type": "truck", ...} go to the "truck" table
    # Messages with {"asset_type": "drone", ...} go to the "drone" table

Or from message metadata:

output:
  arc:
    url: http://localhost:8000
    database: logistics
    measurement: ${!metadata("measurement")}
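
If the measurement name isn't already present as metadata, a mapping processor earlier in the pipeline can put it there. A small sketch, assuming an asset_type field in the payload:

pipeline:
  processors:
    - mapping: |
        # Copy a document field into metadata so the output can route on it
        meta measurement = this.asset_type
        root = this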

Row Format with Tags

When you need tags for grouping and filtering in Arc, use row format with a Bloblang tags mapping:

output:
  arc:
    url: http://localhost:8000
    database: logistics
    measurement: fleet_tracking
    format: row
    tags_mapping: |
      root = {"vehicle_id": this.vehicle_id, "fleet": this.fleet, "region": this.region}
    compression: zstd

Getting Started

Option 1: Build from Source

Clone the repository and build:

git clone https://github.com/redpanda-data/connect.git
cd connect
go build -o ./bin/redpanda-connect ./cmd/redpanda-connect/

Option 2: Use a Pre-built Binary

Once the plugin is included in a Redpanda Connect release, you can download a binary from the releases page (https://github.com/redpanda-data/connect/releases) or install it via:

brew install redpanda-data/tap/redpanda-connect

Run a Test Pipeline

Create a config file called arc-test.yaml:

input:
  generate:
    count: 100
    interval: 100ms
    mapping: |
      root.vehicle_id = "truck-" + random_int(min: 1, max: 50).string()
      root.lat = 40.7128 + (random_int(min: -1000, max: 1000).number() / 10000)
      root.lon = -74.0060 + (random_int(min: -1000, max: 1000).number() / 10000)
      root.speed_kmh = random_int(min: 0, max: 120)
      root.fuel_level = random_int(min: 5, max: 100)
      root.engine_temp = random_int(min: 70, max: 110)
 
output:
  arc:
    url: http://localhost:8000
    database: logistics
    measurement: fleet_tracking
    format: columnar
    compression: zstd
    batching:
      count: 50
      period: 1s

Run it:

./bin/redpanda-connect run ./arc-test.yaml

You should see output like:

INFO Running main config from specified file       path=./arc-test.yaml
INFO Input type generate is now active
INFO Output type arc is now active
INFO Pipeline has terminated. Shutting down the service

Query Arc to verify:

curl -s -X POST http://localhost:8000/api/v1/query \
  -H "Content-Type: application/json" \
  -d '{"sql": "SELECT time, vehicle_id, lat, lon, speed_kmh, fuel_level FROM logistics.fleet_tracking ORDER BY time DESC LIMIT 10"}' | jq .

A Real-World Example: Kafka Events to Arc

Here's a more realistic pipeline. Suppose you have JSON events landing in a Kafka topic from your application:

{"event": "page_view", "user_id": "u123", "page": "/pricing", "duration_ms": 4200, "timestamp": 1712000000000}

You want to store these in Arc for analytics. But you also want to enrich the data, filter out bot traffic, and normalize the timestamp:

input:
  kafka:
    addresses: ["kafka:9092"]
    topics: ["app-events"]
    consumer_group: "arc-analytics"
 
pipeline:
  processors:
    - mapping: |
        # Drop bot traffic
        root = if this.user_id.has_prefix("bot-") { deleted() }
 
        # Keep and reshape the fields we care about
        root.user_id = this.user_id
        root.page = this.page
        root.duration_ms = this.duration_ms
        root.event_type = this.event
        # Carry the source timestamp through so the output's timestamp_field can use it
        root.timestamp = this.timestamp
 
output:
  arc:
    url: http://localhost:8000
    database: analytics
    measurement: page_views
    format: columnar
    timestamp_field: timestamp
    timestamp_unit: ms
    compression: zstd
    batching:
      count: 5000
      period: 5s

Bot traffic is dropped before it reaches Arc. The timestamp field from the Kafka message is used as the time column (converted from milliseconds). Everything is batched in groups of 5000 and compressed.

This is where Redpanda Connect shines compared to a direct integration. The transformation happens in the pipeline, not in your application code. Change the mapping, restart the process. No redeployment, no code review for a field rename.

How It Works Under the Hood

For the curious, here's what happens when WriteBatch is called:

  1. Messages in the batch are grouped by measurement name (which can vary per message via interpolation)
  2. For each group, the plugin transposes row data into columnar arrays. If your batch has 1000 messages with fields vehicle_id, lat, and speed_kmh, you get three arrays of 1000 values each, plus a time array
  3. The columnar payload is encoded as MessagePack using the same wire format Arc's decoder expects
  4. The encoded bytes are compressed with zstd (pooled encoder, no allocation per call)
  5. A single HTTP POST sends the batch to Arc's /api/v1/write/msgpack endpoint
  6. Arc decodes the MessagePack, validates the schema, and writes directly to its Arrow buffer for Parquet storage
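
To make step 2 concrete, here's a toy batch of three messages and the columnar group they become, written as YAML for readability. The envelope keys, values, and timestamps are illustrative; the real payload is binary MessagePack, not this exact schema:

# Three incoming messages (rows):
#   {"vehicle_id": "truck-1", "lat": 40.71, "speed_kmh": 62}
#   {"vehicle_id": "truck-2", "lat": 40.72, "speed_kmh": 0}
#   {"vehicle_id": "truck-3", "lat": 40.69, "speed_kmh": 48}
#
# Transposed into one columnar group for the measurement:
measurement: fleet_tracking
columns:
  time:       [1712000000000, 1712000000100, 1712000000200]
  vehicle_id: ["truck-1", "truck-2", "truck-3"]
  lat:        [40.71, 40.72, 40.69]
  speed_kmh:  [62, 0, 48]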

No intermediate format. No JSON serialization. Binary all the way from Redpanda Connect to Arc's columnar storage.

No New Dependencies

The plugin uses libraries already present in Redpanda Connect's dependency tree:

  • vmihailenco/msgpack/v5 for MessagePack encoding (same wire format as Arc's decoder)
  • klauspost/compress for zstd and gzip compression (the same library Arc uses server-side)

Zero new imports in go.mod. This keeps the binary size unchanged and avoids adding supply chain surface area.

What's Next

The plugin has been submitted as a pull request to the Redpanda Connect repository (https://github.com/redpanda-data/connect/pull/4236). Once merged and released, it will be available in all Redpanda Connect distributions (full, cloud, and community).

In future iterations, we're considering:

  • An Arc input plugin, so you can read from Arc and push data to other systems
  • A processor plugin for Bloblang functions that query Arc inline during pipeline execution

If you have ideas, opinions, or problems this could solve for you, come tell us.



Questions? Reach out on Twitter or join our Discord.

Ready to handle billion-record workloads?

Deploy Arc in minutes. Own your data in Parquet. Use for analytics, observability, AI, IoT, or data warehousing.
