---
title: "nanonext - Scalability Protocols"
vignette: >
  %\VignetteIndexEntry{nanonext - Scalability Protocols}
  %\VignetteEngine{litedown::vignette}
  %\VignetteEncoding{UTF-8}
---

``` r
library(nanonext)
```

### 1. Request Reply Protocol

`nanonext` implements remote procedure calls (RPC) using NNG's req/rep protocol for distributed computing. Use this for computationally-expensive calculations or I/O-bound operations in separate server processes.

**[S] Server process:** `reply()` waits for a message, applies a function, and sends back the result. Started in a background 'mirai' process.

``` r
m <- mirai::mirai({
  library(nanonext)
  rep <- socket("rep", listen = "tcp://127.0.0.1:6556")
  reply(context(rep), execute = rnorm, send_mode = "raw")
  Sys.sleep(2) # linger period to flush system socket send
})
```

**[C] Client process:** `request()` performs async send/receive, returning immediately with a `recvAio` object.

``` r
req <- socket("req", dial = "tcp://127.0.0.1:6556")
aio <- request(context(req), data = 1e8, recv_mode = "double")
```

The client can now run additional code while the server processes the request.

``` r
# do more...
```

When the result is needed, call the recvAio using `call_aio()` to retrieve the value at `$data`.

``` r
call_aio(aio)$data |> str()
#> num [1:100000000] -0.63 0.883 1.134 -0.474 -0.237 ...
```

Since `call_aio()` blocks, alternatively query `aio$data` directly, which returns 'unresolved' (logical NA) if incomplete.

For server-side operations (e.g., writing to disk), calling or querying the value confirms completion and provides the function's return value (typically NULL or an exit code).

The [`mirai`](https://doi.org/10.5281/zenodo.7912722) package uses `nanonext` as the back-end to provide asynchronous execution of arbitrary R code using the RPC model.

### 2. Publisher Subscriber Protocol

`nanonext` implements NNG's pub/sub protocol. Subscribers can subscribe to one or multiple topics broadcast by a publisher.

``` r
pub <- socket("pub", listen = "inproc://nanobroadcast")
sub <- socket("sub", dial = "inproc://nanobroadcast")

sub |> subscribe(topic = "examples")

pub |> send(c("examples", "this is an example"), mode = "raw")
#> [1] 0
sub |> recv(mode = "character")
#> [1] "examples" "this is an example"

pub |> send("examples at the start of a single text message", mode = "raw")
#> [1] 0
sub |> recv(mode = "character")
#> [1] "examples at the start of a single text message"

pub |> send(c("other", "this other topic will not be received"), mode = "raw")
#> [1] 0
sub |> recv(mode = "character")
#> 'errorValue' int 8 | Try again

# specify NULL to subscribe to ALL topics
sub |> subscribe(topic = NULL)
pub |> send(c("newTopic", "this is a new topic"), mode = "raw")
#> [1] 0
sub |> recv("character")
#> [1] "newTopic" "this is a new topic"

sub |> unsubscribe(topic = NULL)
pub |> send(c("newTopic", "this topic will now not be received"), mode = "raw")
#> [1] 0
sub |> recv("character")
#> 'errorValue' int 8 | Try again

# however the topics explicitly subscribed to are still received
pub |> send(c("examples will still be received"), mode = "raw")
#> [1] 0
sub |> recv(mode = "character")
#> [1] "examples will still be received"
```

The subscribed topic can be of any atomic type (not just character), allowing integer, double, logical, complex and raw vectors to be sent and received.

``` r
sub |> subscribe(topic = 1)
pub |> send(c(1, 10, 10, 20), mode = "raw")
#> [1] 0
sub |> recv(mode = "double")
#> [1] 1 10 10 20
pub |> send(c(2, 10, 10, 20), mode = "raw")
#> [1] 0
sub |> recv(mode = "double")
#> 'errorValue' int 8 | Try again

close(pub)
close(sub)
```

### 3. Surveyor Respondent Protocol

Useful for service discovery and similar applications. A surveyor broadcasts a survey to all respondents, who may reply within a timeout period. Late responses are discarded.

``` r
sur <- socket("surveyor", listen = "inproc://nanoservice")
res1 <- socket("respondent", dial = "inproc://nanoservice")
res2 <- socket("respondent", dial = "inproc://nanoservice")

# sur sets a survey timeout, applying to this and subsequent surveys
sur |> survey_time(value = 500)

# sur sends a message and then requests 2 async receives
sur |> send("service check")
#> [1] 0
aio1 <- sur |> recv_aio()
aio2 <- sur |> recv_aio()

# res1 receives the message and replies using an aio send function
res1 |> recv()
#> [1] "service check"
res1 |> send_aio("res1")

# res2 receives the message but fails to reply
res2 |> recv()
#> [1] "service check"

# checking the aio - only the first will have resolved
aio1$data
#> [1] "res1"
aio2$data
#> 'unresolved' logi NA

# after the survey expires, the second resolves into a timeout error
msleep(500)
aio2$data
#> 'errorValue' int 5 | Timed out

close(sur)
close(res1)
close(res2)
```

`msleep()` is an uninterruptible sleep function (using NNG) that takes a time in milliseconds.

The final value resolves to a timeout error (integer 5 classed as 'errorValue'). All error codes are classed as 'errorValue' for easy distinction from integer message values.
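
The distinction between error codes and integer message values can be checked programmatically. The following is a minimal sketch, assuming a `pair` socket with no connected peer; the socket type and the `inproc://nano-errdemo` URL are illustrative choices, not part of the examples above. A non-blocking `recv()` on this socket has nothing to receive and so should return an 'errorValue', which `is_error_value()` detects, whereas a plain integer is not flagged.

``` r
library(nanonext)

# illustrative socket: listens, but no peer ever dials in or sends
s <- socket("pair", listen = "inproc://nano-errdemo")

# non-blocking receive: with no message waiting, an 'errorValue' is returned
res <- recv(s)

# TRUE only for values classed as 'errorValue'
got_error <- is_error_value(res)

# an ordinary integer message value is not an error, even if numerically equal
plain_int <- is_error_value(5L)

if (got_error) {
  message("receive did not succeed; handle or retry here")
}

close(s)
```

Testing with `is_error_value()` rather than comparing against a specific integer keeps the handling code independent of which error occurred.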