ActionBlock Framework 4 rx alternative

c#,system.reactive,tpl-dataflow
I'm interested in an ActionBlock implementation for Framework 4.0, since it seems that TPL Dataflow isn't supported on Framework 4.0. More specifically, I'm interested in the constructor that receives the Func<TInput, Task> delegate and the MaxDegreeOfParallelism = 1 case. I thought about implementing it using Reactive Extensions,...

Reactive Extensions (Rx) - sample with last known value when no value is present in interval

c#,.net,system.reactive,reactive-programming
I have an observable stream that produces values at inconsistent intervals like this: ------1---2------3----------------4--------------5--- And I would like to sample this but without any empty samples once a value has been produced: ------1---2------3----------------4--------------5----- ----_----1----2----3----3----3----4----4----4----5----5 I obviously thought Replay().RefCount() could be used here to provide the last known value to...

How to use .NET Rx FromEvent so that add event handler is called once and remove handler is called on demand

.net,system.reactive,observable
Is it possible to use the .NET Rx Observable.FromEvent method, or is there another method that creates an Observable from an event but calls the add event handler only once, when the first Subscribe method is called or when the FromEvent (or similar method) is called, and don't call remove...

ConfigureAwait for IObservable

c#,asynchronous,async-await,system.reactive
I'm experiencing a deadlock when I use blocking code with Task.Wait(), waiting on an async method that internally awaits an Rx LINQ query. This is an example: public void BlockingCode() { this.ExecuteAsync().Wait(); } public async Task ExecuteAsync() { await this.service.GetFooAsync().ConfigureAwait(false); //This is the RX query that doesn't support ConfigureAwait await await this.service.Receiver...

C# .NET Rx- Where is System.Reactive?

c#,.net,system.reactive,reactive-programming
I have an intensive Java background so forgive me if I'm overlooking something obvious in C#, but my research is getting me nowhere. I am trying to use the reactive Rx .NET library. The compiler is not complaining about the IObservable but it is with the call to the zip...

IObservable - Ignore new elements for a span of time

c#,system.reactive
I'm trying to "throttle" an IObservable in (what I think is) a different way from the standard throttle methods. I want to ignore values for 1s following a first non-ignored value in the stream. For example, if 1s=5 dashes source: --1-23--45-----678901234 result: --1-----4------6----1--- Any ideas on how to achieve...

Reactive pipeline - how to control parallelism?

c#,.net,parallel-processing,system.reactive
I'm building a straightforward processing pipeline where an item is fetched as an input, is operated on by multiple processors sequentially, and is finally output. Image below describes the overall architecture: The way it is currently working: Pipeline is fetching items from the provider as...

RxJava async subscription

java,multithreading,asynchronous,system.reactive,rx-java
I have a list of tasks which should be handled one-by-one in a new thread, and then the result should be displayed in a method by some main thread. However, this doesn't seem to work: the flatMap method is invoked in the main thread. Why doesn't the subscribeOn method...

ReactiveUI ToProperty exceptions

c#,system.reactive,reactiveui
We have been trying to get ToProperty to work correctly but are experiencing exceptions which aren't being caught by ThrownExceptions. Our test view model looks like: public class ViewModel : ReactiveObject, ISupportsActivation { private readonly ViewModelActivator _activator = new ViewModelActivator(); public ViewModelActivator Activator { get { return _activator; } }...

Learning Rx: How can I parse an observable sequence of characters into an observable sequence of strings?

c#,system.reactive
This is probably really simple but I'm at the bottom of the learning curve with Rx. I've spent several hours reading articles, watching videos and writing code but I seem to have a mental block on something that seems like it should be really simple. I'm gathering data from a...

Observing items being added to a list in RX

.net,system.reactive
I am trying to get a simple demo working. I have a collection of strings and I want to watch it for additions, without using any control event code. Somehow I've gotten the impression, perhaps wrongly, that Rx or another part of .Net supports this, without resorting to wiring up...

Stopping all subsequent calls to subscribe in reactive extensions

