This tutorial was adapted from the grpc-go tutorial. It provides a basic introduction to working with gRPC and OCaml. By working through this example you will learn how to:
- Define a service in a
.proto
file - Generate server and client code
- Write a simple client and server
It assumes you are familiar with protocol buffers and basic OCaml. Note that the example in this tutorial uses the proto3 version of the protocol buffers language, you can find out more in the proto3 language guide.
Our example is a simple route mapping application that lets clients get information about features on their route, create a summary of their route, and exchange route information such as traffic updates with the server and other clients.
With gRPC we can define our service once in a .proto
file, and implement clients and servers in any of gRPC's supported languages, which in turn can be run in environments ranging from servers inside Google to your own tablet - all the complexity of communication between different languages and environments is handled for you by gRPC. We also get all the advantages of working with protocol buffers, including efficient serialization, a simple IDL, and easy interface updating.
To run the sample code and walk through this tutorial, the only prerequisite is OCaml itself. To get OCaml insalled follow the Up and Running guide on ocaml.org.
Clone the OCaml gRPC repository:
$ git clone https://github.com/dialohq/ocaml-grpc
$ cd ocaml-grpc
Run the server
$ dune exec -- routeguide-server ./examples/routeguide/data/route_guide_db.json
In a separate shell, run the client
$ dune exec -- routeguide-client
You should see some logging output in both terminal windows. On the shell where you ran the client binary, you should see the output of the bi-directional streaming rpc, printing 1 line per second.
NOTE = RouteNote { location: Some(Point { latitude: 409146139, longitude: -746188906 }), message: "at 1.000319208s" }
If you scroll up you should see the output of the other three request types; simple rpc, server-side streaming, and client-side streaming.
For this tutorial, we will start by creating a new OCaml project with Dune:
$ dune init project routeguide
$ cd routeguide
$ opam switch create . 5.0.0 --deps-only --with-test -y
ocaml-grpc
works on OCaml 4.11
and above, the latest 5.0.0
version will give the best results. This tutorial uses EIO, a concurrent programming library for OCaml, other options exist like Async or LWT, example code for both exists in examples.
Our first step is to define the gRPC service and the method request and response types using protocol buffers. We will keep our .proto
files in a directory in our project's root. There is no requirement where the .proto
definitions live, Dune will build them regardless.
$ mkdir proto && touch proto/route_guide.proto
You can see the complete .proto
file in examples/routeguide/proto/route_guide.proto.
To define a service, first we define a *service` in your .proto file:
service RouteGuide {
...
}
Then you define rpc
methods inside this service definition, specifying their request and response types. gRPC lets you define four kinds of service methods, all of which will be used in the RouteGuide
service:
- A simple RPC where the client sends a request to the server and waits for a response to come back. Just like a normal function call.
// Obtains the feature at a given position.
rpc GetFeature(Point) returns (Feature) {}
- A server-side streaming RPC where the client sends a request to the server and gets a stream to read back a seqeuence of messages. The client reads from the returned stream until there are no more messages. In the example, it specifies a server-side streaming method by placing the
stream
keyword before the response type.
// Obtains the Features available within the given Rectangle. Results are
// streamed rather than returned at once (e.g. in a response message with a
// repeated field), as the rectangle may cover a large area and contain a
// huge number of features.
rpc ListFeatures(Rectangle) returns (stream Feature) {}
- A client-side streaming RPC where the client writes a sequence of messages and sends them to the server. Once the client has finished writing the messages, it waits for the server to read them all and return its response. To use client-side streaming, the
stream
keword is placed before the request type.
// Accepts a stream of Points on a route being traversed, returning a
// RouteSummary when traversal is completed.
rpc RecordRoute(stream Point) returns (RouteSummary) {}
- A bi-directional streaming RPC where both sides send a sequence of messages. The two streams operate independently, so clients and servers can read and write in whatever order they like. For example, the server could wait to receive all the client messages before writing its responses, or it could alternately read a message then write a message, or some other combination of reads and writes. The order of messages in each steam is preserved. You specify this type of method by placing the
stream
keyword before both the request and response types.
// Accepts a stream of RouteNotes sent while a route is being traversed,
// while receiving other RouteNotes (e.g. from other users).
rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}
The .proto
file also contains protocol buffer message type defintions for the request and response types used in the service methods. For example here is the Point
message type:
// Points are represented as latitude-longitude pairs in the E7 representation
// (degrees multiplied by 10**7 and rounded to the nearest integer).
// Latitudes should be in the range +/- 90 degrees and longitude should be in
// the range +/- 180 degrees (inclusive).
message Point {
int32 latitude = 1;
int32 longitude = 2;
}
OCaml gRPC can be configured to generate code as part of dune's normal build process. This is very convenient because once we've set everything up, there is no extra step to keep the generated code and our .proto
definitions in sync.
OCaml gRPC provides a pluggable approach to protocol buffer serialisation and code generation. For this tutorial we use ocaml-protoc-plugin
which has good coverage of the protobuf spec. Consult the comparison with other OCaml protobuf implementations for more details.
Edit the dune-project
to add ocaml-protoc-plugin
as a dependency. Then add a dune
file into the proto
directory alongside the route_guide.proto
.
(library
(name routeguide)
(package grpc-examples)
(preprocess
(pps ppx_deriving.show ppx_deriving.eq))
(libraries ocaml-protoc-plugin))
(rule
(targets route_guide.ml)
(deps
(:proto route_guide.proto))
(action
(run
protoc
-I
.
"--ocaml_out=annot=[@@deriving show { with_path = false }, eq]:."
%{proto})))
$ dune build proto
That's it. The generated code contains
- Modules for the message types
Point
,Rectangle
,Feature
,RouteNote
andRouteSummary
. - A module for the
RouteGuide
service with the RPC operations defined as modules.
Don't worry about the generated API, we will cover it in detail as we implement RouteGuide
service. If you are curious as to where the generated files are, keep reading. The mystery will be revealed soon! We can now move on to the fun part.
First let's look at how we create a RouteGuide
server. If you are only interested in creating gRPC clients, you can skip this section and go straight to Creating the client (though you might find it interesting anyway!).
There are two parts to making out RouteGuide
service do its job:
- Implementing the service interface generated from our service definition, doing the actual work of our service.
- Running a gRPC server to listen for requests from clients and dispatch them to the right service implementation.
You can find our example RouteGuide
server in examples/routeguide/src/server.ml. Let's take a closer look at how it works.
As you can see, our server uses the Service
module from Grpc_eio
to build up a service implementation.
The individual service functions from our proto definition are implemented using add_rpc
with matching names and rpc types, which must match the route_guide.proto
definitions.
let route_guide_service clock =
Server.Service.(
v ()
|> add_rpc ~name:"GetFeature" ~rpc:(Unary get_feature)
|> add_rpc ~name:"ListFeatures" ~rpc:(Server_streaming list_features)
|> add_rpc ~name:"RecordRoute" ~rpc:(Client_streaming (record_route clock))
|> add_rpc ~name:"RouteChat" ~rpc:(Bidirectional_streaming route_chat)
|> handle_request)
let server clock =
Server.(
v ()
|> add_service ~name:"routeguide.RouteGuide"
~service:(route_guide_service clock))
Let's look at the simplest type first, GetFeature
which just gets a Point
from the client and returns the corresponding feature information from its database in a Feature
.
let get_feature (buffer : string) =
let decode, encode = Service.make_service_functions RouteGuide.getFeature in
(* Decode the request. *)
let point =
Reader.create buffer |> decode |> function
| Ok v -> v
| Error e ->
failwith
(Printf.sprintf "Could not decode request: %s" (Result.show_error e))
in
Eio.traceln "GetFeature = {:%s}" (Point.show point);
(* Lookup the feature and if found return it. *)
let feature =
List.find_opt
(fun (f : Feature.t) ->
match (f.location, point) with
| Some p1, p2 -> Point.equal p1 p2
| _, _ -> false)
!features
in
Eio.traceln "Found feature %s"
(feature |> Option.map Feature.show |> Option.value ~default:"Missing");
match feature with
| Some feature ->
(Grpc.Status.(v OK), Some (feature |> encode |> Writer.contents))
| None ->
(* No feature was found, return an unnamed feature. *)
( Grpc.Status.(v OK),
Some (Feature.make ~location:point () |> encode |> Writer.contents) )
The method is passed the client's Point
protocol buffer request. It decodes the request into a Point.t
and uses that to look up the feature. It returns a Feature
protocol buffer object with the response information indicating the successful response, based on the feature found or an unnamed default feature.
Now let's look at one of our streaming RPCs. list_features
is a server-side streaming RPC, so we need to send back multiple Feature
s to our client.
let list_features (buffer : string) (f : string -> unit) =
(* Decode request. *)
let decode, encode = Service.make_service_functions RouteGuide.listFeatures in
let rectangle =
Reader.create buffer |> decode |> function
| Ok v -> v
| Error e ->
failwith
(Printf.sprintf "Could not decode request: %s" (Result.show_error e))
in
(* Lookup and reply with features found. *)
let () =
List.iter
(fun (feature : Feature.t) ->
if in_range (Option.get feature.location) rectangle then
encode feature |> Writer.contents |> f
else ())
!features
in
Grpc.Status.(v OK)
Like get_feature
list_feature
's input is a single message. A Rectangle
that is decoded from a string buffer. The f: (string -> unit)
function is for writing the encoded responses back to the client. In the function we decode the request, lookup any matching features and stream them back to the client as we find them using f
. Once we've looked at all the features
we respond with an OK
indicating the streaming has finished successfully.
Now let's look at something a little more complicated: the client-side streaming function RecordRoute
, where we get a stream of Point
s from the client and return a single RouteSummary
with information about their trip. As you can see, this time the method gets a string Seq.t
representing the stream of points from the client. It decodes the stream of points, performs some calculations while accumulating the result, and finally responds with a route summary.
let record_route (clock : _ Eio.Time.clock) (stream : string Seq.t) =
Eio.traceln "RecordRoute";
let last_point = ref None in
let start = Eio.Time.now clock in
let decode, encode = Service.make_service_functions RouteGuide.recordRoute in
let point_count, feature_count, distance =
Seq.fold_left
(fun (point_count, feature_count, distance) i ->
let point =
Reader.create i |> decode |> function
| Ok v -> v
| Error e ->
failwith
(Printf.sprintf "Could not decode request: %s"
(Result.show_error e))
in
Eio.traceln " ==> Point = {%s}" (Point.show point);
(* Increment the point count *)
let point_count = point_count + 1 in
(* Find features *)
let feature_count =
List.find_all
(fun (feature : Feature.t) ->
Point.equal (Option.get feature.location) point)
!features
|> fun x -> List.length x + feature_count
in
(* Calculate the distance *)
let distance =
match !last_point with
| Some last_point -> calc_distance last_point point
| None -> distance
in
last_point := Some point;
(point_count, feature_count, distance))
(0, 0, 0) stream
in
let stop = Eio.Time.now clock in
let elapsed_time = int_of_float (stop -. start) in
let summary =
RouteSummary.make ~point_count ~feature_count ~distance ~elapsed_time ()
in
Eio.traceln "RecordRoute exit\n";
(Grpc.Status.(v OK), Some (encode summary |> Writer.contents))
Finally, let's look at our bidirectional streaming RPC route_chat
, which receives a stream of RouteNote
s and returns a stream of RouteNote
s.
let route_chat (stream : string Seq.t) (f : string -> unit) =
Printf.printf "RouteChat\n";
let decode, encode = Service.make_service_functions RouteGuide.routeChat in
Seq.iter
(fun i ->
let note =
Reader.create i |> decode |> function
| Ok v -> v
| Error e ->
failwith
(Printf.sprintf "Could not decode request: %s"
(Result.show_error e))
in
Printf.printf " ==> Note = {%s}\n" (RouteNote.show note);
encode note |> Writer.contents |> f)
stream;
Printf.printf "RouteChat exit\n";
Grpc.Status.(v OK)
route_chat
receives a string Seq.t
of requests which it decodes, logs to stdout to show it has received the note, and then encodes again to send back to the client. Finally it responds with an OK
indicating it has finished. The logic is we receive one RouteNote
and respond directly with the same RouteNote
using the f
function supplied.
Once we've implemented all our functions, we also need to startup a gRPC server so that clients can actually use our service. This is how our it looks:
let serve server env =
let port = 8080 in
let net = Eio.Stdenv.net env in
let clock = Eio.Stdenv.clock env in
let addr = `Tcp (Eio.Net.Ipaddr.V4.loopback, port) in
Eio.Switch.run @@ fun sw ->
let handler = connection_handler ~sw (server clock) in
let server_socket =
Eio.Net.listen net ~sw ~reuse_addr:true ~backlog:10 addr
in
let rec listen () =
Eio.Net.accept_fork ~sw server_socket
~on_error:(fun exn -> Eio.traceln "%s" (Printexc.to_string exn))
handler;
listen ()
in
Eio.traceln "Listening on port %i for grpc requests\n" port;
listen ()
let () =
let path =
if Array.length Sys.argv > 1 then Sys.argv.(1)
else failwith "Path to datafile required."
in
(* Load features. *)
features := load path;
Eio_main.run (serve server)
To handle requests we use h2-lwt-unix
, an implementation of the HTTP/2 specification entirely in OCaml. What that means is we can swap in other h2 implementations like MirageOS to run in a Unikernel or Async to use JaneStreet's alternatve async implementation. Furthermore we can add TLS or SSL encryptionon to our HTTP/2 stack.
In this section, we will look at creating a gRPC client for our RouteGuide
service. You can see our complete example client code in examples/routeguide/src/client.ml.
To call service methods, we first need to create a H2 connection to communicate with the server. This connection will get reused by all calls to the server.
let client ~sw host port network =
let inet, port =
Eio_unix.run_in_systhread (fun () ->
Unix.getaddrinfo host port [ Unix.(AI_FAMILY PF_INET) ])
|> List.filter_map (fun (addr : Unix.addr_info) ->
match addr.ai_addr with
| Unix.ADDR_UNIX _ -> None
| ADDR_INET (addr, port) -> Some (addr, port))
|> List.hd
in
let addr = `Tcp (Eio_unix.Net.Ipaddr.of_unix inet, port) in
let socket = Eio.Net.connect ~sw network addr in
H2_eio.Client.create_connection ~sw ~error_handler:ignore socket
To call service methods, we take the H2 connection and build up a gRPC call for the service method using Client.call
from the Client module.
Calling the simple RPC get_feature
requires building up a Client.call
representation that matches the proto defintion, filling in the labelled arguments ~service
and ~rpc
with the matching service implementations. ~do_request
gets the H2 function for writing to a Http/2 Body. The real work is done in the ~handler
function using Client.Rpc.unary
to setup a Unary RPC response handler, which sends the encoded Point
and calls the function ~f
with the response. Here we decode the response and log it to the console.
let call_get_feature connection point =
let encode, decode = Service.make_client_functions RouteGuide.getFeature in
let response =
Client.call ~service:"routeguide.RouteGuide" ~rpc:"GetFeature"
~do_request:(H2_eio.Client.request connection ~error_handler:ignore)
~handler:
(Client.Rpc.unary
(encode point |> Writer.contents)
~f:(fun response ->
match response with
| Some response -> (
Reader.create response |> decode |> function
| Ok feature -> feature
| Error e ->
failwith
(Printf.sprintf "Could not decode request: %s"
(Result.show_error e)))
| None -> Feature.make ()))
()
in
match response with
| Ok (res, _ok) -> Printf.printf "RESPONSE = {%s}" (Feature.show res)
| Error _ -> Printf.printf "an error occurred"
Here we call the server-side streaming method list_features
, which returns a stream of geographical Feature
s.
let print_features connection =
let rectangle =
Rectangle.make
~lo:(Point.make ~latitude:400000000 ~longitude:(-750000000) ())
~hi:(Point.make ~latitude:420000000 ~longitude:(-730000000) ())
()
in
let encode, decode = Service.make_client_functions RouteGuide.listFeatures in
let stream =
Client.call ~service:"routeguide.RouteGuide" ~rpc:"ListFeatures"
~do_request:(H2_eio.Client.request connection ~error_handler:ignore)
~handler:
(Client.Rpc.server_streaming
(encode rectangle |> Writer.contents)
~f:(fun responses ->
let stream =
Seq.map
(fun str ->
Reader.create str |> decode |> function
| Ok feature -> feature
| Error e ->
failwith
(Printf.sprintf "Could not decode request: %s"
(Result.show_error e)))
responses
in
stream))
()
in
match stream with
| Ok (results, _ok) ->
Seq.iter
(fun f -> Printf.printf "RESPONSE = {%s}" (Feature.show f))
results
| Error e ->
failwith (Printf.sprintf "HTTP2 error: %s" (H2.Status.to_string e))
As in the simple RPC we pass a single request value. However, instead of getting back a single value we get a stream of Feature
s. We use Seq.map
to iterate over the stream and decode each into a Feature.t
and then print out the features when they are all decoded. Equally we could have printed the features as they are being decoded inside the Seq.map
rather than gathering them all into a List and printing them at the end. Notice that the type signature for Client.RPC.server_streaming
is similar to unary
in that we provide an encoded request and provide a handler function to consume the response.
The client-side streaming method record_route
takes a stream of Point
s and returns a single RouteSummary
value.
let random_point () : Point.t =
let latitude = (Random.int 180 - 90) * 10000000 in
let longitude = (Random.int 360 - 180) * 10000000 in
Point.make ~latitude ~longitude ()
We create a function for generating random points and then use that to generate a sequence of 100 points to record on our route.
let run_record_route connection =
let points =
Random.int 100
|> Seq.unfold (function 0 -> None | x -> Some (random_point (), x - 1))
in
let encode, decode = Service.make_client_functions RouteGuide.recordRoute in
let response =
Client.call ~service:"routeguide.RouteGuide" ~rpc:"RecordRoute"
~do_request:(H2_eio.Client.request connection ~error_handler:ignore)
~handler:
(Client.Rpc.client_streaming ~f:(fun f response ->
(* Stream points to server. *)
Seq.iter
(fun point ->
encode point |> Writer.contents |> fun x -> Seq.write f x)
points;
(* Signal we have finished sending points. *)
Seq.close_writer f;
(* Decode RouteSummary responses. *)
Eio.Promise.await response |> function
| Some str -> (
Reader.create str |> decode |> function
| Ok feature -> feature
| Error err ->
failwith
(Printf.sprintf "Could not decode request: %s"
(Result.show_error err)))
| None -> failwith (Printf.sprintf "No RouteSummary received.")))
()
in
match response with
| Ok (result, _ok) ->
Printf.printf "SUMMARY = {%s}" (RouteSummary.show result)
| Error e ->
failwith (Printf.sprintf "HTTP2 error: %s" (H2.Status.to_string e))
With this stream of points we setup another handler using Client.Rpc.client_streaming
. The type of the callback arguments is important to understand, f
is the function for sending data down the gRPC stream to the server. Calling it with f (Some value)
will send the value to the server, while calling it with f None
signals that we have finished streaming. Here you can see we iterate over all the points and call f
with Some value, and when we have sent everything we call f None
to signal we are finished. Then we decode the response
provided and print it out.
Finally, let's look at our bidirectional streaming RPC. THe route_chat
method takes a stream of RouteNotes
and returns either another stream of RouteNotes
or an error.
let run_route_chat clock connection =
(* Generate locations. *)
let location_count = 5 in
Printf.printf "Generating %i locations\n" location_count;
let route_notes =
location_count
|> Seq.unfold (function
| 0 -> None
| x ->
Some
( RouteNote.make ~location:(random_point ())
~message:(Printf.sprintf "Random Message %i" x)
(),
x - 1 ))
in
We start by generating a short sequence of locations, similar to how we did for record_route
.
let encode, decode = Service.make_client_functions RouteGuide.routeChat in
let rec go writer reader notes =
match Seq.uncons notes with
| None ->
Seq.close_writer writer (* Signal no more notes from the client. *)
| Some (route_note, xs) -> (
encode route_note |> Writer.contents |> fun x ->
Seq.write writer x;
(* Yield and sleep, waiting for server reply. *)
Eio.Time.sleep clock 1.0;
Eio.Fiber.yield ();
match Seq.uncons reader with
| None -> failwith "Expecting response"
| Some (response, reader') ->
let route_note =
Reader.create response |> decode |> function
| Ok route_note -> route_note
| Error e ->
failwith
(Printf.sprintf "Could not decode request: %s"
(Result.show_error e))
in
Printf.printf "NOTE = {%s}\n" (RouteNote.show route_note);
go writer reader' xs)
in
let result =
Client.call ~service:"routeguide.RouteGuide" ~rpc:"RouteChat"
~do_request:(H2_eio.Client.request connection ~error_handler:ignore)
~handler:
(Client.Rpc.bidirectional_streaming ~f:(fun writer reader ->
go writer reader route_notes))
()
in
match result with
| Ok ((), _ok) -> ()
| Error e ->
failwith (Printf.sprintf "HTTP2 error: %s" (H2.Status.to_string e))
Then we again use the Client.Rpc
module to setup a bidirectional_streaming
function with an interesting type signature val bidirectional_streaming f:(string Seq.writer -> string Seq.t -> 'a) -> 'a handler
. Somewhat intimidating but hopefully understandable in context. The function f
represents the writer function for sending notes to the server, with the same semantics as before. Calling it with Some value
represents sending a value to the stream and f None
means there is no more data to write. The string Seq.t
is the stream of record_note
responses coming back from the server, which we need to decode and print out. We define a recursive function go
to fold over the list, sending route_notes
, sleeping to wait for a server response, and printing out that response. When we run out of route_notes
to send we call Seq.close_writer f
to tell the server we are done and it can stop listening.
Other combinations of sending and receiving are possible, the reader is encouraged to try them out.
$ dune exec -- routeguide-server ./examples/routeguide/data/route_guide_db.json
$ dune exec -- routeguide-client