forked from PostgREST/postgrest
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathAppState.hs
479 lines (419 loc) · 18.4 KB
/
AppState.hs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE RecordWildCards #-}
module PostgREST.AppState
( AppState
, AuthResult(..)
, destroy
, getConfig
, getSchemaCache
, getMainThreadId
, getPgVersion
, getNextDelay
, getNextListenerDelay
, getTime
, getJwtCache
, getSocketREST
, getSocketAdmin
, init
, initSockets
, initWithPool
, putNextListenerDelay
, putSchemaCache
, putPgVersion
, putIsListenerOn
, usePool
, readInDbConfig
, schemaCacheLoader
, getObserver
, isLoaded
, isPending
) where
import qualified Data.Aeson as JSON
import qualified Data.Aeson.KeyMap as KM
import qualified Data.ByteString.Char8 as BS
import qualified Data.Cache as C
import Data.Either.Combinators (whenLeft)
import qualified Data.Text as T (unpack)
import qualified Hasql.Pool as SQL
import qualified Hasql.Pool.Config as SQL
import qualified Hasql.Session as SQL
import qualified Hasql.Transaction.Sessions as SQL
import qualified Network.HTTP.Types.Status as HTTP
import qualified Network.Socket as NS
import qualified PostgREST.Error as Error
import qualified PostgREST.Logger as Logger
import qualified PostgREST.Metrics as Metrics
import PostgREST.Observation
import PostgREST.Version (prettyVersion)
import System.TimeIt (timeItT)
import Control.AutoUpdate (defaultUpdateSettings, mkAutoUpdate,
updateAction)
import Control.Debounce
import Control.Retry (RetryPolicy, RetryStatus (..), capDelay,
exponentialBackoff, retrying,
rsPreviousDelay)
import Data.IORef (IORef, atomicWriteIORef, newIORef,
readIORef)
import Data.Time.Clock (UTCTime, getCurrentTime)
import PostgREST.Config (AppConfig (..),
addFallbackAppName,
readAppConfig)
import PostgREST.Config.Database (queryDbSettings,
queryPgVersion,
queryRoleSettings)
import PostgREST.Config.PgVersion (PgVersion (..),
minimumPgVersion)
import PostgREST.SchemaCache (SchemaCache (..),
querySchemaCache,
showSummary)
import PostgREST.SchemaCache.Identifiers (dumpQi)
import PostgREST.Unix (createAndBindDomainSocket)
import Data.Streaming.Network (bindPortTCP, bindRandomPortTCP)
import Data.String (IsString (..))
import Protolude
data AuthResult = AuthResult
{ authClaims :: KM.KeyMap JSON.Value
, authRole :: BS.ByteString
}
data AppState = AppState
-- | Database connection pool
{ statePool :: SQL.Pool
-- | Database server version
, statePgVersion :: IORef PgVersion
-- | Schema cache
, stateSchemaCache :: IORef (Maybe SchemaCache)
-- | The schema cache status
, stateSCacheStatus :: IORef SchemaCacheStatus
-- | State of the LISTEN channel
, stateIsListenerOn :: IORef Bool
-- | starts the connection worker with a debounce
, debouncedSCacheLoader :: IO ()
-- | Config that can change at runtime
, stateConf :: IORef AppConfig
-- | Time used for verifying JWT expiration
, stateGetTime :: IO UTCTime
-- | Used for killing the main thread in case a subthread fails
, stateMainThreadId :: ThreadId
-- | Keeps track of the next delay for db connection retry
, stateNextDelay :: IORef Int
-- | Keeps track of the next delay for the listener
, stateNextListenerDelay :: IORef Int
-- | JWT Cache
, jwtCache :: C.Cache ByteString AuthResult
-- | Network socket for REST API
, stateSocketREST :: NS.Socket
-- | Network socket for the admin UI
, stateSocketAdmin :: Maybe NS.Socket
-- | Observation handler
, stateObserver :: ObservationHandler
, stateLogger :: Logger.LoggerState
, stateMetrics :: Metrics.MetricsState
}
-- | Schema cache status
data SchemaCacheStatus
= SCLoaded
| SCPending
deriving Eq
type AppSockets = (NS.Socket, Maybe NS.Socket)
init :: AppConfig -> IO AppState
init conf@AppConfig{configLogLevel, configDbPoolSize} = do
loggerState <- Logger.init
metricsState <- Metrics.init configDbPoolSize
let observer = liftA2 (>>) (Logger.observationLogger loggerState configLogLevel) (Metrics.observationMetrics metricsState)
observer $ AppStartObs prettyVersion
pool <- initPool conf observer
(sock, adminSock) <- initSockets conf
state' <- initWithPool (sock, adminSock) pool conf loggerState metricsState observer
pure state' { stateSocketREST = sock, stateSocketAdmin = adminSock}
initWithPool :: AppSockets -> SQL.Pool -> AppConfig -> Logger.LoggerState -> Metrics.MetricsState -> ObservationHandler -> IO AppState
initWithPool (sock, adminSock) pool conf loggerState metricsState observer = do
appState <- AppState pool
<$> newIORef minimumPgVersion -- assume we're in a supported version when starting, this will be corrected on a later step
<*> newIORef Nothing
<*> newIORef SCPending
<*> newIORef False
<*> pure (pure ())
<*> newIORef conf
<*> mkAutoUpdate defaultUpdateSettings { updateAction = getCurrentTime }
<*> myThreadId
<*> newIORef 0
<*> newIORef 1
<*> C.newCache Nothing
<*> pure sock
<*> pure adminSock
<*> pure observer
<*> pure loggerState
<*> pure metricsState
deb <-
let decisecond = 100000 in
mkDebounce defaultDebounceSettings
{ debounceAction = retryingSchemaCacheLoad appState
, debounceFreq = decisecond
, debounceEdge = leadingEdge -- runs the worker at the start and the end
}
return appState { debouncedSCacheLoader = deb}
destroy :: AppState -> IO ()
destroy = destroyPool
initSockets :: AppConfig -> IO AppSockets
initSockets AppConfig{..} = do
let
cfg'usp = configServerUnixSocket
cfg'uspm = configServerUnixSocketMode
cfg'host = configServerHost
cfg'port = configServerPort
cfg'adminHost = configAdminServerHost
cfg'adminPort = configAdminServerPort
sock <- case cfg'usp of
-- I'm not using `streaming-commons`' bindPath function here because it's not defined for Windows,
-- but we need to have runtime error if we try to use it in Windows, not compile time error
Just path -> createAndBindDomainSocket path cfg'uspm
Nothing -> do
(_, sock) <-
if cfg'port /= 0
then do
sock <- bindPortTCP cfg'port (fromString $ T.unpack cfg'host)
pure (cfg'port, sock)
else do
-- explicitly bind to a random port, returning bound port number
(num, sock) <- bindRandomPortTCP (fromString $ T.unpack cfg'host)
pure (num, sock)
pure sock
adminSock <- case cfg'adminPort of
Just adminPort -> do
adminSock <- bindPortTCP adminPort (fromString $ T.unpack cfg'adminHost)
pure $ Just adminSock
Nothing -> pure Nothing
pure (sock, adminSock)
initPool :: AppConfig -> ObservationHandler -> IO SQL.Pool
initPool AppConfig{..} observer = do
SQL.acquire $ SQL.settings
[ SQL.size configDbPoolSize
, SQL.acquisitionTimeout $ fromIntegral configDbPoolAcquisitionTimeout
, SQL.agingTimeout $ fromIntegral configDbPoolMaxLifetime
, SQL.idlenessTimeout $ fromIntegral configDbPoolMaxIdletime
, SQL.staticConnectionSettings (toUtf8 $ addFallbackAppName prettyVersion configDbUri)
, SQL.observationHandler $ observer . HasqlPoolObs
]
-- | Run an action with a database connection.
usePool :: AppState -> SQL.Session a -> IO (Either SQL.UsageError a)
usePool AppState{stateObserver=observer, stateMainThreadId=mainThreadId, ..} sess = do
observer PoolRequest
res <- SQL.use statePool sess
observer PoolRequestFullfilled
whenLeft res (\case
SQL.AcquisitionTimeoutUsageError ->
observer $ PoolAcqTimeoutObs SQL.AcquisitionTimeoutUsageError
err@(SQL.ConnectionUsageError e) ->
let failureMessage = BS.unpack $ fromMaybe mempty e in
when (("FATAL: password authentication failed" `isInfixOf` failureMessage) || ("no password supplied" `isInfixOf` failureMessage)) $ do
observer $ ExitDBFatalError ServerAuthError err
killThread mainThreadId
err@(SQL.SessionUsageError (SQL.QueryError tpl _ (SQL.ResultError resultErr))) -> do
case resultErr of
SQL.UnexpectedResult{} -> do
observer $ ExitDBFatalError ServerPgrstBug err
killThread mainThreadId
SQL.RowError{} -> do
observer $ ExitDBFatalError ServerPgrstBug err
killThread mainThreadId
SQL.UnexpectedAmountOfRows{} -> do
observer $ ExitDBFatalError ServerPgrstBug err
killThread mainThreadId
-- Check for a syntax error (42601 is the pg code) only for queries that don't have `WITH pgrst_source` as prefix.
-- This would mean the error is on our schema cache queries, so we treat it as fatal.
-- TODO have a better way to mark this as a schema cache query
SQL.ServerError "42601" _ _ _ _ ->
unless ("WITH pgrst_source" `BS.isPrefixOf` tpl) $ do
observer $ ExitDBFatalError ServerPgrstBug err
killThread mainThreadId
-- Check for a "prepared statement <name> already exists" error (Code 42P05: duplicate_prepared_statement).
-- This would mean that a connection pooler in transaction mode is being used
-- while prepared statements are enabled in the PostgREST configuration,
-- both of which are incompatible with each other.
SQL.ServerError "42P05" _ _ _ _ -> do
observer $ ExitDBFatalError ServerError42P05 err
killThread mainThreadId
-- Check for a "transaction blocks not allowed in statement pooling mode" error (Code 08P01: protocol_violation).
-- This would mean that a connection pooler in statement mode is being used which is not supported in PostgREST.
SQL.ServerError "08P01" "transaction blocks not allowed in statement pooling mode" _ _ _ -> do
observer $ ExitDBFatalError ServerError08P01 err
killThread mainThreadId
SQL.ServerError{} ->
when (Error.status (Error.PgError False err) >= HTTP.status500) $
observer $ QueryErrorCodeHighObs err
SQL.SessionUsageError (SQL.QueryError _ _ (SQL.ClientError _)) ->
pure ()
)
return res
-- | Flush the connection pool so that any future use of the pool will
-- use connections freshly established after this call.
flushPool :: AppState -> IO ()
flushPool AppState{..} = SQL.release statePool
-- | Destroy the pool on shutdown.
destroyPool :: AppState -> IO ()
destroyPool AppState{..} = SQL.release statePool
getPgVersion :: AppState -> IO PgVersion
getPgVersion = readIORef . statePgVersion
putPgVersion :: AppState -> PgVersion -> IO ()
putPgVersion = atomicWriteIORef . statePgVersion
getSchemaCache :: AppState -> IO (Maybe SchemaCache)
getSchemaCache = readIORef . stateSchemaCache
putSchemaCache :: AppState -> Maybe SchemaCache -> IO ()
putSchemaCache appState = atomicWriteIORef (stateSchemaCache appState)
schemaCacheLoader :: AppState -> IO ()
schemaCacheLoader = debouncedSCacheLoader
getNextDelay :: AppState -> IO Int
getNextDelay = readIORef . stateNextDelay
getNextListenerDelay :: AppState -> IO Int
getNextListenerDelay = readIORef . stateNextListenerDelay
putNextListenerDelay :: AppState -> Int -> IO ()
putNextListenerDelay = atomicWriteIORef . stateNextListenerDelay
getConfig :: AppState -> IO AppConfig
getConfig = readIORef . stateConf
putConfig :: AppState -> AppConfig -> IO ()
putConfig = atomicWriteIORef . stateConf
getTime :: AppState -> IO UTCTime
getTime = stateGetTime
getJwtCache :: AppState -> C.Cache ByteString AuthResult
getJwtCache = jwtCache
getSocketREST :: AppState -> NS.Socket
getSocketREST = stateSocketREST
getSocketAdmin :: AppState -> Maybe NS.Socket
getSocketAdmin = stateSocketAdmin
getMainThreadId :: AppState -> ThreadId
getMainThreadId = stateMainThreadId
isConnEstablished :: AppState -> IO Bool
isConnEstablished appState = do
AppConfig{..} <- getConfig appState
if configDbChannelEnabled then -- if the listener is enabled, we can be sure the connection is up
readIORef $ stateIsListenerOn appState
else -- otherwise the only way to check the connection is to make a query
isRight <$> usePool appState (SQL.sql "SELECT 1")
putIsListenerOn :: AppState -> Bool -> IO ()
putIsListenerOn = atomicWriteIORef . stateIsListenerOn
isLoaded :: AppState -> IO Bool
isLoaded x = do
scacheStatus <- readIORef $ stateSCacheStatus x
connEstablished <- isConnEstablished x
return $ scacheStatus == SCLoaded && connEstablished
isPending :: AppState -> IO Bool
isPending x = do
scacheStatus <- readIORef $ stateSCacheStatus x
connEstablished <- isConnEstablished x
return $ scacheStatus == SCPending || not connEstablished
putSCacheStatus :: AppState -> SchemaCacheStatus -> IO ()
putSCacheStatus = atomicWriteIORef . stateSCacheStatus
getObserver :: AppState -> ObservationHandler
getObserver = stateObserver
-- | Try to load the schema cache and retry if it fails.
--
-- This is done by repeatedly: 1) flushing the pool, 2) querying the version and validating that the postgres version is supported by us, and 3) loading the schema cache.
-- It's necessary to flush the pool:
--
-- + Because connections cache the pg catalog(see #2620)
-- + For rapid recovery. Otherwise, the pool idle or lifetime timeout would have to be reached for new healthy connections to be acquired.
retryingSchemaCacheLoad :: AppState -> IO ()
retryingSchemaCacheLoad appState@AppState{stateObserver=observer, stateMainThreadId=mainThreadId} =
void $ retrying retryPolicy shouldRetry (\RetryStatus{rsIterNumber, rsPreviousDelay} -> do
when (rsIterNumber > 0) $ do
let delay = fromMaybe 0 rsPreviousDelay `div` oneSecondInUs
observer $ ConnectionRetryObs delay
putNextListenerDelay appState delay
flushPool appState
(,) <$> qPgVersion <*> (qInDbConfig *> qSchemaCache)
)
where
qPgVersion :: IO (Maybe PgVersion)
qPgVersion = do
AppConfig{..} <- getConfig appState
pgVersion <- usePool appState (queryPgVersion False) -- No need to prepare the query here, as the connection might not be established
case pgVersion of
Left e -> do
observer $ QueryPgVersionError e
unless configDbPoolAutomaticRecovery $ do
observer ExitDBNoRecoveryObs
killThread mainThreadId
return Nothing
Right actualPgVersion -> do
when (actualPgVersion < minimumPgVersion) $ do
observer $ ExitUnsupportedPgVersion actualPgVersion minimumPgVersion
killThread mainThreadId
observer $ DBConnectedObs $ pgvFullName actualPgVersion
observer $ PoolInit configDbPoolSize
putPgVersion appState actualPgVersion
return $ Just actualPgVersion
qInDbConfig :: IO ()
qInDbConfig = do
AppConfig{..} <- getConfig appState
when configDbConfig $ readInDbConfig False appState
qSchemaCache :: IO (Maybe SchemaCache)
qSchemaCache = do
conf@AppConfig{..} <- getConfig appState
(resultTime, result) <-
let transaction = if configDbPreparedStatements then SQL.transaction else SQL.unpreparedTransaction in
timeItT $ usePool appState (transaction SQL.ReadCommitted SQL.Read $ querySchemaCache conf)
case result of
Left e -> do
putSCacheStatus appState SCPending
putSchemaCache appState Nothing
observer $ SchemaCacheErrorObs e
return Nothing
Right sCache -> do
-- IMPORTANT: While the pending schema cache state starts from running the above querySchemaCache, only at this stage we block API requests due to the usage of an
-- IORef on putSchemaCache. This is why SCacheStatus is put at SCPending here to signal the Admin server (using isPending) that we're on a recovery state.
putSCacheStatus appState SCPending
putSchemaCache appState $ Just sCache
observer $ SchemaCacheQueriedObs resultTime
(t, _) <- timeItT $ observer $ SchemaCacheSummaryObs $ showSummary sCache
observer $ SchemaCacheLoadedObs t
putSCacheStatus appState SCLoaded
return $ Just sCache
shouldRetry :: RetryStatus -> (Maybe PgVersion, Maybe SchemaCache) -> IO Bool
shouldRetry _ (pgVer, sCache) = do
AppConfig{..} <- getConfig appState
let itShould = configDbPoolAutomaticRecovery && (isNothing pgVer || isNothing sCache)
return itShould
retryPolicy :: RetryPolicy
retryPolicy =
let delayMicroseconds = 32*oneSecondInUs {-32 seconds-} in
capDelay delayMicroseconds $ exponentialBackoff oneSecondInUs
oneSecondInUs = 1000000 -- one second in microseconds
-- | Reads the in-db config and reads the config file again
-- | We don't retry reading the in-db config after it fails immediately, because it could have user errors. We just report the error and continue.
readInDbConfig :: Bool -> AppState -> IO ()
readInDbConfig startingUp appState@AppState{stateObserver=observer} = do
AppConfig{..} <- getConfig appState
pgVer <- getPgVersion appState
dbSettings <-
if configDbConfig then do
qDbSettings <- usePool appState (queryDbSettings (dumpQi <$> configDbPreConfig) configDbPreparedStatements)
case qDbSettings of
Left e -> do
observer $ ConfigReadErrorObs e
pure mempty
Right x -> pure x
else
pure mempty
(roleSettings, roleIsolationLvl) <-
if configDbConfig then do
rSettings <- usePool appState (queryRoleSettings pgVer configDbPreparedStatements)
case rSettings of
Left e -> do
observer $ QueryRoleSettingsErrorObs e
pure (mempty, mempty)
Right x -> pure x
else
pure mempty
readAppConfig dbSettings configFilePath (Just configDbUri) roleSettings roleIsolationLvl >>= \case
Left err ->
if startingUp then
panic err -- die on invalid config if the program is starting up
else
observer $ ConfigInvalidObs err
Right newConf -> do
putConfig appState newConf
if startingUp then
pass
else
observer ConfigSucceededObs