c#,system.reactive,reactive-programming
I am replacing some of my previously written code with Reactive Extensions. Previously I was using File.ReadAllLines(FileName) and then looping through all the lines of the file; at some point in this loop, I break out and perform some operation on the processed records (which now I want to...

How can I pass multiple errors down a ReplaySubject?

c#,error-handling,system.reactive
How can I pass multiple errors down a ReplaySubject? When I call OnError only the first exception is passed. I need to call it multiple times and have all errors/exceptions passed. I see internally RX creates an AnonymousSafeObserver and OnError is calling the Dispose. Can I create my own version...

Using exception as a data transfer object in reactive extensions

.net,system.reactive
In Rx there are three main methods: OnNext<T>, OnError and OnComplete, and only OnNext is supposed to pass data. I have time stamped data points and data flow where there are normal values that are passed via OnNext and account for more than 99% of data points. But then, some...

How can I make an IObservable from a queue, so that the sequence doesn't end when the queue is empty?

c#,system.reactive,reactive-programming
I'm working on something that uses Reactive Extensions for .NET (Rx) and I want to have a sequence that takes its input from a queue (or similar). I've tried doing this: static readonly Queue<DeviceTransaction> TransactionQueue = new Queue<DeviceTransaction>(); //... var observableTransactionSource = TransactionQueue.ToObservable(); //... observableTransactionSource.Subscribe(transactionObserver); It works up to a...

Restoring data 1 minute ago for time-shifted sequence

system.reactive,reactive-programming
This class accumulates values + knows at current moment the difference between current sum and sum 1 minute ago. Its client uses it in such way: adds new value for every incoming data chunk and gets the difference. Now, there's a problem with restoring its state. Suppose application gets recycled,...

C# Rx - Take N values from observable on each interval

c#,system.reactive
I have an observable which streams a value for each ms; this is done every 250 ms (meaning 250 values in 250 ms, give or take). Mock sample code: IObservable<IEnumerable<int>> input = from _ in Observable.Interval(TimeSpan.FromMilliseconds(250)) select CreateSamples(250); input.Subscribe(values => { for (int i = 0;...

ReactiveX onComplete triggered before the last onNotify

c#,system.reactive
Consider the following pipeline: (1) buffer items into packs; (2) observe these packs in a threadpool thread; (3) do some asynchronous processing over these packs. If the process has finished, setting the source observable to complete will cause the buffer to emit the current pack as-is. However the processing part will get the...

Executing TPL code in a reactive pipeline and controlling execution via test scheduler

c#,.net,task-parallel-library,system.reactive
I'm struggling to get my head around why the following test does not work: [Fact] public void repro() { var scheduler = new TestScheduler(); var count = 0; // this observable is a simplification of the system under test // I've just included it directly in the test for clarity...

Timeshift an RX sequence into a batch after a quiet period?

c#,.net,system.reactive
I need my system to send an email summarising things that have happened after a period of inactivity. How can I do this with RX?

Creating an Observable around sets of continuously changing dependent observables

c#,system.reactive
The code snippet below is my attempt at creating the following functionality: Create an observable sequence that subscribes to a collection of subjects. When one of the subjects in the collection produces a value, the sequence ends, invokes a method that returns a new set of subjects and restarts at...

C# .NET - Reactive extensions error

c#,system.reactive
I have a C# 4.0 class library project in which I've referenced the Reactive Extensions DLLs (version 2.2.5). I'm getting the compilation errors mentioned below. Could you advise please? C# code: var observable = System.Reactive.Linq.Observable.Empty<bool>(); foreach (var modelParam in modelParams) observable = observable.Merge(modelParam.ObservePropertyChanged(p => p.IsDirty).Where(p => p)); *Error 522 'System.IObservable' does not contain a...

Close unmanaged resources when Subscription ends in Reactive Extensions

c#,system.reactive
I'm writing data to the network from Rx. Naturally I use Finally to close my stream when the subscription ends. This works cleanly both on OnError() and OnComplete(). Rx will run OnNext() ... OnNext(), OnComplete(), Finally() in sequence. However, sometimes I want to terminate the sequence early, to do so...

Techniques for testing code that uses multiple schedulers

c#,.net,unit-testing,system.reactive
When a SUT depends on multiple schedulers, what is the best way to keep the test code succinct and focussed? That is, avoid spurious calls to advance multiple different schedulers. Until now, my technique has been to define an application-level service that provides schedulers: public interface ISchedulerService { IScheduler DefaultScheduler...

What are the possible reasons the finally of an async { try… finally…} is not being called? Rx Involved

asynchronous,f#,system.reactive,cancellation
I have something like this: let a = async { try do! Async.AwaitTask someTask finally // clean up } Async.Start (a, cancellationTokenSource.Token) When the task being awaited in a finishes, the finally block executes and the clean up is done, but when async a gets cancelled because cancellationTokenSource gets Cancel...

How to improve this Rx FolderWatcher? [closed]

c#,system.reactive,filesystemwatcher
I started my sample from the one provided here: http://www.jaylee.org/post/2012/08/26/An-update-to-matthieumezil-Rx-and-the-FileSystemWatcher.aspx but the issue is that if you are watching a folder that has many files being modified/created/deleted at all times, it will never return an event since the throttle will never stop. What I needed is to create a new temporary...

RXJava - Split and Combine an Observable

android,system.reactive,rx-java
I am new to RxJava and need some help/guidance on how to do the following: I need to get two values from an Observable: a String and a List<ObjectA>. I then need to apply two different filters() on this list and then finally combine all of them (String, FilteredListA, FilteredListB) into a...

Cold observable collection with completion

c#,asynchronous,observablecollection,system.reactive
So maybe I am getting this observer pattern all wrong, but this is what I want. I have a method that is supposed to get some integers from a database. It looks like this: IObservable<int> GetInts() Each time it is done getting an integer, it should tell its subscribers via OnNext(), naturally. However,...

WebClient Timeout takes longer than expected (Using: Rx, Try Catch, Task)

c#,timeout,task,webclient,system.reactive
Problem: I inherited WebClient in ExtendedWebClient, where I override the WebRequest's timeout property in the GetWebRequest method. If I set it to 100ms, or even 20ms, it always takes at least 30 seconds. Sometimes it seems to not get through at all. Also, when the service...

Rx Example not working

winforms,system.reactive
I'm trying to follow along with Jonathan Worthington's airport announcement example in An Event-driven and Reactive Future. It compiles. The problem: SayGateChange is never called. I'm new to Rx. I must be leaving something out. What I have here is his code as exactly as I could transcribe it. Sadly,...

Threading argument through observable sequence

c#,system.reactive,reactiveui
I'm working with ReactiveUI and I've run into a problem. Essentially, in my login method I want to show a progress dialog, attempt the login and then close the dialog and return the final result. The basic pseudocode of the problem is this: private IObservable<bool> AttemptLogin(CredentialPair pair) { return...

RxJs — replay all events after each spurt of events

system.reactive,rxjs
How do you do it? RxJs is still a mystery to me. I was trying stuff like: filterChanges .delay(400) .replay() .reduce(function(acc,x) { return acc.concat(x) }, []) .subscribe(function(changes) { console.log(changes); ... Or filterChanges.subscribe(function() { filterChanges.aggregate(function(changes) { ... I'm really pretty lost here. The behavior I want is: certain user actions result...

Using RX (Reactive Extensions) to create 20 events with 30 millisecond delay

c#,unity3d,system.reactive
I want to write quick burst-fire code that calls a function 20 times, at 30 millisecond intervals. This is what I've tried so far: Observable.Repeat(20).Delay(TimeSpan.FromMilliseconds(30)).Subscribe() That seemed like the logical solution to me, but my app is crashing, I assume it's because it is not stopping...

Filter IObservable with regex and return matched value

c#,system.reactive,reactiveui
I have an IObservable<string> that I want to transform into an IObservable<int> through a regex, that I can assign to a property through the ToProperty helper. public class MyViewModel : ReactiveObject { public MyViewModel() { var o = Observable.Create( .... ); o.Something(s => .... ) .ToProperty(this, x => x.Value, out...

buffer while processing items

c#,.net,system.reactive,reactive-programming
I have an event that fires regularly. Let's assume that processing the event takes ~1s. Instead of waiting 1s for each received event I want to accumulate events until the last processing is done. When processing is done I want to process the event data I received during the last...

Rx extensions Parallel.ForEach throttling

c#,parallel-processing,system.reactive,reactiveui
I'm following the answer to this question: Rx extensions: Where is Parallel.ForEach? in order to run a number of operations in parallel using Rx. The problem I'm running into is that it seems to be allocating a new thread for every request, whereas using Parallel.ForEach did considerably fewer. The processes...

Implementing data binding in a MVVM framework

c#,mvvm,data-binding,system.reactive
I'm developing an MVVM framework (C#) and I'm looking for information on how to implement data binding / synchronization (one-way, two-way, multi-binding, etc.). Also, are there any other frameworks I should consider utilizing, e.g. Reactive Extensions?

Why is System.Reactive.Windows.Threading loading?

c#,asp.net,system.reactive
I was trying to get a library that required System.Threading.Dll to work with my .NET 3.5 web application project. Since 3.5 doesn't have System.Threading.Dll, I followed a tip to install the Reactive extensions because it has a backported version of System.Threading for 3.5. The attempt still failed and after enough...

Could Reactive Extension capture/detect consecutive cases?

c#,system.reactive
Using this, Rx generates a series of random numbers between 0 and 99. var R = new Random(); var ints = Observable.Interval(TimeSpan.FromSeconds(1)); var RandomNos = ints.Select(i=> R.Next(100)); // was new Random().Next(100) RandomNos.Subscribe(r=> Console.Write(r+ ",")); 1,75,49,23,97,71,45,19,93,66,40,14,88,62,36,10,84 I want to capture/detect when I get 6 more-than-50 numbers in a row. Can Rx...

Observable.Window and .Zip not functioning like I would expect

c#,system.reactive
I'm trying to turn an IEnumerable into an IObservable that delivers its items in chunks one second apart. var spartans = Enumerable.Range(0, 300).ToObservable(); spartans .Window(30) .Zip(Observable.Timer(DateTimeOffset.Now, TimeSpan.FromMilliseconds(1000)), (x, _) => x) .SelectMany(w => w) .Subscribe( n => Console.WriteLine("{0}", n), () => Console.WriteLine("all end")); With this code, the only thing that...

I have a chain of Rx.Subjects (A->B->C->A), but the final step is not working

javascript,system.reactive,reactive-programming,rxjs,reactive-extensions-js
Live example. I'm completely new to Rx*. I'm trying to create a reactive version of MVC using RxJS for my thesis. It's loosely based on https://github.com/staltz/mvi-example I probably should've studied RxJS more before starting to code, but I've realised I usually learn the best by just jumping to the deep...

IObservable with NetMQ receive

c#,async-await,system.reactive,zeromq,netmq
I'm trying to write a typical stock trading program, which receives stock tickers/orders/trades from NetMQ, turns the streams into IObservable, and shows them on a WPF frontend. I try to use async/await with NetMQ blocking ReceiveString (suppose I am expecting some string input) so that the ReceiveString loop wouldn't block...

How do I convert a ListView SelectedItem into an IObservable?

c#,xaml,listview,system.reactive,reactive-programming
This may be a remedial question, but my research is spinning me in circles right now (especially as a Java developer) and looking at several different Rx wrapper libraries. All I want is to take a ListView<MusicNote>.SelectedItem and wrap it into an IObservable, emitting a stream of the current single...

Conditional pairing of two streams - Combine If in Reactive Extensions

c#,.net,linq,system.reactive
I am looking to combine the first two sequences as this marble diagram depicts. In this example different lottery players pick numbers, and the numbers they pick are indicated by the color of the small dot on the first line. When winning numbers are decided (line 2) Lottery winners need...

C# Reactive Extensions - Subscribe to stream of aggregation

c#,system.reactive
I have a stream of points and would like to combine each 2 points in order to draw a line. public class MyPoint { public int X { get; set; } public int Y { get; set; } } I am looking for something that would combine the functionality...

From IObservable to Task

task-parallel-library,system.reactive,reactive-programming,observable
So the case is this. Suppose somewhere I am filling a Collection. Each time an element is added, an IObservable calls OnNext for its subscribers. Now, there will be a point where the collection will be filled. (I was reading something and I finished reading .. whatever). At that point,...

Fsharpx Async.AwaitObservable does not call cancellation continuation

asynchronous,f#,system.reactive,f#-async
I'm trying to use Fsharpx' Async.AwaitObservable inside an async workflow which is started using Async.StartWithContinuations. For some reason, if the cancellation token used to start this workflow is canceled while it is waiting for the observable (but not during other parts of the workflow), the cancellation continuation is never called....

How to do pattern matching in Rx. Where + Select in a single operator?

f#,pattern-matching,system.reactive
Suppose I have this type: type T = int option and an observable of that type: let o : IObservable<T> = // create the observable I'm looking for a better way to express this: o.Where(function | None -> false | Some t -> true) .Select(function | Some t -> t)...

How to use RxJava for file parsing and SQL generation?

java,multithreading,system.reactive,rx-java,rx-android
Most examples of RxJava I see have to do with network calls. I am new to the framework, so I am wondering if using it for something like parallel file parsing makes sense as well. I have a directory of files, whose data I need to parse into SQL tables....

Schedulers in Rxcpp

c++,threadpool,system.reactive,ppl
I'm trying to figure out the scheduling model in the C++ version of Rx. I know the C# version, where there is a simple interface with one Schedule method; the C++ version seems rather complex, with stuff like scheduler, worker, and coordination. One major missing piece for me is an implementation...

Post messages from async threads to main thread in F#

.net,powershell,f#,system.reactive,f#-async
There is a subscription to an observable that sends out log messages. Some of the log messages come from other threads because they are in F# async blocks. I need to be able to write out the messages from the main thread. Here is the code that currently filters...

RX - Group/Batch bursts of elements in an observable sequence

c#,system.reactive,observable
I have an observable sequence. When the first element is inserted, I would like to start a timer and batch subsequent inserted elements during the timespan of the timer. Then, the timer wouldn't start again until another element is inserted in the sequence. So something like this: --------|=====timespan====|---------------|=====timespan====|--------------> 1 2...

Rx on WinRT - dispatch on UI thread

c#,windows-runtime,system.reactive
I'm currently just getting started with Rx and finding it very useful so far. However I'm running into issues when creating an observable from an async method and updating the UI with the result. I'm (unsurprisingly) getting RPC_E_WRONG_THREAD errors with the following code: IDisposable service = null; service = Observable.FromAsync(fn).Subscribe(videoColl...

CSLA/ReactiveUI serialization issue

wpf,system.reactive,reactiveui,csla
I'm attempting to use CSLA (latest version) along with ReactiveUI/Reactive Extensions. When creating the WPF bindings using reactiveUI (Bind/OneWayBind) and using the ToProperty methods, it appears that the underlying logic uses the Rx Observable.FromEventPattern to hook into the model's PropertyChanged events in CSLA's BindableBase. The problem that is occurring is...

Is it the best to implement ObservableBase in this situation or is there another way?

c#,.net,system.reactive,observable
First of all, I didn't find a good example of a custom implementation of ObservableBase or AnonymousObservable. I have no idea which one I need to implement in my case, if any. The situation is this. I use a third-party library and there is a class, let's call it Producer...

Creating a hot observable and adding things to it

system.reactive
I am trying to create a hot observable where I can add stuff to it. Here's an outline of the basic class public class MyObservable { public IObservable<string> Stream; public MyObservable() { Observable.Create...? } public void AddString(string eventDescription) { //Add to Stream } } Somewhere else in the code I...

How to get latest changed events of IObservable>?

c#,.net,system.reactive,reactive-programming,rx.net
My system has a lot of status objects - connections status, cpu load, logged users and so on. All of such events are merged into a single observable stream. I want to make an admin utility to show actual status of the system and to show all of those counters....

Is there something like ThrottleOrMax in rx?

c#,events,system.reactive,throttling
Use case: I'm writing a thing that monitors changes and saves automatically. I want to Throttle so that I don't save more often than every five seconds. I want to save every 30 seconds if there is a continuous stream of changes. Could not find observable.Throttle(mergeTime, maxTime) in the docs...

Convert IEnumerable to IObservable with variable Period

c#,unit-testing,system.reactive
I am consuming 3-axis accelerometer data using Rx. I need to set up some unit tests. The data frames come in fast, with the median timespan between frames being 80ms, but on occasion it comes in at 120ms. Also, it is never exactly 80ms, but hovers in that range. So...

Will calling subject.OnCompleted() tidy up/ call dispose/not leak memory or do I have to call dispose on the IDisposable myself?

c#,system.reactive
Rx question please: will calling subject.OnCompleted() tidy up / call dispose / not leak memory, or do I have to call dispose on the returned IDisposable myself? Basically I'm making a server request/response and want to know if the client code needs to call replaySubject.Dispose() in the example below. Thanks in advance....

How to await on IObservable of IObservables?

c#,async-await,system.reactive
I have an IObservable of objects, which I want to convert to dictionary of lists asynchronously. Here is my code, where GetSource returns an IObservable: await GetSource(...) .GroupBy(o => o.SiteId) .ToDictionary(g => g.Key, g => g.ToList()) Of course, it is wrong, because the result is IDictionary<int, IObservable<IList<T>>>, but I need...

How can I clear the buffer on a ReplaySubject?

c#,system.reactive
How can I clear the buffer on a ReplaySubject? Periodically I need to clear the buffer (as an end of day event in my case) to prevent the ReplaySubject continually growing and eventually eating all the memory. Ideally I want to keep the same ReplaySubject as the client subscriptions are...

Publishing to multiple subscribes in RX

c#,.net,system.reactive
I am investigating how to develop a plugin framework for a project and Rx seems like a good fit for what I am trying to achieve. Ultimately, the project will be a set of plugins (modular functionality) that can be configured via XML to do different things. The requirements are...

Transform Observable if other Observables emitted mapping function

system.reactive
I'm creating a game in which there's an observable stream of events X representing products delivered by a manufacture. There are also some kind of external events (let's call them Transformers) that affect the performance of the manufacture in various ways and for various time periods. I want to represent...

Why does Rx buffer continuously perform method when buffer contains no items?

c#,.net,system.reactive
I have an Rx Observable that acts as a buffer. Right now it performs the method in Subscribe either when it gets 10 items, or after 100 milliseconds, whichever comes first. I noticed that my method is continuously being called every 100 ms, even when there are no items in...

How can you implement ZipLongest in Rx?

c#,system.reactive,observable
I'm trying to Zip two observable sequences of different lengths but I want the combined sequence to have the length of the longest sequence, padded with the last value of the shortest sequence. That is, if sequence 1 is [0,1,2] and sequence 2 is [0,1,2,3], I would like the result...

Execute OnNext in parallel but sync subscription with UI thread

c#,.net,winforms,system.reactive,reactive-programming
Given a Subject like this: var input = new Subject<int>(); and subscribers like this: var observer1 = input .Subscribe(ev => { Thread.Sleep(1000); listBox.Items.Add("o1: " + ev.ToString()); }); var observer2 = input .Subscribe(ev => { Thread.Sleep(1000); listBox.Items.Add("o2: " + ev.ToString()); }); how can I tweak it so that when I do...

Reactive Extensions Buffer Executes EveryTime irrespective of TimeSpan or count

c#,wpf,system.reactive,reactive-programming
I am creating a search text box and want my application to perform a search every 2 seconds or when there are at least 3 characters in the textbox. I am trying to use Reactive Extensions' buffer or throttle to achieve this. But I might not be clear about how these...

Switching streams in RX: Sodium's equivalent of merge and switch in RX

c#,system.reactive,reactive-programming,sodium
How can the television channel problem, as explained in this talk at the 31st minute, be solved by Rx? The problem expressed in Rx is as follows: There are two television channels (channel1 and channel2) which transmit a stream of images, plus a stream of fuzz which represents no channel...

How do I create an Rx observable that gets an immediate value and then samples?

c#,.net,system.reactive,observable
I want to use Sample to reduce the frequency of items coming out of my observable, but I want to immediately see the first event go through without being held up for the sample duration. After that I want the Sample to only give me an item on the sample...

Reactive extensions: Wrap custom delegate event

c#,system.reactive
How do I use Observable.FromEvent to wrap these custom delegates in Rx? public delegate void EmptyDelegate(); public delegate void CustomDelegate( Stream stream, Dictionary<int, object> values ); ...

How to prevent Scan from running multiple times?

.net,system.reactive,reactive-programming
For example var subject = new Subject<int>(); var test = subject.Scan(0, (x, y) => { Console.WriteLine("scan"); return x + 1; }); test.Subscribe(x => Console.WriteLine("subscribe1")); //test.Subscribe(x => Console.WriteLine("subscribe2")); Observable.Range(0, 1).Subscribe(subject); Console.WriteLine("done"); Console.Read(); The output is scan subscribe1 done But if you uncomment second Subscribe the output is scan subscribe1 scan subscribe2...

Force lambda to re-evaluate as part of Rx sequence

c#,lambda,system.reactive
I am trying to generate progress messages and publish them via an IObservable. Func<JobProgressMessage> nextMsg = () => ProgressManager.InProgressMessage("progressing"); var o = Observable .Return(nextMsg()) .Repeat() .Timeout(TimeSpan.FromSeconds(2)) .Retry(100) .Finally(() => job.AddMessage<ProgressCompleted>(ProgressManager.CompletedMessage("Completed"))); I am finding that the nextMsg() gets evaluated just the once and the same message gets published. I was hoping...

How to Make a Custom Extension for Reactive Extensions

c#,.net,extension-methods,system.reactive
It is not hard to find an example of how to make a custom LINQ extension method. But I can't find an example of how to make a custom Rx extension method. Can someone point me to a resource or post an example? I'm using the latest (ver 2.2.5). I'm...

How can I modify an IObservable such that I collect characters until there have been no characters for a period of time?

c#,system.reactive,reactive-programming
I would like to write an Rx query that takes an IObservable<char> and produces an IObservable<string>. The strings should be buffered until there have been no characters produced for a specified time. The data source is a serial port from which I have captured the DataReceived event and from that...