alpaca_data/stocks/
client.rs

1use std::sync::Arc;
2
3use crate::{
4    Error,
5    client::Inner,
6    common::response::ResponseStream,
7    transport::endpoint::Endpoint,
8    transport::pagination::{collect_all, stream_pages},
9};
10
11use super::{
12    AuctionsRequest, AuctionsResponse, AuctionsSingleRequest, AuctionsSingleResponse, BarsRequest,
13    BarsResponse, BarsSingleRequest, BarsSingleResponse, ConditionCodesRequest,
14    ConditionCodesResponse, ExchangeCodesResponse, LatestBarRequest, LatestBarResponse,
15    LatestBarsRequest, LatestBarsResponse, LatestQuoteRequest, LatestQuoteResponse,
16    LatestQuotesRequest, LatestQuotesResponse, LatestTradeRequest, LatestTradeResponse,
17    LatestTradesRequest, LatestTradesResponse, QuotesRequest, QuotesResponse, QuotesSingleRequest,
18    QuotesSingleResponse, SnapshotRequest, SnapshotResponse, SnapshotsRequest, SnapshotsResponse,
19    TradesRequest, TradesResponse, TradesSingleRequest, TradesSingleResponse,
20};
21
22#[derive(Clone, Debug)]
23pub struct StocksClient {
24    inner: Arc<Inner>,
25}
26
27impl StocksClient {
28    pub(crate) fn new(inner: Arc<Inner>) -> Self {
29        Self { inner }
30    }
31
32    pub async fn bars(&self, request: BarsRequest) -> Result<BarsResponse, Error> {
33        self.ensure_credentials()?;
34        request.validate()?;
35        self.inner
36            .http
37            .get_json(
38                &self.inner.base_url,
39                Endpoint::StocksBars,
40                &self.inner.auth,
41                request.to_query(),
42            )
43            .await
44    }
45
46    pub async fn auctions(&self, request: AuctionsRequest) -> Result<AuctionsResponse, Error> {
47        self.ensure_credentials()?;
48        request.validate()?;
49        self.inner
50            .http
51            .get_json(
52                &self.inner.base_url,
53                Endpoint::StocksAuctions,
54                &self.inner.auth,
55                request.to_query(),
56            )
57            .await
58    }
59
60    pub async fn auctions_all(&self, request: AuctionsRequest) -> Result<AuctionsResponse, Error> {
61        self.ensure_credentials()?;
62        let client = self.clone();
63
64        collect_all(request, move |request| {
65            let client = client.clone();
66            async move { client.auctions(request).await }
67        })
68        .await
69    }
70
71    pub async fn auctions_single(
72        &self,
73        request: AuctionsSingleRequest,
74    ) -> Result<AuctionsSingleResponse, Error> {
75        self.ensure_credentials()?;
76        request.validate()?;
77        let endpoint = Endpoint::StocksAuctionsSingle {
78            symbol: request.symbol.clone(),
79        };
80        self.inner
81            .http
82            .get_json(
83                &self.inner.base_url,
84                endpoint,
85                &self.inner.auth,
86                request.to_query(),
87            )
88            .await
89    }
90
91    pub async fn auctions_single_all(
92        &self,
93        request: AuctionsSingleRequest,
94    ) -> Result<AuctionsSingleResponse, Error> {
95        self.ensure_credentials()?;
96        let client = self.clone();
97
98        collect_all(request, move |request| {
99            let client = client.clone();
100            async move { client.auctions_single(request).await }
101        })
102        .await
103    }
104
105    pub fn auctions_stream(
106        &self,
107        request: AuctionsRequest,
108    ) -> ResponseStream<Result<AuctionsResponse, Error>> {
109        if let Err(error) = self.ensure_credentials() {
110            return Self::error_stream(error);
111        }
112
113        let client = self.clone();
114        stream_pages(request, move |request| {
115            let client = client.clone();
116            async move { client.auctions(request).await }
117        })
118    }
119
120    pub fn auctions_single_stream(
121        &self,
122        request: AuctionsSingleRequest,
123    ) -> ResponseStream<Result<AuctionsSingleResponse, Error>> {
124        if let Err(error) = self.ensure_credentials() {
125            return Self::error_stream(error);
126        }
127
128        let client = self.clone();
129        stream_pages(request, move |request| {
130            let client = client.clone();
131            async move { client.auctions_single(request).await }
132        })
133    }
134
135    pub async fn bars_all(&self, request: BarsRequest) -> Result<BarsResponse, Error> {
136        self.ensure_credentials()?;
137        let client = self.clone();
138
139        collect_all(request, move |request| {
140            let client = client.clone();
141            async move { client.bars(request).await }
142        })
143        .await
144    }
145
146    pub async fn bars_single(
147        &self,
148        request: BarsSingleRequest,
149    ) -> Result<BarsSingleResponse, Error> {
150        self.ensure_credentials()?;
151        request.validate()?;
152        let endpoint = Endpoint::StocksBarsSingle {
153            symbol: request.symbol.clone(),
154        };
155        self.inner
156            .http
157            .get_json(
158                &self.inner.base_url,
159                endpoint,
160                &self.inner.auth,
161                request.to_query(),
162            )
163            .await
164    }
165
166    pub async fn bars_single_all(
167        &self,
168        request: BarsSingleRequest,
169    ) -> Result<BarsSingleResponse, Error> {
170        self.ensure_credentials()?;
171        let client = self.clone();
172
173        collect_all(request, move |request| {
174            let client = client.clone();
175            async move { client.bars_single(request).await }
176        })
177        .await
178    }
179
180    pub fn bars_stream(&self, request: BarsRequest) -> ResponseStream<Result<BarsResponse, Error>> {
181        if let Err(error) = self.ensure_credentials() {
182            return Self::error_stream(error);
183        }
184
185        let client = self.clone();
186        stream_pages(request, move |request| {
187            let client = client.clone();
188            async move { client.bars(request).await }
189        })
190    }
191
192    pub fn bars_single_stream(
193        &self,
194        request: BarsSingleRequest,
195    ) -> ResponseStream<Result<BarsSingleResponse, Error>> {
196        if let Err(error) = self.ensure_credentials() {
197            return Self::error_stream(error);
198        }
199
200        let client = self.clone();
201        stream_pages(request, move |request| {
202            let client = client.clone();
203            async move { client.bars_single(request).await }
204        })
205    }
206
207    pub async fn quotes(&self, request: QuotesRequest) -> Result<QuotesResponse, Error> {
208        self.ensure_credentials()?;
209        request.validate()?;
210        self.inner
211            .http
212            .get_json(
213                &self.inner.base_url,
214                Endpoint::StocksQuotes,
215                &self.inner.auth,
216                request.to_query(),
217            )
218            .await
219    }
220
221    pub async fn quotes_all(&self, request: QuotesRequest) -> Result<QuotesResponse, Error> {
222        self.ensure_credentials()?;
223        let client = self.clone();
224
225        collect_all(request, move |request| {
226            let client = client.clone();
227            async move { client.quotes(request).await }
228        })
229        .await
230    }
231
232    pub async fn quotes_single(
233        &self,
234        request: QuotesSingleRequest,
235    ) -> Result<QuotesSingleResponse, Error> {
236        self.ensure_credentials()?;
237        request.validate()?;
238        let endpoint = Endpoint::StocksQuotesSingle {
239            symbol: request.symbol.clone(),
240        };
241        self.inner
242            .http
243            .get_json(
244                &self.inner.base_url,
245                endpoint,
246                &self.inner.auth,
247                request.to_query(),
248            )
249            .await
250    }
251
252    pub async fn quotes_single_all(
253        &self,
254        request: QuotesSingleRequest,
255    ) -> Result<QuotesSingleResponse, Error> {
256        self.ensure_credentials()?;
257        let client = self.clone();
258
259        collect_all(request, move |request| {
260            let client = client.clone();
261            async move { client.quotes_single(request).await }
262        })
263        .await
264    }
265
266    pub fn quotes_stream(
267        &self,
268        request: QuotesRequest,
269    ) -> ResponseStream<Result<QuotesResponse, Error>> {
270        if let Err(error) = self.ensure_credentials() {
271            return Self::error_stream(error);
272        }
273
274        let client = self.clone();
275        stream_pages(request, move |request| {
276            let client = client.clone();
277            async move { client.quotes(request).await }
278        })
279    }
280
281    pub fn quotes_single_stream(
282        &self,
283        request: QuotesSingleRequest,
284    ) -> ResponseStream<Result<QuotesSingleResponse, Error>> {
285        if let Err(error) = self.ensure_credentials() {
286            return Self::error_stream(error);
287        }
288
289        let client = self.clone();
290        stream_pages(request, move |request| {
291            let client = client.clone();
292            async move { client.quotes_single(request).await }
293        })
294    }
295
296    pub async fn trades(&self, request: TradesRequest) -> Result<TradesResponse, Error> {
297        self.ensure_credentials()?;
298        request.validate()?;
299        self.inner
300            .http
301            .get_json(
302                &self.inner.base_url,
303                Endpoint::StocksTrades,
304                &self.inner.auth,
305                request.to_query(),
306            )
307            .await
308    }
309
310    pub async fn trades_all(&self, request: TradesRequest) -> Result<TradesResponse, Error> {
311        self.ensure_credentials()?;
312        let client = self.clone();
313
314        collect_all(request, move |request| {
315            let client = client.clone();
316            async move { client.trades(request).await }
317        })
318        .await
319    }
320
321    pub async fn trades_single(
322        &self,
323        request: TradesSingleRequest,
324    ) -> Result<TradesSingleResponse, Error> {
325        self.ensure_credentials()?;
326        request.validate()?;
327        let endpoint = Endpoint::StocksTradesSingle {
328            symbol: request.symbol.clone(),
329        };
330        self.inner
331            .http
332            .get_json(
333                &self.inner.base_url,
334                endpoint,
335                &self.inner.auth,
336                request.to_query(),
337            )
338            .await
339    }
340
341    pub async fn trades_single_all(
342        &self,
343        request: TradesSingleRequest,
344    ) -> Result<TradesSingleResponse, Error> {
345        self.ensure_credentials()?;
346        let client = self.clone();
347
348        collect_all(request, move |request| {
349            let client = client.clone();
350            async move { client.trades_single(request).await }
351        })
352        .await
353    }
354
355    pub fn trades_stream(
356        &self,
357        request: TradesRequest,
358    ) -> ResponseStream<Result<TradesResponse, Error>> {
359        if let Err(error) = self.ensure_credentials() {
360            return Self::error_stream(error);
361        }
362
363        let client = self.clone();
364        stream_pages(request, move |request| {
365            let client = client.clone();
366            async move { client.trades(request).await }
367        })
368    }
369
370    pub fn trades_single_stream(
371        &self,
372        request: TradesSingleRequest,
373    ) -> ResponseStream<Result<TradesSingleResponse, Error>> {
374        if let Err(error) = self.ensure_credentials() {
375            return Self::error_stream(error);
376        }
377
378        let client = self.clone();
379        stream_pages(request, move |request| {
380            let client = client.clone();
381            async move { client.trades_single(request).await }
382        })
383    }
384
385    pub async fn latest_bars(
386        &self,
387        request: LatestBarsRequest,
388    ) -> Result<LatestBarsResponse, Error> {
389        self.ensure_credentials()?;
390        request.validate()?;
391        self.inner
392            .http
393            .get_json(
394                &self.inner.base_url,
395                Endpoint::StocksLatestBars,
396                &self.inner.auth,
397                request.to_query(),
398            )
399            .await
400    }
401
402    pub async fn latest_bar(&self, request: LatestBarRequest) -> Result<LatestBarResponse, Error> {
403        self.ensure_credentials()?;
404        request.validate()?;
405        let endpoint = Endpoint::StocksLatestBar {
406            symbol: request.symbol.clone(),
407        };
408        self.inner
409            .http
410            .get_json(
411                &self.inner.base_url,
412                endpoint,
413                &self.inner.auth,
414                request.to_query(),
415            )
416            .await
417    }
418
419    pub async fn latest_quotes(
420        &self,
421        request: LatestQuotesRequest,
422    ) -> Result<LatestQuotesResponse, Error> {
423        self.ensure_credentials()?;
424        request.validate()?;
425        self.inner
426            .http
427            .get_json(
428                &self.inner.base_url,
429                Endpoint::StocksLatestQuotes,
430                &self.inner.auth,
431                request.to_query(),
432            )
433            .await
434    }
435
436    pub async fn latest_quote(
437        &self,
438        request: LatestQuoteRequest,
439    ) -> Result<LatestQuoteResponse, Error> {
440        self.ensure_credentials()?;
441        request.validate()?;
442        let endpoint = Endpoint::StocksLatestQuote {
443            symbol: request.symbol.clone(),
444        };
445        self.inner
446            .http
447            .get_json(
448                &self.inner.base_url,
449                endpoint,
450                &self.inner.auth,
451                request.to_query(),
452            )
453            .await
454    }
455
456    pub async fn latest_trades(
457        &self,
458        request: LatestTradesRequest,
459    ) -> Result<LatestTradesResponse, Error> {
460        self.ensure_credentials()?;
461        request.validate()?;
462        self.inner
463            .http
464            .get_json(
465                &self.inner.base_url,
466                Endpoint::StocksLatestTrades,
467                &self.inner.auth,
468                request.to_query(),
469            )
470            .await
471    }
472
473    pub async fn latest_trade(
474        &self,
475        request: LatestTradeRequest,
476    ) -> Result<LatestTradeResponse, Error> {
477        self.ensure_credentials()?;
478        request.validate()?;
479        let endpoint = Endpoint::StocksLatestTrade {
480            symbol: request.symbol.clone(),
481        };
482        self.inner
483            .http
484            .get_json(
485                &self.inner.base_url,
486                endpoint,
487                &self.inner.auth,
488                request.to_query(),
489            )
490            .await
491    }
492
493    pub async fn snapshots(&self, request: SnapshotsRequest) -> Result<SnapshotsResponse, Error> {
494        self.ensure_credentials()?;
495        request.validate()?;
496        self.inner
497            .http
498            .get_json(
499                &self.inner.base_url,
500                Endpoint::StocksSnapshots,
501                &self.inner.auth,
502                request.to_query(),
503            )
504            .await
505    }
506
507    pub async fn snapshot(&self, request: SnapshotRequest) -> Result<SnapshotResponse, Error> {
508        self.ensure_credentials()?;
509        request.validate()?;
510        let endpoint = Endpoint::StocksSnapshot {
511            symbol: request.symbol.clone(),
512        };
513        self.inner
514            .http
515            .get_json(
516                &self.inner.base_url,
517                endpoint,
518                &self.inner.auth,
519                request.to_query(),
520            )
521            .await
522    }
523
524    pub async fn condition_codes(
525        &self,
526        request: ConditionCodesRequest,
527    ) -> Result<ConditionCodesResponse, Error> {
528        self.ensure_credentials()?;
529        let ticktype = request.ticktype.as_str();
530        let query = request.to_query();
531        let endpoint = Endpoint::StocksConditionCodes { ticktype };
532        self.inner
533            .http
534            .get_json(&self.inner.base_url, endpoint, &self.inner.auth, query)
535            .await
536    }
537
538    pub async fn exchange_codes(&self) -> Result<ExchangeCodesResponse, Error> {
539        self.ensure_credentials()?;
540        self.inner
541            .http
542            .get_json(
543                &self.inner.base_url,
544                Endpoint::StocksExchangeCodes,
545                &self.inner.auth,
546                Vec::new(),
547            )
548            .await
549    }
550
551    fn ensure_credentials(&self) -> Result<(), Error> {
552        if self.inner.auth.has_credentials() {
553            Ok(())
554        } else {
555            Err(Error::MissingCredentials)
556        }
557    }
558
559    fn error_stream<Response>(error: Error) -> ResponseStream<Result<Response, Error>>
560    where
561        Response: Send + 'static,
562    {
563        Box::pin(futures_util::stream::once(async move { Err(error) }))
564    }
565}