{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE TypeFamilies #-}

-- | A static effect that runs a GStreamer pipeline (@gst-launch-1.0@) as a
-- child process and forwards the lines it prints to the @"gst"@ log.
module Hsm.Stream
  ( Stream
  , startStream
  , stopStream
  , isStreaming
  , runStream
  )
where

import Control.Monad (forever)
import Control.Monad.Extra (unlessM, whenM)
import Data.Maybe (fromJust, isJust)
import Effectful (Dispatch (Static), DispatchOf, Eff, Effect, IOE, (:>))
import Effectful.Concurrent (Concurrent, ThreadId, forkIO, killThread)
import Effectful.Dispatch.Static
  ( SideEffects (WithSideEffects)
  , StaticRep
  , evalStaticRep
  , getStaticRep
  , putStaticRep
  , unsafeEff_
  )
import Effectful.Exception (IOException, catch, finally)
import Effectful.Fail (Fail)
import GHC.IO.Handle (Handle, hGetLine)
import Hsm.Log (Log, Logs, Severity (Info, Trace), logMsg)
import System.Process (ProcessHandle, StdStream (CreatePipe), cleanupProcess, createProcess, proc, std_err, std_out)

-- | Effect tag for the streaming pipeline. It is statically dispatched and
-- performs side effects (see the 'DispatchOf' instance below).
data Stream :: Effect

type instance DispatchOf Stream = Static WithSideEffects

-- | Everything that has to be released when a running pipeline is stopped.
data StreamRep = StreamRep
  { phdl :: ProcessHandle
  -- ^ Handle of the @gst-launch-1.0@ child process.
  , hout :: Handle
  -- ^ Read end of the child's stdout pipe.
  , herr :: Handle
  -- ^ Read end of the child's stderr pipe.
  , tout :: ThreadId
  -- ^ Thread forwarding 'hout' to the log.
  , terr :: ThreadId
  -- ^ Thread forwarding 'herr' to the log.
  }

-- | Effect state: 'Nothing' while idle, 'Just' while a pipeline is running.
newtype instance StaticRep Stream
  = Stream (Maybe StreamRep)

-- The following functions manage the GStreamer pipeline as a subprocess.
-- This ensures:
--   - Clean resource cleanup on stream restart
--   - Proper WebSocket connection teardown (prevents browser-side lingering)
--   - Reliable browser disconnect/reconnect cycles
--
-- Direct library integration proved problematic due to resource lifecycle
-- issues, particularly with `webrtcsink` WebSocket persistence.

-- | Report whether a pipeline is currently recorded in the effect state.
--
-- The @Log "stream"@ constraint is not used by the body; it is kept so the
-- signature stays the same for existing callers.
isStreaming :: (Log "stream" :> es, Stream :> es) => Eff es Bool
isStreaming = do
  Stream rep <- getStaticRep
  return $ isJust rep

-- | Launch the pipeline unless one is already recorded as running.
--
-- Spawns @gst-launch-1.0@ with stdout and stderr redirected to pipes, forks
-- one reader thread per pipe, and stores the process handle, both pipe
-- handles and both thread ids in the effect state. A failed pattern match on
-- the result of 'createProcess' is reported through the 'Fail' effect.
startStream :: (Concurrent :> es, Fail :> es, Logs '["gst", "stream"] es, Stream :> es) => Eff es ()
startStream =
  unlessM isStreaming $ do
    logMsg @"stream" Info "Initializing gstreamer pipeline"
    (_, Just hout, Just herr, phdl) <- unsafeEff_ $ createProcess spDecl
    tout <- spEcho hout
    terr <- spEcho herr
    putStaticRep . Stream $ Just StreamRep{..}
  where
    spFlags = words "--quiet --no-position"
    pipeline = words "libcamerasrc ! videoconvert ! vp8enc deadline=1 ! queue ! webrtcsink run-signalling-server=true"
    spArgs = spFlags <> pipeline
    spDecl = (proc "gst-launch-1.0" spArgs){std_out = CreatePipe, std_err = CreatePipe}
    -- Forward every line read from the handle to the "gst" log at Trace
    -- level. Any 'IOException' (such as end of input once the pipe closes)
    -- ends the loop quietly instead of being reported.
    spEcho hdl =
      forkIO
        . catch @IOException (forever $ unsafeEff_ (hGetLine hdl) >>= logMsg @"gst" Trace)
        . const
        $ return ()

-- | Stop the running pipeline, if any, and mark the effect as idle.
--
-- The reader threads are killed /before/ 'cleanupProcess' runs. They are
-- normally blocked in 'hGetLine' on the very handles 'cleanupProcess' closes,
-- and a thread blocked in a read keeps that handle locked, so closing first
-- could leave the close waiting on the reader. Output the child prints while
-- shutting down is therefore no longer forwarded to the log.
--
-- The state is reset even when a teardown step throws, so a failed stop does
-- not leave the effect permanently reporting a running stream.
stopStream :: (Concurrent :> es, Log "stream" :> es, Stream :> es) => Eff es ()
stopStream =
  whenM isStreaming $ do
    Stream rep <- getStaticRep
    logMsg Info "Stopping stream"
    -- 'fromJust' cannot fail here: 'isStreaming' has just seen a 'Just' and
    -- nothing has written to the state in between.
    let StreamRep{..} = fromJust rep
        teardown = do
          killThread tout
          killThread terr
          unsafeEff_ $ cleanupProcess (Nothing, Just hout, Just herr, phdl)
    teardown `finally` putStaticRep (Stream Nothing)

-- | Interpret the 'Stream' effect.
--
-- Starts with no pipeline and always calls 'stopStream' once the action
-- finishes, whether it returns normally or throws.
runStream :: (Concurrent :> es, IOE :> es, Log "stream" :> es) => Eff (Stream : es) a -> Eff es a
runStream action = evalStaticRep (Stream Nothing) $ finally action stopStream