Skip to main content

Scaling a Pricing System Using GraphQL Subscriptions

IMG 1901 (1)

Introduction

In the realm of modern business infrastructure, pricing systems serve as the backbone of revenue generation. As user bases expand and real-time data becomes increasingly crucial, the conventional methods of architecture often reveal their limitations. 

Pricing systems have traditionally relied on a client-server polling strategy, where clients repeatedly query the server for updated pricing information. While this approach has served its purpose, it introduces inefficiencies and limitations as the volume of clients and frequency of updates grow. Some of those limitations are the following: 

  • Network Congestion and Latency: In a polling architecture, each client sends repeated requests to the server at fixed intervals, regardless of whether there are actual updates to be received. As the number of clients increases, this flood of unnecessary requests can congest the network, leading to increased latency and delayed responses. 
  • Limited Real-Time Responsiveness: Polling architectures inherently introduce delays in delivering real-time updates. Since clients can only receive new pricing information when they actively poll the server, there is a delay between the server’s update and the client’s receipt of that update. This lag can be especially problematic in time-sensitive industries, such as financial trading. 
  • Scalability Challenges: As the number of clients increases, the burden on the server grows exponentially due to the cumulative effect of numerous polling requests. Scaling such systems to accommodate spikes in demand becomes complex and resource-intensive, often requiring complex load-balancing strategies and additional server provisioning. 

During a recent FanDuel Hackathon, our internal pricing system was revisited to replace its polling strategy by a server-push strategy leveraging the power of GraphQL subscriptions and WebSockets. 

 

 

In this article we are going to delve into the skeleton of the solution that was implemented using a demo application to demonstrate the concepts involved. Only the relevant parts of the code will be included in this article but the full project can be found on Github. 

 

GraphQL 

GraphQL is a query language for APIs. It was developed by Facebook in 2012 and later open-sourced in 2015. Unlike traditional REST APIs, where endpoints dictate the shape and structure of responses, GraphQL enables clients to specify the exact data requirements they have. This client-centric approach allows for a more efficient data retrieval process, reducing the amount of unnecessary data transferred over the network. With GraphQL, the server responds with precisely the requested data, eliminating the need for multiple round-trips and streamlining the client-server interaction. 

 

WebSockets 

WebSockets is a communication protocol that provides a full-duplex, bidirectional communication channel over a single, long-lived connection between a client and a server. Unlike traditional HTTP communication, which involves sending requests from clients and receiving responses from servers, WebSockets allow both clients and servers to send data to each other independently, without the overhead of creating new connections for each interaction. 

 

GraphQL Subscriptions 

One of the less well-known features of GraphQL is the GraphQL subscriptions. This feature extends the capabilities of the GraphQL query language to enable real-time data updates going from the server to the client. While standard GraphQL queries and mutations are designed for fetching and modifying data, subscriptions are tailored for scenarios where clients need to receive live updates when specific data changes on the server. 

One of the possible ways to implement GraphQL subscriptions is to rely on the WebSockets protocol to propagate the data from the server to the client when necessary. This is what we did to scale our internal pricing system and we will now go in detail into the technical solution.

 

Application Architecture and Implementation Steps 

We are going through the solution that was implemented and dissect it to explain the concepts that allowed us to scale our pricing system. 

1. The first thing that needed to be done was to specify the schema to be used by the GraphQL engine: 

schema { 
    query: Query # Schemas must have at least a query root type 
    subscription : Subscription 
} 
 
type Query { 
    dummyQueryValue : String 
} 
 
type Subscription { 
    marketPrices(marketIds:[String]!) : MarketPrice! 
} 
 
type MarketPrice { 
    id : String 
    name : String 
    price : Float 
}


Here, it was specified a GraphQL subscription for market prices notifications. Using that subscription, it is possible to subscribe for prices notifications of specific markets using their IDs. It is also possible to select the information we want to get in each notification from the list of available parameters - ID, name and price. Note that a real pricing system would probably provide a lot more information, but we are going to keep it simple for demonstration purposes. Note also that we needed to specify a Query type at the root of the schema even though we did not need it - this is a requirement from the GraphQL engine which requires us to define a top-level query type. Learn more about the syntax of the GraphQL schemas

2. Then, we created a Java class to map the GraphQL MarketPrice type into a Java object. Note that the names of the fields in the Java class need to match the names defined in the schema type so that the GraphQL library is able to properly do the required mapping. 

public record MarketPrice(String id, String name, double price) {}

 

