diff options
Diffstat (limited to 'hsm-stream')
| -rw-r--r-- | hsm-stream/Hsm/Stream.hs | 116 | ||||
| -rw-r--r-- | hsm-stream/Hsm/Stream/FFI.hsc | 48 | ||||
| -rw-r--r-- | hsm-stream/Test/Stream.hs | 15 | ||||
| -rw-r--r-- | hsm-stream/hsm-stream.cabal | 22 |
4 files changed, 85 insertions, 116 deletions
diff --git a/hsm-stream/Hsm/Stream.hs b/hsm-stream/Hsm/Stream.hs index e0b2b5b..a01eb4b 100644 --- a/hsm-stream/Hsm/Stream.hs +++ b/hsm-stream/Hsm/Stream.hs @@ -1,69 +1,87 @@ -{-# LANGUAGE DuplicateRecordFields #-} +{-# LANGUAGE RecordWildCards #-} {-# LANGUAGE TypeFamilies #-} module Hsm.Stream ( Stream , startStream , stopStream + , isStreaming , runStream ) where -import Control.Monad (void, when) -import Effectful (Dispatch (Static), DispatchOf, Eff, IOE, liftIO, (:>)) -import Effectful.Dispatch.Static (SideEffects (WithSideEffects), StaticRep, evalStaticRep, getStaticRep, unsafeEff_) -import Effectful.Exception (finally) -import Foreign.C.String (withCString) -import Foreign.Ptr (Ptr, nullPtr) -import Hsm.Log (Log, Severity (Info), logMsg) -import Hsm.Stream.FFI - ( GstElement - , gstDeinit - , gstElementSetState - , gstInit - , gstObjectUnref - , gstParseLaunch - , gstStateNull - , gstStatePlaying +import Control.Monad (forever) +import Control.Monad.Extra (unlessM, whenM) +import Data.Maybe (fromJust, isJust) +import Effectful (Dispatch (Static), DispatchOf, Eff, IOE, (:>)) +import Effectful.Concurrent (Concurrent, ThreadId, forkIO, killThread) +import Effectful.Dispatch.Static + ( SideEffects (WithSideEffects) + , StaticRep + , evalStaticRep + , getStaticRep + , putStaticRep + , unsafeEff_ ) -import System.Environment (setEnv) +import Effectful.Exception (IOException, catch, finally) +import Effectful.Fail (Fail) +import GHC.IO.Handle (Handle, hGetLine) +import Hsm.Log (Log, Logs, Severity (Info, Trace), logMsg) +import System.Process (ProcessHandle, StdStream (CreatePipe), cleanupProcess, createProcess, proc, std_err, std_out) data Stream (a :: * -> *) (b :: *) type instance DispatchOf Stream = Static WithSideEffects -newtype instance StaticRep Stream - = Stream (Ptr GstElement) +data StreamRep = StreamRep + { phdl :: ProcessHandle + , hout :: Handle + , herr :: Handle + , tout :: ThreadId + , terr :: ThreadId + } -startStream :: (Log "stream" :> es, Stream :> es) => Eff es () -startStream = do - Stream pipeline <- getStaticRep - logMsg Info "Starting stream" - unsafeEff_ . void $ gstElementSetState pipeline gstStatePlaying +newtype instance StaticRep Stream + = Stream (Maybe StreamRep) -stopStream :: (Log "stream" :> es, Stream :> es) => Eff es () -stopStream = do - Stream pipeline <- getStaticRep - logMsg Info "Stopping stream" - unsafeEff_ . void $ gstElementSetState pipeline gstStateNull +-- The following functions manage the GStreamer pipeline as a subprocess. +-- This ensures: +-- - Clean resource cleanup on stream restart +-- - Proper WebSocket connection teardown (prevents browser-side lingering) +-- - Reliable browser disconnect/reconnect cycles +-- +-- Direct library integration proved problematic due to resource lifecycle +-- issues, particularly with `webrtcsink` WebSocket persistence. +isStreaming :: (Log "stream" :> es, Stream :> es) => Eff es Bool +isStreaming = do + Stream rep <- getStaticRep + return $ isJust rep -runStream :: (IOE :> es, Log "stream" :> es) => Bool -> Eff (Stream : es) a -> Eff es a -runStream suppressXLogs action = do - when suppressXLogs $ do - logMsg Info "Suppressing external loggers" - liftIO $ setEnv "GST_DEBUG" "none" - liftIO $ setEnv "LIBCAMERA_LOG_LEVELS" "FATAL" - liftIO $ setEnv "WEBRTCSINK_SIGNALLING_SERVER_LOG" "none" - logMsg Info "Initializing gstreamer library" - liftIO $ gstInit nullPtr nullPtr - logMsg Info $ "Parsing gstreamer pipeline: " <> pipelineStr - pipeline <- liftIO . withCString pipelineStr $ \cStr -> gstParseLaunch cStr nullPtr - evalStaticRep (Stream pipeline) . finally action $ stopStream >> endStream +startStream :: (Concurrent :> es, Fail :> es, Logs '["gst", "stream"] es, Stream :> es) => Eff es () +startStream = + unlessM isStreaming $ do + logMsg @"stream" Info "Initializing gstreamer pipeline" + (_, Just hout, Just herr, phdl) <- unsafeEff_ $ createProcess spDecl + tout <- spEcho hout + terr <- spEcho herr + putStaticRep . Stream $ Just StreamRep{..} where - pipelineStr = "libcamerasrc ! videoconvert ! vp8enc deadline=1 ! webrtcsink run-signalling-server=true" - endStream = do - Stream pipeline <- getStaticRep - logMsg Info "Unrefing gstreamer pipeline" - liftIO $ gstObjectUnref pipeline - logMsg Info "De-initializing gstreamer library" - liftIO gstDeinit + spFlags = words "--quiet --no-position" + pipeline = words "libcamerasrc ! videoconvert ! vp8enc deadline=1 ! queue ! webrtcsink run-signalling-server=true" + spArgs = spFlags <> pipeline + spDecl = (proc "gst-launch-1.0" spArgs){std_out = CreatePipe, std_err = CreatePipe} + spEcho hdl = forkIO . catch @IOException (forever $ unsafeEff_ (hGetLine hdl) >>= logMsg @"gst" Trace) . const $ return () + +stopStream :: (Concurrent :> es, Log "stream" :> es, Stream :> es) => Eff es () +stopStream = + whenM isStreaming $ do + Stream rep <- getStaticRep + logMsg Info "Stopping stream" + let StreamRep{..} = fromJust rep + unsafeEff_ $ cleanupProcess (Nothing, Just hout, Just herr, phdl) + killThread tout + killThread terr + putStaticRep $ Stream Nothing + +runStream :: (Concurrent :> es, IOE :> es, Log "stream" :> es) => Eff (Stream : es) a -> Eff es a +runStream action = evalStaticRep (Stream Nothing) $ finally action stopStream diff --git a/hsm-stream/Hsm/Stream/FFI.hsc b/hsm-stream/Hsm/Stream/FFI.hsc deleted file mode 100644 index 3ef4f98..0000000 --- a/hsm-stream/Hsm/Stream/FFI.hsc +++ /dev/null @@ -1,48 +0,0 @@ -{-# LANGUAGE CApiFFI #-} - -module Hsm.Stream.FFI - ( GstElement - , gstInit - , gstDeinit - , gstParseLaunch - , gstStatePlaying - , gstStateNull - , gstElementSetState - , gstObjectUnref - ) -where - -import Foreign.C.String (CString) -import Foreign.C.Types (CChar, CInt) -import Foreign.Ptr (Ptr) - -data GstElement - -data GError - -newtype GStateChangeReturn - = GStateChangeReturn Int - -newtype GState - = GState Int - -foreign import capi safe "gst/gst.h gst_init" - gstInit :: Ptr CInt -> Ptr (Ptr (Ptr CChar)) -> IO () - -foreign import capi safe "gst/gst.h gst_deinit" - gstDeinit :: IO () - -foreign import capi safe "gst/gst.h gst_parse_launch" - gstParseLaunch :: CString -> Ptr GError -> IO (Ptr GstElement) - -foreign import capi safe "gst/gst.h value GST_STATE_PLAYING" - gstStatePlaying :: GState - -foreign import capi safe "gst/gst.h value GST_STATE_NULL" - gstStateNull :: GState - -foreign import capi safe "gst/gst.h gst_element_set_state" - gstElementSetState :: Ptr GstElement -> GState -> IO GStateChangeReturn - -foreign import capi safe "gst/gst.h gst_object_unref" - gstObjectUnref :: Ptr GstElement -> IO () diff --git a/hsm-stream/Test/Stream.hs b/hsm-stream/Test/Stream.hs index 010ebcc..327d2e4 100644 --- a/hsm-stream/Test/Stream.hs +++ b/hsm-stream/Test/Stream.hs @@ -1,8 +1,15 @@ -import Control.Concurrent (threadDelay) import Data.Function ((&)) -import Effectful (liftIO, runEff) -import Hsm.Log (Severity (Info), runLog) +import Effectful (runEff) +import Effectful.Concurrent (runConcurrent, threadDelay) +import Effectful.Fail (runFailIO) +import Hsm.Log (Severity (Trace), runLogs) import Hsm.Stream (runStream, startStream) main :: IO () -main = (startStream >> liftIO (threadDelay $ maxBound @Int)) & runStream True & runLog @"stream" Info & runEff +main = + (startStream >> threadDelay (maxBound @Int)) + & runStream + & runLogs @'["gst", "stream"] Trace + & runConcurrent + & runFailIO + & runEff diff --git a/hsm-stream/hsm-stream.cabal b/hsm-stream/hsm-stream.cabal index 96bca1d..1774ae7 100644 --- a/hsm-stream/hsm-stream.cabal +++ b/hsm-stream/hsm-stream.cabal @@ -6,32 +6,30 @@ version: 0.1.0.0 library build-depends: , base + , effectful , effectful-core , effectful-plugin + , extra , hsm-log + , process default-language: GHC2024 exposed-modules: Hsm.Stream - extra-libraries: gstreamer-1.0 ghc-options: -O2 -Wall -Werror -Wno-star-is-type -Wunused-packages -fplugin=Effectful.Plugin - include-dirs: - /usr/include/gstreamer-1.0 /usr/include/glib-2.0 - /usr/lib/glib-2.0/include - - other-modules: Hsm.Stream.FFI - executable test-stream build-depends: , base + , effectful , effectful-core , effectful-plugin + , extra , hsm-log + , process default-language: GHC2024 - extra-libraries: gstreamer-1.0 ghc-options: -O2 -threaded -Wall -Werror -Wno-star-is-type -Wunused-packages -fplugin=Effectful.Plugin @@ -39,11 +37,5 @@ executable test-stream if !arch(x86_64) ghc-options: -optl=-mno-fix-cortex-a53-835769 - include-dirs: - /usr/include/gstreamer-1.0 /usr/include/glib-2.0 - /usr/lib/glib-2.0/include - main-is: Test/Stream.hs - other-modules: - Hsm.Stream - Hsm.Stream.FFI + other-modules: Hsm.Stream |
