{-# LANGUAGE CPP #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE NamedFieldPuns #-}
module Database.Beam.Postgres.Conduit
( streamingRunSelect
, runInsert
, streamingRunInsertReturning
, runUpdate
, streamingRunUpdateReturning
, runDelete
, streamingRunDeleteReturning
, executeStatement
, streamingRunQueryReturning
, runSelect
, runInsertReturning
, runUpdateReturning
, runDeleteReturning
, runQueryReturning
) where
import Database.Beam hiding (runInsert, runUpdate, runDelete)
import Database.Beam.Postgres.Connection
import Database.Beam.Postgres.Full
import Database.Beam.Postgres.Syntax
import Database.Beam.Postgres.Types
import Control.Concurrent.MVar (takeMVar, putMVar)
import Control.Exception.Base (bracket, throwIO)
import Control.Exception.Lifted (finally)
import qualified Control.Exception.Lifted as Lifted
import qualified Control.Concurrent.MVar.Lifted as Lifted
import Control.Monad.Trans.Control (MonadBaseControl)
import qualified Database.PostgreSQL.LibPQ as Pg hiding
(Connection, escapeStringConn, escapeIdentifier, escapeByteaConn, exec)
import qualified Database.PostgreSQL.LibPQ as Pq
import qualified Database.PostgreSQL.Simple as Pg
import qualified Database.PostgreSQL.Simple.Internal as Pg
import Database.PostgreSQL.Simple.Internal (connectionHandle)
import qualified Database.PostgreSQL.Simple.Types as Pg (Query(..))
import qualified Conduit as C
import Data.Int (Int64)
import Data.Maybe (fromMaybe)
#if !MIN_VERSION_base(4, 11, 0)
import Data.Semigroup
#endif
import qualified Control.Monad.Fail as Fail
#if MIN_VERSION_conduit(1,3,0)
#define CONDUIT_TRANSFORMER C.ConduitT
#else
#define CONDUIT_TRANSFORMER C.ConduitM
#endif
-- | Run a PostgreSQL @SELECT@ statement as a conduit source in any
-- 'C.MonadResource'.
--
-- The rendered syntax is unwrapped from the 'SqlSelect' and passed, together
-- with the connection, to 'streamingRunQueryReturning'.
streamingRunSelect :: ( C.MonadResource m, Fail.MonadFail m, FromBackendRow Postgres a )
                   => Pg.Connection -> SqlSelect Postgres a
                   -> CONDUIT_TRANSFORMER () a m ()
streamingRunSelect conn (SqlSelect (PgSelectSyntax syntax)) =
  streamingRunQueryReturning conn syntax
-- | Run a PostgreSQL @SELECT@ statement in any 'MonadIO', handing the
-- resulting conduit source to the supplied continuation.
--
-- Delegates to 'runQueryReturning' with the syntax unwrapped from the
-- 'SqlSelect'.
runSelect :: ( MonadIO m, Fail.MonadFail m, MonadBaseControl IO m, FromBackendRow Postgres a )
          => Pg.Connection -> SqlSelect Postgres a
          -> (CONDUIT_TRANSFORMER () a m () -> m b) -> m b
runSelect conn (SqlSelect (PgSelectSyntax syntax)) withSrc =
  runQueryReturning conn syntax withSrc
{-# DEPRECATED runSelect "Use streamingRunSelect" #-}
-- | Run a PostgreSQL @INSERT@ statement in any 'MonadIO'.
--
-- Returns the 'Int64' produced by 'executeStatement'. For 'SqlInsertNoRows'
-- nothing is sent to the database and the result is @0@.
runInsert :: MonadIO m
          => Pg.Connection -> SqlInsert Postgres tbl -> m Int64
runInsert _ SqlInsertNoRows = pure 0
runInsert conn (SqlInsert _ (PgInsertSyntax i)) =
  executeStatement conn i
-- | Run a PostgreSQL @INSERT ... RETURNING ...@ statement as a conduit source
-- in any 'C.MonadResource'.
--
-- For 'PgInsertReturningEmpty' no query is issued and the source yields
-- nothing; otherwise the syntax is passed to 'streamingRunQueryReturning'.
streamingRunInsertReturning :: ( C.MonadResource m, Fail.MonadFail m, FromBackendRow Postgres a )
                            => Pg.Connection
                            -> PgInsertReturning a
                            -> CONDUIT_TRANSFORMER () a m ()
streamingRunInsertReturning _ PgInsertReturningEmpty = pure ()
streamingRunInsertReturning conn (PgInsertReturning i) =
  streamingRunQueryReturning conn i
-- | Run a PostgreSQL @INSERT ... RETURNING ...@ statement in any 'MonadIO',
-- handing the resulting conduit source to the supplied continuation.
--
-- For 'PgInsertReturningEmpty' the continuation receives an empty source and
-- no query is issued; otherwise this delegates to 'runQueryReturning'.
runInsertReturning :: ( MonadIO m, Fail.MonadFail m, MonadBaseControl IO m, FromBackendRow Postgres a )
                   => Pg.Connection
                   -> PgInsertReturning a
                   -> (CONDUIT_TRANSFORMER () a m () -> m b)
                   -> m b
runInsertReturning _ PgInsertReturningEmpty withSrc = withSrc (pure ())
runInsertReturning conn (PgInsertReturning i) withSrc =
  runQueryReturning conn i withSrc
{-# DEPRECATED runInsertReturning "Use streamingRunInsertReturning" #-}
-- | Run a PostgreSQL @UPDATE@ statement in any 'MonadIO'.
--
-- Returns the 'Int64' produced by 'executeStatement'. For
-- 'SqlIdentityUpdate' nothing is sent to the database and the result is @0@.
runUpdate :: MonadIO m
          => Pg.Connection -> SqlUpdate Postgres tbl -> m Int64
runUpdate _ SqlIdentityUpdate = pure 0
runUpdate conn (SqlUpdate _ (PgUpdateSyntax i)) =
  executeStatement conn i
-- | Run a PostgreSQL @UPDATE ... RETURNING ...@ statement as a conduit source
-- in any 'C.MonadResource'.
--
-- For 'PgUpdateReturningEmpty' no query is issued and the source yields
-- nothing; otherwise the syntax is passed to 'streamingRunQueryReturning'.
streamingRunUpdateReturning :: ( C.MonadResource m, Fail.MonadFail m, FromBackendRow Postgres a )
                            => Pg.Connection
                            -> PgUpdateReturning a
                            -> CONDUIT_TRANSFORMER () a m ()
streamingRunUpdateReturning _ PgUpdateReturningEmpty = pure ()
streamingRunUpdateReturning conn (PgUpdateReturning u) =
  streamingRunQueryReturning conn u
-- | Run a PostgreSQL @UPDATE ... RETURNING ...@ statement in any 'MonadIO',
-- handing the resulting conduit source to the supplied continuation.
--
-- For 'PgUpdateReturningEmpty' the continuation receives an empty source and
-- no query is issued; otherwise this delegates to 'runQueryReturning'.
runUpdateReturning :: ( MonadIO m, Fail.MonadFail m, MonadBaseControl IO m, FromBackendRow Postgres a )
                   => Pg.Connection
                   -> PgUpdateReturning a
                   -> (CONDUIT_TRANSFORMER () a m () -> m b)
                   -> m b
runUpdateReturning _ PgUpdateReturningEmpty withSrc = withSrc (pure ())
runUpdateReturning conn (PgUpdateReturning u) withSrc =
  runQueryReturning conn u withSrc
{-# DEPRECATED runUpdateReturning "Use streamingRunUpdateReturning" #-}
-- | Run a PostgreSQL @DELETE@ statement in any 'MonadIO'.
--
-- Returns the 'Int64' produced by 'executeStatement'.
runDelete :: MonadIO m
          => Pg.Connection -> SqlDelete Postgres tbl
          -> m Int64
runDelete conn (SqlDelete _ (PgDeleteSyntax d)) =
  executeStatement conn d
-- | Run a PostgreSQL @DELETE ... RETURNING ...@ statement as a conduit source
-- in any 'C.MonadResource'.
--
-- The syntax is unwrapped from the 'PgDeleteReturning' and passed to
-- 'streamingRunQueryReturning'.
streamingRunDeleteReturning :: ( C.MonadResource m, Fail.MonadFail m, FromBackendRow Postgres a )
                            => Pg.Connection -> PgDeleteReturning a
                            -> CONDUIT_TRANSFORMER () a m ()
streamingRunDeleteReturning conn (PgDeleteReturning d) =
  streamingRunQueryReturning conn d
-- | Run a PostgreSQL @DELETE ... RETURNING ...@ statement in any 'MonadIO',
-- handing the resulting conduit source to the supplied continuation.
--
-- Delegates to 'runQueryReturning' with the syntax unwrapped from the
-- 'PgDeleteReturning'.
runDeleteReturning :: ( MonadIO m, Fail.MonadFail m, MonadBaseControl IO m, FromBackendRow Postgres a )
                   => Pg.Connection -> PgDeleteReturning a
                   -> (CONDUIT_TRANSFORMER () a m () -> m b) -> m b
runDeleteReturning conn (PgDeleteReturning d) withSrc =
  runQueryReturning conn d withSrc
{-# DEPRECATED runDeleteReturning "Use streamingRunDeleteReturning" #-}
-- | Render the given 'PgSyntax' against the connection with
-- 'pgRenderSyntax' and run the resulting query with 'Pg.execute_'.
--
-- The 'Int64' returned is whatever 'Pg.execute_' reports for the statement.
executeStatement :: MonadIO m => Pg.Connection -> PgSyntax -> m Int64
executeStatement conn x =
  liftIO $ do
    syntax <- pgRenderSyntax conn x
    Pg.execute_ conn (Pg.Query syntax)
-- | Run any PostgreSQL statement that returns rows, as a conduit source in
-- any 'C.MonadResource'.
--
-- The syntax is rendered with 'pgRenderSyntax', then the raw connection is
-- taken out of the @connectionHandle@ 'MVar' for as long as the conduit is
-- alive and put back by the enclosing 'C.bracketP' release action.
--
-- Failure modes visible here:
--
-- * 'Pg.disconnectedError' is thrown if the raw connection is a null
--   connection;
-- * 'Fail.fail' is called with the libpq error message (or a fallback text)
--   if 'Pg.sendQuery' reports failure;
-- * 'Fail.fail' is called if single row mode cannot be enabled.
streamingRunQueryReturning
  :: ( C.MonadResource m, Fail.MonadFail m, FromBackendRow Postgres r )
  => Pg.Connection -> PgSyntax
  -> CONDUIT_TRANSFORMER () r m ()
streamingRunQueryReturning (conn@Pg.Connection { connectionHandle }) x = do
  syntax <- liftIO $ pgRenderSyntax conn x

  -- Hold the raw connection for the whole lifetime of the conduit.
  C.bracketP
    (takeMVar connectionHandle)
    (putMVar connectionHandle)
    (\conn' -> do
      success <- liftIO $
        if Pg.isNullConnection conn'
          then throwIO Pg.disconnectedError
          else Pg.sendQuery conn' syntax

      if success
        then do
          singleRowModeSet <- liftIO $ Pg.setSingleRowMode conn'
          if singleRowModeSet
            then
              -- The inner bracket exists only for its release action, which
              -- runs gracefulShutdown once streaming ends for any reason.
              C.bracketP
                (pure ())
                (\_ -> gracefulShutdown conn')
                (\_ -> streamResults conn conn' Nothing)
            else Fail.fail "Could not enable single row mode"
        else do
          errMsg <- fromMaybe "No libpq error provided" <$> liftIO (Pg.errorMessage conn')
          Fail.fail (show errMsg))
-- | Fetch results from the raw libpq connection one at a time and yield each
-- parsed row downstream.
--
-- Arguments: the @postgresql-simple@ connection (whose @connectionHandle@ is
-- released around row parsing), the raw libpq connection the query was sent
-- on, and the field descriptions if they have already been computed
-- ('Nothing' on the first call; afterwards the value from 'getFields' is
-- reused).
--
-- Per result status:
--
-- * 'Pg.SingleTuple': parse the row with 'runPgRowReader'; on success yield
--   it and recurse, on failure call 'bailEarly';
-- * 'Pg.TuplesOk': drain the remaining results with 'finishQuery';
-- * 'Pg.EmptyQuery': 'Fail.fail' with @"No query"@;
-- * 'Pg.CommandOk': stop without yielding;
-- * 'Pg.BadResponse', 'Pg.NonfatalError', 'Pg.FatalError': raise via
--   'Pg.throwResultError';
-- * anything else: 'Fail.fail' with the result error message.
--
-- The signature uses @CONDUIT_TRANSFORMER@ so that it follows the same
-- conduit-version switch as the rest of this module.
streamResults :: (Fail.MonadFail m, FromBackendRow Postgres r, MonadIO m)
              => Pg.Connection -> Pq.Connection -> Maybe [Pg.Field]
              -> CONDUIT_TRANSFORMER i r m ()
streamResults (conn@Pg.Connection { connectionHandle }) conn' fields = do
  nextRow <- liftIO (Pg.getResult conn')
  case nextRow of
    Nothing -> pure ()
    Just row ->
      let -- Shared handler for the statuses that are reported as result errors.
          resultError status =
            liftIO (Pg.throwResultError "streamResults" row status)
      in liftIO (Pg.resultStatus row) >>=
         \case
           Pg.SingleTuple ->
             do fields' <- liftIO (maybe (getFields row) pure fields)
                -- The raw connection is put back into the MVar while the row
                -- is parsed and taken again afterwards.
                -- NOTE(review): presumably so the row reader can use conn
                -- itself; confirm against runPgRowReader.
                parsedRow <- liftIO $ bracket
                   (putMVar connectionHandle conn')
                   (\_ -> takeMVar connectionHandle)
                   (\_ -> runPgRowReader conn 0 row fields' fromBackendRow)
                case parsedRow of
                  Left err -> liftIO (bailEarly conn' row ("Could not read row: " <> show err))
                  Right parsedRow' ->
                    do C.yield parsedRow'
                       streamResults conn conn' (Just fields')
           Pg.TuplesOk -> liftIO (finishQuery conn')
           Pg.EmptyQuery -> Fail.fail "No query"
           Pg.CommandOk -> pure ()
           status@Pg.BadResponse -> resultError status
           status@Pg.NonfatalError -> resultError status
           status@Pg.FatalError -> resultError status
           _ -> do errMsg <- liftIO (Pg.resultErrorMessage row)
                   Fail.fail ("Postgres error: " <> show errMsg)
-- | Abort result processing: free the given result with
-- 'Pg.unsafeFreeResult', cancel the running query with 'cancelQuery', and
-- then fail in 'IO' with the supplied message.
--
-- This never returns normally, hence the fully polymorphic result type.
bailEarly :: Pq.Connection -> Pg.Result -> String -> IO a
bailEarly conn' row errorString = do
  Pg.unsafeFreeResult row
  cancelQuery conn'
  Fail.fail errorString
-- | Ask the server to cancel whatever is running on the raw connection.
--
-- If 'Pg.getCancel' yields no cancel handle this does nothing. If the cancel
-- request succeeds, the remaining results are drained with 'finishQuery';
-- if it fails, 'Fail.fail' is called with the reported error.
cancelQuery :: Pq.Connection -> IO ()
cancelQuery conn' = do
  cancel <- Pg.getCancel conn'
  case cancel of
    Nothing -> pure ()
    Just cancel' -> do
      res <- Pg.cancel cancel'
      case res of
        Right () -> finishQuery conn'
        Left err -> Fail.fail ("Could not cancel: " <> show err)
-- | Drain the raw connection: call 'Pg.getResult' repeatedly, discarding
-- every result, until it returns 'Nothing'.
finishQuery :: Pq.Connection -> IO ()
finishQuery conn' = do
  nextRow <- Pg.getResult conn'
  case nextRow of
    Nothing -> pure ()
    Just _ -> finishQuery conn'
-- | Release-time cleanup for a streamed query.
--
-- Inspects 'Pg.transactionStatus' of the raw connection and calls
-- 'cancelQuery' only when the status is 'Pg.TransActive'; every other status
-- is a no-op. The statuses are listed one by one rather than with a
-- wildcard.
gracefulShutdown :: Pq.Connection -> IO ()
gracefulShutdown conn' = do
  sts <- Pg.transactionStatus conn'
  case sts of
    Pg.TransIdle -> pure ()
    Pg.TransInTrans -> pure ()
    Pg.TransInError -> pure ()
    Pg.TransUnknown -> pure ()
    Pg.TransActive -> cancelQuery conn'
-- | Run any PostgreSQL statement that returns rows in any 'MonadIO', handing
-- the resulting conduit source to the supplied continuation.
--
-- The syntax is rendered with 'pgRenderSyntax', then the raw connection is
-- taken out of the @connectionHandle@ 'MVar' for the duration of the
-- continuation and put back by the enclosing 'Lifted.bracket'.
-- 'gracefulShutdown' runs after the continuation via 'finally'.
--
-- Failure modes visible here:
--
-- * 'Pg.disconnectedError' is thrown if the raw connection is a null
--   connection (the same guard 'streamingRunQueryReturning' applies before
--   sending the query);
-- * 'Fail.fail' is called with the libpq error message (or a fallback text)
--   if 'Pg.sendQuery' reports failure;
-- * 'Fail.fail' is called if single row mode cannot be enabled.
runQueryReturning
  :: ( MonadIO m, Fail.MonadFail m, MonadBaseControl IO m, Functor m, FromBackendRow Postgres r )
  => Pg.Connection -> PgSyntax
  -> (CONDUIT_TRANSFORMER () r m () -> m b)
  -> m b
runQueryReturning (conn@Pg.Connection { connectionHandle }) x withSrc = do
  syntax <- liftIO $ pgRenderSyntax conn x

  Lifted.bracket
    (Lifted.takeMVar connectionHandle)
    (Lifted.putMVar connectionHandle)
    (\conn' -> do
      success <- liftIO $
        if Pg.isNullConnection conn'
          then throwIO Pg.disconnectedError
          else Pg.sendQuery conn' syntax

      if success
        then do
          singleRowModeSet <- liftIO (Pg.setSingleRowMode conn')
          if singleRowModeSet
            then withSrc (streamResults conn conn' Nothing) `finally` (liftIO $ gracefulShutdown conn')
            else Fail.fail "Could not enable single row mode"
        else do
          errMsg <- fromMaybe "No libpq error provided" <$> liftIO (Pg.errorMessage conn')
          Fail.fail (show errMsg))
{-# DEPRECATED runQueryReturning "Use streamingRunQueryReturning" #-}