Vaughan Reid's blog

Consuming paged Twitter data with async enumerable

A really nice feature in C# 8 is async enumerables (IAsyncEnumerable<T>). A popular use case is consuming paged data, where you may want to process or aggregate each page as it arrives instead of fetching everything first. Previously the consuming code had to handle the paging itself. Now the producing method can hide the paging behind an IAsyncEnumerable, and the consumer simply enumerates it with await foreach and processes the data after each call. It’s actually simpler than I’m making it sound! I created a small console app that will hopefully show you what I mean. If you want to run it locally, you can get the source here.
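Stripped of Twitter, the pattern is small. Below is a minimal sketch, assuming a hypothetical in-memory FakePagedApi in place of a real HTTP endpoint: the producer owns the paging loop and yields one page at a time, and the consumer aggregates with await foreach without knowing anything about page indexes.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical in-memory API standing in for an HTTP endpoint.
public static class FakePagedApi
{
    private static readonly int[][] Pages =
    {
        new[] { 1, 2, 3 },
        new[] { 4, 5 },
        new[] { 6 },
    };

    // Returns the requested page, or an empty array when there is no more data.
    public static async Task<int[]> GetPageAsync(int pageIndex)
    {
        await Task.Delay(10); // simulate network latency
        return pageIndex < Pages.Length ? Pages[pageIndex] : Array.Empty<int>();
    }
}

public static class PagingExample
{
    // The producer owns the paging loop and yields one page at a time.
    public static async IAsyncEnumerable<int[]> GetAllPagesAsync()
    {
        for (int pageIndex = 0; ; pageIndex++)
        {
            int[] page = await FakePagedApi.GetPageAsync(pageIndex);
            if (page.Length == 0)
            {
                yield break; // no more data
            }
            yield return page;
        }
    }

    // The consumer just enumerates; it never sees page indexes.
    public static async Task<int> SumAllAsync()
    {
        int total = 0;
        await foreach (int[] page in GetAllPagesAsync())
        {
            total += page.Sum(); // aggregate page by page
        }
        return total;
    }
}
```

Calling PagingExample.SumAllAsync() adds up all three pages (1 through 6), giving 21, while only one page is held at a time.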

I created an interface and a class that do the Twitter search. The code doesn’t really have any error handling, but hopefully you get the idea. The interesting part is what happens at the yield return after each HTTP call: the method suspends there and hands the page back to the consumer. It only resumes when the consumer asks for the next page, at which point it decides whether another HTTP call is needed.


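using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Http;
using System.Web;
using Microsoft.Extensions.Options;
using Newtonsoft.Json;

public interface ITwitterSearch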
{
	IAsyncEnumerable<TwitterStatus[]> Search(string searchTerm, DateTime toDate);
}

public class TwitterSearch : ITwitterSearch
{
	private readonly TwitterSettings _settings;
	private readonly IHttpClientFactory _httpClientFactory;
	private readonly JsonSerializerSettings _jsonSerializerSettings;
	
	public TwitterSearch(IOptions<TwitterSettings> settings, IHttpClientFactory httpClientFactory)
	{
		_settings = settings.Value;
		_httpClientFactory = httpClientFactory;
		_jsonSerializerSettings = new JsonSerializerSettings
		{
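			// Twitter dates look like "Wed Oct 10 20:19:24 +0000 2018". The "+ffff"
			// part reads the always-zero UTC offset as fractional seconds.
			DateFormatString = "ddd MMM dd HH:mm:ss +ffff yyyy"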
		};
	}

	public async IAsyncEnumerable<TwitterStatus[]> Search(string searchTerm, DateTime toDate)
	{
		var client = _httpClientFactory.CreateClient("twitter");

		int maxQueries = 20;
		long? lowestId = null;
		while (maxQueries-- > 0)
		{
			string query = $"?q={HttpUtility.UrlEncode(searchTerm)}&result_type=recent&lang=en&count=100";
			if(lowestId != null)
			{
				query += $"&max_id={lowestId-1}";
			}

			var result = await client.GetAsync(query);
			var text = await result.Content.ReadAsStringAsync();

			var response = JsonConvert.DeserializeObject<TwitterResponse>(text,_jsonSerializerSettings);

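			// An empty page means there is nothing older to fetch; without this
			// check, Min() below would throw on an empty sequence.
			if (response?.Statuses == null || !response.Statuses.Any())
			{
				yield break;
			}

			// Hand this page to the caller. The method pauses here until the
			// caller's await foreach asks for the next page.
			yield return response.Statuses.Where(s => s.Created_At > toDate).ToArray();

			// Stop once this page already reaches back past toDate.
			DateTime minDate = response.Statuses.Min(s => s.Created_At);
			if (minDate < toDate)
			{
				break;
			}

			// The next request asks only for tweets older than the oldest one seen.
			lowestId = response.Statuses.Min(s => s.Id);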
		}
	}
}
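The paging relies on Twitter's max_id parameter: results come back newest first, and each follow-up request asks only for tweets with an id at or below the lowest id already seen, minus one. The sketch below reproduces that loop shape against a hypothetical in-memory timeline (FakeStatus, QueryAsync and the one-tweet-per-minute data are all made up), so the stopping rule can be checked without network access.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical stand-in for a tweet: just an id and a timestamp.
public sealed class FakeStatus
{
    public FakeStatus(long id, DateTime createdAt) { Id = id; CreatedAt = createdAt; }
    public long Id { get; }
    public DateTime CreatedAt { get; }
}

public static class MaxIdPaging
{
    // Hypothetical timeline: ids 1..50, one tweet per minute, higher id = newer.
    private static readonly DateTime Start = new DateTime(2020, 1, 1, 0, 0, 0, DateTimeKind.Utc);
    private static readonly List<FakeStatus> Timeline =
        Enumerable.Range(1, 50).Select(i => new FakeStatus(i, Start.AddMinutes(i))).ToList();

    public static int RequestCount;

    // Mimics a search call: newest first, at most `count` items with Id <= maxId.
    private static Task<FakeStatus[]> QueryAsync(long? maxId, int count)
    {
        RequestCount++;
        FakeStatus[] page = Timeline
            .Where(s => maxId == null || s.Id <= maxId)
            .OrderByDescending(s => s.Id)
            .Take(count)
            .ToArray();
        return Task.FromResult(page);
    }

    // Same loop shape as TwitterSearch.Search, minus HTTP and JSON.
    public static async IAsyncEnumerable<FakeStatus[]> SearchAsync(DateTime toDate, int pageSize)
    {
        long? lowestId = null;
        for (int maxQueries = 20; maxQueries > 0; maxQueries--)
        {
            // lowestId - 1 is null on the first request (lifted arithmetic).
            FakeStatus[] page = await QueryAsync(lowestId - 1, pageSize);
            if (page.Length == 0)
            {
                yield break;
            }
            yield return page.Where(s => s.CreatedAt > toDate).ToArray();
            if (page.Min(s => s.CreatedAt) < toDate)
            {
                yield break; // this page already reaches back past toDate
            }
            lowestId = page.Min(s => s.Id);
        }
    }

    // Usage: enumerate the pages and tally what came back.
    public static async Task<(int Pages, int Tweets)> TallyAsync(DateTime toDate, int pageSize)
    {
        int pages = 0, tweets = 0;
        await foreach (FakeStatus[] page in SearchAsync(toDate, pageSize))
        {
            pages++;
            tweets += page.Length;
        }
        return (pages, tweets);
    }
}
```

With toDate at minute 30 and a page size of 8, the loop makes three requests (ids 50 to 43, 42 to 35, 34 to 27) and yields 20 tweets in total. It stops after the third page because that page already reaches back past toDate.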

To use this class, I have a service that enumerates the results with await foreach(var statuses in _twitterSearch.Search(line, DateTime.UtcNow.AddMinutes(-300))), i.e. tweets from the last five hours. In my example I just log how many statuses each page contains, but I could instead run an aggregation, such as counting occurrences of specific words, without having to keep all the text in memory.
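A minimal sketch of that kind of aggregation, assuming pages of plain tweet text rather than TwitterStatus objects (FakeSearchAsync is a hypothetical stand-in for the search): only the running word counts survive from one page to the next.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class WordCounter
{
    // Hypothetical stand-in for ITwitterSearch.Search: yields pages of tweet text.
    public static async IAsyncEnumerable<string[]> FakeSearchAsync()
    {
        var pages = new[]
        {
            new[] { "async streams are nice", "paging is easy" },
            new[] { "async all the things" },
        };
        foreach (var page in pages)
        {
            await Task.Yield(); // stand-in for the HTTP call
            yield return page;
        }
    }

    // Counts the given words across all pages. Only the running totals are
    // kept; each page's text can be garbage-collected once it is processed.
    public static async Task<Dictionary<string, int>> CountWordsAsync(
        IAsyncEnumerable<string[]> pages, IEnumerable<string> wordsOfInterest)
    {
        var counts = wordsOfInterest.ToDictionary(w => w, _ => 0, StringComparer.OrdinalIgnoreCase);
        await foreach (string[] page in pages)
        {
            foreach (string text in page)
            {
                foreach (string word in text.Split(' ', StringSplitOptions.RemoveEmptyEntries))
                {
                    if (counts.ContainsKey(word))
                    {
                        counts[word]++;
                    }
                }
            }
        }
        return counts;
    }
}
```

For the words "async", "paging" and "twitter", the sample pages give counts of 2, 1 and 0.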


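using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Hosting;

public class ReadFromTwitterService : BackgroundService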
{
	private readonly ITwitterSearch _twitterSearch;

	public ReadFromTwitterService(ITwitterSearch twitterSearch)
	{
		this._twitterSearch = twitterSearch;
	}
	
	protected override async Task ExecuteAsync(CancellationToken stoppingToken)
	{
		while (!stoppingToken.IsCancellationRequested)
		{
			Console.WriteLine("Please enter search term.");

			string line = Console.ReadLine();

			// Each iteration receives one page of statuses as soon as its HTTP call completes.
			await foreach (var statuses in _twitterSearch.Search(line, DateTime.UtcNow.AddMinutes(-300)))
			{
				Console.WriteLine($"{statuses.Length} statuses received");
			}

			await Task.Delay(TimeSpan.FromSeconds(2));
		}
	}
}
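One thing the service above doesn't do is pass stoppingToken into the search, so the token is only checked between searches. Async iterators can accept a token through a parameter marked [EnumeratorCancellation], and the consumer supplies it with WithCancellation. A minimal sketch, with a hypothetical endless PagesAsync standing in for Search:

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

public static class CancellableSearch
{
    // Hypothetical endless pager; the token stops it at the next page fetch.
    public static async IAsyncEnumerable<int> PagesAsync(
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        for (int page = 1; ; page++)
        {
            // Task.Delay throws OperationCanceledException once the token is cancelled.
            await Task.Delay(10, cancellationToken);
            yield return page;
        }
    }

    // Consumes pages until the token is cancelled; returns how many were seen.
    public static async Task<int> ConsumeUntilCancelledAsync(int stopAfter)
    {
        using var cts = new CancellationTokenSource();
        int seen = 0;
        try
        {
            await foreach (int page in PagesAsync().WithCancellation(cts.Token))
            {
                seen++;
                if (seen == stopAfter)
                {
                    cts.Cancel(); // e.g. the host is shutting down
                }
            }
        }
        catch (OperationCanceledException)
        {
            // Expected: the next fetch observed the cancelled token.
        }
        return seen;
    }
}
```

In the service, the equivalent would be adding the attributed token parameter to Search and enumerating with .WithCancellation(stoppingToken); that change to ITwitterSearch is an assumption, not part of the original code.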

To show what this looks like, I took screenshots of the initial input and the subsequent logs.

[Screenshot: TwitterReaderInput]

[Screenshot: TwitterReaderOutput]

Looking at the result, you can see that the pages are processed one at a time: the search makes an HTTP call, the calling code logs how many statuses were received, and only then does the search make the next call.
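The same interleaving can be reproduced without Twitter. In this minimal sketch (FetchPagesAsync and the log messages are hypothetical), the producer records each simulated fetch and the consumer records each page it processes. The recorded order shows the producer pausing at yield return until await foreach asks for the next page.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;

public static class InterleavingDemo
{
    // Records what happens, in order, so the interleaving is visible.
    public static readonly List<string> Log = new List<string>();

    // Hypothetical producer: each "fetch" stands in for one HTTP call.
    private static async IAsyncEnumerable<int> FetchPagesAsync(int pageCount)
    {
        for (int page = 1; page <= pageCount; page++)
        {
            await Task.Yield(); // pretend to wait on the network
            Log.Add($"fetched page {page}");
            yield return page; // suspend here until the consumer asks again
        }
    }

    public static async Task RunAsync()
    {
        await foreach (int page in FetchPagesAsync(3))
        {
            Log.Add($"processed page {page}");
        }
    }
}
```

After RunAsync completes, Log alternates "fetched page 1", "processed page 1", "fetched page 2", and so on, mirroring the console output described above.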