
Database Sharding: Scaling Beyond Single Server Limits

✍️ Taylson Martinez
13 min read

Learn database sharding strategies to scale horizontally. Covers sharding keys, rebalancing, cross-shard queries, and real-world implementation.

What is Database Sharding?

Sharding is horizontal partitioning of data across multiple databases. Instead of one massive database, you split data across multiple smaller databases (shards).

Why Shard?

Single Database Limits:

  • Storage capacity (disk space)
  • Memory constraints (RAM)
  • CPU bottlenecks
  • I/O throughput limits

Sharding Benefits:

  • Linear scalability (add more servers)
  • Better performance (smaller datasets per shard)
  • Fault isolation (one shard failure doesn’t kill everything)
  • Geographic distribution (data close to users)

Vertical vs Horizontal Partitioning

Vertical Partitioning

Split by features/tables:

Database 1: Users, Auth
Database 2: Orders, Payments
Database 3: Products, Inventory

Horizontal Partitioning (Sharding)

Split same table across multiple databases:

Users Table:
Shard 1: Users 1-1,000,000
Shard 2: Users 1,000,001-2,000,000
Shard 3: Users 2,000,001-3,000,000

Sharding Strategies

1. Range-Based Sharding

Partition data by value ranges.

fun getShardForUser(userId: Long): String = when {
    userId <= 1_000_000 -> "shard-1"
    userId <= 2_000_000 -> "shard-2"
    userId <= 3_000_000 -> "shard-3"
    else -> "shard-4"
}

Pros:

  • Simple to implement
  • Easy to add new ranges
  • Range queries are efficient

Cons:

  • Uneven distribution (hotspots)
  • Rebalancing is complex

Best for: Time-series data (shard by date)

// Shard logs by month (derive the year from the timestamp rather than hard-coding it)
fun getLogShard(timestamp: Long): String {
    val date = Instant.ofEpochMilli(timestamp).atZone(ZoneOffset.UTC)
    val month = date.monthValue.toString().padStart(2, '0')
    return "logs-${date.year}-$month"
}

2. Hash-Based Sharding

Use a hash function to determine the shard.

fun getShardForUser(userId: Long): String {
    val shardCount = 4
    val shardIndex = hashFunction(userId) % shardCount
    return "shard-$shardIndex"
}

fun hashFunction(key: Any): Int {
    var hash = 0

    key.toString().forEach { char ->
        hash = (hash shl 5) - hash + char.code // hash * 31 + char.code
    }

    // Mask the sign bit so the index is never negative
    // (abs() overflows for Int.MIN_VALUE)
    return hash and Int.MAX_VALUE
}

Pros:

  • Even distribution
  • No hotspots

Cons:

  • Range queries must fan out to every shard
  • Resharding with naive modulo remaps most keys

Best for: Evenly distributed access patterns

3. Consistent Hashing

Minimizes data movement when adding/removing shards.

class ConsistentHash(
    initialShards: List<String>,
    private val virtualNodes: Int = 150
) {
    private val ring = mutableMapOf<Int, String>()
    private val shards = initialShards.toMutableList()
    private var sortedKeys: List<Int> = emptyList()
    
    init {
        // Create virtual nodes for each shard
        shards.forEach { shard ->
            repeat(virtualNodes) { i ->
                val virtualKey = "$shard-$i"
                ring[hash(virtualKey)] = shard
            }
        }
        
        // Sort ring positions once; getShard() binary-searches them
        sortedKeys = ring.keys.sorted()
    }
    
    fun getShard(key: String): String? {
        if (sortedKeys.isEmpty()) return null
        
        val h = hash(key)
        
        // Binary search for the first virtual node >= h,
        // wrapping around to the start of the ring if none exists
        val pos = sortedKeys.binarySearch(h).let { if (it >= 0) it else -(it + 1) }
        val nodeHash = if (pos == sortedKeys.size) sortedKeys.first() else sortedKeys[pos]
        return ring[nodeHash]
    }
    
    private fun hash(key: String): Int {
        var h = 0
        key.forEach { char ->
            h = (h shl 5) - h + char.code // h * 31 + char.code
        }
        // Mask the sign bit (abs() overflows for Int.MIN_VALUE)
        return h and Int.MAX_VALUE
    }
    
    fun addShard(shard: String) {
        shards.add(shard)
        
        repeat(virtualNodes) { i ->
            val virtualKey = "$shard-$i"
            ring[hash(virtualKey)] = shard
        }
        
        sortedKeys = ring.keys.sorted()
    }
}

// Usage
val sharding = ConsistentHash(listOf("shard-1", "shard-2", "shard-3"))
val shard = sharding.getShard("user-12345")
println(shard) // e.g. "shard-2", depending on the hash

Pros:

  • Minimal data movement on resharding
  • Even distribution

Cons:

  • More complex to implement

Best for: Dynamic sharding environments
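The "minimal data movement" claim is easy to check empirically. The sketch below is a compact version of the ring above (using a `TreeMap` for the ceiling lookup): assign a batch of keys, add a fourth shard, and count how many keys change assignment. With consistent hashing roughly a quarter of keys should move, whereas naive `hash % shardCount` would remap about three quarters.

```kotlin
import java.util.TreeMap

// Compact consistent-hash ring, same idea as the ConsistentHash class above
class Ring(shards: List<String>, private val virtualNodes: Int = 150) {
    private val ring = TreeMap<Int, String>()

    init { shards.forEach { add(it) } }

    fun add(shard: String) {
        repeat(virtualNodes) { i -> ring[hash("$shard-$i")] = shard }
    }

    fun getShard(key: String): String =
        // First virtual node >= hash, wrapping around to the ring's start
        (ring.ceilingEntry(hash(key)) ?: ring.firstEntry()).value

    private fun hash(key: String): Int {
        var h = 0
        key.forEach { c -> h = (h shl 5) - h + c.code }
        return h and Int.MAX_VALUE // mask sign bit; abs() overflows for Int.MIN_VALUE
    }
}

fun main() {
    val ring = Ring(listOf("shard-1", "shard-2", "shard-3"))
    val keys = (1..10_000).map { "user-$it" }

    // Record assignments before adding a shard
    val before = keys.associateWith { ring.getShard(it) }

    ring.add("shard-4")

    // Count how many keys moved to a different shard
    val moved = keys.count { ring.getShard(it) != before[it] }
    println("moved $moved of ${keys.size} keys")
}
```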

4. Geographic Sharding

Shard by user location.

fun getShardForUser(userId: Long): String {
    val user = getUserLocation(userId)
    
    return when (user.country) {
        "US" -> "us-shard"
        "UK" -> "eu-shard"
        "JP" -> "asia-shard"
        else -> "default-shard"
    }
}

Best for:

  • Reducing latency
  • Data sovereignty compliance (GDPR)
  • Multi-region applications

5. Directory-Based Sharding

Maintain a lookup table for shard locations.

// Shard directory (typically in Redis or database)
val shardDirectory = mapOf(
    "user-1" to "shard-1",
    "user-2" to "shard-1",
    "user-3" to "shard-2",
    "user-4" to "shard-3",
    // ... millions of entries
)

fun getShardForUser(userId: Long): String? {
    return shardDirectory["user-$userId"]
}

Pros:

  • Flexible assignment
  • Easy to rebalance specific keys

Cons:

  • Directory becomes a bottleneck
  • Directory must be highly available

Solution: Cache directory in application memory, use Redis for persistence.
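That caching idea can be sketched as a small read-through layer. In the sketch below, `directoryStore` is a plain in-memory map standing in for the persistent store (e.g. Redis); in production that lookup would be a network call, and the class name is hypothetical.

```kotlin
import java.util.concurrent.ConcurrentHashMap

// Read-through cache in front of the shard directory.
// `directoryStore` stands in for the persistent store (e.g. Redis).
class ShardDirectoryCache(private val directoryStore: Map<String, String>) {
    private val cache = ConcurrentHashMap<String, String>()

    fun getShardForUser(userId: Long): String? {
        val key = "user-$userId"
        // Serve from local memory; fall back to the store on a miss and cache the result
        return cache[key] ?: directoryStore[key]?.also { cache[key] = it }
    }

    // Call this when a key is rebalanced to another shard
    fun invalidate(userId: Long) {
        cache.remove("user-$userId")
    }
}

fun main() {
    val store = mapOf("user-1" to "shard-1", "user-2" to "shard-3")
    val directory = ShardDirectoryCache(store)
    println(directory.getShardForUser(1)) // shard-1 (miss, then cached)
    println(directory.getShardForUser(1)) // shard-1 (served from cache)
    println(directory.getShardForUser(9)) // null (unknown user)
}
```

The invalidation hook matters: a cached mapping that outlives a rebalance sends queries to the wrong shard.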

Choosing a Shard Key

The shard key determines data distribution. Choose wisely!

Good Shard Key Characteristics

  1. High Cardinality: Many unique values
  2. Even Distribution: No hotspots
  3. Immutable: Doesn’t change over time
  4. Query-Aligned: Supports common queries

Examples

// ✅ Good shard keys
userId          // High cardinality, immutable
email           // Unique, immutable
orderId         // High cardinality

// ❌ Bad shard keys
country         // Low cardinality (hotspots)
status          // Low cardinality
createdDate     // Changes over time (time-based hotspot)

Multi-Column Shard Keys

Combine multiple fields for better distribution:

// Shard by (tenant_id, user_id)
fun getShard(tenantId: String, userId: Long): Int {
    val key = "$tenantId-$userId"
    return hashFunction(key) % shardCount
}

Handling Cross-Shard Operations

The biggest challenge with sharding!

1. Cross-Shard Queries

// Query all shards and merge results
suspend fun searchUsers(searchQuery: String): List<User> {
    val shards = listOf("shard-1", "shard-2", "shard-3")
    
    val results = coroutineScope {
        shards.map { shard ->
            async {
                queryDatabase(shard, "SELECT * FROM users WHERE name LIKE ?", "%$searchQuery%")
            }
        }.awaitAll()
    }
    
    // Merge and sort results
    return results
        .flatten()
        .sortedBy { it.createdAt }
        .take(100) // Limit
}

Performance tip: Parallel queries reduce latency.

2. Cross-Shard Joins

Avoid if possible! If you must:

// Application-level join
suspend fun getUsersWithOrders(): List<UserWithOrders> {
    // Get users from user shard
    val users = queryUserShard("SELECT * FROM users")
    
    // Get orders for each user
    val userIds = users.map { it.id }
    val orders = coroutineScope {
        userIds.map { id ->
            async {
                queryOrderShard(id, "SELECT * FROM orders WHERE user_id = ?")
            }
        }.awaitAll()
    }
    
    // Join in application
    return users.mapIndexed { index, user ->
        UserWithOrders(
            user = user,
            orders = orders[index]
        )
    }
}

data class UserWithOrders(
    val user: User,
    val orders: List<Order>
)

Better solution: Denormalize data to avoid joins.
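Denormalizing here usually means copying the handful of user fields an order listing needs onto the order row at write time, so the read never leaves the order shard. A minimal sketch (the field choice is illustrative):

```kotlin
// Normalized: reading an order's owner requires a second,
// possibly cross-shard, lookup into the users table.
data class Order(val id: Long, val userId: Long, val amount: Long)

// Denormalized: the user fields needed at read time are copied
// onto the order row when it is written.
data class DenormalizedOrder(
    val id: Long,
    val userId: Long,
    val userName: String,   // copied from users at write time
    val userEmail: String,  // copied from users at write time
    val amount: Long
)

fun denormalize(order: Order, userName: String, userEmail: String) =
    DenormalizedOrder(order.id, order.userId, userName, userEmail, order.amount)

fun main() {
    val order = Order(id = 42, userId = 7, amount = 1999)
    val row = denormalize(order, userName = "Ada", userEmail = "ada@example.com")
    println(row.userName) // Ada, without touching the user shard
}
```

The trade-off: when a user renames themselves or changes email, the copies on their orders must be updated (or tolerated as stale).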

3. Cross-Shard Transactions

Challenge: ACID transactions across shards are very hard.

Solutions:

Two-Phase Commit (2PC):

async function transferMoney(fromUserId, toUserId, amount) {
  const fromShard = getShardForUser(fromUserId);
  const toShard = getShardForUser(toUserId);
  
  // Open a transaction on each shard
  const tx1 = await fromShard.begin();
  const tx2 = await toShard.begin();
  
  try {
    // Phase 1: do the work on both shards, but don't commit yet
    await tx1.execute('UPDATE accounts SET balance = balance - ? WHERE user_id = ?', [amount, fromUserId]);
    await tx2.execute('UPDATE accounts SET balance = balance + ? WHERE user_id = ?', [amount, toUserId]);
    
    // Phase 2: commit both
    await tx1.commit();
    await tx2.commit();
  } catch (error) {
    // Rollback both
    await tx1.rollback();
    await tx2.rollback();
    throw error;
  }
}

Note: this is a simplified sketch. True 2PC has the coordinator persist a durable "prepared" state on each participant (e.g. PostgreSQL's PREPARE TRANSACTION) before committing; the version above can still lose atomicity if the process crashes between the two commits.

Saga Pattern (Better for microservices):

async function transferMoney(fromUserId, toUserId, amount) {
  const transferId = generateId();
  
  try {
    // Step 1: Debit from account
    await debitAccount(fromUserId, amount, transferId);
    
    // Step 2: Credit to account
    await creditAccount(toUserId, amount, transferId);
  } catch (error) {
    // Compensating transaction
    await refundAccount(fromUserId, amount, transferId);
    throw error;
  }
}

Rebalancing Shards

When adding new shards, you need to move data.

Planned Rebalancing

async function rebalanceShard(oldShard, newShard, keyRange) {
  // 1. Start dual writes (write to both shards)
  enableDualWrites(oldShard, newShard, keyRange);
  
  // 2. Copy existing data
  const data = await oldShard.query('SELECT * FROM users WHERE id BETWEEN ? AND ?', keyRange);
  await newShard.bulkInsert('users', data);
  
  // 3. Verify data integrity
  const verified = await verifyData(oldShard, newShard, keyRange);
  if (!verified) throw new Error('Data verification failed');
  
  // 4. Switch reads to new shard
  updateShardMapping(keyRange, newShard);
  
  // 5. Stop dual writes, delete from old shard
  disableDualWrites(oldShard, keyRange);
  await oldShard.delete('DELETE FROM users WHERE id BETWEEN ? AND ?', keyRange);
}

Monitoring Sharded Databases

Essential metrics:

// Per-shard metrics
{
  shard: 'shard-1',
  metrics: {
    storage_used: '450GB',
    query_latency_p95: '45ms',
    queries_per_second: 5000,
    connection_pool: { active: 80, idle: 20 },
    hottest_keys: ['user-12345', 'user-67890']
  }
}

Watch for:

  • Hotspots (one shard much busier)
  • Storage imbalance
  • Slow queries
  • Connection pool saturation
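A simple hotspot check compares each shard's query load against the cluster mean. A minimal sketch (the 1.5x threshold is an arbitrary example; tune it for your workload):

```kotlin
// Flag shards whose query rate exceeds the cluster mean by `factor`
fun findHotShards(qpsPerShard: Map<String, Double>, factor: Double = 1.5): List<String> {
    if (qpsPerShard.isEmpty()) return emptyList()
    val mean = qpsPerShard.values.average()
    return qpsPerShard.filterValues { it > mean * factor }.keys.sorted()
}

fun main() {
    val qps = mapOf(
        "shard-1" to 5_000.0,
        "shard-2" to 4_800.0,
        "shard-3" to 15_000.0
    )
    println(findHotShards(qps)) // [shard-3]
}
```

The same check works for storage used or connection-pool saturation; run it on a schedule and alert before a hot shard falls over.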

Real-World Example: Instagram

Instagram shards by user ID:

  1. User joins → assigned to least-loaded shard
  2. All user data (posts, follows) goes to same shard
  3. Shard lookup cached in Redis
  4. Cross-shard queries use fan-out approach

Best Practices

  1. Start with vertical partitioning before sharding
  2. Choose shard key carefully - hard to change later
  3. Avoid cross-shard operations when possible
  4. Denormalize data to keep related data together
  5. Monitor shard balance proactively
  6. Plan for rebalancing from day one
  7. Use consistent hashing for dynamic environments
  8. Cache shard mappings in application
  9. Implement circuit breakers for shard failures
  10. Test failover scenarios regularly
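Item 9 can be sketched as a per-shard circuit breaker that stops routing queries to a shard after repeated failures, then lets a probe through once a cooldown passes. The thresholds and the injectable clock are illustrative:

```kotlin
// Minimal per-shard circuit breaker: opens after `failureThreshold`
// consecutive failures, allows a retry probe after `openMillis`.
class ShardCircuitBreaker(
    private val failureThreshold: Int = 5,
    private val openMillis: Long = 30_000,
    private val clock: () -> Long = System::currentTimeMillis
) {
    private var failures = 0
    private var openedAt: Long? = null

    fun allowRequest(): Boolean {
        val opened = openedAt ?: return true // closed: all requests pass
        // Half-open: let a request through once the cooldown has elapsed
        return clock() - opened >= openMillis
    }

    fun recordSuccess() {
        failures = 0
        openedAt = null
    }

    fun recordFailure() {
        failures++
        if (failures >= failureThreshold) openedAt = clock()
    }
}

fun main() {
    var now = 0L
    val breaker = ShardCircuitBreaker(failureThreshold = 3, openMillis = 1_000, clock = { now })
    repeat(3) { breaker.recordFailure() }
    println(breaker.allowRequest()) // false: circuit is open
    now += 1_000
    println(breaker.allowRequest()) // true: cooldown elapsed, half-open probe allowed
}
```

When the breaker is open, the caller can fail fast or serve degraded results instead of piling up timeouts against a dead shard.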

When NOT to Shard

  • You have < 1TB of data
  • Single server handles your load
  • You can scale vertically (bigger server)
  • Your team lacks distributed systems expertise

Remember: Sharding adds significant complexity. Exhaust all other options first!

Conclusion

Sharding is a powerful technique for scaling databases beyond single-server limits. Key takeaways:

  • Choose the right sharding strategy for your use case
  • Select a good shard key (high cardinality, immutable)
  • Minimize cross-shard operations
  • Plan for rebalancing
  • Monitor shard health continuously

Sharding is the last resort, not the first choice. But when you need it, implementing it correctly makes all the difference.

Happy sharding! 🔪