Jika Anda pernah membangun aplikasi berbasis gRPC, pasti sudah akrab dengan konsep interceptor. Di ekosistem gRPC, interceptor menempati peran layaknya middleware di framework web seperti Express (Node.js) atau Gin (Go). Namun, ada perbedaan fundamental: selain interceptor untuk unary RPC, gRPC punya mekanisme khusus untuk menangani streaming RPC, yaitu Stream Interceptor. Kali ini, kita akan membedah bagaimana menulis stream interceptor sendiri—mulai dari konsep, perancangan, hingga implementasi dan simulasi penggunaannya.
Kenapa Stream Interceptor Penting?
gRPC mendukung beberapa jenis RPC:
- Unary: Satu request, satu response.
- Server streaming: Satu request, banyak response.
- Client streaming: Banyak request, satu response.
- Bidirectional streaming: Banyak request, banyak response secara simultan.
Saat kita menghadapi streaming—baik dari sisi client, server, ataupun keduanya—middleware standar tidak cukup. Di sinilah stream interceptor menjadi penting karena:
- Dapat melakukan validasi atau manipulasi pada every message (bukan hanya di initial request/response).
- Bisa logging, monitoring, pembatasan rate per message, manipulasi konteks, hingga error handling pada aliran data.
- Membuka pintu ke berbagai fitur seperti metrik per stream, tracing, dsb.
Anatomy: Bagaimana Stream Interceptor Bekerja di gRPC Go?
Mari kita lihat signature–nya (versi Go):
type StreamServerInterceptor func(
srv interface{},
ss grpc.ServerStream,
info *grpc.StreamServerInfo,
handler grpc.StreamHandler,
) error
Argumen penting di sini:
srv
: implementasi service.ss
: objek stream (untuk kirim/terima message).info
: informasi stream (nama RPC, metode, dsb).handler
: fungsi “next”, mirip middleware handler di web.
Gambaran alur interceptor:
flowchart LR Client--Stream Msgs--> Server[gRPC Server] Server-->|Incoming Context| Interceptor Interceptor-->|Manipulate or Pass| Handler Handler-->|Responses| Interceptor Interceptor-->|Manipulate or Pass| Client
Studi Kasus: Membuat Logging Stream Interceptor
Mari implementasikan logging di setiap pesan pada stream server-side. Kita ingin tahu kapan client mengirim/menerima pesan, serta waktu proses setiap pesan.
1. Membungkus ServerStream
Pada gRPC Go, grpc.ServerStream
adalah interface untuk stream. Untuk intercept pesan, kita perlu membungkus metode RecvMsg
dan SendMsg
.
type wrappedStream struct {
grpc.ServerStream
}
func (w *wrappedStream) RecvMsg(m interface{}) error {
err := w.ServerStream.RecvMsg(m)
log.Printf("[Interceptor] Received message: %#v, Error: %v", m, err)
return err
}
func (w *wrappedStream) SendMsg(m interface{}) error {
log.Printf("[Interceptor] Sending message: %#v", m)
return w.ServerStream.SendMsg(m)
}
2. Menulis Interceptor
Lalu, kita siapkan interceptor yang menggunakan wrappedStream
:
func LoggingStreamInterceptor(
srv interface{},
ss grpc.ServerStream,
info *grpc.StreamServerInfo,
handler grpc.StreamHandler,
) error {
log.Printf("[Interceptor] Stream started: %s", info.FullMethod)
err := handler(srv, &wrappedStream{ss})
if err != nil {
log.Printf("[Interceptor] Stream error: %v", err)
} else {
log.Printf("[Interceptor] Stream completed successfully")
}
return err
}
3. Register Interceptor di Server
Ketika membangun server:
grpcServer := grpc.NewServer(
grpc.StreamInterceptor(LoggingStreamInterceptor),
)
4. Simulasi: Service Sederhana
Misal, service streaming upload:
service FileService {
rpc Upload(stream FileChunk) returns (UploadStatus);
}
message FileChunk {
bytes data = 1;
}
message UploadStatus {
string message = 1;
}
Handler:
func (s *fileServiceServer) Upload(stream pb.FileService_UploadServer) error {
var total int
for {
chunk, err := stream.Recv()
if err == io.EOF {
return stream.SendAndClose(&pb.UploadStatus{Message: "Received"})
}
if err != nil {
return err
}
total += len(chunk.Data)
// Processing chunk...
}
}
Ketika client streaming file, setiap chunk akan “di-log” oleh interceptor.
5. Output Simulasi
Action | Message | Data |
---|---|---|
Stream started | /FileService/Upload | - |
Received message | FileChunk{data:…} | len: 1024 bytes |
Received message | FileChunk{data:…} | len: 483 bytes |
Sending message | UploadStatus{…} | message: “Received” |
Stream completed | - | - |
Log real-time akan terlihat di konsol server.
Memodifikasi Interceptor: Menambah Fitur Rate Limiting
Ayo kembangkan interceptor kita: implementasi rate limiter berbasis pesan per detik.
1. Rate Limiter Sederhana
type rateLimitStream struct {
grpc.ServerStream
tokens chan struct{}
}
func NewRateLimitStream(ss grpc.ServerStream, rate int) *rateLimitStream {
rl := &rateLimitStream{
ServerStream: ss,
tokens: make(chan struct{}, rate),
}
// refill tokens setiap detik
go func() {
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
for {
<-ticker.C
for i := 0; i < rate; i++ {
select {
case rl.tokens <- struct{}{}:
default:
}
}
}
}()
return rl
}
func (w *rateLimitStream) RecvMsg(m interface{}) error {
<-w.tokens // akan blocking jika quota habis
return w.ServerStream.RecvMsg(m)
}
2. Integrasi ke Interceptor
func RateLimitStreamInterceptor(rate int) grpc.StreamServerInterceptor {
return func(
srv interface{},
ss grpc.ServerStream,
info *grpc.StreamServerInfo,
handler grpc.StreamHandler,
) error {
limited := NewRateLimitStream(ss, rate)
return handler(srv, limited)
}
}
3. Penggunaan di Server
grpcServer := grpc.NewServer(
grpc.StreamInterceptor(RateLimitStreamInterceptor(5)), // max 5 pesan/detik
)
Ringkasan: Stream Interceptor adalah “Middleware on Steroids”
Membuat stream interceptor sendiri di gRPC sangat powerful untuk kebutuhan observabilitas, security, atau traffic shaping. Anda bisa membangun fitur logging granular, tracing, rate limiting, custom authentication, quota enforcement, dan berbagai kasus lain—semua dengan single point of change tanpa mengutak-atik handler service.
👇 Tips produksi:
- Jangan lupa handle context cancellation/error.
- Jika ingin multi-interceptor, gunakan chaining atau integrasi dengan third party (misal, go-grpc-middleware).
- Logging hati-hati, jangan log data sensitif.
- Ukur performa, overhead interceptor bisa memengaruhi throughput stream.
Kapan Anda akan menulis stream interceptor sendiri? Jika ingin kontrol penuh atas semua data yang streaming masuk/keluar server, inilah caranya. Selamat bereksperimen!
Referensi:
Jika artikel ini bermanfaat, silakan bookmark dan share ke rekan developer Anda. Sampai jumpa di eksperimen berikutnya! 🚀
10 Menyiapkan Playground GraphQL untuk Testing Query
Artikel Terhangat
33 Menulis Stream Interceptor Sendiri
07 Jul 2025
32 Menulis Unary Interceptor Sendiri
07 Jul 2025
31 Apa Itu Interceptor dalam gRPC?
07 Jul 2025
30 Studi Kasus: Streaming Progress Update
07 Jul 2025

33 Menulis Stream Interceptor Sendiri

32 Menulis Unary Interceptor Sendiri

31 Apa Itu Interceptor dalam gRPC?