3. After creating a Java class, we needed to implement a WebSocket server to support the GraphQL subscriptions. For that purpose, we have used the spring-boot-starter-websocket dependency that allows to very easily setup a WebSocket server by creating a Spring configuration class that extends the WebSocketConfigurerclass and that it is enriched with the @EnableWebSocket annotation. Then, we registered the WebSocket handler for GraphQL to process requests made to the /ws/graphql path. 

@Configuration 
@EnableWebSocket 
public class PricingNotificationsApplicationConfig implements WebSocketConfigurer { 
 
	@Override 
	public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) { 
		GraphQL graphQL = GraphQLMarketPricesInitializer.build( 
				new MarketPricesDummyPublisher(), 
				"graphql/market_price.graphql"); 
		registry.addHandler(new GraphQLWebsocketHandler(graphQL), "/ws/graphql"); 
	} 
}

 

4. For the GraphQL WebSocket handler, we needed to create a GraphQL instance capable of receiving the GraphQL market prices subscriptions and replying with the desired information. The GraphQLMarketPricesInitializer class is responsible for building that instance. It first reads the GraphQL schema exposed above and parses it:  

public class GraphQLMarketPricesInitializer { 
  ... 
  public static GraphQL build(MarketPricesPublisher marketPricesPublisher, String graphQlSchemaFile) { 
    InputStream graphQlSchemaStream = GraphQLMarketPricesInitializer.class.getClassLoader().getResourceAsStream(graphQlSchemaFile); 
    Reader graphQlSchemaReader = new InputStreamReader(graphQlSchemaStream); 
    TypeDefinitionRegistry typeRegistry = new SchemaParser().parse(graphQlSchemaReader); 
    ... 
  } 
  ... 
}

 

5. Then, the GraphQLMarketPricesInitializer class does the required wiring using the GraphQL library to link the marketPrices subscription to the appropriate handler which should be an implementation of the GraphQL library DataFetcherinterface. To handle the market prices subscriptions, we’ve created a DataFetcher that uses Reactive Streams to create a stream of price notifications for the desired markets. Note that the names used here to do the wiring must match the names specified on the GraphQL schema file.

public class GraphQLMarketPricesInitializer { 
 
  private static final String SUBSCRIPTION_WIRING = "Subscription"; 
  private static final String MARKET_PRICES_SUBSCRIPTION = "marketPrices"; 
  private static final String MARKET_PRICES_SUBSCRIPTION_MARKET_IDS = "marketIds"; 
 
  public static GraphQL build(MarketPricesPublisher marketPricesPublisher, String graphQlSchemaFile) { 
    ... 
    RuntimeWiring wiring = RuntimeWiring.newRuntimeWiring() 
            .type(TypeRuntimeWiring 
                    .newTypeWiring(SUBSCRIPTION_WIRING) 
                    .dataFetcher(MARKET_PRICES_SUBSCRIPTION, marketPricesSubscriptionFetcher(marketPricesPublisher)) 
            ) 
            .build(); 
 
    GraphQLSchema schema = new SchemaGenerator().makeExecutableSchema(typeRegistry, wiring); 
    return GraphQL.newGraphQL(schema).build(); 
  } 
 
  private static DataFetcher<Publisher<MarketPrice>> marketPricesSubscriptionFetcher(MarketPricesPublisher marketPricesPublisher) { 
      return environment -> { 
          Set<String> marketIds = Set.copyOf(environment.getArgument(MARKET_PRICES_SUBSCRIPTION_MARKET_IDS)); 
          return marketPricesPublisher.getPublisher(marketIds); 
      }; 
  } 
}

 

6. After implementing the GraphQL WebSocket handler, we needed to create a continuous stream of market price notifications. For that purpose, we’ve created the DummyMarketPricesPublisherclass using Reactor, which is an implementation of the Reactive Streams specification. In the DummyMarketPricesPublisher implementation, we’ve defined a set of dummy markets available for subscription and then we’ve created a continuous stream of market prices notifications that emits new market prices for each dummy market specified above with a periodicity of 1 second.

public interface MarketPricesPublisher { 
    Publisher<MarketPrice> getPublisher(Set<String> marketIds); 
} 
 
public class MarketPricesDummyPublisher implements MarketPricesPublisher { 
  private static final int PRICE_UPDATE_INTERVAL_SECONDS = 1; 
 
  private record DummyMarket(String id, String name) {} 
 
