# Resolvers
Resolvers are at the core of gql; a resolver `Resolver[F, I, O]` takes an `I` and produces an `O` in the effect `F`.
Resolvers are embedded in fields and act as continuations. When gql executes a query, it first constructs a tree of continuations from your schema and the supplied GraphQL query.
Resolvers act and compose like functions, with combinators such as `andThen` and `compose`; `Resolver` forms an `Arrow` and a `Choice`.
Let's start off with some imports:
```scala
import gql._
import gql.dsl.all._
import gql.resolver._
import gql.ast._
import cats.effect._
import cats.implicits._
import cats.data._
```
## Resolvers
`Resolver` is a collection of high-level combinators that constructs a tree of `Step`s.
If you are familiar with the relationship between `fs2.Stream` and `fs2.Pull`, then the relationship between `Resolver` and `Step` should be familiar.
### Lift
`Resolver.lift` lifts a function `I => O` into `Resolver[F, I, O]`.
`lift`'s method form is `map`, which for any resolver `Resolver[F, I, O]` produces a new resolver `Resolver[F, I, O2]` given a function `O => O2`.
```scala
val r = Resolver.lift[IO, Int](_.toLong)
// r: Resolver[IO, Int, Long] = gql.resolver.Resolver@58ed210d

r.map(_.toString())
// res0: Resolver[IO, Int, String] = gql.resolver.Resolver@7323460c
```
### Effect
`effect`, like `lift`, lifts a function into `Resolver[F, I, O]`, but an effectful one of the form `I => F[O]`.
`effect`'s method form is `evalMap` (like `Resource` and `fs2.Stream`).
```scala
val r = Resolver.effect[IO, Int](i => IO(i.toLong))
// r: Resolver[IO, Int, Long] = gql.resolver.Resolver@57d4c1ed

r.evalMap(l => IO(l.toString()))
// res1: Resolver[[x]IO[x], Int, String] = gql.resolver.Resolver@6cb777eb
```
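Since resolvers compose like functions, the two resolvers above can also be chained with `andThen` from the introduction. A minimal sketch:

```scala
// Parse the Int into a Long effectfully, then format it purely;
// the composition is itself a resolver
val parse = Resolver.effect[IO, Int](i => IO(i.toLong))
val format = Resolver.lift[IO, Long](l => s"the value is $l")

val composed: Resolver[IO, Int, String] = parse.andThen(format)
```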
### Arguments
Arguments in gql are provided through resolvers.
A resolver `Resolver[F, I, A]` can be constructed from an argument `Arg[A]` through `argument`, or `arg` in method form.
```scala
lazy val ageArg = arg[Int]("age")

val r = Resolver.argument[IO, Nothing, String](arg[String]("name"))
// r: Resolver[IO, Nothing, String] = gql.resolver.Resolver@78f32bc2

val r2 = r.arg(ageArg)
// r2: Resolver[IO, Nothing, (Int, String)] = gql.resolver.Resolver@409d5d8a

r2.map{ case (age, name) => s"$name is $age years old" }
// res2: Resolver[IO, Nothing, String] = gql.resolver.Resolver@620eff95
```
`Arg` also has an `Applicative` defined for it, so multi-argument resolution can be simplified:
```scala
val r = Resolver.argument[IO, Nothing, (String, Int)](
  (arg[String]("name"), arg[Int]("age")).tupled
)
// r: Resolver[IO, Nothing, (String, Int)] = gql.resolver.Resolver@5ccc16bb

r.map{ case (name, age) => s"$name is $age years old" }
// res3: Resolver[IO, Nothing, String] = gql.resolver.Resolver@42482503
```
### Meta
The `meta` resolver provides metadata regarding query execution, such as the current position in the query, field aliasing, and the provided arguments.
It also allows the caller to inspect the query AST, so that more exotic operations become possible. For instance, arguments can be inspected dynamically:
```scala
lazy val a = arg[Int]("age")

Resolver.meta[IO, String].map(meta => meta.astNode.arg(a))
// res4: Resolver[IO, String, Option[Int]] = gql.resolver.Resolver@44690ff
```
The relational integration makes heavy use of this feature.
### Errors
Errors are reported in `cats.data.Ior`.
An `Ior` is a non-exclusive `Either`.
The `Ior` datatype's left side must be `String` and acts as an optional error that will be present in the query result.
gql can return both an error and a result for the same path, given that the `Ior` has both its left and right side defined.
Errors are embedded into resolvers via `rethrow`.
The extension method `rethrow` is present on any resolver of type `Resolver[F, I, Ior[String, O]]`:
```scala
val r = Resolver.lift[IO, Int](i => Ior.Both("I will be in the errors :)", i))
// r: Resolver[IO, Int, Ior.Both[String, Int]] = gql.resolver.Resolver@13dd8b16

r.rethrow
// res5: Resolver[[A]IO[A], Int, Int] = gql.resolver.Resolver@327b6e17
```
We can also use `emap` to map the current value into an `Ior`:
```scala
val r = Resolver.id[IO, Int].emap(i => Ior.Both("I will be in the errors :)", i))
// r: Resolver[IO, Int, Int] = gql.resolver.Resolver@6473f28d
```
### First
`Resolver` also implements `first` (`Resolver[F, A, B] => Resolver[F, (A, C), (B, C)]`), which is convenient for situations where one would usually have to trace a value through an entire computation.
Since a `Resolver` does not form a `Monad`, `first` is necessary to implement non-trivial resolver compositions.
For instance, maybe your program contains a general resolver composition that is used in many places, say for verifying credentials, but you'd like to trace a value through it without having to keep track of tupling output with input.
Assume we'd like to implement a resolver that, when given a person's name, can get a list of the person's friends.
```scala
case class PersonId(value: Int)

case class Person(id: PersonId, name: String)

def getFriends(id: PersonId, limit: Int): IO[List[Person]] = ???

def getPerson(name: String): IO[Person] = ???

def getPersonResolver = Resolver.effect[IO, String](getPerson)

def limitResolver = Resolver.argument[IO, Person, Int](arg[Int]("limit"))

def limitArg = arg[Int]("limit")

getPersonResolver
  // 'arg' tuples the input with the argument value
  .arg(limitArg)
  .evalMap{ case (limit, p) => getFriends(p.id, limit) }
// res6: Resolver[[x]IO[x], String, List[Person]] = gql.resolver.Resolver@5da74465
```
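For illustration, here is a sketch of using `first` directly to carry the original name through the person lookup; the `first[String]` call shape is an assumption based on the `Arrow` description above:

```scala
// Duplicate the input name, then run the person lookup on the first component only;
// the original name rides along in the second component thanks to `first`
def personWithName: Resolver[IO, String, (Person, String)] =
  Resolver
    .lift[IO, String](name => (name, name))
    .andThen(getPersonResolver.first[String])
```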
### Batch
Like most other GraphQL implementations, gql also supports batching.
Unlike most other GraphQL implementations, gql's batching implementation features a global query planner that lets gql delay field execution until it can be paired with another field.
Batch declaration and usage occurs as follows:
- Declare a function `Set[K] => F[Map[K, V]]`.
- Give this function to gql and get back a `Resolver[F, Set[K], Map[K, V]]` in a `State` monad (for unique id generation).
- Use this new resolver where you want batching.

And now put into practice:
```scala
def getPeopleFromDB(ids: Set[PersonId]): IO[List[Person]] = ???

Resolver.batch[IO, PersonId, Person]{ keys =>
  getPeopleFromDB(keys).map(_.map(x => x.id -> x).toMap)
}
// res7: State[SchemaState[IO], Resolver[IO, Set[PersonId], Map[PersonId, Person]]] = cats.data.IndexedStateT@22cf8730
```
Whenever gql sees this resolver in any composition, it will look for similar resolvers during query planning.
Note, however, that you should only declare each batch resolver variant once; that is, you should build your schema in `State`.
gql considers different batch instantiations incompatible, regardless of any type information.
`State` has a `Monad` (and transitively an `Applicative`) defined for it, so it composes well.
Here is an example of multiple batchers:
```scala
def b1 = Resolver.batch[IO, Int, Person](_ => ???)

def b2 = Resolver.batch[IO, Int, String](_ => ???)

(b1, b2).tupled
// res8: State[SchemaState[IO], (Resolver[IO, Set[Int], Map[Int, Person]], Resolver[IO, Set[Int], Map[Int, String]])] = cats.data.IndexedStateT@45ba7bec
```
Even if your field doesn't benefit from batching, batching can still do duplicate key elimination.
#### Batch resolver syntax
When a resolver has the very specific form `Resolver[F, Set[K], Map[K, V]]`, the gql dsl provides some helper methods.
For instance, a batcher may be embedded in a singular context (`K => V`).
Here is a showcase of some of the helper methods:
```scala
def pb: Resolver[IO, Set[Int], Map[Int, Person]] =
  // Stub implementation
  Resolver.lift(_ => Map.empty)

// None if a key is missing
pb.all[List]
// res9: Resolver[[A]IO[A], List[Int], List[Option[Person]]] = gql.resolver.Resolver@701969df

// Every key must have an associated value
// or else raise an error via a custom show-like typeclass
implicit lazy val showMissingPersonId =
  ShowMissingKeys.showForKey[Int]("not all people could be found")

pb.traversable[List]
// res10: Resolver[[A]IO[A], List[Int], List[Person]] = gql.resolver.Resolver@710edadf

// Maybe there is one value for one key?
pb.opt
// res11: Resolver[[A]IO[A], Int, Option[Person]] = gql.resolver.Resolver@2c0ebb8f

// Same as opt
pb.all[cats.Id]
// res12: Resolver[[A]IO[A], cats.package.Id[Int], cats.package.Id[Option[Person]]] = gql.resolver.Resolver@330d854d

// There is always one value for one key
pb.one
// res13: Resolver[[A]IO[A], Int, Person] = gql.resolver.Resolver@221d2afa

// You can be more explicit via the `batch` method
pb.batch.all[NonEmptyList]
// res14: Resolver[[A]IO[A], NonEmptyList[Int], NonEmptyList[Option[Person]]] = gql.resolver.Resolver@23310489
```
Using `batch` helps produce better compiler error messages:
```scala
Resolver.lift[IO, Int](_.toString()).batch.all
// error: Cannot prove that Set[K] =:= Int.
// Resolver.lift[IO, Int](_.toString()).batch.all
// ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
```
For larger programs, consider declaring all your batchers up-front and putting them into some type of collection:
```scala
case class MyBatchers(
  personBatcher: Resolver[IO, Set[Int], Map[Int, Person]],
  intStringBatcher: Resolver[IO, Set[Int], Map[Int, String]]
)

(b1, b2).mapN(MyBatchers.apply)
// res16: State[SchemaState[IO], MyBatchers] = cats.data.IndexedStateT@5b633949
```
For most batchers, it is likely that you will eventually want to pre-compose them in various ways, for instance by requesting args, which this pattern promotes.
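As a hedged sketch of such a pre-composition (the `limitedPeople` helper and its `take(limit)` logic are made up for illustration), a batcher from the collection can be combined with an argument using only combinators shown earlier:

```scala
// Resolve a set of person ids and keep at most `limit` of the results.
// Uses only `map` and `arg`, which were introduced above.
def limitedPeople(batchers: MyBatchers): Resolver[IO, Set[Int], List[Person]] =
  batchers.personBatcher
    .map(_.values.toList)
    .arg(arg[Int]("limit"))
    .map { case (limit, people) => people.take(limit) }
```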
Sometimes you have multiple groups of fields in the same object where each group has different performance overheads.
Say you had a `Person` object in your database.
This `Person` object also exists in a remote api.
The remote api can tell you the friends of a `Person`, given the object's id and name.
Written out a bit more structured, we have that:
- `PersonId => PersonId` (identity)
- `PersonId => PersonDB` (database query)
- `PersonDB => PersonRemoteAPI` (remote api call)
- `PersonId => PersonRemoteAPI` (composition of database query and remote api call)
And now put into code:
```scala
// We have a trivial id field for our person id
def pureFields = fields[IO, PersonId](
  "id" -> lift(id => id)
)

// If we query our database with a person id, we get a person database object
case class PersonDB(
  id: PersonId,
  name: String,
  remoteApiId: String
)

// SELECT id, name, remote_api_id FROM person WHERE id in (...)
def dbBatchResolver: Resolver[IO, PersonId, PersonDB] = ???

// From the db we can get the name and the remote api id
def dbFields = fields[IO, PersonDB](
  "name" -> lift(_.name),
  "apiId" -> lift(_.remoteApiId)
)

// The remote api data can be found given the result of a db query
case class PersonRemoteAPI(
  id: PersonId,
  friends: List[PersonId]
)

// Given a PersonDB we can call the api (via a batched GET or something)
def personBatchResolver: Resolver[IO, PersonDB, PersonRemoteAPI] = ???

// We can get the friends from the remote api
def remoteApiFields = fields[IO, PersonRemoteAPI](
  "friends" -> lift(_.friends)
)

// Now we can start composing our fields
// We can align the types of the db and remote api data to the PersonDB type
// by composing the remote api resolver on the remote api fields
def dbFields2: Fields[IO, PersonDB] =
  remoteApiFields.compose(personBatchResolver) ::: dbFields

// Given a PersonId we have every field
// If "friends" is selected, gql will first run `dbBatchResolver` and then `personBatchResolver`
def allFields = dbFields2.compose(dbBatchResolver) ::: pureFields

implicit def person: Type[IO, PersonId] = tpeNel[IO, PersonId](
  "Person",
  allFields
)
```
The general pattern for this decomposition revolves around figuring out the most basic description of your object.
In this example, every field can (eventually, through various side-effects) be resolved from just a `PersonId`.
#### Batchers from elsewhere
Most batching implementations have compatible signatures and can be adapted into a gql batcher.
For instance, converting `fetch` to gql:
```scala
import fetch._

object People extends Data[PersonId, Person] {
  def name = "People"

  def source: DataSource[IO, PersonId, Person] = ???
}

Resolver
  .batch[IO, PersonId, Person](_.toList.toNel.traverse(People.source.batch).map(_.getOrElse(Map.empty)))
// res17: State[SchemaState[IO], Resolver[IO, Set[PersonId], Map[PersonId, Person]]] = cats.data.IndexedStateT@422b63b3
```
#### Inline batch
A batch resolver can also be defined inline, with some notable differences from the regular batch resolver:
- It does not need to be defined in state.
- It is not subject to global query planning, and is only ever called with inputs from the same selection.

The inline batch resolver has the same signature as a regular batch resolver: `Set[K] => F[Map[K, V]]`.
```scala
Resolver.inlineBatch[IO, PersonId, Person](
  _.toList.toNel.traverse(People.source.batch).map(_.getOrElse(Map.empty))
)
// res18: Resolver[IO, Set[PersonId], Map[PersonId, Person]] = gql.resolver.Resolver@6d1a5917
```
### Choice
Resolvers also implement `Choice` via `(Resolver[F, A, C], Resolver[F, B, D]) => Resolver[F, Either[A, B], Either[C, D]]`.
On the surface, this combinator may have limited uses, but with a bit of composition we can perform tasks such as caching.
For instance, a combinator derived from `Choice` is `skippable`: `Resolver[F, I, O] => Resolver[F, Either[I, O], O]`, which acts as a variant of "caching".
If the right side is present, we skip the underlying resolver (`Resolver[F, I, O]`) altogether.
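As a small illustration (a sketch assuming `skippable` is exposed as a method on the resolver, matching the signature above), a person lookup can be skipped whenever a `Person` is already at hand:

```scala
// Hypothetical lookup that should only run when no Person is cached yet
def lookupPerson: Resolver[IO, PersonId, Person] = ???

// Left: an id that still needs resolving; Right: an already known Person that passes through untouched
def maybeCached: Resolver[IO, Either[PersonId, Person], Person] =
  lookupPerson.skippable
```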
For any resolver of the form `Resolver[F, I, Either[L, R]]`, we can modify the left side with `leftThrough` and the right side with `rightThrough`.
For instance, we can implement caching:
```scala
def getPersonForId(id: PersonId): IO[Person] = ???

type CachedPerson = Either[PersonId, Person]

def cachedPerson = tpe[IO, CachedPerson](
  "Person",
  "id" -> lift(_.map(_.id).merge.value),
  // We'll align the left and right side of the choice and then merge the `Either`
  "name" -> build[IO, CachedPerson](_.leftThrough(_.evalMap(getPersonForId)).map(_.merge.name))
)
```
We can also use some of the `compose` tricks from the batch resolver syntax section if we have a lot of fields that depend on `Person`.
The query planner treats the choice branches as parallel, such that for two instances of a choice, resolvers in the two branches may be batched together.
### Stream
The stream resolver embeds an `fs2.Stream` and provides the ability to emit a stream of results for a GraphQL subscription.
#### Stream semantics
- When at least one stream emits a new value, the interpreter will begin accumulating values within a configurable time frame.
- Once a batch of updates has been collected, the interpreter will remove redundant updates. An update is redundant when at least one of its ancestors has also updated.
- The interpreter will lazily pull values. If a stream emits, that value will be used to perform a re-evaluation, but no more values will be pulled from that stream until the re-evaluation has completed.
- Updates only re-evaluate from the updated node and down the tree.

If different pull semantics are desired, such as dropping updates, they can be encoded into the stream on the user side, for example via an `fs2.concurrent.Signal`'s `discrete` method, as sketched below.
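Here is a hedged sketch of such user-side drop semantics with `fs2.concurrent.SignallingRef`; the `latestPrice` signal and the `price` field are assumptions for illustration:

```scala
// Only the most recent price is observed; intermediate updates are dropped by the signal itself
def latestPrice: fs2.concurrent.SignallingRef[IO, Double] = ???

def priceFields = fields[IO, Unit](
  "price" -> build[IO, Unit](_.streamMap(_ => latestPrice.discrete))
)
```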
For instance, a schema designed like the following emits incremental updates regarding the price of some symbol:
```graphql
type PriceChange {
  difference: Float!
}

type Subscription {
  priceChanges(symbolId: ID!): PriceChange!
}
```
And here is a schema that represents an api that emits updates regarding the current price of a symbol, where old values can safely be dropped:
```graphql
type SymbolState {
  price: Float!
}

type Subscription {
  price(symbolId: ID!): SymbolState!
}
```
A given stream node must be able to keep all of its child resources alive. As such, for a given stream, gql must await the next element before releasing the resources held by the current sub-tree. This means that gql must be able to pull a new element before closing the old one.
Here is an example of some streams in action:
```scala
import scala.concurrent.duration._
import cats.effect.unsafe.implicits.global

case class Streamed(value: Int)

implicit lazy val streamed: Type[IO, Streamed] = tpe[IO, Streamed](
  "Streamed",
  "value" -> build[IO, Streamed](_.streamMap{ s =>
    fs2.Stream
      .bracket(IO(println(s"allocating $s")))(_ => IO(println(s"releasing $s"))) >>
      fs2.Stream
        .iterate(0)(_ + 1)
        .evalTap(n => IO(println(s"emitting $n for $s")))
        .meteredStartImmediately(((5 - s.value) * 20).millis)
        .as(Streamed(s.value + 1))
  })
)

def query = """
  subscription {
    streamed {
      value {
        value {
          value {
            __typename
          }
        }
      }
    }
  }
"""

def schema = SchemaShape.unit[IO](
  fields("ping" -> lift(_ => "pong")),
  subscription = Some(fields("streamed" -> lift(_ => Streamed(0))))
)

Schema.simple(schema)
  .map(Compiler[IO].compile(_, query))
  .flatMap { case Right(Application.Subscription(stream)) => stream.take(4).compile.drain }
  .unsafeRunSync()
// allocating Streamed(0)
// emitting 0 for Streamed(0)
// allocating Streamed(1)
// emitting 0 for Streamed(1)
// allocating Streamed(2)
// emitting 0 for Streamed(2)
// releasing Streamed(2)
// emitting 1 for Streamed(0)
// releasing Streamed(1)
// allocating Streamed(1)
// emitting 0 for Streamed(1)
// allocating Streamed(2)
// emitting 0 for Streamed(2)
// releasing Streamed(2)
// emitting 2 for Streamed(0)
// releasing Streamed(1)
// allocating Streamed(1)
// emitting 0 for Streamed(1)
// allocating Streamed(2)
// emitting 0 for Streamed(2)
// releasing Streamed(2)
// releasing Streamed(1)
// emitting 3 for Streamed(0)
// allocating Streamed(1)
// emitting 0 for Streamed(1)
// allocating Streamed(2)
// emitting 0 for Streamed(2)
// releasing Streamed(1)
// releasing Streamed(2)
// releasing Streamed(0)
```
gql also allows the user to specify how much time the interpreter may await more stream updates:
```scala
Schema.simple(schema).map(Compiler[IO].compile(_, query, accumulate=Some(10.millis)))
```
Furthermore, gql can also emit interpreter information if you want to look into what gql is doing:
```scala
Schema.simple(schema)
  .map(Compiler[IO].compile(_, query, debug=gql.server.interpreter.DebugPrinter[IO](s => IO(println(s)))))
  .flatMap { case Right(Application.Subscription(stream)) => stream.take(3).compile.drain }
  .unsafeRunSync()
// allocating Streamed(0)
// emitting 0 for Streamed(0)
// allocating Streamed(1)
// emitting 0 for Streamed(1)
// allocating Streamed(2)
// emitting 0 for Streamed(2)
// awaiting updates
// releasing Streamed(2)
// releasing Streamed(1)
// emitting 1 for Streamed(0)
// updates arrived
// interpreting for 1 inputs
// allocating Streamed(1)
// emitting 0 for Streamed(1)
// allocating Streamed(2)
// emitting 0 for Streamed(2)
// done interpreting
// awaiting updates
// releasing Streamed(2)
// releasing Streamed(1)
// emitting 2 for Streamed(0)
// updates arrived
// interpreting for 1 inputs
// allocating Streamed(1)
// emitting 0 for Streamed(1)
// allocating Streamed(2)
// emitting 0 for Streamed(2)
// done interpreting
// releasing Streamed(1)
// releasing Streamed(2)
// releasing Streamed(0)
```
## Steps
A `Step` is the low-level algebra for a resolver; it describes a single step of evaluation for a query.
The variants of `Step` are clearly listed in the source code. Most variants of `Step` provide orthogonal properties.