Arc Output Plugin for Redpanda Connect: Stream Any Source into Arc

If you've worked with data pipelines long enough, you know the pattern. You have data in system A. You need it in system B. So you write a script. The script works. Then it crashes at 3am because someone changed a field name upstream. You add error handling. Then retry logic. Then batching. Then compression. Then you realize you've written a stream processor from scratch, and it's Tuesday.
Redpanda Connect exists so you don't have to do that.
It's a stream processor that connects sources to sinks with a YAML config file. Kafka to S3. MQTT to PostgreSQL. HTTP webhooks to Elasticsearch. Over 200 connectors, a built-in transformation language called Bloblang, and a single binary you can drop anywhere. No JVM, no cluster, no operator certification required.
We use it internally at Basekick. When we needed to pipe data from different systems into Arc for testing, Redpanda Connect was the obvious choice. The problem was there was no Arc output plugin. So we built one.
Why This Matters
Arc already has a Telegraf output plugin and native MQTT ingestion. Why add another integration?
Because Telegraf and MQTT solve different problems than Redpanda Connect.
Telegraf is a metric collector. It pulls data from systems at fixed intervals. MQTT is a publish-subscribe protocol for IoT devices. Redpanda Connect is a stream processor. It handles event-driven data, transformations, fan-out, fan-in, retries, dead letter queues, and backpressure. It's for when your data needs to be reshaped, filtered, or enriched before it lands in Arc.
Some examples:
- Kafka to Arc: Consume events from a Kafka topic, extract the fields you need with Bloblang, and write them to Arc as time-series data
- Webhooks to Arc: Receive HTTP webhooks from a third-party API, normalize the payload, and store it for analysis
- CDC to Arc: Capture database change events and stream them into Arc for historical tracking
- Multi-destination: Send the same data to Arc and another system simultaneously, with different transformations for each
The plugin speaks Arc's native MessagePack protocol. No translation layer, no line protocol conversion. Binary MessagePack with zstd compression, straight to Arc's ingestion endpoint.
What We Built
The plugin is a batch output that sends data to Arc's `POST /api/v1/write/msgpack` endpoint. It supports two payload formats:
Columnar format (default): Transposes batched messages into column arrays. This is Arc's fastest ingestion path, significantly faster than row format because it avoids per-row overhead and maps directly to Arc's internal Arrow buffers. The plugin collects messages in a batch, groups them by measurement name, and builds columnar payloads where each field becomes an array of values.
Row format: Sends each message as an individual record with fields and optional tags. Useful when messages have varying schemas or when you need per-message tags.
Both formats are compressed with zstd by default (gzip and no compression are also available). The compression encoders are pooled to avoid re-allocation under high throughput.
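The columnar transposition described above is simple to picture. Here's an illustrative sketch in Python (not the plugin's actual Go code), assuming every message in the batch shares the same schema:

```python
# Transpose a batch of row messages into per-field column arrays,
# the shape Arc's columnar ingestion path expects.
def to_columnar(batch):
    columns = {}
    for row in batch:
        for field, value in row.items():
            columns.setdefault(field, []).append(value)
    return columns

batch = [
    {"time": 1712000000000, "vehicle_id": "truck-1", "speed_kmh": 88},
    {"time": 1712000001000, "vehicle_id": "truck-2", "speed_kmh": 64},
]
print(to_columnar(batch)["speed_kmh"])  # [88, 64]
```

Each field becomes one array, so a 1000-message batch with three fields produces three 1000-element arrays instead of 1000 small maps.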
A Minimal Config
```yaml
input:
  kafka:
    addresses: ["localhost:9092"]
    topics: ["vehicle-telemetry"]
    consumer_group: "arc-ingest"

output:
  arc:
    url: http://localhost:8000
    database: logistics
    measurement: fleet_tracking
    format: columnar
    compression: zstd
    batching:
      count: 1000
      period: 1s
```

That's a Kafka-to-Arc pipeline. Vehicle telemetry is consumed from a topic, batched in groups of 1000 (or every second, whichever comes first), encoded as columnar MessagePack, compressed with zstd, and sent to Arc.
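The count-or-period batching policy can be sketched like this (a hypothetical helper for illustration, not the plugin's actual implementation):

```python
import time

# A batch flushes when EITHER the count limit or the period elapses,
# mirroring the `count: 1000` / `period: 1s` policy in the config above.
class BatchPolicy:
    def __init__(self, count, period_s):
        self.count, self.period_s = count, period_s
        self.items, self.started = [], time.monotonic()

    def add(self, item):
        """Append an item; return a full batch when either limit is hit."""
        self.items.append(item)
        elapsed = time.monotonic() - self.started
        if len(self.items) >= self.count or elapsed >= self.period_s:
            batch, self.items = self.items, []
            self.started = time.monotonic()
            return batch
        return None

policy = BatchPolicy(count=3, period_s=1.0)
assert policy.add("a") is None
assert policy.add("b") is None
assert policy.add("c") == ["a", "b", "c"]  # count limit reached
```

Larger batches amortize HTTP and compression overhead; the period bound keeps latency predictable when traffic is slow.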
Configuration Reference
| Option | Description | Default |
|---|---|---|
| `url` | Base URL of the Arc instance | Required |
| `token` | Bearer token for authentication | Optional |
| `database` | Target database in Arc | `"default"` |
| `measurement` | Measurement (table) name; supports interpolation | Required |
| `format` | Payload format: `columnar` or `row` | `"columnar"` |
| `compression` | Compression: `zstd`, `gzip`, or `none` | `"zstd"` |
| `timestamp_field` | Field containing the timestamp | Current time if empty |
| `timestamp_unit` | Unit of numeric timestamps: `ns`, `us`, `ms`, `s`, or `auto` | `"auto"` |
| `tags_mapping` | Bloblang mapping to extract tags (row format only) | Optional |
| `tls` | TLS configuration | Optional |
| `batching` | Standard batch policy (`count`, `period`, `byte_size`) | Optional |
| `max_in_flight` | Maximum parallel batches | 64 |
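For `timestamp_unit: auto`, a magnitude-based heuristic is the usual approach. The sketch below shows one plausible version for illustration; the plugin's real detection logic may use different thresholds:

```python
# Guess the unit of a numeric epoch timestamp from its magnitude.
# Epoch seconds are ~1e9 today, milliseconds ~1e12, microseconds ~1e15,
# nanoseconds ~1e18, so the scales are far enough apart to disambiguate.
def guess_unit(ts):
    if ts >= 1e17:
        return "ns"
    if ts >= 1e14:
        return "us"
    if ts >= 1e11:
        return "ms"
    return "s"

print(guess_unit(1712000000))      # seconds-scale epoch -> "s"
print(guess_unit(1712000000000))   # milliseconds -> "ms"
```

If your timestamps are already in a known unit, setting `timestamp_unit` explicitly is safer than relying on any auto-detection.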
Dynamic Measurement Names
The measurement field supports Redpanda Connect's interpolation functions. This means you can route messages to different tables based on their content:
```yaml
output:
  arc:
    url: http://localhost:8000
    database: logistics
    measurement: ${!json("asset_type")}
    # Messages with {"asset_type": "truck", ...} go to the "truck" table
    # Messages with {"asset_type": "drone", ...} go to the "drone" table
```

Or from message metadata:
```yaml
output:
  arc:
    url: http://localhost:8000
    database: logistics
    measurement: ${!metadata("measurement")}
```

Row Format with Tags
When you need tags for grouping and filtering in Arc, use row format with a Bloblang tags mapping:
```yaml
output:
  arc:
    url: http://localhost:8000
    database: logistics
    measurement: fleet_tracking
    format: row
    tags_mapping: |
      root = {"vehicle_id": this.vehicle_id, "fleet": this.fleet, "region": this.region}
    compression: zstd
```

Getting Started
Option 1: Build from Source
Clone the repository and build:
```bash
git clone https://github.com/redpanda-data/connect.git
cd connect
go build -o ./bin/redpanda-connect ./cmd/redpanda-connect/
```

Option 2: Use a Pre-built Binary
Once the plugin is included in a Redpanda Connect release, you can download a binary from the releases page at https://github.com/redpanda-data/connect/releases or install via Homebrew:

```bash
brew install redpanda-data/tap/redpanda-connect
```

Run a Test Pipeline
Create a config file called arc-test.yaml:
```yaml
input:
  generate:
    count: 100
    interval: 100ms
    mapping: |
      root.vehicle_id = "truck-" + random_int(min: 1, max: 50).string()
      root.lat = 40.7128 + (random_int(min: -1000, max: 1000).number() / 10000)
      root.lon = -74.0060 + (random_int(min: -1000, max: 1000).number() / 10000)
      root.speed_kmh = random_int(min: 0, max: 120)
      root.fuel_level = random_int(min: 5, max: 100)
      root.engine_temp = random_int(min: 70, max: 110)

output:
  arc:
    url: http://localhost:8000
    database: logistics
    measurement: fleet_tracking
    format: columnar
    compression: zstd
    batching:
      count: 50
      period: 1s
```

Run it:
```bash
./bin/redpanda-connect run ./arc-test.yaml
```

You should see output like:
```text
INFO Running main config from specified file path=./arc-test.yaml
INFO Input type generate is now active
INFO Output type arc is now active
INFO Pipeline has terminated. Shutting down the service
```
Query Arc to verify:
```bash
curl -s -X POST http://localhost:8000/api/v1/query \
  -H "Content-Type: application/json" \
  -d '{"sql": "SELECT time, vehicle_id, lat, lon, speed_kmh, fuel_level FROM logistics.fleet_tracking ORDER BY time DESC LIMIT 10"}' | jq .
```

A Real-World Example: Kafka Events to Arc
Here's a more realistic pipeline. Suppose you have JSON events landing in a Kafka topic from your application:
```json
{"event": "page_view", "user_id": "u123", "page": "/pricing", "duration_ms": 4200, "timestamp": 1712000000000}
```

You want to store these in Arc for analytics. But you also want to enrich the data, filter out bot traffic, and normalize the timestamp:
```yaml
input:
  kafka:
    addresses: ["kafka:9092"]
    topics: ["app-events"]
    consumer_group: "arc-analytics"

pipeline:
  processors:
    - mapping: |
        # Drop bot traffic
        root = if this.user_id.has_prefix("bot-") { deleted() }
        # Keep and reshape the fields we care about
        root.user_id = this.user_id
        root.page = this.page
        root.duration_ms = this.duration_ms
        root.event_type = this.event

output:
  arc:
    url: http://localhost:8000
    database: analytics
    measurement: page_views
    format: columnar
    timestamp_field: timestamp
    timestamp_unit: ms
    compression: zstd
    batching:
      count: 5000
      period: 5s
```

Bot traffic is dropped before it reaches Arc. The `timestamp` field from the Kafka message is used as the time column (converted from milliseconds). Everything is batched in groups of 5000 and compressed.
This is where Redpanda Connect shines compared to a direct integration. The transformation happens in the pipeline, not in your application code. Change the mapping, restart the process. No redeployment, no code review for a field rename.
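To make the mapping's behavior concrete, here is the same filter-and-reshape logic expressed in Python for illustration (the actual pipeline runs it as Bloblang):

```python
# Equivalent of the Bloblang mapping: drop bot traffic,
# keep and rename the fields that matter.
def transform(event):
    if event["user_id"].startswith("bot-"):
        return None  # equivalent of Bloblang's deleted()
    return {
        "user_id": event["user_id"],
        "page": event["page"],
        "duration_ms": event["duration_ms"],
        "event_type": event["event"],
    }

events = [
    {"event": "page_view", "user_id": "u123", "page": "/pricing", "duration_ms": 4200},
    {"event": "page_view", "user_id": "bot-7", "page": "/", "duration_ms": 10},
]
kept = [t for e in events if (t := transform(e)) is not None]
print(len(kept))  # 1 -- only the u123 event survives
```

The difference is where the logic lives: in a config file the stream processor owns, not in application code that needs a release cycle.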
How It Works Under the Hood
For the curious, here's what happens when WriteBatch is called:
- Messages in the batch are grouped by measurement name (which can vary per message via interpolation)
- For each group, the plugin transposes row data into columnar arrays. If your batch has 1000 messages with fields `vehicle_id`, `lat`, and `speed_kmh`, you get three arrays of 1000 values each, plus a `time` array
- The columnar payload is encoded as MessagePack using the same wire format Arc's decoder expects
- The encoded bytes are compressed with zstd (pooled encoder, no allocation per call)
- A single HTTP POST sends the batch to Arc's `/api/v1/write/msgpack` endpoint
- Arc decodes the MessagePack, validates the schema, and writes directly to its Arrow buffer for Parquet storage
No intermediate format. No JSON serialization. Binary all the way from Redpanda Connect to Arc's columnar storage.
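The group-transpose-encode-compress flow can be sketched end to end. This is an illustrative Python version with stdlib stand-ins: `json` in place of MessagePack and `zlib` in place of zstd (the real plugin uses binary MessagePack with pooled zstd encoders):

```python
import json
import zlib

# Sketch of the WriteBatch flow: group by measurement, transpose to
# columns, encode, compress. One payload per measurement group, each of
# which would become a single HTTP POST.
def write_batch(batch):
    groups = {}
    for msg in batch:
        m = msg.pop("measurement")  # may vary per message via interpolation
        groups.setdefault(m, []).append(msg)
    payloads = {}
    for measurement, rows in groups.items():
        columns = {}
        for row in rows:  # transpose rows into column arrays
            for field, value in row.items():
                columns.setdefault(field, []).append(value)
        encoded = json.dumps({"measurement": measurement, "columns": columns}).encode()
        payloads[measurement] = zlib.compress(encoded)
    return payloads

batch = [
    {"measurement": "truck", "time": 1, "speed_kmh": 88},
    {"measurement": "truck", "time": 2, "speed_kmh": 64},
    {"measurement": "drone", "time": 1, "altitude_m": 120},
]
payloads = write_batch(batch)
print(sorted(payloads))  # ['drone', 'truck'] -- two groups, two POST bodies
```

Grouping before transposing is what lets a single interpolated output fan messages out to multiple Arc tables within one batch.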
No New Dependencies
The plugin uses libraries already present in Redpanda Connect's dependency tree:
- `vmihailenco/msgpack/v5` for MessagePack encoding (same wire format as Arc's decoder)
- `klauspost/compress` for zstd and gzip compression (the same library Arc uses server-side)
Zero new imports in go.mod. This keeps the binary size unchanged and avoids adding supply chain surface area.
What's Next
The plugin has been submitted as a pull request (https://github.com/redpanda-data/connect/pull/4236) to the Redpanda Connect repository. Once merged and released, it will be available in all Redpanda Connect distributions (full, cloud, and community).
In future iterations, we're considering:
- An Arc input plugin, so you can read from Arc and push data to other systems
- A processor plugin for Bloblang functions that query Arc inline during pipeline execution
If you have ideas, opinions, or problems this could solve for you, come tell us.
Ready to handle billion-record workloads?
Deploy Arc in minutes. Own your data in Parquet. Use for analytics, observability, AI, IoT, or data warehousing.