aboutsummaryrefslogtreecommitdiff
path: root/hsm-stream/Hsm
diff options
context:
space:
mode:
Diffstat (limited to 'hsm-stream/Hsm')
-rw-r--r--hsm-stream/Hsm/Stream.hs116
-rw-r--r--hsm-stream/Hsm/Stream/FFI.hsc48
2 files changed, 67 insertions, 97 deletions
diff --git a/hsm-stream/Hsm/Stream.hs b/hsm-stream/Hsm/Stream.hs
index e0b2b5b..a01eb4b 100644
--- a/hsm-stream/Hsm/Stream.hs
+++ b/hsm-stream/Hsm/Stream.hs
@@ -1,69 +1,87 @@
-{-# LANGUAGE DuplicateRecordFields #-}
+{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE TypeFamilies #-}
module Hsm.Stream
( Stream
, startStream
, stopStream
+ , isStreaming
, runStream
)
where
-import Control.Monad (void, when)
-import Effectful (Dispatch (Static), DispatchOf, Eff, IOE, liftIO, (:>))
-import Effectful.Dispatch.Static (SideEffects (WithSideEffects), StaticRep, evalStaticRep, getStaticRep, unsafeEff_)
-import Effectful.Exception (finally)
-import Foreign.C.String (withCString)
-import Foreign.Ptr (Ptr, nullPtr)
-import Hsm.Log (Log, Severity (Info), logMsg)
-import Hsm.Stream.FFI
- ( GstElement
- , gstDeinit
- , gstElementSetState
- , gstInit
- , gstObjectUnref
- , gstParseLaunch
- , gstStateNull
- , gstStatePlaying
+import Control.Monad (forever)
+import Control.Monad.Extra (unlessM, whenM)
+import Data.Maybe (fromJust, isJust)
+import Effectful (Dispatch (Static), DispatchOf, Eff, IOE, (:>))
+import Effectful.Concurrent (Concurrent, ThreadId, forkIO, killThread)
+import Effectful.Dispatch.Static
+ ( SideEffects (WithSideEffects)
+ , StaticRep
+ , evalStaticRep
+ , getStaticRep
+ , putStaticRep
+ , unsafeEff_
)
-import System.Environment (setEnv)
+import Effectful.Exception (IOException, catch, finally)
+import Effectful.Fail (Fail)
+import GHC.IO.Handle (Handle, hGetLine)
+import Hsm.Log (Log, Logs, Severity (Info, Trace), logMsg)
+import System.Process (ProcessHandle, StdStream (CreatePipe), cleanupProcess, createProcess, proc, std_err, std_out)
data Stream (a :: * -> *) (b :: *)
type instance DispatchOf Stream = Static WithSideEffects
-newtype instance StaticRep Stream
- = Stream (Ptr GstElement)
+data StreamRep = StreamRep
+ { phdl :: ProcessHandle
+ , hout :: Handle
+ , herr :: Handle
+ , tout :: ThreadId
+ , terr :: ThreadId
+ }
-startStream :: (Log "stream" :> es, Stream :> es) => Eff es ()
-startStream = do
- Stream pipeline <- getStaticRep
- logMsg Info "Starting stream"
- unsafeEff_ . void $ gstElementSetState pipeline gstStatePlaying
+newtype instance StaticRep Stream
+ = Stream (Maybe StreamRep)
-stopStream :: (Log "stream" :> es, Stream :> es) => Eff es ()
-stopStream = do
- Stream pipeline <- getStaticRep
- logMsg Info "Stopping stream"
- unsafeEff_ . void $ gstElementSetState pipeline gstStateNull
+-- The following functions manage the GStreamer pipeline as a subprocess.
+-- This ensures:
+-- - Clean resource cleanup on stream restart
+-- - Proper WebSocket connection teardown (prevents browser-side lingering)
+-- - Reliable browser disconnect/reconnect cycles
+--
+-- Direct library integration proved problematic due to resource lifecycle
+-- issues, particularly with `webrtcsink` WebSocket persistence.
+isStreaming :: (Log "stream" :> es, Stream :> es) => Eff es Bool
+isStreaming = do
+ Stream rep <- getStaticRep
+ return $ isJust rep
-runStream :: (IOE :> es, Log "stream" :> es) => Bool -> Eff (Stream : es) a -> Eff es a
-runStream suppressXLogs action = do
- when suppressXLogs $ do
- logMsg Info "Suppressing external loggers"
- liftIO $ setEnv "GST_DEBUG" "none"
- liftIO $ setEnv "LIBCAMERA_LOG_LEVELS" "FATAL"
- liftIO $ setEnv "WEBRTCSINK_SIGNALLING_SERVER_LOG" "none"
- logMsg Info "Initializing gstreamer library"
- liftIO $ gstInit nullPtr nullPtr
- logMsg Info $ "Parsing gstreamer pipeline: " <> pipelineStr
- pipeline <- liftIO . withCString pipelineStr $ \cStr -> gstParseLaunch cStr nullPtr
- evalStaticRep (Stream pipeline) . finally action $ stopStream >> endStream
+startStream :: (Concurrent :> es, Fail :> es, Logs '["gst", "stream"] es, Stream :> es) => Eff es ()
+startStream =
+ unlessM isStreaming $ do
+ logMsg @"stream" Info "Initializing gstreamer pipeline"
+ (_, Just hout, Just herr, phdl) <- unsafeEff_ $ createProcess spDecl
+ tout <- spEcho hout
+ terr <- spEcho herr
+ putStaticRep . Stream $ Just StreamRep{..}
where
- pipelineStr = "libcamerasrc ! videoconvert ! vp8enc deadline=1 ! webrtcsink run-signalling-server=true"
- endStream = do
- Stream pipeline <- getStaticRep
- logMsg Info "Unrefing gstreamer pipeline"
- liftIO $ gstObjectUnref pipeline
- logMsg Info "De-initializing gstreamer library"
- liftIO gstDeinit
+ spFlags = words "--quiet --no-position"
+ pipeline = words "libcamerasrc ! videoconvert ! vp8enc deadline=1 ! queue ! webrtcsink run-signalling-server=true"
+ spArgs = spFlags <> pipeline
+ spDecl = (proc "gst-launch-1.0" spArgs){std_out = CreatePipe, std_err = CreatePipe}
+ spEcho hdl = forkIO . catch @IOException (forever $ unsafeEff_ (hGetLine hdl) >>= logMsg @"gst" Trace) . const $ return ()
+
+stopStream :: (Concurrent :> es, Log "stream" :> es, Stream :> es) => Eff es ()
+stopStream =
+ whenM isStreaming $ do
+ Stream rep <- getStaticRep
+ logMsg Info "Stopping stream"
+ let StreamRep{..} = fromJust rep
+ unsafeEff_ $ cleanupProcess (Nothing, Just hout, Just herr, phdl)
+ killThread tout
+ killThread terr
+ putStaticRep $ Stream Nothing
+
+runStream :: (Concurrent :> es, IOE :> es, Log "stream" :> es) => Eff (Stream : es) a -> Eff es a
+runStream action = evalStaticRep (Stream Nothing) $ finally action stopStream
diff --git a/hsm-stream/Hsm/Stream/FFI.hsc b/hsm-stream/Hsm/Stream/FFI.hsc
deleted file mode 100644
index 3ef4f98..0000000
--- a/hsm-stream/Hsm/Stream/FFI.hsc
+++ /dev/null
@@ -1,48 +0,0 @@
-{-# LANGUAGE CApiFFI #-}
-
-module Hsm.Stream.FFI
- ( GstElement
- , gstInit
- , gstDeinit
- , gstParseLaunch
- , gstStatePlaying
- , gstStateNull
- , gstElementSetState
- , gstObjectUnref
- )
-where
-
-import Foreign.C.String (CString)
-import Foreign.C.Types (CChar, CInt)
-import Foreign.Ptr (Ptr)
-
-data GstElement
-
-data GError
-
-newtype GStateChangeReturn
- = GStateChangeReturn Int
-
-newtype GState
- = GState Int
-
-foreign import capi safe "gst/gst.h gst_init"
- gstInit :: Ptr CInt -> Ptr (Ptr (Ptr CChar)) -> IO ()
-
-foreign import capi safe "gst/gst.h gst_deinit"
- gstDeinit :: IO ()
-
-foreign import capi safe "gst/gst.h gst_parse_launch"
- gstParseLaunch :: CString -> Ptr GError -> IO (Ptr GstElement)
-
-foreign import capi safe "gst/gst.h value GST_STATE_PLAYING"
- gstStatePlaying :: GState
-
-foreign import capi safe "gst/gst.h value GST_STATE_NULL"
- gstStateNull :: GState
-
-foreign import capi safe "gst/gst.h gst_element_set_state"
- gstElementSetState :: Ptr GstElement -> GState -> IO GStateChangeReturn
-
-foreign import capi safe "gst/gst.h gst_object_unref"
- gstObjectUnref :: Ptr GstElement -> IO ()