Skip to content

ChatCompletion

OpenAIChatCompletion

Bases: _BaseOpenAI, ChatCompletionModel

OpenAI Chat Completion.

Source code in src/msgflux/models/providers/openai.py
 126
 127
 128
 129
 130
 131
 132
 133
 134
 135
 136
 137
 138
 139
 140
 141
 142
 143
 144
 145
 146
 147
 148
 149
 150
 151
 152
 153
 154
 155
 156
 157
 158
 159
 160
 161
 162
 163
 164
 165
 166
 167
 168
 169
 170
 171
 172
 173
 174
 175
 176
 177
 178
 179
 180
 181
 182
 183
 184
 185
 186
 187
 188
 189
 190
 191
 192
 193
 194
 195
 196
 197
 198
 199
 200
 201
 202
 203
 204
 205
 206
 207
 208
 209
 210
 211
 212
 213
 214
 215
 216
 217
 218
 219
 220
 221
 222
 223
 224
 225
 226
 227
 228
 229
 230
 231
 232
 233
 234
 235
 236
 237
 238
 239
 240
 241
 242
 243
 244
 245
 246
 247
 248
 249
 250
 251
 252
 253
 254
 255
 256
 257
 258
 259
 260
 261
 262
 263
 264
 265
 266
 267
 268
 269
 270
 271
 272
 273
 274
 275
 276
 277
 278
 279
 280
 281
 282
 283
 284
 285
 286
 287
 288
 289
 290
 291
 292
 293
 294
 295
 296
 297
 298
 299
 300
 301
 302
 303
 304
 305
 306
 307
 308
 309
 310
 311
 312
 313
 314
 315
 316
 317
 318
 319
 320
 321
 322
 323
 324
 325
 326
 327
 328
 329
 330
 331
 332
 333
 334
 335
 336
 337
 338
 339
 340
 341
 342
 343
 344
 345
 346
 347
 348
 349
 350
 351
 352
 353
 354
 355
 356
 357
 358
 359
 360
 361
 362
 363
 364
 365
 366
 367
 368
 369
 370
 371
 372
 373
 374
 375
 376
 377
 378
 379
 380
 381
 382
 383
 384
 385
 386
 387
 388
 389
 390
 391
 392
 393
 394
 395
 396
 397
 398
 399
 400
 401
 402
 403
 404
 405
 406
 407
 408
 409
 410
 411
 412
 413
 414
 415
 416
 417
 418
 419
 420
 421
 422
 423
 424
 425
 426
 427
 428
 429
 430
 431
 432
 433
 434
 435
 436
 437
 438
 439
 440
 441
 442
 443
 444
 445
 446
 447
 448
 449
 450
 451
 452
 453
 454
 455
 456
 457
 458
 459
 460
 461
 462
 463
 464
 465
 466
 467
 468
 469
 470
 471
 472
 473
 474
 475
 476
 477
 478
 479
 480
 481
 482
 483
 484
 485
 486
 487
 488
 489
 490
 491
 492
 493
 494
 495
 496
 497
 498
 499
 500
 501
 502
 503
 504
 505
 506
 507
 508
 509
 510
 511
 512
 513
 514
 515
 516
 517
 518
 519
 520
 521
 522
 523
 524
 525
 526
 527
 528
 529
 530
 531
 532
 533
 534
 535
 536
 537
 538
 539
 540
 541
 542
 543
 544
 545
 546
 547
 548
 549
 550
 551
 552
 553
 554
 555
 556
 557
 558
 559
 560
 561
 562
 563
 564
 565
 566
 567
 568
 569
 570
 571
 572
 573
 574
 575
 576
 577
 578
 579
 580
 581
 582
 583
 584
 585
 586
 587
 588
 589
 590
 591
 592
 593
 594
 595
 596
 597
 598
 599
 600
 601
 602
 603
 604
 605
 606
 607
 608
 609
 610
 611
 612
 613
 614
 615
 616
 617
 618
 619
 620
 621
 622
 623
 624
 625
 626
 627
 628
 629
 630
 631
 632
 633
 634
 635
 636
 637
 638
 639
 640
 641
 642
 643
 644
 645
 646
 647
 648
 649
 650
 651
 652
 653
 654
 655
 656
 657
 658
 659
 660
 661
 662
 663
 664
 665
 666
 667
 668
 669
 670
 671
 672
 673
 674
 675
 676
 677
 678
 679
 680
 681
 682
 683
 684
 685
 686
 687
 688
 689
 690
 691
 692
 693
 694
 695
 696
 697
 698
 699
 700
 701
 702
 703
 704
 705
 706
 707
 708
 709
 710
 711
 712
 713
 714
 715
 716
 717
 718
 719
 720
 721
 722
 723
 724
 725
 726
 727
 728
 729
 730
 731
 732
 733
 734
 735
 736
 737
 738
 739
 740
 741
 742
 743
 744
 745
 746
 747
 748
 749
 750
 751
 752
 753
 754
 755
 756
 757
 758
 759
 760
 761
 762
 763
 764
 765
 766
 767
 768
 769
 770
 771
 772
 773
 774
 775
 776
 777
 778
 779
 780
 781
 782
 783
 784
 785
 786
 787
 788
 789
 790
 791
 792
 793
 794
 795
 796
 797
 798
 799
 800
 801
 802
 803
 804
 805
 806
 807
 808
 809
 810
 811
 812
 813
 814
 815
 816
 817
 818
 819
 820
 821
 822
 823
 824
 825
 826
 827
 828
 829
 830
 831
 832
 833
 834
 835
 836
 837
 838
 839
 840
 841
 842
 843
 844
 845
 846
 847
 848
 849
 850
 851
 852
 853
 854
 855
 856
 857
 858
 859
 860
 861
 862
 863
 864
 865
 866
 867
 868
 869
 870
 871
 872
 873
 874
 875
 876
 877
 878
 879
 880
 881
 882
 883
 884
 885
 886
 887
 888
 889
 890
 891
 892
 893
 894
 895
 896
 897
 898
 899
 900
 901
 902
 903
 904
 905
 906
 907
 908
 909
 910
 911
 912
 913
 914
 915
 916
 917
 918
 919
 920
 921
 922
 923
 924
 925
 926
 927
 928
 929
 930
 931
 932
 933
 934
 935
 936
 937
 938
 939
 940
 941
 942
 943
 944
 945
 946
 947
 948
 949
 950
 951
 952
 953
 954
 955
 956
 957
 958
 959
 960
 961
 962
 963
 964
 965
 966
 967
 968
 969
 970
 971
 972
 973
 974
 975
 976
 977
 978
 979
 980
 981
 982
 983
 984
 985
 986
 987
 988
 989
 990
 991
 992
 993
 994
 995
 996
 997
 998
 999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
@register_model
class OpenAIChatCompletion(_BaseOpenAI, ChatCompletionModel):
    """OpenAI Chat Completion."""

    def __init__(  # noqa: C901
        self,
        model_id: str,
        *,
        max_tokens: Optional[int] = None,
        reasoning_effort: Optional[str] = None,
        prompt_cache_retention: Optional[Literal["in_memory", "24h"]] = None,
        enable_thinking: Optional[bool] = None,
        return_reasoning: Optional[bool] = True,
        reasoning_in_tool_call: Optional[bool] = True,
        validate_typed_parser_output: Optional[bool] = False,
        temperature: Optional[float] = None,
        top_p: Optional[float] = None,
        stop: Optional[Union[str, List[str]]] = None,
        logprobs: Optional[bool] = None,
        top_logprobs: Optional[int] = None,
        parallel_tool_calls: Optional[bool] = True,
        modalities: Optional[List[str]] = None,
        audio: Optional[Dict[str, str]] = None,
        verbosity: Optional[str] = None,
        web_search_options: Optional[Dict[str, Any]] = None,
        extra_body: Optional[Dict[str, Any]] = None,
        verbose: Optional[bool] = False,
        base_url: Optional[str] = None,
        context_length: Optional[int] = None,
        reasoning_max_tokens: Optional[int] = None,
        enable_cache: Optional[bool] = False,
        cache_size: Optional[int] = 128,
        retry: Optional[Any] = None,
    ):
        """Args:
        model_id:
            Model ID in provider.
        max_tokens:
            An upper bound for the number of tokens that can be
            generated for a completion, including visible output
            tokens and reasoning tokens. For the ``openai`` provider it is
            sent as ``max_completion_tokens``.
        reasoning_effort:
            Constrains effort on reasoning for reasoning models.
            Reducing reasoning effort can result in faster responses
            and fewer tokens used on reasoning in a response.
            Can be: "minimal", "low", "medium" or "high" (support
            depends on the model).
        prompt_cache_retention:
            OpenAI-only prompt cache retention policy.
            Allowed values are "in_memory" and "24h". Ignored for
            other providers.
        enable_thinking:
            If True, enable the model reasoning.
        return_reasoning:
            If the model returns the `reasoning` field it will be added
            along with the response.
        reasoning_in_tool_call:
            If True, maintains the reasoning for using the tool call.
        validate_typed_parser_output:
            If True, use the generation_schema to validate typed parser output.
        temperature:
            What sampling temperature to use, between 0 and 2.
            Higher values like 0.8 will make the output more random,
            while lower values like 0.2 will make it more focused and
            deterministic. ``0`` is forwarded as-is; only ``None`` means
            "use the provider default".
        stop:
            Up to 4 sequences where the API will stop generating further
            tokens. The returned text will not contain the stop sequence.
        top_p:
            An alternative to sampling with temperature, called nucleus
            sampling, where the model considers the results of the tokens
            with top_p probability mass. So 0.1 means only the tokens
            comprising the top 10% probability mass are considered.
            Only ``None`` means "use the provider default".
        logprobs:
            Token log probability output. When enabled, the response
            metadata includes the token-level logprob payload.
            Only forwarded for the ``openai`` provider.
        top_logprobs:
            Number of alternative tokens to return per generated token.
            Use with `logprobs=True`. Only forwarded for the ``openai``
            provider.
        parallel_tool_calls:
            If True, enable parallel tool calls.
        modalities:
            Types of output you would like the model to generate.
            Can be: ["text"], ["audio"] or ["text", "audio"].
        audio:
            Audio configurations. Define voice and output format.
        verbosity:
            Constrains the verbosity of the model's response. Lower
            values will result in more concise responses, while higher
            values will result in more verbose responses. Currently
            supported values are low, medium, and high.
        web_search_options:
            This tool searches the web for relevant results to use in a response.
            OpenAI and OpenRouter only.
        extra_body:
            Provider-specific request body extensions forwarded to
            OpenAI-compatible clients. A shallow copy is stored.
        verbose:
            If True, prints the model output to the console before it is
            transformed into typed structured output.
        base_url:
            URL to model provider. Defaults to ``self._get_base_url()``.
        context_length:
            The maximum context length supported by the model.
        reasoning_max_tokens:
            OpenRouter-only maximum number of tokens for reasoning/thinking.
            This maps to ``extra_body={"reasoning": {"max_tokens": ...}}``
            and cannot be combined with ``reasoning_effort``.
        enable_cache:
            If True, enable response caching to avoid redundant API calls.
        cache_size:
            Maximum number of cached responses (default: 128).
        retry:
            Retry configuration; stored unchanged on ``self.retry`` and not
            interpreted by this constructor.

        Raises:
            ValueError: If, for the ``openrouter`` provider, both
                ``reasoning_max_tokens`` and ``reasoning_effort`` are given.
        """
        super().__init__()
        self.model_id = model_id
        self.context_length = context_length
        self.reasoning_max_tokens = reasoning_max_tokens
        self.enable_cache = enable_cache
        self.cache_size = cache_size
        self.sampling_params = {"base_url": base_url or self._get_base_url()}
        # ``max_tokens`` is always present (possibly None); ``_adapt_params``
        # renames it, or drops it when None, for the OpenAI provider.
        sampling_run_params = {"max_tokens": max_tokens}
        # Numeric sampling knobs are compared against None so that valid
        # falsy values such as ``temperature=0`` are not silently dropped.
        if temperature is not None:
            sampling_run_params["temperature"] = temperature
        if top_p is not None:
            sampling_run_params["top_p"] = top_p
        if stop:
            sampling_run_params["stop"] = stop
        if self.provider == "openai" and logprobs is not None:
            sampling_run_params["logprobs"] = logprobs
        if self.provider == "openai" and top_logprobs is not None:
            sampling_run_params["top_logprobs"] = top_logprobs
        if verbosity:
            sampling_run_params["verbosity"] = verbosity
        if modalities:
            sampling_run_params["modalities"] = modalities
        if web_search_options:
            sampling_run_params["web_search_options"] = web_search_options
        if extra_body is not None:
            # Copy so later mutation of the caller's dict does not leak in.
            sampling_run_params["extra_body"] = dict(extra_body)
        if audio:
            sampling_run_params["audio"] = audio
        if reasoning_effort:
            sampling_run_params["reasoning_effort"] = reasoning_effort
        if self.provider == "openrouter" and reasoning_max_tokens is not None:
            if reasoning_effort is not None:
                raise ValueError(
                    "`reasoning_max_tokens` cannot be used together with "
                    "`reasoning_effort` for OpenRouter."
                )
            sampling_run_params["reasoning_max_tokens"] = reasoning_max_tokens
        if self.provider == "openai" and prompt_cache_retention is not None:
            sampling_run_params["prompt_cache_retention"] = prompt_cache_retention
        self.sampling_run_params = sampling_run_params
        self.enable_thinking = enable_thinking
        self.parallel_tool_calls = parallel_tool_calls
        self.reasoning_in_tool_call = reasoning_in_tool_call
        self.validate_typed_parser_output = validate_typed_parser_output
        self.return_reasoning = return_reasoning
        self.verbose = verbose
        self.retry = retry
        self._initialize()
        self._get_api_key()

    def _adapt_params(self, params: Dict[str, Any]) -> Dict[str, Any]:
        """Apply provider-specific request adjustments to ``params`` in place.

        ``provider_tools`` is always removed. For the ``openai`` provider,
        ``max_tokens`` is renamed to ``max_completion_tokens``, or dropped
        entirely when it is ``None``.

        Returns:
            The same ``params`` dict, mutated.
        """
        params.pop("provider_tools", None)
        if self.provider != "openai":
            return params
        token_limit = params.pop("max_tokens", None)
        if token_limit is not None:
            params["max_completion_tokens"] = token_limit
        return params

    def _build_usage_metadata(self, model_output) -> dotdict:
        """Return a ``dotdict`` holding ``model_output.usage`` in serialized form.

        The ``usage`` key is only set when the output carries a usage value;
        otherwise the returned ``dotdict`` is empty.
        """
        raw_usage = getattr(model_output, "usage", None)
        serialized_usage = self._serialize_openai_value(raw_usage)
        result = dotdict()
        if serialized_usage is not None:
            result.usage = serialized_usage
        return result

    @staticmethod
    def _serialize_openai_value(value):
        """Convert an OpenAI SDK value into plain Python data.

        Conversion is tried in this order: ``value.to_dict()``,
        ``value.model_dump()``, then ``dict(value)`` for mappings. Anything
        else, including ``None``, is returned unchanged.
        """
        if value is None:
            return None
        for converter_name in ("to_dict", "model_dump"):
            if hasattr(value, converter_name):
                return getattr(value, converter_name)()
        return dict(value) if isinstance(value, Mapping) else value

    def _set_stop_metadata(
        self,
        metadata: dotdict,
        *,
        finish_reason: Optional[str] = None,
        stop_reason: Optional[str] = None,
    ) -> None:
        """Record ``finish_reason`` and ``stop_reason`` on ``metadata``.

        When only one of the two is provided, it is mirrored into the other.
        A key is written only when its resolved value is not ``None``.
        """
        resolved_finish = stop_reason if finish_reason is None else finish_reason
        resolved_stop = finish_reason if stop_reason is None else stop_reason
        if resolved_finish is not None:
            metadata.finish_reason = resolved_finish
        if resolved_stop is not None:
            metadata.stop_reason = resolved_stop

    @staticmethod
    def _extract_reasoning(message) -> Optional[str]:
        """Return reasoning text from a message or streamed delta.

        Returns the first truthy value among ``reasoning_content`` and
        ``reasoning``. Otherwise returns ``thinking`` as-is, which may be
        ``None`` or empty.
        """
        for attribute in ("reasoning_content", "reasoning"):
            candidate = getattr(message, attribute, None)
            if candidate:
                return candidate
        return getattr(message, "thinking", None)

    @staticmethod
    def _extract_finish_reason(choice) -> Optional[str]:
        """Return ``choice.finish_reason``, or ``None`` if the attribute is missing."""
        try:
            return choice.finish_reason
        except AttributeError:
            return None

    @classmethod
    def _extract_annotations(cls, message) -> Optional[list]:
        """Return the message's annotations serialized to plain data.

        Returns ``None`` when the message has no annotations, or when they
        are empty.
        """
        raw_annotations = getattr(message, "annotations", None)
        if not raw_annotations:
            return None
        return list(map(cls._serialize_openai_value, raw_annotations))

    @classmethod
    def _extract_logprobs(cls, choice):
        """Return ``choice.logprobs`` serialized to plain data, or ``None``.

        Dispatches through ``cls`` instead of the hard-coded class name, so a
        subclass overriding ``_serialize_openai_value`` is honored. This
        matches ``_extract_annotations``. Callers using
        ``self._extract_logprobs(choice)`` or
        ``OpenAIChatCompletion._extract_logprobs(choice)`` are unaffected.
        """
        return cls._serialize_openai_value(getattr(choice, "logprobs", None))

    def _build_completion_metadata(self, model_output, choice, message=None) -> dotdict:
        """Collect response metadata for one choice.

        Gathers usage (from ``model_output``), finish/stop reason and logprobs
        (from ``choice``), and annotations. Annotations are read from
        ``message`` when given (e.g. a streamed delta), otherwise from
        ``choice.message``.
        """
        metadata = self._build_usage_metadata(model_output)
        finish = self._extract_finish_reason(choice)
        self._set_stop_metadata(metadata, finish_reason=finish)

        annotation_source = (
            getattr(choice, "message", None) if message is None else message
        )
        annotations = self._extract_annotations(annotation_source)
        if annotations:
            metadata.annotations = annotations

        choice_logprobs = self._extract_logprobs(choice)
        if choice_logprobs is not None:
            metadata.logprobs = choice_logprobs
        return metadata

    @staticmethod
    def _merge_logprobs_metadata(metadata: dotdict, logprobs) -> None:
        """Accumulate one streamed chunk's logprobs into ``metadata.logprobs``.

        When both the accumulated value and the new chunk are mappings, the
        token lists under ``"content"`` and ``"refusal"`` are concatenated.
        A chunk whose list field is missing or not a list (for example
        ``None``) leaves the accumulated tokens intact instead of replacing
        them. Non-mapping payloads keep the previous "latest wins" behavior.

        Args:
            metadata: Stream-level metadata being built (mutated).
            logprobs: Serialized logprobs of a single chunk, or ``None``.
        """
        if logprobs is None:
            return
        existing = metadata.get("logprobs")
        if existing is None:
            metadata.logprobs = logprobs
            return
        if not (isinstance(existing, Mapping) and isinstance(logprobs, Mapping)):
            metadata.logprobs = logprobs
            return

        # Shallow copy so a read-only Mapping can still receive new keys;
        # the lists themselves are extended in place to keep merging linear.
        merged = dict(existing)
        for key in ("content", "refusal"):
            new_items = logprobs.get(key)
            if not isinstance(new_items, list):
                continue
            current_items = merged.get(key)
            if isinstance(current_items, list):
                current_items.extend(new_items)
            else:
                merged[key] = list(new_items)
        metadata.logprobs = merged

    @staticmethod
    def _process_stream_tool_calls(delta, stream_response, aggregator):
        """Feed every tool-call fragment of a streamed delta into ``aggregator``.

        All entries of ``delta.tool_calls`` are processed, not only the first.
        Some OpenAI-compatible servers send several tool calls in one delta.
        The response type is set to ``"tool_call"`` if nothing has set it yet.

        Args:
            delta: Streamed ``choice.delta`` whose ``tool_calls`` is non-empty.
            stream_response: Stream response being populated.
            aggregator: ``ToolCallAggregator`` accumulating fragments by index.
        """
        if stream_response.response_type is None:
            stream_response.set_response_type("tool_call")
        for tool_call in delta.tool_calls:
            # ``function`` may be absent on a streamed tool-call fragment.
            # Forward None for name/arguments rather than crashing.
            # NOTE(review): assumes ToolCallAggregator.process tolerates None
            # arguments, as it already must for continuation-chunk names.
            function = getattr(tool_call, "function", None)
            aggregator.process(
                tool_call.index,
                tool_call.id,
                getattr(function, "name", None),
                getattr(function, "arguments", None),
            )

    @staticmethod
    def _stream_add_chunk(stream_response, chunk, response_type):
        """Append ``chunk`` to the stream, setting ``response_type`` on first use.

        The type is only assigned when the stream does not have one yet.
        """
        type_unset = stream_response.response_type is None
        if type_unset:
            stream_response.set_response_type(response_type)
        stream_response.add(chunk)

    @staticmethod
    def _stream_add_reasoning_chunk(stream_response, chunk):
        """Append one reasoning fragment via ``stream_response.add_reasoning``.

        Unlike ``_stream_add_chunk``, this does not touch the response type.
        """
        stream_response.add_reasoning(chunk)

    def _prepare_request_params(self, kwargs: Dict[str, Any]) -> Dict[str, Any]:
        """Build the chat-completions request payload for sync and async calls.

        Call-time ``kwargs`` override ``self.sampling_run_params``. An optional
        ``prefilling`` value is removed from the payload and, when truthy,
        appended as a trailing assistant message. A new list is built, so the
        caller's ``messages`` list is not mutated. The result is passed through
        ``_adapt_params`` for provider-specific adjustments.
        """
        prefilling = kwargs.get("prefilling")
        params = {**self.sampling_run_params, **kwargs}
        params.pop("prefilling", None)
        if prefilling:
            params["messages"] = [
                *params["messages"],
                {"role": "assistant", "content": prefilling},
            ]
        return self._adapt_params(params)

    def _execute_model(self, **kwargs):
        """Send a chat-completions request with the synchronous client."""
        return self.client.chat.completions.create(
            **self._prepare_request_params(kwargs)
        )

    async def _aexecute_model(self, **kwargs):
        """Send a chat-completions request with the asynchronous client."""
        return await self.aclient.chat.completions.create(
            **self._prepare_request_params(kwargs)
        )

    def _process_completion_model_output(  # noqa: C901
        self,
        model_output,
        typed_parser=None,
        generation_schema=None,
        transport_generation_schema=None,
    ):
        """Build a ModelResponse from the raw OpenAI completion output.

        `generation_schema` is the canonical msgflux schema exposed to callers.
        `transport_generation_schema` is an OpenAI-specific wire schema used when
        the canonical schema must be lowered to satisfy Structured Outputs
        constraints, for example lowering ``Dict[K, V]`` to an ``entries`` list.

        Only the first choice is used. The response type is chosen in this
        order: tool calls, text content (typed parser, schema-decoded, or plain
        text), audio, and finally an empty text generation.

        Args:
            model_output: Non-streaming completion returned by the client.
            typed_parser: Key into ``typed_parser_registry`` used to decode
                text content. It takes precedence over ``generation_schema``.
            generation_schema: Canonical schema that structured output is
                decoded into (and, for typed parsers, optionally validated
                against when ``validate_typed_parser_output`` is set).
            transport_generation_schema: Optional mapping with
                ``decoder_schema`` (schema of the raw payload; ``None`` means
                plain JSON decoding) and ``normalize`` (callable mapping the
                raw payload to the canonical shape, or ``None``).

        Returns:
            A ``ModelResponse`` populated with content, reasoning and metadata.
        """
        response = ModelResponse()
        choice = model_output.choices[0]
        message = choice.message
        # Usage, finish/stop reason, annotations and logprobs are all gathered
        # here (annotations from ``choice.message``), so no second extraction
        # is needed below.
        metadata = self._build_completion_metadata(model_output, choice)

        reasoning = self._extract_reasoning(message)

        reasoning_tool_call = reasoning if self.reasoning_in_tool_call else None

        reasoning_content = None
        if self.return_reasoning is True and reasoning is not None:
            reasoning_content = reasoning

        if message.tool_calls:
            aggregator = ToolCallAggregator(reasoning_tool_call)
            response.set_response_type("tool_call")
            for call_index, tool_call in enumerate(message.tool_calls):
                tool_id = tool_call.id
                name = tool_call.function.name
                arguments = tool_call.function.arguments
                aggregator.process(call_index, tool_id, name, arguments)
            response_content = aggregator
        elif message.content:
            if (typed_parser or generation_schema) and self.verbose:
                repr_str = f"[{self.model_id}][raw_response] {message.content}"
                cprint(repr_str, lc="r", ls="b")
            if typed_parser is not None:
                response.set_response_type("structured")
                parser = typed_parser_registry[typed_parser]
                response_content = dotdict(parser.decode(message.content))
                if generation_schema and self.validate_typed_parser_output:
                    # Validation only: the decoded struct is discarded and the
                    # parser output is returned unchanged if decoding succeeds.
                    decoder = self._get_decoder(generation_schema)
                    decoder.decode(self._encoder.encode(response_content))
            elif generation_schema is not None:
                response.set_response_type("structured")
                # The raw payload follows the OpenAI transport schema, which may be
                # a lowered or dynamically generated version of the logical msgflux
                # generation schema.
                transport_info = transport_generation_schema or {}
                decoder_schema = transport_info.get("decoder_schema", generation_schema)
                normalize = transport_info.get("normalize")

                if decoder_schema is None:
                    response_content = msgspec.json.decode(message.content)
                else:
                    decoder = self._get_decoder(decoder_schema)
                    struct = decoder.decode(message.content)
                    response_content = struct_to_dict(struct)

                if normalize is not None:
                    response_content = normalize(response_content)

                # Round-trip through the canonical schema so callers always get
                # data shaped (and validated) by ``generation_schema``.
                decoder = self._get_decoder(generation_schema)
                struct = decoder.decode(self._encoder.encode(response_content))
                response_content = dotdict(struct_to_dict(struct))
            else:
                response.set_response_type("text_generation")
                response_content = message.content
        elif message.audio:
            response_content = dotdict(
                {
                    "id": message.audio.id,
                    "audio": base64.b64decode(message.audio.data),
                }
            )
            if message.audio.transcript:
                response.set_response_type("audio_text_generation")
                response_content.text = message.audio.transcript
            else:
                response.set_response_type("audio_generation")
        else:
            response.set_response_type("text_generation")
            response_content = ""

        response.reasoning = reasoning_content
        response.add(response_content)
        response.set_metadata(metadata)
        return response

    def _process_model_output(
        self,
        model_output,
        typed_parser=None,
        generation_schema=None,
        transport_generation_schema=None,
    ):
        """Thin delegation to ``_process_completion_model_output``.

        All arguments are forwarded positionally and unchanged.
        """
        forwarded = (typed_parser, generation_schema, transport_generation_schema)
        return self._process_completion_model_output(model_output, *forwarded)

    def _check_cache(self, **kwargs):
        """Return the cached response for ``kwargs``, or ``None`` on a miss.

        A lookup only happens when ``enable_cache`` is set and a response cache
        exists. The cache is compared to ``None`` rather than tested for
        truthiness. A cache object that is falsy when empty (e.g. one defining
        ``__len__``) would otherwise never be consulted.
        """
        if not self.enable_cache or self._response_cache is None:
            return None
        cache_key = generate_cache_key(**kwargs)
        hit, cached_response = self._response_cache.get(cache_key)
        return cached_response if hit else None

    def _store_cache(self, response, **kwargs):
        """Store ``response`` under the cache key derived from ``kwargs``.

        This is a no-op unless ``enable_cache`` is set and a response cache
        exists. The cache is compared to ``None`` rather than tested for
        truthiness. A cache object that is falsy while empty (e.g. one defining
        ``__len__``) would otherwise never receive its first entry.
        """
        if self.enable_cache and self._response_cache is not None:
            cache_key = generate_cache_key(**kwargs)
            self._response_cache.set(cache_key, response)

    def _prepare_generate_kwargs(self, kwargs):
        """Prepare generation kwargs and derive the OpenAI transport schema.

        `generation_schema` remains the canonical schema for msgflux.
        `transport_generation_schema` is the schema sent to OpenAI in
        `response_format`; it may be the same type or a lowered variant that only
        exists to satisfy OpenAI Structured Outputs restrictions.

        ``typed_parser``, ``generation_schema`` and ``tool_definitions`` are
        popped from ``kwargs`` (mutated in place). ``response_format`` is
        injected when structured output is requested without a typed parser.

        Returns:
            ``(typed_parser, generation_schema, transport_generation_schema)``,
            where the last item is ``None`` or a dict with ``decoder_schema``
            and ``normalize`` entries.
        """
        typed_parser = kwargs.pop("typed_parser", None)
        generation_schema = kwargs.pop("generation_schema", None)
        tool_definitions = kwargs.pop("tool_definitions", None)

        # Nothing to negotiate: no schema, or a typed parser handles decoding.
        if generation_schema is None or typed_parser is not None:
            return typed_parser, generation_schema, None

        if issubclass(generation_schema, ToolFlowControl):
            provider_format = generation_schema.build_provider_response_format(
                tool_definitions
            )
            if provider_format is not None:

                def _normalize_tool_flow(payload):
                    return generation_schema.normalize_provider_response(
                        payload,
                        tool_definitions=tool_definitions,
                    )

                kwargs["response_format"] = provider_format
                return (
                    typed_parser,
                    generation_schema,
                    {"decoder_schema": None, "normalize": _normalize_tool_flow},
                )

        # Lower only for the OpenAI transport layer; the logical schema stays
        # unchanged so decoded outputs can be restored to the original shape.
        wire_schema = lower_msgspec_struct_for_openai(generation_schema)
        if wire_schema is generation_schema:
            restore = None
        else:
            restore = partial(
                restore_openai_structured_output,
                logical_type=generation_schema,
            )
        kwargs["response_format"] = response_format_from_msgspec_struct(wire_schema)
        return (
            typed_parser,
            generation_schema,
            {"decoder_schema": wire_schema, "normalize": restore},
        )

    def _prepare_stream_kwargs(self, kwargs: Dict[str, Any]) -> Dict[str, Any]:
        """Strip internal generation-only args before raw streaming requests.

        Removes ``typed_parser``, ``generation_schema`` and
        ``tool_definitions`` in place and returns the same dict.
        """
        for internal_key in ("typed_parser", "generation_schema", "tool_definitions"):
            kwargs.pop(internal_key, None)
        return kwargs

    def _generate(self, **kwargs: Mapping[str, Any]) -> ModelResponse:
        """Run a non-streaming completion, consulting the response cache.

        Args:
            **kwargs: Request kwargs, optionally including ``typed_parser``,
                ``generation_schema`` and ``tool_definitions``.

        Returns:
            The processed ``ModelResponse`` (possibly from the cache).
        """
        # Snapshot the caller's kwargs before ``_prepare_generate_kwargs`` pops
        # ``typed_parser``/``generation_schema``/``tool_definitions`` and
        # injects ``response_format``. The same snapshot keys both the lookup
        # and the store, so a stored response can actually be hit later.
        cache_kwargs = dict(kwargs)
        cached = self._check_cache(**cache_kwargs)
        if cached is not None:
            return cached

        (
            typed_parser,
            generation_schema,
            transport_generation_schema,
        ) = self._prepare_generate_kwargs(kwargs)

        model_output = self._execute_model(**kwargs)
        response = self._process_model_output(
            model_output,
            typed_parser,
            generation_schema,
            transport_generation_schema,
        )

        self._store_cache(response, **cache_kwargs)
        return response

    async def _agenerate(self, **kwargs: Mapping[str, Any]) -> ModelResponse:
        """Async counterpart of ``_generate``; consults the response cache.

        Args:
            **kwargs: Request kwargs, optionally including ``typed_parser``,
                ``generation_schema`` and ``tool_definitions``.

        Returns:
            The processed ``ModelResponse`` (possibly from the cache).
        """
        # Snapshot the caller's kwargs before ``_prepare_generate_kwargs``
        # mutates them (pops internal keys, injects ``response_format``), so
        # the cache lookup and store use the same key.
        cache_kwargs = dict(kwargs)
        cached = self._check_cache(**cache_kwargs)
        if cached is not None:
            return cached

        (
            typed_parser,
            generation_schema,
            transport_generation_schema,
        ) = self._prepare_generate_kwargs(kwargs)

        model_output = await self._aexecute_model(**kwargs)
        response = self._process_model_output(
            model_output,
            typed_parser,
            generation_schema,
            transport_generation_schema,
        )

        self._store_cache(response, **cache_kwargs)
        return response

    def _stream_generate(  # noqa: C901
        self, **kwargs: Mapping[str, Any]
    ) -> ModelStreamResponse:
        """Stream a chat completion into ``stream_response`` (synchronous path).

        Consumes the chunk iterator returned by ``_execute_model`` and routes
        each delta to one of three places:
        - reasoning text, accumulated for tool calls and/or forwarded to the
          stream's reasoning channel;
        - text content;
        - tool-call fragments, collected by a ``ToolCallAggregator``.
        Choice-less chunks carrying ``usage`` contribute usage metadata.

        Exceptions are not raised to the caller; they are recorded with
        ``stream_response.set_error``. The ``finally`` block always sets the
        first-chunk and response-type events if still unset, attaches
        ``metadata``, and pushes ``None`` to both the reasoning and content
        channels (presumably an end-of-stream marker — confirm in
        ModelStreamResponse).

        Args:
            **kwargs: Must include ``stream_response``, which is popped here.
                The rest is passed unchanged to ``_execute_model``; presumably
                it already contains ``stream=True`` and has been cleaned by
                ``_prepare_stream_kwargs`` (TODO confirm at the caller).

        NOTE(review): annotated as returning ``ModelStreamResponse``, but no
        value is returned; results are delivered through ``stream_response``.
        """
        stream_response = kwargs.pop("stream_response")
        metadata = dotdict()
        # Reasoning kept for the tool-call aggregator (``reasoning_in_tool_call``)
        # and for ``stream_response.reasoning`` (``return_reasoning``).
        reasoning_tool_call = ""
        reasoning_accumulated = ""

        try:
            aggregator = ToolCallAggregator()
            model_output = self._execute_model(**kwargs)
            finish_reason = None

            for chunk in model_output:
                if chunk.choices:
                    choice = chunk.choices[0]
                    delta = choice.delta
                    # Remember the last non-None finish reason in the stream.
                    fr = self._extract_finish_reason(choice)
                    if fr is not None:
                        finish_reason = fr

                    # Per-chunk metadata built from the delta. Only annotations
                    # (latest non-empty wins) and logprobs (merged across
                    # chunks) are carried into the stream metadata.
                    chunk_metadata = self._build_completion_metadata(
                        chunk,
                        choice,
                        delta,
                    )
                    annotations = getattr(chunk_metadata, "annotations", None)
                    if annotations:
                        metadata.annotations = annotations
                    self._merge_logprobs_metadata(
                        metadata,
                        getattr(chunk_metadata, "logprobs", None),
                    )

                    reasoning_chunk = self._extract_reasoning(delta)

                    if reasoning_chunk:
                        if self.reasoning_in_tool_call:
                            reasoning_tool_call += reasoning_chunk
                        if self.return_reasoning:
                            reasoning_accumulated += reasoning_chunk
                            self._stream_add_reasoning_chunk(
                                stream_response,
                                reasoning_chunk,
                            )
                        # NOTE(review): a delta carrying reasoning AND content or
                        # tool_calls has those skipped by this ``continue`` —
                        # confirm providers never combine them in one delta.
                        continue

                    if getattr(delta, "content", None):
                        self._stream_add_chunk(
                            stream_response,
                            delta.content,
                            "text_generation",
                        )
                        continue

                    if getattr(delta, "tool_calls", None):
                        self._process_stream_tool_calls(
                            delta,
                            stream_response,
                            aggregator,
                        )
                        continue

                elif chunk.usage:
                    # Choice-less chunk with usage: the usage fields are copied
                    # to the top level of metadata and also kept under ``usage``.
                    usage = self._serialize_openai_value(chunk.usage)
                    if usage is not None:
                        metadata.update(usage)
                        metadata.usage = usage

            # Tool calls take over the response data: the aggregator (with any
            # accumulated reasoning) becomes ``stream_response.data``.
            if aggregator.tool_calls:
                if reasoning_tool_call:
                    aggregator.reasoning = reasoning_tool_call
                stream_response.data = aggregator
                stream_response.first_chunk_event.set()
            stream_response.reasoning = reasoning_accumulated or None
            self._set_stop_metadata(metadata, finish_reason=finish_reason)
        except Exception as e:
            # Errors are recorded on the stream response rather than raised.
            stream_response.set_error(e)
        finally:
            # Always set both events so nothing waiting on them hangs, even
            # after an error or an empty stream.
            if not stream_response.first_chunk_event.is_set():
                stream_response.first_chunk_event.set()
            if not stream_response._response_type_event.is_set():
                stream_response._response_type_event.set()
            stream_response.set_metadata(metadata)
            stream_response.add_reasoning(None)
            stream_response.add(None)

    async def _astream_generate(  # noqa: C901
        self, **kwargs: Mapping[str, Any]
    ) -> ModelStreamResponse:
        """Consume an async provider stream and feed it into a stream response.

        This is the worker that ``acall(..., stream=True)`` spawns. It pops
        ``stream_response`` out of ``kwargs``, forwards the remaining kwargs
        to ``self._aexecute_model`` and iterates the returned async chunks.

        For every chunk that carries ``choices`` (only ``choices[0]`` is
        read):

        * the last non-None finish reason is remembered;
        * annotations (the last non-empty value wins) and logprobs are
          merged into the response metadata;
        * reasoning deltas are accumulated for tool calls
          (``reasoning_in_tool_call``) and/or streamed to the caller
          (``return_reasoning``);
        * otherwise text content is streamed, or tool-call deltas are fed
          into a ``ToolCallAggregator``.

        Chunks with no ``choices`` but with ``usage`` update the usage
        metadata.

        ``Exception`` subclasses raised while streaming are not propagated.
        They are recorded with ``stream_response.set_error``. In all cases
        the ``finally`` block does four things, in order:

        * releases the waiting events;
        * publishes the collected metadata;
        * pushes ``None`` to the reasoning channel;
        * pushes ``None`` to the content channel.

        The ``None`` pushes are presumably the end-of-stream sentinel;
        confirm in ``ModelStreamResponse``.

        Args:
            **kwargs: Request parameters for ``_aexecute_model`` plus the
                ``stream_response`` object to populate.

        Returns:
            Despite the annotation, nothing is returned explicitly (the
            coroutine yields ``None``). Results are delivered through
            ``stream_response``.
        """
        stream_response = kwargs.pop("stream_response")
        metadata = dotdict()
        # Reasoning kept for the tool-call aggregator (reasoning_in_tool_call).
        reasoning_tool_call = ""
        # Reasoning exposed on stream_response.reasoning (return_reasoning).
        reasoning_accumulated = ""

        try:
            aggregator = ToolCallAggregator()
            model_output = await self._aexecute_model(**kwargs)
            finish_reason = None

            async for chunk in model_output:
                if chunk.choices:
                    choice = chunk.choices[0]
                    delta = choice.delta
                    # Keep the most recent non-None finish reason; earlier
                    # chunks usually report None.
                    fr = self._extract_finish_reason(choice)
                    if fr is not None:
                        finish_reason = fr

                    chunk_metadata = self._build_completion_metadata(
                        chunk,
                        choice,
                        delta,
                    )
                    annotations = getattr(chunk_metadata, "annotations", None)
                    if annotations:
                        metadata.annotations = annotations
                    self._merge_logprobs_metadata(
                        metadata,
                        getattr(chunk_metadata, "logprobs", None),
                    )

                    reasoning_chunk = self._extract_reasoning(delta)

                    if reasoning_chunk:
                        if self.reasoning_in_tool_call:
                            reasoning_tool_call += reasoning_chunk
                        if self.return_reasoning:
                            reasoning_accumulated += reasoning_chunk
                            self._stream_add_reasoning_chunk(
                                stream_response,
                                reasoning_chunk,
                            )
                        # Reasoning short-circuits the rest of this chunk.
                        # NOTE(review): content or tool calls sharing the same
                        # delta would be skipped; confirm providers never
                        # combine them.
                        continue

                    if getattr(delta, "content", None):
                        self._stream_add_chunk(
                            stream_response,
                            delta.content,
                            "text_generation",
                        )
                        continue

                    if getattr(delta, "tool_calls", None):
                        self._process_stream_tool_calls(
                            delta,
                            stream_response,
                            aggregator,
                        )
                        continue

                # Only chunks without choices are inspected for usage (e.g.
                # the trailing chunk requested via stream_options
                # include_usage).
                elif chunk.usage:
                    usage = self._serialize_openai_value(chunk.usage)
                    if usage is not None:
                        metadata.update(usage)
                        metadata.usage = usage

            # After the stream ends, expose the aggregated tool calls as the
            # response data and release anyone waiting on the first chunk.
            if aggregator.tool_calls:
                if reasoning_tool_call:
                    aggregator.reasoning = reasoning_tool_call
                stream_response.data = aggregator
                stream_response.first_chunk_event.set()
            stream_response.reasoning = reasoning_accumulated or None
            self._set_stop_metadata(metadata, finish_reason=finish_reason)
        except Exception as e:
            stream_response.set_error(e)
        finally:
            # Always unblock waiters (acall awaits first_chunk_event), even
            # when the stream produced nothing or failed.
            if not stream_response.first_chunk_event.is_set():
                stream_response.first_chunk_event.set()
            if not stream_response._response_type_event.is_set():
                stream_response._response_type_event.set()
            stream_response.set_metadata(metadata)
            stream_response.add_reasoning(None)
            stream_response.add(None)

    def _build_generation_params(
        self,
        messages: Union[str, List[Dict[str, Any]]],
        system_prompt: Optional[str],
        prefilling: Optional[str],
        tool_definitions: Optional[ToolDefinitions],
        *,
        logprobs: Optional[bool] = None,
        top_logprobs: Optional[int] = None,
    ) -> Dict[str, Any]:
        """Assemble the keyword arguments for a chat-completion request.

        Args:
            messages:
                A bare string, which becomes a single ``ChatBlock.user``
                block, or a list of message dicts, which is deep-copied so
                the caller's list is never mutated.
            system_prompt:
                When a ``str``, a ``ChatBlock.system`` block is prepended.
            prefilling:
                Forwarded unchanged under the ``"prefilling"`` key.
            tool_definitions:
                When it has non-empty ``schemas``, they are sent as
                ``tools`` together with ``tool_choice`` and
                ``parallel_tool_calls``. A string choice other than
                ``"auto"``, ``"required"`` or ``"none"`` is treated as a
                function name and wrapped into a function-selection dict.
            logprobs:
                Included only when not ``None``.
            top_logprobs:
                Included only when not ``None``.

        Returns:
            A dict with ``messages``, ``prefilling`` and ``model``, plus the
            optional logprob and tool keys described above.
        """
        if isinstance(messages, str):
            conversation = [ChatBlock.user(messages)]
        else:
            # Private copy: the system block below must not leak into the
            # caller's conversation object.
            conversation = deepcopy(messages)
        if isinstance(system_prompt, str):
            conversation.insert(0, ChatBlock.system(system_prompt))

        selected_tool = tool_definitions.choice if tool_definitions else None
        keyword_choices = ("auto", "required", "none")
        if isinstance(selected_tool, str) and selected_tool not in keyword_choices:
            # Any other string names a specific function to force.
            selected_tool = {
                "type": "function",
                "function": {"name": selected_tool},
            }

        request: Dict[str, Any] = {
            "messages": conversation,
            "prefilling": prefilling,
            "model": self.model_id,
        }

        per_call_logprobs = (
            ("logprobs", logprobs),
            ("top_logprobs", top_logprobs),
        )
        for key, value in per_call_logprobs:
            if value is not None:
                request[key] = value

        has_schemas = bool(tool_definitions and tool_definitions.schemas)
        if not has_schemas:
            return request

        request["tools"] = tool_definitions.schemas
        request["tool_choice"] = selected_tool
        request["parallel_tool_calls"] = self.parallel_tool_calls
        return request

    @staticmethod
    def _validate_chat_completion_options(
        *,
        prefilling: Optional[str],
        logprobs: Optional[bool],
        top_logprobs: Optional[int],
        generation_schema: Optional[msgspec.Struct],
        typed_parser: Optional[str],
        stream: Optional[bool],
    ) -> None:
        """Reject per-call option combinations that cannot be honoured.

        The rules are checked in order, and the first violated one raises:

        1. ``prefilling`` together with ``generation_schema``.
        2. ``top_logprobs`` without ``logprobs=True``.
        3. ``typed_parser`` together with ``stream=True``.

        Raises:
            ValueError: When any of the rules above is violated.
        """
        # Each entry is (is_violated, message). Every condition is a plain
        # comparison, so evaluating them all up front has no side effects.
        rules = (
            (
                prefilling is not None and generation_schema is not None,
                "`prefilling` is not compatible with `generation_schema` in "
                "OpenAI chat completions.",
            ),
            (
                top_logprobs is not None and logprobs is not True,
                "`top_logprobs` requires `logprobs=True`",
            ),
            (
                stream is True and typed_parser is not None,
                "`typed_parser` is not `stream=True` compatible",
            ),
        )
        for violated, message in rules:
            if violated:
                raise ValueError(message)

    def __call__(
        self,
        messages: Union[str, List[Dict[str, Any]]],
        *,
        system_prompt: Optional[str] = None,
        prefilling: Optional[str] = None,
        logprobs: Optional[bool] = None,
        top_logprobs: Optional[int] = None,
        stream: Optional[bool] = False,
        generation_schema: Optional[msgspec.Struct] = None,
        tool_definitions: Optional[ToolDefinitions] = None,
        typed_parser: Optional[str] = None,
    ) -> Union[ModelResponse, ModelStreamResponse]:
        """Run a chat completion, either blocking or streaming.

        Args:
            messages:
                Conversation history. Can be a simple string or a list of
                messages.
            system_prompt:
                A set of instructions that defines the overarching behavior
                and role of the model across all interactions.
            prefilling:
                Forces an initial message from the model. The model then
                continues its response from that message.
            logprobs:
                Token log probability output for this request.
            top_logprobs:
                Number of alternative tokens to return per generated token
                for this request. Requires `logprobs=True`.
            stream:
                Whether generation should be in streaming mode. When True,
                the worker is started with `F.spawn`. This method returns
                once the stream's first-chunk event is set, which happens on
                the first chunk, at the end of the stream, or on error.
            generation_schema:
                Schema that defines how the output should be structured.
                It is passed to `_generate` only in non-streaming mode and is
                not forwarded to the stream worker. When it is a
                `ToolFlowControl` subclass, tool schemas are not turned into
                `tools`/`tool_choice` request parameters. The
                `tool_definitions` object itself is still forwarded.
            tool_definitions:
                Optional container with tool schemas, annotations, and
                tool-choice metadata. This is the single tool-calling
                entrypoint for the provider.
            typed_parser:
                Converts the model raw output into a typed-dict. Supported
                parser: `typed_xml`. Not available with `stream=True`.

        Returns:
            A `ModelResponse` when `stream` is not True, otherwise a
            `ModelStreamResponse` populated by the spawned worker.

        Raises:
            ValueError:
                Raised if `prefilling` and `generation_schema` are both
                given.
            ValueError:
                Raised if `top_logprobs` is given without `logprobs=True`.
            ValueError:
                Raised if `typed_parser` is given with `stream=True`.
            TypedParserNotFoundError:
                Raised if `typed_parser` is not a registered parser.
        """
        self._validate_chat_completion_options(
            prefilling=prefilling,
            logprobs=logprobs,
            top_logprobs=top_logprobs,
            generation_schema=generation_schema,
            typed_parser=typed_parser,
            stream=stream,
        )
        # Fail fast on an unknown parser, before deep-copying the
        # conversation. Streaming with a typed_parser was rejected above, so
        # this can only trigger for non-streaming calls.
        if typed_parser and typed_parser not in typed_parser_registry:
            available = ", ".join(typed_parser_registry.keys())
            raise TypedParserNotFoundError(
                f"Typed parser `{typed_parser}` not found. "
                f"Available parsers: {available}"
            )

        is_flow_control = is_subclass_of(generation_schema, ToolFlowControl)
        generation_params = self._build_generation_params(
            messages,
            system_prompt,
            prefilling,
            None if is_flow_control else tool_definitions,
            logprobs=logprobs,
            top_logprobs=top_logprobs,
        )
        if tool_definitions is not None:
            generation_params["tool_definitions"] = tool_definitions

        if stream is True:
            self._prepare_stream_kwargs(generation_params)
            stream_response = ModelStreamResponse(mode="sync")
            F.spawn(
                self._stream_generate,
                **generation_params,
                stream=stream,
                stream_response=stream_response,
                stream_options={"include_usage": True},
            )
            F.wait_for_event(stream_response.first_chunk_event)
            return stream_response

        return self._generate(
            **generation_params,
            typed_parser=typed_parser,
            generation_schema=generation_schema,
        )

    async def acall(
        self,
        messages: Union[str, List[Dict[str, Any]]],
        *,
        system_prompt: Optional[str] = None,
        prefilling: Optional[str] = None,
        logprobs: Optional[bool] = None,
        top_logprobs: Optional[int] = None,
        stream: Optional[bool] = False,
        generation_schema: Optional[msgspec.Struct] = None,
        tool_definitions: Optional[ToolDefinitions] = None,
        typed_parser: Optional[str] = None,
    ) -> Union[ModelResponse, ModelStreamResponse]:
        """Async version of `__call__`.

        Args:
            messages:
                Conversation history. Can be a simple string or a list of
                messages.
            system_prompt:
                A set of instructions that defines the overarching behavior
                and role of the model across all interactions.
            prefilling:
                Forces an initial message from the model. The model then
                continues its response from that message.
            logprobs:
                Token log probability output for this request.
            top_logprobs:
                Number of alternative tokens to return per generated token
                for this request. Requires `logprobs=True`.
            stream:
                Whether generation should be in streaming mode. When True,
                `_astream_generate` is started with `F.aspawn`. This method
                returns once the stream's first-chunk event is set, which
                happens on the first chunk, at the end of the stream, or on
                error.
            generation_schema:
                Schema that defines how the output should be structured.
                It is passed to `_agenerate` only in non-streaming mode and
                is not forwarded to the stream worker. When it is a
                `ToolFlowControl` subclass, tool schemas are not turned into
                `tools`/`tool_choice` request parameters. The
                `tool_definitions` object itself is still forwarded.
            tool_definitions:
                Optional container with tool schemas, annotations, and
                tool-choice metadata. This is the single tool-calling
                entrypoint for the provider.
            typed_parser:
                Converts the model raw output into a typed-dict. Supported
                parser: `typed_xml`. Not available with `stream=True`.

        Returns:
            A `ModelResponse` when `stream` is not True, otherwise a
            `ModelStreamResponse` populated by the spawned worker.

        Raises:
            ValueError:
                Raised if `prefilling` and `generation_schema` are both
                given.
            ValueError:
                Raised if `top_logprobs` is given without `logprobs=True`.
            ValueError:
                Raised if `typed_parser` is given with `stream=True`.
            TypedParserNotFoundError:
                Raised if `typed_parser` is not a registered parser.
        """
        self._validate_chat_completion_options(
            prefilling=prefilling,
            logprobs=logprobs,
            top_logprobs=top_logprobs,
            generation_schema=generation_schema,
            typed_parser=typed_parser,
            stream=stream,
        )
        # Fail fast on an unknown parser, before deep-copying the
        # conversation. Streaming with a typed_parser was rejected above, so
        # this can only trigger for non-streaming calls.
        if typed_parser and typed_parser not in typed_parser_registry:
            available = ", ".join(typed_parser_registry.keys())
            raise TypedParserNotFoundError(
                f"Typed parser `{typed_parser}` not found. "
                f"Available parsers: {available}"
            )

        is_flow_control = is_subclass_of(generation_schema, ToolFlowControl)
        generation_params = self._build_generation_params(
            messages,
            system_prompt,
            prefilling,
            None if is_flow_control else tool_definitions,
            logprobs=logprobs,
            top_logprobs=top_logprobs,
        )
        if tool_definitions is not None:
            generation_params["tool_definitions"] = tool_definitions

        if stream is True:
            self._prepare_stream_kwargs(generation_params)
            stream_response = ModelStreamResponse(mode="async")
            await F.aspawn(
                self._astream_generate,
                **generation_params,
                stream=stream,
                stream_response=stream_response,
                stream_options={"include_usage": True},
            )
            await F.await_for_event(stream_response.first_chunk_event)
            return stream_response

        return await self._agenerate(
            **generation_params,
            typed_parser=typed_parser,
            generation_schema=generation_schema,
        )

cache_size instance-attribute

cache_size = cache_size

context_length instance-attribute

context_length = context_length

enable_cache instance-attribute

enable_cache = enable_cache

enable_thinking instance-attribute

enable_thinking = enable_thinking

model_id instance-attribute

model_id = model_id

parallel_tool_calls instance-attribute

parallel_tool_calls = parallel_tool_calls

reasoning_in_tool_call instance-attribute

reasoning_in_tool_call = reasoning_in_tool_call

reasoning_max_tokens instance-attribute

reasoning_max_tokens = reasoning_max_tokens

retry instance-attribute

retry = retry

return_reasoning instance-attribute

return_reasoning = return_reasoning

sampling_params instance-attribute

sampling_params = {'base_url': base_url or _get_base_url()}

sampling_run_params instance-attribute

sampling_run_params = sampling_run_params

validate_typed_parser_output instance-attribute

validate_typed_parser_output = validate_typed_parser_output

verbose instance-attribute

verbose = verbose

__call__

__call__(
    messages,
    *,
    system_prompt=None,
    prefilling=None,
    logprobs=None,
    top_logprobs=None,
    stream=False,
    generation_schema=None,
    tool_definitions=None,
    typed_parser=None,
)

Parameters:

Name Type Description Default
messages Union[str, List[Dict[str, Any]]]

Conversation history. Can be simple string or list of messages.

required
system_prompt Optional[str]

A set of instructions that defines the overarching behavior and role of the model across all interactions.

None
prefilling Optional[str]

Forces an initial message from the model. From that message it will continue its response from there.

None
logprobs Optional[bool]

Token log probability output for this request.

None
top_logprobs Optional[int]

Number of alternative tokens to return per generated token for this request.

None
stream Optional[bool]

Whether generation should be in streaming mode.

False
generation_schema Optional[Struct]

Schema that defines how the output should be structured.

None
tool_definitions Optional[ToolDefinitions]

Optional container with tool schemas, annotations, and tool-choice metadata. This is the single tool-calling entrypoint for the provider.

None
typed_parser Optional[str]

Converts the model raw output into a typed-dict. Supported parser: typed_xml.

None

Raises:

Type Description
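ValueError

Raised if prefilling and generation_schema are both given.

ValueError

Raised if top_logprobs is given without logprobs=True.

ValueError

Raised if typed_parser is given with stream=True.

TypedParserNotFoundError

Raised if typed_parser is not a registered parser.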

Source code in src/msgflux/models/providers/openai.py
def __call__(
    self,
    messages: Union[str, List[Dict[str, Any]]],
    *,
    system_prompt: Optional[str] = None,
    prefilling: Optional[str] = None,
    logprobs: Optional[bool] = None,
    top_logprobs: Optional[int] = None,
    stream: Optional[bool] = False,
    generation_schema: Optional[msgspec.Struct] = None,
    tool_definitions: Optional[ToolDefinitions] = None,
    typed_parser: Optional[str] = None,
) -> Union[ModelResponse, ModelStreamResponse]:
    """Run a chat completion, either blocking or streaming.

    Args:
        messages:
            Conversation history. Can be a simple string or a list of
            messages.
        system_prompt:
            A set of instructions that defines the overarching behavior
            and role of the model across all interactions.
        prefilling:
            Forces an initial message from the model. The model then
            continues its response from that message.
        logprobs:
            Token log probability output for this request.
        top_logprobs:
            Number of alternative tokens to return per generated token
            for this request. Requires `logprobs=True`.
        stream:
            Whether generation should be in streaming mode. When True, the
            worker is started with `F.spawn`. This method returns once the
            stream's first-chunk event is set, which happens on the first
            chunk, at the end of the stream, or on error.
        generation_schema:
            Schema that defines how the output should be structured. It is
            passed to `_generate` only in non-streaming mode and is not
            forwarded to the stream worker. When it is a `ToolFlowControl`
            subclass, tool schemas are not turned into `tools`/`tool_choice`
            request parameters. The `tool_definitions` object itself is
            still forwarded.
        tool_definitions:
            Optional container with tool schemas, annotations, and
            tool-choice metadata. This is the single tool-calling entrypoint
            for the provider.
        typed_parser:
            Converts the model raw output into a typed-dict. Supported
            parser: `typed_xml`. Not available with `stream=True`.

    Returns:
        A `ModelResponse` when `stream` is not True, otherwise a
        `ModelStreamResponse` populated by the spawned worker.

    Raises:
        ValueError:
            Raised if `prefilling` and `generation_schema` are both given.
        ValueError:
            Raised if `top_logprobs` is given without `logprobs=True`.
        ValueError:
            Raised if `typed_parser` is given with `stream=True`.
        TypedParserNotFoundError:
            Raised if `typed_parser` is not a registered parser.
    """
    self._validate_chat_completion_options(
        prefilling=prefilling,
        logprobs=logprobs,
        top_logprobs=top_logprobs,
        generation_schema=generation_schema,
        typed_parser=typed_parser,
        stream=stream,
    )
    # Fail fast on an unknown parser, before deep-copying the conversation.
    # Streaming with a typed_parser was rejected above, so this can only
    # trigger for non-streaming calls.
    if typed_parser and typed_parser not in typed_parser_registry:
        available = ", ".join(typed_parser_registry.keys())
        raise TypedParserNotFoundError(
            f"Typed parser `{typed_parser}` not found. "
            f"Available parsers: {available}"
        )

    is_flow_control = is_subclass_of(generation_schema, ToolFlowControl)
    generation_params = self._build_generation_params(
        messages,
        system_prompt,
        prefilling,
        None if is_flow_control else tool_definitions,
        logprobs=logprobs,
        top_logprobs=top_logprobs,
    )
    if tool_definitions is not None:
        generation_params["tool_definitions"] = tool_definitions

    if stream is True:
        self._prepare_stream_kwargs(generation_params)
        stream_response = ModelStreamResponse(mode="sync")
        F.spawn(
            self._stream_generate,
            **generation_params,
            stream=stream,
            stream_response=stream_response,
            stream_options={"include_usage": True},
        )
        F.wait_for_event(stream_response.first_chunk_event)
        return stream_response

    return self._generate(
        **generation_params,
        typed_parser=typed_parser,
        generation_schema=generation_schema,
    )

__init__

__init__(
    model_id,
    *,
    max_tokens=None,
    reasoning_effort=None,
    prompt_cache_retention=None,
    enable_thinking=None,
    return_reasoning=True,
    reasoning_in_tool_call=True,
    validate_typed_parser_output=False,
    temperature=None,
    top_p=None,
    stop=None,
    logprobs=None,
    top_logprobs=None,
    parallel_tool_calls=True,
    modalities=None,
    audio=None,
    verbosity=None,
    web_search_options=None,
    extra_body=None,
    verbose=False,
    base_url=None,
    context_length=None,
    reasoning_max_tokens=None,
    enable_cache=False,
    cache_size=128,
    retry=None,
)

model_id: Model ID in provider. max_tokens: An upper bound for the number of tokens that can be generated for a completion, including visible output tokens and reasoning tokens. reasoning_effort: Constrains effort on reasoning for reasoning models. The value is forwarded as-is; values documented by OpenAI include "minimal", "low", "medium" and "high". Reducing reasoning effort can result in faster responses and fewer tokens used on reasoning in a response. prompt_cache_retention: OpenAI-only prompt cache retention policy. Allowed values are "in_memory" and "24h". enable_thinking: If True, enable the model reasoning. return_reasoning: If the model returns the reasoning field, it is added along with the response. reasoning_in_tool_call: If True, maintains the reasoning for using the tool call. validate_typed_parser_output: If True, use the generation_schema to validate typed parser output. temperature: What sampling temperature to use, between 0 and 2. Higher values like 0.8 will make the output more random, while lower values like 0.2 will make it more focused and deterministic. stop: Up to 4 sequences where the API will stop generating further tokens. The returned text will not contain the stop sequence. top_p: An alternative to sampling with temperature, called nucleus sampling, where the model considers the results of the tokens with top_p probability mass. So 0.1 means only the tokens comprising the top 10% probability mass are considered. logprobs: Token log probability output (OpenAI provider only). When enabled, the response metadata includes the token-level logprob payload. top_logprobs: Number of alternative tokens to return per generated token (OpenAI provider only). Use with logprobs=True. parallel_tool_calls: If True, enable parallel tool calls. modalities: Types of output you would like the model to generate. Can be: ["text"], ["audio"] or ["text", "audio"]. audio: Audio configurations. Define voice and output format. verbosity: Constrains the verbosity of the model's response.
Lower values will result in more concise responses, while higher values will result in more verbose responses. Currently supported values are low, medium, and high. web_search_options: This tool searches the web for relevant results to use in a response. OpenAI and OpenRouter only. extra_body: Provider-specific request body extensions forwarded to OpenAI-compatible clients. verbose: If True, prints the model output to the console before it is transformed into typed structured output. base_url: URL to model provider. context_length: The maximum context length supported by the model. reasoning_max_tokens: OpenRouter-only maximum number of tokens for reasoning/thinking. This maps to extra_body={"reasoning": {"max_tokens": ...}} and cannot be combined with reasoning_effort. enable_cache: If True, enable response caching to avoid redundant API calls. cache_size: Maximum number of cached responses (default: 128).

Source code in src/msgflux/models/providers/openai.py
def __init__(  # noqa: C901
    self,
    model_id: str,
    *,
    max_tokens: Optional[int] = None,
    reasoning_effort: Optional[str] = None,
    prompt_cache_retention: Optional[Literal["in_memory", "24h"]] = None,
    enable_thinking: Optional[bool] = None,
    return_reasoning: Optional[bool] = True,
    reasoning_in_tool_call: Optional[bool] = True,
    validate_typed_parser_output: Optional[bool] = False,
    temperature: Optional[float] = None,
    top_p: Optional[float] = None,
    stop: Optional[Union[str, List[str]]] = None,
    logprobs: Optional[bool] = None,
    top_logprobs: Optional[int] = None,
    parallel_tool_calls: Optional[bool] = True,
    modalities: Optional[List[str]] = None,
    audio: Optional[Dict[str, str]] = None,
    verbosity: Optional[str] = None,
    web_search_options: Optional[Dict[str, Any]] = None,
    extra_body: Optional[Dict[str, Any]] = None,
    verbose: Optional[bool] = False,
    base_url: Optional[str] = None,
    context_length: Optional[int] = None,
    reasoning_max_tokens: Optional[int] = None,
    enable_cache: Optional[bool] = False,
    cache_size: Optional[int] = 128,
    retry: Optional[Any] = None,
):
    """Configure an OpenAI-compatible chat-completion client.

    Per-request sampling options are collected into
    ``self.sampling_run_params``. Options left at ``None`` (or, for the
    truthiness-gated ones, left empty) are omitted from that dict.

    Args:
        model_id:
            Model ID in provider.
        max_tokens:
            An upper bound for the number of tokens that can be
            generated for a completion, including visible output
            tokens and reasoning tokens.
        reasoning_effort:
            Constrains effort on reasoning for reasoning models. The value
            is forwarded as-is; values documented by OpenAI include
            "minimal", "low", "medium" and "high". Reducing reasoning
            effort can result in faster responses and fewer tokens used
            on reasoning in a response.
        prompt_cache_retention:
            OpenAI-only prompt cache retention policy.
            Allowed values are "in_memory" and "24h". Only forwarded
            when the provider is "openai".
        enable_thinking:
            If True, enable the model reasoning.
        return_reasoning:
            If the model returns the `reasoning` field, it is added
            along with the response.
        reasoning_in_tool_call:
            If True, maintains the reasoning for using the tool call.
        validate_typed_parser_output:
            If True, use the generation_schema to validate typed parser
            output.
        temperature:
            What sampling temperature to use, between 0 and 2.
            Higher values like 0.8 will make the output more random,
            while lower values like 0.2 will make it more focused and
            deterministic. ``0`` is a valid value and is forwarded.
        top_p:
            An alternative to sampling with temperature, called nucleus
            sampling, where the model considers the results of the tokens
            with top_p probability mass. So 0.1 means only the tokens
            comprising the top 10% probability mass are considered.
            ``0`` is forwarded like any other explicit value.
        stop:
            Up to 4 sequences where the API will stop generating further
            tokens. The returned text will not contain the stop sequence.
        logprobs:
            Token log probability output. When enabled, the response
            metadata includes the token-level logprob payload. Only
            forwarded when the provider is "openai".
        top_logprobs:
            Number of alternative tokens to return per generated token.
            Use with `logprobs=True`. Only forwarded when the provider is
            "openai".
        parallel_tool_calls:
            If True, enable parallel tool calls.
        modalities:
            Types of output you would like the model to generate.
            Can be: ["text"], ["audio"] or ["text", "audio"].
        audio:
            Audio configurations. Define voice and output format.
        verbosity:
            Constrains the verbosity of the model's response. Lower
            values will result in more concise responses, while higher
            values will result in more verbose responses. Currently
            supported values are low, medium, and high.
        web_search_options:
            This tool searches the web for relevant results to use in a
            response. OpenAI and OpenRouter only.
        extra_body:
            Provider-specific request body extensions forwarded to
            OpenAI-compatible clients. A shallow copy is stored.
        verbose:
            If True, prints the model output to the console before it is
            transformed into typed structured output.
        base_url:
            URL to model provider. Falls back to ``self._get_base_url()``.
        context_length:
            The maximum context length supported by the model.
        reasoning_max_tokens:
            OpenRouter-only maximum number of tokens for reasoning/thinking.
            This maps to ``extra_body={"reasoning": {"max_tokens": ...}}``
            and cannot be combined with ``reasoning_effort``. It is always
            stored on ``self`` but only forwarded for the "openrouter"
            provider.
        enable_cache:
            If True, enable response caching to avoid redundant API calls.
        cache_size:
            Maximum number of cached responses (default: 128).
        retry:
            Retry configuration stored as ``self.retry``. Its expected
            type is not constrained here (TODO confirm with the base
            class).

    Raises:
        ValueError:
            If ``reasoning_max_tokens`` and ``reasoning_effort`` are both
            given for the "openrouter" provider.
    """
    super().__init__()
    self.model_id = model_id
    self.context_length = context_length
    self.reasoning_max_tokens = reasoning_max_tokens
    self.enable_cache = enable_cache
    self.cache_size = cache_size
    self.sampling_params = {"base_url": base_url or self._get_base_url()}
    sampling_run_params = {"max_tokens": max_tokens}
    # Compare against None rather than truthiness. 0 / 0.0 are valid,
    # meaningful values (e.g. temperature=0 for near-deterministic output)
    # that a truthiness check would silently drop, falling back to the
    # provider default.
    if temperature is not None:
        sampling_run_params["temperature"] = temperature
    if top_p is not None:
        sampling_run_params["top_p"] = top_p
    if stop:
        sampling_run_params["stop"] = stop
    if self.provider == "openai" and logprobs is not None:
        sampling_run_params["logprobs"] = logprobs
    if self.provider == "openai" and top_logprobs is not None:
        sampling_run_params["top_logprobs"] = top_logprobs
    if verbosity:
        sampling_run_params["verbosity"] = verbosity
    if modalities:
        sampling_run_params["modalities"] = modalities
    if web_search_options:
        sampling_run_params["web_search_options"] = web_search_options
    if extra_body is not None:
        # Shallow copy so later mutation of the caller's dict does not leak.
        sampling_run_params["extra_body"] = dict(extra_body)
    if audio:
        sampling_run_params["audio"] = audio
    if reasoning_effort:
        sampling_run_params["reasoning_effort"] = reasoning_effort
    if self.provider == "openrouter" and reasoning_max_tokens is not None:
        if reasoning_effort is not None:
            raise ValueError(
                "`reasoning_max_tokens` cannot be used together with "
                "`reasoning_effort` for OpenRouter."
            )
        sampling_run_params["reasoning_max_tokens"] = reasoning_max_tokens
    if self.provider == "openai" and prompt_cache_retention is not None:
        sampling_run_params["prompt_cache_retention"] = prompt_cache_retention
    self.sampling_run_params = sampling_run_params
    self.enable_thinking = enable_thinking
    self.parallel_tool_calls = parallel_tool_calls
    self.reasoning_in_tool_call = reasoning_in_tool_call
    self.validate_typed_parser_output = validate_typed_parser_output
    self.return_reasoning = return_reasoning
    self.verbose = verbose
    self.retry = retry
    self._initialize()
    self._get_api_key()

acall async

acall(
    messages,
    *,
    system_prompt=None,
    prefilling=None,
    logprobs=None,
    top_logprobs=None,
    stream=False,
    generation_schema=None,
    tool_definitions=None,
    typed_parser=None,
)

Async version of `__call__`. Args: messages: Conversation history. Can be a simple string or a list of messages. system_prompt: A set of instructions that defines the overarching behavior and role of the model across all interactions. prefilling: Forces an initial message from the model; the model then continues its response from that message. logprobs: Token log probability output for this request. top_logprobs: Number of alternative tokens to return per generated token for this request; requires logprobs=True. stream: Whether generation should be in streaming mode. generation_schema: Schema that defines how the output should be structured. tool_definitions: Optional container with tool schemas, annotations, and tool-choice metadata. This is the single tool-calling entrypoint for the provider. typed_parser: Converts the model raw output into a typed-dict. Supported parser: typed_xml.

Raises:

Type Description
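ValueError

Raised if prefilling and generation_schema are both given.

ValueError

Raised if top_logprobs is given without logprobs=True.

ValueError

Raised if typed_parser is given with stream=True.

TypedParserNotFoundError

Raised if typed_parser is not a registered parser.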

Source code in src/msgflux/models/providers/openai.py
async def acall(
    self,
    messages: Union[str, List[Dict[str, Any]]],
    *,
    system_prompt: Optional[str] = None,
    prefilling: Optional[str] = None,
    logprobs: Optional[bool] = None,
    top_logprobs: Optional[int] = None,
    stream: Optional[bool] = False,
    generation_schema: Optional[msgspec.Struct] = None,
    tool_definitions: Optional[ToolDefinitions] = None,
    typed_parser: Optional[str] = None,
) -> Union[ModelResponse, ModelStreamResponse]:
    """Async version of `__call__`.

    Args:
        messages:
            Conversation history. Can be a simple string or a list of
            messages.
        system_prompt:
            A set of instructions that defines the overarching behavior
            and role of the model across all interactions.
        prefilling:
            Forces an initial message from the model. The model continues
            its response from that message.
        logprobs:
            Whether to return token log probabilities for this request.
        top_logprobs:
            Number of alternative tokens to return per generated token
            for this request.
        stream:
            Whether generation should be in streaming mode. Streaming is
            used only when this is exactly `True`.
        generation_schema:
            Schema that defines how the output should be structured.
        tool_definitions:
            Optional container with tool schemas, annotations, and
            tool-choice metadata. This is the single tool-calling entrypoint
            for the provider.
        typed_parser:
            Name of a parser that converts the model raw output into a
            typed dict. Supported parser: `typed_xml`.

    Returns:
        When `stream is True`, a `ModelStreamResponse` in async mode,
        returned once its first chunk event has been signalled.
        Otherwise, the response produced by `_agenerate`.

    Raises:
        ValueError:
            Raised if `generation_schema` is given together with
            `stream=True`.
        ValueError:
            Raised if a `typed_parser` is given together with `stream=True`.
        TypedParserNotFoundError:
            Raised in non-streaming mode if `typed_parser` is not a key of
            `typed_parser_registry`.
    """
    self._validate_chat_completion_options(
        prefilling=prefilling,
        logprobs=logprobs,
        top_logprobs=top_logprobs,
        generation_schema=generation_schema,
        typed_parser=typed_parser,
        stream=stream,
    )
    is_flow_control = is_subclass_of(generation_schema, ToolFlowControl)
    # Tool definitions are withheld from request building when the schema is
    # a ToolFlowControl subclass. They are still forwarded to the generate
    # step below.
    generation_params = self._build_generation_params(
        messages,
        system_prompt,
        prefilling,
        None if is_flow_control else tool_definitions,
        logprobs=logprobs,
        top_logprobs=top_logprobs,
    )
    if tool_definitions is not None:
        generation_params["tool_definitions"] = tool_definitions

    if stream is True:
        self._prepare_stream_kwargs(generation_params)
        stream_response = ModelStreamResponse(mode="async")
        await F.aspawn(
            self._astream_generate,
            **generation_params,
            stream=stream,
            stream_response=stream_response,
            stream_options={"include_usage": True},
        )
        # Hand the stream back only once the first chunk event is set.
        await F.await_for_event(stream_response.first_chunk_event)
        return stream_response
    else:
        if typed_parser and typed_parser not in typed_parser_registry:
            available = ", ".join(typed_parser_registry.keys())
            raise TypedParserNotFoundError(
                f"Typed parser `{typed_parser}` not found. "
                f"Available parsers: {available}"
            )
        response = await self._agenerate(
            **generation_params,
            typed_parser=typed_parser,
            generation_schema=generation_schema,
        )
        return response

VLLMChatCompletion

Bases: _BaseVLLM, OpenAIChatCompletion

vLLM Chat Completion.

Source code in src/msgflux/models/providers/vllm.py
@register_model
class VLLMChatCompletion(_BaseVLLM, OpenAIChatCompletion):
    """vLLM Chat Completion.

    Reuses the OpenAI chat-completion flow. Request parameters are
    routed into ``extra_body`` as vLLM-specific options.
    """

    def _adapt_params(self, params: Dict[str, Any]) -> Dict[str, Any]:
        """Move vLLM-specific options into ``extra_body``.

        - ``response_format`` is removed from ``params`` and sent as
          ``extra_body["guided_json"]``.
        - When ``self.enable_thinking`` is not None, it is sent as
          ``extra_body["chat_template_kwargs"]["enable_thinking"]``. It is
          merged with any ``chat_template_kwargs`` already present.

        Args:
            params: Request parameters. The dict is mutated in place.

        Returns:
            The same ``params`` dict with ``extra_body`` set.
        """
        response_format = params.pop("response_format", None)
        # Shallow copy, so a caller-supplied extra_body dict is not mutated.
        extra_body = dict(params.get("extra_body") or {})

        if response_format is not None:
            extra_body["guided_json"] = response_format

        if self.enable_thinking is not None:
            # Merge instead of overwriting, so other chat-template kwargs
            # the caller already placed in extra_body are preserved.
            chat_template_kwargs = dict(
                extra_body.get("chat_template_kwargs") or {}
            )
            chat_template_kwargs["enable_thinking"] = self.enable_thinking
            extra_body["chat_template_kwargs"] = chat_template_kwargs

        params["extra_body"] = extra_body
        return params

OpenRouterChatCompletion

Bases: _BaseOpenRouter, OpenAIChatCompletion

OpenRouter Chat Completion.

Source code in src/msgflux/models/providers/openrouter.py
@register_model
class OpenRouterChatCompletion(_BaseOpenRouter, OpenAIChatCompletion):
    """OpenRouter Chat Completion."""

    def _adapt_params(self, params: Dict[str, Any]) -> Dict[str, Any]:
        """Translate OpenAI-style request params into OpenRouter's format.

        - Fills in ``tool_choice`` when it is missing or None: ``"auto"`` if
          ``tools`` is not None, otherwise ``"none"``.
        - Moves ``reasoning_effort`` / ``reasoning_max_tokens`` into
          ``extra_body["reasoning"]``.
        - For models whose name does not contain ``"openai"``, replaces
          ``web_search_options`` with OpenRouter's ``web`` plugin.
        - Adds the msgflux attribution headers. Any other caller-supplied
          ``extra_headers`` are kept.

        Args:
            params: Request parameters. The dict is mutated in place.

        Returns:
            The same ``params`` dict.

        Raises:
            ValueError: If both ``reasoning_effort`` and
                ``reasoning_max_tokens`` are set.
        """
        # Shallow copy, so a caller-supplied extra_body dict is not mutated.
        extra_body = dict(params.get("extra_body") or {})
        # Start from plugins the caller already supplied, so they are not lost.
        plugins = list(extra_body.get("plugins") or [])

        if params.get("tool_choice") is None:
            if params.get("tools") is not None:
                params["tool_choice"] = "auto"
            else:
                params["tool_choice"] = "none"

        reasoning_effort = params.pop("reasoning_effort", None)
        reasoning_max_tokens = params.pop("reasoning_max_tokens", None)
        if reasoning_effort is not None and reasoning_max_tokens is not None:
            raise ValueError(
                "`reasoning_max_tokens` cannot be used together with "
                "`reasoning_effort` for OpenRouter."
            )
        if reasoning_effort is not None:
            extra_body["reasoning"] = {"effort": reasoning_effort}
        elif reasoning_max_tokens is not None:
            extra_body["reasoning"] = {"max_tokens": reasoning_max_tokens}

        # For non-OpenAI models enable the web-search plugin in place of the
        # OpenAI-native `web_search_options` parameter.
        web_search_options = params.get("web_search_options")
        if web_search_options is not None and "openai" not in params["model"]:
            params.pop("web_search_options")
            web_plugin = {"id": "web"}
            web_plugin.update(web_search_options)
            plugins.append(web_plugin)

        if plugins:
            extra_body["plugins"] = plugins

        params["extra_body"] = extra_body
        # Merge with any caller-supplied headers. The attribution headers
        # keep the same values as before.
        params["extra_headers"] = {
            **(params.get("extra_headers") or {}),
            "HTTP-Referer": "msgflux.com",
            "X-Title": "msgflux",
        }
        return params