Pipes to Conduits part 7: Closing the input end early

Back in part 5, we added the ability to attach arbitrary finalizers to pipes. However, the timing of those finalizers was purely mechanical: when any given pipe finished, it would run all upstream finalizers, and then its own. This can delay the finalization of an upstream pipe if the downstream pipe stops awaiting but continues running, and possibly yielding.

This time, we’ll add the close primitive, which lets the programmer indicate that a pipe will never await again. It should arguably be named unsafeClose, because this implementation does not use the type system to enforce that guarantee.

> {-# LANGUAGE TypeOperators #-}
> {-# OPTIONS_GHC -Wall #-}
> 
> module PipeClose where
> 
> import Control.Monad.Trans.Free (FreeT(..), FreeF(..), liftF, wrap)
> import Fun ((:&:)(..), (:|:)(..))
> 
> import Data.Void (Void, absurd)
> import Control.Monad (when, forever)
> import Control.Monad.Trans.Class (lift)
> import Control.Monad.Trans.Resource (MonadResource, allocate, release)

Functors

We won’t add a new functor this time; we’ll just reuse Then to hold the "rest" of the computation after a pipe closes its input end.

> newtype Then next = Then next            -- Identity
> newtype Yield o next = Yield o           -- Const
> newtype Await i next = Await (i -> next) -- Fun
> data Abort next = Abort                  -- Empty
> newtype Finalize m next = Finalize (m ()) -- Const
> newtype Leftover l next = Leftover l     -- Const
> instance Functor Then where
>   fmap f (Then next) = Then (f next)
> 
> instance Functor (Yield o) where
>   fmap _f (Yield o) = Yield o
> 
> instance Functor (Await i) where
>   fmap f (Await g) = Await (f . g)
> 
> instance Functor Abort where
>   fmap _f Abort = Abort
> 
> instance Functor (Finalize m) where
>   fmap _f (Finalize m) = Finalize m
> 
> instance Functor (Leftover l) where
>   fmap _f (Leftover l) = Leftover l
> 
> pass :: Monad m => m ()
> pass = return ()
> 
> unreachable :: Monad m => m ()
> unreachable = error "You've reached the unreachable finalizer"

The Pipe type

> type LeftoverThen l = Leftover l :&: Then
> type YieldThen o m  = Yield o :&: Finalize m :&: Then
> type AwaitU i u     = Await i :&: Await u :&: Then
> type Close          = Then

Our PipeF type has certainly grown! Remember when it used to be just Await i :|: YieldThen o? At least we’re not adding yet another type parameter this time.

> type PipeF l i o u m =  YieldThen o m
>                     :|: AwaitU i u
>                     :|: Abort
>                     :|: LeftoverThen l
>                     :|: Close
> type Pipe l i o u m r = FreeT (PipeF l i o u m) m r
> 
> type Producer   o   m r = Pipe Void () o    () m r
> type Consumer l i u m r = Pipe l    i  Void u  m r
> type Pipeline       m r = Pipe Void () Void () m r

Working with PipeF

We update the lifts, the smart constructors, and pipeCase as usual.

> liftYield :: YieldThen o m next ->              PipeF l i o u m next
> liftYield = L . L . L . L
> 
> liftAwait :: AwaitU i u next ->                 PipeF l i o u m next
> liftAwait = L . L . L . R
> 
> liftAbort :: Abort next ->                      PipeF l i o u m next
> liftAbort = L . L . R
> 
> liftLeftover :: LeftoverThen l next ->          PipeF l i o u m next
> liftLeftover = L . R
> 
> liftClose :: Close next ->                      PipeF l i o u m next
> liftClose = R
> yieldF :: o -> m () -> next ->                  PipeF l i o u m next
> yieldF o m next = liftYield $ Yield o :&: Finalize m :&: Then next
> 
> awaitF :: (i -> next) -> (u -> next) -> next -> PipeF l i o u m next
> awaitF f g next = liftAwait $ Await f :&: Await g :&: Then next
> 
> abortF ::                                       PipeF l i o u m next
> abortF = liftAbort Abort
> 
> leftoverF :: l -> next ->                       PipeF l i o u m next
> leftoverF l next = liftLeftover $ Leftover l :&: Then next
> 
> closeF :: next ->                               PipeF l i o u m next
> closeF next = liftClose $ Then next
> pipeCase :: FreeF (PipeF l i o u m) r next
>  ->                                        a  -- Abort
>  -> (r                                  -> a) -- Return
>  -> (next                               -> a) -- Close
>  -> (l -> next                          -> a) -- Leftover
>  -> (o -> m () -> next                  -> a) -- Yield
>  -> ((i -> next) -> (u -> next) -> next -> a) -- Await
>                                         -> a
> pipeCase (Wrap (L (L (R Abort))))
>   k _ _ _ _ _ = k
> pipeCase (Return r)
>   _ k _ _ _ _ = k r
> pipeCase (Wrap (R (Then next)))
>   _ _ k _ _ _ = k next
> pipeCase (Wrap (L (R (Leftover l :&: Then next))))
>   _ _ _ k _ _ = k l next
> pipeCase (Wrap (L (L (L (L (Yield o :&: Finalize m :&: Then next))))))
>   _ _ _ _ k _ = k o m next
> pipeCase (Wrap (L (L (L (R (Await f :&: Await g :&: Then next))))))
>   _ _ _ _ _ k = k f g next

Pipe primitives

We add a new primitive, as usual.

> tryAwait :: Monad m =>      Pipe l i o u m (Either (Maybe u) i)
> tryAwait = liftF $ awaitF Right (Left . Just) (Left Nothing)
> 
> yield :: Monad m => o ->    Pipe l i o u m ()
> yield b = liftF $ yieldF b pass ()
> 
> abort :: Monad m =>         Pipe l i o u m r
> abort = liftF abortF
> 
> leftover :: Monad m => l -> Pipe l i o u m ()
> leftover l = liftF $ leftoverF l ()
> 
> close :: Monad m => Pipe l i o u m ()
> close = liftF $ closeF ()

Pipe composition

> (<+<) :: Monad m => Pipe Void i' o u' m r -> Pipe l i i' u m u' -> Pipe l i o u m r
> p1 <+< p2 = composeWithFinalizer pass p1 p2
> (<?<) :: Monad m => Pipe Void i' o u' m r -> Pipe l i i' u m u' -> Pipe l i o u m r
> p1 <?< p2 = composeWithFinalizer unreachable p1 p2

Now all uses of pipeCase must have an additional branch for Close.

> composeWithFinalizer :: Monad m => m ()
>                  -> Pipe Void i' o u' m r -> Pipe l i i' u m u' -> Pipe l i o u m r
> composeWithFinalizer finalizeUpstream p1 p2 = FreeT $ do
>   x1 <- runFreeT p1
>   let p1' = FreeT $ return x1
>   runFreeT $ pipeCase x1
>   {- Abort  -} (         lift finalizeUpstream >> abort)
>   {- Return -} (\r ->    lift finalizeUpstream >> return r)
>   {- Close  -} (\next -> lift finalizeUpstream >> (wrap $ closeF (next <+< abort)))

The very reason we added Close was so that the upstream pipe could be finalized early. Once we do that, what do we compose next with? We could compose it with p2, but that would be very unsafe, since p2’s finalizers have already been run. Imagine that p2 is reading from a file: we would close the file and then ask p2 to keep reading from it! So instead, we compose with abort. Recall that earlier we asserted that right-composing a Producer with abort leaves it unchanged:

$$\forall p \in \mathit{Producer},\; p \circ \mathit{abort} \equiv p$$

If it is indeed true that the downstream pipe will never await again, then we can rest assured that next <+< abort will behave as we desire.

>   {- L-over -} (\l _next -> absurd l)
>   {- Yield  -} (\o finalizeDownstream next ->
>                       let (<*<) = composeWithFinalizer finalizeUpstream
>                       in wrap $ yieldF o
>                           (finalizeUpstream >> finalizeDownstream)
>                           (next <*< p2))
>   {- Await  -} (\f1 g1 onAbort1 -> FreeT $ do
>     x2 <- runFreeT p2
>     runFreeT $ pipeCase x2
>     {- Abort  -} (    onAbort1 <+< abort) -- downstream recovers
>     {- Return -} (\u' -> g1 u' <+< abort) -- downstream recovers
>     {- Close  -} (\next -> wrap $ closeF (p1' <?< next))

Suppose that the upstream pipe closes its input end. If we have reached this point, then the finalizers further upstream have already been run, so we need not worry about them. Upstream still has control, so we compose p1' with next using the unreachable finalizer, just as we do in similar situations.

>     {- L-over -} (\l next -> wrap $ leftoverF l (p1' <?< next))
>     {- Yield  -} (\o newFinalizer next ->
>                       let (<*<) = composeWithFinalizer newFinalizer
>                       in f1 o <*< next)
>     {- Await  -} (\f2 g2 onAbort2 -> wrap $ awaitF
>                           (\i -> p1' <?< f2 i)
>                           (\u -> p1' <?< g2 u)
>                           (      p1' <?< onAbort2)))
> (>+>) :: Monad m => Pipe l i i' u m u' -> Pipe Void i' o u' m r -> Pipe l i o u m r
> (>+>) = flip (<+<)
> 
> infixr 9 <+<
> infixr 9 >+>
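The identity law relied on in the Close branch (composing a producer with abort upstream changes nothing) can be illustrated with a deliberately stripped-down model. This is a hypothetical simplification, not the post's Pipe type: it has no monad layer, no finalizers and no leftovers, and Await carries a single fallback pipe in place of the separate abort and return handlers. It checks the law on one example rather than proving it, and it also shows what happens to a pipe that awaits anyway: it lands in its fallback, much as the real composition sends it to onAbort1 <+< abort.

```haskell
-- Hypothetical, stripped-down pipe: no monad layer, no finalizers, no leftovers.
data Pipe i o r
  = Yield o (Pipe i o r)
  | Await (i -> Pipe i o r) (Pipe i o r) -- second field: fallback if upstream is gone
  | Return r
  | Abort

-- Downstream on the left, upstream on the right, as with (<+<).
compose :: Pipe b c r -> Pipe a b s -> Pipe a c r
compose (Return r)     _  = Return r
compose Abort          _  = Abort
compose (Yield c next) up = Yield c (compose next up)
compose down@(Await f fallback) up = case up of
  Yield b next   -> compose (f b) next
  Return _       -> compose fallback Abort -- downstream recovers; upstream is gone
  Abort          -> compose fallback Abort -- likewise
  Await g upFall -> Await (\a -> compose down (g a)) (compose down upFall)

-- Drive a pipe whose input type is (), collecting everything it yields.
runCollect :: Pipe () o r -> ([o], Maybe r)
runCollect (Yield o next) = let (os, r) = runCollect next in (o : os, r)
runCollect (Await f _)    = runCollect (f ())
runCollect (Return r)     = ([], Just r)
runCollect Abort          = ([], Nothing)

-- A producer never awaits, so it cannot notice an Abort upstream.
producer :: Pipe i Int String
producer = Yield 1 (Yield 2 (Return "done"))

-- A pipe that awaits after yielding; with Abort upstream it takes its fallback.
awaitsLater :: Pipe Int Int String
awaitsLater = Yield 1 (Await (\x -> Yield x (Return "got input")) (Return "fell back"))
```

In this model, runCollect (compose producer Abort) and runCollect producer both give ([1,2], Just "done"), while composing awaitsLater with Abort gives ([1], Just "fell back").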

Running a pipeline

At the level of a whole pipeline, the close operation is meaningless: there is no upstream pipe left to finalize, and a pipeline’s awaits are simply fed () anyway. When we runPipe on a Close, therefore, we will simply move on to the next computation.

> runPipe :: Monad m => Pipeline m r -> m (Maybe r)
> runPipe p = do
>   e <- runFreeT p
>   pipeCase e
>   {- Abort  -} (                  return Nothing)
>   {- Return -} (\r             -> return $ Just r)
>   {- Close  -} (\next          -> runPipe next)
>   {- L-over -} (\l _next       -> absurd l)
>   {- Yield  -} (\o _fin _next  -> absurd o)
>   {- Await  -} (\f _g _onAbort -> runPipe $ f ())

Getting rid of leftovers

The adjustment to injectLeftovers is interesting: once we close the input end, what should we do with leftovers? Discard them, since we promised not to look at them? Or keep them, since looking at values we have already acquired does the upstream pipe no harm?

> injectLeftovers :: Monad m => Pipe i i o u m r -> Pipe l i o u m r
> injectLeftovers = go [] where
>   go ls p = FreeT $ do
>     x <- runFreeT p
>     runFreeT $ pipeCase x
>     {- Abort  -} (abort)
>     {- Return -} (\r -> return r)
>     {- Close  -} (\next -> wrap $ closeF (go [] next))

In the name of garbage collection, I say dump them. This is reflected by the recursive call ignoring ls and instead passing in an empty list.

>     {- L-over -} (\l next -> go (l:ls) next)
>     {- Yield  -} (\o fin next -> wrap $ yieldF o fin (go ls next))
>     {- Await  -} (\f g onAbort -> case ls of
>       [] -> wrap $ awaitF (go [] . f) (go [] . g) (go [] onAbort)
>       l : ls' -> go ls' (f l))
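The effect of dumping leftovers on Close can be seen in a small, hypothetical model of injectLeftovers over a plain list of inputs. The Step type and the run interpreter are illustrative names, not part of the post’s code. Unlike the real injectLeftovers, run also reports which pushed-back values were still pending when the computation finished, purely so that the difference is observable; here Close also discards the rest of the input list, modelling a closed input end.

```haskell
-- Hypothetical instructions for a toy consumer: await, push back, close, finish.
data Step i r
  = Await (Maybe i -> Step i r) -- Nothing means the input is exhausted (or closed)
  | Leftover i (Step i r)
  | Close (Step i r)
  | Done r

-- Interpret against a list of inputs. Returns the result together with any
-- leftovers that were still pending at the end (for illustration only).
run :: Step i r -> [i] -> (r, [i])
run = go []
  where
    go ls       (Done r)          _        = (r, ls)
    go ls       (Leftover l next) s        = go (l : ls) next s
    go _        (Close next)      _        = go [] next [] -- drop leftovers and remaining input
    go (l : ls) (Await f)         s        = go ls (f (Just l)) s
    go []       (Await f)         (x : xs) = go [] (f (Just x)) xs
    go []       (Await f)         []       = go [] (f Nothing) []

-- Takes one value and pushes it back: it is still pending at the end.
keep :: Step Int ()
keep = Await $ \mx -> case mx of
  Just x  -> Leftover x (Done ())
  Nothing -> Done ()

-- Same, but closes before finishing: the pending value is discarded.
dropOnClose :: Step Int ()
dropOnClose = Await $ \mx -> case mx of
  Just x  -> Leftover x (Close (Done ()))
  Nothing -> Done ()
```

Running keep on [1, 2, 3] leaves [1] pending, while dropOnClose leaves nothing behind for the garbage collector to hold on to.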

Adding finalizers to a pipe

Close is always an intermediate step of a pipe (even if the next step is merely return ()), so when revisiting cleanupP, we need only make sure that the cleanup procedures are passed on to the next computation.

> cleanupP :: Monad m => m () -> m () -> m () -> Pipe l i o u m r
>          -> Pipe l i o u m r
> cleanupP abortFinalize selfAbortFinalize returnFinalize = go where
>   go p = FreeT $ do
>     x <- runFreeT p
>     runFreeT $ pipeCase x
>     {- Abort  -} (      lift selfAbortFinalize >> abort)
>     {- Return -} (\r -> lift returnFinalize    >> return r)
>     {- Close  -} (\next -> wrap $ closeF (go next))
>     {- L-over -} (\l next -> wrap $ leftoverF l (go next))
>     {- Yield  -} (\o finalizeRest next -> wrap $
>                         yieldF o (finalizeRest >> abortFinalize) (go next))
>     {- Await  -} (\f g onAbort -> wrap $
>                         awaitF (go . f) (go . g) (go onAbort))

Playing with our new primitive

Here’s a simple example using (essentially) printf debugging to illustrate the code execution path. See if you can guess the output of runPipe $ exampleConsumer <+< exampleProducer before looking at it.

> exampleProducer :: Producer Int IO ()
> exampleProducer = finallyP (putStrLn "End producer") $ do
>   lift $ putStrLn "Begin producer"
>   lift $ putStrLn "Producer yielding"
>   yield 1
>   lift $ putStrLn "Producer done yielding"
>   pass
> exampleConsumer :: Consumer Void Int () IO ()
> exampleConsumer = finallyP (putStrLn "End consumer") $ do
>   lift $ putStrLn "Begin consumer"
>   lift $ putStrLn "Consumer awaiting"
>   _ <- await
>   lift $ putStrLn "Consumer done awaiting"
>   close
>   lift $ putStrLn "Consumer continues"
>   pass
ghci> runPipe $ exampleConsumer <+< exampleProducer
  Begin consumer
  Consumer awaiting
  Begin producer
  Producer yielding
  Consumer done awaiting
  End producer
  Consumer continues
  End consumer
  Just ()

As you can see, close caused the producer’s finalizer to be run immediately. What would the output look like if the consumer didn’t close?

Safety

Our close primitive gives programmers a convenient way to indicate that a pipe’s upstream should be finalized, but it is completely up to the programmer to make sure that close is used in a safe way, that is, that it is not followed by an await. We gave pipes the behavior of aborting in such circumstances, which is a decent choice, but can we do better?

Control.Frame from the pipes package provides a different (though similar) implementation of pipes that uses indexed monads to solve this problem. If you close a frame, then it is a type error to await afterwards. This has obvious type-safety benefits, but at the cost of using a relatively new concept that has little syntactic sugar support; see Control.Pipe.Tutorial for details on how this technique works.
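The actual types in Control.Frame are not reproduced here. As a hedged illustration of the general indexed-monad technique only, the sketch below tracks whether the input end is open in two phantom type indices: each step records the state it expects before it runs and the state it leaves behind, and the hypothetical indexed bind >>>= only chains two steps when those states match. All names (IxC, Open, Closed, awaitIx, closeIx, runClosed) are made up for this example.

```haskell
-- Phantom tags recording whether the input end is open (hypothetical names).
data Open
data Closed

-- A toy indexed computation over a list of inputs. The indices s and t say
-- whether the input end is open before and after the computation runs.
newtype IxC i s t a = IxC { runIxC :: [i] -> (a, [i]) }

ireturn :: a -> IxC i s s a
ireturn a = IxC $ \is -> (a, is)

-- Indexed bind: the "after" state of the first step must match
-- the "before" state of the second.
(>>>=) :: IxC i s t a -> (a -> IxC i t u b) -> IxC i s u b
IxC m >>>= k = IxC $ \is -> let (a, is') = m is in runIxC (k a) is'
infixl 1 >>>=

-- Awaiting is only possible while the input end is open.
awaitIx :: IxC i Open Open (Maybe i)
awaitIx = IxC $ \is -> case is of
  []       -> (Nothing, [])
  (x : xs) -> (Just x, xs)

-- Closing moves the state from Open to Closed and discards the remaining input.
closeIx :: IxC i Open Closed ()
closeIx = IxC $ \_ -> ((), [])

-- Run a computation that must end with its input closed.
runClosed :: IxC i Open Closed a -> [i] -> a
runClosed m = fst . runIxC m

-- Typechecks: two awaits, then close, then a pure result.
firstTwo :: IxC i Open Closed (Maybe i, Maybe i)
firstTwo =
  awaitIx >>>= \a ->
  awaitIx >>>= \b ->
  closeIx >>>= \_ ->
  ireturn (a, b)

-- Rejected by the type checker: closeIx leaves Closed, but awaitIx needs Open.
-- bad = closeIx >>>= \_ -> awaitIx
```

Uncommenting bad produces a type error rather than a runtime abort. The cost mentioned above is visible as well: ordinary do notation does not apply to >>>= without extra machinery such as RebindableSyntax, so the chain is written with explicit lambdas.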

Next time

We’ve come a long way from the simplicity of Control.Pipe. Next time, I’ll take away abort and close, and what we have left will be fairly similar to the current state of Data.Conduit. I’ll guide you through some of the conduit source code, point out which design choices were made, and attempt to explain why.

Convenience combinators

> finallyP :: Monad m => m () -> Pipe l i o u m r -> Pipe l i o u m r
> finallyP finalize = cleanupP finalize finalize finalize
> 
> catchP :: Monad m => m () -> Pipe l i o u m r -> Pipe l i o u m r
> catchP finalize = cleanupP finalize finalize pass
> 
> successP :: Monad m => m () -> Pipe l i o u m r -> Pipe l i o u m r
> successP finalize = cleanupP pass pass finalize
> bracketP :: MonadResource m => IO a -> (a -> IO ()) -> (a -> Pipe l i o u m r)
>          -> Pipe l i o u m r
> bracketP create destroy mkPipe = do
>   (key, val) <- lift $ allocate create destroy 
>   finallyP (release key) (mkPipe val)

Some basic pipes

> idMsg :: String -> Pipe l i i u IO u
> idMsg str = finallyP (putStrLn str) idP
> 
> testPipeR :: Monad m => Pipe Void i o u m r -> m (Maybe r)
> testPipeR p = runPipe $ (await >> abort) <+< p <+< abort
> 
> testPipeL :: Monad m => Pipe Void Int o () m r -> m (Maybe r)
> testPipeL p = runPipe $ (await >> await >> abort) <+< take' 1 <+< p <+< fromList [1 ..]
> 
> testPipe :: Monad m => Pipe Void Int o () m r -> m (Maybe (r, [o]))
> testPipe p = runPipe $ runP <+< p <+< fromList [1..]
> 
> take' :: Monad m => Int -> Pipe l i i u m ()
> take' 0 = pass
> take' n = (await >>= yield) >> take' (pred n)
> fromList :: Monad m => [o] -> Producer o m ()
> fromList = mapM_ yield
> awaitE :: Monad m => Pipe l i o u m (Either u i)
> awaitE = tryAwait >>= \emx -> case emx of
>   Left Nothing  -> abort
>   Left (Just u) -> return $ Left u
>   Right i       -> return $ Right i
> 
> awaitForever :: Monad m => (i -> Pipe l i o u m r) -> Pipe l i o u m u
> awaitForever f = go where
>   go = awaitE >>= \ex -> case ex of
>     Left u  -> return u
>     Right i -> f i >> go
> 
> pipe :: Monad m => (i -> o) -> Pipe l i o u m u
> pipe f = awaitForever $ yield . f
> 
> idP :: Monad m => Pipe l i i u m u
> idP = pipe id
> 
> filterP :: Monad m => (i -> Bool) -> Pipe l i i u m u
> filterP test = awaitForever $ \x -> when (test x) (yield x)
> 
> printer :: Show i => Consumer l i u IO u
> printer = awaitForever $ lift . print
> runP :: Monad m => Consumer l i u m (u, [i])
> runP = awaitE >>= \ex -> case ex of
>   Left  u -> return (u, [])
>   Right i -> runP >>= \ ~(u, is) -> return (u, i:is)
> 
> evalP :: Monad m => Consumer l i u m u
> evalP = fst `fmap` runP
> 
> execP :: Monad m => Consumer l i u m [i]
> execP = snd `fmap` runP
> 
> fold :: Monad m => (r -> i -> r) -> r -> Consumer l i u m r
> fold f = go where
>   go r = awaitE >>= \ex -> case ex of
>     Left _u -> return r
>     Right i -> go $! f r i
> await :: Monad m => Pipe l i o u m i
> await = awaitE >>= \ex -> case ex of
>   Left _u -> abort
>   Right i -> return i
> 
> oldPipe :: Monad m => (i -> o) -> Pipe l i o u m r
> oldPipe f = forever $ await >>= yield . f
> 
> oldIdP :: Monad m => Pipe l i i u m r
> oldIdP = oldPipe id
> 
> oldFilterP :: Monad m => (i -> Bool) -> Pipe l i i u m r
> oldFilterP test = forever $ await >>= \x -> when (test x) (yield x)
> 
> oldPrinter :: Show i => Consumer l i u IO r
> oldPrinter = forever $ await >>= lift . print

You can play with this code for yourself by downloading PipeClose.lhs.


Pipes to Conduits part 6: Leftovers

One important use case of the Conduit library is parsing. In order to perform useful parsing, we need to be able to occasionally consume "too much" input, and then put the "leftovers" back into the input stream, as if they had never been consumed.

Today, we will extend the Pipe type yet again, creating a new primitive, leftover, comparable to that of Data.Conduit.

> {-# LANGUAGE TypeOperators #-}
> {-# OPTIONS_GHC -Wall #-}
> 
> module PipeLeftover where
> 
> import Control.Monad.Trans.Free (FreeT(..), FreeF(..), liftF, wrap)
> import Fun ((:&:)(..), (:|:)(..))
> 
> import Data.Void (Void, absurd)
> import Control.Monad (when, forever)
> import Control.Monad.Trans.Class (lift)
> import Control.Monad.Trans.Resource (MonadResource, allocate, release)

Functors

We’ll create yet another synonym for Const, this time called Leftover.

> newtype Then next = Then next            -- Identity
> newtype Yield o next = Yield o           -- Const
> newtype Await i next = Await (i -> next) -- Fun
> data Abort next = Abort                  -- Empty
> newtype Finalize m next = Finalize (m ()) -- Const
> newtype Leftover l next = Leftover l     -- Const
> instance Functor Then where
>   fmap f (Then next) = Then (f next)
> 
> instance Functor (Yield o) where
>   fmap _f (Yield o) = Yield o
> 
> instance Functor (Await i) where
>   fmap f (Await g) = Await (f . g)
> 
> instance Functor Abort where
>   fmap _f Abort = Abort
> 
> instance Functor (Finalize m) where
>   fmap _f (Finalize m) = Finalize m
> 
> instance Functor (Leftover l) where
>   fmap _f (Leftover l) = Leftover l
> 
> pass :: Monad m => m ()
> pass = return ()
> 
> unreachable :: Monad m => m ()
> unreachable = error "You've reached the unreachable finalizer"

The Pipe type

The usage of leftover will be much like that of yield: we supply a value and then carry on with our computation. We will therefore bundle Leftover with Then, as we did for YieldThen.

> type LeftoverThen l = Leftover l :&: Then
> type YieldThen o m  = Yield o :&: Finalize m :&: Then
> type AwaitU i u     = Await i :&: Await u :&: Then

PipeF and Pipe will acquire a new type parameter l which indicates the type of leftovers that a given pipe will supply.

> type PipeF l i o u m =  YieldThen o m :|: AwaitU i u
>                     :|: Abort         :|: LeftoverThen l
> type Pipe l i o u m r = FreeT (PipeF l i o u m) m r
> 
> type Producer   o   m r = Pipe Void () o    () m r
> type Consumer l i u m r = Pipe l    i  Void u  m r
> type Pipeline       m r = Pipe Void () Void () m r

Working with PipeF

Our lifting functions will be adjusted as usual: the pre-existing ones acquire another L, while the new one gets an R.

> liftYield :: YieldThen o m next ->              PipeF l i o u m next
> liftYield = L . L . L
> 
> liftAwait :: AwaitU i u next ->                 PipeF l i o u m next
> liftAwait = L . L . R
> 
> liftAbort :: Abort next ->                      PipeF l i o u m next
> liftAbort = L . R
> 
> liftLeftover :: LeftoverThen l next ->          PipeF l i o u m next
> liftLeftover = R
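The lifting functions follow mechanically from how the sum is nested. The Fun module that defines :|: is not shown in this post, so the sketch below is a hypothetical reconstruction, assuming :|: is a left-associative sum with constructors L and R. Under that assumption, adding one more summand on the right wraps every existing injection in one more L, while the newcomer is injected with a bare R. Const String stands in for the real functors so the example stays self-contained.

```haskell
{-# LANGUAGE TypeOperators #-}
import Data.Functor.Const (Const (..))

-- Hypothetical stand-in for the sum functor from the (unshown) Fun module.
-- Declared infixl, so  a :|: b :|: c  means  (a :|: b) :|: c.
data (f :|: g) a = L (f a) | R (g a)
infixl 5 :|:

instance (Functor f, Functor g) => Functor (f :|: g) where
  fmap h (L x) = L (fmap h x)
  fmap h (R y) = R (fmap h y)

-- Four placeholder summands, in the same order as PipeF in this post:
-- yield, await, abort, leftover.
type Four = Const String :|: Const String :|: Const String :|: Const String

-- The same L/R nesting as liftYield, liftAwait, liftAbort and liftLeftover.
liftYield', liftAwait', liftAbort', liftLeftover' :: Const String a -> Four a
liftYield'    = L . L . L
liftAwait'    = L . L . R
liftAbort'    = L . R
liftLeftover' = R

-- Recover which summand a value lives in (0-based, counting from the left).
summand :: Four a -> Int
summand (L (L (L _))) = 0
summand (L (L (R _))) = 1
summand (L (R _))     = 2
summand (R _)         = 3
```

The patterns in summand have the same shape as the ones pipeCase matches on, which is why every new primitive forces a change to both the lifts and pipeCase.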

We add a smart constructor leftoverF in similar fashion to the ones we have already.

> yieldF :: o -> m () -> next ->                  PipeF l i o u m next
> yieldF o m next = liftYield $ Yield o :&: Finalize m :&: Then next
> 
> awaitF :: (i -> next) -> (u -> next) -> next -> PipeF l i o u m next
> awaitF f g next = liftAwait $ Await f :&: Await g :&: Then next
> 
> abortF ::                                       PipeF l i o u m next
> abortF = liftAbort Abort
> 
> leftoverF :: l -> next ->                       PipeF l i o u m next
> leftoverF l next = liftLeftover $ Leftover l :&: Then next

And finally we add another branch to pipeCase.

> pipeCase :: FreeF (PipeF l i o u m) r next
>  ->                                        a  -- Abort
>  -> (r                                  -> a) -- Return
>  -> (l -> next                          -> a) -- Leftover
>  -> (o -> m () -> next                  -> a) -- Yield
>  -> ((i -> next) -> (u -> next) -> next -> a) -- Await
>                                         -> a
> pipeCase (Wrap (L (R Abort)))
>   k _ _ _ _ = k
> pipeCase (Return r)
>   _ k _ _ _ = k r
> pipeCase (Wrap (R (Leftover l :&: Then next)))
>   _ _ k _ _ = k l next
> pipeCase (Wrap (L (L (L (Yield o :&: Finalize m :&: Then next)))))
>   _ _ _ k _ = k o m next
> pipeCase (Wrap (L (L (R (Await f :&: Await g :&: Then next)))))
>   _ _ _ _ k = k f g next

Pipe primitives

Now that we’re old pros with liftF, the leftover primitive is a breeze.

> tryAwait :: Monad m =>      Pipe l i o u m (Either (Maybe u) i)
> tryAwait = liftF $ awaitF Right (Left . Just) (Left Nothing)
> 
> yield :: Monad m => o ->    Pipe l i o u m ()
> yield b = liftF $ yieldF b pass ()
> 
> abort :: Monad m =>         Pipe l i o u m r
> abort = liftF abortF
> 
> leftover :: Monad m => l -> Pipe l i o u m ()
> leftover l = liftF $ leftoverF l ()

Getting rid of leftovers

Being able to specify leftovers is one thing, but how do we interpret that? What does it mean when a pipe supplies leftovers? The "obvious" meaning is that the rest of the pipe computation should have that leftover value available to it the next time it awaits.

Let’s write an interpreter that will "inject" leftovers into a pipe, making them available to the pipe’s own awaits. The given pipe must therefore bear the restriction that the leftover type is the same as the input type. The resultant pipe will contain no leftover constructs, so it can be polymorphic in that type parameter.

The situation might arise where two leftovers are supplied in a row. What should we do then? Discard the old and keep the new? If we keep both, then in which order should they be supplied back to subsequent awaits?

Recall that Pipes are a form of stream processing. Suppose we represent the stream as a queue. await and yield are like the operations dequeue (taking from the front of a queue) and enqueue (adding to the back of a queue) respectively. The idea of "leftovers" is that we accidentally took "too much", and we want to reverse our actions. The logical conclusion, therefore, is that the leftover operation should "push" a value back onto the front of the queue.

> injectLeftovers :: Monad m => Pipe i i o u m r -> Pipe l i o u m r
> injectLeftovers = go [] where

Our "queue" is going to be represented by a list. An empty list means "please refer to the actual stream". A nonempty list means "I have these values that I took from the stream; please pretend like they’re still there."

>   go ls p = FreeT $ do
>     x <- runFreeT p
>     runFreeT $ pipeCase x
>     {- Abort  -} (abort)
>     {- Return -} (\r -> return r)
>     {- L-over -} (\l next -> go (l:ls) next)

When we encounter a leftover statement, we have yet another value we took from the stream, and we’d like to "put it back". We therefore cons it onto the front.

>     {- Yield  -} (\o fin next -> wrap $ yieldF o fin (go ls next))
>     {- Await  -} (\f g onAbort -> case ls of
>       [] -> wrap $ awaitF (go [] . f) (go [] . g) (go [] onAbort)
>       l : ls' -> go ls' (f l))

When we encounter an await, there are two possibilities: either we have an empty list, and we need to refer to the actual stream, or we have a nonempty list, and we can just take the top value. "Referring to the actual stream" translates to creating another await construct, while "just taking the top value" translates to invoking the f callback with the l value.
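The queue behaviour described above can be modelled with a plain list standing in for the stream. In this hypothetical sketch, Step, injectLeftovers', drain, swapThen and dupThen are illustrative names rather than the post’s code: leftovers are pushed onto a stack in front of the list, and each await drains that stack before touching the real input. The two checks reproduce the "hello"/"world" and swap/dup sessions that appear later in this post.

```haskell
-- Hypothetical instructions for a toy consumer that can push values back.
data Step i r
  = Await (Maybe i -> Step i r) -- Nothing means the input stream is exhausted
  | Leftover i (Step i r)
  | Done r

-- Pushed-back values live on a stack in front of the real input list.
injectLeftovers' :: Step i r -> [i] -> r
injectLeftovers' = go []
  where
    go _        (Done r)          _        = r
    go ls       (Leftover l next) s        = go (l : ls) next s     -- push onto the front
    go (l : ls) (Await f)         s        = go ls (f (Just l)) s   -- pending values first
    go []       (Await f)         (x : xs) = go [] (f (Just x)) xs  -- then the real stream
    go []       (Await f)         []       = go [] (f Nothing) []

-- Await until the input runs out, returning everything seen, in order.
drain :: Step i [i]
drain = loop []
  where
    loop acc = Await $ \mx -> case mx of
      Nothing -> Done (reverse acc)
      Just x  -> loop (x : acc)

-- Toy versions of swap and dup, written in continuation style.
swapThen, dupThen :: Step i r -> Step i r
swapThen k = Await $ \m1 -> Await $ \m2 -> case (m1, m2) of
  (Just a, Just b)  -> Leftover a (Leftover b k)
  (Just a, Nothing) -> Leftover a k -- only one value available: put it back
  _                 -> k
dupThen k = Await $ \m -> case m of
  Just a  -> Leftover a (Leftover a k)
  Nothing -> k
```

Because the stack is consulted before the list, the most recent leftover is the first value the next await sees, matching the "push back onto the front of the queue" reading.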

Pipe composition

The question arises: how are we supposed to compose two pipes that both might supply leftovers? There are a few possibilities.

If we allow them both to supply leftovers, then should we discard the leftovers from one pipe or the other? Perhaps the resultant pipe could simply have an Either union of the two types of leftovers.

The other option is to disallow leftovers from one or both pipes upon composing them. If we disallow leftovers from one pipe, then the resultant pipe will have the leftover type of the other one. If we disallow leftovers from both pipes, then there is no way for their composition to produce leftovers.

Given the nature of injectLeftovers, which associates leftovers with the "input" type i, and given that the resultant input type i comes from the upstream pipe, the logical choice seems to be to allow leftovers from the upstream pipe, but not the downstream pipe. We "disallow" leftovers by specifying that the type of leftovers for the downstream pipe is Void. The only way to "construct" a value of type Void is with an infinite loop or an exception, so a pipe whose leftover type is Void cannot actually supply leftovers.

> (<+<) :: Monad m => Pipe Void i' o u' m r -> Pipe l i i' u m u' -> Pipe l i o u m r
> p1 <+< p2 = composeWithFinalizer pass p1 p2
> (<?<) :: Monad m => Pipe Void i' o u' m r -> Pipe l i i' u m u' -> Pipe l i o u m r
> p1 <?< p2 = composeWithFinalizer unreachable p1 p2

All we have to change in pipe composition is to add branches for leftover whenever we pipeCase.

> composeWithFinalizer :: Monad m => m ()
>                  -> Pipe Void i' o u' m r -> Pipe l i i' u m u' -> Pipe l i o u m r
> composeWithFinalizer finalizeUpstream p1 p2 = FreeT $ do
>   x1 <- runFreeT p1
>   let p1' = FreeT $ return x1
>   runFreeT $ pipeCase x1
>   {- Abort  -} (      lift finalizeUpstream >> abort)
>   {- Return -} (\r -> lift finalizeUpstream >> return r)
>   {- L-over -} (\l _next -> absurd l)

Since the downstream pipe has a leftover type of Void, we can use absurd to assert that this branch should never happen.

>   {- Yield  -} (\o finalizeDownstream next ->
>                       let (<*<) = composeWithFinalizer finalizeUpstream
>                       in wrap $ yieldF o
>                           (finalizeUpstream >> finalizeDownstream)
>                           (next <*< p2))
>   {- Await  -} (\f1 g1 onAbort1 -> FreeT $ do
>     x2 <- runFreeT p2
>     runFreeT $ pipeCase x2
>     {- Abort  -} (    onAbort1 <+< abort) -- downstream recovers
>     {- Return -} (\u' -> g1 u' <+< abort) -- downstream recovers
>     {- L-over -} (\l next -> wrap $ leftoverF l (p1' <?< next))

If the upstream pipe produced a leftover, then we keep it. Since upstream still has control, there is no reason to expect that the finalizer we provide to pipe composition will be used, so we use the unreachable one. Note that the types make no guarantee about unreachable; rather, it is my own assertion. I concluded that the finalizer provided at this location would be unreachable by reasoning about the code, but I see no convenient way to encode or enforce this in the type system.

>     {- Yield  -} (\o newFinalizer next ->
>                       let (<*<) = composeWithFinalizer newFinalizer
>                       in f1 o <*< next)
>     {- Await  -} (\f2 g2 onAbort2 -> wrap $ awaitF
>                           (\i -> p1' <?< f2 i)
>                           (\u -> p1' <?< g2 u)
>                           (      p1' <?< onAbort2)))
> (>+>) :: Monad m => Pipe l i i' u m u' -> Pipe Void i' o u' m r -> Pipe l i o u m r
> (>+>) = flip (<+<)
> infixr 9 <+<
> infixr 9 >+>

Running a pipeline

Given that a pipeline cannot reasonably use yield or leftover, since those types are constrained to Void, let’s again make use of absurd to discharge us of the obligation to provide code for those branches.

> runPipe :: Monad m => Pipeline m r -> m (Maybe r)
> runPipe p = do
>   e <- runFreeT p
>   pipeCase e
>   {- Abort  -} (                  return Nothing)
>   {- Return -} (\r             -> return $ Just r)
>   {- L-over -} (\l _next       -> absurd l)
>   {- Yield  -} (\o _fin _next  -> absurd o)
>   {- Await  -} (\f _g _onAbort -> runPipe $ f ())
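Both uses of absurd above rely on the same fact: once a type parameter is instantiated to Void, the corresponding constructor can never be built (short of an infinite loop or an exception), so its branch needs no real code. A minimal sketch with a hypothetical instruction type, not the post’s PipeF:

```haskell
import Data.Void (Void, absurd)

-- Hypothetical one-step instruction type with a "put back" case.
data Instr l o = Emit o | PutBack l | Stop

-- With l ~ Void, no PutBack can ever be constructed, so absurd discharges
-- that branch without any real handling code.
describe :: Show o => Instr Void o -> String
describe (Emit o)    = "emit " ++ show o
describe (PutBack v) = absurd v
describe Stop        = "stop"
```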

Adding finalizers to a pipe

There is little to say about the changes here. The leftover construct promises that there is a next pipe, so we simply attach the cleanup actions to that next pipe, and that’s it.

> cleanupP :: Monad m => m () -> m () -> m () -> Pipe l i o u m r
>          -> Pipe l i o u m r
> cleanupP abortFinalize selfAbortFinalize returnFinalize = go where
>   go p = FreeT $ do
>     x <- runFreeT p
>     runFreeT $ pipeCase x
>     {- Abort  -} (      lift selfAbortFinalize >> abort)
>     {- Return -} (\r -> lift returnFinalize    >> return r)
>     {- L-over -} (\l next -> wrap $ leftoverF l (go next))
>     {- Yield  -} (\o finalizeRest next -> wrap $
>                         yieldF o (finalizeRest >> abortFinalize) (go next))
>     {- Await  -} (\f g onAbort -> wrap $
>                         awaitF (go . f) (go . g) (go onAbort))

Play time

Let’s give leftovers a spin!

ghci> :set -XNoMonomorphismRestriction
ghci> let p = leftover "hello" >> leftover "world" >> idP
ghci> runPipe $ execP <+< injectLeftovers p <+< fromList ["the", "end"]
  Just ["world","hello","the","end"]

Note that this is a horrible abuse of leftover. Leftovers are meant as a way for you to put back onto the stream what you have already taken off of it.

Here’s perhaps a more sensible use of leftover: FORTH-style programming!

> swap :: Monad m => Pipe i i o u m ()
> swap = do 
>   i1 <- await
>   i2 <- await
>   leftover i1
>   leftover i2
> dup :: Monad m => Pipe i i o u m ()
> dup = do
>   i <- await
>   leftover i
>   leftover i
ghci> :set -XNoMonomorphismRestriction
ghci> let p = injectLeftovers (swap >> dup >> idP)
ghci> runPipe $ execP <+< p <+< fromList [1 .. 5]
  Just [2,2,1,3,4,5]

Perhaps the simplest use of leftovers is the ability to "peek" at the value coming next without consuming it.

> peekE :: Monad m => Pipe i i o u m (Either u i)
> peekE = awaitE >>= \ex -> case ex of
>   Left u  -> return (Left u)
>   Right i -> leftover i >> return (Right i)

Next time

I initially planned for the series to end right around here, but I have decided to extend it to touch on two more topics. Next time, we will extend our Pipe type with a new primitive, close, allowing it to signal that it is finished consuming input, so that upstream finalizers can be run as soon as possible. After that, we’ll take away close and abort, and compare the result to Data.Conduit, which has neither of those two features. Whether that is a "good" or "bad" thing is up to you to decide, but I’ll try to point out a few of the trade-offs.

Convenience combinators

> finallyP :: Monad m => m () -> Pipe l i o u m r -> Pipe l i o u m r
> finallyP finalize = cleanupP finalize finalize finalize
> 
> catchP :: Monad m => m () -> Pipe l i o u m r -> Pipe l i o u m r
> catchP finalize = cleanupP finalize finalize pass
> 
> successP :: Monad m => m () -> Pipe l i o u m r -> Pipe l i o u m r
> successP finalize = cleanupP pass pass finalize
> bracketP :: MonadResource m => IO a -> (a -> IO ()) -> (a -> Pipe l i o u m r)
>          -> Pipe l i o u m r
> bracketP create destroy mkPipe = do
>   (key, val) <- lift $ allocate create destroy 
>   finallyP (release key) (mkPipe val)

Some basic pipes

> fromList :: Monad m => [o] -> Producer o m ()
> fromList = mapM_ yield
> awaitE :: Monad m => Pipe l i o u m (Either u i)
> awaitE = tryAwait >>= \emx -> case emx of
>   Left Nothing  -> abort
>   Left (Just u) -> return $ Left u
>   Right i       -> return $ Right i
> 
> awaitForever :: Monad m => (i -> Pipe l i o u m r) -> Pipe l i o u m u
> awaitForever f = go where
>   go = awaitE >>= \ex -> case ex of
>     Left u  -> return u
>     Right i -> f i >> go
> 
> pipe :: Monad m => (i -> o) -> Pipe l i o u m u
> pipe f = awaitForever $ yield . f
> 
> idP :: Monad m => Pipe l i i u m u
> idP = pipe id
> 
> filterP :: Monad m => (i -> Bool) -> Pipe l i i u m u
> filterP test = awaitForever $ \x -> when (test x) (yield x)
> 
> printer :: Show i => Consumer l i u IO u
> printer = awaitForever $ lift . print
> 
> runP :: Monad m => Consumer l i u m (u, [i])
> runP = awaitE >>= \ex -> case ex of
>   Left  u -> return (u, [])
>   Right i -> runP >>= \ ~(u, is) -> return (u, i:is)
> 
> evalP :: Monad m => Consumer l i u m u
> evalP = fst `fmap` runP
> 
> execP :: Monad m => Consumer l i u m [i]
> execP = snd `fmap` runP
> 
> fold :: Monad m => (r -> i -> r) -> r -> Consumer l i u m r
> fold f = go where
>   go r = awaitE >>= \ex -> case ex of
>     Left _u -> return r
>     Right i -> go $! f r i
> 
> await :: Monad m => Pipe l i o u m i
> await = awaitE >>= \ex -> case ex of
>   Left _u -> abort
>   Right i -> return i
> 
> oldPipe :: Monad m => (i -> o) -> Pipe l i o u m r
> oldPipe f = forever $ await >>= yield . f
> 
> oldIdP :: Monad m => Pipe l i i u m r
> oldIdP = oldPipe id
> 
> oldFilterP :: Monad m => (i -> Bool) -> Pipe l i i u m r
> oldFilterP test = forever $ await >>= \x -> when (test x) (yield x)
> 
> oldPrinter :: Show i => Consumer l i u IO r
> oldPrinter = forever $ await >>= lift . print

You can play with this code for yourself by downloading PipeLeftover.lhs.


Pipes to Conduits part 5: Finalizers

Last time we introduced abort recovery, allowing downstream pipes to recover from an abort. We were able to write the recover combinator, which could attach a recovery pipe to any other pipe.

Today, we’ll look at a different aspect of pipe termination: finalizers. As we have discussed before, downstream pipes may discard upstream pipes when they are done with them, whether the upstream pipe has returned a result or not. That pipe may have unfinished business: for example, open file handles or database connections that need to be closed. We’d like to be able to dictate arbitrary actions which will always be performed before a pipe is discarded.

> {-# LANGUAGE TypeOperators #-}
> {-# OPTIONS_GHC -Wall #-}
> 
> module PipeFinalize where
> 
> import Control.Monad.Trans.Free (FreeT(..), FreeF(..), liftF, wrap)
> import Fun ((:&:)(..), (:|:)(..))
> 
> import Data.Void (Void)
> import Control.Monad (when, forever)
> import Control.Monad.Trans.Class (lift)
> import Control.Monad.Trans.Resource (MonadResource, allocate, release)

Functors

We’ll add another Const synonym, Finalize. This one is parameterized by a monad m, and contains an arbitrary action in that monad: m ().

> newtype Then next = Then next            -- Identity
> newtype Yield o next = Yield o           -- Const
> newtype Await i next = Await (i -> next) -- Fun
> data Abort next = Abort                  -- Empty
> newtype Finalize m next = Finalize (m ()) -- Const
> 
> instance Functor Then where
>   fmap f (Then next) = Then (f next)
> 
> instance Functor (Yield o) where
>   fmap _f (Yield o) = Yield o
> 
> instance Functor (Await i) where
>   fmap f (Await g) = Await (f . g)
> 
> instance Functor Abort where
>   fmap _f Abort = Abort
> 
> instance Functor (Finalize m) where
>   fmap _f (Finalize m) = Finalize m

There will be times when a finalizer is expected, but we have none to give and don’t want anything to happen. In those cases we’ll just return (), so let’s give that idiom a nicer name.

> pass :: Monad m => m ()
> pass = return ()

There will also come a time when we must supply a finalizer, but we never expect it to be used. We could use pass for that, too, but instead, let’s use an exploding bomb with a message attached. We’d like to be informed if the unreachable is reached.

> unreachable :: Monad m => m ()
> unreachable = error "You've reached the unreachable finalizer!"

The Pipe type

We will attach the Finalize information to the Yield information. That way, when upstream yields to downstream, and downstream decides to discard upstream, downstream can use the latest finalizer it acquired from upstream.

That was a lot of up and down, so reread that sentence a few times until it becomes clear. It sounds childish, but I find these things tend to make more sense when I wave my hand left when I read "downstream" and right when I read "upstream". It’s also more fun when you add other gestures for verbs.
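
To make the "latest finalizer" idea concrete, here is a toy model, not the FreeT-based Pipe defined below. As an assumption, upstream is a plain list of yields, each paired with a finalizer label, and downstream takes a fixed number of values before discarding upstream. Finalizer and takeAndDiscard are hypothetical names.

```haskell
-- Toy model (not the post's Pipe type): an upstream is a list of yields,
-- each paired with the finalizer that should run if downstream discards
-- upstream right after receiving that value. Finalizers are just labels.
type Finalizer = String

-- Downstream takes up to n values, then discards upstream. The result is
-- the values received plus the finalizer downstream is responsible for:
-- the one attached to the most recent yield. If downstream never awaited,
-- upstream never started, so there is nothing to run. If upstream runs out
-- first, it has finished (and finalized itself), so again nothing is owed.
takeAndDiscard :: Int -> [(o, Finalizer)] -> ([o], Maybe Finalizer)
takeAndDiscard n0 ys0 = go n0 Nothing ys0
  where
    go k latest _ | k <= 0 = ([], latest)
    go _ _ []              = ([], Nothing)
    go k _ ((o, fin) : rest) =
      let (os, owed) = go (k - 1) (Just fin) rest
      in  (o : os, owed)
```

With three yields labelled "close after 1" through "close after 3", takeAndDiscard 2 returns ([1, 2], Just "close after 2"): downstream owes exactly the finalizer from the last yield it saw.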

> type YieldThen o m = Yield o :&: Finalize m :&: Then
> type AwaitU i u    = Await i :&: Await u :&: Then
> type PipeF i o u m = YieldThen o m :|: AwaitU i u :|: Abort
> type Pipe i o u m r = FreeT (PipeF i o u m) m r

Pay special attention to how Pipe is defined here. It makes sure that m is the same m given to both the PipeF functor and to FreeT. See if you can explain why this is significant.

> type Producer o   m r = Pipe () o    () m r
> type Consumer i u m r = Pipe i  Void u  m r
> type Pipeline     m r = Pipe () Void () m r

Working with PipeF

The yieldF smart constructor is extended appropriately, as is pipeCase.

> liftYield :: YieldThen o m next ->              PipeF i o u m next
> liftYield = L . L
> 
> liftAwait :: AwaitU i u next ->                 PipeF i o u m next
> liftAwait = L . R
> 
> liftAbort :: Abort next ->                      PipeF i o u m next
> liftAbort = R
> 
> yieldF :: o -> m () -> next ->                  PipeF i o u m next
> yieldF o fin next = liftYield $ Yield o :&: Finalize fin :&: Then next
> 
> awaitF :: (i -> next) -> (u -> next) -> next -> PipeF i o u m next
> awaitF f g next = liftAwait $ Await f :&: Await g :&: Then next
> 
> abortF :: PipeF i o u m next
> abortF = liftAbort Abort
> 
> pipeCase :: FreeF (PipeF i o u m) r next
>  ->                                        a  -- Abort
>  -> (r                                  -> a) -- Return
>  -> (o -> m () -> next                  -> a) -- Yield
>  -> ((i -> next) -> (u -> next) -> next -> a) -- Await
>                                         -> a
> pipeCase (Wrap (R Abort))
>   k _ _ _ = k
> pipeCase (Return r)
>   _ k _ _ = k r
> pipeCase (Wrap (L (L (Yield o :&: Finalize fin :&: Then next))))
>   _ _ k _ = k o fin next
> pipeCase (Wrap (L (R (Await f :&: Await g :&: Then next))))
>   _ _ _ k = k f g next

Pipe primitives

The yield primitive should have no finalizer attached, so we just give it pass for that slot.

> tryAwait :: Monad m => Pipe i o u m (Either (Maybe u) i)
> tryAwait = liftF $ awaitF Right (Left . Just) (Left Nothing)
> 
> yield :: Monad m => o -> Pipe i o u m ()
> yield b = liftF $ yieldF b pass ()
> 
> abort :: Monad m => Pipe i o u m r
> abort = liftF abortF

Pipe composition

The type of composition again remains the same; however, we now need to keep track of an additional argument: the most recent upstream finalizer. We can still keep (<+<), but it will just be a synonym for the new composition function, supplying it the empty finalizer, pass.

> (<+<) :: Monad m => Pipe i' o u' m r -> Pipe i i' u m u' -> Pipe i o u m r
> p1 <+< p2 = composeWithFinalizer pass p1 p2

It will also be convenient to define composition using the unreachable finalizer. You’ll see why momentarily.

> (<?<) :: Monad m => Pipe i' o u' m r -> Pipe i i' u m u' -> Pipe i o u m r
> p1 <?< p2 = composeWithFinalizer unreachable p1 p2
> 
> composeWithFinalizer :: Monad m => m ()
>                  -> Pipe i' o u' m r -> Pipe i i' u m u' -> Pipe i o u m r
> composeWithFinalizer finalizeUpstream p1 p2 = FreeT $ do
>   x1 <- runFreeT p1
>   let p1' = FreeT $ return x1
>   runFreeT $ pipeCase x1

And now the fun begins. Wherever we used to recursively invoke (<+<), we now need to consider: do we need to retain the current upstream finalizer? To maintain some similarity with previous code, whenever we need to invoke composeWithFinalizer recursively, we’ll let-bind a new operator (<*<), which will have some particular finalizer baked in: which one depends on each situation as we will soon see. (Recall that we also have <+< and <?< at our disposal, which have pass and unreachable finalizers baked in, respectively.)

>   {- Abort  -} (      lift finalizeUpstream >> abort)
>   {- Return -} (\r -> lift finalizeUpstream >> return r)

Upon reaching a downstream abort or return, we are going to discard the upstream pipe, so we must run the finalizer. Since Pipe is an instance of MonadTrans (by virtue of being a synonym for a FreeT), we can simply lift the finalizer into a pipe, and then sequence it (>>) with the appropriate result.

>   {- Yield  -} (\o finalizeDownstream next ->
>                       let (<*<) = composeWithFinalizer finalizeUpstream
>                       in wrap $ yieldF o
>                           (finalizeUpstream >> finalizeDownstream)
>                           (next <*< p2))

If the downstream pipe is yielding a result, then both the upstream and the downstream pipe are at risk of being discarded by a pipe further down the line. Fortunately, the yield construct provides an appropriate finalizer for p1, and we already have an appropriate finalizer for p2, so we’ll just bundle them together in a new yield construct. But which of the two should we run first? I chose to run the upstream finalizer first, and I’ll explain why later in this post.

In the event that control returns to our downstream pipe, we need not worry about finalizeDownstream, because p1 is once again in control. Therefore, when we compose next with p2, we only bundle in finalizeUpstream.
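
Here is a small sketch of that ordering decision in isolation, assuming finalizers are plain IO actions that append their name to a log. bundle mirrors the (finalizeUpstream >> finalizeDownstream) expression above; logFinalizer and chainFinalizers are hypothetical helpers, and folding over a flat list is a simplification of how nested compositions actually build up their finalizers.

```haskell
import Data.IORef

-- Hypothetical helper: a finalizer that appends its name to a shared log.
logFinalizer :: IORef [String] -> String -> IO ()
logFinalizer ref name = modifyIORef ref (++ [name])

-- Mirrors the Yield case of composeWithFinalizer: the bundled finalizer
-- runs the upstream one first, then the downstream one.
bundle :: Monad m => m () -> m () -> m ()
bundle finalizeUpstream finalizeDownstream = finalizeUpstream >> finalizeDownstream

-- Folding bundle over stages listed from most upstream to most downstream
-- (a simplification of nested composition) gives one combined finalizer.
chainFinalizers :: Monad m => [m ()] -> m ()
chainFinalizers = foldl bundle (return ())
```

Running chainFinalizers on stages named "three", "two", "one" (most upstream first) leaves the log as ["three", "two", "one"]: with this order, a pipe whose finalizer is running can assume everything upstream of it has already been finalized.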

>   {- Await  -} (\f1 g1 onAbort1 -> FreeT $ do
>     x2 <- runFreeT p2
>     runFreeT $ pipeCase x2
>     {- Abort  -} (    onAbort1 <+< abort) -- downstream recovers
>     {- Return -} (\u' -> g1 u' <+< abort) -- downstream recovers

In the event that downstream is awaiting, control transfers upstream. If the upstream pipe is returning or aborting, then we no longer need to care about finalizing it: it has already finalized itself by this point. Therefore, we can use the regular (<+<) operator for these cases, and forget about the finalizeUpstream we used to have.

>     {- Yield  -} (\o newFinalizer next ->
>                       let (<*<) = composeWithFinalizer newFinalizer
>                       in f1 o <*< next)

If downstream is awaiting, and upstream is yielding, then that means the upstream pipe has provided a newFinalizer to use instead of the old one.

>     {- Await  -} (\f2 g2 onAbort2 -> wrap $ awaitF
>                           (\i -> p1' <?< f2 i)
>                           (\u -> p1' <?< g2 u)
>                           (      p1' <?< onAbort2)))

When both p1 and p2 are awaiting, well that is an interesting case. Consider: p2 is transferring control further upstream. When control comes back to p2, p1 will still be awaiting. The only way that control will transfer back down to p1 is if p2 decides to abort, return, or yield. If it aborts or returns, then it will have finalized itself. If it yields, then it will supply a brand new finalizer.

So the question is, when we re-compose p1 with either f2 i, g2 u, or onAbort2, what finalizer should we use? From what I just said in the previous paragraph, it should be apparent that no matter what finalizer we provide here, it will never be used. So we’ll just hand it the exploding bomb: unreachable.

> (>+>) :: Monad m => Pipe i i' u m u' -> Pipe i' o u' m r -> Pipe i o u m r
> (>+>) = flip (<+<)
> infixr 9 <+<
> infixr 9 >+>

Phew, we made it through again. Finalization is tricky: each case requires careful thought and analysis to make sure you are doing the right thing. But did we really do the right thing by using unreachable? Are you sure? Review the code, and think about it. Why did we use (<+<) for the upstream Return and Abort cases instead of (<?<)?

Running a pipeline

A yielded finalizer makes no difference to runPipe.

> runPipe :: Monad m => Pipeline m r -> m (Maybe r)
> runPipe p = do
>   e <- runFreeT p
>   pipeCase e
>   {- Abort  -} (                  return Nothing)
>   {- Return -} (\r             -> return $ Just r)
>   {- Yield  -} (\_o _fin next  -> runPipe next)
>   {- Await  -} (\f _g _onAbort -> runPipe $ f ())

Adding finalizers to a pipe

Being able to compose pipes with finalizers is all well and good, but how do we add finalizers to pipes in the first place? Let’s create a new pipe primitive: cleanupP.

> cleanupP :: Monad m => m () -> m () -> m () -> Pipe i o u m r
>          -> Pipe i o u m r
> cleanupP abortFinalize selfAbortFinalize returnFinalize = go where
>   go p = FreeT $ do
>     x <- runFreeT p
>     runFreeT $ pipeCase x

By inspecting a given pipe via pipeCase, we can attach finalizers in three distinct places.

>     {- Abort  -} (      lift selfAbortFinalize >> abort)

Any pipe can decide to abort. For example, in a previous blog post, we created the await pseudo-primitive, which voluntarily aborts if an upstream pipe aborts or returns.

>     {- Return -} (\r -> lift returnFinalize    >> return r)

Any pipe can decide to return. This is another opportunity for finalization.

>     {- Yield  -} (\o finalizeRest next -> wrap $
>                         yieldF o (finalizeRest >> abortFinalize) (go next))

Finally, any pipe can be discarded when it yields control to a downstream pipe. A yield construct may already have finalizers associated with it, so when we add our new one, we’ll just tack it on at the end. We could have just as easily decided to put the new finalizer first; we’ll discuss that decision momentarily.

Notice that we also recursively apply this finalizer to the next pipe after yield. That’s because if control returns to this pipe from downstream, then we still want to finalize it later.

>     {- Await  -} (\f g onAbort -> wrap $
>                         awaitF (go . f) (go . g) (go onAbort))

We anticipate each possibility in the await case, and recursively apply the finalizer to all of them.

More convenient finalization combinators

cleanupP is too general to be convenient for everyday use. Let’s create some convenience combinators to handle typical needs.

> finallyP :: Monad m => m () -> Pipe i o u m r -> Pipe i o u m r
> finallyP finalize = cleanupP finalize finalize finalize

If we want a given finalizer to run no matter what, then we just use it for all 3 possibilities.

> catchP :: Monad m => m () -> Pipe i o u m r -> Pipe i o u m r
> catchP finalize = cleanupP finalize finalize pass

If we only want a finalizer to run if something "goes wrong", then we simply pass on the return option.

> successP :: Monad m => m () -> Pipe i o u m r -> Pipe i o u m r
> successP finalize = cleanupP pass pass finalize

Conversely, we may only want a finalizer to run in the absence of "problems", so we pass on both "problem" cases.
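
The three combinators differ only in which of cleanupP’s slots receive the user’s finalizer. As a rough sketch, assuming the ways a wrapped pipe can stop are exactly the three pipeCase branches that cleanupP instruments (Yield, Abort, Return), here is that mapping written as plain data. Ending, pickFinalizer, allEndings, and the *Runs functions are hypothetical names.

```haskell
-- The three ways a pipe wrapped by cleanupP can stop, per the discussion above.
data Ending
  = DiscardedAfterYield  -- downstream discarded it after a yield
  | AbortedItself        -- the pipe itself called abort
  | Returned             -- the pipe returned a result
  deriving (Show, Eq, Enum, Bounded)

-- Which of cleanupP's three finalizer arguments applies to each ending.
-- Argument order follows cleanupP: abortFinalize selfAbortFinalize returnFinalize.
pickFinalizer :: a -> a -> a -> Ending -> a
pickFinalizer abortFin _ _ DiscardedAfterYield = abortFin
pickFinalizer _ selfAbortFin _ AbortedItself   = selfAbortFin
pickFinalizer _ _ returnFin Returned           = returnFin

-- finallyP, catchP and successP, restated as "does the user's finalizer
-- run?" (True) versus "is pass used instead?" (False).
finallyRuns, catchRuns, successRuns :: Ending -> Bool
finallyRuns = pickFinalizer True  True  True
catchRuns   = pickFinalizer True  True  False
successRuns = pickFinalizer False False True

allEndings :: [Ending]
allEndings = [minBound .. maxBound]
```

Evaluating map catchRuns allEndings gives [True, True, False]: catchP’s finalizer fires when the pipe is discarded or aborts, but not when it returns.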

> bracketP :: MonadResource m => IO a -> (a -> IO ()) -> (a -> Pipe i o u m r)
>          -> Pipe i o u m r
> bracketP create destroy mkPipe = do
>   (key, val) <- lift $ allocate create destroy 
>   finallyP (release key) (mkPipe val)

ResourceT provides allocate and release to help you deal with finalizers, even in the face of thrown exceptions. We can make good use of this by lifting allocate into a Pipe, and then adding the corresponding release as a finalizer!
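
Part of why finallyP (release key) works as a finalizer is that, as I understand ResourceT’s semantics, release runs a cleanup at most once, and anything never released is still cleaned up when the ResourceT block ends. The sketch below imitates that bookkeeping with an IORef-based registry, assuming simple integer keys. Registry, newRegistry, allocateSim, releaseSim, and runRemainingSim are hypothetical names, not the resourcet API, and unlike real ResourceT this toy does nothing special about exceptions.

```haskell
import Data.IORef

-- A toy stand-in for ResourceT's cleanup registry (hypothetical, not resourcet).
data Registry = Registry
  { nextKey  :: IORef Int
  , cleanups :: IORef [(Int, IO ())]
  }

newRegistry :: IO Registry
newRegistry = Registry <$> newIORef 0 <*> newIORef []

-- Acquire a resource and remember how to destroy it; the key allows early release.
allocateSim :: Registry -> IO a -> (a -> IO ()) -> IO (Int, a)
allocateSim reg create destroy = do
  a <- create
  key <- atomicModifyIORef' (nextKey reg) (\k -> (k + 1, k))
  modifyIORef (cleanups reg) ((key, destroy a) :)
  return (key, a)

-- Run one cleanup now and forget it, so releasing twice is harmless.
releaseSim :: Registry -> Int -> IO ()
releaseSim reg key = do
  entries <- readIORef (cleanups reg)
  writeIORef (cleanups reg) (filter ((/= key) . fst) entries)
  mapM_ snd (filter ((== key) . fst) entries)

-- What ending the ResourceT block amounts to here: run anything never
-- released (most recently allocated first).
runRemainingSim :: Registry -> IO ()
runRemainingSim reg = do
  entries <- readIORef (cleanups reg)
  writeIORef (cleanups reg) []
  mapM_ snd entries
```

If one resource is released early (twice, even) and another is never released, each cleanup still runs exactly once.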

How do we know which finalizer comes first?

I’ve defined a few quick-n-dirty functions here to help us observe the behavior of pipe finalization.

> idMsg :: String -> Pipe i i u IO u
> idMsg str = finallyP (putStr $ str ++ " ") idP
> 
> take' :: Monad m => Int -> Pipe i i u m ()
> take' 0 = pass
> take' n = (await >>= yield) >> take' (pred n)

testPipeR will test what happens when abort comes from upstream.

> testPipeR :: Monad m => Pipe i o u m r -> m (Maybe r)
> testPipeR p = runPipe $ (await >> abort) <+< p <+< abort

testPipeL will test what happens when abort comes from downstream.

> testPipeL :: Monad m => Pipe Int o () m r -> m (Maybe r)
> testPipeL p = runPipe $ (await >> await >> abort) <+< take' 1 <+< p <+< fromList [1 ..]

testPipe will test what happens when abort comes from within the pipe itself.

> testPipe :: Monad m => Pipe Int o () m r -> m (Maybe (r, [o]))
> testPipe p = runPipe $ runP <+< p <+< fromList [1..]
> 
> examplePipe :: Pipe Int Int u IO ()
> examplePipe = idMsg "one" <+< take' 5 <+< idMsg "two" <+< idMsg "three"

Let’s take this for a spin.

ghci> testPipeR examplePipe
  three two one Nothing

ghci> testPipeL examplePipe
  three two one Nothing

ghci> testPipe examplePipe
  three two one Just ((),[1,2,3,4,5])

Well that’s boring. In each case the finalizers run in order from upstream to downstream: "three two one". But it’s boring on purpose: the way that I have defined for finalizers to behave is that if you are a pipe, and your finalizer is running, you can safely assume that any pipes upstream of you have already been finalized.

I encourage you to download this code and mess with it (it requires Fun.lhs as well, and was tested on GHC 7.4.1). What happens when you switch the order of finalizers on line 204 (pipe composition)? What happens when you switch the order of finalizers on line 317 (cleanupP)? What if you switch both? Can you think of any circumstances in which you’d want a pipe’s finalizer to run before the pipes upstream of it are finalized? You can use this command to run the ghci examples and see the difference between the expected output and the actual output:

$ BlogLiterately -g PipeFinalize.lhs > test.html && firefox test.html

Next time

The subtleties of finalization provide us a lot to think about. There is again room for many possible implementations, but logic and seeking consistent behavior can help us narrow the possibilities, and Haskell’s type system often guides us to the "obvious" solution.

Next time, we’ll tackle the "leftovers" feature, using the same style as conduit. I’ll try to point out all of the areas where different implementations are possible, because I feel that the decisions are less clear for leftovers than for previous features.

Some basic pipes

Here are all of the pipes from previous posts. They remain unchanged: you can ignore the new finalizer capability that we added and go right along writing pipes just like you did before we added this feature.

> fromList :: Monad m => [o] -> Producer o m ()
> fromList = mapM_ yield
> 
> awaitE :: Monad m => Pipe i o u m (Either u i)
> awaitE = tryAwait >>= \emx -> case emx of
>   Left Nothing  -> abort
>   Left (Just u) -> return $ Left u
>   Right i       -> return $ Right i
> 
> awaitForever :: Monad m => (i -> Pipe i o u m r) -> Pipe i o u m u
> awaitForever f = go where
>   go = awaitE >>= \ex -> case ex of
>     Left u  -> return u
>     Right i -> f i >> go
> 
> pipe :: Monad m => (i -> o) -> Pipe i o u m u
> pipe f = awaitForever $ yield . f
> 
> idP :: Monad m => Pipe i i u m u
> idP = pipe id
> 
> filterP :: Monad m => (i -> Bool) -> Pipe i i u m u
> filterP test = awaitForever $ \x -> when (test x) (yield x)
> 
> printer :: Show i => Consumer i u IO u
> printer = awaitForever $ lift . print
> 
> runP :: Monad m => Consumer i u m (u, [i])
> runP = awaitE >>= \ex -> case ex of
>   Left  u -> return (u, [])
>   Right i -> runP >>= \ ~(u, is) -> return (u, i:is)
> 
> evalP :: Monad m => Consumer i u m u
> evalP = fst `fmap` runP
> 
> execP :: Monad m => Consumer i u m [i]
> execP = snd `fmap` runP
> 
> fold :: Monad m => (r -> i -> r) -> r -> Consumer i u m r
> fold f = go where
>   go r = awaitE >>= \ex -> case ex of
>     Left _u -> return r
>     Right i -> go $! f r i
> 
> await :: Monad m => Pipe i o u m i
> await = awaitE >>= \ex -> case ex of
>   Left _u -> abort
>   Right i -> return i
> 
> oldPipe :: Monad m => (i -> o) -> Pipe i o u m r
> oldPipe f = forever $ await >>= yield . f
> 
> oldIdP :: Monad m => Pipe i i u m r
> oldIdP = oldPipe id
> 
> oldFilterP :: Monad m => (i -> Bool) -> Pipe i i u m r
> oldFilterP test = forever $ await >>= \x -> when (test x) (yield x)
> 
> oldPrinter :: Show i => Consumer i u IO r
> oldPrinter = forever $ await >>= lift . print

You can play with this code for yourself by downloading PipeFinalize.lhs.


Pipes to Conduits part 4: Recovering from Abort

Last time, we introduced the abort primitive, which restored the power to write pipes a la Control.Pipe. However, the power for upstream pipes to force those downstream to abort is perhaps too much. This time, we’re going to give downstream pipes the ability to recover from an upstream abort.

The changes made to the code from last time are minimal in this post; in fact it barely qualifies as worthy of its own post. However, once we’ve seen the changes there is something important that I would like to point out. If you’ve been following along with the series, then you can just skim over most of the code.

> {-# LANGUAGE TypeOperators #-}
> {-# OPTIONS_GHC -Wall #-}
> 
> module PipeRecover where
> 
> import Control.Monad.Trans.Free (FreeT(..), FreeF(..), liftF, wrap)
> import Fun ((:&:)(..), (:|:)(..))
> 
> import Data.Void (Void)
> import Control.Monad (when, forever)
> import Control.Monad.Trans.Class (lift)

Functors

Nothing new here.

> newtype Then next = Then next            -- Identity
> newtype Yield o next = Yield o           -- Const
> newtype Await i next = Await (i -> next) -- Fun
> data Abort next = Abort                  -- Empty
> 
> instance Functor Then where
>   fmap f (Then next) = Then (f next)
> 
> instance Functor (Yield o) where
>   fmap _f (Yield o) = Yield o
> 
> instance Functor (Await i) where
>   fmap f (Await g) = Await (f . g)
> 
> instance Functor Abort where
>   fmap _f Abort = Abort

The Pipe type

We will grant downstream pipes the ability to handle an abort in a similar fashion to the way we granted them the ability to handle upstream results: by extending our Await with another callback. This time, there is no input, so we simply use the Then functor:

> type YieldThen o = Yield o :&: Then
> type AwaitU i u = Await i :&: Await u :&: Then
> type PipeF i o u = YieldThen o :|: AwaitU i u :|: Abort
> type Pipe i o u  = FreeT (PipeF i o u)
> 
> type Producer o   = Pipe () o    ()
> type Consumer i u = Pipe i  Void u
> type Pipeline     = Pipe () Void ()

Working with PipeF

Only small changes in this section. We merely enhance awaitF to accept the third input, and pipeCase to similarly handle the extra callback.

> liftYield :: YieldThen o next ->                PipeF i o u next
> liftYield = L . L
> 
> liftAwait :: AwaitU i u next ->                 PipeF i o u next
> liftAwait = L . R
> 
> liftAbort :: Abort next ->                      PipeF i o u next
> liftAbort = R
> 
> yieldF :: o -> next ->                          PipeF i o u next
> yieldF o next = liftYield $ Yield o :&: Then next
> 
> awaitF :: (i -> next) -> (u -> next) -> next -> PipeF i o u next
> awaitF f g next = liftAwait $ Await f :&: Await g :&: Then next
> 
> abortF :: PipeF i o u next
> abortF = liftAbort Abort
> 
> pipeCase :: FreeF (PipeF i o u) r next
>  ->                                        a  -- Abort
>  -> (r                                  -> a) -- Return
>  -> (o -> next                          -> a) -- Yield
>  -> ((i -> next) -> (u -> next) -> next -> a) -- Await
>                                         -> a
> pipeCase (Wrap (R Abort))
>   k _ _ _ = k
> pipeCase (Return r)
>   _ k _ _ = k r
> pipeCase (Wrap (L (L (Yield o :&: Then next))))
>   _ _ k _ = k o next
> pipeCase (Wrap (L (R (Await f :&: Await g :&: Then next))))
>   _ _ _ k = k f g next

Pipe primitives

awaitE is no longer sufficient for our needs; we need to extend our await primitive yet again. Where before we promised Either u i, we must now add a third possibility: upstream abort. There is no additional information associated with this, so let’s use a Maybe: Nothing will signal that an abort has occurred upstream.

We therefore have 3 choices: Maybe (Either u i), Either (Maybe u) i, or Either u (Maybe i). I like the second choice, because we can preserve Left as signaling upstream termination, whether that be an abort or a return.
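
To compare the encodings side by side, here is a sketch with an explicit sum type for what an await can observe. AwaitResult, toChosen, fromChosen, and upstreamDone are hypothetical names introduced only for comparison. toChosen and fromChosen are inverses, so Either (Maybe u) i loses no information, and "did upstream terminate?" becomes a single Left check.

```haskell
-- The three things an await can observe, as an explicit sum type.
data AwaitResult u i
  = GotInput i
  | UpstreamReturned u
  | UpstreamAborted
  deriving (Show, Eq)

-- The encoding chosen for tryAwait: Left means "upstream is finished",
-- whether by return (Just u) or by abort (Nothing).
toChosen :: AwaitResult u i -> Either (Maybe u) i
toChosen (GotInput i)         = Right i
toChosen (UpstreamReturned u) = Left (Just u)
toChosen UpstreamAborted      = Left Nothing

fromChosen :: Either (Maybe u) i -> AwaitResult u i
fromChosen (Right i)       = GotInput i
fromChosen (Left (Just u)) = UpstreamReturned u
fromChosen (Left Nothing)  = UpstreamAborted

-- With this encoding, upstream termination of either kind is just Left.
upstreamDone :: Either (Maybe u) i -> Bool
upstreamDone = either (const True) (const False)
```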

> tryAwait :: Monad m => Pipe i o u m (Either (Maybe u) i)
> tryAwait = liftF $ awaitF Right (Left . Just) (Left Nothing)
> 
> yield :: Monad m => o -> Pipe i o u m ()
> yield b = liftF $ yieldF b ()
> 
> abort :: Monad m => Pipe i o u m r
> abort = liftF abortF

Pipe composition

The changes to pipe composition are minimal: when downstream awaits on upstream, and upstream aborts, then make use of the provided callback.

> (<+<) :: Monad m => Pipe i' o u' m r -> Pipe i i' u m u' -> Pipe i o u m r
> p1 <+< p2 = FreeT $ do
>   x1 <- runFreeT p1
>   let p1' = FreeT $ return x1
>   runFreeT $ pipeCase x1
>   {- Abort  -} (abort)               -- upstream discarded
>   {- Return -} (\r      -> return r) -- upstream discarded
>   {- Yield  -} (\o next -> wrap $ yieldF o (next <+< p2))
>   {- Await  -} (\f1 g1 onAbort1 -> FreeT $ do
>     x2 <- runFreeT p2
>     runFreeT $ pipeCase x2

v This is the part that changed

>     {- Abort  -} (        onAbort1 <+< abort) -- downstream recovers

^ This is the part that changed

>     {- Return -} (\u'     -> g1 u' <+< abort) -- downstream recovers
>     {- Yield  -} (\o next -> f1 o  <+< next)
>     {- Await  -} (\f2 g2 onAbort2 -> wrap $ awaitF
>                                            (\i -> p1' <+< f2 i)
>                                            (\u -> p1' <+< g2 u)
>                                            (      p1' <+< onAbort2)))
> 
> (>+>) :: Monad m => Pipe i i' u m u' -> Pipe i' o u' m r -> Pipe i o u m r
> (>+>) = flip (<+<)
> infixr 9 <+<
> infixr 9 >+>

Running a pipeline

When running a pipe, the new callback makes no difference.

> runPipe :: Monad m => Pipeline m r -> m (Maybe r)
> runPipe p = do
>   e <- runFreeT p
>   pipeCase e
>   {- Abort  -} (                  return Nothing)
>   {- Return -} (\r             -> return $ Just r)
>   {- Yield  -} (\_o next       -> runPipe next)
>   {- Await  -} (\f _g _onAbort -> runPipe $ f ())

Some basic pipes

> fromList :: Monad m => [o] -> Producer o m ()
> fromList = mapM_ yield

We can easily write awaitE from our last post: we simply give up our chance of recovering from an abort in the Left Nothing case.

> awaitE :: Monad m => Pipe i o u m (Either u i)
> awaitE = tryAwait >>= \emx -> case emx of
>   Left Nothing  -> abort
>   Left (Just u) -> return $ Left u
>   Right i       -> return $ Right i

From there, all of the code from the last post is possible. (Skim past if you’ve seen it already.)

> awaitForever :: Monad m => (i -> Pipe i o u m r) -> Pipe i o u m u
> awaitForever f = go where
>   go = awaitE >>= \ex -> case ex of
>     Left u  -> return u
>     Right i -> f i >> go
> 
> pipe :: Monad m => (i -> o) -> Pipe i o u m u
> pipe f = awaitForever $ yield . f
> 
> idP :: Monad m => Pipe i i u m u
> idP = pipe id
> 
> filterP :: Monad m => (i -> Bool) -> Pipe i i u m u
> filterP test = awaitForever $ \x -> when (test x) (yield x)
> 
> printer :: Show i => Consumer i u IO u
> printer = awaitForever $ lift . print
> 
> runP :: Monad m => Consumer i u m (u, [i])
> runP = awaitE >>= \ex -> case ex of
>   Left  u -> return (u, [])
>   Right i -> runP >>= \ ~(u, is) -> return (u, i:is)
> 
> evalP :: Monad m => Consumer i u m u
> evalP = fst `fmap` runP
> 
> execP :: Monad m => Consumer i u m [i]
> execP = snd `fmap` runP
> 
> fold :: Monad m => (r -> i -> r) -> r -> Consumer i u m r
> fold f = go where
>   go r = awaitE >>= \ex -> case ex of
>     Left _u -> return r
>     Right i -> go $! f r i
> 
> await :: Monad m => Pipe i o u m i
> await = awaitE >>= \ex -> case ex of
>   Left _u -> abort
>   Right i -> return i
> 
> oldPipe :: Monad m => (i -> o) -> Pipe i o u m r
> oldPipe f = forever $ await >>= yield . f
> 
> oldIdP :: Monad m => Pipe i i u m r
> oldIdP = oldPipe id
> 
> oldFilterP :: Monad m => (i -> Bool) -> Pipe i i u m r
> oldFilterP test = forever $ await >>= \x -> when (test x) (yield x)
> 
> oldPrinter :: Show i => Consumer i u IO r
> oldPrinter = forever $ await >>= lift . print

Primitives for recovering from an abort

We can enhance any pipe by giving it new instructions whenever it or its upstream connection aborts. We’ll create a new primitive called recover to accomplish this. Anything that is written with pipeCase, or that uses either wrap or liftF, I call a "primitive". If it can be written in terms of other primitives, without using pipeCase, liftF, or wrap (and obviously without digging into a pipe’s internals any other way) then I don’t consider it to be a "primitive".

> recover :: Monad m => Pipe i o u m r -> Pipe i o u m r -> Pipe i o u m r
> originalP `recover` newP = FreeT $ do
>   x <- runFreeT originalP
>   runFreeT $ pipeCase x
>   {- Abort  -} (newP)
>   {- Return -} (\r -> return r)
>   {- Yield  -} (\o next -> wrap $ yieldF o (next `recover` newP))
>   {- Await  -} (\f g onAbort -> let go p = p `recover` newP in
>                            wrap $ awaitF (go . f) (go . g) (go onAbort))
> 
> recoverWith :: Monad m => Pipe i o u m r -> r -> Pipe i o u m r
> p `recoverWith` r = p `recover` return r
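
To see what recover does without the FreeT and functor plumbing, here is a sketch on a stripped-down, non-monadic pipe type. As a simplifying assumption, Await carries a single fallback for when input runs out, rather than separate upstream-return and upstream-abort callbacks. P, recoverP, feed, and oldId are hypothetical names. recoverP has the same shape as recover: every Abort it can reach, including those after a yield or an await, is replaced with newP.

```haskell
-- A stripped-down, non-monadic pipe, used only to illustrate recover.
data P i o r
  = Done r
  | Abort
  | Yield o (P i o r)
  | Await (i -> P i o r) (P i o r)  -- continuation, and fallback when input ends

-- Same shape as recover: replace every reachable Abort with newP.
recoverP :: P i o r -> P i o r -> P i o r
recoverP p newP = case p of
  Abort        -> newP
  Done r       -> Done r
  Yield o next -> Yield o (recoverP next newP)
  Await f g    -> Await (\i -> recoverP (f i) newP) (recoverP g newP)

-- Run a pipe against a list of inputs, collecting its outputs.
-- Nothing means the pipe aborted, as with runPipe.
feed :: [i] -> P i o r -> ([o], Maybe r)
feed _ (Done r)           = ([], Just r)
feed _ Abort              = ([], Nothing)
feed is (Yield o next)    = let (os, r) = feed is next in (o : os, r)
feed (x : xs) (Await f _) = feed xs (f x)
feed [] (Await _ onEnd)   = feed [] onEnd

-- Analogue of oldIdP: pass every input along, abort when input runs out.
oldId :: P i i r
oldId = Await (\i -> Yield i oldId) Abort
```

Here feed [1, 2, 3] oldId gives ([1, 2, 3], Nothing), while feed [1, 2, 3] (oldId `recoverP` Done 5) gives ([1, 2, 3], Just 5), much like the idThen examples in the ghci session that follows.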

Here’s a little ghci session comparing and contrasting uses of "old pipe" programming and recovery with "new pipes".

ghci> let idThen r = oldIdP `recoverWith` r


ghci> runPipe $ runP <+< idThen 5 <+< fromList [1..3]
  Just (5,[1,2,3])

ghci> runPipe $ runP <+< oldIdP <+< fromList [1..3]
  Nothing

ghci> runPipe $ runP <+< fmap (const 5) oldIdP <+< fromList [1..3]
  Nothing

ghci> runPipe $ runP <+< fmap (const 5) idP <+< fromList [1..3]
  Just (5,[1,2,3])

Next time, we’ll be doing something very similar to this, in order to add arbitrary finalizers to pipes. Go back and review the implementation of recover. Do you understand how it works? Could we have written recover using the code from last time? How does this version of recover make use of the change we made to the PipeF functor?

Maybe r versus Abort

Now that we’ve given downstream pipes the ability to recover from an upstream abort again, aren’t we back where we were two blog posts ago, before we ever had abort?

Consider back when we didn’t have abort. What if we simply constrained Pipe to have a result type of Maybe r?

type Part2PipeF i o u = YieldThen o :|: AwaitU i u
type Part2Pipe i o u  = FreeT (Part2PipeF i o u)

type Pipe i o u m r = Part2Pipe i o u m (Maybe r)

(Also consider that MaybeT (Part2Pipe i o u m) r is isomorphic to Part2Pipe i o u m (Maybe r), so the following applies similarly to sticking a MaybeT on top.)

There would be a few consequences.

  • The awaitE primitive would produce an Either (Maybe u) i instead of Either u i.
  • runPipe would produce m (Maybe r) instead of plain old m r.
  • The pipe that trivially returns Nothing could be completely polymorphic, and would be able to fill any hole shaped like a Pipe.
  • Pipe composition would have to deal with Maybes. After handing downstream a meaningful upstream result once, it could supply Nothing as the upstream result every time after that.

These should all sound extremely familiar, because they are nearly identical to the code in this very file!

What’s the point of Abort?

Recall that last time, we added abort with the motivation that we wanted to write pipes the old way like we used to with Control.Pipe. By allowing upstream pipes to abort the pipeline, we were able to write code using the blissful await primitive that simply relied on the automatic termination properties of pipes.

Notice how Conduit (referring to version 0.5.x) does not provide any "untainted" await primitive. The ones it provides are:

await :: Pipe l i o u m (Maybe i)
awaitE :: Pipe l i o u m (Either u i)
awaitForever :: Monad m => (i -> Pipe l i o r m r') -> Pipe l i o r m r

Always await with a caveat. But this allows the conduit version of runPipe to not deal with Maybes. It’s a trade-off: where do you want to deal with the possibility that a pipe has shut down? If you aren’t forced to deal with it inside your pipes code, then you are instead forced to deal with it at the level of runPipe. Or, you can just revert to the old-school Control.Pipe way, and retrieve the result from whichever pipe in a pipeline produces the first result.

What if we want the best of both worlds? We want pipe composition without constraining result types to be the same, but we nevertheless want to provide an additional parameter e that any pipe can abort to, so that runPipe is guaranteed to produce a result rather than possibly Nothing. Sounds like a job for Either! Paolo, in his pipes-core fork of pipes, provides an EitherT e on top of (essentially) Part2Pipe, but with the additional constraint that e be the same as u!

As you can see, when designing a pipes library, there are a lot of potential trade-offs; it’s not very black-and-white which ones are the "best" ones to choose. We’ll just have to keep gaining practical experience with and proving properties of the various options. Keep an eye on pipes and pipes-core, because it seems that Paolo and Gabriel are starting to agree more than usual! conduit has undergone massive changes since its inception around 8 months ago (while nevertheless surprisingly retaining much of the same API); I wouldn’t be surprised to see these three packages merge into one within the next year or two.

Next time

I’ll continue from here with abort still intact, but at the end of the series we will remove it, just to show how our end result is identical to conduit. At least I think it will be. We’ll see when we get there.

This time we created the recover combinator, which affords us some amount of fault tolerance. Next time, we’ll add proper pipe finalization hooks. By combining these with ResourceT, we can provide convenient, guaranteed, exception-safe resource finalization.

newtype Finalize m next = Finalize (m ())
type YieldThen o m = Yield o :&: Finalize m :&: Then

You can play with this code for yourself by downloading PipeRecover.lhs.


Pipes to Conduits part 3: Abort

Last time, we enhanced the await primitive, making it aware of when the upstream pipe returned a value. However, the change forced us to modify our style of programming. This is not necessarily a bad thing, but today, we’ll recover the old capabilities we had by adding a new primitive: abort. This will restore the ability for upstream pipes to shut down the pipeline.

> {-# LANGUAGE TypeOperators #-}
> {-# OPTIONS_GHC -Wall #-}
> 
> module PipeAbort where
> 
> import Control.Monad.Trans.Free (FreeT(..), FreeF(..), liftF, wrap)
> import Fun ((:&:)(..), (:|:)(..))
> 
> import Data.Void (Void)
> import Control.Monad (when, forever)
> import Control.Monad.Trans.Class (lift)

Functors

We finally revisit our fourth old friend, the Empty functor, and give it the name Abort. Recall that the Empty functor allows us to short circuit computation without providing any other information.

> newtype Then next = Then next            -- Identity
> newtype Yield o next = Yield o           -- Const
> newtype Await i next = Await (i -> next) -- Fun
> data Abort next = Abort                  -- Empty
> instance Functor Then where
>   fmap f (Then next) = Then (f next)
> 
> instance Functor (Yield o) where
>   fmap _f (Yield o) = Yield o
> 
> instance Functor (Await i) where
>   fmap f (Await g) = Await (f . g)
> 
> instance Functor Abort where
>   fmap _f Abort = Abort

The Pipe type

> type YieldThen o = Yield o :&: Then
> type AwaitU i u = Await i :&: Await u

With our shiny new Abort functor in hand, we just union it in with the other options in a PipeF.

> type PipeF i o u = YieldThen o :|: AwaitU i u :|: Abort
> type Pipe i o u  = FreeT (PipeF i o u)
> 
> type Producer o   = Pipe () o    ()
> type Consumer i u = Pipe i  Void u
> type Pipeline     = Pipe () Void ()

Working with PipeF

I’ve defined :|: to be left-associative, which means that we can simply union another thing onto the right side, and wrap everything we used to have in a big L. This change is reflected in the lifting functions.
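
The Fun module from part 0 isn’t reproduced here. The following is therefore only a sketch that assumes its (:&:) and (:|:) look roughly like the definitions below. The fixity numbers are guesses; only the left association of :|: comes from the text. The sketch shows why the old branches end up behind L . L and L . R, while the new Abort branch is a bare R.

```haskell
{-# LANGUAGE TypeOperators #-}

-- The four functors from this part ('next' is phantom where unused).
newtype Then next    = Then next
newtype Yield o next = Yield o
newtype Await i next = Await (i -> next)
data Abort next      = Abort

-- ASSUMPTION: approximate shapes of Fun's (:&:) and (:|:).
data (f :&: g) a = f a :&: g a        -- product: both functors at once
data (f :|: g) a = L (f a) | R (g a)  -- sum: one functor or the other
infixr 6 :&:  -- assumed fixity
infixl 5 :|:  -- left-associative, as described in the text

type YieldThen o = Yield o :&: Then
type AwaitU i u  = Await i :&: Await u

-- Because :|: is infixl, this parses as ((YieldThen o :|: AwaitU i u) :|: Abort).
type PipeF i o u = YieldThen o :|: AwaitU i u :|: Abort

liftYield :: YieldThen o next -> PipeF i o u next
liftYield = L . L

liftAwait :: AwaitU i u next -> PipeF i o u next
liftAwait = L . R

liftAbort :: Abort next -> PipeF i o u next
liftAbort = R

-- Report which branch of the nested sum a PipeF value lives in.
branch :: PipeF i o u next -> String
branch (L (L _)) = "yield"
branch (L (R _)) = "await"
branch (R _)     = "abort"
```

With this layout, adding one more functor later means putting another :|: on the right and one more L in front of every existing lift.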

> liftYield :: YieldThen o next ->        PipeF i o u next
> liftYield = L . L
> 
> liftAwait :: AwaitU i u next ->         PipeF i o u next
> liftAwait = L . R
> 
> liftAbort :: Abort next ->              PipeF i o u next
> liftAbort = R
> 
> yieldF :: o -> next ->                  PipeF i o u next
> yieldF o next = liftYield $ Yield o :&: Then next
> 
> awaitF :: (i -> next) -> (u -> next) -> PipeF i o u next
> awaitF f g = liftAwait $ Await f :&: Await g
> 
> abortF :: PipeF i o u next
> abortF = liftAbort Abort

I’ve added a smart constructor for Abort, which is entirely straightforward. We’ll need to add another branch to our pipeCase construct. pipeCase must be prepared with a default a, because Abort provides absolutely no information.

> pipeCase :: FreeF (PipeF i o u) r next
>          ->                                a  -- Abort
>          -> (r                          -> a) -- Return
>          -> (o -> next                  -> a) -- Yield
>          -> ((i -> next) -> (u -> next) -> a) -- Await
>                                         -> a
> pipeCase (Wrap (R Abort))
>   k _ _ _ = k
> pipeCase (Return r)
>   _ k _ _ = k r
> pipeCase (Wrap (L (L (Yield o :&: Then next))))
>   _ _ k _ = k o next
> pipeCase (Wrap (L (R (Await f :&: Await g))))
>   _ _ _ k = k f g

Pipe primitives

> awaitE :: Monad m => Pipe i o u m (Either u i)
> awaitE  = liftF $ awaitF Right Left
> 
> yield :: Monad m => o -> Pipe i o u m ()
> yield b = liftF $ yieldF b ()
> 
> abort :: Monad m => Pipe i o u m r
> abort = liftF abortF

Our primitives remain unchanged. We add the abort primitive; notice that it is polymorphic in its return type. In fact, it’s polymorphic in, well, everything. Its complete lack of information means that it can be used to fill any hole that has the shape of a Pipe.

Pipe composition

The type of pipe composition does not change with this modification.

> (<+<) :: Monad m => Pipe i' o u' m r -> Pipe i i' u m u' -> Pipe i o u m r
> p1 <+< p2 = FreeT $ do
>   x1 <- runFreeT p1
>   let p1' = FreeT $ return x1
>   runFreeT $ pipeCase x1

Everywhere we used pipeCase, we’ll need to add the extra branch for the Abort case. If the downstream pipe aborted, then everything upstream is discarded, as it is when downstream returns a value.

>   {- Abort  -} (abort)               -- upstream discarded
>   {- Return -} (\r      -> return r) -- upstream discarded
>   {- Yield  -} (\o next -> wrap $ yieldF o (next <+< p2))
>   {- Await  -} (\f1 g1  -> FreeT $ do
>     x2 <- runFreeT p2
>     runFreeT $ pipeCase x2

If the upstream pipe aborted, then downstream is forcibly aborted as well, meaning that the downstream pipe is discarded.

>     {- Abort  -} (abort)             -- downstream discarded

When the upstream pipe produces a result, we’ll give that result to the appropriate downstream handler. We used to then regurgitate the same result over and over to the downstream pipe every time it asked.

{- Return -} (\u' -> g1 u' <+< return u')

We’re going to change that behavior now. Instead, we will cause an abort if downstream ever awaits after receiving the upstream’s final result.

>     {- Return -} (\u'     -> g1 u' <+< abort) -- downstream gets one last shot

The rest remains as before.

>     {- Yield  -} (\o next -> f1 o  <+< next)
>     {- Await  -} (\f2 g2  -> wrap $ awaitF (\i -> p1' <+< f2 i)
>                                            (\u -> p1' <+< g2 u)))

If idP is like multiplying by 1, then abort is like multiplying by 0. Sort of. As always, downstream drives, so if the upstream pipe is abort, but the downstream never consults upstream, then downstream can continue on its merry way for as long as it wants.

\[ \forall p \in \mathit{Pipe},\quad \mathit{abort} \circ p \equiv \mathit{abort} \]

\[ \forall p \in \mathit{Producer},\quad p \circ \mathit{abort} \equiv p \]

Note that our current Producer type is not strong enough to actually guarantee this: it only restricts the input type to (), rather than preventing awaits altogether.
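
Here is a way to try the part 3 composition rules without the free package. It uses a hypothetical, effect-free model: the Pipe data type below is not the FreeT-based Pipe from this post. It has one constructor per PipeF case and drops the base monad m, and runCollect is an illustrative helper.

The model follows the branches above. An abort on either side takes the pipeline down, and downstream gets one last shot after upstream returns. The test also spot-checks the two equations on a concrete producer by comparing outputs and results. That is evidence, not a proof.

```haskell
import Data.Void (Void)

-- Hypothetical pure model of the part 3 Pipe (no FreeT, no monad m).
data Pipe i o u r
  = Done r                                          -- Return
  | Yield o (Pipe i o u r)                          -- Yield, then continue
  | Await (i -> Pipe i o u r) (u -> Pipe i o u r)   -- AwaitU
  | Abort                                           -- Abort

instance Functor (Pipe i o u) where
  fmap f p = p >>= Done . f

instance Applicative (Pipe i o u) where
  pure = Done
  pf <*> px = pf >>= \f -> fmap f px

instance Monad (Pipe i o u) where
  Done r    >>= k = k r
  Yield o p >>= k = Yield o (p >>= k)
  Await f g >>= k = Await ((>>= k) . f) ((>>= k) . g)
  Abort     >>= _ = Abort                   -- Abort short-circuits

awaitE :: Pipe i o u (Either u i)
awaitE = Await (Done . Right) (Done . Left)

yield :: o -> Pipe i o u ()
yield o = Yield o (Done ())

fromList :: [o] -> Pipe i o u ()
fromList = mapM_ yield

-- Composition, mirroring the branches of (<+<) in the text.
(<+<) :: Pipe i' o u' r -> Pipe i i' u u' -> Pipe i o u r
Abort     <+< _  = Abort                    -- upstream discarded
Done r    <+< _  = Done r                   -- upstream discarded
Yield o k <+< p2 = Yield o (k <+< p2)
p1@(Await f1 g1) <+< p2 = case p2 of
  Abort       -> Abort                      -- downstream discarded
  Done u'     -> g1 u' <+< Abort            -- downstream gets one last shot
  Yield x k   -> f1 x <+< k
  Await f2 g2 -> Await (\i -> p1 <+< f2 i) (\u -> p1 <+< g2 u)
infixr 9 <+<

-- Run a closed pipeline, keeping its outputs so results can be compared.
runCollect :: Pipe () o () r -> ([o], Maybe r)
runCollect Abort       = ([], Nothing)
runCollect (Done r)    = ([], Just r)
runCollect (Yield o k) = let (os, r) = runCollect k in (o : os, r)
runCollect (Await f _) = runCollect (f ())

runPipe :: Pipe () Void () r -> Maybe r
runPipe = snd . runCollect

-- Awaits twice; the second await may come after upstream has returned.
awaitTwice :: Pipe i o u (Either u i, Either u i)
awaitTwice = do
  a <- awaitE
  b <- awaitE
  return (a, b)
```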

> (>+>) :: Monad m => Pipe i i' u m u' -> Pipe i' o u' m r -> Pipe i o u m r
> (>+>) = flip (<+<)
> infixr 9 <+<
> infixr 9 >+>

Running a pipeline

Now that a pipeline might abort at any time without a result, we need to adjust runPipe to take this possibility of failure into account. Instead of producing m r, we’ll produce an m (Maybe r). If the pipeline is aborted, Nothing is produced as the result.

> runPipe :: Monad m => Pipeline m r -> m (Maybe r)
> runPipe p = do
>   e <- runFreeT p
>   pipeCase e
>   {- Abort  -} (return Nothing)
>   {- Return -} (\r       -> return $ Just r)
>   {- Yield  -} (\_o next -> runPipe next)
>   {- Await  -} (\f _g    -> runPipe $ f ())

Some basic pipes

> fromList :: Monad m => [o] -> Producer o m ()
> fromList = mapM_ yield

We can still write the same pipes as before. awaitForever never asks for input after it gets the upstream result, so it will never be the source of an abort.

> awaitForever :: Monad m => (i -> Pipe i o u m r) -> Pipe i o u m u
> awaitForever f = go where
>   go = awaitE >>= \ex -> case ex of
>     Left u  -> return u
>     Right i -> f i >> go
> 
> pipe :: Monad m => (i -> o) -> Pipe i o u m u
> pipe f = awaitForever $ yield . f
> 
> idP :: Monad m => Pipe i i u m u
> idP = pipe id
> 
> filterP :: Monad m => (i -> Bool) -> Pipe i i u m u
> filterP test = awaitForever $ \x -> when (test x) (yield x)
> 
> printer :: Show i => Consumer i u IO u
> printer = awaitForever $ lift . print
> runP :: Monad m => Consumer i u m (u, [i])
> runP = awaitE >>= \ex -> case ex of
>   Left  u -> return (u, [])
>   Right i -> runP >>= \ ~(u, is) -> return (u, i:is)
> 
> evalP :: Monad m => Consumer i u m u
> evalP = fst `fmap` runP
> 
> execP :: Monad m => Consumer i u m [i]
> execP = snd `fmap` runP
> 
> fold :: Monad m => (r -> i -> r) -> r -> Consumer i u m r
> fold f = go where
>   go r = awaitE >>= \ex -> case ex of
>     Left _u -> return r
>     Right i -> go $! f r i

Bringing back the good(?) stuff

Now that we are equipped with both the abort and awaitE primitives, we can reproduce the good ol’ await that we had from before:

> await :: Monad m => Pipe i o u m i
> await = awaitE >>= \ex -> case ex of
>   Left _u -> abort
>   Right i -> return i

That means that we can resurrect the old style of pipe programming right alongside the new style:

> oldPipe :: Monad m => (i -> o) -> Pipe i o u m r
> oldPipe f = forever $ await >>= yield . f
> 
> oldIdP :: Monad m => Pipe i i u m r
> oldIdP = oldPipe id
> 
> oldFilterP :: Monad m => (i -> Bool) -> Pipe i i u m r
> oldFilterP test = forever $ await >>= \x -> when (test x) (yield x)
> 
> oldPrinter :: Show i => Consumer i u IO r
> oldPrinter = forever $ await >>= lift . print

This code is identical to the code we had from part 1. Neat, huh? Notice how these versions of id, filter, etc., do not bear the restriction that u = r. However, they don’t behave exactly the same as before, because abort causes the pipeline to fail without any result.

ghci> runPipe $ (printer >> return "not hijacked") <+< return "hijacked"
  Just "not hijacked"

ghci> runPipe $ (oldPrinter >> return "not hijacked") <+< return "hijacked"
  Nothing
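
The two ghci results above can be reproduced in a hypothetical pure model of the part 3 Pipe. It is redefined here so the block stands alone. IO is dropped, so the silent drain and oldDrain stand in for printer and oldPrinter.

The awaitForever version receives the upstream result as a Left and carries on to its own return. The forever-await version aborts on its very first await, because that await sees the upstream result.

```haskell
import Control.Monad (forever)
import Data.Void (Void)

-- Hypothetical pure model of the part 3 Pipe (no FreeT, no IO).
data Pipe i o u r
  = Done r
  | Yield o (Pipe i o u r)
  | Await (i -> Pipe i o u r) (u -> Pipe i o u r)
  | Abort

instance Functor (Pipe i o u) where
  fmap f p = p >>= Done . f

instance Applicative (Pipe i o u) where
  pure = Done
  pf <*> px = pf >>= \f -> fmap f px
  pa *> pb = pa >>= \_ -> pb   -- keeps 'forever' lazy and cheap

instance Monad (Pipe i o u) where
  Done r    >>= k = k r
  Yield o p >>= k = Yield o (p >>= k)
  Await f g >>= k = Await ((>>= k) . f) ((>>= k) . g)
  Abort     >>= _ = Abort

awaitE :: Pipe i o u (Either u i)
awaitE = Await (Done . Right) (Done . Left)

-- The "good ol'" await: abort if upstream has already returned.
await :: Pipe i o u i
await = awaitE >>= either (const Abort) return

awaitForever :: (i -> Pipe i o u r) -> Pipe i o u u
awaitForever f = go
  where go = awaitE >>= either return (\i -> f i >> go)

(<+<) :: Pipe i' o u' r -> Pipe i i' u u' -> Pipe i o u r
Abort     <+< _  = Abort
Done r    <+< _  = Done r
Yield o k <+< p2 = Yield o (k <+< p2)
p1@(Await f1 g1) <+< p2 = case p2 of
  Abort       -> Abort
  Done u'     -> g1 u' <+< Abort
  Yield x k   -> f1 x <+< k
  Await f2 g2 -> Await (\i -> p1 <+< f2 i) (\u -> p1 <+< g2 u)
infixr 9 <+<

runPipe :: Pipe () Void () r -> Maybe r
runPipe Abort       = Nothing
runPipe (Done r)    = Just r
runPipe (Yield _ k) = runPipe k
runPipe (Await f _) = runPipe (f ())

-- Pure stand-ins for printer and oldPrinter: consume input, do nothing.
drain :: Pipe i o u u
drain = awaitForever (\_ -> return ())

oldDrain :: Pipe i o u r
oldDrain = forever (await >> return ())
```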

Next time

We’ve granted upstream pipes the power to abort downstream pipes that await on them, but is this too much power? What if downstream doesn’t want to go down? Next time, we’ll up the granularity of control once more by allowing downstream pipes to provide a handler for the case of an aborted upstream. Once we have that in place, we can start thinking about guaranteed finalizers.

You can play with this code for yourself by downloading PipeAbort.lhs.


Pipes to Conduits part 2: Upstream Results

Last time, we reimplemented Control.Pipe, with basic await and yield functionality. However, in order to compose two pipes, their result types had to be the same, and whenever any pipe in a pipeline reached its return, it would bring down all the other pipes composed with it.

This time, we’ll modify the await primitive, forcing the user to deal with the possibility that the upstream pipe has completed and returned a value.

> {-# LANGUAGE TypeOperators #-}
> {-# OPTIONS_GHC -Wall #-}
> 
> module PipeU where
> 
> import Control.Monad.Trans.Free (FreeT(..), FreeF(..), liftF, wrap)
> import Fun ((:&:)(..), (:|:)(..))
> 
> import Data.Void (Void)
> import Control.Monad (when)
> import Control.Monad.Trans.Class (lift)

Functors

We’ll use all the same functors as before. You can compare this code with the code from last time to see exactly which changes have taken place. We’ll add one more convention, which is to use the type variable u to describe the return type of an upstream pipe.

> newtype Then next = Then next            -- Identity
> newtype Yield o next = Yield o           -- Const
> newtype Await i next = Await (i -> next) -- Fun
> instance Functor Then where
>   fmap f (Then next) = Then (f next)
> 
> instance Functor (Yield o) where
>   fmap _f (Yield o) = Yield o
> 
> instance Functor (Await i) where
>   fmap f (Await g) = Await (f . g)

The Pipe type

We’re going to modify Await so that it also considers the possibility of a completed upstream pipe. That means that anytime you await, you could get an i or a u, and you need to be prepared to handle both situations.

> type YieldThen o = Yield o :&: Then
> type AwaitU i u = Await i :&: Await u

Because AwaitU demands a new type variable u, so must PipeF and Pipe.

> type PipeF i o u = YieldThen o :|: AwaitU i u
> type Pipe i o u  = FreeT (PipeF i o u)

We’ll add u to the Consumer type, because consumers must now be aware of the upstream result of the pipe they are composed with. We’ll use the trivial upstream result () for Producers and Pipelines, since they will never get one anyway.

> type Producer o   = Pipe () o    ()
> type Consumer i u = Pipe i  Void u
> type Pipeline     = Pipe () Void ()

Remember: Consumers are always as far downstream as possible, while Producers are always as far upstream as possible. A Pipeline is neither up nor down, since it is self-contained and therefore cannot be sensibly composed with any other pipes, except trivial ones such as idP.

Working with PipeF

Our "lifting" helpers remain the same, except we must add the type variable u everywhere we have an AwaitU, PipeF, or Pipe. Notice how awaitF now has two inputs: the function to deal with a regular yielded value f :: i -> next, and the function to deal with a returned result g :: u -> next.

> liftYield :: YieldThen o next ->        PipeF i o u next
> liftYield = L
> 
> liftAwait :: AwaitU i u next ->         PipeF i o u next
> liftAwait = R
> 
> yieldF :: o -> next ->                  PipeF i o u next
> yieldF o next = liftYield $ Yield o :&: Then next
> 
> awaitF :: (i -> next) -> (u -> next) -> PipeF i o u next
> awaitF f g = liftAwait $ Await f :&: Await g

We also update pipeCase to reflect the new function that is bundled with an AwaitU.

> pipeCase :: FreeF (PipeF i o u) r next
>          -> (r                          -> a) -- Return
>          -> (o -> next                  -> a) -- Yield
>          -> ((i -> next) -> (u -> next) -> a) -- Await
>                                         -> a
> pipeCase (Return r) k _ _ = k r
> pipeCase (Wrap (L (Yield o :&: Then next)))
>                     _ k _ = k o next
> pipeCase (Wrap (R (Await f :&: Await g)))
>                     _ _ k = k f g

Now stop; let’s have a little chat about awaitF. We expect the user to somehow indirectly supply a function g :: u -> Pipe i o u m r whenever they await, to handle the possibility that the upstream pipe has completed. But if that’s the case, wouldn’t it make more sense to shut off the input and upstream ends of the pipe afterwards? g :: u -> Pipe () o () m r, or using a synonym, g :: u -> Producer o m r. Well perhaps it would, but that would mean that the type of g does not fit into the pattern u -> next, which means we lose some amount of convenience whenever we deal with the await primitive.

For this blog series, I have chosen to proceed with not shutting off the input ends, to stay closer to Conduit behavior. The reason for this is simply convenience. If we didn’t do it this way, we wouldn’t be able to use x <- await monadic sugar any more. Not even Control.Frame forcibly closes the input end for you: you are expected to manually close the input end yourself after receiving the upstream termination signal or else experience automatic pipeline shutdown if you await again; this is presumably for the same sugar/convenience reasons.

In part 3 of this series, we will add the ability to behave like Frame in this regard: automatic shutdown after receiving the termination signal. Then in part 4 we will also provide the downstream pipe the opportunity to continue even after an upstream shutdown. Spoilers! For now, just forget I said all that. ;)

Pipe primitives

We can no longer write await, because we have to deal with the possibility of a returned result. So we’ll have to settle for awaitE, which provides an Either u i: either a u or an i.

> awaitE :: Monad m => Pipe i o u m (Either u i)
> awaitE  = liftF $ awaitF Right Left

Notice, where we used to have id, we now have Right and Left. We need a next of type Either u i, so the first argument to awaitF must be i -> Either u i, and the second must be u -> Either u i. The types make the choice obvious.
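
Here is a minimal sketch of why the types force Right and Left. Free and liftF below are hand-rolled stand-ins for the free package’s versions, an assumption made so the block runs on base alone. AwaitU is flattened into a single constructor, and answerWith is a hypothetical driver.

liftF fills each next slot with a pure return. Whatever functions sit in the two slots therefore become the two ways of building the final Either u i.

```haskell
-- Hand-rolled stand-ins for the free package's Free and liftF (assumption).
data Free f r = Pure r | Free (f (Free f r))

instance Functor f => Functor (Free f) where
  fmap g (Pure r)  = Pure (g r)
  fmap g (Free fx) = Free (fmap (fmap g) fx)

instance Functor f => Applicative (Free f) where
  pure = Pure
  Pure g  <*> x = fmap g x
  Free fg <*> x = Free (fmap (<*> x) fg)

instance Functor f => Monad (Free f) where
  Pure r  >>= k = k r
  Free fx >>= k = Free (fmap (>>= k) fx)

-- Wrap one functor layer, returning whatever sits in its 'next' slots.
liftF :: Functor f => f r -> Free f r
liftF = Free . fmap Pure

-- AwaitU i u, flattened into one constructor for brevity.
data AwaitU i u next = AwaitU (i -> next) (u -> next)

instance Functor (AwaitU i u) where
  fmap h (AwaitU f g) = AwaitU (h . f) (h . g)

-- next must be Either u i, so the slots need i -> Either u i and u -> Either u i.
awaitE :: Free (AwaitU i u) (Either u i)
awaitE = liftF (AwaitU Right Left)

-- Hypothetical driver: answer every await with the same Either value.
answerWith :: Either u i -> Free (AwaitU i u) r -> r
answerWith _ (Pure r)            = r
answerWith e (Free (AwaitU f g)) = answerWith e (either g f e)
```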

Yield remains the same:

> yield :: Monad m => o -> Pipe i o u m ()
> yield b = liftF $ yieldF b ()

Pipe composition

Time to tweak the way pipes are composed. We no longer want the upstream pipe to hijack the downstream one when it has a result. Let’s dive in and see how it pans out.

> (<+<) :: Monad m => Pipe i' o u' m r -> Pipe i i' u m u' -> Pipe i o u m r

Notice how composition not only moves input/output pairs

(i => i') >+> (i' => o) = (i => o)

but also result types

(u => u') >+> (u' => r) = (u => r)

What does that mean? Well, for one thing, we can no longer simply rearrange the type variables and write a Category instance, unless we constrain u, u', and r to all be the same. Even then, will it still follow category laws? Or if we don’t constrain the upstream/result types, will it follow category-esque laws? (What does that even mean?) More on this later.

> p1 <+< p2 = FreeT $ do
>   x1 <- runFreeT p1
>   let p1' = FreeT $ return x1
>   runFreeT $ pipeCase x1
>   {- Return -} (\r      -> return r)
>   {- Yield  -} (\o next -> wrap $ yieldF o (next <+< p2))

Up until this point, everything is the same.

>   {- Await  -} (\f1 g1  -> FreeT $ do
>     x2 <- runFreeT p2
>     runFreeT $ pipeCase x2

The await case now has two functions at its disposal; the new one, g1, handles the possibility that the upstream pipe has returned a value.

Now, here was my first impulse for handling the upstream return:

{- Return -} (\u' -> g1 u')

Simple, right? If you have an upstream result, then guess what, we have a function that is waiting for that input. But here’s the problem. g1 came from p1 :: Pipe i' o u' m r. That means that it has the type u' -> Pipe i' o u' m r, and therefore, when we apply a u', we get a Pipe i' o u' m r. Well that’s a problem, see, because our result type is supposed to be Pipe i o u m r: that’s i and u not i' and u'. So what do we do? Well, we could compose it with an exploding bomb to get the correct type:

{- Return -} (\u' -> g1 u' <+< error "kaboom!")

That’s not very nice. We could write in the docs that you should never await after you’ve gotten an upstream result, but using error like this is just gross.

How about something more sensible: just compose it with a pipe that will return the same upstream result all over again, in case you forgot what it was.

>     {- Return -} (\u'     -> g1 u' <+< return u')

Now we can safely say in the docs that once you get an upstream result, you will just keep getting that same result every time you await. That seems a lot less evil, though still a bit odd.

>     {- Yield  -} (\o next -> f1 o <+< next)
>     {- Await  -} (\f2 g2  -> wrap $ awaitF (\i -> p1' <+< f2 i)
>                                            (\u -> p1' <+< g2 u)))

Yield looks the same, and to handle an upstream await, we just extend what we had before by mirroring the treatment of f2 to extend to g2 in like manner.

Well there, we made it through again! Although once again, we get sort of a sour taste from the implementation we were forced to write. If only we had a way to deal even more explicitly with pipe termination… hrm… I’m feeling another blog post coming on…
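
For contrast with the rule part 3 will introduce, here is this part’s composition in a hypothetical pure model with no FreeT, no monad m, and no Abort. Once upstream returns, every later await by downstream sees the same Left u' again. Since nothing can abort, runPipe still returns a plain r.

```haskell
import Data.Void (Void)

-- Hypothetical pure model of the part 2 Pipe: no Abort yet.
data Pipe i o u r
  = Done r
  | Yield o (Pipe i o u r)
  | Await (i -> Pipe i o u r) (u -> Pipe i o u r)

instance Functor (Pipe i o u) where
  fmap f p = p >>= Done . f

instance Applicative (Pipe i o u) where
  pure = Done
  pf <*> px = pf >>= \f -> fmap f px

instance Monad (Pipe i o u) where
  Done r    >>= k = k r
  Yield o p >>= k = Yield o (p >>= k)
  Await f g >>= k = Await ((>>= k) . f) ((>>= k) . g)

awaitE :: Pipe i o u (Either u i)
awaitE = Await (Done . Right) (Done . Left)

yield :: o -> Pipe i o u ()
yield o = Yield o (Done ())

(<+<) :: Pipe i' o u' r -> Pipe i i' u u' -> Pipe i o u r
Done r    <+< _  = Done r
Yield o k <+< p2 = Yield o (k <+< p2)
p1@(Await f1 g1) <+< p2 = case p2 of
  Done u'     -> g1 u' <+< Done u'   -- regurgitate u' on every later await
  Yield x k   -> f1 x <+< k
  Await f2 g2 -> Await (\i -> p1 <+< f2 i) (\u -> p1 <+< g2 u)
infixr 9 <+<

-- No Maybe needed: without Abort, a pipeline either returns or loops.
runPipe :: Pipe () Void () r -> r
runPipe (Done r)    = r
runPipe (Yield _ k) = runPipe k
runPipe (Await f _) = runPipe (f ())

-- Awaits three times in a row.
awaitThrice :: Pipe i o u [Either u i]
awaitThrice = sequence [awaitE, awaitE, awaitE]
```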

> (>+>) :: Monad m => Pipe i i' u m u' -> Pipe i' o u' m r -> Pipe i o u m r
> (>+>) = flip (<+<)
> infixr 9 <+<
> infixr 9 >+>

Running a pipeline

The extra function g has no significance when "running" a pipeline, so we will just ignore it and retain essentially the same runPipe as before:

> runPipe :: Monad m => Pipeline m r -> m r
> runPipe p = do
>   e <- runFreeT p
>   pipeCase e
>   {- Return -} (\r       -> return r)
>   {- Yield  -} (\_o next -> runPipe next)
>   {- Await  -} (\f _g    -> runPipe $ f ())

Some basic pipes

> fromList :: Monad m => [o] -> Producer o m ()
> fromList = mapM_ yield

Since we no longer have the same await, we’ll have to rethink the way that we write pipe code. We often used the idiom forever $ await >>= foo, and it turns out that we can still simulate something like that:

> awaitForever :: Monad m => (i -> Pipe i o u m r) -> Pipe i o u m u
> awaitForever f = go where
>   go = awaitE >>= \ex -> case ex of
>     Left u  -> return u
>     Right i -> f i >> go

Conduit users may recognize awaitForever: it loops and loops until the upstream pipe returns a result, and then it just passes that upstream result right along; therefore, it has the same result type as upstream.

We can write many of the functions we had before, but they will now bear the restriction of returning the upstream result type.

> pipe :: Monad m => (i -> o) -> Pipe i o u m u
> pipe f = awaitForever $ yield . f
> 
> idP :: Monad m => Pipe i i u m u
> idP = pipe id
> 
> filterP :: Monad m => (i -> Bool) -> Pipe i i u m u
> filterP test = awaitForever $ \x -> when (test x) (yield x)
> 
> printer :: Show i => Consumer i u IO u
> printer = awaitForever $ lift . print

Due to this limitation, perhaps we want to provide a different result for a particular pipe.

> mapResult :: Monad m => (r -> r') -> Pipe i o u m r -> Pipe i o u m r'
> mapResult f p = do
>   r <- p
>   return (f r)
> 
> overwriteResult :: Monad m => r' -> Pipe i o u m r -> Pipe i o u m r'
> overwriteResult r p = p >> return r

My mental hlint is going BEEP BEEP BEEP right now, because mapResult is just fmap, and overwriteResult is just fmap . const.

Newfound power

Now that our pipe composition works better with result types, we can write combinators that have nontrivial result types!

> runP :: Monad m => Consumer i u m (u, [i])
> runP = awaitE >>= \ex -> case ex of
>   Left  u -> return (u, [])
>   Right i -> runP >>= \ ~(u, is) -> return (u, i:is)
> 
> evalP :: Monad m => Consumer i u m u
> evalP = fst `fmap` runP
> 
> execP :: Monad m => Consumer i u m [i]
> execP = snd `fmap` runP
> 
> fold :: Monad m => (r -> i -> r) -> r -> Consumer i u m r
> fold f = go where
>   go r = awaitE >>= \ex -> case ex of
>     Left _u -> return r
>     Right i -> go $! f r i

Play around in ghci and see for yourself:

ghci> runPipe $ runP <+< (overwriteResult "foo" $ fromList [1 .. 10])
  ("foo",[1,2,3,4,5,6,7,8,9,10])

ghci> runPipe $ fold (+) 0 <+< fromList [10, 20, 100]
  130

ghci> runPipe $ (printer >> return "not hijacked") <+< return "hijacked"
  "not hijacked"
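
The session above can be replayed in a hypothetical pure model of this part’s Pipe. It has no FreeT and no IO, so the printer example is left out. Following the hlint remark earlier, mapResult and overwriteResult are written as fmap and fmap . const.

```haskell
import Data.Void (Void)

-- Hypothetical pure model of the part 2 Pipe (no FreeT, no monad m).
data Pipe i o u r
  = Done r
  | Yield o (Pipe i o u r)
  | Await (i -> Pipe i o u r) (u -> Pipe i o u r)

instance Functor (Pipe i o u) where
  fmap f p = p >>= Done . f

instance Applicative (Pipe i o u) where
  pure = Done
  pf <*> px = pf >>= \f -> fmap f px

instance Monad (Pipe i o u) where
  Done r    >>= k = k r
  Yield o p >>= k = Yield o (p >>= k)
  Await f g >>= k = Await ((>>= k) . f) ((>>= k) . g)

awaitE :: Pipe i o u (Either u i)
awaitE = Await (Done . Right) (Done . Left)

yield :: o -> Pipe i o u ()
yield o = Yield o (Done ())

(<+<) :: Pipe i' o u' r -> Pipe i i' u u' -> Pipe i o u r
Done r    <+< _  = Done r
Yield o k <+< p2 = Yield o (k <+< p2)
p1@(Await f1 g1) <+< p2 = case p2 of
  Done u'     -> g1 u' <+< Done u'
  Yield x k   -> f1 x <+< k
  Await f2 g2 -> Await (\i -> p1 <+< f2 i) (\u -> p1 <+< g2 u)
infixr 9 <+<

runPipe :: Pipe () Void () r -> r
runPipe (Done r)    = r
runPipe (Yield _ k) = runPipe k
runPipe (Await f _) = runPipe (f ())

fromList :: [o] -> Pipe () o () ()
fromList = mapM_ yield

-- Same definitions as in the text, minus the monad parameter.
runP :: Pipe i Void u (u, [i])
runP = awaitE >>= \ex -> case ex of
  Left  u -> return (u, [])
  Right i -> runP >>= \ ~(u, is) -> return (u, i : is)

fold :: (r -> i -> r) -> r -> Pipe i Void u r
fold f = go where
  go r = awaitE >>= \ex -> case ex of
    Left _u -> return r
    Right i -> go $! f r i

mapResult :: (r -> r') -> Pipe i o u r -> Pipe i o u r'
mapResult = fmap

overwriteResult :: r' -> Pipe i o u r -> Pipe i o u r'
overwriteResult = fmap . const
```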

But… is it a Category?

Consider what happens if we restrict the upstream and result types to be the same type.

newtype PipeC m u i o = PipeC (Pipe i o u m u)

instance Category (PipeC m u) where
    id :: PipeC m u i i
 -- id :: Pipe i i u m u
    id = PipeC idP

    (.) :: PipeC m u i' o -> PipeC m u i i' -> PipeC m u i o
 -- (.) :: Pipe i' o u m u -> Pipe i i' u m u -> Pipe i o u m u
    (PipeC p1) . (PipeC p2) = PipeC (p1 <+< p2)

Notice how the idP we wrote already had that restriction! However, notice that this restriction throws away some of our "newfound power": we can no longer use runP or execP. This raises suspicion about whether evalP and fold are well-behaved.
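
As a sanity check that the restricted instance at least typechecks, here is a compiling version against a hypothetical pure model of this part’s Pipe. The model has no FreeT and no monad m, so PipeC loses its m parameter; doubleP and feed are illustrative helpers.

Compiling says nothing about the laws. The test only spot-checks the two identity laws on one finite input.

```haskell
import Prelude hiding (id, (.))
import Control.Category (Category (..))

-- Hypothetical pure model of the part 2 Pipe (no FreeT, no monad m).
data Pipe i o u r
  = Done r
  | Yield o (Pipe i o u r)
  | Await (i -> Pipe i o u r) (u -> Pipe i o u r)

instance Functor (Pipe i o u) where
  fmap f p = p >>= Done . f

instance Applicative (Pipe i o u) where
  pure = Done
  pf <*> px = pf >>= \f -> fmap f px

instance Monad (Pipe i o u) where
  Done r    >>= k = k r
  Yield o p >>= k = Yield o (p >>= k)
  Await f g >>= k = Await ((>>= k) . f) ((>>= k) . g)

awaitE :: Pipe i o u (Either u i)
awaitE = Await (Done . Right) (Done . Left)

yield :: o -> Pipe i o u ()
yield o = Yield o (Done ())

(<+<) :: Pipe i' o u' r -> Pipe i i' u u' -> Pipe i o u r
Done r    <+< _  = Done r
Yield o k <+< p2 = Yield o (k <+< p2)
p1@(Await f1 g1) <+< p2 = case p2 of
  Done u'     -> g1 u' <+< Done u'
  Yield x k   -> f1 x <+< k
  Await f2 g2 -> Await (\i -> p1 <+< f2 i) (\u -> p1 <+< g2 u)
infixr 9 <+<

idP :: Pipe i i u u
idP = awaitE >>= either return (\i -> yield i >> idP)

-- The restricted newtype from the text, minus the monad parameter.
newtype PipeC u i o = PipeC (Pipe i o u u)

runC :: PipeC u i o -> Pipe i o u u
runC (PipeC p) = p

instance Category (PipeC u) where
  id = PipeC idP
  PipeC p1 . PipeC p2 = PipeC (p1 <+< p2)

-- Example pipe: double every input, pass the upstream result along.
doubleP :: Pipe Int Int u u
doubleP = awaitE >>= either return (\i -> yield (2 * i) >> doubleP)

-- Hypothetical helper: feed a list, then the upstream result u on every later await.
feed :: [i] -> u -> Pipe i o u r -> ([o], r)
feed _ _ (Done r)             = ([], r)
feed is u (Yield o k)         = let (os, r) = feed is u k in (o : os, r)
feed (i : is) u (Await f _)   = feed is u (f i)
feed [] u (Await _ g)         = feed [] u (g u)
```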

Well hold that thought for a second, and consider the following pseudo-haskell, where we provide a less restrictive Category instance by "bundling" the input with the upstream type, and the output with the result type:

newtype PipeC m (i,u) (o,r) = PipeC (Pipe i o u m r)

instance Category (PipeC m) where
    id :: PipeC m (i,u) (i,u)
 -- id :: Pipe i i u m u
    id = PipeC idP

    (.) :: PipeC m (i',u') (o,r) -> PipeC m (i,u) (i',u') -> PipeC m (i,u) (o,r)
 -- (.) :: Pipe i' o u' m r -> Pipe i i' u m u' -> Pipe i o u m r
    (PipeC p1) . (PipeC p2) = PipeC (p1 <+< p2)

Note that, again, idP bears the exact restriction given in the type. However, this time, (.) captures the full meaning of (<+<) without any superfluous restriction!

But wait, we weren’t even sure if the restricted version was a category… how will we know if this is a category? Or… a category-like… thing, since we’re bending the rules of Haskell in the first place.

On a huge tangent, "type bundling" in this manner would also allow us to express a Category instance for lens families as well.

-- the types look backwards for LensFamily composition
-- so we'll just swap them in the first place
newtype LensC f (b,b') (a,a') = LensC (LensFamily f a a' b b')

instance Category (LensC f) where
  id :: LensC f (a,a') (a,a')
  id = LensC id

  (.) :: LensC f (b,b') (c,c') -> LensC f (a,a') (b,b') -> LensC f (a,a') (c,c')
  (LensC l1) . (LensC l2) = LensC (l1 . l2)

This really does work out soundly. See for yourself: (cabal install lens-family)

ghci> :m +Lens.Family.Stock

ghci> newtype LensC f b b' a a' = LensC (LensFamily f a a' b b')

ghci> let (LensC l1) `lcompose` (LensC l2) = LensC (l1 . l2)

ghci> :t lcompose
  lcompose
    :: LensC f t t1 a a' -> LensC f b b' t t1 -> LensC f b b' a a'

ghci> :t LensC id
  LensC id :: LensC f a a' a a'
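
Here is a standalone version of that session. It assumes lens-family’s older definition, type LensFamily f a a' b b' = (b -> f b') -> a -> f a', restated locally so only base is needed. The names fstL, fstFstL, viewC and overC are hypothetical and only for illustration.

```haskell
import Data.Functor.Const (Const (..))
import Data.Functor.Identity (Identity (..))

-- ASSUMPTION: mirrors lens-family's LensFamily type synonym.
type LensFamily f a a' b b' = (b -> f b') -> a -> f a'

-- The "swapped" newtype from the ghci session.
newtype LensC f b b' a a' = LensC (LensFamily f a a' b b')

lcompose :: LensC f t t1 a a' -> LensC f b b' t t1 -> LensC f b b' a a'
LensC l1 `lcompose` LensC l2 = LensC (l1 . l2)

lidentity :: LensC f a a' a a'
lidentity = LensC id

-- Hypothetical example lens: focus on (and possibly retype) a pair's first component.
fstL :: Functor f => LensFamily f (b, c) (b', c) b b'
fstL k (b, c) = fmap (\b' -> (b', c)) (k b)

-- Composite lens into the first component of the first component.
fstFstL :: Functor f => LensC f b b' ((b, c), d) ((b', c), d)
fstFstL = LensC fstL `lcompose` LensC fstL

viewC :: LensC (Const b) b b' a a' -> a -> b
viewC (LensC l) = getConst . l Const

overC :: (b -> b') -> LensC Identity b b' a a' -> a -> a'
overC f (LensC l) = runIdentity . l (Identity . f)
```

overC show changes the type of the focused component. That is exactly what the primed type variables allow.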

Well back to the point at hand: the answer is I don’t know. Do you? Perhaps sometime later I’ll add an addendum to this blog series with a deeper investigation of the Category laws, but for now, we’re just going to plow ahead, and not promise anything about whether or not our Pipe is still a Category. I am going to conjecture that it is, but nevertheless, buyer beware! I dare you to find a counterexample.

Next time

The new powers that our enhanced pipe composition give us are nice, but we had to give up the ability to not care, and instead we have to write code in a slightly different style. What’s more, upstream pipes now have no power over their downstream counterparts; once an upstream pipe returns, it is forever doomed to just keep returning that same result, which is sort of weird.

Next time, we’ll explore a new primitive, abort, and restore the ability for any pipe to abort the entire pipeline.

abort :: Monad m => Pipe i o u m r

You can play with this code for yourself by downloading PipeU.lhs.


Pipes to Conduits part 1: Yield and Await

Last time we quickly reviewed several basic Functors in Haskell, and various ways to combine them. Today, we will put these functors to good use, and rewrite Control.Pipe (not that it needs rewriting; we’re just doing this for fun).

> {-# LANGUAGE TypeOperators #-}
> {-# OPTIONS_GHC -Wall #-}
> 
> module Pipe where
> 
> import Control.Monad.Trans.Free (FreeT(..), FreeF(..), liftF, wrap)
> import Fun ((:&:)(..), (:|:)(..))
> 
> import Data.Void (Void)
> import Control.Monad (forever, when)
> import Control.Monad.Trans.Class (lift)

Functors

I’m going to give new names to three old friends. These new names will be more convenient and helpful when dealing with pipe-related concepts.

I’ll use a few conventions throughout this code:

  • A pipe’s input is referred to by the type variable i
  • A pipe’s output is referred to by the type variable o
  • Monads, as usual, are referred to by the type variable m
  • A pipe’s return type is referred to as r
  • The final type variable of a functor will usually be called next

> newtype Then next = Then next            -- Identity
> newtype Yield o next = Yield o           -- Const
> newtype Await i next = Await (i -> next) -- Fun

Then embodies knowledge of what to do next, while Await represents the need of an i in order to determine what’s next. Yield provides an o, which is presumably the i that someone else is awaiting.

The Functor instances are the same as in the last post.

> instance Functor Then where
>   fmap f (Then next) = Then (f next)
> 
> instance Functor (Yield o) where
>   fmap _f (Yield o) = Yield o
> 
> instance Functor (Await i) where
>   fmap f (Await g) = Await (f . g)

The Pipe type

For our pipe primitive yield, we want to be able to continue computation afterwards, so we will bundle Yield o with Then to accomplish this.

> type YieldThen o = Yield o :&: Then

At its heart, a Pipe can either Yield(Then) or Await. We can encode this directly with the :|: functor combiner.

> type PipeF i o = YieldThen o :|: Await i

Now we have assembled our functor, let’s create a Free Monad out of it.

> type Pipe i o = FreeT (PipeF i o)

This type is intended to work just like Pipe from Control.Pipe. It has four type parameters: Pipe i o m r (the final two are implied by the partial application of FreeT). We’ll also provide the same convenience synonyms as Control.Pipe (again, with implied type parameters m and r):

> type Producer o = Pipe () o
> type Consumer i = Pipe i  Void
> type Pipeline   = Pipe () Void

Working with PipeF

Unfortunately, FreeT introduces some extra layers of cruft that we have to work through. Our functor-based approach using (:|:) and (:&:) introduces even more cruft. Fear not, it is all very straightforward, and it is an entirely mechanical process to deal with the cruft.

First, let’s define some lifting functions and smart constructors to help us put the right puzzle pieces in the right places:

> liftYield :: YieldThen o next -> PipeF i o next
> liftYield = L
> 
> liftAwait :: Await i next ->     PipeF i o next
> liftAwait = R
> 
> yieldF :: o -> next ->           PipeF i o next
> yieldF o next = liftYield $ Yield o :&: Then next
> 
> awaitF :: (i -> next) ->         PipeF i o next
> awaitF f = liftAwait $ Await f

Now, to cut down on pattern-matching cruft, we’ll make a case assessment function.

First, consider the FreeT type:

newtype FreeT f m r = FreeT
  { runFreeT :: m (FreeF f r (FreeT f m r)) }

Ugh! It looks daunting, but it’s really quite straightforward. First, it is wrapped in a monad, m next. Second, it is wrapped in a FreeF f r next, which can be either Return r or Wrap (f next). Finally, next is another FreeT f m r all over again.

Because we will, to some extent, be mimicking Control.Pipe code, we will be performing case analysis at the level of FreeF.

> pipeCase :: FreeF (PipeF i o) r next
>          -> (r           -> a) -- Return
>          -> (o -> next   -> a) -- Yield
>          -> ((i -> next) -> a) -- Await
>                          -> a
> pipeCase (Return r) k _ _ = k r
> pipeCase (Wrap (L (Yield o :&: Then next)))
>                     _ k _ = k o next
> pipeCase (Wrap (R (Await f)))
>                     _ _ k = k f

Pipe primitives

We already created smart constructors awaitF and yieldF, which take the appropriate arguments, and plug them into the correct slots to create a PipeF.

By making use of these and liftF, writing the pipe primitives await and yield is trivial.

> await :: Monad m => Pipe i o m i
> await   = liftF $ awaitF id
> yield :: Monad m => o -> Pipe i o m ()
> yield b = liftF $ yieldF b ()

The trick to using liftF is you simply provide whatever argument to the constructor that makes sense. For await, we need to provide a function i -> next, such that next is the result type i. The obvious function i -> i is id. For yield, we need to come up with something such that next is the result type (), so () is what we plug in. liftF won’t always suit our needs, but once you grasp the little intuition, it is quite elegant for the places where it does work.
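
Here is a runnable sketch of that intuition. Free and liftF are hand-rolled stand-ins for the free package’s FreeT and liftF, an assumption made so the block runs on base alone. PipeF is flattened into one functor instead of YieldThen o :|: Await i, and runList and doubleTwo are hypothetical helpers.

```haskell
-- Hand-rolled stand-ins for the free package's Free and liftF (assumption).
data Free f r = Pure r | Free (f (Free f r))

instance Functor f => Functor (Free f) where
  fmap g (Pure r)  = Pure (g r)
  fmap g (Free fx) = Free (fmap (fmap g) fx)

instance Functor f => Applicative (Free f) where
  pure = Pure
  Pure g  <*> x = fmap g x
  Free fg <*> x = Free (fmap (<*> x) fg)

instance Functor f => Monad (Free f) where
  Pure r  >>= k = k r
  Free fx >>= k = Free (fmap (>>= k) fx)

liftF :: Functor f => f r -> Free f r
liftF = Free . fmap Pure

-- PipeF i o flattened into one functor: yield-then-continue, or await.
data PipeF i o next = YieldF o next | AwaitF (i -> next)

instance Functor (PipeF i o) where
  fmap h (YieldF o next) = YieldF o (h next)
  fmap h (AwaitF f)      = AwaitF (h . f)

type Pipe i o = Free (PipeF i o)

-- next := i, so the function slot gets id.
await :: Pipe i o i
await = liftF (AwaitF id)

-- next := (), so the continuation slot gets ().
yield :: o -> Pipe i o ()
yield o = liftF (YieldF o ())

-- Hypothetical runner: feed inputs from a list and collect outputs.
-- Gives Nothing if the pipe awaits after the list is exhausted.
runList :: [i] -> Pipe i o r -> ([o], Maybe r)
runList _ (Pure r)                  = ([], Just r)
runList is (Free (YieldF o k))      = let (os, r) = runList is k in (o : os, r)
runList (i : is) (Free (AwaitF f))  = runList is (f i)
runList [] (Free (AwaitF _))        = ([], Nothing)

-- Example: double two inputs, then return their sum.
doubleTwo :: Pipe Int Int Int
doubleTwo = do
  x <- await
  yield (x * 2)
  y <- await
  yield (y * 2)
  return (x + y)
```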

Pipe composition

The fundamental thing that you do with pipes is you connect them. By rearranging the type variables, you can make a Category, but I won’t bother doing that here.

Pipe composition is driven by the downstream pipe. The arrows point downstream, so p1 <+< p2 means that p1 is downstream of p2, and p2 is upstream of p1.

> (<+<) :: Monad m => Pipe i' o m r -> Pipe i i' m r -> Pipe i o m r
> p1 <+< p2 = FreeT $ do
>   x1 <- runFreeT p1
>   let p1' = FreeT $ return x1
>   runFreeT $ pipeCase x1

We begin by running the downstream monadic action (recall that FreeT is just a newtype around m (FreeF ...)) and performing case analysis on the resultant FreeF.

>   {- Return -} (\r      -> return r)
>   {- Yield  -} (\o next -> wrap $ yieldF o (next <+< p2))

If the downstream pipe is a Return, then we discard the upstream pipe and return the result. (The expressions on the right-hand side of each -> are Pipes, so when we say return r, we are saying "create the pipe that trivially returns r".)

If the downstream pipe is yielding, then we create a yield action, and suspend the composition of whatever comes next after the yield with the upstream pipe. Remember, yield transfers control downstream, so if the downstream of two composed pipes is yielding, then they are both giving up control to a pipe farther down the line. Also recall that wrap will take a PipeF and make a Pipe out of it, and that yieldF makes a PipeF.

>   {- Await  -} (\f1     -> FreeT $ do
>     x2 <- runFreeT p2
>     runFreeT $ pipeCase x2

If the downstream pipe is Awaiting, then control transfers upstream. We perform the same trick of extracting and casing on the upstream pipe.

>     {- Return -} (\r      -> return r)
>     {- Yield  -} (\o next -> f1 o <+< next)
>     {- Await  -} (\f2     -> wrap $ awaitF (\i -> p1' <+< f2 i)))

If the upstream pipe is a Return, then we discard the downstream pipe and return the result. That’s right: whichever pipe returns first wins, and shuts down anyone that is composed with it as soon as they give it control.

If the upstream pipe is yielding, then great! We’re in the branch where we happen to know that downstream is Awaiting, so just pass the yielded information along, and compose the new downstream pipe with the "next" part of the upstream one.

If the upstream pipe is awaiting, well, then both upstream and downstream are awaiting, so we transfer control further upstream by combining the two into a single await construct, deferring their composition until an up-upstream value is available.

And that’s it! That wasn’t so hard, was it?
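As a way to check the case analysis above without depending on FreeT, the following sketch hand-rolls a pipe data type and composes it with the same four-way reasoning. This representation is an assumption: the constructor M stands in for FreeT's outer monadic layer, and naturalsFrom, mapP, and sumN are hypothetical helpers.

```haskell
import Data.Functor.Identity (runIdentity)

-- A hand-rolled pipe type (not the post's FreeT-based Pipe).
data Pipe i o m r
  = Return r
  | Yield o (Pipe i o m r)
  | Await (i -> Pipe i o m r)
  | M (m (Pipe i o m r))          -- a monadic step, like FreeT's outer m

infixr 9 <+<

-- Same case analysis as (<+<) in the post: downstream (p1) drives.
(<+<) :: Monad m => Pipe i' o m r -> Pipe i i' m r -> Pipe i o m r
p1 <+< p2 = case p1 of
  Return r     -> Return r                    -- downstream finished first
  Yield o next -> Yield o (next <+< p2)       -- pass output further down
  M m          -> M (fmap (<+< p2) m)         -- run downstream's effect
  Await f1     -> case p2 of                  -- downstream needs input
    Return r     -> Return r                  -- upstream finished: it wins
    Yield o next -> f1 o <+< next             -- hand the value downstream
    M m          -> M (fmap (p1 <+<) m)       -- run upstream's effect
    Await f2     -> Await (\i -> p1 <+< f2 i) -- both await: ask further up

runPipe :: Monad m => Pipe () o m r -> m r
runPipe (Return r)     = return r
runPipe (Yield _ next) = runPipe next         -- ignore stray output
runPipe (Await f)      = runPipe (f ())       -- endless () fuel
runPipe (M m)          = m >>= runPipe

naturalsFrom :: Int -> Pipe i Int m r
naturalsFrom n = Yield n (naturalsFrom (n + 1))

mapP :: (i -> o) -> Pipe i o m r
mapP f = Await (\i -> Yield (f i) (mapP f))

-- Consumes exactly n values, then returns their sum.
sumN :: Int -> Pipe Int o m Int
sumN = go 0
  where
    go acc 0 = Return acc
    go acc k = Await (\x -> go (acc + x) (k - 1))
```

For instance, running sumN 3 <+< mapP (* 10) <+< naturalsFrom 1 in Identity should give 60, while sumN 3 <+< Return 99 gives 99, because the upstream pipe that returns first wins.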

> (>+>) :: Monad m => Pipe i i' m r -> Pipe i' o m r -> Pipe i o m r
> (>+>) = flip (<+<)
> infixr 9 <+<
> infixr 9 >+>

Running a pipeline

Running a pipeline is very straightforward. We simply provide an endless supply of () fuel, and keep cranking the pipeline until it returns something. We ignore anything it yields; its type is constrained to Void so it shouldn’t be yielding anything in the first place.

> runPipe :: Monad m => Pipeline m r -> m r
> runPipe p = do
>   e <- runFreeT p
>   pipeCase e
>   {- Return -} (\r       -> return r)
>   {- Yield  -} (\_o next -> runPipe next)
>   {- Await  -} (\f       -> runPipe $ f ())

Note that runPipe and <+< could be considered two different "interpreters" for pipes. The Free monad just gives us a convenient way to assemble the puzzle pieces, but it is up to us to give the final result meaning by interpreting it. Conduit’s "connect and resume" operator could be considered yet another "interpreter".
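To illustrate the "interpreter" point, here is a sketch over a pure, hypothetical pipe type with no monadic layer. The same countdown pipe means different things to runPure, which mirrors runPipe, and runCollect, which also records the yielded values. All names here are illustrative assumptions.

```haskell
-- A pure, simplified pipe type (hypothetical, no monadic layer).
data Pipe i o r
  = Return r
  | Yield o (Pipe i o r)
  | Await (i -> Pipe i o r)

-- Interpreter 1, in the spirit of runPipe: endless () fuel, outputs ignored.
runPure :: Pipe () o r -> r
runPure (Return r)     = r
runPure (Yield _ next) = runPure next
runPure (Await f)      = runPure (f ())

-- Interpreter 2: same fuel, but record every value that is yielded.
runCollect :: Pipe () o r -> ([o], r)
runCollect (Return r)     = ([], r)
runCollect (Yield o next) = let (os, r) = runCollect next in (o : os, r)
runCollect (Await f)      = runCollect (f ())

countdown :: Int -> Pipe i Int String
countdown 0 = Return "liftoff"
countdown n = Yield n (countdown (n - 1))
```

Here runPure (countdown 3) is just "liftoff", whereas runCollect (countdown 3) is ([3, 2, 1], "liftoff").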

Some basic pipes

Here are just a few pipes to play with. Fire up ghci and make sure they work as expected.

> fromList :: Monad m => [o] -> Producer o m ()
> fromList = mapM_ yield

Pay attention to how the following are implemented, because next time we’re going to have to change them.

> pipe :: Monad m => (i -> o) -> Pipe i o m r
> pipe f = forever $ await >>= yield . f
> 
> idP :: Monad m => Pipe i i m r
> idP = pipe id
> 
> filterP :: Monad m => (i -> Bool) -> Pipe i i m r
> filterP test = forever $ await >>= \x -> when (test x) (yield x)
> 
> printer :: Show i => Consumer i IO r
> printer = forever $ await >>= lift . print

Testing…

ghci> runPipe $ printer <+< pipe (+1) <+< filterP even <+< fromList [1 .. 5]  
  3
  5

ghci> runPipe $ idP <+< idP <+< return "Hello, pipes" <+< idP <+< idP  
  "Hello, pipes"

ghci> runPipe $ return "Downstream drives" <+< return "Upstream doesn't"  
  "Downstream drives"

ghci> runPipe $ (printer >> return "not hijacked") <+< return "hijacked"  
  "hijacked"

Next time

Await and yield are great, but the greedy return shutdown behavior is somewhat disturbing. Next time, we’ll tweak the Pipe type, giving it an "upstream result" type parameter. With that, result types will be composed, too, and that way an upstream pipe won’t be able to hijack a downstream pipe!

type PipeF i o u = ???
type Pipe i o u = FreeT (PipeF i o u)
(<+<) :: Pipe i' o u' m r -> Pipe i i' u m u' -> Pipe i o u m r

Play around with this code by downloading Pipe.lhs. (You’ll need Fun.lhs from last time in the same directory).

Posted in Uncategorized | Leave a comment

Pipes to Conduits part 0: Combining Functors

A Functor in Haskell is a type constructor f of kind * -> * that supports the operation fmap :: (a -> b) -> f a -> f b. Many "container" types are Functors, including the List type. But we’re not going to talk about "containers", per se. We’re going to explore a few of the simplest functors that we can squeeze out of the Haskell type system. Of course, I don’t know the actual names of some of these, so you’ll have to forgive me for giving them pet names instead.

Our end goal in exploring these functors is to reproduce the Conduit library by assembling pieces of it, one functor at a time. For this post, we’re just going to survey various functors and ways to compose them. I’ll also touch lightly on how they play with the Free Monad Transformer, though serious discussion of that will be left to later posts.

> {-# LANGUAGE TypeOperators #-}
> {-# OPTIONS_GHC -Wall #-}
> 
> module Fun where

The Identity functor

> newtype Identity next = Identity next

The Identity functor trivially wraps a value. In order to implement fmap, it just applies the function directly to the value inside.

> instance Functor Identity where
>   fmap f (Identity next) = Identity (f next)

When used with the Free Monad Transformer, the Identity functor trivially grants you the ability to represent "the thing that comes next". This will be convenient for us sometime around part 5 of this series.

The Empty functor

> data Empty next = Empty

The Empty functor contains no information. It admits the type variable, but is otherwise nothing but the trivial value, ().

> instance Functor Empty where
>   fmap _f Empty = Empty

When used with the Free Monad Transformer, the Empty functor allows you to short-circuit computation. The Free Monad Transformer works by stacking functors up, but as you can see, the Empty functor has no room for other functors to live inside of it.

The Const functor

> newtype Const a next = Const a

The Const functor is very similar to the Empty functor, except that it contains some actual value, which remains untouched by functor operations.

> instance Functor (Const a) where
>   fmap _f (Const a) = Const a

When used with the Free Monad Transformer, the Const functor allows you to terminate computation while providing some information. Joining this functor with the Identity functor will allow us to supply information without terminating computation (because the Identity functor gives a space for the "next" computation), which will be the heart of our yield functionality.
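Here is a small sketch of that short-circuiting behavior, assuming a minimal non-transformer Free type in place of FreeT. The helpers stop, run, and safeDiv are hypothetical names chosen for illustration.

```haskell
-- A minimal Free monad (a simplification of FreeT with no underlying monad).
data Free f r = Pure r | Free (f (Free f r))

instance Functor f => Functor (Free f) where
  fmap g (Pure r) = Pure (g r)
  fmap g (Free x) = Free (fmap (fmap g) x)

instance Functor f => Applicative (Free f) where
  pure = Pure
  Pure g <*> x = fmap g x
  Free h <*> x = Free (fmap (<*> x) h)

instance Functor f => Monad (Free f) where
  Pure r >>= k = k r
  Free x >>= k = Free (fmap (>>= k) x)

newtype Const a next = Const a

instance Functor (Const a) where
  fmap _f (Const a) = Const a

-- Stop immediately, carrying a value out.
stop :: a -> Free (Const a) r
stop a = Free (Const a)

-- Observe the result: Left if we stopped, Right if we returned normally.
run :: Free (Const a) r -> Either a r
run (Pure r)         = Right r
run (Free (Const a)) = Left a

safeDiv :: Int -> Int -> Free (Const String) Int
safeDiv _ 0 = stop "divide by zero"
safeDiv x y = pure (x `div` y)
```

Because fmap on Const never touches next, (>>=) has nowhere to attach the rest of the computation, so Free (Const a) behaves like Either a. Swapping in Empty gives the same shape without a payload, which behaves like Maybe.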

The (a ->) functor

> newtype Fun a next = Fun (a -> next)

Functions, as you may know, are functors. In order to fmap onto a function, simply wait until the function has acquired input and produced an output, and then map onto the function’s output.

> instance Functor (Fun a) where
>   fmap f (Fun g) = Fun (\x -> f (g x))

When used with the Free Monad Transformer, this allows us to represent inversion of control, or acquiring information from some outside source, in order to determine what to do next. This will be the heart of our await functionality.
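A minimal sketch of that inversion of control, again assuming a simplified non-transformer Free type: the computation only describes what it does with each value it receives, and a separate, hypothetical interpreter answerFrom decides where those values come from.

```haskell
-- A minimal Free monad (a simplification of FreeT with no underlying monad).
data Free f r = Pure r | Free (f (Free f r))

instance Functor f => Functor (Free f) where
  fmap g (Pure r) = Pure (g r)
  fmap g (Free x) = Free (fmap (fmap g) x)

instance Functor f => Applicative (Free f) where
  pure = Pure
  Pure g <*> x = fmap g x
  Free h <*> x = Free (fmap (<*> x) h)

instance Functor f => Monad (Free f) where
  Pure r >>= k = k r
  Free x >>= k = Free (fmap (>>= k) x)

newtype Fun a next = Fun (a -> next)

instance Functor (Fun a) where
  fmap f (Fun g) = Fun (\x -> f (g x))

-- Request a value from outside; the function decides what comes next.
request :: Free (Fun a) a
request = Free (Fun Pure)

-- One possible interpreter: answer requests from a list, giving Nothing
-- if the list runs out before the computation returns.
answerFrom :: [a] -> Free (Fun a) r -> Maybe r
answerFrom _      (Pure r)       = Just r
answerFrom (x:xs) (Free (Fun k)) = answerFrom xs (k x)
answerFrom []     (Free _)       = Nothing

addTwo :: Free (Fun Int) Int
addTwo = do
  x <- request
  y <- request
  pure (x + y)
```

So answerFrom [3, 4] addTwo gives Just 7, and answerFrom [3] addTwo gives Nothing.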

Composing functors

> newtype (f :.: g) x = Composed (f (g x))

Functors can be composed, and the result is a functor.

> instance (Functor f, Functor g) => Functor (f :.: g) where
>    fmap f (Composed x) = Composed $ fmap (fmap f) x

I won’t be using this particular form of functor composition for future posts, but it was worth noting. Instead, let’s take a look at two other ways to combine functors:

Combining functors (tagged union)

> infixl 5 :|:
> data (f :|: g) x = L (f x) | R (g x)

If I have two functors f and g, then their tagged union is also a functor. We can just tag the f x values with L and the g x values with R so that whenever we come across some data, we know which of the two functors it actually was.

By case analysis, we can make a tagged union of functors also be a functor:

> instance (Functor f, Functor g) => Functor (f :|: g) where
>   fmap f (L x) = L (fmap f x)
>   fmap f (R x) = R (fmap f x)

This will be very useful. While in normal code you would just use Haskell’s plain old tagged unions to define a data type:

data List a = Nil | Cons a (List a)

we’re not going to do that, because it’s more fun to take advantage of functory goodness.

You could define a left-only or right-only functor for a tagged union if you wanted to.

Combining functors (product)

> infixl 7 :&:
> data (f :&: g) x = f x :&: g x

If I have two functors f and g, then their product is also a functor: just perform the fmap on them both simultaneously.

> instance (Functor f, Functor g) => Functor (f :&: g) where
>   fmap f (l :&: r) = fmap f l :&: fmap f r

Similar to how in Haskell you can provide multiple pieces of data to a constructor, we can use :&: to bundle information together.

type Cons a = Const a :&: Identity
type Nil = Empty
type ListF a = Nil :|: Cons a
type List a = FreeT (ListF a)

Again, left-only or right-only Functor instances are possible, but unnecessary for my needs.
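The sketch below assembles that list functor out of the combinators from this post and checks that the derived fmap does its job. One assumption to flag: it ties the recursive knot with a plain Fix type instead of FreeT, which would additionally allow a return value and monadic effects between layers. The names nil, cons, cata, toList, and sumList are hypothetical helpers.

```haskell
{-# LANGUAGE TypeOperators #-}

newtype Identity next = Identity next
data Empty next = Empty
newtype Const a next = Const a

infixl 5 :|:
data (f :|: g) x = L (f x) | R (g x)

infixl 7 :&:
data (f :&: g) x = f x :&: g x

instance Functor Identity where fmap f (Identity n) = Identity (f n)
instance Functor Empty where fmap _ Empty = Empty
instance Functor (Const a) where fmap _ (Const a) = Const a

instance (Functor f, Functor g) => Functor (f :|: g) where
  fmap f (L x) = L (fmap f x)
  fmap f (R x) = R (fmap f x)

instance (Functor f, Functor g) => Functor (f :&: g) where
  fmap f (l :&: r) = fmap f l :&: fmap f r

type ListF a = Empty :|: (Const a :&: Identity)

-- Fix ties the recursive knot (a stand-in for FreeT in this sketch).
newtype Fix f = Fix (f (Fix f))

type List a = Fix (ListF a)

nil :: List a
nil = Fix (L Empty)

cons :: a -> List a -> List a
cons x xs = Fix (R (Const x :&: Identity xs))

-- A generic fold: fmap recurses into the "next" slot, alg handles one layer.
cata :: Functor f => (f b -> b) -> Fix f -> b
cata alg (Fix layer) = alg (fmap (cata alg) layer)

toList :: List a -> [a]
toList = cata step
  where
    step (L Empty)                     = []
    step (R (Const x :&: Identity xs)) = x : xs

sumList :: List Int -> Int
sumList = cata step
  where
    step (L Empty)                    = 0
    step (R (Const x :&: Identity s)) = x + s
```

Note that cata never mentions Const or Identity: the fmap derived from the :|: and :&: instances is what carries the fold into the tail.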

Next time

Next time, we’ll start by creating an implementation of Control.Pipe from the pipes package. Our Pipe type will be able to yield and await.

Exercise to the reader: try it yourself before you read the next post! Here’s a little bit to get you started.

{-# LANGUAGE TypeOperators #-}
module Pipe where

-- "cabal update && cabal install pipes" for this Free module
import Control.Monad.Trans.Free
import Fun

newtype Then    next = Then next         -- Identity
newtype Yield o next = Yield o           -- Const
newtype Await i next = Await (i -> next) -- Fun

type PipeF i o = (??? :&: ???) :|: ???
type Pipe i o m r = ??? PipeF i o ???

yield :: o -> Pipe i o m ()
yield o = ???

await :: Pipe i o m i
await = ???

To play with this code, download Fun.lhs.


Breaking from a Loop with ContT

Sort of in response to Breaking from a Loop.

Sometimes I wish Haskell culture embraced ContT more.

> import Control.Monad (forever, forM_, when)
> import Control.Monad.Cont
> import Control.Monad.State
> import Control.Monad.IO.Class

loopForever

Suppose I wanted to break out of loops using a user-defined label for "break". First, assume I am working with some data type LoopT:

data LoopT m a = ...
runLoopT :: Monad m => LoopT m a -> m a

And suppose I wanted to write something like the following:

> getNonNullLine :: IO String
> getNonNullLine = runLoopT $ loopForever $ \break -> do
>   str <- liftIO getLine
>   when (not $ null str) $ break str

What type would loopForever have to have in order to make this work?

loopForever :: Monad m
            => (  (a -> LoopT m ())  -- label
               -> LoopT m b          -- loop body
               )         
            -> LoopT m a             -- type of whole expression

Hrm… now how are we going to implement LoopT, runLoopT, and loopForever? Well gee, the type of loopForever sure looks familiar… in fact, it’s nearly identical to callCC!

callCC :: MonadCont m => ((a -> m b) -> m b) -> m a

It turns out that implementing it in terms of callCC and forever is trivial:

> loopForever :: MonadCont m => ((a -> m c) -> m b) -> m a
> loopForever f = callCC (forever . f)
> runLoopT :: Monad m => ContT a m a -> m a
> runLoopT = flip runContT (\a -> return a)

The reasoning is straightforward: I want to callCC on f, but first, I want to apply "forever" to the "body" of f, hence callCC (forever . f).
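To see why callCC (forever . f) can escape an otherwise infinite loop, here is a sketch with a hand-rolled continuation monad. This is an illustrative assumption: the code in this post uses mtl's ContT and callCC, but the definitions below follow the same idea. The helper findFirst is hypothetical.

```haskell
import Control.Monad (forever, when)

-- A hand-rolled continuation monad (illustration only).
newtype Cont r a = Cont { runCont :: (a -> r) -> r }

instance Functor (Cont r) where
  fmap f (Cont c) = Cont (\k -> c (k . f))

instance Applicative (Cont r) where
  pure a = Cont (\k -> k a)
  Cont cf <*> Cont ca = Cont (\k -> cf (\f -> ca (k . f)))

instance Monad (Cont r) where
  Cont c >>= f = Cont (\k -> c (\a -> runCont (f a) k))

-- The escape function ignores its own continuation (the rest of the loop)
-- and jumps straight to k, the continuation of the whole callCC.
callCC :: ((a -> Cont r b) -> Cont r a) -> Cont r a
callCC f = Cont (\k -> runCont (f (\a -> Cont (\_ -> k a))) k)

loopForever :: ((a -> Cont r c) -> Cont r b) -> Cont r a
loopForever f = callCC (forever . f)

-- foreach_-style early exit: stop at the first element satisfying p.
findFirst :: (a -> Bool) -> [a] -> Maybe a
findFirst p xs = flip runCont id $ callCC $ \exit -> do
  mapM_ (\x -> when (p x) (exit (Just x))) xs
  return Nothing
```

Because the escape discards the remaining iterations, findFirst (> 10) [1 ..] should terminate with Just 11 even though the list is infinite.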

Another silly example using State:

> untilS :: MonadState s m => (s -> Bool) -> (s -> s) -> m s
> untilS test inc = runLoopT $ loopForever $ \break -> do
>   s <- get
>   when (test s) $ break s
>   put $! inc s

Testing:

ghci> flip runState 3 $ untilS (==5) (+1)  
  (5,5)

Foreach

Breaking out of ‘foreach’ loops can be written just as easily:

> foreachExample :: IO ()
> foreachExample = runLoopT_ $ do
>   foreach_ [1 .. 3] $ \breakI i -> do
>     foreach_ [4 .. 6] $ \breakJ j -> do
>       when (j == 6) $ breakJ ()
>       liftIO $ print (i, j)
> foreach_ :: MonadCont m => [i] -> ((() -> m c) -> i -> m b) -> m ()
> foreach_ is f = callCC (forM_ is . f)
> runLoopT_ :: Monad m => ContT () m a -> m ()
> runLoopT_ = flip runContT (\_ -> return ())

Notice how similar foreach_ is to loopForever. We just modify the body of f in a different way, this time applying forM_ is. The plumbing is slightly different, since foreach_ loops are used exclusively for their side effects. Writing a version of foreach that uses forM instead of forM_ is left as an exercise for the reader.

Testing:

ghci> foreachExample  
  (1,4)
  (1,5)
  (2,4)
  (2,5)
  (3,4)
  (3,5)

There are obviously different tradeoffs to using ContT, and I certainly do endorse using EitherT and MaybeT for such things. I just find it lamentable that ContT is treated like the ugly duckling. ContT is not to be feared.
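For comparison, here is a sketch of untilS written with ExceptT from the transformers package (its counterpart to EitherT) instead of ContT. The loop runs forever until throwE supplies the result; loopE and untilE are hypothetical names.

```haskell
import Control.Monad (forever, when)
import Control.Monad.Trans.Class (lift)
import Control.Monad.Trans.Except (ExceptT, runExceptT, throwE)
import Control.Monad.Trans.State (State, get, put, runState)
import Data.Void (absurd)

-- Run a loop body forever; the only way out is throwE, whose payload
-- becomes the loop's result. The Right case is impossible (Void).
loopE :: Monad m => ExceptT a m () -> m a
loopE body = either id absurd <$> runExceptT (forever body)

untilE :: (s -> Bool) -> (s -> s) -> State s s
untilE test inc = loopE $ do
  s <- lift get
  when (test s) $ throwE s
  lift (put $! inc s)
```

As with untilS, runState (untilE (== 5) (+ 1)) 3 should give (5, 5). Here the "break" travels through the result type as an exception, whereas the ContT version gets its escape hatch from the continuation itself.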

To play with this code, download loop.lhs.


The Long and Epic Journey of LambdaCase

On December 16, 2005, Haskell Prime trac ticket #41 was born, with the humble title add LambdaCase. The description field contained a pointer to the LambdaCase wiki article, whose contents are also quite humble:

case statements as first order

a simple desugaring

case of
   arms

gets desugared to

\x -> case x of
    arms

That’s right. Six and a half years ago, Isaac Jones was thinking about how Haskell would be better if it had LambdaCase. Some of you readers might have been right there with him six years ago — or more — wishing for this simple extension to Haskell syntax.

Well the truth is, it almost certainly goes back much farther than that. Trac history records that Haskell Prime tickets 13 through 74 were all created that very same day by ijones; presumably they were copied over from some pre-existing system. As someone who only started learning Haskell in late 2010, anything that predates GHC 6.12 is ancient history for me, so you’ll have to ask one of the old sages for details about the true origins of the idea of LambdaCase.

Fast forward to October 2, 2010, about two months before I started reading Learn You a Haskell and falling in love with a new language. batterseapower opened GHC trac ticket #4359 entitled Implement lambda-case/lambda-if. He provided a patch implementing behavior similar to that described in Haskell Prime trac ticket #41, bundled with a similar feature for if-then-else syntax.

Prelude> (if then "Haskell" else "Cafe") False
"Cafe"
Prelude> (case of 1 -> "One"; _ -> "Not-one") 1
"One"

Discussion ensued. Would merely providing mcase be a better option? Should this feature serve as "a standard lambda that can discriminate patterns on the value being bound"? Simon Marlow impulsively supported lambda-case and lambda-if at first, but then revoked his support, stating that:

the downside is that we have to adjust our mental parsers/typecheckers to recognise if then and case of as a lambda, and I’m not sure the gain in brevity is worth the loss of readability.

Simon Peyton-Jones joined with Marlow in scepticism of the initial proposed syntax, stating that:

My own gut feel is that it’s a lot better to have an initial symbol to say "here comes a function". So I’m keener on the multi-clause lambda idea, if someone can devise a syntax that works.

The Simons put their heads together and suggested the following syntax:

\case { blah } ==> \x -> case x of { blah }

Peyton-Jones suggested \of as a potential alternative to \case, which would avoid creating a new "layout herald", given of is already a layout herald. The \of idea seems to be popular in a more recent discussion; we’ll get to that later.

All the while, discussions on mailing lists were taking place. Max Bolingbroke notified Haskell Cafe of the ticket, resulting in an explosion of ideas and discussion. Back on trac, it was suggested that LambdaCase could support multiple arguments:

\case { (Just x) (Just y) -> x + y; _ _ -> 1 } 
  ==>
\a b -> case (a, b) of { (Just x, Just y) -> x + y; (_, _) -> 1 }

Now, I believe there were two grave mistakes that caused this ticket to be derailed. First, it introduced a patch that implemented two separate features: lambda-case and lambda-if. Second, as it was discussed, the scope of it never seemed to stop growing. People kept dreaming up new features that could be added into the original idea, causing it to swell in complexity, making it difficult to pinpoint what exactly should be implemented and whether it was a good idea at all.

Fast-forward again to April 2011. Following a few recent touches to the ticket, Mikhail Vorozhtov introduced another patch, implementing single-argument \case syntax. The Simons again chimed in, this time less helpfully than before: they both led the conversation to tangential topics. Marlow expressed interest in multi-argument \case, while Peyton-Jones suggested simply using \ instead of \case, thereby making \ a layout herald. The \ idea received mostly negative feedback, given that it would cause backwards incompatibilities in the way \ works.

Nevertheless, Peyton-Jones made an important comment in the midst of this portion of the discussion:

… every part of GHC from the parser onwards already implements lambda-case! … All that lambda-case does is provide concrete syntax to let you get at the abstract syntax that is already there. … So I think lambda-case is a fine candidate to fill the syntactic gap for now, leaving something more ambitious for the future.

"Something more ambitious" referred to a compositional approach to pattern matching, and I couldn’t agree more: that we need to fill the gap with lambda-case now, and leave the ambitious, compositional solution for the future.

On May 12, 2011, Simon Marlow gave a deep stamp of approval to the newly-submitted patch when he said:

Patch looks great, thanks Mikhail! We just need a test, and we can put it into 7.2.1.

Despite this, the milestone got pushed back from 7.2.1 to 7.4.1. At this point, despite the negative feedback, several people seemed to be pushing for the simpler \ syntax. SPJ suggested that to avoid conflicts, the feature could be made available only with explicit layout (using curly braces and semicolons). However, Simon Marlow expressed dislike for explicit layout only, and concluded:

Perhaps the only way to do this is to have a new keyword.

A few more suggestions floated about, and the milestone was punted from 7.4.1 to 7.6.1.

Fast forward to May 2012. SPJ downgraded the priority "until we can find a better solution". Mikhail, ever the champion of the cause, wisely suggested splitting multi-arg \case into a separate ticket. SPJ noted that "Simon and I are deeply under water with other aspects of GHC, which makes it hard to devote attention to new features." Simon Marlow requested "a summary of the previous proposals and their pros and cons, so that we don’t have to rediscover all this." (Note that this document is not geared towards achieving that goal. Rather, my goal with this document is to merely expose the long and epic journey of LambdaCase to the public eye.) Marlow also suggested:

Another alternative is to introduce a new keyword for anonymous multi-clause multi-argument functions, e.g. fun

Fast forward again to July 5, 2012. Mikhail created a wiki page, LambdasVsPatternMatching, essentially to fulfill Marlow’s request of reviewing the pros and cons of each of the major suggestions. Notably, Mikhail concludes from his personal experience that multi-arg \case was entirely unnecessary; single-arg \case was entirely sufficient, and in those few rare circumstances where multiple matches were desirable, curry $ \case ... was sufficient.
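For reference, here is what single-argument \case and the curry $ \case workaround look like, assuming a GHC that provides the LambdaCase extension with the \case syntax discussed above. The functions describe and addMaybes are hypothetical and mirror the earlier examples.

```haskell
{-# LANGUAGE LambdaCase #-}

-- Single-argument \case: sugar for \x -> case x of ...
describe :: Int -> String
describe = \case
  1 -> "One"
  _ -> "Not-one"

-- Two arguments via curry $ \case: match on a tuple instead.
addMaybes :: Maybe Int -> Maybe Int -> Int
addMaybes = curry $ \case
  (Just x, Just y) -> x + y
  _                -> 1
```

The tuple in addMaybes plays the role of the multi-argument \case proposal's two patterns.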

Mikhail provided new patches and started a thread on the GHC Users mailing list. The usage of parens was discussed, and Peyton-Jones’s \of idea came to light again, gathering strong support from various people. A few (including myself) find this particular idea acceptable, though "a little weird", and Mikhail stated:

Do you think that adding "\" + "case" as a layout herald would complicate the language spec and/or confuse users? Because it certainly does not complicate the implementation (there is a patch for \case already). IMO \case is more descriptive, "of" is just a preposition after all.

Additional details regarding layout rules and "comma sugar" are discussed in that thread.

Fast forward to today. The discussion is ongoing. You have the opportunity to let your voice be heard. Chime in on the mailing list. Code review the current proposed patch. But promise me one thing. Do you want LambdaCase, in any shape or form, to make it into GHC 7.6.1? Then make yourself heard. Make it happen. We’re in the process of writing the latest chapter in LambdaCase history. Let’s not punt it to the next GHC release. Let’s make this chapter the one where it actually gets released. And let’s not stop there. It’s been over six years. It’s time for dreams of Haskell Prime to start coming true. What will it take to get this feature into the Haskell language itself? Let’s keep moving forward, and see that happen for ourselves.
