1use std::sync::Arc;
2
3use crate::{
4 Error,
5 client::Inner,
6 common::response::ResponseStream,
7 transport::endpoint::Endpoint,
8 transport::pagination::{collect_all, stream_pages},
9};
10
11use super::{
12 AuctionsRequest, AuctionsResponse, AuctionsSingleRequest, AuctionsSingleResponse, BarsRequest,
13 BarsResponse, BarsSingleRequest, BarsSingleResponse, ConditionCodesRequest,
14 ConditionCodesResponse, ExchangeCodesResponse, LatestBarRequest, LatestBarResponse,
15 LatestBarsRequest, LatestBarsResponse, LatestQuoteRequest, LatestQuoteResponse,
16 LatestQuotesRequest, LatestQuotesResponse, LatestTradeRequest, LatestTradeResponse,
17 LatestTradesRequest, LatestTradesResponse, QuotesRequest, QuotesResponse, QuotesSingleRequest,
18 QuotesSingleResponse, SnapshotRequest, SnapshotResponse, SnapshotsRequest, SnapshotsResponse,
19 TradesRequest, TradesResponse, TradesSingleRequest, TradesSingleResponse,
20};
21
22#[derive(Clone, Debug)]
23pub struct StocksClient {
24 inner: Arc<Inner>,
25}
26
27impl StocksClient {
28 pub(crate) fn new(inner: Arc<Inner>) -> Self {
29 Self { inner }
30 }
31
32 pub async fn bars(&self, request: BarsRequest) -> Result<BarsResponse, Error> {
33 self.ensure_credentials()?;
34 request.validate()?;
35 self.inner
36 .http
37 .get_json(
38 &self.inner.base_url,
39 Endpoint::StocksBars,
40 &self.inner.auth,
41 request.to_query(),
42 )
43 .await
44 }
45
46 pub async fn auctions(&self, request: AuctionsRequest) -> Result<AuctionsResponse, Error> {
47 self.ensure_credentials()?;
48 request.validate()?;
49 self.inner
50 .http
51 .get_json(
52 &self.inner.base_url,
53 Endpoint::StocksAuctions,
54 &self.inner.auth,
55 request.to_query(),
56 )
57 .await
58 }
59
60 pub async fn auctions_all(&self, request: AuctionsRequest) -> Result<AuctionsResponse, Error> {
61 self.ensure_credentials()?;
62 let client = self.clone();
63
64 collect_all(request, move |request| {
65 let client = client.clone();
66 async move { client.auctions(request).await }
67 })
68 .await
69 }
70
71 pub async fn auctions_single(
72 &self,
73 request: AuctionsSingleRequest,
74 ) -> Result<AuctionsSingleResponse, Error> {
75 self.ensure_credentials()?;
76 request.validate()?;
77 let endpoint = Endpoint::StocksAuctionsSingle {
78 symbol: request.symbol.clone(),
79 };
80 self.inner
81 .http
82 .get_json(
83 &self.inner.base_url,
84 endpoint,
85 &self.inner.auth,
86 request.to_query(),
87 )
88 .await
89 }
90
91 pub async fn auctions_single_all(
92 &self,
93 request: AuctionsSingleRequest,
94 ) -> Result<AuctionsSingleResponse, Error> {
95 self.ensure_credentials()?;
96 let client = self.clone();
97
98 collect_all(request, move |request| {
99 let client = client.clone();
100 async move { client.auctions_single(request).await }
101 })
102 .await
103 }
104
105 pub fn auctions_stream(
106 &self,
107 request: AuctionsRequest,
108 ) -> ResponseStream<Result<AuctionsResponse, Error>> {
109 if let Err(error) = self.ensure_credentials() {
110 return Self::error_stream(error);
111 }
112
113 let client = self.clone();
114 stream_pages(request, move |request| {
115 let client = client.clone();
116 async move { client.auctions(request).await }
117 })
118 }
119
120 pub fn auctions_single_stream(
121 &self,
122 request: AuctionsSingleRequest,
123 ) -> ResponseStream<Result<AuctionsSingleResponse, Error>> {
124 if let Err(error) = self.ensure_credentials() {
125 return Self::error_stream(error);
126 }
127
128 let client = self.clone();
129 stream_pages(request, move |request| {
130 let client = client.clone();
131 async move { client.auctions_single(request).await }
132 })
133 }
134
135 pub async fn bars_all(&self, request: BarsRequest) -> Result<BarsResponse, Error> {
136 self.ensure_credentials()?;
137 let client = self.clone();
138
139 collect_all(request, move |request| {
140 let client = client.clone();
141 async move { client.bars(request).await }
142 })
143 .await
144 }
145
146 pub async fn bars_single(
147 &self,
148 request: BarsSingleRequest,
149 ) -> Result<BarsSingleResponse, Error> {
150 self.ensure_credentials()?;
151 request.validate()?;
152 let endpoint = Endpoint::StocksBarsSingle {
153 symbol: request.symbol.clone(),
154 };
155 self.inner
156 .http
157 .get_json(
158 &self.inner.base_url,
159 endpoint,
160 &self.inner.auth,
161 request.to_query(),
162 )
163 .await
164 }
165
166 pub async fn bars_single_all(
167 &self,
168 request: BarsSingleRequest,
169 ) -> Result<BarsSingleResponse, Error> {
170 self.ensure_credentials()?;
171 let client = self.clone();
172
173 collect_all(request, move |request| {
174 let client = client.clone();
175 async move { client.bars_single(request).await }
176 })
177 .await
178 }
179
180 pub fn bars_stream(&self, request: BarsRequest) -> ResponseStream<Result<BarsResponse, Error>> {
181 if let Err(error) = self.ensure_credentials() {
182 return Self::error_stream(error);
183 }
184
185 let client = self.clone();
186 stream_pages(request, move |request| {
187 let client = client.clone();
188 async move { client.bars(request).await }
189 })
190 }
191
192 pub fn bars_single_stream(
193 &self,
194 request: BarsSingleRequest,
195 ) -> ResponseStream<Result<BarsSingleResponse, Error>> {
196 if let Err(error) = self.ensure_credentials() {
197 return Self::error_stream(error);
198 }
199
200 let client = self.clone();
201 stream_pages(request, move |request| {
202 let client = client.clone();
203 async move { client.bars_single(request).await }
204 })
205 }
206
207 pub async fn quotes(&self, request: QuotesRequest) -> Result<QuotesResponse, Error> {
208 self.ensure_credentials()?;
209 request.validate()?;
210 self.inner
211 .http
212 .get_json(
213 &self.inner.base_url,
214 Endpoint::StocksQuotes,
215 &self.inner.auth,
216 request.to_query(),
217 )
218 .await
219 }
220
221 pub async fn quotes_all(&self, request: QuotesRequest) -> Result<QuotesResponse, Error> {
222 self.ensure_credentials()?;
223 let client = self.clone();
224
225 collect_all(request, move |request| {
226 let client = client.clone();
227 async move { client.quotes(request).await }
228 })
229 .await
230 }
231
232 pub async fn quotes_single(
233 &self,
234 request: QuotesSingleRequest,
235 ) -> Result<QuotesSingleResponse, Error> {
236 self.ensure_credentials()?;
237 request.validate()?;
238 let endpoint = Endpoint::StocksQuotesSingle {
239 symbol: request.symbol.clone(),
240 };
241 self.inner
242 .http
243 .get_json(
244 &self.inner.base_url,
245 endpoint,
246 &self.inner.auth,
247 request.to_query(),
248 )
249 .await
250 }
251
252 pub async fn quotes_single_all(
253 &self,
254 request: QuotesSingleRequest,
255 ) -> Result<QuotesSingleResponse, Error> {
256 self.ensure_credentials()?;
257 let client = self.clone();
258
259 collect_all(request, move |request| {
260 let client = client.clone();
261 async move { client.quotes_single(request).await }
262 })
263 .await
264 }
265
266 pub fn quotes_stream(
267 &self,
268 request: QuotesRequest,
269 ) -> ResponseStream<Result<QuotesResponse, Error>> {
270 if let Err(error) = self.ensure_credentials() {
271 return Self::error_stream(error);
272 }
273
274 let client = self.clone();
275 stream_pages(request, move |request| {
276 let client = client.clone();
277 async move { client.quotes(request).await }
278 })
279 }
280
281 pub fn quotes_single_stream(
282 &self,
283 request: QuotesSingleRequest,
284 ) -> ResponseStream<Result<QuotesSingleResponse, Error>> {
285 if let Err(error) = self.ensure_credentials() {
286 return Self::error_stream(error);
287 }
288
289 let client = self.clone();
290 stream_pages(request, move |request| {
291 let client = client.clone();
292 async move { client.quotes_single(request).await }
293 })
294 }
295
296 pub async fn trades(&self, request: TradesRequest) -> Result<TradesResponse, Error> {
297 self.ensure_credentials()?;
298 request.validate()?;
299 self.inner
300 .http
301 .get_json(
302 &self.inner.base_url,
303 Endpoint::StocksTrades,
304 &self.inner.auth,
305 request.to_query(),
306 )
307 .await
308 }
309
310 pub async fn trades_all(&self, request: TradesRequest) -> Result<TradesResponse, Error> {
311 self.ensure_credentials()?;
312 let client = self.clone();
313
314 collect_all(request, move |request| {
315 let client = client.clone();
316 async move { client.trades(request).await }
317 })
318 .await
319 }
320
321 pub async fn trades_single(
322 &self,
323 request: TradesSingleRequest,
324 ) -> Result<TradesSingleResponse, Error> {
325 self.ensure_credentials()?;
326 request.validate()?;
327 let endpoint = Endpoint::StocksTradesSingle {
328 symbol: request.symbol.clone(),
329 };
330 self.inner
331 .http
332 .get_json(
333 &self.inner.base_url,
334 endpoint,
335 &self.inner.auth,
336 request.to_query(),
337 )
338 .await
339 }
340
341 pub async fn trades_single_all(
342 &self,
343 request: TradesSingleRequest,
344 ) -> Result<TradesSingleResponse, Error> {
345 self.ensure_credentials()?;
346 let client = self.clone();
347
348 collect_all(request, move |request| {
349 let client = client.clone();
350 async move { client.trades_single(request).await }
351 })
352 .await
353 }
354
355 pub fn trades_stream(
356 &self,
357 request: TradesRequest,
358 ) -> ResponseStream<Result<TradesResponse, Error>> {
359 if let Err(error) = self.ensure_credentials() {
360 return Self::error_stream(error);
361 }
362
363 let client = self.clone();
364 stream_pages(request, move |request| {
365 let client = client.clone();
366 async move { client.trades(request).await }
367 })
368 }
369
370 pub fn trades_single_stream(
371 &self,
372 request: TradesSingleRequest,
373 ) -> ResponseStream<Result<TradesSingleResponse, Error>> {
374 if let Err(error) = self.ensure_credentials() {
375 return Self::error_stream(error);
376 }
377
378 let client = self.clone();
379 stream_pages(request, move |request| {
380 let client = client.clone();
381 async move { client.trades_single(request).await }
382 })
383 }
384
385 pub async fn latest_bars(
386 &self,
387 request: LatestBarsRequest,
388 ) -> Result<LatestBarsResponse, Error> {
389 self.ensure_credentials()?;
390 request.validate()?;
391 self.inner
392 .http
393 .get_json(
394 &self.inner.base_url,
395 Endpoint::StocksLatestBars,
396 &self.inner.auth,
397 request.to_query(),
398 )
399 .await
400 }
401
402 pub async fn latest_bar(&self, request: LatestBarRequest) -> Result<LatestBarResponse, Error> {
403 self.ensure_credentials()?;
404 request.validate()?;
405 let endpoint = Endpoint::StocksLatestBar {
406 symbol: request.symbol.clone(),
407 };
408 self.inner
409 .http
410 .get_json(
411 &self.inner.base_url,
412 endpoint,
413 &self.inner.auth,
414 request.to_query(),
415 )
416 .await
417 }
418
419 pub async fn latest_quotes(
420 &self,
421 request: LatestQuotesRequest,
422 ) -> Result<LatestQuotesResponse, Error> {
423 self.ensure_credentials()?;
424 request.validate()?;
425 self.inner
426 .http
427 .get_json(
428 &self.inner.base_url,
429 Endpoint::StocksLatestQuotes,
430 &self.inner.auth,
431 request.to_query(),
432 )
433 .await
434 }
435
436 pub async fn latest_quote(
437 &self,
438 request: LatestQuoteRequest,
439 ) -> Result<LatestQuoteResponse, Error> {
440 self.ensure_credentials()?;
441 request.validate()?;
442 let endpoint = Endpoint::StocksLatestQuote {
443 symbol: request.symbol.clone(),
444 };
445 self.inner
446 .http
447 .get_json(
448 &self.inner.base_url,
449 endpoint,
450 &self.inner.auth,
451 request.to_query(),
452 )
453 .await
454 }
455
456 pub async fn latest_trades(
457 &self,
458 request: LatestTradesRequest,
459 ) -> Result<LatestTradesResponse, Error> {
460 self.ensure_credentials()?;
461 request.validate()?;
462 self.inner
463 .http
464 .get_json(
465 &self.inner.base_url,
466 Endpoint::StocksLatestTrades,
467 &self.inner.auth,
468 request.to_query(),
469 )
470 .await
471 }
472
473 pub async fn latest_trade(
474 &self,
475 request: LatestTradeRequest,
476 ) -> Result<LatestTradeResponse, Error> {
477 self.ensure_credentials()?;
478 request.validate()?;
479 let endpoint = Endpoint::StocksLatestTrade {
480 symbol: request.symbol.clone(),
481 };
482 self.inner
483 .http
484 .get_json(
485 &self.inner.base_url,
486 endpoint,
487 &self.inner.auth,
488 request.to_query(),
489 )
490 .await
491 }
492
493 pub async fn snapshots(&self, request: SnapshotsRequest) -> Result<SnapshotsResponse, Error> {
494 self.ensure_credentials()?;
495 request.validate()?;
496 self.inner
497 .http
498 .get_json(
499 &self.inner.base_url,
500 Endpoint::StocksSnapshots,
501 &self.inner.auth,
502 request.to_query(),
503 )
504 .await
505 }
506
507 pub async fn snapshot(&self, request: SnapshotRequest) -> Result<SnapshotResponse, Error> {
508 self.ensure_credentials()?;
509 request.validate()?;
510 let endpoint = Endpoint::StocksSnapshot {
511 symbol: request.symbol.clone(),
512 };
513 self.inner
514 .http
515 .get_json(
516 &self.inner.base_url,
517 endpoint,
518 &self.inner.auth,
519 request.to_query(),
520 )
521 .await
522 }
523
524 pub async fn condition_codes(
525 &self,
526 request: ConditionCodesRequest,
527 ) -> Result<ConditionCodesResponse, Error> {
528 self.ensure_credentials()?;
529 let ticktype = request.ticktype.as_str();
530 let query = request.to_query();
531 let endpoint = Endpoint::StocksConditionCodes { ticktype };
532 self.inner
533 .http
534 .get_json(&self.inner.base_url, endpoint, &self.inner.auth, query)
535 .await
536 }
537
538 pub async fn exchange_codes(&self) -> Result<ExchangeCodesResponse, Error> {
539 self.ensure_credentials()?;
540 self.inner
541 .http
542 .get_json(
543 &self.inner.base_url,
544 Endpoint::StocksExchangeCodes,
545 &self.inner.auth,
546 Vec::new(),
547 )
548 .await
549 }
550
551 fn ensure_credentials(&self) -> Result<(), Error> {
552 if self.inner.auth.has_credentials() {
553 Ok(())
554 } else {
555 Err(Error::MissingCredentials)
556 }
557 }
558
559 fn error_stream<Response>(error: Error) -> ResponseStream<Result<Response, Error>>
560 where
561 Response: Send + 'static,
562 {
563 Box::pin(futures_util::stream::once(async move { Err(error) }))
564 }
565}