10
10
import aioredis
11
11
import requests
12
12
from aioredis .exceptions import ResponseError
13
+ from fastapi import BackgroundTasks
13
14
from fastapi import Depends
14
15
from fastapi import FastAPI
15
16
from pydantic import BaseSettings
18
19
DEFAULT_KEY_PREFIX = 'is-bitcoin-lit'
19
20
SENTIMENT_API_URL = 'HTTps://api.senticrypt.com/v1/history/bitcoin-{time}.json'
20
21
TIME_FORMAT_STRING = '%Y-%m-%d_%H'
22
+ TWO_MINUTES = 60 * 60
21
23
22
24
23
25
def prefixed_key (f ):
@@ -65,12 +67,17 @@ class Config(BaseSettings):
65
67
66
68
def make_summary (data ):
67
69
"""Take a series of averages and summarize them as means of means."""
68
- return {
70
+ summary = {
69
71
'time' : datetime .now ().timestamp (),
70
72
'mean_of_means_sentiment' : sum (d ['mean' ] for d in data ) / len (data ),
71
73
'mean_of_means_price' : sum (float (d ['btc_price' ]) for d in data ) / len (data ),
72
74
}
73
75
76
+ summary ['lit' ] = '1' if float (
77
+ summary ['mean_of_means_sentiment' ],
78
+ ) > 0 else '0'
79
+ return summary
80
+
74
81
75
82
async def add_many_to_timeseries (
76
83
key_pairs : Iterable [Tuple [str , str ]],
@@ -87,7 +94,7 @@ async def add_many_to_timeseries(
87
94
for datapoint in data :
88
95
for key , attr in key_pairs :
89
96
partial = functools .partial (
90
- partial , key , datapoint ['timestamp' ], datapoint [attr ],
97
+ partial , key , int ( datapoint ['timestamp' ]) , datapoint [attr ],
91
98
)
92
99
return await partial ()
93
100
@@ -96,37 +103,57 @@ def make_keys():
96
103
return Keys ()
97
104
98
105
106
+ async def persist (keys , data , summary ):
107
+ # TODO: Only add timeseries data that we don't already have -- how?
108
+ ts_price_key = keys .timeseries_price_key ()
109
+ ts_sentiment_key = keys .timeseries_sentiment_key ()
110
+ summary_key = keys .summary_key ()
111
+
112
+ await redis .hset (summary_key , mapping = summary )
113
+ await redis .expire (summary_key , TWO_MINUTES )
114
+ await add_many_to_timeseries (
115
+ (
116
+ (ts_price_key , 'btc_price' ),
117
+ (ts_sentiment_key , 'mean' ),
118
+ ), data ,
119
+ )
120
+
121
+
99
122
@app .get ('/is-bitcoin-lit' )
100
- async def bitcoin (keys : Keys = Depends (make_keys )):
123
+ async def bitcoin (background_tasks : BackgroundTasks , keys : Keys = Depends (make_keys )):
101
124
sentiment_time = datetime .now ().strftime (TIME_FORMAT_STRING )
102
125
summary_key = keys .summary_key ()
103
- ts_price_key = keys .timeseries_price_key ()
104
- ts_sentiment_key = keys .timeseries_sentiment_key ()
105
126
url = SENTIMENT_API_URL .format (time = sentiment_time )
106
127
107
128
summary = await redis .hgetall (summary_key )
108
129
109
- if not summary :
110
- # TODO: Only add timeseries data that we don't already have -- how?
130
+ if summary :
131
+ summary ['lit' ] = True if summary ['lit' ] == '1' else False
132
+ else :
111
133
data = requests .get (url ).json ()
112
134
summary = make_summary (data )
113
- await redis .hset (summary_key , mapping = summary )
114
- await redis .expire (summary_key , 60 )
115
- await add_many_to_timeseries (
116
- (
117
- (ts_price_key , 'btc_price' ),
118
- (ts_sentiment_key , 'mean' ),
119
- ), data ,
120
- )
135
+ background_tasks .add_task (persist , keys , data , summary )
121
136
122
137
return summary
123
138
124
139
125
140
@app .on_event ('startup' )
126
- async def startup_event (keys : Keys = Depends (make_keys )):
141
+ async def startup_event ():
142
+ keys = Keys ()
143
+ # When we create our timeseries, we'll use the "first" duplicate policy,
144
+ # which ignores duplicate pairs of timestamp and values if we add them.
145
+ #
146
+ # Because of this, we don't worry about handling this logic ourselves --
147
+ # but note that there is a performance cost to writes using this policy.
127
148
try :
128
- redis .execute_command ('TS.CREATE' , keys .timeseries_sentiment_key ())
129
- redis .execute_command ('TS.CREATE' , keys .timeseries_price_key ())
149
+ await redis .execute_command (
150
+ 'TS.CREATE' , keys .timeseries_sentiment_key (),
151
+ 'DUPLICATE_POLICY' , 'first' ,
152
+ )
153
+ await redis .execute_command (
154
+ 'TS.CREATE' , keys .timeseries_price_key (),
155
+ 'DUPLICATE_POLICY' , 'first' ,
156
+ )
130
157
except ResponseError :
131
158
# Time series already exists
132
159
pass
0 commit comments