tutorial

33 Menulis Stream Interceptor Sendiri

Jika Anda pernah membangun aplikasi berbasis gRPC, pasti sudah akrab dengan konsep interceptor. Di ekosistem gRPC, interceptor menempati peran layaknya middleware di framework web seperti Express (Node.js) atau Gin (Go). Namun, ada perbedaan fundamental: selain interceptor untuk unary RPC, gRPC punya mekanisme khusus untuk menangani streaming RPC, yaitu Stream Interceptor. Kali ini, kita akan membedah bagaimana menulis stream interceptor sendiri—mulai dari konsep, perancangan, hingga implementasi dan simulasi penggunaannya.

Kenapa Stream Interceptor Penting?

gRPC mendukung beberapa jenis RPC:

  • Unary: Satu request, satu response.
  • Server streaming: Satu request, banyak response.
  • Client streaming: Banyak request, satu response.
  • Bidirectional streaming: Banyak request, banyak response secara simultan.

Saat kita menghadapi streaming—baik dari sisi client, server, ataupun keduanya—middleware standar tidak cukup. Di sinilah stream interceptor menjadi penting karena:

  • Dapat melakukan validasi atau manipulasi pada every message (bukan hanya di initial request/response).
  • Bisa logging, monitoring, pembatasan rate per message, manipulasi konteks, hingga error handling pada aliran data.
  • Membuka pintu ke berbagai fitur seperti metrik per stream, tracing, dsb.

Anatomy: Bagaimana Stream Interceptor Bekerja di gRPC Go?

Mari kita lihat signature–nya (versi Go):

type StreamServerInterceptor func(
    srv interface{},
    ss grpc.ServerStream,
    info *grpc.StreamServerInfo,
    handler grpc.StreamHandler,
) error

Argumen penting di sini:

  • srv: implementasi service.
  • ss: objek stream (untuk kirim/terima message).
  • info: informasi stream (nama RPC, metode, dsb).
  • handler: fungsi “next”, mirip middleware handler di web.

Gambaran alur interceptor:

flowchart LR
    Client--Stream Msgs--> Server[gRPC Server]
    Server-->|Incoming Context| Interceptor
    Interceptor-->|Manipulate or Pass| Handler
    Handler-->|Responses| Interceptor
    Interceptor-->|Manipulate or Pass| Client

Studi Kasus: Membuat Logging Stream Interceptor

Mari implementasikan logging di setiap pesan pada stream server-side. Kita ingin tahu kapan client mengirim/menerima pesan, serta waktu proses setiap pesan.

1. Membungkus ServerStream

Pada gRPC Go, grpc.ServerStream adalah interface untuk stream. Untuk intercept pesan, kita perlu membungkus metode RecvMsg dan SendMsg.

type wrappedStream struct {
    grpc.ServerStream
}

func (w *wrappedStream) RecvMsg(m interface{}) error {
    err := w.ServerStream.RecvMsg(m)
    log.Printf("[Interceptor] Received message: %#v, Error: %v", m, err)
    return err
}

func (w *wrappedStream) SendMsg(m interface{}) error {
    log.Printf("[Interceptor] Sending message: %#v", m)
    return w.ServerStream.SendMsg(m)
}

2. Menulis Interceptor

Lalu, kita siapkan interceptor yang menggunakan wrappedStream:

func LoggingStreamInterceptor(
    srv interface{},
    ss grpc.ServerStream,
    info *grpc.StreamServerInfo,
    handler grpc.StreamHandler,
) error {
    log.Printf("[Interceptor] Stream started: %s", info.FullMethod)
    err := handler(srv, &wrappedStream{ss})
    if err != nil {
        log.Printf("[Interceptor] Stream error: %v", err)
    } else {
        log.Printf("[Interceptor] Stream completed successfully")
    }
    return err
}

3. Register Interceptor di Server

Ketika membangun server:

grpcServer := grpc.NewServer(
    grpc.StreamInterceptor(LoggingStreamInterceptor),
)

4. Simulasi: Service Sederhana

Misal, service streaming upload:

service FileService {
    rpc Upload(stream FileChunk) returns (UploadStatus);
}
message FileChunk {
    bytes data = 1;
}
message UploadStatus {
    string message = 1;
}

Handler:

func (s *fileServiceServer) Upload(stream pb.FileService_UploadServer) error {
    var total int
    for {
        chunk, err := stream.Recv()
        if err == io.EOF {
            return stream.SendAndClose(&pb.UploadStatus{Message: "Received"})
        }
        if err != nil {
            return err
        }
        total += len(chunk.Data)
        // Processing chunk...
    }
}

Ketika client streaming file, setiap chunk akan “di-log” oleh interceptor.

5. Output Simulasi

ActionMessageData
Stream started/FileService/Upload-
Received messageFileChunk{data:…}len: 1024 bytes
Received messageFileChunk{data:…}len: 483 bytes
Sending messageUploadStatus{…}message: “Received”
Stream completed--

Log real-time akan terlihat di konsol server.

Memodifikasi Interceptor: Menambah Fitur Rate Limiting

Ayo kembangkan interceptor kita: implementasi rate limiter berbasis pesan per detik.

1. Rate Limiter Sederhana

type rateLimitStream struct {
    grpc.ServerStream
    tokens chan struct{}
}

func NewRateLimitStream(ss grpc.ServerStream, rate int) *rateLimitStream {
    rl := &rateLimitStream{
        ServerStream: ss,
        tokens:       make(chan struct{}, rate),
    }
    // refill tokens setiap detik
    go func() {
        ticker := time.NewTicker(time.Second)
        defer ticker.Stop()
        for {
            <-ticker.C
            for i := 0; i < rate; i++ {
                select {
                case rl.tokens <- struct{}{}:
                default:
                }
            }
        }
    }()
    return rl
}

func (w *rateLimitStream) RecvMsg(m interface{}) error {
    <-w.tokens // akan blocking jika quota habis
    return w.ServerStream.RecvMsg(m)
}

2. Integrasi ke Interceptor

func RateLimitStreamInterceptor(rate int) grpc.StreamServerInterceptor {
    return func(
        srv interface{},
        ss grpc.ServerStream,
        info *grpc.StreamServerInfo,
        handler grpc.StreamHandler,
    ) error {
        limited := NewRateLimitStream(ss, rate)
        return handler(srv, limited)
    }
}

3. Penggunaan di Server

grpcServer := grpc.NewServer(
    grpc.StreamInterceptor(RateLimitStreamInterceptor(5)), // max 5 pesan/detik
)

Ringkasan: Stream Interceptor adalah “Middleware on Steroids”

Membuat stream interceptor sendiri di gRPC sangat powerful untuk kebutuhan observabilitas, security, atau traffic shaping. Anda bisa membangun fitur logging granular, tracing, rate limiting, custom authentication, quota enforcement, dan berbagai kasus lain—semua dengan single point of change tanpa mengutak-atik handler service.

👇 Tips produksi:

  • Jangan lupa handle context cancellation/error.
  • Jika ingin multi-interceptor, gunakan chaining atau integrasi dengan third party (misal, go-grpc-middleware).
  • Logging hati-hati, jangan log data sensitif.
  • Ukur performa, overhead interceptor bisa memengaruhi throughput stream.

Kapan Anda akan menulis stream interceptor sendiri? Jika ingin kontrol penuh atas semua data yang streaming masuk/keluar server, inilah caranya. Selamat bereksperimen!


Referensi:


Jika artikel ini bermanfaat, silakan bookmark dan share ke rekan developer Anda. Sampai jumpa di eksperimen berikutnya! 🚀

comments powered by Disqus