---
title: "nanonext - Scalability Protocols"
vignette: >
  %\VignetteIndexEntry{nanonext - Scalability Protocols}
  %\VignetteEngine{litedown::vignette}
  %\VignetteEncoding{UTF-8}
---




``` r
library(nanonext)
```

### 1. Request Reply Protocol

`nanonext` implements remote procedure calls (RPC) using NNG's req/rep protocol for distributed computing. Use this for computationally-expensive calculations or I/O-bound operations in separate server processes.

**[S] Server process:** `reply()` waits for a message, applies a function, and sends back the result. Started in a background 'mirai' process.


``` r
m <- mirai::mirai({
  library(nanonext)
  rep <- socket("rep", listen = "tcp://127.0.0.1:6556")
  reply(context(rep), execute = rnorm, send_mode = "raw")
  Sys.sleep(2) # linger period to flush system socket send
})

```

**[C] Client process:** `request()` performs async send/receive, returning immediately with a `recvAio` object.


``` r
req <- socket("req", dial = "tcp://127.0.0.1:6556")
aio <- request(context(req), data = 1e8, recv_mode = "double")

```
The client can now run additional code while the server processes the request.


``` r
# do more...
```

When the result is needed, call the recvAio using `call_aio()` to retrieve the value at `$data`.


``` r
call_aio(aio)$data |> str()
#>  num [1:100000000] -0.63 0.883 1.134 -0.474 -0.237 ...
```

Since `call_aio()` blocks, alternatively query `aio$data` directly, which returns 'unresolved' (logical NA) if incomplete.

For server-side operations (e.g., writing to disk), calling or querying the value confirms completion and provides the function's return value (typically NULL or an exit code).

The [`mirai`](https://doi.org/10.5281/zenodo.7912722) package (<https://mirai.r-lib.org/>) uses `nanonext` as the back-end to provide asynchronous execution of arbitrary R code using the RPC model.

### 2. Publisher Subscriber Protocol

`nanonext` implements NNG's pub/sub protocol. Subscribers can subscribe to one or multiple topics broadcast by a publisher.


``` r
pub <- socket("pub", listen = "inproc://nanobroadcast")
sub <- socket("sub", dial = "inproc://nanobroadcast")

sub |> subscribe(topic = "examples")

pub |> send(c("examples", "this is an example"), mode = "raw")
#> [1] 0
sub |> recv(mode = "character")
#> [1] "examples"           "this is an example"

pub |> send("examples at the start of a single text message", mode = "raw")
#> [1] 0
sub |> recv(mode = "character")
#> [1] "examples at the start of a single text message"

pub |> send(c("other", "this other topic will not be received"), mode = "raw")
#> [1] 0
sub |> recv(mode = "character")
#> 'errorValue' int 8 | Try again

# specify NULL to subscribe to ALL topics
sub |> subscribe(topic = NULL)
pub |> send(c("newTopic", "this is a new topic"), mode = "raw")
#> [1] 0
sub |> recv("character")
#> [1] "newTopic"            "this is a new topic"

sub |> unsubscribe(topic = NULL)
pub |> send(c("newTopic", "this topic will now not be received"), mode = "raw")
#> [1] 0
sub |> recv("character")
#> 'errorValue' int 8 | Try again

# however the topics explicitly subscribed to are still received
pub |> send(c("examples will still be received"), mode = "raw")
#> [1] 0
sub |> recv(mode = "character")
#> [1] "examples will still be received"
```

The subscribed topic can be of any atomic type (not just character), allowing integer, double, logical, complex and raw vectors to be sent and received.


``` r
sub |> subscribe(topic = 1)
pub |> send(c(1, 10, 10, 20), mode = "raw")
#> [1] 0
sub |> recv(mode = "double")
#> [1]  1 10 10 20
pub |> send(c(2, 10, 10, 20), mode = "raw")
#> [1] 0
sub |> recv(mode = "double")
#> 'errorValue' int 8 | Try again

close(pub)
close(sub)

```

### 3. Surveyor Respondent Protocol

Useful for service discovery and similar applications. A surveyor broadcasts a survey to all respondents, who may reply within a timeout period. Late responses are discarded.


``` r
sur <- socket("surveyor", listen = "inproc://nanoservice")
res1 <- socket("respondent", dial = "inproc://nanoservice")
res2 <- socket("respondent", dial = "inproc://nanoservice")

# sur sets a survey timeout, applying to this and subsequent surveys
sur |> survey_time(value = 500)

# sur sends a message and then requests 2 async receives
sur |> send("service check")
#> [1] 0
aio1 <- sur |> recv_aio()
aio2 <- sur |> recv_aio()

# res1 receives the message and replies using an aio send function
res1 |> recv()
#> [1] "service check"
res1 |> send_aio("res1")

# res2 receives the message but fails to reply
res2 |> recv()
#> [1] "service check"

# checking the aio - only the first will have resolved
aio1$data
#> [1] "res1"
aio2$data
#> 'unresolved' logi NA

# after the survey expires, the second resolves into a timeout error
msleep(500)
aio2$data
#> 'errorValue' int 5 | Timed out

close(sur)
close(res1)
close(res2)

```

`msleep()` is an uninterruptible sleep function (using NNG) that takes a time in milliseconds.

The final value resolves to a timeout error (integer 5 classed as 'errorValue'). All error codes are classed as 'errorValue' for easy distinction from integer message values.
