1use std::sync::Arc;
2
3use crate::{
4 Error,
5 client::Inner,
6 common::response::ResponseStream,
7 transport::endpoint::Endpoint,
8 transport::pagination::{collect_all, stream_pages},
9};
10
11use super::{
12 BarsRequest, BarsResponse, ChainRequest, ChainResponse, ConditionCodesRequest,
13 ConditionCodesResponse, ExchangeCodesResponse, LatestQuotesRequest, LatestQuotesResponse,
14 LatestTradesRequest, LatestTradesResponse, SnapshotsRequest, SnapshotsResponse, TradesRequest,
15 TradesResponse,
16};
17
18#[derive(Clone, Debug)]
19pub struct OptionsClient {
20 inner: Arc<Inner>,
21}
22
23impl OptionsClient {
24 pub(crate) fn new(inner: Arc<Inner>) -> Self {
25 Self { inner }
26 }
27
28 pub async fn bars(&self, request: BarsRequest) -> Result<BarsResponse, Error> {
29 self.ensure_credentials()?;
30 request.validate()?;
31 self.inner
32 .http
33 .get_json(
34 &self.inner.base_url,
35 Endpoint::OptionsBars,
36 &self.inner.auth,
37 request.to_query(),
38 )
39 .await
40 }
41
42 pub async fn bars_all(&self, request: BarsRequest) -> Result<BarsResponse, Error> {
43 self.ensure_credentials()?;
44 let client = self.clone();
45
46 collect_all(request, move |request| {
47 let client = client.clone();
48 async move { client.bars(request).await }
49 })
50 .await
51 }
52
53 pub fn bars_stream(&self, request: BarsRequest) -> ResponseStream<Result<BarsResponse, Error>> {
54 if let Err(error) = self.ensure_credentials() {
55 return Self::error_stream(error);
56 }
57
58 let client = self.clone();
59 stream_pages(request, move |request| {
60 let client = client.clone();
61 async move { client.bars(request).await }
62 })
63 }
64
65 pub async fn trades(&self, request: TradesRequest) -> Result<TradesResponse, Error> {
66 self.ensure_credentials()?;
67 request.validate()?;
68 self.inner
69 .http
70 .get_json(
71 &self.inner.base_url,
72 Endpoint::OptionsTrades,
73 &self.inner.auth,
74 request.to_query(),
75 )
76 .await
77 }
78
79 pub async fn trades_all(&self, request: TradesRequest) -> Result<TradesResponse, Error> {
80 self.ensure_credentials()?;
81 let client = self.clone();
82
83 collect_all(request, move |request| {
84 let client = client.clone();
85 async move { client.trades(request).await }
86 })
87 .await
88 }
89
90 pub fn trades_stream(
91 &self,
92 request: TradesRequest,
93 ) -> ResponseStream<Result<TradesResponse, Error>> {
94 if let Err(error) = self.ensure_credentials() {
95 return Self::error_stream(error);
96 }
97
98 let client = self.clone();
99 stream_pages(request, move |request| {
100 let client = client.clone();
101 async move { client.trades(request).await }
102 })
103 }
104
105 pub async fn latest_quotes(
106 &self,
107 request: LatestQuotesRequest,
108 ) -> Result<LatestQuotesResponse, Error> {
109 self.ensure_credentials()?;
110 request.validate()?;
111 self.inner
112 .http
113 .get_json(
114 &self.inner.base_url,
115 Endpoint::OptionsLatestQuotes,
116 &self.inner.auth,
117 request.to_query(),
118 )
119 .await
120 }
121
122 pub async fn latest_trades(
123 &self,
124 request: LatestTradesRequest,
125 ) -> Result<LatestTradesResponse, Error> {
126 self.ensure_credentials()?;
127 request.validate()?;
128 self.inner
129 .http
130 .get_json(
131 &self.inner.base_url,
132 Endpoint::OptionsLatestTrades,
133 &self.inner.auth,
134 request.to_query(),
135 )
136 .await
137 }
138
139 pub async fn snapshots(&self, request: SnapshotsRequest) -> Result<SnapshotsResponse, Error> {
140 self.ensure_credentials()?;
141 request.validate()?;
142 self.inner
143 .http
144 .get_json(
145 &self.inner.base_url,
146 Endpoint::OptionsSnapshots,
147 &self.inner.auth,
148 request.to_query(),
149 )
150 .await
151 }
152
153 pub async fn snapshots_all(
154 &self,
155 request: SnapshotsRequest,
156 ) -> Result<SnapshotsResponse, Error> {
157 self.ensure_credentials()?;
158 let client = self.clone();
159
160 collect_all(request, move |request| {
161 let client = client.clone();
162 async move { client.snapshots(request).await }
163 })
164 .await
165 }
166
167 pub fn snapshots_stream(
168 &self,
169 request: SnapshotsRequest,
170 ) -> ResponseStream<Result<SnapshotsResponse, Error>> {
171 if let Err(error) = self.ensure_credentials() {
172 return Self::error_stream(error);
173 }
174
175 let client = self.clone();
176 stream_pages(request, move |request| {
177 let client = client.clone();
178 async move { client.snapshots(request).await }
179 })
180 }
181
182 pub async fn chain(&self, request: ChainRequest) -> Result<ChainResponse, Error> {
183 self.ensure_credentials()?;
184 request.validate()?;
185 let endpoint = Endpoint::OptionsChain {
186 underlying_symbol: request.underlying_symbol.clone(),
187 };
188 self.inner
189 .http
190 .get_json(
191 &self.inner.base_url,
192 endpoint,
193 &self.inner.auth,
194 request.to_query(),
195 )
196 .await
197 }
198
199 pub async fn chain_all(&self, request: ChainRequest) -> Result<ChainResponse, Error> {
200 self.ensure_credentials()?;
201 let client = self.clone();
202
203 collect_all(request, move |request| {
204 let client = client.clone();
205 async move { client.chain(request).await }
206 })
207 .await
208 }
209
210 pub fn chain_stream(
211 &self,
212 request: ChainRequest,
213 ) -> ResponseStream<Result<ChainResponse, Error>> {
214 if let Err(error) = self.ensure_credentials() {
215 return Self::error_stream(error);
216 }
217
218 let client = self.clone();
219 stream_pages(request, move |request| {
220 let client = client.clone();
221 async move { client.chain(request).await }
222 })
223 }
224
225 pub async fn exchange_codes(&self) -> Result<ExchangeCodesResponse, Error> {
226 self.ensure_credentials()?;
227 self.inner
228 .http
229 .get_json(
230 &self.inner.base_url,
231 Endpoint::OptionsExchangeCodes,
232 &self.inner.auth,
233 Vec::new(),
234 )
235 .await
236 }
237
238 pub async fn condition_codes(
239 &self,
240 request: ConditionCodesRequest,
241 ) -> Result<ConditionCodesResponse, Error> {
242 self.ensure_credentials()?;
243 let endpoint = Endpoint::OptionsConditionCodes {
244 ticktype: request.ticktype(),
245 };
246 self.inner
247 .http
248 .get_json(&self.inner.base_url, endpoint, &self.inner.auth, Vec::new())
249 .await
250 }
251
252 fn ensure_credentials(&self) -> Result<(), Error> {
253 if self.inner.auth.has_credentials() {
254 Ok(())
255 } else {
256 Err(Error::MissingCredentials)
257 }
258 }
259
260 fn error_stream<Response>(error: Error) -> ResponseStream<Result<Response, Error>>
261 where
262 Response: Send + 'static,
263 {
264 Box::pin(futures_util::stream::once(async move { Err(error) }))
265 }
266}