Commit e9da287c authored by Valentin Reis's avatar Valentin Reis
Browse files

[refactor] Removes json messaging json boilerplate

This removes the second type parameter to the NRMMessage typeclass and
adapts the code generation code to handle the downstream format
nonetheless.
parent b715904a
Pipeline #8981 failed with stages
in 12 seconds
......@@ -262,7 +262,6 @@ let modules =
, "NRM.Orphans.Dhall"
, "NRM.Orphans.NonEmpty"
, "NRM.Types.Messaging.DownstreamEvent"
, "NRM.Types.Messaging.DownstreamEvent.JSON"
, "NRM.Types.Messaging.UpstreamPub"
, "NRM.Types.Messaging.UpstreamReq"
, "NRM.Types.Messaging.UpstreamRep"
......
......@@ -86,7 +86,6 @@ library nrmlib
NRM.Orphans.Dhall
NRM.Orphans.NonEmpty
NRM.Types.Messaging.DownstreamEvent
NRM.Types.Messaging.DownstreamEvent.JSON
NRM.Types.Messaging.UpstreamPub
NRM.Types.Messaging.UpstreamReq
NRM.Types.Messaging.UpstreamRep
......@@ -342,7 +341,6 @@ executable nrm.so
NRM.Orphans.Dhall
NRM.Orphans.NonEmpty
NRM.Types.Messaging.DownstreamEvent
NRM.Types.Messaging.DownstreamEvent.JSON
NRM.Types.Messaging.UpstreamPub
NRM.Types.Messaging.UpstreamReq
NRM.Types.Messaging.UpstreamRep
......@@ -513,7 +511,6 @@ executable nrmstatic
NRM.Orphans.Dhall
NRM.Orphans.NonEmpty
NRM.Types.Messaging.DownstreamEvent
NRM.Types.Messaging.DownstreamEvent.JSON
NRM.Types.Messaging.UpstreamPub
NRM.Types.Messaging.UpstreamReq
NRM.Types.Messaging.UpstreamRep
......
......@@ -7,137 +7,42 @@
-- License : MIT
-- Maintainer : fre@freux.fr
module Codegen.CHeader
( toHeader,
CHeaderGen (toCHeader),
( toCHeader,
)
where
{-import Data.JSON.Schema.Generator as G-}
import Data.Char (toLower)
import qualified Data.Text as T
import GHC.Generics
( C,
S,
)
import Protolude
import Prelude (String)
import qualified Prelude
( undefined,
)
type MessageName = Text
type Key = Text
data HeaderType = HDouble | HInt | HString
toHeaderText :: HeaderType -> Text
toHeaderText HDouble = " %f"
toHeaderText HInt = " %d"
toHeaderText HString = " \\\"%s\\\""
import Data.Aeson as A
import Data.Aeson.Types as AT
import qualified Data.HashMap.Strict as H
import qualified Data.JSON.Schema as S
import Data.Text (toUpper)
import qualified Data.Vector as V (fromList)
import qualified NRM.Classes.Messaging as M
import Protolude hiding (Any)
type CHeaderEntries = [(Key, HeaderType)]
toCHeader :: (M.NRMMessage a) => Proxy a -> Text
toCHeader = toHeader . goToplevel . M.messageSchema
type CHeader = [(MessageName, CHeaderEntries)]
toHeader :: CHeader -> Text
toHeader :: [(Text, AT.Value)] -> Text
toHeader h =
mconcat $ intersperse "\n" $
h <&> \(msgname, khl) ->
"#define NRM_"
<> T.toUpper msgname
<> "_FORMAT \"{\\\""
<> (toS msgname & (\((x : xs) :: String) -> toLower x : xs) & toS)
<> "\\\":{"
<> mconcat (intersperse "," (toField <$> khl))
<> "}}\""
where
toField :: (Key, HeaderType) -> Text
toField (key, tpe) = key <> ":" <> toHeaderText tpe
class CHeaderGen a where
toCHeader :: Proxy a -> CHeader
default toCHeader ::
(Generic a, GCHeaderGen (Rep a)) =>
Proxy a ->
CHeader
toCHeader = genericToCHeader
class GCHeaderGen f where
gToCHeader :: Proxy (f a) -> CHeader
genericToCHeader ::
(Generic a, GCHeaderGen (Rep a)) =>
Proxy a ->
CHeader
genericToCHeader = gToCHeader . fmap from
class GCHeaderGenEntries f where
gToCHeaderEntries :: Proxy (f a) -> CHeaderEntries
class CHeaderType a where
headerType :: Proxy a -> HeaderType
instance
(GCHeaderGen a, GCHeaderGen b) =>
GCHeaderGen (a :+: b)
where
gToCHeader Proxy = gToCHeader x <> gToCHeader y
where
x = Proxy :: Proxy (a p)
y = Proxy :: Proxy (b p)
instance
(GCHeaderGenEntries a, GCHeaderGenEntries b) =>
GCHeaderGenEntries (a :*: b)
where
gToCHeaderEntries _ = gToCHeaderEntries a <> gToCHeaderEntries b
where
a = Proxy :: Proxy (a p)
b = Proxy :: Proxy (b p)
instance
(GCHeaderGen a) =>
GCHeaderGen (D1 meta a)
where
gToCHeader _ = gToCHeader b
where
b = Proxy :: Proxy (a p)
instance
(Selector s, CHeaderType t) =>
GCHeaderGenEntries (M1 S s (Rec0 t))
where
gToCHeaderEntries _ =
[ ( "\\\"" <> toS selname
<> "\\\"",
headerType (Proxy :: Proxy t)
)
]
where
selector = Prelude.undefined :: S1 s a p
selname = selName selector
instance
(Constructor c, GCHeaderGenEntries p) =>
GCHeaderGen (M1 C c p)
where
gToCHeader _ =
[ ( toS conname,
gToCHeaderEntries (Proxy :: Proxy (p x))
)
]
where
conname = conName (Prelude.undefined :: (M1 C c p) x)
instance CHeaderType Text where
headerType _ = HString
instance CHeaderType Int where
headerType _ = HInt
instance CHeaderType Double where
headerType _ = HDouble
<> toUpper msgname
<> "_FORMAT "
<> show (A.encode (AT.Object $ H.singleton msgname khl))
goToplevel :: S.Schema -> [(Text, AT.Value)]
goToplevel (S.Choice schemas) = mconcat $ goToplevel <$> schemas
goToplevel (S.Object [S.Field key _ content]) = [(key, go content)]
goToplevel _ = panic "schema first level malformed"
go :: S.Schema -> AT.Value
go (S.Tuple schemas) = AT.Array (V.fromList $ go <$> schemas)
go (S.Object fields) = AT.Object . H.fromList $
fields <&> \(S.Field key _ content) -> (key, go content)
go (S.Value _) = "%s"
go (S.Number _) = AT.String "%d"
go _ = panic "error: unallowed in message format"
......@@ -83,5 +83,5 @@ toOP (S.Constant aesonValue) =
[("const", mkString $ toS (A.encode aesonValue))]
toOP S.Any = []
generatePretty :: (M.NRMMessage a b) => Proxy a -> Text
generatePretty :: (M.NRMMessage a) => Proxy a -> Text
generatePretty = toS . AP.encodePretty . toAeson . M.messageSchema
......@@ -27,34 +27,25 @@ import Generics.Generic.IsEnum (GIsEnum)
import LMap.Map as LM
import Protolude
class
(Generic b, SG.GJSONSchema (Rep b), AG.GfromJson (Rep b), AG.GtoJson (Rep b), GIsEnum (Rep b), ConNames (Rep b)) =>
NRMMessage a b
| a -> b where
{-# MINIMAL fromJ, toJ #-}
fromJ :: b -> a
toJ :: a -> b
class (Generic a, SG.GJSONSchema (Rep a), AG.GfromJson (Rep a), AG.GtoJson (Rep a), GIsEnum (Rep a), ConNames (Rep a)) => NRMMessage a where
encodePretty :: a -> ByteString
encodePretty = toS . AP.encodePretty . AG.gtoJson . toJ
encodePretty = toS . AP.encodePretty . AG.gtoJson
encode :: a -> ByteString
encode = toS . A.encode . AG.gtoJson . toJ
encode = toS . A.encode . AG.gtoJson
decode :: ByteString -> Maybe a
decode = fmap fromJ . AT.parseMaybe AG.gparseJson <=< A.decodeStrict
decode = AT.parseMaybe AG.gparseJson <=< A.decodeStrict
decodeT :: Text -> Maybe a
decodeT = fmap fromJ . AT.parseMaybe AG.gparseJson <=< A.decodeStrict . toS
decodeT = AT.parseMaybe AG.gparseJson <=< A.decodeStrict . toS
encodeT :: a -> Text
encodeT = toS . A.encode . AG.gtoJson . toJ
encodeT = toS . A.encode . AG.gtoJson
messageSchema :: Proxy a -> Schema
messageSchema (Proxy :: Proxy a) = SG.gSchema (Proxy :: Proxy b)
messageSchema = SG.gSchema
newtype GenericJSON (a :: Type) = GenericJSON {unGenericJSON :: a}
......
......@@ -140,7 +140,7 @@ reqrep ::
reqrep s opts = \case
Protocols.CPD ->
const $ do
ZMQ.receive s <&> decode . toS >>= \case
ZMQ.receive s <&> decodeT . toS >>= \case
Nothing -> putText "Couldn't decode reply"
Just (URep.RepCPD cpd) ->
putText
......@@ -159,7 +159,7 @@ reqrep s opts = \case
liftIO $ hFlush stdout
Protocols.SliceList ->
const $ do
ZMQ.receive s <&> decode . toS >>= \case
ZMQ.receive s <&> decodeT . toS >>= \case
Nothing -> putText "Couldn't decode reply"
Just (URep.RepList (URep.SliceList l)) ->
putText
......@@ -175,7 +175,7 @@ reqrep s opts = \case
Protocols.GetState ->
const $ do
msg <- ZMQ.receive s <&> toS
case decode msg of
case decodeT msg of
Nothing -> putText "Couldn't decode reply"
Just (URep.RepGetState (URep.GetState st)) ->
putText $ pShowOpts opts st
......@@ -183,7 +183,7 @@ reqrep s opts = \case
liftIO $ hFlush stdout
Protocols.GetConfig ->
const $ do
ZMQ.receive s <&> decode . toS >>= \case
ZMQ.receive s <&> decodeT . toS >>= \case
Nothing -> putText "Couldn't decode reply"
Just (URep.RepGetConfig (URep.GetConfig cfg)) ->
if C.jsonPrint opts
......@@ -194,11 +194,11 @@ reqrep s opts = \case
Protocols.SetPower ->
const $ do
msg <- ZMQ.receive s
liftIO . print $ ((decode $ toS msg) :: Maybe URep.Rep)
liftIO . print $ ((decodeT $ toS msg) :: Maybe URep.Rep)
liftIO $ hFlush stdout
Protocols.KillSlice ->
const $ do
ZMQ.receive s <&> decode . toS >>= \case
ZMQ.receive s <&> decodeT . toS >>= \case
Nothing -> putText "Couldn't decode reply"
Just (URep.RepSliceKilled (URep.SliceKilled sliceID)) ->
putText $ "Killed slice ID: " <> C.toText sliceID
......@@ -208,7 +208,7 @@ reqrep s opts = \case
liftIO $ hFlush stdout
Protocols.KillCmd ->
const $ do
ZMQ.receive s <&> decode . toS >>= \case
ZMQ.receive s <&> decodeT . toS >>= \case
Nothing -> putText "Couldn't decode reply"
Just (URep.RepCmdKilled (URep.CmdKilled cmdID)) ->
putText $ "Killed cmd ID: " <> CmdID.toText cmdID
......@@ -228,7 +228,7 @@ reqstream ::
reqstream s c Protocols.Run UReq.Run {..} = do
ZMQ.connect s $ toS (rpcAddress c)
msg <- ZMQ.receive s
case ((decode $ toS msg) :: Maybe URep.Rep) of
case ((decodeT $ toS msg) :: Maybe URep.Rep) of
Nothing -> putText "error: received malformed message(1)."
Just (URep.RepStart (URep.Start _ cmdID)) -> zmqCCHandler (kill cmdID c) >> go
Just (URep.RepStartFailure _) -> putText "Command start failure."
......@@ -237,7 +237,7 @@ reqstream s c Protocols.Run UReq.Run {..} = do
go = do
msg <- ZMQ.receive s
when (C.verbose c == C.Verbose) $ liftIO $ print msg
case ((decode $ toS msg) :: Maybe URep.Rep) of
case ((decodeT $ toS msg) :: Maybe URep.Rep) of
Just (URep.RepStdout (URep.stdoutPayload -> x)) -> putStr x >> go
Just (URep.RepStderr (URep.stderrPayload -> x)) -> hPutStr stderr x >> go
Just (URep.RepThisCmdKilled _) -> putText "Command killed."
......
......@@ -38,7 +38,7 @@ import qualified NRM.Types.Configuration.Yaml as CI (encodeDCfg)
import qualified NRM.Types.Manifest as MI
import qualified NRM.Types.Manifest.Yaml as MI (encodeManifest)
import NRM.Types.Messaging.DownstreamEvent
import qualified NRM.Types.Messaging.DownstreamEvent.JSON as Down (Event (..))
import qualified NRM.Types.Messaging.DownstreamEvent as Down (Event (..))
import NRM.Types.Messaging.UpstreamPub
import NRM.Types.Messaging.UpstreamRep
import NRM.Types.Messaging.UpstreamReq
......@@ -86,7 +86,7 @@ downstreamEventSchema = generatePretty (Proxy :: Proxy Event)
-- | The libnrm C header.
libnrmHeader :: Text
libnrmHeader = toHeader $ toCHeader (Proxy :: Proxy Down.Event)
libnrmHeader = toS $ toCHeader (Proxy :: Proxy Down.Event)
-- | A license for C headers.
licenseC :: Text
......
......@@ -6,16 +6,15 @@
-- License : BSD3
-- Maintainer : fre@freux.fr
module NRM.Types.Behavior
( NRMEvent (..),
-- * The Behavior specification
Behavior (..),
CmdStatus (..),
( NRMEvent (..)
, -- * The Behavior specification
Behavior (..)
, CmdStatus (..)
)
where
import Data.MessagePack
import qualified NRM.Classes.Messaging as M
import NRM.Classes.Messaging as M
import NRM.Types.Cmd
import NRM.Types.CmdID
import NRM.Types.Configuration
......@@ -79,22 +78,22 @@ instance MessagePack Behavior where
toObject (Log Error msg) = toObject ("logError" :: Text, msg)
toObject (Log Info msg) = toObject ("logInfo" :: Text, msg)
toObject (Log Debug msg) = toObject ("logDebug" :: Text, msg)
toObject (Pub msg) = toObject ("publish" :: Text, M.encodeT msg)
toObject (Pub msg) = toObject ("publish" :: Text, encodeT msg)
toObject (Rep clientid msg) =
toObject ("reply" :: Text, clientid, M.encodeT msg)
toObject (StartChild cmdID cmd args env) =
toObject ("cmd" :: Text, cmdID, cmd, args, env)
toObject (KillChildren cmdIDs reps) =
toObject
( "kill" :: Text,
cmdIDs,
second M.encodeT <$> reps
( "kill" :: Text
, cmdIDs
, second M.encodeT <$> reps
)
toObject (ClearChild cmdID maybeRep) =
toObject
( "pop" :: Text,
cmdID,
second M.encodeT <$> Protolude.toList maybeRep
( "pop" :: Text
, cmdID
, second M.encodeT <$> Protolude.toList maybeRep
)
fromObject x = to <$> gFromObject x
{-# LANGUAGE DerivingVia #-}
{-# OPTIONS_GHC -fno-warn-partial-fields #-}
-- |
-- Module : NRM.Types.Messaging.DownstreamEvent
......@@ -13,25 +14,39 @@ where
import Data.Aeson
import Data.JSON.Schema
import Data.Maybe (fromJust)
import Data.MessagePack
import NRM.Classes.Messaging
import qualified NRM.Classes.Messaging as M
import NRM.Types.CmdID as CmdID
import NRM.Types.DownstreamThreadID
import qualified NRM.Types.Messaging.DownstreamEvent.JSON as J
import qualified NRM.Types.Process as P
import NRM.Types.Units
import Protolude
-- | partial record selectors are unfortunately used here.
-- They make the top level of the serialized JSON message format
-- more readable by embedding tags.
data Event
= CmdPerformance CmdID Operations
| CmdPause CmdID
| ThreadProgress DownstreamThreadID Progress
| ThreadPause DownstreamThreadID
| ThreadPhaseContext DownstreamThreadID PhaseContext
| ThreadPhasePause DownstreamThreadID
deriving (Generic, MessagePack)
= CmdPerformance
{ cmdID :: CmdID,
perf :: Operations
}
| CmdPause
{ cmdID :: CmdID
}
| ThreadProgress
{ downstreamThreadID :: DownstreamThreadID,
progress :: Progress
}
| ThreadPause
{ downstreamThreadId :: DownstreamThreadID
}
| ThreadPhaseContext
{ downstreamThreadId :: DownstreamThreadID,
phaseContext :: PhaseContext
}
| ThreadPhasePause
{ downstreamThreadId :: DownstreamThreadID
}
deriving (Generic, MessagePack, NRMMessage)
data PhaseContext
= PhaseContext
......@@ -42,101 +57,3 @@ data PhaseContext
}
deriving (Generic, Show, MessagePack)
deriving (JSONSchema, ToJSON, FromJSON) via GenericJSON PhaseContext
instance M.NRMMessage Event J.Event where
toJ = \case
CmdPerformance cmdID perf ->
J.CmdPerformance
{ cmdID = CmdID.toText cmdID,
perf = fromOps perf
}
CmdPause cmdID ->
J.CmdPause
{ cmdID = CmdID.toText cmdID
}
ThreadProgress
(DownstreamThreadID threadCmdID processID taskID threadID rankID)
threadProgress ->
J.ThreadProgress
{ cmdID = CmdID.toText threadCmdID,
processID = fromIntegral $ P.rawPid processID,
taskID = fromTaskID taskID,
threadID = fromIntegral threadID,
rankID = fromIntegral rankID,
payload = fromProgress threadProgress
}
ThreadPause (DownstreamThreadID threadCmdID processID taskID threadID rankID) ->
J.ThreadPause
{ cmdID = CmdID.toText threadCmdID,
processID = fromIntegral processID,
taskID = fromTaskID taskID,
threadID = fromIntegral threadID,
rankID = fromIntegral rankID
}
ThreadPhaseContext
(DownstreamThreadID threadCmdID processID taskID threadID rankID)
(PhaseContext cpu aggregation computetime totaltime) ->
J.ThreadPhaseContext
{ cmdID = CmdID.toText threadCmdID,
processID = fromIntegral processID,
taskID = fromTaskID taskID,
threadID = fromIntegral threadID,
rankID = fromIntegral rankID,
cpu = cpu,
aggregation = aggregation,
computetime = computetime,
totaltime = totaltime
}
ThreadPhasePause (DownstreamThreadID threadCmdID processID taskID threadID rankID) ->
J.ThreadPhasePause
{ cmdID = CmdID.toText threadCmdID,
processID = fromIntegral processID,
taskID = fromTaskID taskID,
threadID = fromIntegral threadID,
rankID = fromIntegral rankID
}
fromJ = \case
J.CmdPerformance {..} ->
CmdPerformance (fromJust $ fromText cmdID) (Operations perf)
J.CmdPause {..} ->
CmdPause (fromJust $ CmdID.fromText cmdID)
J.ThreadProgress {..} ->
ThreadProgress
( DownstreamThreadID
(fromJust $ CmdID.fromText cmdID)
(fromInteger $ toInteger processID)
(TaskID taskID)
(fromInteger $ toInteger threadID)
(fromInteger $ toInteger rankID)
)
(payload & progress)
J.ThreadPause {..} ->
ThreadPause
( DownstreamThreadID
(fromJust $ CmdID.fromText cmdID)
(fromInteger $ toInteger processID)
(TaskID taskID)
(fromInteger $ toInteger threadID)
(fromInteger $ toInteger rankID)
)
J.ThreadPhaseContext {..} ->
ThreadPhaseContext
( DownstreamThreadID
(fromJust $ CmdID.fromText cmdID)
(fromInteger $ toInteger processID)
(TaskID taskID)
(fromInteger $ toInteger threadID)
(fromInteger $ toInteger rankID)
)
(PhaseContext cpu aggregation computetime totaltime)
J.ThreadPhasePause {..} ->
ThreadPhasePause
( DownstreamThreadID
(fromJust $ CmdID.fromText cmdID)
(fromInteger $ toInteger processID)
(TaskID taskID)
(fromInteger $ toInteger threadID)
(fromInteger $ toInteger rankID)
)
{-# OPTIONS_GHC -fno-warn-partial-fields #-}
-- |
-- Module : NRM.Types.Messaging.DownstreamEvent.JSON
-- Copyright : (c) UChicago Argonne, 2019
-- License : BSD3
-- Maintainer : fre@freux.fr
module NRM.Types.Messaging.DownstreamEvent.JSON
( Event (..),
)
where
import Codegen.CHeader
import Protolude
data Event
= CmdPerformance
{ cmdID :: Text,
perf :: Int
}
| CmdPause
{ cmdID :: Text
}
| ThreadProgress
{ cmdID :: Text,
processID :: Int,
taskID :: Text,
threadID :: Int,
rankID :: Int,
payload :: Int
}
| ThreadPause
{ cmdID :: Text,
processID :: Int,
taskID :: Text,
threadID :: Int,
rankID :: Int
}
| ThreadPhaseContext
{ cmdID :: Text,
processID :: Int,
taskID :: Text,
threadID :: Int,
rankID :: Int,