alpaca_data/options/
client.rs

1use std::sync::Arc;
2
3use crate::{
4    Error,
5    client::Inner,
6    common::response::ResponseStream,
7    transport::endpoint::Endpoint,
8    transport::pagination::{collect_all, stream_pages},
9};
10
11use super::{
12    BarsRequest, BarsResponse, ChainRequest, ChainResponse, ConditionCodesRequest,
13    ConditionCodesResponse, ExchangeCodesResponse, LatestQuotesRequest, LatestQuotesResponse,
14    LatestTradesRequest, LatestTradesResponse, SnapshotsRequest, SnapshotsResponse, TradesRequest,
15    TradesResponse,
16};
17
18#[derive(Clone, Debug)]
19pub struct OptionsClient {
20    inner: Arc<Inner>,
21}
22
23impl OptionsClient {
24    pub(crate) fn new(inner: Arc<Inner>) -> Self {
25        Self { inner }
26    }
27
28    pub async fn bars(&self, request: BarsRequest) -> Result<BarsResponse, Error> {
29        self.ensure_credentials()?;
30        request.validate()?;
31        self.inner
32            .http
33            .get_json(
34                &self.inner.base_url,
35                Endpoint::OptionsBars,
36                &self.inner.auth,
37                request.to_query(),
38            )
39            .await
40    }
41
42    pub async fn bars_all(&self, request: BarsRequest) -> Result<BarsResponse, Error> {
43        self.ensure_credentials()?;
44        let client = self.clone();
45
46        collect_all(request, move |request| {
47            let client = client.clone();
48            async move { client.bars(request).await }
49        })
50        .await
51    }
52
53    pub fn bars_stream(&self, request: BarsRequest) -> ResponseStream<Result<BarsResponse, Error>> {
54        if let Err(error) = self.ensure_credentials() {
55            return Self::error_stream(error);
56        }
57
58        let client = self.clone();
59        stream_pages(request, move |request| {
60            let client = client.clone();
61            async move { client.bars(request).await }
62        })
63    }
64
65    pub async fn trades(&self, request: TradesRequest) -> Result<TradesResponse, Error> {
66        self.ensure_credentials()?;
67        request.validate()?;
68        self.inner
69            .http
70            .get_json(
71                &self.inner.base_url,
72                Endpoint::OptionsTrades,
73                &self.inner.auth,
74                request.to_query(),
75            )
76            .await
77    }
78
79    pub async fn trades_all(&self, request: TradesRequest) -> Result<TradesResponse, Error> {
80        self.ensure_credentials()?;
81        let client = self.clone();
82
83        collect_all(request, move |request| {
84            let client = client.clone();
85            async move { client.trades(request).await }
86        })
87        .await
88    }
89
90    pub fn trades_stream(
91        &self,
92        request: TradesRequest,
93    ) -> ResponseStream<Result<TradesResponse, Error>> {
94        if let Err(error) = self.ensure_credentials() {
95            return Self::error_stream(error);
96        }
97
98        let client = self.clone();
99        stream_pages(request, move |request| {
100            let client = client.clone();
101            async move { client.trades(request).await }
102        })
103    }
104
105    pub async fn latest_quotes(
106        &self,
107        request: LatestQuotesRequest,
108    ) -> Result<LatestQuotesResponse, Error> {
109        self.ensure_credentials()?;
110        request.validate()?;
111        self.inner
112            .http
113            .get_json(
114                &self.inner.base_url,
115                Endpoint::OptionsLatestQuotes,
116                &self.inner.auth,
117                request.to_query(),
118            )
119            .await
120    }
121
122    pub async fn latest_trades(
123        &self,
124        request: LatestTradesRequest,
125    ) -> Result<LatestTradesResponse, Error> {
126        self.ensure_credentials()?;
127        request.validate()?;
128        self.inner
129            .http
130            .get_json(
131                &self.inner.base_url,
132                Endpoint::OptionsLatestTrades,
133                &self.inner.auth,
134                request.to_query(),
135            )
136            .await
137    }
138
139    pub async fn snapshots(&self, request: SnapshotsRequest) -> Result<SnapshotsResponse, Error> {
140        self.ensure_credentials()?;
141        request.validate()?;
142        self.inner
143            .http
144            .get_json(
145                &self.inner.base_url,
146                Endpoint::OptionsSnapshots,
147                &self.inner.auth,
148                request.to_query(),
149            )
150            .await
151    }
152
153    pub async fn snapshots_all(
154        &self,
155        request: SnapshotsRequest,
156    ) -> Result<SnapshotsResponse, Error> {
157        self.ensure_credentials()?;
158        let client = self.clone();
159
160        collect_all(request, move |request| {
161            let client = client.clone();
162            async move { client.snapshots(request).await }
163        })
164        .await
165    }
166
167    pub fn snapshots_stream(
168        &self,
169        request: SnapshotsRequest,
170    ) -> ResponseStream<Result<SnapshotsResponse, Error>> {
171        if let Err(error) = self.ensure_credentials() {
172            return Self::error_stream(error);
173        }
174
175        let client = self.clone();
176        stream_pages(request, move |request| {
177            let client = client.clone();
178            async move { client.snapshots(request).await }
179        })
180    }
181
182    pub async fn chain(&self, request: ChainRequest) -> Result<ChainResponse, Error> {
183        self.ensure_credentials()?;
184        request.validate()?;
185        let endpoint = Endpoint::OptionsChain {
186            underlying_symbol: request.underlying_symbol.clone(),
187        };
188        self.inner
189            .http
190            .get_json(
191                &self.inner.base_url,
192                endpoint,
193                &self.inner.auth,
194                request.to_query(),
195            )
196            .await
197    }
198
199    pub async fn chain_all(&self, request: ChainRequest) -> Result<ChainResponse, Error> {
200        self.ensure_credentials()?;
201        let client = self.clone();
202
203        collect_all(request, move |request| {
204            let client = client.clone();
205            async move { client.chain(request).await }
206        })
207        .await
208    }
209
210    pub fn chain_stream(
211        &self,
212        request: ChainRequest,
213    ) -> ResponseStream<Result<ChainResponse, Error>> {
214        if let Err(error) = self.ensure_credentials() {
215            return Self::error_stream(error);
216        }
217
218        let client = self.clone();
219        stream_pages(request, move |request| {
220            let client = client.clone();
221            async move { client.chain(request).await }
222        })
223    }
224
225    pub async fn exchange_codes(&self) -> Result<ExchangeCodesResponse, Error> {
226        self.ensure_credentials()?;
227        self.inner
228            .http
229            .get_json(
230                &self.inner.base_url,
231                Endpoint::OptionsExchangeCodes,
232                &self.inner.auth,
233                Vec::new(),
234            )
235            .await
236    }
237
238    pub async fn condition_codes(
239        &self,
240        request: ConditionCodesRequest,
241    ) -> Result<ConditionCodesResponse, Error> {
242        self.ensure_credentials()?;
243        let endpoint = Endpoint::OptionsConditionCodes {
244            ticktype: request.ticktype(),
245        };
246        self.inner
247            .http
248            .get_json(&self.inner.base_url, endpoint, &self.inner.auth, Vec::new())
249            .await
250    }
251
252    fn ensure_credentials(&self) -> Result<(), Error> {
253        if self.inner.auth.has_credentials() {
254            Ok(())
255        } else {
256            Err(Error::MissingCredentials)
257        }
258    }
259
260    fn error_stream<Response>(error: Error) -> ResponseStream<Result<Response, Error>>
261    where
262        Response: Send + 'static,
263    {
264        Box::pin(futures_util::stream::once(async move { Err(error) }))
265    }
266}