Master JSON Lines (JSONL): the streaming-friendly JSON format for log processing, data pipelines, and big data. Learn how newline-delimited JSON enables Unix pipelines, fault-tolerant processing, and handling datasets larger than memory.
In Part 1, we explored JSON's origins. In Part 2, we added validation. In Part 3 and Part 4, we optimized with binary formats. In Part 5, we built RPC protocols.
Now we tackle JSON’s streaming problem: you can’t process JSON incrementally.
The Fundamental Problem: Standard JSON arrays require parsing the entire document. You cannot read the first element until you’ve read the last closing bracket. This all-or-nothing parsing makes JSON unsuitable for large datasets.
The Modular Solution: JSON Lines demonstrates the ecosystem’s response to incompleteness. Rather than add streaming to JSON’s grammar (the monolithic approach), the community created a minimal convention - just separate objects with newlines. This preserves JSON parsers unchanged while enabling new use cases. It’s modularity at its simplest: solve one problem (streaming) without touching the core format.
Consider a standard JSON array:
[
  {"id": 1, "name": "Alice"},
  {"id": 2, "name": "Bob"},
  ...
  {"id": 1000000, "name": "Zoe"}
]
You must load all 1 million records into memory, parse the complete array, then process. For a 10GB file, this crashes your program.
JSON Lines solves this with one JSON object per line:
{"id": 1, "name": "Alice"}
{"id": 2, "name": "Bob"}
{"id": 1000000, "name": "Zoe"}
Read one line, parse one object, process it, discard it. Memory usage: constant. Dataset size: unlimited.
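A minimal sketch of that loop in Python (the file name and field names are illustrative):
import json

def iter_jsonl(path):
    # Only the current line is ever held in memory
    with open(path, 'r', encoding='utf-8') as f:
        for line in f:
            if line.strip():
                yield json.loads(line)

for record in iter_jsonl('users.jsonl'):
    print(record['id'])  # process, then let the object be garbage-collected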
This article covers streaming JSON processing, log aggregation, Unix pipeline integration, fault tolerance, and real-world data engineering patterns.
Running Example: Exporting 10 Million Users
In Part 1, we started with basic JSON. In Part 2, we added validation. In Part 3, we stored efficiently in JSONB. In Part 5, we added protocol structure.
Now we face the scalability problem: our User API has grown to 10 million users. How do we export them for analytics?
JSON array approach (broken):
[
  {"id": "user-5f9d88c", "username": "alice", "email": "alice@example.com"},
  {"id": "user-abc123", "username": "bob", "email": "bob@example.com"},
  ... 9,999,998 more users
]
Problems:
- Must load all 10M users into memory (30+ GB RAM)
- Cannot start processing until complete file is parsed
- Single corrupt user breaks entire export
- Cannot resume if process crashes at user 8 million
JSON Lines approach (scales):
{"id": "user-5f9d88c", "username": "alice", "email": "alice@example.com"}
{"id": "user-abc123", "username": "bob", "email": "bob@example.com"}
{"id": "user-def456", "username": "carol", "email": "carol@example.com"}
Export pipeline (constant memory):
// Stream from database to file
const writeStream = fs.createWriteStream('users-export.jsonl');
const cursor = db.collection('users').find().stream();
cursor.on('data', (user) => {
  writeStream.write(JSON.stringify(user) + '\n');
});
Memory usage: a few kilobytes per buffered record, regardless of total users. A 100GB+ export streams through with only megabytes of RAM.
This completes the streaming layer for our User API.
The Streaming Problem
JSON Arrays Don’t Stream
The issue:
[
  {"timestamp": "2023-01-15T10:00:00Z", "level": "info", "message": "Server started"},
  {"timestamp": "2023-01-15T10:00:01Z", "level": "info", "message": "Database connected"},
  {"timestamp": "2023-01-15T10:00:02Z", "level": "error", "message": "API timeout"}
]
To find all error logs, you must:
- Read entire file into memory
- Parse complete JSON array
- Iterate through array
- Filter for "level": "error"
For a 10GB log file:
- Memory usage: 10GB+
- Parse time: Minutes
- Processing: All-or-nothing
Why JSON Arrays Are All-or-Nothing
JSON syntax requires reading the entire structure:
[
  {"id": 1},
  {"id": 2},
  {"id": 3}
]
The parser can’t know it’s valid JSON until it sees the closing ]. Each , could be followed by more elements. The structure is inherently non-streaming.
Streaming parsers exist (SAX-style event-based parsers), but they’re complex and still require tracking nesting depth, bracket matching, and state across the entire document.
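For a concrete sense of that complexity hidden behind a library, here is a sketch using Python's third-party ijson package (an assumption for illustration only; nothing else in this article depends on it), which walks a JSON array incrementally while tracking the parser state for you:
import ijson  # third-party SAX-style streaming JSON parser

with open('logs.json', 'rb') as f:
    # 'item' selects each element of the top-level array; the parser keeps
    # only bounded state (nesting depth, partial tokens) in memory
    for entry in ijson.items(f, 'item'):
        if entry.get('level') == 'error':
            print(entry['message'])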
What XML Had: SAX and StAX streaming parsers (1998-2004)
XML’s approach: Built-in streaming support through complex parser APIs. SAX provided event-driven parsing, StAX enabled pull-based streaming - both required extensive state management and event handling.
// SAX: Complex event-driven streaming
DefaultHandler handler = new DefaultHandler() {
    @Override
    public void startElement(String uri, String localName, String qName, Attributes attr) {
        if (qName.equals("user")) {
            processUser(attr.getValue("id"), attr.getValue("name"));
        }
    }
};
parser.parse("users.xml", handler);
Benefit: Constant memory usage for any file size, built into XML ecosystem
Cost: Complex APIs (200+ lines for simple tasks), steep learning curve, stateful parsing
JSON’s approach: Format convention (JSON Lines) - separate standard
Architecture shift: Built-in streaming → Format-based streaming, Complex APIs → Simple line reading, Stateful parsing → Stateless processing
What XML Had: SAX and StAX
XML solved streaming with built-in parser APIs:
SAX (Simple API for XML) - Event-based streaming (1998):
// SAX parser for streaming XML
SAXParserFactory factory = SAXParserFactory.newInstance();
SAXParser parser = factory.newSAXParser();

DefaultHandler handler = new DefaultHandler() {
    @Override
    public void startElement(String uri, String localName, String qName, Attributes attributes) {
        if (qName.equals("user")) {
            String id = attributes.getValue("id");
            String name = attributes.getValue("name");
            processUser(id, name); // Process immediately
        }
    }
};

parser.parse("users.xml", handler); // Streams through file
StAX (Streaming API for XML) - Pull-based streaming (2004):
XMLInputFactory factory = XMLInputFactory.newInstance();
XMLStreamReader reader = factory.createXMLStreamReader(new FileInputStream("users.xml"));

while (reader.hasNext()) {
    int event = reader.next();
    if (event == XMLStreamConstants.START_ELEMENT && reader.getLocalName().equals("user")) {
        String id = reader.getAttributeValue(null, "id");
        String name = reader.getAttributeValue(null, "name");
        processUser(id, name);
    }
}
Benefit: Constant memory. Parse 10GB XML files with <10MB RAM.
Cost: Complex API. Stateful parsing (track nesting, handle events, match tags). 200+ lines of code for simple streaming tasks.
JSON’s approach: No built-in streaming support. Standard JSON parsers are DOM-style (load entire document). Streaming JSON parsers exist but are complex and non-standard.
Memory Reality: Loading a 1GB JSON array uses 3-5GB of RAM due to parsing overhead and object allocation. A 10GB file requires 30-50GB of memory and will crash most systems.
XML comparison: SAX/StAX could process 10GB XML files with constant memory since 1998. JSON lacked this capability for its first decade, until JSON Lines emerged as the community solution.
The Simplicity Breakthrough
JSON Lines (also called JSONL, NDJSON, newline-delimited JSON) achieves streaming with minimal complexity:
Where XML needed complex APIs (SAX/StAX with 200+ LOC), JSON Lines uses one convention: newlines.
Comparison:
| Aspect | XML (SAX/StAX) | JSON Lines |
|---|---|---|
| Streaming support | Built-in parser APIs | Format convention |
| Code complexity | 200+ lines (handlers, state) | 5 lines (read line, parse) |
| Parser requirements | Special streaming parsers | Standard JSON parsers |
| Learning curve | Complex (events, pull model) | Trivial (readline + parse) |
| Error handling | Track state across events | Per-line isolation |
| Resume/skip | Complex (replay events) | Simple (seek to line) |
| Unix integration | Difficult (XML structure) | Native (text lines) |
JSON Lines approach:
// Streaming 10GB file: 5 lines of code
const readline = require('readline');
const stream = readline.createInterface({ input: fs.createReadStream('data.jsonl') });
stream.on('line', (line) => {
  const obj = JSON.parse(line); // Standard parser
  process(obj); // Constant memory
});
Contrast with SAX (40+ lines minimum):
- No handler classes
- No state tracking
- No event matching
- No tag nesting management
- Just: read line, parse JSON, process
The modular brilliance: JSON Lines didn’t require new parsers or language features. It’s pure convention - use existing tools (readline, JSON.parse) in a streaming pattern.
Specification
JSON Lines rules:
- Each line is a valid JSON value (typically an object)
- Lines are separated by \n (newline character)
- The file has no outer array brackets
That’s it. No special syntax, no new parser needed.
Example:
{"id": 1, "name": "Alice", "active": true}
{"id": 2, "name": "Bob", "active": false}
{"id": 3, "name": "Carol", "active": true}
Specification: jsonlines.org
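Because the rules are so small, a validator fits in a few lines. A minimal sketch in Python (the function name is illustrative):
import json

def check_jsonl(path):
    """Report any line that is not a standalone JSON value."""
    bad = 0
    with open(path, 'r', encoding='utf-8') as f:
        for line_num, line in enumerate(f, 1):
            if not line.strip():
                continue  # blank lines are commonly tolerated
            try:
                json.loads(line)
            except json.JSONDecodeError as e:
                bad += 1
                print(f"Line {line_num}: {e}")
    return bad == 0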
Benefits
+ Streaming-friendly - Process one line at a time
+ Constant memory - Only one object in memory
+ Append-only - Add new records without reparsing
+ Fault-tolerant - One corrupt line doesn’t break the file
+ Unix-compatible - Works with grep, awk, sed, head, tail
+ Simple - No special format, just newlines between JSON
+ Resumable - Stop and restart processing at any line
+ Parallel-friendly - Multiple workers process different chunks
Comparison
| Aspect | JSON Array | JSON Lines |
|---|---|---|
| Memory usage | O(file size) | O(1 object) |
| Streaming | No | Yes |
| Append | Must rewrite file | Just append |
| Corruption | Entire file invalid | Only affected lines |
| Unix tools | Difficult | Native support |
| Parallel | Must coordinate | Independent chunks |
| Random access | Must parse to position | Seek to line |
flowchart LR
subgraph json["Standard JSON Array"]
file1[Read entire file]
parse1[Parse complete array]
mem1[Load all in memory]
proc1[Process all]
file1 --> parse1 --> mem1 --> proc1
end
subgraph jsonl["JSON Lines"]
file2[Read one line]
parse2[Parse one object]
mem2[Process object]
next[Next line]
file2 --> parse2 --> mem2 --> next
next -.Loop.-> file2
end
data[(10 GB file)] --> json
data --> jsonl
json -.Requires 30+ GB RAM.-> crash[Out of Memory]
jsonl -.Uses <10 MB RAM.-> success[Success]
style json fill:#4C3A3C,stroke:#6b7280,color:#f0f0f0
style jsonl fill:#3A4C43,stroke:#6b7280,color:#f0f0f0
Reading JSON Lines
Node.js (Streaming)
const fs = require('fs');
const readline = require('readline');

async function processJSONL(filename) {
  const fileStream = fs.createReadStream(filename);
  const rl = readline.createInterface({
    input: fileStream,
    crlfDelay: Infinity
  });

  let count = 0;
  for await (const line of rl) {
    if (!line.trim()) continue; // Skip empty lines
    try {
      const obj = JSON.parse(line);
      // Process object
      if (obj.level === 'error') {
        console.log('Error:', obj.message);
      }
      count++;
    } catch (err) {
      console.error(`Parse error on line ${count + 1}:`, err.message);
    }
  }
  console.log(`Processed ${count} records`);
}

// Usage
processJSONL('logs.jsonl');
Memory usage: ~1KB per object, constant regardless of file size.
Go (bufio.Scanner)
package main

import (
    "bufio"
    "encoding/json"
    "fmt"
    "os"
)

type LogEntry struct {
    Timestamp string `json:"timestamp"`
    Level     string `json:"level"`
    Message   string `json:"message"`
}

func processJSONL(filename string) error {
    file, err := os.Open(filename)
    if err != nil {
        return err
    }
    defer file.Close()

    scanner := bufio.NewScanner(file)
    // Increase buffer size for large lines (default 64KB)
    const maxCapacity = 1024 * 1024 // 1MB
    buf := make([]byte, maxCapacity)
    scanner.Buffer(buf, maxCapacity)

    count := 0
    for scanner.Scan() {
        line := scanner.Text()
        if len(line) == 0 {
            continue
        }
        var entry LogEntry
        if err := json.Unmarshal([]byte(line), &entry); err != nil {
            fmt.Printf("Parse error on line %d: %v\n", count+1, err)
            continue
        }
        // Process entry
        if entry.Level == "error" {
            fmt.Printf("Error: %s\n", entry.Message)
        }
        count++
    }
    if err := scanner.Err(); err != nil {
        return fmt.Errorf("scanner error: %w", err)
    }
    fmt.Printf("Processed %d records\n", count)
    return nil
}

func main() {
    if err := processJSONL("logs.jsonl"); err != nil {
        fmt.Fprintf(os.Stderr, "Error: %v\n", err)
        os.Exit(1)
    }
}
Python (Line-by-line)
import json

def process_jsonl(filename):
    count = 0
    with open(filename, 'r') as f:
        for line_num, line in enumerate(f, 1):
            line = line.strip()
            if not line:
                continue
            try:
                obj = json.loads(line)
                # Process object
                if obj.get('level') == 'error':
                    print(f"Error: {obj['message']}")
                count += 1
            except json.JSONDecodeError as e:
                print(f"Parse error on line {line_num}: {e}")
    print(f"Processed {count} records")

# Usage
process_jsonl('logs.jsonl')
Pandas for analytics:
import pandas as pd

# Read entire JSONL file into DataFrame
df = pd.read_json('data.jsonl', lines=True)

# Read in chunks (constant memory)
for chunk in pd.read_json('large.jsonl', lines=True, chunksize=1000):
    # Process 1000 records at a time
    filtered = chunk[chunk['status'] == 'active']
    print(f"Active users: {len(filtered)}")
Rust (BufReader)
use std::fs::File;
use std::io::{BufRead, BufReader};
use serde_json::Value;

fn process_jsonl(filename: &str) -> Result<(), Box<dyn std::error::Error>> {
    let file = File::open(filename)?;
    let reader = BufReader::new(file);
    let mut count = 0;

    for (line_num, line) in reader.lines().enumerate() {
        let line = line?;
        if line.trim().is_empty() {
            continue;
        }
        match serde_json::from_str::<Value>(&line) {
            Ok(obj) => {
                // Process object
                if obj["level"] == "error" {
                    println!("Error: {}", obj["message"]);
                }
                count += 1;
            }
            Err(e) => {
                eprintln!("Parse error on line {}: {}", line_num + 1, e);
            }
        }
    }
    println!("Processed {} records", count);
    Ok(())
}

fn main() {
    if let Err(e) = process_jsonl("logs.jsonl") {
        eprintln!("Error: {}", e);
        std::process::exit(1);
    }
}
Streaming Advantage: These programs use constant memory regardless of file size. A 1GB file and a 100GB file use the same RAM - just one line at a time.
Writing JSON Lines
Node.js
const fs = require('fs');

class JSONLWriter {
  constructor(filename) {
    this.stream = fs.createWriteStream(filename);
  }
  write(obj) {
    this.stream.write(JSON.stringify(obj) + '\n');
  }
  close() {
    this.stream.end();
  }
}

// Usage
const writer = new JSONLWriter('output.jsonl');
for (let i = 0; i < 1000000; i++) {
  writer.write({
    id: i,
    timestamp: new Date().toISOString(),
    value: Math.random()
  });
}
writer.close();
Memory usage: Constant - objects are written and discarded immediately.
Go
import (
    "bufio"
    "encoding/json"
    "math/rand"
    "os"
    "time"
)

func writeJSONL(filename string, records []interface{}) error {
    file, err := os.Create(filename)
    if err != nil {
        return err
    }
    defer file.Close()

    writer := bufio.NewWriter(file)
    defer writer.Flush()

    encoder := json.NewEncoder(writer)
    encoder.SetEscapeHTML(false)
    for _, record := range records {
        if err := encoder.Encode(record); err != nil {
            return err
        }
    }
    return nil
}

// Streaming write (doesn't hold all records in memory)
func streamWriteJSONL(filename string) error {
    file, err := os.Create(filename)
    if err != nil {
        return err
    }
    defer file.Close()

    encoder := json.NewEncoder(file)
    encoder.SetEscapeHTML(false)
    for i := 0; i < 1000000; i++ {
        record := map[string]interface{}{
            "id":        i,
            "timestamp": time.Now().Format(time.RFC3339),
            "value":     rand.Float64(),
        }
        if err := encoder.Encode(record); err != nil {
            return err
        }
    }
    return nil
}
Python
import json
import random
from datetime import datetime

def write_jsonl(filename, records):
    with open(filename, 'w') as f:
        for record in records:
            f.write(json.dumps(record) + '\n')

# Generator for streaming (doesn't load all records)
def stream_write_jsonl(filename, record_generator):
    with open(filename, 'w') as f:
        for record in record_generator:
            f.write(json.dumps(record) + '\n')

# Usage
def generate_records():
    for i in range(1000000):
        yield {
            'id': i,
            'timestamp': datetime.now().isoformat(),
            'value': random.random()
        }

stream_write_jsonl('output.jsonl', generate_records())
Unix Pipeline Integration
JSON Lines works beautifully with Unix tools:
grep (Filter by content)
# Find all error logs
grep '"level":"error"' logs.jsonl

# Case-insensitive search
grep -i '"status":"failed"' events.jsonl

# Count error logs
grep -c '"level":"error"' logs.jsonl

# Find logs from specific user
grep '"user_id":123' access.jsonl
head and tail
# First 10 records
head -n 10 data.jsonl

# Last 100 records
tail -n 100 data.jsonl

# Monitor logs in real-time
tail -f application.jsonl

# Skip first 1000 records
tail -n +1001 data.jsonl
wc (Count)
# Count total records
wc -l data.jsonl

# Count after filtering
grep '"status":"active"' users.jsonl | wc -l
sed (Transform)
# Extract specific field (crude)
sed 's/.*"email":"\([^"]*\)".*/\1/' users.jsonl

# Remove field
sed 's/"password":"[^"]*",//g' users.jsonl
awk (Process)
# Print specific field
awk -F'"' '{print $4}' users.jsonl  # Print first field value

# Complex processing
awk '{
  if ($0 ~ /"level":"error"/) {
    count++
  }
} END {
  print "Errors:", count
}' logs.jsonl
jq (JSON processor)
# Extract field from each line
jq -r '.email' users.jsonl

# Filter objects
jq 'select(.level == "error")' logs.jsonl

# Transform objects
jq '{id, name, email}' users.jsonl

# Aggregate
jq -s 'map(.amount) | add' transactions.jsonl

# Complex query
jq 'select(.status == "active" and .age > 30) | {name, email}' users.jsonl
Combining tools
# Find errors, extract message, count unique
grep '"level":"error"' logs.jsonl | \
  jq -r '.message' | \
  sort | \
  uniq -c | \
  sort -rn

# Filter users, transform, save
jq 'select(.active == true) | {id, email}' users.jsonl > active-users.jsonl

# Sample 10% of data
awk 'rand() < 0.1' large-dataset.jsonl > sample.jsonl

# Split large file into chunks
split -l 10000 data.jsonl chunk_
# Results: chunk_aa, chunk_ab, chunk_ac (10K lines each)
flowchart LR
subgraph unix["Unix Pipeline"]
input[logs.jsonl]
grep[grep error]
jq[jq extract]
sort[sort]
uniq[uniq -c]
output[error-summary.txt]
end
input --> grep --> jq --> sort --> uniq --> output
style unix fill:#3A4A5C,stroke:#6b7280,color:#f0f0f0
Log Processing with JSON Lines
Structured Logging
Modern logging libraries output JSON Lines:
Node.js (pino):
const pino = require('pino');

const logger = pino({
  level: 'info',
  // Output JSON Lines to stdout
});

logger.info({user: 'alice', action: 'login'}, 'User logged in');
logger.error({error: err.message, stack: err.stack}, 'Request failed');

// Output (JSONL):
// {"level":30,"time":1673780400000,"user":"alice","action":"login","msg":"User logged in"}
// {"level":50,"time":1673780401000,"error":"Timeout","msg":"Request failed"}
Go (zerolog):
import "github.com/rs/zerolog/log"

func main() {
    // Logs output as JSONL
    log.Info().
        Str("user", "alice").
        Str("action", "login").
        Msg("User logged in")

    log.Error().
        Err(err).
        Str("endpoint", "/api/users").
        Msg("Request failed")
}

// Output:
// {"level":"info","user":"alice","action":"login","message":"User logged in","time":"2023-01-15T10:00:00Z"}
// {"level":"error","error":"timeout","endpoint":"/api/users","message":"Request failed","time":"2023-01-15T10:00:01Z"}
Python (structlog):
import structlog

logger = structlog.get_logger()

logger.info("User logged in", user="alice", action="login")
logger.error("Request failed", error=str(err), endpoint="/api/users")

# Output (JSONL):
# {"event": "User logged in", "user": "alice", "action": "login", "timestamp": "2023-01-15T10:00:00Z"}
# {"event": "Request failed", "error": "timeout", "endpoint": "/api/users", "timestamp": "2023-01-15T10:00:01Z"}
Querying Logs
Find errors in last hour:
tail -n 10000 app.jsonl | \
  jq 'select(.level == "error" and .timestamp > "2023-01-15T09:00:00Z")'
Count errors by endpoint:
grep '"level":"error"' app.jsonl | \
  jq -r '.endpoint' | \
  sort | \
  uniq -c | \
  sort -rn
Track slow requests:
jq 'select(.duration > 1000) | {endpoint, duration, user}' app.jsonl
Monitor logs in real-time:
tail -f app.jsonl | jq 'select(.level == "error")'
Log Aggregation Pipeline
Fluentd configuration:
<source>
  @type tail
  path /var/log/app/*.jsonl
  format json
  tag app.logs
</source>

<filter app.logs>
  @type record_transformer
  <record>
    hostname ${hostname}
    environment production
  </record>
</filter>

<match app.logs>
  @type elasticsearch
  host elasticsearch.local
  port 9200
  index_name app-logs
</match>
Logstash configuration:
input {
  file {
    path => "/var/log/app/*.jsonl"
    codec => "json_lines"
  }
}

filter {
  if [level] == "error" {
    mutate {
      add_tag => ["error"]
    }
  }
}

output {
  elasticsearch {
    hosts => ["elasticsearch:9200"]
    index => "app-logs-%{+YYYY.MM.dd}"
  }
}
Data Pipelines with JSON Lines
ETL Example: Database Export to Data Warehouse
Step 1: Export from PostgreSQL to JSONL
psql -d mydb -c "
  SELECT json_build_object(
    'id', id,
    'name', name,
    'email', email,
    'created', created_at
  )
  FROM users
  WHERE active = true
" -t | grep '{' > users.jsonl
Step 2: Transform with jq
jq '{
  user_id: .id,
  full_name: .name,
  email_address: .email,
  signup_date: .created | split("T")[0]
}' users.jsonl > transformed.jsonl
Step 3: Load into data warehouse
import json

def load_to_warehouse(filename):
    with open(filename, 'r') as f:
        for line in f:
            record = json.loads(line)
            warehouse_db.insert('users_dim', record)

load_to_warehouse('transformed.jsonl')
Kafka Messages
Kafka often uses JSON Lines for batch export/import:
Export Kafka topic to file:
kafka-console-consumer \
  --bootstrap-server localhost:9092 \
  --topic events \
  --from-beginning \
  --max-messages 100000 > events.jsonl
Import to Kafka from file:
cat events.jsonl | kafka-console-producer \
  --bootstrap-server localhost:9092 \
  --topic events
Parallel Processing
GNU Parallel with JSONL:
# Process file in parallel (4 workers)
cat large.jsonl | parallel -j 4 --pipe -N 1000 'process-chunk.sh'

# Split, process, merge
split -l 10000 data.jsonl chunk_
ls chunk_* | parallel 'process-file.sh {} > {}.result'
cat chunk_*.result > final.jsonl
rm chunk_*
Process script (process-file.sh):
#!/bin/bash
# Read the chunk file passed as the first argument, line by line
while IFS= read -r line; do
  echo "$line" | jq '.processed = true'
done < "$1"
MongoDB and JSON Lines
MongoDB’s mongoexport outputs JSONL by default:
Export Collection
mongoexport \
  --db myapp \
  --collection users \
  --out users.jsonl

# Output:
# {"_id":{"$oid":"507f1f77bcf86cd799439011"},"name":"Alice","email":"alice@example.com"}
# {"_id":{"$oid":"507f1f77bcf86cd799439012"},"name":"Bob","email":"bob@example.com"}
With query:
mongoexport \
  --db myapp \
  --collection users \
  --query '{"active": true}' \
  --out active-users.jsonl
Specific fields:
mongoexport \
  --db myapp \
  --collection users \
  --fields name,email \
  --out users-minimal.jsonl
Import from JSON Lines
mongoimport \
  --db myapp \
  --collection users \
  --file users.jsonl

# With upsert (update existing)
mongoimport \
  --db myapp \
  --collection users \
  --file users.jsonl \
  --mode upsert
Backup and Restore
Backup all collections:
for collection in $(mongo mydb --quiet --eval 'db.getCollectionNames()' | tr ',' '\n'); do
  mongoexport --db mydb --collection $collection --out "backup/${collection}.jsonl"
done
Restore:
for file in backup/*.jsonl; do
  collection=$(basename "$file" .jsonl)
  mongoimport --db mydb --collection "$collection" --file "$file"
done
Real-World Use Cases
1. Application Logging
Setup (Node.js with pino):
const pino = require('pino');

const logger = pino({
  level: process.env.LOG_LEVEL || 'info',
  // File transport
  transport: {
    target: 'pino/file',
    options: { destination: '/var/log/app/app.jsonl' }
  }
});

// Usage throughout application
logger.info({userId: 123, action: 'purchase', amount: 99.99}, 'Order placed');
logger.error({error: err.message, stack: err.stack}, 'Payment failed');
logger.debug({query: sql, duration: 45}, 'Database query');
Analysis:
# Count log levels
jq -r '.level' /var/log/app/app.jsonl | sort | uniq -c

# Find slow queries (>100ms)
jq 'select(.duration > 100)' /var/log/app/app.jsonl

# Error rate over time
jq 'select(.level == "error") | .timestamp' /var/log/app/app.jsonl | \
  cut -d'T' -f1 | \
  uniq -c

# User activity
jq 'select(.userId) | {userId, action}' /var/log/app/app.jsonl
2. Data Science Workflows
Read large dataset:
import pandas as pd

# Read in chunks to avoid memory overflow
chunk_size = 10000
results = []

for chunk in pd.read_json('events.jsonl', lines=True, chunksize=chunk_size):
    # Filter
    filtered = chunk[chunk['event_type'] == 'purchase']
    # Aggregate
    daily_revenue = filtered.groupby(
        pd.to_datetime(filtered['timestamp']).dt.date
    )['amount'].sum()
    results.append(daily_revenue)

# Combine results
total_revenue = pd.concat(results).groupby(level=0).sum()
print(total_revenue)
Write processed data:
import json
from datetime import datetime

# Process and write in streaming fashion
with open('input.jsonl', 'r') as infile, open('output.jsonl', 'w') as outfile:
    for line in infile:
        record = json.loads(line)
        # Transform
        record['processed'] = True
        record['processed_at'] = datetime.now().isoformat()
        # Write immediately (don't accumulate)
        outfile.write(json.dumps(record) + '\n')
3. Elasticsearch Bulk API
Elasticsearch uses JSONL for bulk operations:
{"index": {"_index": "users", "_id": 1}}
{"name": "Alice", "email": "alice@example.com", "age": 30}
{"index": {"_index": "users", "_id": 2}}
{"name": "Bob", "email": "bob@example.com", "age": 25}
{"update": {"_index": "users", "_id": 3}}
{"doc": {"status": "active"}}
{"delete": {"_index": "users", "_id": 4}}
Pattern: Action line, then data line (for index/update).
Bulk import:
curl -X POST "localhost:9200/_bulk" \
  -H "Content-Type: application/x-ndjson" \
  --data-binary @data.jsonl
Generate bulk file:
const fs = require('fs');

function generateBulk(records, filename) {
  const stream = fs.createWriteStream(filename);
  for (const record of records) {
    // Action line
    stream.write(JSON.stringify({
      index: {_index: 'users', _id: record.id}
    }) + '\n');
    // Data line
    stream.write(JSON.stringify(record) + '\n');
  }
  stream.end();
}
4. Machine Learning Training Data
TensorFlow datasets:
import json
import tensorflow as tf

def parse_jsonl(filename):
    dataset = tf.data.TextLineDataset(filename)

    def parse_line(line):
        # tf.py_function bridges into Python so each JSONL line can be
        # decoded with ordinary json.loads
        def _parse(raw):
            record = json.loads(raw.numpy().decode('utf-8'))
            return record['features'], record['label']
        features, label = tf.py_function(_parse, [line], [tf.float32, tf.int64])
        return features, label

    return dataset.map(parse_line)

# Use for training
train_data = parse_jsonl('train.jsonl')
model.fit(train_data.batch(32), epochs=10)
PyTorch datasets:
import json
from torch.utils.data import IterableDataset, DataLoader

class JSONLDataset(IterableDataset):
    def __init__(self, filename):
        self.filename = filename

    def __iter__(self):
        with open(self.filename, 'r') as f:
            for line in f:
                record = json.loads(line)
                features = record['features']
                label = record['label']
                yield features, label

# Use for training
dataset = JSONLDataset('train.jsonl')
dataloader = DataLoader(dataset, batch_size=32)
for batch in dataloader:
    features, labels = batch
    # Train model
5. Database Replication
Real-time change stream:
// MongoDB change stream to JSONL
const stream = fs.createWriteStream('changes.jsonl', {flags: 'a'});
collection.watch().on('change', (change) => {
  stream.write(JSON.stringify(change) + '\n');
});

// Replay changes to replica
async function replayChanges(filename) {
  const rl = readline.createInterface({
    input: fs.createReadStream(filename)
  });
  for await (const line of rl) {
    const change = JSON.parse(line);
    await applyChange(replicaDb, change);
  }
}
6. API Response Streaming
Server sends JSONL for large result sets:
app.get('/api/users/export', async (req, res) => {
  res.setHeader('Content-Type', 'application/x-ndjson');
  // Stream results from database
  const cursor = db.collection('users').find().stream();
  cursor.on('data', (doc) => {
    res.write(JSON.stringify(doc) + '\n');
  });
  cursor.on('end', () => {
    res.end();
  });
});

// Client processes streaming response
const response = await fetch('/api/users/export');
const reader = response.body.getReader();
const decoder = new TextDecoder();
let buffer = '';

while (true) {
  const {done, value} = await reader.read();
  if (done) break;
  buffer += decoder.decode(value, {stream: true});
  const lines = buffer.split('\n');
  buffer = lines.pop(); // Keep incomplete line in buffer
  for (const line of lines) {
    if (line.trim()) {
      const user = JSON.parse(line);
      console.log('User:', user.name);
    }
  }
}
Fault Tolerance
Corrupted Lines Don’t Break Processing
Problem with JSON arrays:
[
  {"id": 1, "name": "Alice"},
  {"id": 2, "name": CORRUPT},
  {"id": 3, "name": "Carol"}
]
Result: Entire file unparseable. You get nothing.
JSON Lines:
{"id": 1, "name": "Alice"}
{"id": 2, "name": CORRUPT}
{"id": 3, "name": "Carol"}
Result: Lines 1 and 3 process successfully. Line 2 skipped with error message. You still get 2 of 3 records.
Resilient parser:
async function processWithErrorHandling(filename) {
  const fileStream = fs.createReadStream(filename);
  const rl = readline.createInterface({input: fileStream});

  let processed = 0;
  let errors = 0;
  for await (const line of rl) {
    try {
      const obj = JSON.parse(line);
      await process(obj);
      processed++;
    } catch (err) {
      errors++;
      console.error(`Line ${processed + errors} failed: ${err.message}`);
      // Continue processing remaining lines
    }
  }
  console.log(`Success: ${processed}, Errors: ${errors}`);
}
Resumable Processing
Track progress:
const fs = require('fs');
const readline = require('readline');

async function resumableProcess(filename, checkpointFile) {
  // Load checkpoint
  let lastProcessed = 0;
  if (fs.existsSync(checkpointFile)) {
    lastProcessed = parseInt(fs.readFileSync(checkpointFile, 'utf8'));
  }

  const fileStream = fs.createReadStream(filename);
  const rl = readline.createInterface({input: fileStream});

  let lineNum = 0;
  for await (const line of rl) {
    lineNum++;
    // Skip already processed
    if (lineNum <= lastProcessed) continue;

    const obj = JSON.parse(line);
    await process(obj);

    // Save checkpoint every 1000 lines
    if (lineNum % 1000 === 0) {
      fs.writeFileSync(checkpointFile, lineNum.toString());
    }
  }
  // Final checkpoint
  fs.writeFileSync(checkpointFile, lineNum.toString());
}

// Usage
resumableProcess('large-dataset.jsonl', 'progress.txt');
// If process crashes, restart from checkpoint
// No need to reprocess completed lines
Append-Only Writes
// Logger appends without locking entire file
class AppendOnlyLogger {
  constructor(filename) {
    this.stream = fs.createWriteStream(filename, {flags: 'a'});
  }
  log(obj) {
    this.stream.write(JSON.stringify(obj) + '\n');
  }
  close() {
    this.stream.end();
  }
}

// Multiple processes can append safely
const logger = new AppendOnlyLogger('/var/log/app.jsonl');
setInterval(() => {
  logger.log({
    timestamp: new Date().toISOString(),
    pid: process.pid,
    memory: process.memoryUsage()
  });
}, 60000);
Fault Tolerance Benefits:
- Corrupted lines are isolated (don’t affect other lines)
- Processing is resumable (checkpoint at any line)
- Append-only writes are safe (no file locking needed)
- Partial results available (process what you can)
Streaming vs Batch Processing
Batch (load all):
// Load entire file
const data = JSON.parse(fs.readFileSync('data.json'));

// Process all
for (const record of data) {
  await process(record);
}

// Memory: O(n), Time to start: O(n)
Streaming (one at a time):
// Stream file
const rl = readline.createInterface({
  input: fs.createReadStream('data.jsonl')
});

// Process each
for await (const line of rl) {
  const record = JSON.parse(line);
  await process(record);
}

// Memory: O(1), Time to start: O(1)
Key differences:
| Aspect | Batch Processing | Stream Processing |
|---|---|---|
| Memory usage | O(n) - entire file | O(1) - one record |
| Time to first record | Must parse all | Immediate |
| Large file handling | May run out of memory | Constant memory |
| Partial results | No | Yes |
| Resumable | No | Yes |
flowchart TB
subgraph batch["JSON Array (Batch)"]
load[Load entire file]
parse[Parse all records]
mem[Hold all in memory]
proc[Process all]
load --> parse --> mem --> proc
end
subgraph stream["JSON Lines (Stream)"]
read[Read one line]
parse2[Parse one record]
proc2[Process immediately]
discard[Discard from memory]
next[Next line]
read --> parse2 --> proc2 --> discard --> next
next -.Loop.-> read
end
file[(Large file)] --> batch
file --> stream
batch -.Requires memory proportional to file size.-> risk[May run out of memory]
stream -.Uses constant memory regardless of file size.-> success[Handles any size]
style batch fill:#4C3A3C,stroke:#6b7280,color:#f0f0f0
style stream fill:#3A4C43,stroke:#6b7280,color:#f0f0f0
Best Practices
1. One Object Per Line
Good:
{"id": 1, "name": "Alice"}
{"id": 2, "name": "Bob"}
Bad (multiline):
{
  "id": 1,
  "name": "Alice"
}
{
  "id": 2,
  "name": "Bob"
}
Multiline breaks line-based processing tools (grep, wc, split).
2. Compact JSON (No Whitespace)
{"id":1,"name":"Alice","tags":["go","rust"]}
Not:
{ "id": 1, "name": "Alice", "tags": [ "go", "rust" ] }
Whitespace wastes space. Each record should be compact.
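In Python, for example, json.dumps takes a separators argument that drops the optional whitespace (the default already keeps everything on one line):
import json

record = {"id": 1, "name": "Alice", "tags": ["go", "rust"]}

# Compact: no spaces after ',' or ':'
line = json.dumps(record, separators=(',', ':'))
print(line)  # {"id":1,"name":"Alice","tags":["go","rust"]}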
3. Handle Parse Errors Gracefully
for await (const line of rl) {
  if (!line.trim()) continue; // Skip empty lines
  try {
    const obj = JSON.parse(line);
    await process(obj);
    successful++;
  } catch (err) {
    failed++;
    logger.error({line: lineNum, error: err.message}, 'Parse failed');
    // Continue processing other lines
  }
}
console.log(`Processed: ${successful}, Failed: ${failed}`);
4. Use Newline as Separator Only
Correct:
{"text": "Line 1\nLine 2"}
{"text": "Single line"}
Note: Newlines inside JSON strings are escaped (\n). Only unescaped newlines separate records.
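Any standard serializer takes care of this automatically; a quick Python check (the values are illustrative):
import json

record = {"text": "Line 1\nLine 2"}

line = json.dumps(record)
print(line)              # {"text": "Line 1\nLine 2"} - the newline is escaped, so the record stays on one line
print(line.count('\n'))  # 0

restored = json.loads(line)
print(restored["text"] == "Line 1\nLine 2")  # True - the real newline comes back on parse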
5. Add Timestamps for Time-Series
{"timestamp": "2023-01-15T10:00:00Z", "event": "user_login", "user_id": 123}
{"timestamp": "2023-01-15T10:00:01Z", "event": "page_view", "page": "/home"}
Makes time-based queries and sorting possible.
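Because ISO-8601 UTC timestamps sort lexicographically in chronological order, plain string comparison is enough for range filters. A small Python sketch (file and field names are illustrative):
import json

start, end = "2023-01-15T10:00:00Z", "2023-01-15T11:00:00Z"

with open('events.jsonl', 'r', encoding='utf-8') as f:
    for line in f:
        event = json.loads(line)
        # String comparison works because the timestamps are ISO-8601 in UTC
        if start <= event['timestamp'] < end:
            print(event['event'])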
6. Include Record Version
{"_version": 1, "id": 1, "name": "Alice"}
{"_version": 2, "id": 1, "name": "Alice", "email": "alice@example.com"}
Enables schema evolution tracking and migration.
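On the read side, a version field lets one loop upgrade old records as they stream past. A sketch of migrate-on-read, assuming the _version field shown above:
import json

def migrate(record):
    # Upgrade v1 records to the v2 shape as they are read
    if record.get('_version', 1) == 1:
        record.setdefault('email', None)
        record['_version'] = 2
    return record

with open('users.jsonl', 'r', encoding='utf-8') as f:
    for line in f:
        if not line.strip():
            continue
        user = migrate(json.loads(line))
        print(user['_version'], user['name'])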
7. Compress Large Files
# Write compressed JSONL
gzip -c data.jsonl > data.jsonl.gz

# Process compressed (streaming)
zcat data.jsonl.gz | jq 'select(.status == "active")'

# Store compressed, process on-the-fly
gunzip -c logs.jsonl.gz | grep error
Storage savings:
- JSONL: 500 MB
- JSONL + gzip: 85 MB (83% compression)
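Compressed files can still be processed as a stream in code. A minimal Python sketch using the standard gzip module (the file name is illustrative):
import gzip
import json

# gzip.open in text mode decompresses incrementally; memory stays constant
with gzip.open('logs.jsonl.gz', 'rt', encoding='utf-8') as f:
    errors = sum(1 for line in f if line.strip() and json.loads(line).get('level') == 'error')

print(f"Errors: {errors}")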
8. Use Line Buffering
// Enable line buffering for real-time streaming
process.stdout._handle.setBlocking(true);

// Or use proper stream wrapper
const stream = fs.createWriteStream('output.jsonl', {
  highWaterMark: 64 * 1024 // 64KB buffer
});
9. Validate Objects (Optional)
const Ajv = require('ajv');
const ajv = new Ajv();
const validate = ajv.compile(schema);

for await (const line of rl) {
  const obj = JSON.parse(line);
  if (!validate(obj)) {
    console.error('Validation failed:', validate.errors);
    continue;
  }
  await process(obj);
}
10. File Rotation for Logs
// Rotate logs by size or time
const pino = require('pino');
const rfs = require('rotating-file-stream');

const stream = rfs.createStream('app.jsonl', {
  size: '100M', // Rotate every 100MB
  interval: '1d', // Or daily
  path: '/var/log/app',
  compress: 'gzip' // Compress rotated files
});

const logger = pino({}, stream); // Send JSONL log lines to the rotating stream
Tools
jq - JSON processor
brew install jq        # macOS
apt-get install jq     # Ubuntu
Miller - Like awk for structured data
brew install miller
mlr --json filter '$status == "active"' users.jsonl
xsv - CSV toolkit
ndjson-cli - JSON Lines utilities
npm install -g ndjson-cli

# Filter
ndjson-filter 'obj.status === "active"' < users.jsonl

# Map
ndjson-map '{id: obj.id, name: obj.name}' < users.jsonl

# Reduce
ndjson-reduce < events.jsonl
Libraries
Node.js:
- ndjson - Streaming parser/serializer
- JSONStream - JSON stream parser
Go:
- Standard library bufio.Scanner works perfectly
- encoding/json Decoder with Decode() in a loop
Python:
- Pandas read_json(..., lines=True)
- jsonlines library
Rust:
- serde_json with Deserializer::from_reader
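As a quick taste of the jsonlines package listed above (a third-party library; file names are illustrative), reading and writing both stay one object at a time:
import jsonlines  # pip install jsonlines

# Reader yields one parsed object per line; Writer emits one compact line per write()
with jsonlines.open('users.jsonl') as reader, \
     jsonlines.open('active-users.jsonl', mode='w') as writer:
    for obj in reader:
        if obj.get('active'):
            writer.write(obj)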
Common Patterns
Filter and Transform
# Filter active users and extract emails
jq 'select(.active == true) | {email: .email}' users.jsonl > active-emails.jsonl

# Add field to all records
jq '. + {processed: true}' input.jsonl > output.jsonl

# Rename field
jq '{id, username: .name, email}' users.jsonl > renamed.jsonl
Aggregate and Group
# Count by status
jq -r '.status' users.jsonl | sort | uniq -c

# Sum amounts
jq -s 'map(.amount) | add' transactions.jsonl

# Group by date
jq -r '.timestamp | split("T")[0]' events.jsonl | sort | uniq -c
Join Two JSONL Files
# Create lookup map from first file
jq -s 'map({(.id | tostring): .email}) | add' users.jsonl > user-emails.json

# Enrich second file
jq --slurpfile emails user-emails.json '
  . + {email: $emails[0][.user_id | tostring]}
' orders.jsonl
Sample Large Files
# Get 1% random sample
awk 'rand() < 0.01' large.jsonl > sample.jsonl

# Get every 100th line
awk 'NR % 100 == 0' large.jsonl > sample.jsonl

# Get first 10,000 lines
head -n 10000 large.jsonl > sample.jsonl
Split and Merge
# Split large file
split -l 100000 huge.jsonl chunk_

# Process chunks in parallel
ls chunk_* | parallel 'process.sh {} > {}.result'

# Merge results
cat chunk_*.result > final.jsonl

# Cleanup
rm chunk_*
When NOT to Use JSON Lines
1. Human Editing Needed
JSON arrays are more readable:
[
  {"name": "Alice"},
  {"name": "Bob"}
]
JSON Lines is harder to edit:
{"name": "Alice"}
{"name": "Bob"}
For configuration files edited by humans, standard JSON or YAML is better.
2. Small Datasets
For files under 10MB that fit comfortably in memory, standard JSON arrays are fine:
const data = JSON.parse(fs.readFileSync('small.json'));
The streaming benefit doesn’t matter for small files.
3. Nested Relationships
JSON Lines works best for flat records. Complex nested relationships are harder:
JSON (good for nested):
{
  "user": {
    "id": 1,
    "name": "Alice",
    "orders": [
      {"id": 100, "amount": 50},
      {"id": 101, "amount": 75}
    ]
  }
}
JSONL (denormalized):
{"user_id": 1, "name": "Alice", "order_id": 100, "amount": 50}
{"user_id": 1, "name": "Alice", "order_id": 101, "amount": 75}
You must denormalize or reference IDs across files.
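A small Python sketch of that denormalization step, flattening the nested document above into one row per order:
import json

doc = {
    "user": {
        "id": 1,
        "name": "Alice",
        "orders": [
            {"id": 100, "amount": 50},
            {"id": 101, "amount": 75}
        ]
    }
}

for order in doc["user"]["orders"]:
    row = {
        "user_id": doc["user"]["id"],
        "name": doc["user"]["name"],
        "order_id": order["id"],
        "amount": order["amount"],
    }
    print(json.dumps(row, separators=(',', ':')))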
4. Need JSON Schema Validation of Structure
JSON Schema expects a single root object or array:
{
  "$schema": "...",
  "type": "array",
  "items": {...}
}
JSONL files have multiple root objects. You validate each line individually, not the file as a whole.
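In practice that per-line validation is one call inside the read loop. A sketch using Python's third-party jsonschema package (an assumption; the Ajv example under Best Practices shows the same idea in Node.js, and the schema here is illustrative):
import json
from jsonschema import validate, ValidationError  # pip install jsonschema

record_schema = {
    "type": "object",
    "required": ["id", "name"],
    "properties": {"id": {"type": "integer"}, "name": {"type": "string"}},
}

with open('users.jsonl', 'r', encoding='utf-8') as f:
    for line_num, line in enumerate(f, 1):
        if not line.strip():
            continue
        obj = json.loads(line)
        try:
            validate(instance=obj, schema=record_schema)  # one record at a time
        except ValidationError as e:
            print(f"Line {line_num}: {e.message}")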
Conclusion: JSON Lines for Scale
JSON Lines is the pragmatic solution to JSON’s streaming problem. It’s not a new format - just a convention of using newlines to separate JSON objects.
Core Benefits
JSON Lines provides:
- Streaming processing (constant memory)
- Fault tolerance (corrupt lines isolated)
- Unix pipeline compatibility (grep, awk, sed, jq)
- Append-only writes (no file locking)
- Resumable processing (checkpoint at any line)
- Parallel processing (split into chunks)
Key patterns:
- One compact JSON object per line
- Use streaming parsers (don’t load entire file)
- Handle parse errors gracefully
- Checkpoint progress for long-running jobs
- Compress for storage (gzip, zstd)
- Rotate log files by size or time
When to use JSON Lines:
- Log files (application logs, access logs)
- Large datasets (analytics, ML training data)
- Data pipelines (ETL, stream processing)
- Database exports (MongoDB, PostgreSQL)
- Message streams (Kafka, queues)
- API streaming responses
When to avoid:
- Small datasets that fit in memory
- Human-edited configuration
- Complex nested relationships
- Need whole-file JSON Schema validation
Series Progress:
- Part 1: JSON’s origins and fundamental weaknesses
- Part 2: JSON Schema for validation and contracts
- Part 3: Binary JSON in databases (JSONB, BSON)
- Part 4: Binary JSON for APIs (MessagePack, CBOR)
- Part 5: JSON-RPC protocol and patterns
- Part 6 (this article): JSON Lines for streaming
- Part 7: Security (JWT, canonicalization, attacks)
In Part 7, we’ll complete the series with JSON security: JWT authentication, JWS/JWE encryption, canonicalization for signatures, and common JSON-based attacks (injection, deserialization, schema poisoning).
Next: Part 7 - JSON Security: Authentication, Encryption, and Attacks