aboutsummaryrefslogtreecommitdiff
path: root/hsm-stream/Hsm/Stream.hs
blob: a01eb4b2474918be5b6fbf6313dc1b1e1b921b26 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE TypeFamilies #-}

module Hsm.Stream
  ( Stream
  , startStream
  , stopStream
  , isStreaming
  , runStream
  )
where

import Control.Monad (forever)
import Control.Monad.Extra (unlessM, whenM)
import Data.Maybe (fromJust, isJust)
import Effectful (Dispatch (Static), DispatchOf, Eff, IOE, (:>))
import Effectful.Concurrent (Concurrent, ThreadId, forkIO, killThread)
import Effectful.Dispatch.Static
  ( SideEffects (WithSideEffects)
  , StaticRep
  , evalStaticRep
  , getStaticRep
  , putStaticRep
  , unsafeEff_
  )
import Effectful.Exception (IOException, catch, finally)
import Effectful.Fail (Fail)
import GHC.IO.Handle (Handle, hGetLine)
import Hsm.Log (Log, Logs, Severity (Info, Trace), logMsg)
import System.Process (ProcessHandle, StdStream (CreatePipe), cleanupProcess, createProcess, proc, std_err, std_out)

-- | Effect tag for controlling the streaming subprocess.
--
-- The type has no constructors: it is only used at the type level (e.g. in
-- @Stream :> es@ constraints). The state carried by the effect is the
-- @StaticRep Stream@ instance declared in this module.
data Stream (a :: * -> *) (b :: *)

-- 'Stream' is a statically dispatched effect whose operations are declared
-- as having side effects.
type instance DispatchOf Stream = Static WithSideEffects

-- | Everything 'startStream' creates and 'stopStream' later has to release.
data StreamRep = StreamRep
  { phdl :: ProcessHandle -- ^ handle of the process spawned by 'startStream'
  , hout :: Handle -- ^ read end of the subprocess's stdout pipe
  , herr :: Handle -- ^ read end of the subprocess's stderr pipe
  , tout :: ThreadId -- ^ thread forwarding lines read from 'hout' to the log
  , terr :: ThreadId -- ^ thread forwarding lines read from 'herr' to the log
  }

-- | State of the 'Stream' effect: 'Nothing' while no pipeline has been
-- started (the initial value installed by 'runStream', and the value
-- 'stopStream' writes back), @'Just' rep@ once 'startStream' has stored the
-- resources of a launched subprocess.
newtype instance StaticRep Stream
  = Stream (Maybe StreamRep)

-- The following functions manage the GStreamer pipeline as a subprocess.
-- This ensures:
-- - Clean resource cleanup on stream restart
-- - Proper WebSocket connection teardown (prevents browser-side lingering)
-- - Reliable browser disconnect/reconnect cycles
--
-- Direct library integration proved problematic due to resource lifecycle
-- issues, particularly with `webrtcsink` WebSocket persistence.
--
-- 'isStreaming' is 'True' exactly when the effect state currently holds a
-- 'StreamRep', i.e. 'startStream' has stored one and 'stopStream' has not
-- yet cleared it. It only inspects the stored state; it does not query the
-- subprocess itself.
isStreaming :: (Log "stream" :> es, Stream :> es) => Eff es Bool
isStreaming = getStaticRep >>= \(Stream current) -> pure (isJust current)

-- | Launch the @gst-launch-1.0@ pipeline as a subprocess unless the effect
-- state already holds a running one.
--
-- Both stdout and stderr of the subprocess are piped; one thread per pipe
-- forwards every line to the @"gst"@ log at 'Trace' severity. The process
-- handle, both pipe handles and both thread ids are stored in the effect
-- state so that 'stopStream' can release them.
--
-- The pattern match on the 'createProcess' result goes through 'Fail' if
-- either pipe handle is missing.
startStream :: (Concurrent :> es, Fail :> es, Logs '["gst", "stream"] es, Stream :> es) => Eff es ()
startStream =
  unlessM isStreaming $ do
    logMsg @"stream" Info "Initializing gstreamer pipeline"
    (_, Just outH, Just errH, procH) <- unsafeEff_ $ createProcess gstProcess
    outT <- echoLines outH
    errT <- echoLines errH
    putStaticRep . Stream $
      Just
        StreamRep
          { phdl = procH
          , hout = outH
          , herr = errH
          , tout = outT
          , terr = errT
          }
  where
    -- Command line: launcher flags followed by the pipeline description.
    gstArgs =
      words "--quiet --no-position"
        <> words "libcamerasrc ! videoconvert ! vp8enc deadline=1 ! queue ! webrtcsink run-signalling-server=true"
    gstProcess = (proc "gst-launch-1.0" gstArgs){std_out = CreatePipe, std_err = CreatePipe}
    -- Fork a thread that logs each line read from the handle. The loop only
    -- ends through an exception; an 'IOException' (such as the one
    -- 'hGetLine' raises at end of input) ends the thread quietly.
    echoLines h =
      forkIO $
        catch @IOException
          ( forever $ do
              line <- unsafeEff_ $ hGetLine h
              logMsg @"gst" Trace line
          )
          (const $ return ())

-- | Tear down the running pipeline, if there is one; does nothing when the
-- effect state holds no 'StreamRep'.
--
-- Order matters here: the two log-forwarding threads are killed /before/
-- 'cleanupProcess' closes the pipe handles. A thread blocked in 'hGetLine'
-- can be holding the handle's lock, in which case closing that handle waits
-- for the reader. If the subprocess does not exit promptly after being asked
-- to terminate, that wait can hang the caller. The trade-off is that output
-- the subprocess emits while shutting down is no longer forwarded to the log.
--
-- Finally the effect state is reset to 'Nothing' so that 'isStreaming'
-- reports 'False' and 'startStream' can launch a fresh pipeline.
stopStream :: (Concurrent :> es, Log "stream" :> es, Stream :> es) => Eff es ()
stopStream =
  whenM isStreaming $ do
    Stream rep <- getStaticRep
    logMsg Info "Stopping stream"
    -- 'fromJust' cannot fail: 'isStreaming' has just confirmed a 'Just'.
    let StreamRep{..} = fromJust rep
    -- Stop the readers first so nothing is using 'hout'/'herr' when they
    -- are closed. Killing a thread that has already finished is harmless.
    killThread tout
    killThread terr
    -- Terminates the subprocess and closes both pipe handles.
    unsafeEff_ $ cleanupProcess (Nothing, Just hout, Just herr, phdl)
    putStaticRep $ Stream Nothing

-- | Interpret the 'Stream' effect.
--
-- The computation starts with no pipeline (@Stream Nothing@). 'stopStream'
-- is run when @action@ finishes, whether it returns normally or exits with
-- an exception, so a pipeline that is still running is torn down.
runStream :: (Concurrent :> es, IOE :> es, Log "stream" :> es) => Eff (Stream : es) a -> Eff es a
runStream action =
  evalStaticRep (Stream Nothing) (action `finally` stopStream)