1 package jsonrpc 2 3 import ( 4 "encoding/json" 5 "errors" 6 "io" 7 "sync" 8 ) 9 10 type wait struct { 11 keep bool 12 response func(json.RawMessage) 13 } 14 15 // ReadWriteCloser implements all methods of io.Reader, io.Writer, and io.Closer. 16 type ReadWriteCloser interface { 17 io.ReadWriter 18 io.Closer 19 } 20 21 type clientResponse struct { 22 ID int `json:"id"` 23 Result json.RawMessage `json:"result,omitempty"` 24 Error *Error `json:"error,omitempty"` 25 } 26 27 type clientRequest struct { 28 ID int `json:"id"` 29 Method string `json:"method"` 30 Params any `json:"params,omitempty"` 31 } 32 33 // Client represents a client connection to a JSONRPC server. 34 type Client struct { 35 encoder *json.Encoder 36 decoder *json.Decoder 37 closer io.Closer 38 nextID int 39 40 mu sync.Mutex 41 requests map[int]chan clientResponse 42 waits map[int]*wait 43 } 44 45 // NewClient create a new client from the given connection. 46 func NewClient(rw ReadWriteCloser) *Client { 47 c := &Client{ 48 encoder: json.NewEncoder(rw), 49 decoder: json.NewDecoder(rw), 50 closer: rw, 51 requests: make(map[int]chan clientResponse), 52 waits: make(map[int]*wait), 53 } 54 55 go c.respond() 56 57 return c 58 } 59 60 func (c *Client) respond() { 61 for { 62 var resp clientResponse 63 if err := c.decoder.Decode(&resp); err != nil { 64 return 65 } 66 67 c.mu.Lock() 68 if resp.ID >= 0 { 69 ch, ok := c.requests[resp.ID] 70 if ok { 71 delete(c.requests, resp.ID) 72 ch <- resp 73 } 74 } else { 75 w, ok := c.waits[resp.ID] 76 if ok { 77 if !w.keep { 78 delete(c.waits, resp.ID) 79 } 80 go w.response(resp.Result) 81 } 82 } 83 c.mu.Unlock() 84 } 85 } 86 87 // Request makes an RPC call to the connected server with the given method and 88 // params. 89 // 90 // The params will be JSON encoded. 91 // 92 // Returns the JSON encoded response from the server, or an error. 93 func (c *Client) Request(method string, params any) (json.RawMessage, error) { 94 ch := make(chan clientResponse) 95 c.mu.Lock() 96 id := c.nextID 97 c.nextID++ 98 c.requests[id] = ch 99 c.mu.Unlock() 100 c.encoder.Encode(clientRequest{ 101 ID: id, 102 Method: method, 103 Params: params, 104 }) 105 resp := <-ch 106 if resp.Error != nil { 107 return nil, resp.Error 108 } 109 return resp.Result, nil 110 } 111 112 // RequestValue acts as Request, but will unmarshal the response into the given 113 // value. 114 func (c *Client) RequestValue(method string, params any, response any) error { 115 respData, err := c.Request(method, params) 116 if err != nil { 117 return err 118 } 119 120 return json.Unmarshal(respData, response) 121 } 122 123 // Await will wait for a message pushed from the server with the given ID and 124 // call the given func with the JSON encoded data. 125 // 126 // The id given should be a negative value. 127 func (c *Client) Await(id int, cb func(json.RawMessage)) error { 128 return c.wait(id, cb, false) 129 } 130 131 // Subscribe will wait for all messages pushed from the server with the given 132 // ID and call the given func with the JSON encoded data for each one. 133 // 134 // The id given should be a negative value. 135 func (c *Client) Subscribe(id int, cb func(json.RawMessage)) error { 136 return c.wait(id, cb, true) 137 } 138 139 func (c *Client) wait(id int, cb func(json.RawMessage), keep bool) error { 140 c.mu.Lock() 141 defer c.mu.Unlock() 142 _, ok := c.waits[id] 143 if ok { 144 return ErrExisting 145 } 146 c.waits[id] = &wait{ 147 keep: keep, 148 response: cb, 149 } 150 return nil 151 } 152 153 // Close will stop all client goroutines and close the connection to the server. 154 func (c *Client) Close() error { 155 c.mu.Lock() 156 for _, r := range c.requests { 157 r <- clientResponse{ 158 Error: &Error{ 159 Message: "conn closed", 160 }, 161 } 162 } 163 c.mu.Unlock() 164 return c.closer.Close() 165 } 166 167 // Error 168 var ( 169 ErrExisting = errors.New("existing waiter") 170 ) 171