  private final List<DummyMarket> dummyMarkets = List.of( 
          new DummyMarket("1", "Porto vs Benfica"), 
          new DummyMarket("2", "Liverpool vs Manchester United"), 
          new DummyMarket("3", "Braga vs Madrid"), 
          new DummyMarket("4", "Ajax vs Barcelona"), 
          new DummyMarket("5", "Arsenal vs Milan") 
  ); 
 
  public MarketPricesDummyPublisher() { 
      publisher = Flux.interval(Duration.ofSeconds(PRICE_UPDATE_INTERVAL_SECONDS)) 
              .flatMapIterable(value -> dummyMarkets.stream() 
                      .map(dummyMarket -> new MarketPrice(dummyMarket.id, dummyMarket.name, calculateRandomPrice())) 
                      .collect(Collectors.toList()) 
              ); 
  } 
  ... 
}

 

7. Then, the last step was to handle incoming WebSockets connections and provide the market price updates for the desired markets. For that purpose, we’ve created a class named GraphQLWebsocketHandler that implements the WebSocketHandler interface. This is the class that connects each WebSocket connection to a Reactor stream of market price updates for the requested markets and that leverages the GraphQL toSpecification() method to return only the requested data.

public class GraphQLWebsocketHandler implements WebSocketHandler { 
    ... 
    @Override 
    public void handleMessage(WebSocketSession session, WebSocketMessage<?> message) { 
        handleNewGraphQlSubscription(session, ((TextMessage) message).getPayload()); 
    } 
    ... 
    private void handleNewGraphQlSubscription(WebSocketSession session, String message) { 
        ExecutionResult executionResult = graphQL.execute(ExecutionInput.newExecutionInput().query(message)); 
        SubscriptionPublisher subscriptionPublisher = executionResult.getData(); 
        Flux.from(subscriptionPublisher) 
                .takeWhile(ignored -> session.isOpen()) 
                .subscribe(marketPrice -> { 
                    try { 
                        session.sendMessage(new TextMessage(marketPrice.toSpecification().toString())); 
                    } catch (IOException e) { 
                        throw new RuntimeException(e); 
                    } 
                }); 
    } 
}

 

The image below shows the final result: in the left panel, you can see the clients making the subscriptions (subscription { marketPrices(marketIds: ["1","3","5"]) { id price } }and subscription { marketPrices(marketIds: ["2","4"]) { id price } }) and getting the desired information. On the right side you can see the server receiving and accepting those subscriptions:

 

We can then leverage the power of GraphQL to request extra data that we may want, such as the name of the market - subscription { marketPrices(marketIds: ["2"]) { price name } }:  

With this solution, we can remove the necessity for constant server polling, and we can select the exact data that we want to receive, which ultimately facilitates the scaling of the system. 

 

Conclusion 

To sum up, the adoption of GraphQL subscriptions in place of a client-server polling approach has been a critical change that has improved system efficiency and scalability. With this change, our system can now handle increased traffic and provide real-time updates, guaranteeing that our pricing information is dynamic, correct and current. We have enabled our system to easily manage growing loads, which greatly enhanced the user experience, minimised network overhead, and optimised our data exchange. 

We've simplified our codebase, making it easier to maintain and modify, by using this contemporary methodology. This change is a calculated attempt to future-proof our pricing structure so that it can adapt to changing market conditions and yet be flexible and responsive. 

The implementation of GraphQL subscriptions have clearly shown their capacity to improve the functionality and dependability of our pricing system, while also laying the groundwork for a more responsive pricing structure that is future-ready, as we continue to embrace cutting-edge tactics and technologies.  

 

 

 

Check out the Backend Developer opportunities we have available and join us!

Related Articles

Learnconcurrentprogrammingwithgo

Exploring Seamless Concurrency: Modern Practices with Go

Disclaimer: The following article content and excerpts are derived from Manning Publications' article titled "Modern Concurrency with Go”. While we have restructured and integrated portions of their original content, both text and images, into our…

View More
Screenshot 2024 02 29 At 22.05.16

Unlocking Success in Core Internal Product Management

Our company recently played host to a transformative Product Weekend and became the epicentre of Product knowledge, welcoming presenters from various tech powerhouses to contribute with their unique perspectives and…

View More
Cactus Day2 034

Test Drive: The Challenges of Race Conditions in Security Testing

Blip has a dedicated Security Testing team that performs penetration testing, continuous testing, and red teaming. By being part of S-SDLC (Secure Software Development Life Cycle) process, the team handles development and infrastructure security…

View More