tutorial

27 Menambahkan Interceptor pada Streaming RPC

Streaming RPC (Remote Procedure Call) di gRPC membawa paradigma baru dalam komunikasi layanan modern. Tidak hanya efisien dan scalable—streaming RPC juga membuka jalan untuk mengirimkan data secara real-time dan dua arah, menjadikannya vital dalam banyak kasus pada microservices architecture.

Namun, dalam skenario produksi, kita tidak cukup hanya dengan mentransmisikan data. Seringkali, ada kebutuhan untuk menambahkan lapisan tambahan, seperti logging, authentication, rate limiting, observability, hingga metering. Di sinilah konsep interceptor berperan.

Pada artikel kali ini, saya akan membahas bagaimana cara menambahkan interceptor pada Streaming RPC di gRPC dengan Go, lengkap dengan contoh kode, simulasi skenario, serta diagram alur. Mari kita mulai dengan pemahaman dasar interceptors dulu.

Apa itu Interceptor?

Interceptor di gRPC mirip dengan middleware pada HTTP server. Mereka memberikan cara terstruktur untuk melakukan pre-processing atau post-processing pada request maupun response RPC—mulai dari validasi, audit, sampai meng-handle error secara konsisten.

Secara garis besar, ada dua tipe interceptors di gRPC:

  • Unary Interceptor: untuk unary RPC (single request & single response).
  • Stream Interceptor: untuk streaming RPC (client, server, dan bi-directional streaming).

Pada artikel ini, fokus kita adalah stream interceptor.

Ilustrasi Alur Interceptor

Untuk memberikan gambaran, berikut diagram alur pemrosesan request dengan stream interceptor:

sequenceDiagram
    participant Client
    participant Interceptor
    participant ServerHandler
    Client->>Interceptor: Kirim Stream Request
    Interceptor->>ServerHandler: Pre process
    ServerHandler->>Interceptor: Handler Response
    Interceptor->>Client: Post process lalu Kirim Response

Dengan skema ini, interceptor dapat menjalankan logika tambahan sebelum atau setelah handler utama dijalankan.


Studi Kasus: Logging Interceptor pada Streaming RPC

Use Case

Katakanlah kamu memiliki gRPC API berupa server streaming. Setiap kali klien melakukan streaming request, kita ingin mencatat semua data yang masuk dan keluar—tanpa merubah logic asli service.

Contoh service: streaming portfolio harga saham.

Proto Definition:

service PortfolioService {
    rpc StreamPrices (PriceRequest) returns (stream PriceUpdate) {};
}

Implementasi Interceptor di Golang

Mari kita breakdown cara implementasinya:

1. Struktur Interceptor

gRPC Go menyediakan ServerStream Interceptor dengan signature berikut:

type StreamServerInterceptor func(srv interface{},
    ss grpc.ServerStream,
    info *grpc.StreamServerInfo,
    handler grpc.StreamHandler) error
  • srv: implementasi service gRPC.
  • ss: interface stream.
  • info: metadata method.
  • handler: handler utama RPC,yang biasanya kita panggil di dalam interceptor.

2. Implementasi Logging Stream Interceptor

Berikut contoh interceptor sederhana yang akan mencatat kapan streaming dimulai, diakhiri, dan error yang muncul:

func LoggingStreamInterceptor(
    srv interface{},
    ss grpc.ServerStream,
    info *grpc.StreamServerInfo,
    handler grpc.StreamHandler) error {

    log.Printf("==== Start streaming RPC: %s, IsClientStream=%v, IsServerStream=%v ====",
        info.FullMethod, info.IsClientStream, info.IsServerStream)

    err := handler(srv, ss) // jalankan main handler

    if err != nil {
        log.Printf(">>> Stream method %s error: %v", info.FullMethod, err)
    } else {
        log.Printf("---- Streaming %s done ----", info.FullMethod)
    }
    return err
}

3. Registrasi Interceptor pada Server

Saat membuat gRPC server, kita pasangkan stream interceptor melalui options:

server := grpc.NewServer(
    grpc.StreamInterceptor(LoggingStreamInterceptor),
)

Untuk multiple interceptor, gunakan chaining dari third-party package seperti grpc-middleware.

4. Menyisipkan Custom ServerStream

Jika ingin logging payload atau memonitor semua komunikasi streaming (data read/write), kita perlu membungkus grpc.ServerStream agar dapat mencegat setiap pesan yang dikirim atau diterima.

type loggingServerStream struct {
    grpc.ServerStream
}

func (l *loggingServerStream) SendMsg(m interface{}) error {
    log.Printf("[SendMsg] %v", m)
    return l.ServerStream.SendMsg(m)
}

func (l *loggingServerStream) RecvMsg(m interface{}) error {
    err := l.ServerStream.RecvMsg(m)
    log.Printf("[RecvMsg] %v", m)
    return err
}

Kemudian pada interceptor, stream default dibungkus dengan versi logging:

func LoggingStreamInterceptor(
    srv interface{},
    ss grpc.ServerStream,
    info *grpc.StreamServerInfo,
    handler grpc.StreamHandler) error {

    wrapped := &loggingServerStream{ss}

    return handler(srv, wrapped)
}

Dengan pattern ini, setiap pesan streaming keluar/masuk akan dilog.


Simulasi Penggunaan

a. Client Mengirim Request

Client mengirim request untuk stream harga saham:

// Membuka streaming
stream, _ := client.StreamPrices(ctx, &PriceRequest{Symbol: "BBCA"})

for {
    update, err := stream.Recv()
    if err == io.EOF {
        break
    }
    log.Printf("Price update: %v", update)
}

b. Output Logging Server

Log server setelah interceptor aktif:

==== Start streaming RPC: /PortfolioService/StreamPrices, IsClientStream=false, IsServerStream=true ====
[SendMsg] &PriceUpdate{...}
[SendMsg] &PriceUpdate{...}
---- Streaming /PortfolioService/StreamPrices done ----

Perbandingan Unary vs Stream Interceptor

Unary InterceptorStream Interceptor
ScopesSingle request-responseLong-lived streams
Use CasesAuth, Logging, RetryObservability, Msg-level
Integrationgrpc.UnaryInterceptorgrpc.StreamInterceptor
Message LevelTidak bisa nested msgBisa wrap ServerStream

Kesimpulan

Menambahkan interceptor pada Streaming RPC di gRPC memberikan kekuatan ekstra bagi software engineer untuk mengelola cross-cutting concern tanpa mengotori kode logic utama. Mulai dari logging, authentication, hingga observability bisa disisipkan via interceptor, baik secara global maupun per-method.

Langkah praktis di atas dapat dikembangkan lebih lanjut: kamu bisa menambah rate limiting, audit, hingga pengamanan berbasis metadata. Tidak heran, interceptor telah menjadi fondasi krusial pada arsitektur gRPC modern.

Kalau kamu punya eksperimen lain dengan interceptor pada streaming RPC, silakan share di kolom komentar! Selamat bereksperimen dan happy coding! 🚀

comments powered by Disqus